Description
Using the following simple Scala code, I am able to produce a ConcurrentModificationException fairly consistently when connecting to a multi-node cluster with many buckets:
import com.couchbase.client.java._
|
|
/**
 * Minimal reproduction program: creates an asynchronous Couchbase cluster
 * handle for a fixed list of nodes and then requests that every bucket in
 * `bucketNames` be opened, one call after another, using the same password.
 *
 * Running this against a multi-node cluster with many buckets is what the
 * surrounding report says triggers the ConcurrentModificationException.
 */
object InitTest extends App {

  // Explicit Scala -> Java collection conversion (`.asJava`) instead of the
  // implicit `JavaConversions`, so the conversion is visible at the call site.
  import scala.collection.JavaConverters._

  // Buckets to open, in this order.
  val bucketNames = List("cache", "session", "clicks", "campaignevent", "datacenter", "shorturl", "metering")

  // Node names handed to the cluster factory.
  val serverUris = List("build04", "build02", "build01")

  // Password passed with every openBucket call.
  val password = "foo"

  // `create` receives a java.util.List view of the Scala list.
  val cluster = CouchbaseAsyncCluster.create(serverUris.asJava)

  // Issue one openBucket call per bucket name; the returned values are discarded.
  bucketNames.foreach(name => cluster.openBucket(name, password))

  // NOTE(review): presumably keeps the JVM alive long enough for the
  // asynchronous bucket opening to run and for the failure to surface — confirm.
  Thread.sleep(30000)
}
|
The exception stack trace is as follows:
rx.exceptions.OnErrorNotImplementedException
|
at rx.Observable$30.onError(Observable.java:7092)
|
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:154)
|
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
|
at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:49)
|
at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
|
at rx.internal.operators.OperatorSingle$1.onError(OperatorSingle.java:90)
|
at rx.internal.operators.OperatorTakeLast$1.onError(OperatorTakeLast.java:65)
|
at rx.internal.operators.OperatorMerge$MergeSubscriber.innerError(OperatorMerge.java:429)
|
at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:404)
|
at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:49)
|
at rx.internal.operators.OperatorMerge$MergeSubscriber.innerError(OperatorMerge.java:429)
|
at rx.internal.operators.OperatorMerge$MergeSubscriber.access$800(OperatorMerge.java:93)
|
at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:552)
|
at rx.Observable.unsafeSubscribe(Observable.java:7311)
|
at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:188)
|
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:158)
|
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:93)
|
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
|
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
|
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30)
|
at rx.Observable$1.call(Observable.java:145)
|
at rx.Observable$1.call(Observable.java:137)
|
at rx.Observable$1.call(Observable.java:145)
|
at rx.Observable$1.call(Observable.java:137)
|
at rx.Observable$1.call(Observable.java:145)
|
at rx.Observable$1.call(Observable.java:137)
|
at rx.Observable$1.call(Observable.java:145)
|
at rx.Observable$1.call(Observable.java:137)
|
at rx.Observable$1.call(Observable.java:145)
|
at rx.Observable$1.call(Observable.java:137)
|
at rx.Observable$1.call(Observable.java:145)
|
at rx.Observable$1.call(Observable.java:137)
|
at rx.Observable$1.call(Observable.java:145)
|
at rx.Observable$1.call(Observable.java:137)
|
at rx.Observable$1.call(Observable.java:145)
|
at rx.Observable$1.call(Observable.java:137)
|
at rx.Observable.subscribe(Observable.java:7393)
|
at rx.Observable.subscribe(Observable.java:7083)
|
at com.couchbase.client.core.RequestHandler$1.call(RequestHandler.java:157)
|
at com.couchbase.client.core.RequestHandler$1.call(RequestHandler.java:151)
|
at rx.Observable$31.onNext(Observable.java:7139)
|
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:130)
|
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
|
at rx.subjects.PublishSubject.onNext(PublishSubject.java:121)
|
at com.couchbase.client.core.config.DefaultConfigurationProvider.upsertBucketConfig(DefaultConfigurationProvider.java:386)
|
at com.couchbase.client.core.config.DefaultConfigurationProvider.access$000(DefaultConfigurationProvider.java:91)
|
at com.couchbase.client.core.config.DefaultConfigurationProvider$7.call(DefaultConfigurationProvider.java:257)
|
at com.couchbase.client.core.config.DefaultConfigurationProvider$7.call(DefaultConfigurationProvider.java:254)
|
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
|
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
|
at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
|
at rx.internal.operators.OperatorTake$1.onNext(OperatorTake.java:67)
|
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
|
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
|
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
|
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
|
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
|
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
|
at com.couchbase.client.core.utils.Buffers$2$1.onNext(Buffers.java:90)
|
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:130)
|
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
|
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
|
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
|
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
|
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
|
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
|
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
|
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
|
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
|
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
|
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
|
at java.lang.Thread.run(Thread.java:722)
|
Caused by: java.util.ConcurrentModificationException
|
at java.util.HashMap$HashIterator.nextEntry(HashMap.java:894)
|
at java.util.HashMap$ValueIterator.next(HashMap.java:922)
|
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:96)
|
at rx.Subscriber.setProducer(Subscriber.java:139)
|
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
|
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
|
at rx.Observable.unsafeSubscribe(Observable.java:7304)
|
... 58 more
|