Frank Rojas

Reputation: 17

Parsing data from Kafka in Apache Flink

I am just getting started with Apache Flink (Scala API). My issue is the following: I am trying to stream data from Kafka into Apache Flink, based on an example from the Flink site:

val stream =
  env.addSource(new FlinkKafkaConsumer09("testing", new SimpleStringSchema() , properties))

Everything works correctly, the stream.print() statement displays the following on the screen:

2018-05-16 10:22:44 AM|1|11|-71.16|40.27

I would like to use a case class in order to load the data, I've tried using

flatMap(p => p.split("|"))

but it's only splitting the data one character at a time.

The expected result is to populate the 5 fields of the case class as follows:

  field(0)=2018-05-16 10:22:44 AM
  field(1)=1
  field(2)=11
  field(3)=-71.16 
  field(4)=40.27

but it's now doing:

   field(0) = 2
   field(1) = 0
   field(2) = 1
   field(3) = 8

etc...

Any advice would be greatly appreciated.

Thank you in advance

Frank

Upvotes: 0

Views: 662

Answers (1)

Till Rohrmann

Reputation: 13346

The problem is the usage of String.split. If you call it with a String argument, the method treats that argument as a regular expression, and '|' is the alternation metacharacter. Thus, p.split("\\|") would be the correct regular expression for your input data. Alternatively, you can call the split variant that takes a separating character: p.split('|'). Both solutions should give you the desired result.
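To illustrate, here is a minimal sketch of parsing one record into a case class. The class and field names (Reading, and the meaning of the numeric columns) are assumptions based on the sample line in the question, not something from Flink itself; also note that for one record per case class instance you would use map on the DataStream rather than flatMap, which flattens the split array into individual elements:

```scala
// Hypothetical case class for the pipe-delimited records; field names are guesses.
case class Reading(timestamp: String, id: Int, count: Int, lon: Double, lat: Double)

object SplitExample {
  def parse(line: String): Reading = {
    // split(Char) treats '|' literally; split("\\|") would work equally well.
    val f = line.split('|')
    Reading(f(0), f(1).toInt, f(2).toInt, f(3).toDouble, f(4).toDouble)
  }

  def main(args: Array[String]): Unit = {
    val line = "2018-05-16 10:22:44 AM|1|11|-71.16|40.27"

    // The buggy call: split("|") is an empty-alternation regex,
    // so it splits the string into single characters.
    val broken = line.split("|")
    println(broken.take(4).mkString(", ")) // one character per element

    val reading = parse(line)
    println(reading)
  }
}
```

In the Flink job this would become something like stream.map(parse), yielding a DataStream[Reading].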

Upvotes: 3

Related Questions