Ken Alton

Reputation: 734

Flow time stamp through streaming functions

Is it possible to generate a random number or obtain the system time each time a batch is run with Spark Streaming, and if so, how?

I have two functions that process a batch of messages:
1 - The first processes the key, creates a CSV file and writes the headers.
2 - The second processes each of the messages and appends the data to the CSV.

I wish to store the files for each batch in separate folders:

/output/folderBatch1/file1.csv, file2.csv, etc.csv
/output/folderBatch2/file1.csv, file2.csv, etc.csv
/output/folderBatch3/file1.csv, file2.csv, etc.csv
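For reference, here is a minimal sketch of building these paths from a per-batch identifier (a counter or a timestamp). `BatchPaths` and `csvPath` are hypothetical names used only for illustration; the layout follows the folders listed above.

```java
public class BatchPaths {
    // Hypothetical helper: builds the CSV path for one file within one batch,
    // following the /output/folderBatch<N>/file<M>.csv layout.
    static String csvPath(String root, long batchId, int fileIndex) {
        return String.format("%s/folderBatch%d/file%d.csv", root, batchId, fileIndex);
    }

    public static void main(String[] args) {
        // Different batch ids map to different folders.
        System.out.println(csvPath("/output", 1, 1)); // /output/folderBatch1/file1.csv
        System.out.println(csvPath("/output", 2, 3)); // /output/folderBatch2/file3.csv
    }
}
```

The hard part is not the path itself but getting a `batchId` that actually changes from one batch to the next.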

How can I create a variable, even just a simple counter, that Spark Streaming can use?

The code below gets the system time, but because it's 'plain Java' it is executed only once, when the stream is defined, so it has the same value on every batch.

JavaPairInputDStream<String, byte[]> messages;
messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        byte[].class,
        StringDecoder.class,
        DefaultDecoder.class,
        kafkaParams,
        topicsSet
);

/**
 * Declare what computation needs to be done
 */
JavaPairDStream<String, Iterable<byte[]>> groupedMessages = messages.groupByKey();

String time = Long.toString(System.currentTimeMillis());        //this is only ever run once and is the same value for each batch!

groupedMessages.map(new WriteHeaders(time)).print();

groupedMessages.map(new ProcessMessages(time)).print();
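The behaviour described above can be reproduced in plain Java, without Spark. In this sketch `runBatches` is a hypothetical stand-in for a batch loop: a value computed once and captured by a function stays fixed, while a value produced each time a batch runs (here a simple counter) changes. As far as I understand, code inside `foreachRDD` or `transform` runs on the driver once per batch, so a counter or `System.currentTimeMillis()` call placed there should behave like the second case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class PerBatchValue {
    // Hypothetical stand-in for a streaming job: records the id each simulated batch used.
    static List<Long> runBatches(int batches, LongSupplier batchId) {
        List<Long> used = new ArrayList<>();
        for (int i = 0; i < batches; i++) {
            used.add(batchId.getAsLong()); // evaluated once per batch
        }
        return used;
    }

    public static void main(String[] args) {
        // Like `time` in the question: computed once up front, then captured.
        long fixed = 42L;
        System.out.println(runBatches(3, () -> fixed)); // [42, 42, 42]

        // Produced when each batch runs: a simple counter incremented per batch.
        AtomicLong counter = new AtomicLong();
        System.out.println(runBatches(3, counter::incrementAndGet)); // [1, 2, 3]
    }
}
```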

Thank you, KA.

Upvotes: 2

Views: 67

Answers (1)

Yuval Itzchakov

Reputation: 149518

You can add the timestamp via an additional map call and flow it along. This means that instead of a value of type Iterable<byte[]>, you'll have a value of type Tuple2<Long, Iterable<byte[]>>:

JavaDStream<Tuple2<String, Tuple2<Long, Iterable<byte[]>>>> groupedWithTimeStamp = 
  groupedMessages
    .map((Function<Tuple2<String, Iterable<byte[]>>, 
      Tuple2<String, Tuple2<Long, Iterable<byte[]>>>>) kvp -> 
        new Tuple2<>(kvp._1, new Tuple2<>(System.currentTimeMillis(), kvp._2)));

From here on, every subsequent map has the timestamp available, and you can access it via:

groupedWithTimeStamp.map(value -> value._2._1); // This will access the timestamp.
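The same "flow it along" shape can be sketched in plain Java, with `Map.Entry` standing in for `scala.Tuple2` (an assumption made only so the example runs without Spark; `FlowTimestamp` and `stamp` are hypothetical names). One caveat: in the snippet above, `System.currentTimeMillis()` is called inside `map`, so it is evaluated per record, and records of the same batch processed in different milliseconds could get different timestamps. This sketch takes the time once and stamps the whole batch with it, which is what a one-folder-per-batch layout needs.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FlowTimestamp {
    // Plain-Java analogue of turning (key, values) into (key, (timestamp, values)).
    static List<Map.Entry<String, Map.Entry<Long, List<String>>>> stamp(
            List<Map.Entry<String, List<String>>> batch, long batchTime) {
        List<Map.Entry<String, Map.Entry<Long, List<String>>>> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> kvp : batch) {
            Map.Entry<Long, List<String>> stamped = new SimpleImmutableEntry<>(batchTime, kvp.getValue());
            out.add(new SimpleImmutableEntry<>(kvp.getKey(), stamped));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, List<String>>> batch = List.of(
                new SimpleImmutableEntry<String, List<String>>("keyA", List.of("m1", "m2")),
                new SimpleImmutableEntry<String, List<String>>("keyB", List.of("m3")));

        long batchTime = System.currentTimeMillis(); // taken once for the whole batch
        for (Map.Entry<String, Map.Entry<Long, List<String>>> value : stamp(batch, batchTime)) {
            // value.getValue().getKey() plays the role of value._2._1
            System.out.println(value.getKey() + " -> " + value.getValue().getKey());
        }
    }
}
```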

Upvotes: 1
