Uploaded image for project: 'Couchbase Kafka Connector'
  1. Couchbase Kafka Connector
  2. KAFKAC-38

Autorelease exception when connecting to large bucket

    XMLWordPrintable

Details

    • Bug
    • Resolution: Won't Fix
    • Major
    • None
    • 2.0.0
    • None

    Description

      stanley shi reported that on bucket with 225244 items it raise this exception

      Exception in thread "main" java.lang.IllegalStateException: The Content of this Observable is already released. Subscribe earlier or tune the CouchbaseEnvironment#autoreleaseAfter() setting.
      at com.couchbase.client.core.utils.UnicastAutoReleaseSubject$OnSubscribeAction.call(UnicastAutoReleaseSubject.java:230)
      at com.couchbase.client.core.utils.UnicastAutoReleaseSubject$OnSubscribeAction.call(UnicastAutoReleaseSubject.java:202)
      at rx.Observable.unsafeSubscribe(Observable.java:8171)
      at rx.subjects.SerializedSubject$1.call(SerializedSubject.java:45)
      at rx.subjects.SerializedSubject$1.call(SerializedSubject.java:41)
      at rx.Observable.unsafeSubscribe(Observable.java:8171)
      at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:231)
      at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:140)
      at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
      at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)
      at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
      at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:93)
      at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:609)
      at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:521)
      at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:813)
      at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
      at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:609)
      at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:521)
      at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:813)
      at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onCompleted(SubjectSubscriptionManager.java:230)
      at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:102)
      at com.couchbase.client.core.endpoint.AbstractGenericHandler.completeResponse(AbstractGenericHandler.java:348)
      at com.couchbase.client.core.endpoint.AbstractGenericHandler.access$000(AbstractGenericHandler.java:71)
      at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:366)
      at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      

      changing autoreleaseAfter() does not seem to help

      2016-03-31 02:31:24 INFO CouchbaseCore:121 - CouchbaseKafkaEnvironment: {sslEnabled=false, sslKeystoreFile='null', sslKeystorePassword='null', queryEnabled=false, queryPort=8093, bootstrapHttpEnabled=true, bootstrapCarrierEnabled=true, bootstrapHttpDirectPort=8091, bootstrapHttpSslPort=18091, bootstrapCarrierDirectPort=11210, bootstrapCarrierSslPort=11207, ioPoolSize=3, computationPoolSize=3, responseBufferSize=16384, requestBufferSize=16384, kvServiceEndpoints=1, viewServiceEndpoints=1, queryServiceEndpoints=1, searchServiceEndpoints=1, ioPool=NioEventLoopGroup, coreScheduler=CoreScheduler, eventBus=DefaultEventBus, packageNameAndVersion=couchbase-jvm-core/1.2.6 (git: 1.2.6), dcpEnabled=true, retryStrategy=BestEffort, maxRequestLifetime=75000, retryDelay=ExponentialDelay
      {growBy 1.0 MICROSECONDS, powers of 2; lower=100, upper=100000}
      , reconnectDelay=ExponentialDelay
      {growBy 1.0 MILLISECONDS, powers of 2; lower=32, upper=4096}
      , observeIntervalDelay=ExponentialDelay
      {growBy 1.0 MICROSECONDS, powers of 2; lower=10, upper=100000}
      , keepAliveInterval=30000, autoreleaseAfter=1500, bufferPoolingEnabled=true, tcpNodelayEnabled=true, mutationTokensEnabled=false, socketConnectTimeout=1000, dcpConnectionBufferSize=20971520, dcpConnectionBufferAckThreshold=0.2, dcpConnectionName=dcp/core-io, callbacksOnIoPool=false, kafkaKeySerializerClass=kafka.serializer.StringEncoder, kafkaFilterClass=com.ebay.Filter, kafkaValueSerializerClass=com.ebay., kafkaEventBufferSize=16384, kafkaTopic=mytopic, kafkaZookeeperAddress=10., couchbaseStateSerializerClass=com.ebay.p13n.cb2kafka.state.LocalFileStateSerializer, couchbaseStateSerializationThreshold=2, couchbaseBucket=mybucket, couchbaseNodes=.ebay.com}
      

      But everything works with another bucket with 594 items

      Attachments

        No reviews matched the request. Check your Options in the drop-down menu of this sections header.

        Activity

          People

            avsej Sergey Avseyev
            avsej Sergey Avseyev
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Gerrit Reviews

                There are no open Gerrit changes

                PagerDuty