Reputation: 3
I'm trying to forward MQTT messages to Kafka (more precisely, to Spark Streaming reading from Kafka).
I used this connector: https://github.com/evokly/kafka-connect-mqtt
with Spark 2.1.0 and Kafka 0.10.1.1.
The Spark Streaming output looks like this:
({"schema":{"type":"string","optional":false},"payload":"mqtt"},{"schema":{"type":"bytes","optional":false},"payload":"MTIzMTIz"})
Here is the producer code:
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttException, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

object mqttProducer {
  def main(args: Array[String]): Unit = {
    val brokerUrl = "tcp://ip"
    val topic = "mqtt"
    val msg = "123123"
    var client: MqttClient = null
    // File-based persistence for the MQTT client
    val persistence = new MqttDefaultFilePersistence("/tmp")
    try {
      // MQTT client with a specific broker URL and a generated client id
      client = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence)
      client.connect()
      val msgTopic = client.getTopic(topic)
      val message = new MqttMessage(msg.getBytes("utf-8"))
      // Publish the same message once per second
      while (true) {
        msgTopic.publish(message)
        println("Publishing Data, Topic : %s, Message : %s".format(msgTopic.getName, message))
        Thread.sleep(1000)
      }
    } catch {
      case e: MqttException => println("Exception Caught: " + e)
    } finally {
      // Only disconnect if the client was created and is still connected
      if (client != null && client.isConnected) client.disconnect()
    }
  }
}
And here is the Spark Streaming Kafka consumer code:
package hb.test1

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object test2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("app")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "servers ip",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("mqtt-kafka")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    val testStream = stream.map(x => (x.key, x.value))
    testStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
How can I get the payload as a String rather than bytes? Please help.
Upvotes: 0
Views: 1893
Reputation: 8335
The payload "MTIzMTIz" is just the string "123123", Base64 encoded. The schema/payload JSON envelope in your output suggests the connector is running with the JsonConverter, which writes byte payloads as Base64 text. If you want to pass the MQTT payload through to Kafka without Base64 encoding, use a ByteArrayConverter. In my config for the same MQTT connector I set the value converter like so:
"value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter"
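To check the Base64 claim yourself, here is a minimal Scala sketch that decodes the payload from the question. It assumes Java 8 or later (for `java.util.Base64`); the object and method names are made up for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

// Hypothetical helper, not part of Kafka, Spark, or the connector.
object Base64PayloadCheck {
  // Decodes a Base64 string and interprets the resulting bytes as UTF-8 text.
  def decodeBase64Payload(encoded: String): String =
    new String(Base64.getDecoder.decode(encoded), StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    // "MTIzMTIz" is the payload value shown in the Spark output above.
    println(decodeBase64Payload("MTIzMTIz")) // prints 123123
  }
}
```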
The above ByteArrayConverter comes with the Confluent Enterprise distribution, but there are other open source Kafka Connect ByteArrayConverters, such as the one included with the qubole/streamx kafka-connect-s3 connector.
There is also KIP-128, which proposes adding a standard ByteArrayConverter to the Kafka Connect framework.
UPDATE: Kafka 0.11 has been released and ships with a ByteArrayConverter. Configure "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" and you should get the raw MQTT payload without Base64 encoding.
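If you can't change the converter right away, one possible workaround is to unwrap the envelope on the consumer side. The sketch below is only an illustration under assumptions: it assumes the record value is the JSON envelope shown in the question, that the bytes are UTF-8 text, and that Java 8+ is available. `EnvelopePayload` and `extractPayload` are hypothetical names, and the regex is a shortcut; a real JSON parser would be more robust.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64
import scala.util.Try

// Hypothetical helper, not part of Kafka, Spark, or the connector.
object EnvelopePayload {
  // Captures the Base64 text of the "payload" field in the JSON envelope.
  private val PayloadField = "\"payload\"\\s*:\\s*\"([A-Za-z0-9+/=]*)\"".r

  // Returns the decoded payload, or None if the field is missing
  // or its content is not valid Base64.
  def extractPayload(json: String): Option[String] =
    PayloadField.findFirstMatchIn(json).flatMap { m =>
      Try(new String(Base64.getDecoder.decode(m.group(1)), StandardCharsets.UTF_8)).toOption
    }

  def main(args: Array[String]): Unit = {
    val value = """{"schema":{"type":"bytes","optional":false},"payload":"MTIzMTIz"}"""
    println(extractPayload(value)) // prints Some(123123)
  }
}
```

In the streaming job from the question this could be applied to the value only, e.g. `stream.map(x => (x.key, EnvelopePayload.extractPayload(x.value)))`. Don't apply it to the key: its payload ("mqtt") is a plain string, not Base64.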
Upvotes: 3