Reputation: 980
I'm trying to write a DataFrame to Kafka in JSON format and add a key to the DataFrame in Scala. I'm currently working with this sample from the Spark-Kafka integration docs:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()
Is there a to_json method that can be used (instead of the json(path) option, which I believe writes out to a file in JSON format), and is there a key option that can be used to replace the null value with an actual key?
Upvotes: 2
Views: 4701
Reputation: 3283
Here's a minimal example in Scala. Let's say you have a DataFrame df with columns x and y:
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions.lit
import spark.implicits._

val dataDS = df
  .select(
    $"x".cast(StringType),
    $"y".cast(StringType)
  )
  .toJSON                              // collapses each row into a single "value" column of JSON strings
  .withColumn("key", lit("keyname"))   // adds the Kafka message key
dataDS
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "servername:port")
  .option("topic", "topicname")
  .save()
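To check what actually landed in the topic, you can read it back with the same connector (a sketch, assuming the same broker and topic names as above):
// batch read from Kafka to verify the key/value pairs (broker/topic assumed)
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "servername:port")
  .option("subscribe", "topicname")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .show(false)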
Remember that you need the spark-sql-kafka library; e.g. for spark-shell it is loaded with
spark-shell --packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"
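If you build a standalone application instead, the equivalent sbt dependency would look like this (a sketch; match the Scala and Spark versions to your cluster):
// build.sbt (versions are assumptions; align them with your Spark distribution)
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.1"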
Upvotes: 2
Reputation: 39501
You can use the toJSON() method on a DataFrame to convert your records to JSON messages.
import json
from datetime import datetime
from pyspark.sql.functions import lit

df = spark.createDataFrame([('user_first_name', 'user_last_name', 100)], ['first_name', 'last_name', 'ID'])
json_df = json.loads(df.withColumn('date_as_key', lit(datetime.now().date())).toJSON().first())
print(json_df)
{'first_name': 'user_first_name', 'last_name': 'user_last_name', 'ID': 100, 'date_as_key': '2019-07-29'}
Your code may look like this:
from datetime import datetime
from pyspark.sql.functions import lit, to_json, struct, col

# Note: in PySpark, df.toJSON() returns an RDD, which has no .write,
# so build the "value" column with to_json(struct(...)) instead
(df.withColumn('key', lit(str(datetime.now())))
   .select(col('key'), to_json(struct(*df.columns)).alias('value'))
   .write
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("topic", "topic1")
   .save())
Scala:
import org.apache.spark.sql.functions.lit

someDF.withColumn("key", lit("name")).show() // replace "name" with your variable
someDF.withColumn("key", lit("name")).toJSON.first() // toJSON is available as a method on Dataset in Scala
// res5: String = {"number":8,"word":"bat","key":"name"}
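Note that with this approach the key ends up inside the JSON string rather than as the Kafka message key. To route it as an actual Kafka key, you still need separate key and value columns, roughly like this (a sketch; the broker address and topic name are assumptions):
import org.apache.spark.sql.functions.{col, lit, to_json, struct}

someDF
  .select(
    lit("name").alias("key"),                 // Kafka message key
    to_json(struct(col("*"))).alias("value")  // the whole row as the JSON value
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1") // assumed broker address
  .option("topic", "topic1")                        // assumed topic name
  .save()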
Upvotes: -1
Reputation: 18495
You can make use of the to_json SQL function to convert your columns into JSON. See the Scala code below, which also makes use of the Spark SQL built-in function struct, tested with Spark version 2.4.5. Just make sure that you are naming your columns key and value, or applying corresponding aliases in your selectExpr.
import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.SparkSession

object Main extends App {

  val spark = SparkSession.builder()
    .appName("myAppName")
    .master("local[*]")
    .getOrCreate()

  // create DataFrame
  import spark.implicits._
  val df = Seq((3, "Alice"), (5, "Bob")).toDF("age", "name")
  df.show(false)

  // convert columns into json string
  val df2 = df.select(col("name"), to_json(struct($"*"))).toDF("key", "value")
  df2.show(false)
  // +-----+------------------------+
  // |key  |value                   |
  // +-----+------------------------+
  // |Alice|{"age":3,"name":"Alice"}|
  // |Bob  |{"age":5,"name":"Bob"}  |
  // +-----+------------------------+

  // write to Kafka with jsonString as value
  df2.selectExpr("key", "value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "test-topic")
    .save()
}
This will write the following data into your Kafka topic:
kafka-console-consumer --bootstrap-server localhost:9092 --property print.key=true --property print.value=true --topic test-topic
Alice {"age":3,"name":"Alice"}
Bob {"age":5,"name":"Bob"}
Upvotes: 2