Reputation: 21
I have a Spark Structured Streaming Scala job which reads JSON messages from Kafka and writes the data to S3. I have a Confluent Schema Registry configured, and the schema is in JSON format with type=object. I am able to retrieve the schema from the registry, but I need to apply this schema to the DataFrame containing the records from Kafka.
import io.confluent.kafka.schemaregistry.client.rest.RestService

val restService = new RestService(schemaRegistryURL)
// return type is io.confluent.kafka.schemaregistry.client.rest.entities.Schema
val valueRestResponseSchema = restService.getLatestVersion(schemaName)
Now I want to use valueRestResponseSchema in the code below. How do I convert valueRestResponseSchema to a StructType so that I can apply it in from_json?

val values: DataFrame = df.selectExpr("CAST(value AS STRING) as data")
  .select(from_json(col("data"), valueRestResponseSchema).as("data"))
Are there any JSON schema converters available to use? I'm looking for something similar to the post below, but for JSON: Integrating Spark Structured Streaming with the Confluent Schema Registry
Upvotes: 0
Views: 465
Reputation: 191983
convert valueRestResponseSchema to a StructType
You can get the raw JSON Schema string from that object, but you'll need to convert it into a Spark StructType manually (unless you can find a third-party JSON-Schema-to-SparkSQL library), since Spark doesn't offer such a converter the way it does for Avro.
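For illustration, here is a minimal sketch of such a manual conversion, using json4s (which Spark already bundles) and assuming the registry entity's getSchema method returns the raw JSON Schema string. It covers only a small subset of JSON Schema (plain object, string, integer, number, boolean, and array types) and omits error handling, unions, and $ref resolution:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._
import org.json4s._
import org.json4s.jackson.JsonMethods

// Rough sketch: map a subset of JSON Schema onto Spark types
def jsonSchemaToStructType(jsonSchema: String): StructType = {
  implicit val formats: Formats = DefaultFormats
  def toDataType(node: JValue): DataType =
    (node \ "type").extract[String] match {
      case "string"  => StringType
      case "integer" => LongType
      case "number"  => DoubleType
      case "boolean" => BooleanType
      case "array"   => ArrayType(toDataType(node \ "items"))
      case "object"  =>
        // each entry under "properties" becomes a StructField
        val JObject(props) = node \ "properties"
        StructType(props.map { case (name, prop) =>
          StructField(name, toDataType(prop), nullable = true)
        })
    }
  // your schema has type=object at the top level, so the result is a struct
  toDataType(JsonMethods.parse(jsonSchema)).asInstanceOf[StructType]
}

val sparkSchema = jsonSchemaToStructType(valueRestResponseSchema.getSchema)
val values = df.selectExpr("CAST(value AS STRING) as data")
  .select(from_json(col("data"), sparkSchema).as("data"))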
The schema isn't strictly required, by the way. You can instead use the get_json_object function with JSONPath expressions against the string value.
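For example, a sketch assuming the payload contains hypothetical id and name fields:

import org.apache.spark.sql.functions.{col, get_json_object}

val parsed = df
  .selectExpr("CAST(value AS STRING) AS json")
  .select(
    get_json_object(col("json"), "$.id").as("id"),
    get_json_object(col("json"), "$.name").as("name")
  )

Note that get_json_object returns each match as a string (or null when the path is missing), so you may still want to cast the extracted columns.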
However, if your producer wrote the messages in the Confluent wire format, you'll first need the substring SparkSQL function to strip the leading 5 bytes of the value (one magic byte plus a 4-byte schema ID) before you can parse the raw JSON.
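A sketch of that stripping step; substring is 1-indexed, so the JSON payload starts at byte 6:

import org.apache.spark.sql.DataFrame

// drop the 5-byte Confluent prefix from the binary value column,
// then cast the remaining bytes to a string for parsing
val json: DataFrame = df.selectExpr(
  "CAST(substring(value, 6, length(value) - 5) AS STRING) AS data"
)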
reads JSON messages from Kafka and writes the data to S3
Alternatively, since the end goal is just to land the data in S3, you could use the Confluent S3 Sink connector instead of maintaining your own Spark job.
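For reference, a minimal sketch of such a connector config; the topic, bucket, and region values are placeholders:

connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=my-json-topic
s3.bucket.name=my-bucket
s3.region=us-east-1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
flush.size=1000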
Upvotes: 0