Reputation: 69
I want to convert a DataFrame to a Dataset of an array of strings in a streaming query. I am trying the following approach:
DF
.toJSON
.write
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("kafka.bootstrap.servers", "brokers")
.option("topic", "topicname")
.save()
Current Output is
{"col1":"valuex", "col2":"value2"}
{"col1":"valuey", "col2":"value3"}
{"col1":"valuez", "col2":"value4"}
{"col1":"value1", "col2":"value5"}
Desired Output is
[{"col1":"valuex", "col2":"value2"},{"col1":"valuey", "col2":"value3"},{"col1":"valuez", "col2":"value4"},{"col1":"value1", "col2":"value5"}]
In other words, I want to reduce the row count: if my Dataset[String] has 5 string rows, it should collapse to a single row containing an array of those 5 strings.
Upvotes: 1
Views: 290
Reputation: 69
The following code seems to work just fine.
import org.apache.spark.sql.functions.{collect_list, lit}

DF
  .toJSON
  // Tag every row in the batch with the same constant timestamp
  .withColumn("timestamp", lit(System.currentTimeMillis()))
  // Collapse all rows sharing that timestamp into a single array
  .groupBy("timestamp").agg(collect_list('value) as 'value)
  .toJSON
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("topic", "...")
  .save()
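To see why this works, here is a minimal static (non-streaming) sketch of the `collect_list` step in isolation; the local `SparkSession`, the sample values, and the column name `key` are my own choices for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, lit}

val spark = SparkSession.builder.master("local[*]").appName("sketch").getOrCreate()
import spark.implicits._

val df = Seq("a", "b", "c").toDF("value")

df.withColumn("key", lit(1))                 // same constant key on every row
  .groupBy("key")                            // so all rows land in one group
  .agg(collect_list($"value") as "value")    // one row, array of all values
  .show(false)
// One output row whose "value" column is an array of all three strings
// (collect_list does not guarantee element order)
```

Note that `lit(System.currentTimeMillis())` is evaluated once when the plan is built, so every row in a batch gets the same timestamp and falls into a single group, which is exactly what produces one array-valued record per write.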
Upvotes: 1