ecagiral

Reputation: 3

How to pass dataset column value to a function while using spark filter with scala?

I have a dataset of actions, each consisting of a user id and an action type:

+-------+-------+
|user_id|   type|
+-------+-------+
|     11| SEARCH|
|     11| DETAIL|
|     12| SEARCH|
+-------+-------+

I want to keep only the actions that belong to users who have at least one SEARCH action.

So I created a Bloom filter from the ids of the users who have a SEARCH action.

Then I tried to filter all actions by checking each row's user_id against the Bloom filter:

val df = spark.read...
val searchers = df.filter($"type" === "SEARCH").select("user_id").distinct.as[String].collect
val bloomFilter = BloomFilter.create(100)
searchers.foreach(bloomFilter.putString(_))
df.filter(bloomFilter.mightContainString($"user_id"))

But the code fails to compile with this error:

type mismatch;
found   : org.apache.spark.sql.ColumnName
required: String

How can I pass the column value to the BloomFilter.mightContainString method?

Upvotes: 0

Views: 1766

Answers (2)

Chitral Verma

Reputation: 2853

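You can do something like this:

import org.apache.spark.util.sketch.BloomFilter

val sparkSession = ???
val sc = sparkSession.sparkContext
import sparkSession.implicits._ // needed for $"..." and .as[String]

val bloomFilter = BloomFilter.create(100)

val df = ???

val searchers = df.filter($"type" === "SEARCH").select("user_id").distinct.as[String].collect
// Populate the filter before broadcasting it
searchers.foreach(bloomFilter.putString(_))

At this point I should mention that collect is not a good idea. Next, you can do something like this: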

import org.apache.spark.sql.functions.udf
val bbFilter = sc.broadcast(bloomFilter)

val filterUDF = udf((s: String) => bbFilter.value.mightContainString(s))

df.filter(filterUDF($"user_id"))

You can remove the broadcasting if the bloomFilter instance is serializable.
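
For context, the original error occurs because $"user_id" is a column expression rather than a row value, so it has to be evaluated per row through a UDF. Putting the pieces together, a minimal end-to-end sketch might look like the following. The local session, the sample rows (with user_id assumed to be a string column, plus an extra user 13 without a SEARCH action) and the name result are assumptions made for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.util.sketch.BloomFilter

// Local session for illustration only (assumption; use your own session in practice).
val spark = SparkSession.builder().master("local[*]").appName("bloom-filter-udf").getOrCreate()
import spark.implicits._

// Hypothetical sample data mirroring the question; user_id is assumed to be a string.
val df = Seq(("11", "SEARCH"), ("11", "DETAIL"), ("12", "SEARCH"), ("13", "DETAIL"))
  .toDF("user_id", "type")

// Collect the distinct ids of users with at least one SEARCH action to the driver.
val searchers = df.filter($"type" === "SEARCH").select("user_id").distinct.as[String].collect()

// Build and populate the Bloom filter on the driver.
val bloomFilter = BloomFilter.create(100)
searchers.foreach(id => bloomFilter.putString(id))

// Broadcast the filter and wrap the lookup in a UDF so it runs once per row.
val bbFilter = spark.sparkContext.broadcast(bloomFilter)
val filterUDF = udf((s: String) => bbFilter.value.mightContainString(s))

val result = df.filter(filterUDF($"user_id"))
result.show()
```

The rows of users 11 and 12 are always kept, because a Bloom filter has no false negatives. User 13 is dropped unless it happens to be a false positive.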

Hope this helps, Cheers.

Upvotes: 0

Alper t. Turker

Reputation: 35249

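Create the filter from the searchers only:

val expectedNumItems: Long = ???
val fpp: Double = ???
// Restrict to SEARCH rows first; otherwise the filter would contain every user_id in df
val f = df.filter($"type" === "SEARCH").stat.bloomFilter("user_id", expectedNumItems, fpp)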

Use udf for filtering:

import org.apache.spark.sql.functions.udf

val mightContain = udf((s: String) => f.mightContain(s))
df.filter(mightContain($"user_id"))

If your current Bloom filter implementation is serializable, you should be able to use it the same way, but if the data is large enough to justify a Bloom filter, you should avoid collecting.
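
As a rough illustration of avoiding collect, the sketch below builds the filter with df.stat.bloomFilter from the SEARCH rows only and then applies it through a UDF. The local session, the sample rows (user_id assumed to be a string column), the sizing values 1000 and 0.01, and the names searcherFilter, mightBeSearcher and searcherActions are all assumptions for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Local session for illustration only (assumption).
val spark = SparkSession.builder().master("local[*]").appName("stat-bloom-filter").getOrCreate()
import spark.implicits._

// Hypothetical sample data; user_id is assumed to be a string column.
val df = Seq(("11", "SEARCH"), ("11", "DETAIL"), ("12", "SEARCH"), ("13", "DETAIL"))
  .toDF("user_id", "type")

// Spark builds the filter from the SEARCH rows; no user ids are collected by hand.
// 1000 expected items and a 1% false positive rate are placeholder values.
val searcherFilter = df.filter($"type" === "SEARCH").stat.bloomFilter("user_id", 1000L, 0.01)

// The filter is captured in the UDF closure and shipped to the executors.
val mightBeSearcher = udf((s: String) => searcherFilter.mightContain(s))

val searcherActions = df.filter(mightBeSearcher($"user_id"))
searcherActions.show()
```

If user_id is an integral column instead, Spark inserts the values as longs, so the UDF should take a Long and call mightContainLong to match.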

Upvotes: 1
