Reputation: 1993
I am trying to find the co-occurrence of words. The following is the code I am using.
val dataset = df.select("entity").rdd.map(row => row.getList(0)).filter(r => r.size() > 0).distinct()
println("dataset")
dataset.take(10).foreach(println)
Example Dataset
dataset
[aa]
[bb]
[cc]
[dd]
[ee]
[ab, ac, ad]
[ff]
[ef, fg]
[ab, gg, hh]
Code Snippet
case class tupleIn(a: String, b: String)
case class tupleOut(i: tupleIn, c: Long)

val cooccurMapping = dataset.flatMap(
  list => {
    list.toArray().map(e => e.asInstanceOf[String].toLowerCase).flatMap(
      ele1 => {
        list.toArray().map(e => e.asInstanceOf[String].toLowerCase).map(ele2 => {
          if (ele1 != ele2) {
            ((ele1, ele2), 1L)
          }
        })
      })
  })
How can I filter the tuples out of this result?
I have tried
.filter(e => e.isInstanceOf[Tuple2[(String, String), Long]])
:121: warning: fruitless type test: a value of type Unit cannot also be a ((String, String), Long)
       .filter(e => e.isInstanceOf[Tuple2[(String, String), Long]])
                      ^
:121: error: isInstanceOf cannot test if value types are references.
       .filter(e => e.isInstanceOf[Tuple2[(String, String), Long]])
.filter(e => e.isInstanceOf[tupleOut])
:122: warning: fruitless type test: a value of type Unit cannot also be a coocrTupleOut
       .filter(e => e.isInstanceOf[tupleOut])
                      ^
:122: error: isInstanceOf cannot test if value types are references.
       .filter(e => e.isInstanceOf[tupleOut])
If I map
.map(e => e.asInstanceOf[Tuple2[(String, String), Long]])
The above snippet compiles, but throws this exception at runtime after some time:
java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to scala.Tuple2
  at $line84834447093.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2$$anonfun$9.apply(:123)
  at $line84834447093.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2$$anonfun$9.apply(:123)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Why is isInstanceOf not working in filter(), while asInstanceOf works in map()?
Upvotes: 0
Views: 603
Reputation: 25909
The result of your code is a collection of items of type Unit: because the innermost if has no else branch, Scala widens every element to Unit. So neither the filter nor the map has anything meaningful to operate on. Note that map does asInstanceOf, which casts to the type you want regardless of what the value actually is (hence it compiles, then fails at runtime with the ClassCastException on BoxedUnit), whereas isInstanceOf checks the runtime type, and a Unit is never a Tuple2, which is exactly what the "fruitless type test" warning is telling you.
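If you want to stay with the RDD version, here is a minimal sketch of a fix (assuming the goal is one ((word1, word2), 1L) record per ordered pair within a list): filter the pairs before producing the tuple, so no Unit is ever created.

// Sketch: a for-comprehension guard drops the ele1 == ele2 pairs
// instead of mapping them to (), so the element type stays a tuple.
val cooccurMapping = dataset.flatMap { list =>
  val words = list.toArray().map(e => e.asInstanceOf[String].toLowerCase)
  for {
    ele1 <- words
    ele2 <- words
    if ele1 != ele2 // guard replaces the else-less if
  } yield ((ele1, ele2), 1L)
}

A reduceByKey(_ + _) on the result would then turn the 1L markers into actual co-occurrence counts.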
In any event, if I understand your intent correctly, you can get what you want with Spark's built-in functions:
import org.apache.spark.sql.functions._
import spark.implicits._

val l = List(List("aa"), List("bb", "vv"), List("bbb"))
val rdd = sc.parallelize(l)
val df = rdd.toDF("data") // toDF names the column; createDataFrame has no (RDD, String) overload
val ndf = df.withColumn("data", explode($"data"))
val cm = ndf.select($"data".as("elec1"))
  .crossJoin(ndf.select($"data".as("elec2")))
  .withColumn("cnt", lit(1L))
val coocurenceMap = cm.filter($"elec1" =!= $"elec2") // =!= replaces the deprecated !==
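From there you still need to aggregate to get actual counts per pair; a short follow-up sketch (assuming a summed count per (elec1, elec2) pair is the goal):

// Sum the 1L markers per pair; .count() after groupBy would work just as well.
val counts = coocurenceMap
  .groupBy($"elec1", $"elec2")
  .agg(sum($"cnt").as("cooccurrences"))
counts.show()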
Upvotes: 1