Reputation: 1383
I am new to Spark Structured Streaming and currently working on a use case where a structured streaming application receives events from Azure IoT Hub's Event Hubs-compatible endpoint (say, every 20 seconds).
The task is to consume those events and process them in real time. For this I have written the Spark Structured Streaming program below in Spark-Java.
Below are the important points:
Questions:
...
import java.time.Duration;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.eventhubs.ConnectionStringBuilder;
import org.apache.spark.eventhubs.EventHubsConf;
import org.apache.spark.eventhubs.EventPosition;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

// DeviceMessage and DeviceEvent are application-specific POJOs (not shown here).
public class EventSubscriber {

    public static void main(String[] args) throws InterruptedException, StreamingQueryException {
        String eventHubCompatibleEndpoint = "<My-EVENT HUB END POINT CONNECTION STRING>";
        String connString = new ConnectionStringBuilder(eventHubCompatibleEndpoint).build();

        EventHubsConf eventHubsConf = new EventHubsConf(connString)
                .setConsumerGroup("$Default")
                .setStartingPosition(EventPosition.fromEndOfStream())
                .setMaxRatePerPartition(100)
                .setReceiverTimeout(Duration.ofMinutes(10));

        SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("IoT Spark Streaming");
        SparkSession spSession = SparkSession.builder()
                .appName("IoT Spark Streaming")
                .config(sparkConf)
                .config("spark.sql.streaming.checkpointLocation", "<MY-CHECKPOINT-LOCATION>")
                .getOrCreate();

        // Read from the Event Hubs-compatible endpoint of the IoT Hub.
        Dataset<Row> inputStreamDF = spSession.readStream()
                .format("eventhubs")
                .options(eventHubsConf.toMap())
                .load();

        // The event body arrives as binary; cast it to a string.
        Dataset<Row> bodyRow = inputStreamDF
                .withColumn("body", new Column("body").cast(DataTypes.StringType))
                .select("body");

        StructType jsonStruct = new StructType()
                .add("eventType", DataTypes.StringType)
                .add("payload", DataTypes.StringType);

        // Classify each message: the presence of a "method" field marks a non-device event.
        Dataset<Row> messageRow = bodyRow.map((MapFunction<Row, Row>) value -> {
            String payload = value.getString(0);
            Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();
            JsonObject jsonObj = gson.fromJson(payload, JsonObject.class);
            JsonElement methodName = jsonObj.get("method");
            String eventType = (methodName != null) ? "OTHER_EVENT" : "DEVICE_EVENT";
            return RowFactory.create(eventType, payload);
        }, RowEncoder.apply(jsonStruct));
        messageRow.printSchema();

        Dataset<Row> deviceEventRowDS = messageRow.filter("eventType = 'DEVICE_EVENT'");
        deviceEventRowDS.printSchema();

        // Deserialize the JSON payload into the DeviceEvent bean.
        Dataset<DeviceEvent> deviceEventDS = deviceEventRowDS.map((MapFunction<Row, DeviceEvent>) value -> {
            String jsonString = value.getString(1);
            Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();
            DeviceMessage deviceMessage = gson.fromJson(jsonString, DeviceMessage.class);
            return deviceMessage.getDeviceEvent();
        }, Encoders.bean(DeviceEvent.class));
        deviceEventDS.printSchema();

        Dataset<Row> messageDataset = deviceEventDS.select(
                functions.col("eventType"),
                functions.col("deviceID"),
                functions.col("description"),
                functions.to_timestamp(functions.col("eventDate"), "yyyy-MM-dd hh:mm:ss").as("eventDate"),
                functions.col("deviceModel"),
                functions.col("pingRate"));
        messageDataset.printSchema();

        // Sliding 10-minute windows every 5 minutes, with a 10-minute watermark for late events.
        Dataset<Row> devWindowDataset = messageDataset
                .withWatermark("eventDate", "10 minutes")
                .groupBy(functions.col("deviceID"),
                        functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
                .count();
        devWindowDataset.printSchema();

        StreamingQuery query = devWindowDataset.writeStream()
                .outputMode("append")
                .format("parquet")
                .option("path", "<MY-PARQUET-FILE-OUTPUT-LOCATION>")
                .start();
        query.awaitTermination();
    }
}
...
Any help or direction regarding this would be useful.
Thanks and Regards,
Avinash Deshmukh
Upvotes: 1
Views: 908
Reputation: 74619
Is it possible to store multiple events in parquet format in a file based on the window time?
Yes.
How does the window operation work in this case?
The following code is the main part of the Spark Structured Streaming application:
Dataset<Row> devWindowDataset = messageDataset
    .withWatermark("eventDate", "10 minutes")
    .groupBy(functions.col("deviceID"),
        functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
    .count();
That says that the underlying state store(s) should keep state per deviceID and eventDate for 10 minutes, plus an extra 10 minutes (per withWatermark) for late events. In other words, you should see results coming out once an event has an eventDate 20 minutes past the start of the streaming query.
withWatermark is for late events, so even when the groupBy has produced a result, it won't get emitted until the watermark threshold is crossed.
The same procedure is applied every 10 minutes (+ 10 minutes of watermark) with a 5-minute window slide.
Think of the groupBy with window operator as a multi-column aggregation.
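As a quick illustration (a sketch against devWindowDataset above, not from the original answer), the window function materializes as a struct column with start and end fields, so the windowed counts can be flattened into plain columns before being written out:
Dataset<Row> flattened = devWindowDataset.select(
    functions.col("deviceID"),
    functions.col("window.start").as("windowStart"), // window is a struct<start, end>
    functions.col("window.end").as("windowEnd"),
    functions.col("count"));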
Also I would like to check the event state against the previous event, and based on some calculations (say an event is not received within 5 minutes) I want to update the state.
That sounds like a use case for KeyValueGroupedDataset.flatMapGroupsWithState operator (aka Arbitrary Stateful Streaming Aggregation). Consult Arbitrary Stateful Operations.
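A minimal Java sketch of the idea (the alert message, the 5-minute rule and the use of processing-time timeouts are assumptions for illustration, not part of the question):
import java.util.Collections;
import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.GroupStateTimeout;
import org.apache.spark.sql.streaming.OutputMode;

// Track the last-seen time per device; when no event arrives for 5 minutes
// (processing time), emit an alert and clear the state.
Dataset<String> alerts = deviceEventDS
    .groupByKey((MapFunction<DeviceEvent, String>) DeviceEvent::getDeviceID, Encoders.STRING())
    .flatMapGroupsWithState(
        (FlatMapGroupsWithStateFunction<String, DeviceEvent, Long, String>) (deviceId, events, state) -> {
            if (state.hasTimedOut()) {
                state.remove(); // forget the device after alerting
                return Collections.singletonList(
                    "No event received from " + deviceId + " for 5 minutes").iterator();
            }
            state.update(System.currentTimeMillis()); // last-seen timestamp
            state.setTimeoutDuration("5 minutes");    // re-arm the timeout on every batch
            return Collections.emptyIterator();
        },
        OutputMode.Append(),
        Encoders.LONG(),
        Encoders.STRING(),
        GroupStateTimeout.ProcessingTimeTimeout());
Note that DeviceEvent::getDeviceID assumes the bean exposes a getter for the deviceID field used elsewhere in the question.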
It could also be that you simply want one of the many standard aggregation functions or a user-defined aggregation function (UDAF).
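For example (a sketch reusing the windowed groupBy from above), extra built-in aggregates can be attached with agg:
Dataset<Row> devWindowStats = messageDataset
    .withWatermark("eventDate", "10 minutes")
    .groupBy(functions.col("deviceID"),
        functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
    .agg(functions.count("*").as("eventCount"),
        functions.max("eventDate").as("lastEventDate"));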
Upvotes: 2