Reputation: 3
I have an action array which consists of user id and action type
+-------+-------+
|user_id| type|
+-------+-------+
| 11| SEARCH|
+-------+-------+
| 11| DETAIL|
+-------+-------+
| 12| SEARCH|
+-------+-------+
I want to filter actions that belongs to the users who have at least one search action.
So I created a bloom filter with user ids who has SEARCH action.
Then I tried to filter all actions depending on bloom filter's user status
val df = spark.read...
val searchers = df.filter($"type" === "SEARCH").select("user_id").distinct.as[String].collect
val bloomFilter = BloomFilter.create(100)
searchers.foreach(bloomFilter.putString(_))
df.filter(bloomFilter.mightContainString($"user_id"))
But the code gives an exception
type mismatch;
found : org.apache.spark.sql.ColumnName
required: String
Please let me know how can I pass column value to the BloomFilter.mightContainString method?
Upvotes: 0
Views: 1766
Reputation: 2853
You can do something like this,
val sparkSession = ???
val sc = sparkSession.sparkContext
val bloomFilter = BloomFilter.create(100)
val df = ???
val searchers = df.filter($"type" === "SEARCH").select("user_id").distinct.as[String].collect
At this point, i'll mention the fact that collect
is not a good idea. Next you can do something like.
import org.apache.spark.sql.functions.udf
val bbFilter = sc.broadcast(bloomFilter)
val filterUDF = udf((s: String) => bbFilter.value.mightContainString(s))
df.filter(filterUDF($"user_id"))
You can remove the broadcasting if the bloomFilter instance is serializable.
Hope this helps, Cheers.
Upvotes: 0
Reputation: 35249
Create filter:
val expectedNumItems: Long = ???
val fpp: Double = ???
val f = df.stat.bloomFilter("user_id", expectedNumItems, fpp)
Use udf
for filtering:
import org.apache.spark.sql.functions.udf
val mightContain = udf((s: String) => f.mightContain(s))
df.filter(mightContain($"user_id"))
If your current Bloom filter implementation is serializable you should be able to use it the same way, but if data is large enough to justify Bloom filter, you should avoid collecting.
Upvotes: 1