Reputation: 3294
I have data stored in a Kafka topic and I want to move it to Cassandra using the CassandraSinkConnector from Landoop.
I get the following error when I try to start the connector:
Caused by: java.lang.IllegalArgumentException: A KCQL error occurred.FIELD_ID is not a valid field name
at com.datamountaineer.streamreactor.connect.converters.Transform$.raiseException$1(Transform.scala:40)
at com.datamountaineer.streamreactor.connect.converters.Transform$.apply(Transform.scala:83)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$com$datamountaineer$streamreactor$connect$cassandra$sink$CassandraJsonWriter$$insert$1.apply(CassandraJsonWriter.scala:182)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$com$datamountaineer$streamreactor$connect$cassandra$sink$CassandraJsonWriter$$insert$1.apply(CassandraJsonWriter.scala:181)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
It seems the schema contains something that the validation doesn't like, but I can't see what the problem is. This is my schema (auto-generated by Attunity Replicate):
{
  "type": "record",
  "name": "DataRecord",
  "namespace": "com.attunity.queue.msg.test 1 dlx express.DCSDBA.PURGE_SETUP",
  "fields": [
    {
      "name": "data",
      "type": {
        "type": "record",
        "name": "Data",
        "fields": [
          { "name": "FIELD_ID", "type": ["null", "string"], "default": null },
          { "name": "SERVER_INSTANCE", "type": ["null", "int"], "default": null }
        ]
      }
    },
    { "name": "beforeData", "type": ["null", "Data"], "default": null },
    {
      "name": "headers",
      "type": {
        "type": "record",
        "name": "Headers",
        "namespace": "com.attunity.queue.msg",
        "fields": [
          {
            "name": "operation",
            "type": {
              "type": "enum",
              "name": "operation",
              "symbols": ["INSERT", "UPDATE", "DELETE", "REFRESH"]
            }
          },
          { "name": "changeSequence", "type": "string" },
          { "name": "timestamp", "type": "string" },
          { "name": "streamPosition", "type": "string" },
          { "name": "transactionId", "type": "string" },
          { "name": "changeMask", "type": ["null", "bytes"], "default": null },
          { "name": "columnMask", "type": ["null", "bytes"], "default": null },
          { "name": "transactionEventCounter", "type": ["null", "long"], "default": null },
          { "name": "transactionLastEvent", "type": ["null", "boolean"], "default": null }
        ]
      }
    }
  ]
}
This is my sink configuration:
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
connect.cassandra.key.space=eucsarch
topics=DCSDBA.PURGE_SETUP1
tasks.max=1
connect.cassandra.mapping.collection.to.json=false
connect.cassandra.kcql=INSERT INTO purge_setup SELECT data.* FROM DCSDBA.PURGE_SETUP1
connect.cassandra.password=pass
connect.cassandra.username=user
value.converter.schema.registry.url=http://schema-registry:8081
connect.cassandra.contact.points=cassandra.local
connect.cassandra.port=9042
value.converter=io.confluent.connect.avro.AvroConverter
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
Upvotes: 1
Views: 480
Reputation: 39840
I had a couple of similar issues with Kafka Connect and Attunity Replicate. Attunity wraps the actual row values in a nested data record, and the sink seems to trip over that nesting. Although I'd have to take a look at your original data stream, the following SMT did the trick for me: it extracts the nested data struct and makes it the record value, so the sink sees FIELD_ID and SERVER_INSTANCE as top-level fields:
"transforms":"ExtractField",
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractField.field":"data"
Upvotes: 1