ClogShug

Reputation: 11

Loading Data from Aerospike Failing with Spark Connector

I am using the Aerospike Spark connector to load data from an Aerospike cluster into a DataFrame, process it, and write it to another Aerospike cluster. The data has two bins: one holding a List of Strings and another holding a Map whose keys are Strings and values are Longs. When the Spark application runs, a task fails. The driver stack trace is below, followed by a stripped-down version of the job:

24/06/21 09:59:58 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.10.16.151 executor 0): java.lang.UnsupportedOperationException: empty.reduceLeft
    at scala.collection.TraversableOnce.reduceLeft(TraversableOnce.scala:185)
    at scala.collection.TraversableOnce.reduceLeft$(TraversableOnce.scala:183)
    at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$reduceLeft(ArrayBuffer.scala:49)
    at scala.collection.IndexedSeqOptimized.reduceLeft(IndexedSeqOptimized.scala:77)
    at scala.collection.IndexedSeqOptimized.reduceLeft$(IndexedSeqOptimized.scala:76)
    at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.reduce(TraversableOnce.scala:213)
    at scala.collection.TraversableOnce.reduce$(TraversableOnce.scala:213)
    at scala.collection.AbstractTraversable.reduce(Traversable.scala:108)
    at com.aerospike.spark.converters.TypeConverter$.matchesSchemaType(TypeConverter.scala:246)
    at com.aerospike.spark.converters.TypeConverter$.convertToSparkType(TypeConverter.scala:353)
    at com.aerospike.spark.converters.TypeConverter$.binToValue(TypeConverter.scala:428)
    at com.aerospike.spark.sql.sources.v2.RowIterator.$anonfun$get$2(RowIterator.scala:60)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at com.aerospike.spark.sql.sources.v2.RowIterator.get(RowIterator.scala:48)
    at com.aerospike.spark.sql.sources.v2.RowIterator.get(RowIterator.scala:21)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.next(DataSourceRDD.scala:89)
    at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:124)
    at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:121)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
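For context, here is roughly what the job looks like. This is a minimal sketch: hosts, namespace, and set names are placeholders, and the option names are the ones documented for the connector version above; adjust them for your setup.

    import org.apache.spark.sql.SparkSession

    // Placeholders: hosts, namespace, and set names are not the real ones.
    val spark = SparkSession.builder()
      .appName("aerospike-copy")
      .config("aerospike.seedhost", "source-host")
      .config("aerospike.port", "3000")
      .config("aerospike.namespace", "test")
      .getOrCreate()

    // Read: one bin holds a List of Strings, the other a Map of String -> Long.
    val df = spark.read
      .format("aerospike")
      .option("aerospike.set", "source_set")
      .load()

    // ... processing ...

    // Write to the destination cluster; "__key" as the update key column is
    // an assumption here, per the connector's documented updateByKey option.
    df.write
      .format("aerospike")
      .mode("append")
      .option("aerospike.seedhost", "dest-host")
      .option("aerospike.namespace", "test")
      .option("aerospike.set", "dest_set")
      .option("aerospike.updateByKey", "__key")
      .save()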

Upon looking into it, I found the cause of the exception: the connector calls reduce to perform a schema match on a bin value that is an empty list.
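The failure is easy to reproduce in plain Scala: reduce delegates to reduceLeft, which throws on an empty collection, which appears to be exactly what the connector hits when a record's list bin is empty:

    import scala.collection.mutable.ArrayBuffer

    // reduce delegates to reduceLeft, which throws on an empty collection --
    // the same exception and message as in the task failure above.
    val empty = ArrayBuffer.empty[Int]
    empty.reduce(_ + _)  // java.lang.UnsupportedOperationException: empty.reduceLeft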

Versions:

Spark version: 3.1.2
Connector version: 3.3.0

Question: Is there any way to resolve this, or is this an issue with the Spark connector?
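For reference, this is roughly the schema the data maps to (the bin names "tags" and "counts" are placeholders). Note that supplying it explicitly via .schema(...) does not seem to avoid the error, since the stack trace shows the reduce happening in per-row conversion (binToValue -> convertToSparkType -> matchesSchemaType) rather than in schema inference:

    import org.apache.spark.sql.types._

    // Placeholder bin names for the List-of-Strings and Map[String, Long] bins.
    val schema = StructType(Seq(
      StructField("tags", ArrayType(StringType), nullable = true),
      StructField("counts", MapType(StringType, LongType), nullable = true)
    ))

    // Skips sampling-based inference, but the connector's per-row type
    // matching still runs during conversion.
    val df = spark.read
      .format("aerospike")
      .schema(schema)
      .option("aerospike.set", "source_set")
      .load()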

Upvotes: 1

Views: 90

Answers (0)
