waitamin

Reputation: 3

How to convert MQTT payload data to a Kafka string type

I'm trying to pass MQTT messages into Kafka (and from Kafka into Spark Streaming).

I'm using this connector: https://github.com/evokly/kafka-connect-mqtt

Versions: Spark 2.1.0, Kafka 0.10.1.1.

The Spark Streaming output looks like this:

({"schema":{"type":"string","optional":false},"payload":"mqtt"},{"schema":{"type":"bytes","optional":false},"payload":"MTIzMTIz"})

Here is the producer code:

import org.eclipse.paho.client.mqttv3.{MqttClient, MqttException, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

object mqttProducer {
  def main(args: Array[String]): Unit = {
    val brokerUrl = "tcp://ip"
    val topic = "mqtt"
    val msg = "123123"

    var client: MqttClient = null

    // Creating new persistence for mqtt client
    val persistence = new MqttDefaultFilePersistence("/tmp")

    try {
      // mqtt client with specific url and client id
      client = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence)
      client.connect()

      val msgTopic = client.getTopic(topic)
      // The MQTT payload is the raw UTF-8 bytes of msg
      val message = new MqttMessage(msg.getBytes("utf-8"))

      while (true) {
        msgTopic.publish(message)
        println("Publishing Data, Topic : %s, Message : %s".format(msgTopic.getName, message))
        Thread.sleep(1000)
      }
    } catch {
      case e: MqttException => println("Exception Caught: " + e)
    } finally {
      // client stays null if the constructor threw, so guard before disconnecting
      if (client != null && client.isConnected) client.disconnect()
    }
  }
}

And here is the Spark Streaming Kafka consumer code:

package hb.test1

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object test2 {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("app")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "servers ip",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("mqtt-kafka")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val testStream = stream.map(x => (x.key, x.value))

    testStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

How can I get the payload as a String instead of bytes? Please help.

Upvotes: 0

Views: 1893

Answers (1)

Hans Jespersen

Reputation: 8335

That payload "MTIzMTIz" is just the string "123123", Base64 encoded. The JsonConverter, which produces the schema/payload envelope you are seeing, writes a bytes field as a Base64 string. If you want to take the MQTT payload and send it to Kafka without Base64 encoding, you should use a ByteArrayConverter. In my config for the same MQTT connector, I set the value converter like so:

"value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter"
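If changing the connector's converter is not an option, the Base64 value can also be decoded on the Spark side. The sketch below assumes the record value is a JsonConverter-style envelope like the one in the question's output; the object `DecodeEnvelope` and the method `decodeBytesEnvelope` are hypothetical names, and the regex stands in for a proper JSON parser only to keep the example dependency-free.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

object DecodeEnvelope {
  // Hypothetical helper: extracts the "payload" field from an envelope such as
  // {"schema":{"type":"bytes","optional":false},"payload":"MTIzMTIz"}.
  // A regex keeps the sketch dependency-free; a real JSON parser is safer.
  private val PayloadField = "\"payload\"\\s*:\\s*\"([^\"]*)\"".r

  def decodeBytesEnvelope(json: String): Option[String] =
    PayloadField.findFirstMatchIn(json).map { m =>
      // Base64-decode the payload, then read the bytes as UTF-8,
      // matching msg.getBytes("utf-8") in the producer
      new String(Base64.getDecoder.decode(m.group(1)), StandardCharsets.UTF_8)
    }

  def main(args: Array[String]): Unit = {
    val value = """{"schema":{"type":"bytes","optional":false},"payload":"MTIzMTIz"}"""
    println(decodeBytesEnvelope(value)) // prints Some(123123)
  }
}
```

In the consumer this would be applied to the value only, e.g. `stream.map(x => (x.key, DecodeEnvelope.decodeBytesEnvelope(x.value)))`. The key envelope has a `string` schema, so its payload ("mqtt") is not Base64 and must not be decoded. Switching the converter avoids this extra step entirely.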

The above ByteArrayConverter comes with the Confluent Enterprise distribution, but there are other open-source Kafka Connect ByteArrayConverters, such as the one included with the qubole/streamx kafka-connect-s3 connector:

https://github.com/qubole/streamx/blob/8b9d43008d9901b8e6020404f137944ed97522e2/src/main/java/com/qubole/streamx/ByteArrayConverter.java

KIP-128 proposes adding a standard ByteArrayConverter to the Kafka Connect framework:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-128%3A+Add+ByteArrayConverter+for+Kafka+Connect

UPDATE: Kafka 0.11 has now been released and ships with a ByteArrayConverter. Configure "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" and you should get the raw MQTT payload without Base64 encoding.
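To make the difference concrete, here is a simplified model in plain Scala of what lands in the Kafka record value under each converter. It does not use the real Kafka Connect classes; `ConverterComparison` and its methods are hypothetical names. It reproduces the value envelope from the question's output and shows that, with pass-through bytes, decoding as UTF-8 (which is what the question's `StringDeserializer` does by default) yields the plain string.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

object ConverterComparison {
  // Simplified model of JsonConverter with schemas enabled:
  // BYTES are written as a Base64 string inside a schema/payload envelope.
  def jsonConverterStyle(mqttPayload: Array[Byte]): Array[Byte] = {
    val b64 = Base64.getEncoder.encodeToString(mqttPayload)
    s"""{"schema":{"type":"bytes","optional":false},"payload":"$b64"}"""
      .getBytes(StandardCharsets.UTF_8)
  }

  // Simplified model of a ByteArrayConverter: the bytes pass through unchanged.
  def byteArrayConverterStyle(mqttPayload: Array[Byte]): Array[Byte] = mqttPayload

  // Consumer side: decode the record value as UTF-8, as StringDeserializer does by default.
  def asString(recordValue: Array[Byte]): String =
    new String(recordValue, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val mqttPayload = "123123".getBytes(StandardCharsets.UTF_8)
    // prints {"schema":{"type":"bytes","optional":false},"payload":"MTIzMTIz"}
    println(asString(jsonConverterStyle(mqttPayload)))
    // prints 123123
    println(asString(byteArrayConverterStyle(mqttPayload)))
  }
}
```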

Upvotes: 3
