Philip K. Adetiloye

Reputation: 3270

Apache Spark running only one task on one executor

I have a Spark job that reads from a database, performs a filter, a union and 2 joins, and finally writes the result back to the database.

However, the last stage runs only one task on just one executor, out of 50 executors. I've tried increasing the number of partitions and using hash partitioning, but no luck.

After several hours of Googling, it seems my data may be skewed, but I don't know how to fix it.

Any suggestions, please?

Specs:

Executors:

Code snippet

    ...

  def main(args: Array[String]) {

    ....

    import sparkSession.implicits._


    val similarityDs = sparkSession.read.format("jdbc").options(opts).load
    similarityDs.createOrReplaceTempView("locator_clusters")

    val  ClassifierDs = sparkSession.sql("select *  " +
                                              "from locator_clusters where " +
                                              "relative_score >= 0.9 and " +
                                              "((content_hash_id is not NULL or content_hash_id <> '') " +
                                              "or (ref_hash_id is not NULL or ref_hash_id <> ''))").as[Hash].cache()



    def nnHash(tag: String) = (tag.hashCode & 0x7FFFFF).toLong


    val contentHashes = ClassifierDs.map(locator => (nnHash(locator.app_hash_id), Member(locator.app_hash_id,locator.app_hash_id, 0, 0, 0))).toDF("id", "member").dropDuplicates().alias("ch").as[IdMember]
    val similarHashes = ClassifierDs.map(locator => (nnHash(locator.content_hash_id), Member(locator.app_hash_id, locator.content_hash_id, 0, 0, 0))).toDF("id", "member").dropDuplicates().alias("sh").as[IdMember]


    val missingContentHashes = similarHashes.join(contentHashes, similarHashes("id") === contentHashes("id"), "right_outer").select("ch.*").toDF("id", "member").as[IdMember]

    val locatorHashesRdd = similarHashes.union(missingContentHashes).cache()

    val vertices = locatorHashesRdd.map{ case row: IdMember=> (row.id, row.member) }.cache()

    val toHashId = udf(nnHash(_:String))

    val edgesDf =  ClassifierDs.select(toHashId($"app_hash_id"), toHashId($"content_hash_id"), $"raw_score", $"relative_score").cache()

    val edges = edgesDf.map(e => Edge(e.getLong(0), e.getLong(1), (e.getDouble(2), e.getDouble(2)))).cache()


    val graph = Graph(vertices.rdd, edges.rdd).cache()

    val sc = sparkSession.sparkContext

    val ccVertices =  graph.connectedComponents.vertices.cache()


    val ccByClusters = vertices.rdd.join(ccVertices).map({
                          case (id, (hash, compId)) => (compId, hash.content_hash_id, hash.raw_score, hash.relative_score, hash.size)
                      }).toDF("id", "content_hash_id", "raw_score", "relative_score", "size").alias("cc")


    val verticesDf  = vertices.map(x => (x._1, x._2.app_hash_id, x._2.content_hash_id, x._2.raw_score, x._2.relative_score, x._2.size))
                              .toDF("id", "app_hash_id", "content_hash_id", "raw_score", "relative_score", "size").alias("v")

    val superClusters = verticesDf.join(ccByClusters, "id")
                                  .select($"v.app_hash_id", $"v.app_hash_id", $"cc.content_hash_id", $"cc.raw_score", $"cc.relative_score", $"cc.size")
                                  .toDF("ref_hash_id", "app_hash_id", "content_hash_id", "raw_score", "relative_score", "size")



    val prop = new Properties()
    prop.setProperty("user", M_DB_USER)
    prop.setProperty("password", M_DB_PASSWORD)
    prop.setProperty("driver", "org.postgresql.Driver")


    superClusters.write
                 .mode(SaveMode.Append)
                 .jdbc(s"jdbc:postgresql://$M_DB_HOST:$M_DB_PORT/$M_DATABASE", MERGED_TABLE, prop)


    sparkSession.stop()

Screenshot showing one executor: [image not available]

Stderr from the executor

16/10/01 18:53:42 INFO ShuffleBlockFetcherIterator: Getting 409 non-empty blocks out of 2000 blocks
16/10/01 18:53:42 INFO ShuffleBlockFetcherIterator: Started 59 remote fetches in 5 ms
16/10/01 18:53:42 INFO ShuffleBlockFetcherIterator: Getting 2000 non-empty blocks out of 2000 blocks
16/10/01 18:53:42 INFO ShuffleBlockFetcherIterator: Started 59 remote fetches in 9 ms
16/10/01 18:53:43 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 896.0 MB to disk (1  time so far)
16/10/01 18:53:46 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 896.0 MB to disk (2  times so far)
16/10/01 18:53:48 INFO Executor: Finished task 1906.0 in stage 769.0 (TID 260306). 3119 bytes result sent to driver
16/10/01 18:53:51 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (3  times so far)
16/10/01 18:53:57 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (4  times so far)
16/10/01 18:54:03 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (5  times so far)
16/10/01 18:54:09 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (6  times so far)
16/10/01 18:54:15 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (7  times so far)
16/10/01 18:54:21 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (8  times so far)
16/10/01 18:54:27 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (9  times so far)
16/10/01 18:54:33 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (10  times so far)
16/10/01 18:54:39 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (11  times so far)
16/10/01 18:54:44 INFO UnsafeExternalSorter: Thread 123 spilling sort data of 1792.0 MB to disk (12  times so far)

Upvotes: 2

Views: 4161

Answers (1)

zero323

Reputation: 330453

If data skew is indeed the problem here and all keys hash to a single partition, you can try either a full Cartesian product or a broadcast join with prefiltered data. Consider the following example:

import org.apache.spark.sql.functions._

val left = spark.range(1L, 100000L).select(lit(1L), rand(1)).toDF("k", "v")

left.select(countDistinct($"k")).show
// +-----------------+
// |count(DISTINCT k)|
// +-----------------+
// |                1|
// +-----------------+

Any attempt to join with data like this would result in serious data skew. Now let's say we have another table as follows:

val right = spark.range(1L, 100000L).select(
  (rand(3) * 1000).cast("bigint"), rand(1)
).toDF("k", "v")

right.select(countDistinct($"k")).show
// +-----------------+
// |count(DISTINCT k)|
// +-----------------+
// |             1000|
// +-----------------+

As mentioned above, there are two methods we can try:

  • If we expect the number of records in right corresponding to the keys in left to be small, we can use a broadcast join:

    type KeyType = Long
    val keys = left.select($"k").distinct.as[KeyType].collect
    
    // Keep only the rows of right whose key occurs in left, then broadcast them once
    val rightFiltered = right.where($"k".isin(keys: _*))
    left.join(broadcast(rightFiltered), Seq("k"))
    
  • Otherwise we can perform a crossJoin followed by a filter:

    left.as("left")
      .crossJoin(rightFiltered.as("right"))
      .where($"left.k" === $"right.k")
    

    or, by enabling implicit cross joins and using a plain join:

    spark.conf.set("spark.sql.crossJoin.enabled", true)
    
    left.as("left")
      .join(rightFiltered.as("right"))
      .where($"left.k" === $"right.k")
    

If there is a mix of rare and common keys, you can split the computation: perform a standard join on the rare keys and use one of the methods shown above for the common ones.
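A minimal sketch of that split, under some assumptions: the `threshold` value, the column names `lv` and `rv`, and the deterministic keys are hypothetical choices for illustration, and the tables are smaller than the ones above so it runs quickly in local mode. Keys that occur more than `threshold` times on the left are treated as common and joined with a broadcast; everything else goes through a standard join.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, lit, not, rand}

val spark = SparkSession.builder().master("local[*]").appName("skew-split").getOrCreate()
import spark.implicits._

// One heavily repeated key (1) plus a handful of rare keys (2 to 49).
val left = spark.range(1L, 10000L).select(lit(1L), rand(1)).toDF("k", "lv")
  .union(spark.range(2L, 50L).select($"id", rand(2)).toDF("k", "lv"))

val right = spark.range(1L, 10000L).select($"id" % 1000, rand(3)).toDF("k", "rv")

// Hypothetical cutoff: keys with more rows than this count as "common".
val threshold = 1000L
val commonKeys = left.groupBy($"k").count()
  .where($"count" > threshold)
  .select($"k").as[Long].collect()

val leftCommon = left.where($"k".isin(commonKeys: _*))
val leftRare = left.where(not($"k".isin(commonKeys: _*)))

// Rare keys: standard join.
val rareJoined = leftRare.join(right, Seq("k"))

// Common keys: prefilter right and broadcast it, so the skewed side is not shuffled.
val rightCommon = right.where($"k".isin(commonKeys: _*))
val commonJoined = leftCommon.join(broadcast(rightCommon), Seq("k"))

// Both parts have the columns (k, lv, rv), so they can be unioned by position.
val result = rareJoined.union(commonJoined)
```

The union should contain the same rows as `left.join(right, Seq("k"))`; only the execution strategy for the common keys differs. Collecting the common keys to the driver assumes there are only a few of them.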

Another possible issue is the jdbc format. If you don't provide predicates, or a partitioning column together with bounds and the number of partitions, all data is loaded by a single executor.
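For illustration, a partitioned read might be configured roughly as follows. This is a sketch, not the asker's actual setup: the URL, credentials, the `id` column and the bounds are all placeholders, and it assumes the table has a numeric column whose values are spread reasonably evenly.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jdbc-partitioned-read").getOrCreate()

// All connection details below are hypothetical; substitute your own.
val similarityDs = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/mydb")
  .option("dbtable", "locator_clusters")
  .option("user", "user")
  .option("password", "password")
  .option("driver", "org.postgresql.Driver")
  .option("partitionColumn", "id")   // assumed numeric column
  .option("lowerBound", "1")         // bounds only define the partition stride,
  .option("upperBound", "1000000")   // they do not filter rows
  .option("numPartitions", "50")
  .load()

// Should report up to 50 partitions instead of 1.
println(similarityDs.rdd.getNumPartitions)
```

Rows outside the bounds are still read; they end up in the first or last partition, so bounds that are far off can themselves produce skewed partitions.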

Upvotes: 3
