Nayan Goswami
Nayan Goswami

Reputation: 81

kafka couchbase sink connector getting disconnected after dumping some records

I've installed confluent_3.3.0 and started zookeper, schema-registry and kafka broker . And downloaded couchbase connector from below link https://github.com/couchbase/kafka-connect-couchbase

Running sink connector using below command

./bin/connect-standalone etc/kafka/connect-standalone.properties /home/nayangiri/couch-connect-test/kafka-connect-couchbase/config/quickstart-couchbase-sink.properties

After running connector, I'm starting publishing JSON using kafka-python library.

The problem is, connector is getting disconnected without dumping all published messages with below error

[2017-11-07 20:12:39,815] WARN This transcoder (JsonBinaryTranscoder) does not support mutation tokens - this method is a stub and needs to be implemented on custom transcoders. (com.couchbase.client.java.transcoder.AbstractTranscoder:150)
[2017-11-07 20:12:44,821] WARN This transcoder (JsonBinaryTranscoder) does not support mutation tokens - this method is a stub and needs to be implemented on custom transcoders. (com.couchbase.client.java.transcoder.AbstractTranscoder:150)
[2017-11-07 20:12:44,821] WARN This transcoder (JsonBinaryTranscoder) does not support mutation tokens - this method is a stub and needs to be implemented on custom transcoders. (com.couchbase.client.java.transcoder.AbstractTranscoder:150)
[2017-11-07 20:12:44,823] ERROR Task test-couchbase-sink-1 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
com.couchbase.client.java.error.CannotRetryException: maximum number of attempts reached after 5 retries
    at com.couchbase.client.java.util.retry.RetryWithDelayHandler.call(RetryWithDelayHandler.java:101)
    at com.couchbase.client.java.util.retry.RetryWithDelayHandler.call(RetryWithDelayHandler.java:42)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
    at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:252)
    at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:323)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:302)
    at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:284)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:135)
    at rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext(SubjectSubscriptionManager.java:253)
    at rx.subjects.BehaviorSubject.onNext(BehaviorSubject.java:160)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
    at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
    at rx.internal.operators.OnSubscribeRedo$2$1.onError(OnSubscribeRedo.java:237)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:266)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:818)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:579)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:852)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onError(OnSubscribeMap.java:88)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73)
    at rx.observers.Subscribers$5.onNext(Subscribers.java:235)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
    at rx.internal.producers.SingleProducer.request(SingleProducer.java:65)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:103)
    at com.couchbase.client.core.endpoint.AbstractGenericHandler.completeResponse(AbstractGenericHandler.java:390)
    at com.couchbase.client.core.endpoint.AbstractGenericHandler.access$000(AbstractGenericHandler.java:72)
    at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:408)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException
    at com.couchbase.connect.kafka.util.JsonBinaryTranscoder.newDocument(JsonBinaryTranscoder.java:40)
    at com.couchbase.connect.kafka.util.JsonBinaryTranscoder.newDocument(JsonBinaryTranscoder.java:30)
    at com.couchbase.client.java.transcoder.AbstractTranscoder.newDocument(AbstractTranscoder.java:133)
    at com.couchbase.client.java.CouchbaseAsyncBucket$16.call(CouchbaseAsyncBucket.java:568)
    at com.couchbase.client.java.CouchbaseAsyncBucket$16.call(CouchbaseAsyncBucket.java:560)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
    ... 19 more
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.UpsertResponse.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:118)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73)
    ... 19 more
[2017-11-07 20:12:44,830] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2017-11-07 20:12:44,830] ERROR Task test-couchbase-sink-1 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2017-11-07 20:12:44,831] **ERROR Task is being killed and will not recover until manually restarted** (org.apache.kafka.connect.runtime.WorkerTask:149)
[2017-11-07 20:12:44,836] INFO Closed bucket test (com.couchbase.client.core.config.ConfigurationProvider:115)
[2017-11-07 20:12:44,836] INFO Disconnected from Node 10.103.2.76/localhost (com.couchbase.client.core.node.Node:115)

[2017-11-07 20:12:44,839] INFO [null][KeyValueEndpoint]: Got notified from Channel as inactive, attempting reconnect. (com.couchbase.client.core.endpoint.Endpoint:115)

Thank you for Reading

Upvotes: 1

Views: 505

Answers (1)

dnault
dnault

Reputation: 8909

Thanks for raising this issue. This is a regression in version 3.2.0 of the connector. It is being tracked as KAFKAC-83.

The fix is included in version 3.2.1, scheduled for release on November 21, 2017 released on November 8, 2017.

In the meantime you may wish to temporarily downgrade to version 3.1.3, or build the connector from the latest source code.

PSA: The Couchbase forums have a dedicated section for discussion related to the Kafka connector.

Upvotes: 1

Related Questions