user14681827
user14681827

Reputation: 153

Json string written to Kafka using Spark is not converted properly on reading

I read a .csv file to create a data frame and I want to write the data to a kafka topic. The code is the following

df = spark.read.format("csv").option("header", "true").load(f'{file_location}')
kafka_df = df.selectExpr("to_json(struct(*)) AS value").selectExpr("CAST(value AS STRING)")
kafka_df.show(truncate=False)

And the data frame looks like this:

value
"{""id"":""d215e9f1-4d0c-42da-8f65-1f4ae72077b3"",""latitude"":""-63.571457254062715"",""longitude"":""-155.7055842710919""}"
"{""id"":""ca3d75b3-86e3-438f-b74f-c690e875ba52"",""latitude"":""-53.36506636464281"",""longitude"":""30.069167069917597""}"
"{""id"":""29e66862-9248-4af7-9126-6880ceb3b45f"",""latitude"":""-23.767505281795835"",""longitude"":""174.593140405442""}"
"{""id"":""451a7e21-6d5e-42c3-85a8-13c740a058a9"",""latitude"":""13.02054867061598"",""longitude"":""20.328402498420786""}"
"{""id"":""09d6c11d-7aae-4d17-8cd8-183157794893"",""latitude"":""-81.48976715040848"",""longitude"":""1.1995769642056189""}"
"{""id"":""393e8760-ef40-482a-a039-d263af3379ba"",""latitude"":""-71.73949722379649"",""longitude"":""112.59922770487054""}"
"{""id"":""d6db8fcf-ee83-41cf-9ec2-5c2909c18534"",""latitude"":""-4.034680969008576"",""longitude"":""60.59645511854336""}"

After I wrote it to Kafka I want to read it and transform the binary data from column "value" back to json string but the result is that the value contains only the id, not the whole string. Any ideea why?

from pyspark.sql import functions as F

df = consume_from_event_hub(topic, bootstrap_servers, config, consumer_group)
string_df = df.select(F.col("value").cast("string"))
string_df.display()
value
794541bc-30e6-4c16-9cd0-3c5c8995a3a4
20ea5b50-0baa-47e3-b921-f9a3ac8873e2
598d2fc1-c919-4498-9226-dd5749d92fc5
86cd5b2b-1c57-466a-a3c8-721811ab6959
807de968-c070-4b8b-86f6-00a865474c35
e708789c-e877-44b8-9504-86fd9a20ef91
9133a888-2e8d-4a5a-87ce-4a53e63b67fc
cd5e3e0d-8b02-45ee-8634-7e056d49bf3b

the CSV the format is this

id,latitude,longitude 
bd6d98e1-d1da-4f41-94ba-8dbd8c8fce42,-86.06318155350924,-108.14300138138589
c39e84c6-8d7b-4cc5-b925-68a5ea406d52,74.20752175171859,-129.9453606091319
011e5fb8-6ab7-4ee9-97bb-acafc2c71e15,19.302250885973592,-103.2154291337162

Upvotes: 1

Views: 372

Answers (1)

OneCricketeer
OneCricketeer

Reputation: 191748

You need to remove selectExpr("CAST(value AS STRING)") since to_json already returns a string column

from pyspark.sql.functions import col, to_json, struct
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(f'{file_location}')
kafka_df = df.select(to_json(struct(col("*"))).alias("value"))
kafka_df.show(truncate=False)

I'm not sure what's wrong with the consumer. That should have worked unless consume_from_event_hub does something specifically to extract the ID column

Upvotes: 1

Related Questions