Couchbase Kafka Connector: KAFKAC-62

NPE when loading data from a topic

Details

    • Bug
    • Resolution: Cannot Reproduce
    • Major
    • None
    • 3.1.0
    • None

    Description

      When using Kafka Streams APIs, for example:

              KStreamBuilder builder = new KStreamBuilder();
              KStream<String, String> source = builder.stream(TOPIC_NAME_INPUT);
              KStream<String, String> result = source.mapValues(jsonString -> anonymiseJSON(jsonString));
              result.to(TOPIC_NAME_OUTPUT);
       
      Then, when the Couchbase sink connector loads the data from TOPIC_NAME_OUTPUT, I get this stack trace:

      [2017-02-01 14:14:30,111] ERROR Task anonymised-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:404)
      com.couchbase.client.java.error.CannotRetryException: maximum number of attempts reached after 5 retries
          at com.couchbase.client.java.util.retry.RetryWithDelayHandler.call(RetryWithDelayHandler.java:101)
          at com.couchbase.client.java.util.retry.RetryWithDelayHandler.call(RetryWithDelayHandler.java:42)
          at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
          at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:252)
          at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:323)
          at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
          at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:302)
          at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:284)
          at rx.internal.operators.NotificationLite.accept(NotificationLite.java:152)
          at rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext(SubjectSubscriptionManager.java:255)
          at rx.subjects.BehaviorSubject.onNext(BehaviorSubject.java:162)
          at rx.internal.operators.OnSubscribeRedo$2$1.onError(OnSubscribeRedo.java:237)
          at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:268)
          at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:824)
          at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:585)
          at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:574)
          at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:278)
          at rx.internal.operators.OnSubscribeMap$MapSubscriber.onError(OnSubscribeMap.java:88)
          at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73)
          at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.fastpath(OnSubscribeFromIterable.java:173)
          at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:86)
          at rx.Subscriber.setProducer(Subscriber.java:211)
          at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
          at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63)
          at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34)
          at rx.Observable.unsafeSubscribe(Observable.java:9861)
          at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
          at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
          at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
          at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
          at rx.Observable.unsafeSubscribe(Observable.java:9861)
          at rx.internal.operators.OnSubscribeRedo$2.call(OnSubscribeRedo.java:273)
          at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:73)
          at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:52)
          at rx.internal.operators.OnSubscribeRedo$4$1.onNext(OnSubscribeRedo.java:336)
          at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:399)
          at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:357)
          at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:852)
          at rx.internal.operators.OnSubscribeTimerOnce$1.call(OnSubscribeTimerOnce.java:49)
          at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
          at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.NullPointerException
          at com.couchbase.connect.kafka.CouchbaseSinkTask.convert(CouchbaseSinkTask.java:126)
          at com.couchbase.connect.kafka.CouchbaseSinkTask.access$000(CouchbaseSinkTask.java:50)
          at com.couchbase.connect.kafka.CouchbaseSinkTask$1.call(CouchbaseSinkTask.java:107)
          at com.couchbase.connect.kafka.CouchbaseSinkTask$1.call(CouchbaseSinkTask.java:104)
          at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
          ... 29 more
      Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: org.apache.kafka.connect.sink.SinkRecord.class
          at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109)
          at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73)
          ... 29 more
      [2017-02-01 14:14:30,116] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:405)
      [2017-02-01 14:14:30,116] INFO WorkerSinkTask{id=anonymised-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
      [2017-02-01 14:14:30,120] ERROR Task anonymised-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
      org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
          at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:406)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)
      [2017-02-01 14:14:30,130] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
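
      The report does not identify what was null inside CouchbaseSinkTask.convert, and the issue was closed as Cannot Reproduce. One scenario that could lead to a NullPointerException in a sink converter is a record whose key or value is null, for instance if anonymiseJSON returned null for some input. That is an assumption, not something the trace confirms. A minimal sketch of a guard against it, in plain Java, where KeyValue and NullRecordFilter are hypothetical names and not part of the connector or of Kafka:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a record read from the output topic.
// This is NOT the connector's SinkRecord class.
final class KeyValue {
    final String key;
    final String value;

    KeyValue(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical helper: keeps only records that have both a key and a value.
final class NullRecordFilter {
    private NullRecordFilter() {}

    // A record is treated as loadable only when it, its key, and its value are non-null.
    static boolean isLoadable(KeyValue record) {
        return record != null && record.key != null && record.value != null;
    }

    // Returns the loadable records in their original order; the input list is not modified.
    static List<KeyValue> dropNulls(List<KeyValue> records) {
        List<KeyValue> kept = new ArrayList<>();
        for (KeyValue record : records) {
            if (isLoadable(record)) {
                kept.add(record);
            }
        }
        return kept;
    }
}
```

      In the Streams topology above, the same idea could be expressed by adding a KStream filter step before result.to(...), for example result.filter((key, value) -> key != null && value != null). Whether that would have avoided this particular failure is unknown.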

          People

            david.nault David Nault
            perry Perry Krug