Reputation: 2158
I am using kafka_2.11-0.10.2.1 and the Pub/Sub connector provided by Google here. All I want to do is push data from a Kafka topic to a Pub/Sub topic using a standalone connector. I followed all the steps as I should have:
I have cps-kafka-connector.jar, and a cps-sink-connector.properties file in Kafka's config directory. The file looks like this:
name=CPSConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
topics=kafka_topic
cps.topic=pubsub_topic
cps.project=my_gcp_project_12345
In connect-standalone.properties I set:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
I set up kafka_topic and sent some messages as follows:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_topic
$ hello streams
$ kafka streams rock
I then started the connector with:
$ bin/connect-standalone.sh config/connect-standalone.properties config/cps-sink-connector.properties
The intention is to then run:
$ gcloud beta pubsub subscriptions pull subscription_to_pubsub_topic
to collect those messages. However, the following errors occur and I cannot get my head around them. Any thoughts? Am I using the wrong input? What would be the correct sample input?
[2017-05-04 17:34:40,898] INFO Discovered coordinator 10.33.19.146:9092 (id: 2147483647 rack: null) for group connect-CPSConnector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:586)
[2017-05-04 17:34:40,899] INFO Revoking previously assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:397)
[2017-05-04 17:34:40,900] INFO (Re-)joining group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
[2017-05-04 17:34:40,936] ERROR Task CPSConnector-4 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@3c06c37d; line: 1, column: 11]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@3c06c37d; line: 1, column: 11]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-05-04 17:34:40,941] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
[2017-05-04 17:34:43,837] INFO Revoking previously assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:397)
[2017-05-04 17:34:43,838] INFO (Re-)joining group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
[2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,847] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,847] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,848] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,850] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,853] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [test8-0] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,850] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,856] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,846] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,854] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,862] ERROR Task CPSConnector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@32a6e3e6; line: 1, column: 11]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@32a6e3e6; line: 1, column: 11]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
...
Upvotes: 3
Views: 2642
Reputation: 17261
These lines in connect-standalone.properties do not disable converters:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
They only control whether a schema is included alongside the data for certain converters, such as the JSON converter. The lines you are interested in are:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
The key.converter and value.converter fields indicate the format of the data in the Kafka messages' key and value, respectively. Since you are publishing messages that are not valid JSON, you are seeing this error. You will want to set these converters to the StringConverter:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
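To see why the messages from the question are rejected by a JSON converter, here is a rough sketch in Python. It uses Python's standard json module as a stand-in for the Jackson parser that JsonConverter uses, so error messages will differ, and the helper names parses_as_json and with_schema_envelope are made up for illustration. It also shows the schema/payload envelope that, as far as I know, the JSON converter expects when schemas.enable is true.

```python
import json


def parses_as_json(raw: bytes) -> bool:
    """Return True if the raw message bytes form a valid JSON document."""
    try:
        json.loads(raw.decode("utf-8"))
        return True
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False


def with_schema_envelope(text: str) -> bytes:
    """Wrap a string in a schema/payload envelope (assumed format for
    the JSON converter when schemas.enable=true)."""
    envelope = {
        "schema": {"type": "string", "optional": False},
        "payload": text,
    }
    return json.dumps(envelope).encode("utf-8")


if __name__ == "__main__":
    samples = [
        b"hello streams",                  # plain text, as typed in the question
        b'"hello streams"',                # the same text as a JSON string literal
        b'{"greeting": "hello streams"}',  # a JSON object
        with_schema_envelope("hello streams"),
    ]
    for raw in samples:
        print(raw, "->", parses_as_json(raw))
```

Only the first sample fails to parse, which matches the "Unrecognized token 'hello'" error in the log. If you would rather keep the JSON converter, the input has to look like one of the other samples; for plain text, the StringConverter settings above are the simpler route.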
Upvotes: 3