Gurupraveen

Reputation: 181

Spark cassandra connector + join time outs

I need to join two Spark dataframes and write the result back to Hive. These are the dataframes:

Dataframe1: a Cassandra table with primary key (ID, PART_NBR), where ID is the partition key and PART_NBR the clustering key

val df1 = spark.read.format("org.apache.spark.sql.cassandra")
    .option("keyspace", "mykeyspace")
    .option("table", "mytable")
    .load

Dataframe2: a dataframe of keys (values of the ID partition-key column of the table above) obtained from another source - it has around 0.15 million distinct keys

val df2 = spark.read
    .format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url","****")
    .option("dbtable","table")
    .option("user", "username")
    .option("password", "password123")
    .load()

  val joinExpr = df1.col("ID") === df2.col("ID")

  val res = df1.join(df2,joinExpr)

  res.write.mode(SaveMode.Append).format("orc")
    .saveAsTable("targetTable")

Now this code always results in "com.datastax.oss.driver.api.core.servererrors.ReadFailureException: Cassandra failure during read query at consistency LOCAL_ONE (1 responses were required but only 0 replica responded, 1 failed)".

Changing LOCAL_ONE to QUORUM fails as well.

I have even tried splitting the keys dataframe into batches of 20 keys (20 ID values per dataframe) and joining each batch with the Cassandra table - that fails too.

I have also tried an IN clause; although it works, the DBA won't let us run it because it loads up Cassandra and causes CPU spikes.

On checking with the Cassandra DBA, they are asking us to issue pointed (single-partition) queries instead, since the join above results in large token range scans and that is what causes the failure. But issuing an individual pointed query per key would mean about 0.15 million round trips to Cassandra (which takes several hours to complete), and that is too costly.
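For illustration, the pointed-query approach would look roughly like the sketch below (a rough sketch only, assuming the df1/df2 definitions above and the ID column name from my table; equality on the sole partition key is pushed down by the connector, so each filter becomes a single-partition read):

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.col

    // One single-partition read per distinct ID, i.e. ~150,000 round trips to Cassandra.
    val ids = df2.select("ID").distinct().collect().map(_.get(0))

    val perKeyResults = ids.map { id =>
      // Equality on the partition key is pushed down, so this is one targeted
      // CQL query rather than a token range scan.
      df1.filter(col("ID") === id)
    }

    // Triggering the reads one key at a time is exactly what makes this so slow.
    perKeyResults.foreach(_.write.mode(SaveMode.Append).format("orc").saveAsTable("targetTable"))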

Why does this join result in such huge token range scans? How can we fix this? What are my alternatives?

pom.xml Dependency

 <scala.version>2.11.12</scala.version>
 <spark.version>2.2.0</spark.version>

        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.5.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>



I tried the following, yet it's still not doing the direct join. Am I missing anything?

spark-submit --class ExampleCassandra --deploy-mode client --num-executors 15 --executor-memory 4g  --driver-memory=1g  --conf spark.sql.shuffle.partitions=25 --conf spark.executor.heartbeatInterval=100s --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --jars spark-sql_2.11-2.4.0.jar,spark-core_2.11-2.4.0.jar,spark-hive_2.11-2.4.0.jar,mysql-connector-java-8.0.18.jar,spark-cassandra-connector_2.11-2.5.1.jar ExampleCassandra-bundled-1.0-SNAPSHOT.jar

Spark version as printed in the code => spark.sparkContext.version = 2.4.0

The resulting plan:

== Physical Plan ==
*(8) SortMergeJoin [item_nbr#31], [item_nbr#24], Inner
:- *(2) Sort [item_nbr#31 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(item_nbr#31, 25)
:     +- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [item_nbr#31,planNum#32,strN#33,currTail#34,currTailTy#35,hor#36,prNbr#37,revSce#38,stckHnad#39] PushedFilters: [], ReadSchema: struct<item_nbr:int,planNum:int,strN:int,currTail:decimal(38,18),currTailTy:s...
+- *(7) Sort [item_nbr#24 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(item_nbr#24, 25)
      +- *(6) HashAggregate(keys=[item_nbr#21], functions=[])
         +- Exchange hashpartitioning(item_nbr#21, 25)
            +- *(5) HashAggregate(keys=[item_nbr#21], functions=[])
               +- *(5) Filter (NOT (trim(lower(item_nbr#21), None) = null) && isnotnull(cast(trim(item_nbr#21, None) as int)))
                  +- Generate explode(split(items#4, ,)), false, [item_nbr#21]
                     +- *(4) Project [items#4]
                        +- *(4) BroadcastHashJoin [planNum#0], [planNum#2], Inner, BuildRight
                           :- *(4) Scan JDBCRelation(( select planNum from QAMdPlans.Plan where plan_type = 'MBM' order by planNum desc ) t) [numPartitions=1] [planNum#0] PushedFilters: [*IsNotNull(planNum)], ReadSchema: struct<planNum:int>
                           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                              +- *(3) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [planNum#2,items#4] PushedFilters: [], ReadSchema: struct<planNum:int,items:string>

Upvotes: 1

Views: 1096

Answers (1)

Alex Ott

Reputation: 87154

The problem is that version 2.0.5 doesn't optimize joins for Dataframes - if you do res.explain you'll see that Spark reads all of the data from Cassandra and then performs the join at the Spark level. The optimized join was available only in the RDD API, as leftJoinWithCassandraTable or joinWithCassandraTable.
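For reference, a minimal sketch of the RDD-level join (reusing the df2, keyspace, table, and column names from the question; adjust the key getter to ID's actual type):

    import com.datastax.spark.connector._

    // Extract the distinct keys from df2 and let the connector issue one
    // targeted read per partition key instead of scanning the full table.
    val keysRdd = df2.select("ID").distinct().rdd.map(row => Tuple1(row.getInt(0)))

    val joined = keysRdd.joinWithCassandraTable("mykeyspace", "mytable")
      .on(SomeColumns("ID"))   // join on the partition key column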

This situation changed with the release of Spark Cassandra Connector 2.5, which includes an optimized join for the Dataframe API (but you need to have the Spark SQL extensions enabled to make it work). So you need to either upgrade your connector to the latest 2.5.x (2.5.1 as of right now), or use the join functionality in the RDD API.
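For example, a minimal sketch of enabling the extensions when the session is built in code (the directJoinSetting property is optional; its default is "auto"):

    import org.apache.spark.sql.SparkSession

    // Enable the connector's Catalyst extensions so the optimizer can rewrite
    // the join into a "Cassandra Direct Join" instead of a full table scan.
    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
      // optional: force the direct join while testing ("auto" is the default)
      .config("spark.cassandra.sql.directJoinSetting", "on")
      .getOrCreate()

With the extensions active, res.explain() should show "Cassandra Direct Join" in the physical plan rather than a plain CassandraSourceRelation scan feeding a SortMergeJoin.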

P.S. I recently wrote a detailed blog post on efficiently joining with data in Cassandra tables from Spark, using both the Dataframe and RDD APIs.

Upvotes: 0
