tenderfoot
tenderfoot

Reputation: 69

How to convert streaming Dataset[String] to Dataset[Array[String]]?

I want to convert Dataframe to Dataset of Array of Strings in a streaming query. Trying following approach

           DF
            .toJSON
            .write
            .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
            .option("kafka.bootstrap.servers", "brokers")
            .option("topic", "topicname")
            .save()

Current Output is

{'col1':valuex , 'col2':'value2'}
{'col1':valuey , 'col2':'value3'}
{'col1':valuez , 'col2':'value4'}
{'col1':value1 , 'col2':'value5'}

Desired Output is

[{'col1':valuex , 'col2':'value2'},{'col1':valuey , 'col2':'value3'},{'col1':valuez , 'col2':'value4'},{'col1':value1 , 'col2':'value5'}]

I want to reduce my dataframe size from let say i have 5 string rows in dataset[string] , so it should be reduced to 1 single count which will have array of 5 strings ?

Upvotes: 1

Views: 290

Answers (1)

tenderfoot
tenderfoot

Reputation: 69

The following code seems to work just fine.

         DF
          .toJSON
          .withColumn("timestamp", lit(System.currentTimeMillis()))
          .groupBy("timestamp").agg(collect_list('value) as 'value)
          .toJSON
          .write
          .format("kafka")
          .option("kafka.bootstrap.servers", "...")
          .option("topic", "...")
          .save

Upvotes: 1

Related Questions