Carina

Reputation: 19

java.lang.IllegalArgumentException: 'path' is not specified // Spark Consumer Issue

I am trying to create a Spark consumer so that I can send messages (in this case a CSV file) to Kafka through Spark Streaming, but I get an error saying that 'path' is not specified.

My code is as follows:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.OutputMode

object sparkConsumer extends App {

  val conf = new SparkConf().setMaster("local").setAppName("Name")
  val sc = new SparkContext(conf)

  val rootLogger = Logger.getRootLogger()
  rootLogger.setLevel(Level.ERROR)

  val spark = SparkSession
    .builder()
    .appName("Spark-Kafka-Integration")
    .master("local")
    .getOrCreate()

  val schema = StructType(Array(
    StructField("InvoiceNo", StringType, nullable = true),
    StructField("StockCode", StringType, nullable = true),
    StructField("Description", StringType, nullable = true),
    StructField("Quantity", StringType, nullable = true)
  ))

  val streamingDataFrame = spark.readStream.schema(schema).csv("C:/Users/me/Desktop/Tasks/Tasks1/test.csv")

  streamingDataFrame.selectExpr("CAST(InvoiceNo AS STRING) AS key", "to_json(struct(*)) AS value").
    writeStream
    .format("csv")
    .option("topic", "topic_test")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("checkpointLocation", "C:/Users/me/IdeaProjects/SparkStreaming/checkpointLocation/")
    .start()

  import spark.implicits._
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic_test")
    .load()

  val df1 = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
    .select(from_json($"value", schema).as("data"), $"timestamp")
    .select("data.*", "timestamp")

  df1.writeStream
    .format("console")
    .option("truncate","false")
    .outputMode(OutputMode.Append)
    .start()
    .awaitTermination()

}

I get the following error:

Exception in thread "main" java.lang.IllegalArgumentException: 'path' is not specified

Does anyone know what I am missing?

Upvotes: 0

Views: 4742

Answers (1)

Emiliano Martinez

Reputation: 4123

The problem seems to be in this part of your code:

  streamingDataFrame.selectExpr("CAST(InvoiceNo AS STRING) AS key", "to_json(struct(*)) AS value").
    writeStream
    .format("csv")
    .option("topic", "topic_test")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("checkpointLocation", "C:/Users/me/IdeaProjects/SparkStreaming/checkpointLocation/")
    .start()

because you use the "csv" format but do not set the output path that a file sink requires. Instead, you configure Kafka properties, which suggests you want a Kafka topic as your sink. So if you change the format to "kafka", it should work.
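As a minimal sketch of that change, the write side could look like the following. The helper name writeToKafka and the wrapper object are only illustrative (they are not in your code); the options are the ones you already set. This assumes the spark-sql-kafka-0-10 connector matching your Spark version is on the classpath and that a broker is reachable at localhost:9092.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

object KafkaSinkSketch {

  // Hypothetical helper: takes the streaming DataFrame read from the CSV source
  // and starts a query that publishes each row to the Kafka topic.
  def writeToKafka(streamingDataFrame: DataFrame): StreamingQuery =
    streamingDataFrame
      // The Kafka sink expects a "value" column and, optionally, a "key" column.
      .selectExpr("CAST(InvoiceNo AS STRING) AS key", "to_json(struct(*)) AS value")
      .writeStream
      // "kafka" instead of "csv": the sink no longer needs a 'path' option.
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "topic_test")
      .option("checkpointLocation", "C:/Users/me/IdeaProjects/SparkStreaming/checkpointLocation/")
      .start()
}
```

With format("csv"), Spark treats the query as a file sink and looks for an output path, which is where the 'path' is not specified message comes from.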

Another problem you can run into when using csv as a streaming source is that the path should be a directory, not a single file. In your case, if you create a directory and move your csv file into it, it will work.

Just for testing, create a directory named C:/Users/me/Desktop/Tasks/Tasks1/test.csv and create a file named part-0000.csv inside it. Then put your csv content in this new file and start the process again.
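A rough sketch of the read side with a directory as the source is shown below. The object name, the helper readCsvDirectory and the inputDir parameter are hypothetical names used for illustration; the schema is the one from your question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object CsvDirectorySourceSketch {

  // Same schema as in the question; streaming file sources need it up front.
  val schema: StructType = StructType(Array(
    StructField("InvoiceNo", StringType, nullable = true),
    StructField("StockCode", StringType, nullable = true),
    StructField("Description", StringType, nullable = true),
    StructField("Quantity", StringType, nullable = true)
  ))

  // Hypothetical helper: inputDir is assumed to be a directory that contains
  // one or more csv files, not the path of a single csv file.
  def readCsvDirectory(spark: SparkSession, inputDir: String): DataFrame =
    spark.readStream
      .schema(schema)
      .csv(inputDir)
}
```

You would call it with the directory you created, for example readCsvDirectory(spark, "C:/Users/me/Desktop/Tasks/Tasks1/test.csv") if you followed the test setup above. Files placed in that directory after the query starts are picked up as new input.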

Upvotes: 2
