mike w

Reputation: 141

SnappyData - Error creating Kafka streaming table

I'm seeing an issue when creating a spark streaming table using kafka from the snappy shell.

The exception: 'Invalid input 'C', expected dmlOperation, insert, withIdentifier, select or put (line 1, column 1)'

Reference: http://snappydatainc.github.io/snappydata/streamingWithSQL/#spark-streaming-overview

Here is my sql:

CREATE STREAM TABLE if not exists sensor_data_stream 
(sensor_id string, metric string)
using kafka_stream 
options (
    storagelevel 'MEMORY_AND_DISK_SER_2',
    rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter',
    zkQuorum 'localhost:2181',
    groupId 'streamConsumer',
    topics 'test:01');

The shell seems not to like the script starting at the first character, 'C'. I'm attempting to execute the script using the following command:

snappy> run '/scripts/my_test_sensor_script.sql';

any help appreciated!

Upvotes: 3

Views: 259

Answers (2)

Yogesh Mahajan

Reputation: 241

Mike, you need to create your own rowConverter class by implementing the following trait:

trait StreamToRowsConverter extends Serializable {
  def toRows(message: Any): Seq[Row]
}

and then specify that rowConverter's fully qualified class name in the DDL. The rowConverter is specific to a schema; 'io.snappydata.app.streaming.KafkaStreamToRowsConverter' is just a placeholder class name and should be replaced by your own rowConverter class.
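For illustration, a minimal implementation for the schema in the question (sensor_id string, metric string) might look like the sketch below. This is not SnappyData's shipped code: the class name, the assumption that each Kafka message is comma-separated text, and the parsing logic are all placeholders to adapt to your actual payload format, and the trait's import path may differ by SnappyData version.

```scala
import org.apache.spark.sql.Row
// StreamToRowsConverter is the trait shown above; its package varies by
// SnappyData version, so check the jars on your classpath.
import org.apache.spark.sql.streaming.StreamToRowsConverter

// Hypothetical converter for the (sensor_id string, metric string) schema,
// assuming each Kafka message is a comma-separated string such as
// "sensor-42,temperature". Adjust the parsing to your real payload format.
class SensorDataToRowsConverter extends StreamToRowsConverter with Serializable {

  override def toRows(message: Any): Seq[Row] = {
    val parts = message.toString.split(",", 2)
    // One input message becomes one Row matching the table's two columns;
    // returning Seq.empty silently drops malformed records.
    if (parts.length == 2)
      Seq(Row.fromSeq(Seq(parts(0).trim, parts(1).trim)))
    else
      Seq.empty
  }
}
```

You would then point the DDL's rowConverter option at this class's fully qualified name, for example rowConverter 'io.snappydata.app.streaming.SensorDataToRowsConverter' (the package here is hypothetical), and make sure the jar containing it is on the server's classpath.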

Upvotes: 0

Sachin Janani

Reputation: 1319

There is an inconsistency between the documentation and the actual syntax: the table name must come before the IF NOT EXISTS clause. The correct syntax is:

CREATE STREAM TABLE sensor_data_stream if not exists 
(sensor_id string, metric string)
using kafka_stream 
options (
    storagelevel 'MEMORY_AND_DISK_SER_2',
    rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter',
    zkQuorum 'localhost:2181',
    groupId 'streamConsumer',
    topics 'test:01');

One more thing you need to do is write a row converter for your data.

Upvotes: 3
