Mitra

Reputation: 41

Value split is not a member of (String, String)

I am trying to read data from Kafka and store it in Cassandra tables through Spark RDDs.

I get the following error while compiling the code:

/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String)

[error]     val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
[error]                                               ^
[error] one error found

[error] (compile:compileIncremental) Compilation failed

The code is below. When I run it manually in the interactive spark-shell it works fine, but the error appears when I compile it for spark-submit.

// Create direct kafka stream with brokers and topics
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet)

// Create the processing logic
// Get the lines, split
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long")) 

Upvotes: 4

Views: 5220

Answers (2)

Yuval Itzchakov

Reputation: 149538

KafkaUtils.createDirectStream returns a DStream of key/value tuples (since messages in Kafka are optionally keyed). In your case each element is of type (String, String). If you want to split the value, you have to take it out of the tuple first:

val lines = 
  messages
   .map(line => line._2.split(','))
   .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))

Or using partial function syntax:

val lines = 
  messages
   .map { case (_, value) => value.split(',') }
   .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))  
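
To see the difference without a Kafka or Spark setup, here is a minimal plain-Scala sketch. It assumes a Seq of (key, value) pairs as a stand-in for the stream; the object name TupleSplitSketch and the sample records are made up for illustration.

```scala
// Plain-Scala stand-in for the Kafka stream: a Seq of (key, value) pairs.
// The keys and records below are invented sample data.
object TupleSplitSketch {
  val messages: Seq[(String, String)] = Seq(
    ("k1", "Austin,50.2,30.3,-97.7"),
    ("k2", "Boston,29.3,42.4,-71.1")
  )

  // messages.map(line => line.split(',')) would not compile here either:
  // `line` is a (String, String), and tuples have no split method.

  // Option 1: take the value out with the tuple accessor.
  val viaAccessor: Seq[Seq[String]] =
    messages.map(line => line._2.split(',').toSeq)

  // Option 2: destructure the pair with a pattern-matching function literal.
  val viaPattern: Seq[Seq[String]] =
    messages.map { case (_, value) => value.split(',').toSeq }
}
```

Both forms produce the same fields; the second just names the parts of the pair instead of using _2.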

Upvotes: 2

maasg

Reputation: 37435

All messages in Kafka carry a key (which may be null). The original Kafka stream, in this case messages, is a stream of tuples (key, value).

And as the compile error points out, there's no split method on tuples.

What we want to do here is:

messages.map { case (key, value) => value.split(',') } ...
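
One way the rest of the chain could look is sketched below in plain Scala, with a Seq standing in for the stream. The helper parseCityRecord is hypothetical (it is not in the original code), and the sketch assumes every value has exactly four well-formed comma-separated fields.

```scala
object ParseSketch {
  // Hypothetical helper: turns one CSV value into the
  // (city_name, jan_temp, lat, long) tuple the question builds.
  // Assumes four well-formed fields; toDouble throws on bad input.
  def parseCityRecord(value: String): (String, Double, Double, Double) = {
    val s = value.split(',')
    (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble)
  }

  // Stand-in for the (key, value) stream; the record is invented sample data.
  val messages: Seq[(String, String)] = Seq(("k1", "Austin,50.2,30.3,-97.7"))

  // Drop the key and parse the value in one step.
  val lines: Seq[(String, Double, Double, Double)] =
    messages.map { case (_, value) => parseCityRecord(value) }
}
```

The same function literal should work on the DStream from the question, since map there also receives each (key, value) pair.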

Upvotes: 3
