Yeshwant KAKAD

Reputation: 299

NiFi: best flow to insert data from Kafka to Cassandra?

I have spent two days researching this and now I need your help. Thank you in advance.

I have the following flow: 1) ConsumeKafka (messages are in JSON format) 2) EvaluateJsonPath 3) UpdateAttribute 4) AttributesToJSON

The flow above works, but the rest of the flow does not: 5) PutCassandraRecord (I need help configuring this processor. I know my Cassandra server, port, keyspace and table name, and the record reader is JsonPathReader. What else is needed?) 6) A JsonPathReader controller service (I need help with how this record reader must be configured). 7) I am getting the exception shown in the attachment below. Where and how do I get or configure a Schema Registry?

I checked this question and answer: Apache Nifi/Cassandra - how to load CSV into Cassandra table

If my flow is wrong, please correct me. Thanks.

regards, Yeshwant

Upvotes: 0

Views: 741

Answers (1)

notNull

Reputation: 31510

There are multiple ways to configure a Record Reader/Writer controller service.

I will explain the following two Schema Access Strategies:

  • Use 'Schema Name' Property
  • Use 'Schema Text' Property

Use 'Schema Text' Property:

With this access strategy, the controller service looks for an avro.schema attribute in the Variable Registry or the flowfile attributes, or we can put the schema directly in the property value.

Example:

Here I have set the Schema Text property value to my Avro schema.
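Since the original screenshot is not available, here is a minimal sketch of what a Schema Text value could look like. The record name and the fields (id, name) are hypothetical and should be replaced with the columns of your own Cassandra table; the script only builds the Avro schema as JSON so that the printed text can be pasted into the property.

```python
import json

# Hypothetical record layout (assumption): replace the fields with the
# columns of your own Cassandra table.
schema = {
    "type": "record",
    "name": "kafka_message",
    "namespace": "example",
    "fields": [
        {"name": "id", "type": "string"},
        # A nullable field is written as a union with "null".
        {"name": "name", "type": ["null", "string"], "default": None},
    ],
}

# The JSON text printed here is what would go into the 'Schema Text' property.
schema_text = json.dumps(schema, indent=2)
print(schema_text)
```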

Use 'Schema Name' Property:


With this strategy, the controller service evaluates the Schema Name property, here ${valor.vaengine} (valor.vaengine is an attribute name), so the flowfile must carry a value for this attribute.

The controller service then uses that value to pick the matching schema from the AvroSchemaRegistry it is configured with.

In your case the flowfile does not have the valor.vaengine attribute. To add it to the flowfile, use an UpdateAttribute processor and add a new property:

valor.vaengine

<schema_name_in_avroschemaregistry>

Use this template for more details on configuring and using Record Reader/Writer controller services.
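The 'Schema Name' lookup described above can be modeled roughly as follows. This is only a sketch of the idea in plain Python, not NiFi code: the dictionary stands in for AvroSchemaRegistry, and the entry name my_schema and the function resolve_schema are hypothetical.

```python
import json

# Stand-in for AvroSchemaRegistry: entry name -> Avro schema text.
# "my_schema" is a hypothetical entry name (assumption).
schema_registry = {
    "my_schema": json.dumps({
        "type": "record",
        "name": "my_schema",
        "fields": [{"name": "id", "type": "string"}],
    })
}

def resolve_schema(attributes, registry, attribute_name="valor.vaengine"):
    """Return the parsed schema named by a flowfile attribute, or raise KeyError."""
    schema_name = attributes.get(attribute_name)
    if not schema_name:
        # This mirrors the situation in the question: the attribute is missing.
        raise KeyError(f"flowfile has no '{attribute_name}' attribute")
    if schema_name not in registry:
        raise KeyError(f"no schema named '{schema_name}' in the registry")
    return json.loads(registry[schema_name])

# A flowfile after UpdateAttribute has added the attribute.
flowfile_attributes = {"valor.vaengine": "my_schema"}
schema = resolve_schema(flowfile_attributes, schema_registry)
print(schema["name"])
```

Calling resolve_schema with an empty attribute map raises KeyError, which corresponds to the missing-attribute case that UpdateAttribute is meant to fix.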


You are using the JsonPathReader controller service. For this controller service

we need to add at least one user-defined property before it can be enabled, for example a property named id with the value $.id
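To illustrate what such user-defined properties mean, here is a rough sketch in plain Python: each property name becomes a record field and its value is the JSONPath used to fill it. Only id and $.id come from the answer; the city property, the sample message and the helper functions are hypothetical, and the path resolver handles only simple dotted paths, not full JSONPath.

```python
import json

# User-defined properties as they might appear on the reader:
# field name -> JSONPath. "city" is a hypothetical extra property.
json_paths = {"id": "$.id", "city": "$.address.city"}

def evaluate_simple_path(document, path):
    """Resolve a dotted path such as '$.a.b'; return None if a key is missing."""
    if not path.startswith("$."):
        raise ValueError("only simple '$.key' paths are supported in this sketch")
    current = document
    for key in path[2:].split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current

def read_record(message, paths):
    """Build one record (field name -> value) from a JSON message."""
    document = json.loads(message)
    return {field: evaluate_simple_path(document, p) for field, p in paths.items()}

# Hypothetical Kafka message payload.
message = '{"id": "42", "address": {"city": "Pune"}}'
record = read_record(message, json_paths)
print(record)
```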

Upvotes: 1
