I am trying to decode and process protobuf-encoded MQTT messages (from an Eclipse Mosquitto broker) using Apache Beam. In addition to the decoded fields, I also want to use the full topic of each message, as well as its timestamp, for grouping and aggregations.
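I can connect to Mosquitto as follows:

```kotlin
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.mqtt.MqttIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.values.PCollection

val options = PipelineOptionsFactory.create()
val pipeline = Pipeline.create(options)

// Subscribe to every topic one level below my/topic/ on the local broker.
val mqttReader: MqttIO.Read = MqttIO
    .read()
    .withConnectionConfiguration(
        MqttIO.ConnectionConfiguration.create(
            "tcp://localhost:1884",
            "my/topic/+"
        )
    )

// MqttIO only yields the raw payload of each message.
val readMessages = pipeline.apply<PCollection<ByteArray>>(mqttReader)
```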
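The subscription filter my/topic/+ uses the MQTT single-level wildcard, so messages from several concrete topics (for example my/topic/a and my/topic/b) end up in the same PCollection<ByteArray>, and the byte-array output does not record which concrete topic each message came from. To illustrate how such filters match, here is a minimal stdlib-only Kotlin sketch of MQTT topic-filter matching. topicMatches is a hypothetical helper (not part of MqttIO or Paho), and it deliberately ignores the MQTT rule for topics that start with $.

```kotlin
// Hypothetical helper: does an MQTT topic filter (with + and # wildcards) match a concrete topic?
// Simplification: the rule that wildcards at the first level do not match topics starting
// with '$' is ignored, and the filter itself is not validated.
fun topicMatches(filter: String, topic: String): Boolean {
    val filterLevels = filter.split("/")
    val topicLevels = topic.split("/")
    for ((i, level) in filterLevels.withIndex()) {
        when (level) {
            // Multi-level wildcard: matches this level and everything below it,
            // including the parent itself ("a/#" matches "a").
            "#" -> return true
            // Single-level wildcard: matches exactly one level (which may be empty).
            "+" -> if (i >= topicLevels.size) return false
            // Literal level: must be present and identical.
            else -> if (i >= topicLevels.size || level != topicLevels[i]) return false
        }
    }
    // No '#' was seen, so the topic must not have more levels than the filter.
    return filterLevels.size == topicLevels.size
}
```

For example, topicMatches("my/topic/+", "my/topic/a") is true, while topicMatches("my/topic/+", "my/topic/a/b") is false because + spans exactly one level; this is why the full topic is needed to tell the subscribed streams apart.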
In order to decode the messages, I have compiled the .proto schema (in my case quote.proto, containing the Quote message) via Gradle, which allows me to transform a ByteArray into a Quote object via Quote.parseFrom():
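```kotlin
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.ParDo

// Parse each raw MQTT payload into the Quote class generated from quote.proto.
val quotes = readMessages
    .apply(
        ParDo.of(object : DoFn<ByteArray, QuoteOuterClass.Quote>() {
            @ProcessElement
            fun processElement(context: ProcessContext) {
                val protoRow = context.element()
                context.output(QuoteOuterClass.Quote.parseFrom(protoRow))
            }
        })
    )
```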
Using this, in the next apply step I can then access individual fields with a ProcessFunction and a lambda, e.g. { quote -> "${quote.volume}" }. However, there are two problems:
1. Beam provides a ProtoCoder class, but I cannot figure out how to use it in conjunction with MqttIO. I suspect that the implementation has to look similar to:

```kotlin
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder

val coder = ProtoCoder
    .of(QuoteOuterClass.Quote::class.java)
    .withExtensionsFrom(QuoteOuterClass::class.java)
```
2. Whereas MqttIO only yields a PCollection<ByteArray>, the KafkaIO reader provides a PCollection<KafkaRecord<Long, String>>, which carries all the relevant metadata (including the topic). I am wondering whether something similar can be achieved with MQTT + Protobuf.
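To make the second point concrete: what is missing is an element type that carries the topic and timestamp next to the payload, the way KafkaRecord does for Kafka. The sketch below is plain Kotlin (no Beam); TopicRecord, countPerTopic and decodeByTopic are hypothetical names standing in for such a type and for the per-topic grouping it would enable.

```kotlin
// Hypothetical element type (not a Beam or MqttIO class): an MQTT message plus its metadata,
// analogous to what KafkaRecord provides for Kafka.
class TopicRecord(
    val topic: String,
    val timestampMillis: Long,
    val payload: ByteArray
)

// Count messages per full topic.
fun countPerTopic(records: List<TopicRecord>): Map<String, Int> =
    records.groupingBy { it.topic }.eachCount()

// Group decoded payloads by topic; the decoder is supplied by the caller
// (in the question's setting it would be something like Quote::parseFrom).
fun <T> decodeByTopic(records: List<TopicRecord>, decode: (ByteArray) -> T): Map<String, List<T>> =
    records.groupBy(keySelector = { it.topic }, valueTransform = { decode(it.payload) })
```

TopicRecord is a regular class rather than a data class because a data class with a ByteArray property would compare the payload by reference, not by content, which makes generated equals() misleading.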
A similar implementation to what I want to achieve can be done in Spark Structured Streaming + Apache Bahir as follows:

```scala
val df_mqttStream = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", topic)
  .load(brokerUrl)

val parsePayload = ProtoSQL.udf { bytes: Array[Byte] => Quote.parseFrom(bytes) }

val quotesDS = df_mqttStream.select("id", "topic", "payload")
  .withColumn("quote", parsePayload($"payload"))
  .select("id", "topic", "quote.*")
```
However, with Spark 2.4 (the latest Spark version supported by Bahir), accessing the message topic is broken (see the related issue and my ticket in the Apache Jira).
From my understanding, the latest version of Apache Beam (2.27.0) simply does not offer a way to extract the specific topic of an MQTT message.
I have extended MqttIO to return MqttMessage objects that include the topic (and a timestamp) in addition to the byte array payload. The changes currently exist as a draft pull request. With these changes, the topic can simply be accessed as message.topic:
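```kotlin
// MqttMessage is the type introduced in the draft pull request; it is not part of Beam 2.27.0.
val mqttMessages = pipeline.apply<PCollection<MqttMessage>>(mqttReader)

val topicOfMessages: PCollection<String> = mqttMessages
    .apply(
        ParDo.of(object : DoFn<MqttMessage, String>() {
            @ProcessElement
            fun processElement(
                @Element message: MqttMessage,
                out: OutputReceiver<String>
            ) {
                // Emit the full topic the message was published on.
                out.output(message.topic)
            }
        })
    )
```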