Reputation: 116
I am trying to build a simple pipeline that uses Kafka as a streaming source for Spark's Structured Streaming API, performs group-by aggregations, and persists the results to HDFS.
However, as soon as I submit the job I get a Java heap space error, even though the volume of streaming data is very small.
Below is the code in PySpark:
from time import gmtime, strftime
from pyspark.sql.functions import col, from_json, window

# Read raw events from Kafka, keeping the message value as a string
allEvents = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "MyNewTopic") \
    .option("group.id", "aggStream") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(col("value").cast("string"))

# Parse the JSON payload and flatten the fields of interest
aaIDF = allEvents \
    .filter(col("value").contains("myNewAPI")) \
    .select(from_json(col("value"), aaISchema).alias("colName")) \
    .select(col("colName.eventTime"), col("colName.appId"), col("colName.articleId"),
            col("colName.locale"), col("colName.impression"))

# Windowed count of impressions per appId/articleId/locale
windowedCountsDF = aaIDF \
    .withWatermark("eventTime", "10 minutes") \
    .groupBy("appId", "articleId", "locale", window("eventTime", "2 minutes")) \
    .sum("impression") \
    .withColumnRenamed("sum(impression)", "views")

# Persist the aggregated results to HDFS as Parquet
query = windowedCountsDF \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/CDS/events/JS/agg/" + strftime("%Y/%m/%d/%H/%M", gmtime()) + "/") \
    .option("checkpointLocation", "/CDS/checkpoint/") \
    .start()
And below is the exception:
17/11/23 14:24:45 ERROR Utils: Aborting task
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:214)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Upvotes: 0
Views: 1188
Reputation: 2228
You need to set appropriate driver and executor memory when submitting the job. This post gives a brief idea of how to set these configurations.
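For illustration, a hedged sketch (the file name, app name, and 4g values are placeholders, not the poster's):

from pyspark.sql import SparkSession

# Placeholder values: tune the memory sizes to the actual cluster.
# spark.driver.memory has to be set before the driver JVM starts, so it is
# normally passed on the command line, e.g.:
#   spark-submit --driver-memory 4g --executor-memory 4g my_streaming_job.py
# spark.executor.memory can also be set when the SparkSession is built:
spark = SparkSession.builder \
    .appName("KafkaAggStream") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()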
Upvotes: 0
Reputation: 4667
Two possible reasons:

1. Your watermark does not take effect: you should reference the column as colName.eventTime. Since no valid watermark is defined (it is only set on a different column), old aggregation state is never dropped. See the sketch after this list.

2. You should set a bigger value for --driver-memory or --executor-memory when submitting the Spark job.
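For the first point, a minimal sketch of keeping the watermark on the same event-time column the window uses. The timestamp cast is an assumption (eventTime parsed from JSON may arrive as a string); allEvents and aaISchema are the poster's definitions from the question:

from pyspark.sql.functions import col, from_json, window

# Sketch only: flatten the parsed struct, cast eventTime to a real timestamp
# (assumption: it arrives as a string), then declare the watermark on that
# same column before the windowed groupBy.
aaIDF = allEvents \
    .filter(col("value").contains("myNewAPI")) \
    .select(from_json(col("value"), aaISchema).alias("colName")) \
    .select(col("colName.eventTime").cast("timestamp").alias("eventTime"),
            col("colName.appId"), col("colName.articleId"),
            col("colName.locale"), col("colName.impression"))

windowedCountsDF = aaIDF \
    .withWatermark("eventTime", "10 minutes") \
    .groupBy("appId", "articleId", "locale", window("eventTime", "2 minutes")) \
    .sum("impression") \
    .withColumnRenamed("sum(impression)", "views")

Without a watermark tied to the windowed event-time column, Spark has to keep the state for every window indefinitely, which can eventually exhaust the heap.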
Upvotes: 2