Yrah

Reputation: 113

Spark Structured Streaming writeStream doesn't write files until I stop the job

I'm using Spark Structured Streaming for a classic use case: I want to read from a Kafka topic and write the stream to HDFS in Parquet format.

Here is my code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}

object TestKafkaReader extends App {
  val spark = SparkSession
    .builder
    .appName("Spark-Kafka-Integration")
    .master("local")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")
  import spark.implicits._

  val kafkaDf = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","KAFKA_BROKER_IP:PORT")
    //.option("subscribe", "test")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()

  val moviesJsonDf = kafkaDf.selectExpr("CAST(value AS STRING)")

  // movie struct
  val struct = new StructType()
    .add("title", DataTypes.StringType)
    .add("year", DataTypes.IntegerType)
    .add("cast", ArrayType(DataTypes.StringType))
    .add("genres", ArrayType(DataTypes.StringType))

  val moviesNestedDf = moviesJsonDf.select(from_json($"value", struct).as("movie"))
  // json flatten
  val movieFlattenedDf = moviesNestedDf.selectExpr("movie.title", "movie.year", "movie.cast","movie.genres")


  // convert to parquet and save to hdfs
  val query = movieFlattenedDf
    .writeStream
    .outputMode("append")
    .format("parquet")
    .queryName("movies")
    .option("checkpointLocation", "src/main/resources/chkpoint_dir")
    .start("src/main/resources/output")
    .awaitTermination()
}

My problem:

While the job is running, nothing is written to the output folder; I have to manually stop the job to finally see the files.

I figured it might have something to do with .awaitTermination(). For information, I tried removing that call, but then I get an error and the job simply doesn't run.
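
For context, my understanding is that .awaitTermination() is what keeps the driver alive: in an object that extends App, the main thread exits as soon as start() returns, which would explain the error when I remove it. A restructured version of my writer, separating the query handle from the blocking call (same paths and options as above), would look like this:

val query = movieFlattenedDf
  .writeStream
  .outputMode("append")
  .format("parquet")
  .queryName("movies")
  .option("checkpointLocation", "src/main/resources/chkpoint_dir")
  .start("src/main/resources/output") // returns a StreamingQuery right away

// block the main thread so the streaming query keeps running micro-batches
query.awaitTermination()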

Maybe I didn't set the right options, but after reading the documentation many times and searching on Google I didn't find anything.

Can you please help me with this?

Thank you


Upvotes: 3

Views: 9121

Answers (1)

Yrah

Reputation: 113

Yes, problem solved.

My problem was that I had too little data and Spark was waiting for more data before writing the Parquet file.

To make this work I used the suggestion from the comment by @AlexandrosBiratsis: change the Parquet block size (a sketch of that change is below).
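
For anyone who finds this later: the comment itself isn't quoted here, so below is only a sketch of one way to lower the Parquet block size, through the session's Hadoop configuration before starting the query (the 1 MB value is just an illustration, not an exact figure from the comment):

// smaller Parquet row-group (block) size so small micro-batches get flushed to files sooner
// 1 MB is only an example value; tune it to your data volume
spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 1024 * 1024)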

Once again, all credit to @AlexandrosBiratsis. Thank you very much.

Upvotes: 3
