McPeanutbutter

Reputation: 81

Working with Protobuf-encoded MQTT streams in Apache Beam

I am trying to decode and process protobuf-encoded MQTT messages (from an Eclipse Mosquitto broker) using Apache Beam. In addition to the encoded fields, I also want to process the full topic of each message for grouping and aggregations, as well as the timestamp.

What I have tried so far

I can connect to Mosquitto via

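import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.mqtt.MqttIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.values.PCollection

val options = PipelineOptionsFactory.create()
val pipeline = Pipeline.create(options)

// Subscribe to every topic one level below my/topic ('+' is the single-level wildcard)
val mqttReader: MqttIO.Read = MqttIO
    .read()
    .withConnectionConfiguration(
        MqttIO.ConnectionConfiguration.create(
            "tcp://localhost:1884",
            "my/topic/+"
        )
    )

// Each element is the raw payload of one MQTT message
val readMessages = pipeline.apply<PCollection<ByteArray>>(mqttReader)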
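
For context on why the concrete topic matters here: the filter my/topic/+ uses the MQTT single-level wildcard, so a single subscription delivers messages from many different topics. Below is a minimal sketch of that matching rule. The helper topicMatches is hypothetical (it is not part of MqttIO), and the sketch ignores special cases such as topics starting with $.

```kotlin
// Hypothetical helper, not part of MqttIO: checks whether a concrete topic
// matches an MQTT topic filter. '+' matches exactly one level, '#' matches
// the parent level and everything below it.
fun topicMatches(filter: String, topic: String): Boolean {
    val filterLevels = filter.split("/")
    val topicLevels = topic.split("/")
    for ((index, level) in filterLevels.withIndex()) {
        when {
            // '#' is only valid as the last level of a filter
            level == "#" -> return index == filterLevels.lastIndex
            // The filter has more levels than the topic
            index >= topicLevels.size -> return false
            // A literal level has to match exactly; '+' accepts any single level
            level != "+" && level != topicLevels[index] -> return false
        }
    }
    // Without '#', filter and topic must have the same number of levels
    return filterLevels.size == topicLevels.size
}
```

With this rule, my/topic/+ matches my/topic/a but neither my/topic nor my/topic/a/b, which is why the payload alone does not say which topic a message came from.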

In order to decode the messages, I have compiled the .proto schema (in my case quote.proto containing the Quote message) via Gradle, which allows me to transform a ByteArray into a Quote object via Quote.parseFrom():

val quotes = readMessages
    .apply(
        ParDo.of(object : DoFn<ByteArray, QuoteOuterClass.Quote>() {
            @ProcessElement
            fun processElement(context: ProcessContext) {
                val protoRow = context.element()
                context.output(QuoteOuterClass.Quote.parseFrom(protoRow))
            }
        })
    )

Using this, in the next apply, I can then access individual fields with a ProcessFunction and a lambda, e.g. { quote -> "${quote.volume}" }. However, there are two problems:

  1. With this pipeline I do not have access to the topic or timestamp of each message.
  2. After sending the decoded messages back to the broker with plain UTF-8 encoding, I believe that they are not decoded correctly.
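
One possible cause of the second problem, offered as an assumption rather than a diagnosis: if serialized protobuf bytes are treated as UTF-8 text at any point, they generally do not survive the conversion. The sketch below illustrates this with plain Kotlin. The names survivesUtf8RoundTrip, binaryPayload and textPayload are made up for illustration, and the three bytes are a hand-written stand-in for protobuf output, not an actual Quote message.

```kotlin
// Returns true if the bytes come back unchanged after decoding them as
// UTF-8 text and encoding that text again.
fun survivesUtf8RoundTrip(bytes: ByteArray): Boolean {
    val asText = String(bytes, Charsets.UTF_8)
    return asText.toByteArray(Charsets.UTF_8).contentEquals(bytes)
}

// Stand-in for serialized protobuf output: 0x08 is the tag of field 1
// (varint), followed by the two-byte varint encoding of 300.
val binaryPayload: ByteArray = byteArrayOf(0x08, 0xAC.toByte(), 0x02)

// A value that was formatted as text first is safe to encode as UTF-8.
val textPayload: ByteArray = "42.5".toByteArray(Charsets.UTF_8)
```

Here survivesUtf8RoundTrip(binaryPayload) is false, because 0xAC is not valid UTF-8 on its own and gets replaced during decoding, while survivesUtf8RoundTrip(textPayload) is true. If the goal is to publish protobuf again, passing the output of quote.toByteArray() on unchanged avoids the text conversion entirely.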

Additional considerations

  1. Apache Beam provides a ProtoCoder class, but I cannot figure out how to use it in conjunction with MqttIO. I suspect that the implementation has to look similar to
    val coder = ProtoCoder
        .of(QuoteOuterClass.Quote::class.java)
        .withExtensionsFrom(QuoteOuterClass::class.java)
  2. Instead of a PCollection<ByteArray>, the Kafka IO reader provides a PCollection<KafkaRecord<Long, String>>, which has all the relevant fields (including topic). I am wondering if something similar can be achieved with Mqtt + ProtoBuf.

  3. A similar implementation to what I want to achieve can be done in Spark Structured Streaming + Apache Bahir as follows:

val df_mqttStream = spark.readStream
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("topic", topic)
    .load(brokerUrl)

val parsePayload = ProtoSQL.udf { bytes: Array[Byte] => Quote.parseFrom(bytes) }

val quotesDS = df_mqttStream.select("id", "topic", "payload")
    .withColumn("quote", parsePayload($"payload"))
    .select("id", "topic", "quote.*")

However, with Spark 2.4 (the latest supported version), accessing the message topic is broken (related issue, my ticket in Apache Jira).

Upvotes: 1

Views: 895

Answers (1)

McPeanutbutter

Reputation: 81

From my understanding, the latest version of Apache Beam (2.27.0) simply does not offer a way to extract the topic of each MQTT message.

I have extended the MqttIO to return MqttMessage objects that include a topic (and a timestamp) in addition to the byte array payload. The changes currently exist as a pull request draft.

With these changes, the topic can simply be accessed as message.topic.

val mqttMessages = pipeline.apply<PCollection<MqttMessage>>(mqttReader)

val topicOfMessages: PCollection<String> = mqttMessages
    .apply(
        ParDo.of(object : DoFn<MqttMessage, String>() {
            @ProcessElement
            fun processElement(
                @Element message: MqttMessage,
                out: OutputReceiver<String>
            ) { out.output(message.topic) }
        })
    )
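
Once the topic travels with each record, the grouping and aggregation mentioned in the question become a plain group-by-key. Here is a rough in-memory sketch of that idea without Beam. TopicRecord and its fields are hypothetical stand-ins and are not taken from the pull request, and volume is assumed to be a Long.

```kotlin
// Hypothetical stand-in for a record that carries the topic next to the
// decoded payload; names and types are illustrative only.
data class TopicRecord(
    val topic: String,
    val timestampMillis: Long,
    val volume: Long
)

// Sums the volume per topic: the in-memory analogue of keying each
// element by its topic and combining the values per key.
fun totalVolumePerTopic(records: List<TopicRecord>): Map<String, Long> =
    records
        .groupBy { it.topic }
        .mapValues { (_, group) -> group.sumOf { it.volume } }
```

In a Beam pipeline the same shape would presumably be expressed by emitting KV pairs keyed by message.topic and applying a per-key combine such as Sum.longsPerKey(), with windowing based on the message timestamp for an unbounded stream.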

Upvotes: 1
