grepIt

Reputation: 116

Spark Structured Streaming throwing Java OOM immediately

I am trying to build a simple pipeline that uses Kafka as the streaming source for Spark's Structured Streaming API, performs group-by aggregations, and persists the results to HDFS.

But as soon as I submit the job, I get a Java heap space error, even though the volume of streaming data is very low.

Below is the PySpark code:

from time import gmtime, strftime

from pyspark.sql.functions import col, from_json, window

allEvents = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe","MyNewTopic") \
    .option("group.id","aggStream") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(col("value").cast("string"))

# aaISchema is defined elsewhere
aaIDF = allEvents \
    .filter(col("value").contains("myNewAPI")) \
    .select(from_json(col("value"), aaISchema).alias("colName")) \
    .select(
        col("colName.eventTime"),
        col("colName.appId"),
        col("colName.articleId"),
        col("colName.locale"),
        col("colName.impression"))

windowedCountsDF = aaIDF \
    .withWatermark("eventTime", "10 minutes") \
    .groupBy("appId", "articleId", "locale", window("eventTime", "2 minutes")) \
    .sum("impression") \
    .withColumnRenamed("sum(impression)", "views")


query = windowedCountsDF \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/CDS/events/JS/agg/" + strftime("%Y/%m/%d/%H/%M", gmtime()) + "/") \
    .option("checkpointLocation", "/CDS/checkpoint/").start()

And below is the exception:

17/11/23 14:24:45 ERROR Utils: Aborting task
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:214)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Upvotes: 0

Views: 1188

Answers (2)

Akhil Bojedla

Reputation: 2228

You need to set appropriate driver and executor memory while submitting the job. This post gives you a brief idea of how to set these configurations.
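For example, a submission might look like the sketch below; the master URL, memory sizes, and script name are placeholders to adapt, not values taken from the question:

spark-submit \
    --master yarn \
    --driver-memory 4g \
    --executor-memory 4g \
    your_streaming_job.py

The same values can also be set persistently via spark.driver.memory and spark.executor.memory in spark-defaults.conf.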

Upvotes: 0

secfree

Reputation: 4667

Two possible reasons:

  1. The watermark you set does not take effect. You should reference the column as colName.eventTime (see the sketch after this list).

    Since no watermark is actually in effect, old aggregation state is never dropped, so it accumulates until the heap is exhausted.

  2. You should set a larger value for --driver-memory or --executor-memory when submitting the Spark job.
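A minimal sketch of the first suggestion, assuming aaISchema declares eventTime in a form that can be cast to a timestamp (the cast and explicit aliases are illustrative additions, not code from the question). The idea is to flatten colName.eventTime into a top-level timestamp column before calling withWatermark, so the watermark binds to the same event-time column used in the window:

aaIDF = allEvents \
    .filter(col("value").contains("myNewAPI")) \
    .select(from_json(col("value"), aaISchema).alias("colName")) \
    .select(
        # withWatermark requires a top-level column of timestamp type
        col("colName.eventTime").cast("timestamp").alias("eventTime"),
        col("colName.appId").alias("appId"),
        col("colName.articleId").alias("articleId"),
        col("colName.locale").alias("locale"),
        col("colName.impression").alias("impression"))

windowedCountsDF = aaIDF \
    .withWatermark("eventTime", "10 minutes") \
    .groupBy("appId", "articleId", "locale", window("eventTime", "2 minutes")) \
    .sum("impression") \
    .withColumnRenamed("sum(impression)", "views")

Once the watermark is in effect, Spark can drop aggregation state for windows older than the watermark instead of keeping every group in memory indefinitely.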

Upvotes: 2
