Reputation: 162
I am trying to read data from a Kafka topic with the code below:
object Main {
  def main(args: Array[String]) {
    val sparkSession = createSparkSession()
    val df = sparkSession.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").option("startingOffsets", "earliest").load()
    val df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    df1.writeStream.format("parquet").option("format","append").option("checkpointLocation", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete").option("path", "/home/krishna/Downloads/kafka_2.12-2.0.0/abc").option("truncate", "false").outputMode("append").start()
  }
}
When I use the following lines:
df1.writeStream
  .format("console")
  .option("truncate","false")
  .start()
  .awaitTermination()
the output is displayed on the console.
But the problem occurs when I replace the lines above with the following code:
df1.writeStream
  .format("csv")
  .option("format","append")
  .option("checkpointLocation", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete")
  .option("path", "/home/krishna/Downloads/kafka_2.12-2.0.0/abc")
  .option("truncate", "false")
  .outputMode("append")
  .start()
Then the output is not saved in CSV format. Only the abc folder is created, with the metadata folder inside it, but there is no CSV file in it.
I do not understand why, if the output is displayed on the console successfully, it is not saved to a file as CSV, Parquet, or text.
Sample output:
------------------
| key  | value   |
------------------
| null | abc     |
| null | 123     |
------------------
Dependencies:
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>2.4.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>2.4.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.5</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>2.4.5</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
Upvotes: 1
Views: 1752
Reputation: 18495
I have tested the code below with Spark 2.4.5 and it produces the CSV files as required:
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .appName("myAppName")
  .master("local[*]")
  .getOrCreate()

// Read the Kafka topic "test" from the earliest available offsets
val df = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .load()

// Kafka delivers key and value as binary, so cast them to strings
val df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

df1.writeStream
  .format("csv")
  .outputMode("append")
  .option("sep", ",") // field separator; the CSV option is named "sep"
  .option("checkpointLocation", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete")
  .option("path", "/home/krishna/Downloads/kafka_2.12-2.0.0/abc.csv")
  .start()
  .awaitTermination() // block so the application keeps running
This code will create a folder named abc.csv. Depending on the parallelism of your SparkSession (configurable by spark.default.parallelism), you will find as many CSV files as you have partitions. The number of files reflects the number of partitions in the DataFrame at the time it is written out. If you repartition it before writing, you will end up with a different number of files.
In my case the number of partitions was 2, so I got this output in the corresponding folder:
> ~/abc.csv$ ll
total 28
drwxrwxr-x 3 x x 4096 Apr 18 17:01 ./
drwxr-xr-x 50 x x 4096 Apr 18 17:01 ../
-rw-r--r-- 1 x x 8 Apr 18 17:00 part-00000-77250d4a-e3af-46ef-b572-5476a3d075dd-c000.csv
-rw-r--r-- 1 x x 4 Apr 18 17:00 part-00000-82a76a8c-5977-4891-be36-1c2dc6837fb1-c000.csv
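If a predictable number of files per micro-batch matters, one option is to set the partition count explicitly before writing. The following is only a minimal sketch and not part of the tested code above: the object name CsvSink, the helper writeCsv, and its filesPerBatch parameter are made up for illustration, and it assumes Spark 2.4.x and a streaming DataFrame whose columns are already cast to strings (such as df1).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

object CsvSink {
  // Hypothetical helper: writes a streaming DataFrame as CSV and produces
  // at most `filesPerBatch` part files for each micro-batch.
  def writeCsv(df: DataFrame,
               filesPerBatch: Int,
               path: String,
               checkpoint: String): StreamingQuery =
    df.coalesce(filesPerBatch) // reduce the partition count before the sink writes
      .writeStream
      .format("csv")
      .outputMode("append")
      .option("sep", ",")
      .option("checkpointLocation", checkpoint)
      .option("path", path)
      .start()
}
```

Called as CsvSink.writeCsv(df1, 1, "/home/krishna/Downloads/kafka_2.12-2.0.0/abc.csv", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete").awaitTermination(), this should write at most one part file per micro-batch. Every micro-batch still adds its own files, so the folder keeps growing while the query runs.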
Upvotes: 0
Reputation: 900
In the console version you are using df, and for CSV you are using df1.
Most of the code looks fine to me.
Try this:
import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .format("csv")
  .option("format", "append")
  .trigger(Trigger.ProcessingTime("5 seconds")) // Scala API; processingTime = "..." is the PySpark form
  .option("checkpointLocation", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete")
  .option("path", "/home/krishna/Downloads/kafka_2.12-2.0.0/abc")
  .outputMode("append")
  .start()
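Note that start() only launches the query in the background and returns a StreamingQuery. In a standalone main method a blocking call is needed afterwards, otherwise the program may exit before any micro-batch is written. Below is a minimal sketch of that pattern, assuming Spark 2.4.x and a streaming DataFrame with string columns (such as df1 from the question); the names BlockingCsvWrite, startCsv, and runUntilStopped are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object BlockingCsvWrite {
  // Hypothetical helper: starts the CSV query and returns immediately.
  def startCsv(df: DataFrame, path: String, checkpoint: String): StreamingQuery =
    df.writeStream
      .format("csv")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .option("checkpointLocation", checkpoint)
      .option("path", path)
      .start()

  // Hypothetical helper: starts the query, then blocks until it stops or fails,
  // which keeps the driver alive while micro-batches are written.
  def runUntilStopped(df: DataFrame, path: String, checkpoint: String): Unit =
    startCsv(df, path, checkpoint).awaitTermination()
}
```

In the question's main method, this would correspond to ending the writeStream chain with .start().awaitTermination(), as the console version already does.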
Upvotes: 0
Reputation: 133
Try this:
import org.apache.spark.sql.streaming.OutputMode

df.writeStream
  .outputMode(OutputMode.Append())
  .format("csv")
  .option("checkpointLocation", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete")
  .option("path", "/home/krishna/Downloads/kafka_2.12-2.0.0/abc/")
  .start()
You can also use com.databricks.spark.csv as the format type.
Upvotes: 0