Ekaterina Tcareva
Ekaterina Tcareva

Reputation: 439

How can I filter a spark RDD by the result of mapping?

I need to filter a RDD by the result of mapping. Initially I have RDD of diagnosis:

Diagnostic(000140966-01,2008-07-06,250.00) Diagnostic(202009464-01,2009-09-29,V70.0) Diagnostic(202009464-01,2009-09-29,590.80) Diagnostic(818009099-01,2014-12-11,592.0) Diagnostic(545360000-01,2005-12-09,584.9) Diagnostic(000012631-01,2013-09-23,V70.0) Diagnostic(666071437-01,2006-11-29,496) Diagnostic(000681687-01,2006-06-28,250.01) Diagnostic(497910000-01,2009-04-07,584.9) Diagnostic(022001344-01,2011-11-28,584.9) Diagnostic(285060000-01,2012-03-28,584.9) ....

Where: case class Diagnostic(patientID: String, date: Date, code: String)

I group the patients:

val grouped_patients = diagnostic.groupBy(_.patientID)
grouped_patients.take(50).foreach(println)

(000644947-01,CompactBuffer(Diagnostic(000644947-01,2010-09-22,584.9), Diagnostic(000644947-01,2007-02-02,584.9), Diagnostic(000644947-01,2014-06-15,250.01), Diagnostic(000644947-01,2009-01-02,250.01), ... )) (000124665-01,CompactBuffer(Diagnostic(000124665-01,2006-09-05,V70.0), Diagnostic(000124665-01,2011-11-21,585.9), Diagnostic(000124665-01,2009-10-14,585.9), ....))

I need to filter out the patients with some specific code (I have a set of these codes T1DM_DX).

I can pint out:

val grouped_patient_fil_1 = diagnostic.groupBy(_.patientID)
    .map(x => x._2.map(y => y.code))
    .map(x=>x.toSet.intersect(T1DM_DX).size>0)
    .take(100).foreach(println)

... false false false true false true false true false false false ....

How can I filter grouped_patients for which we have "True"? I think it should be like:

val grouped_patient_fil_1 = grouped_patients
    .filter(x => x._2.map(y => y.code)
          .map(x=> x.toSet.intersect(T1DM_DX).size>0))

But I am getting an error:

T2dmPhenotype.scala:71:37: type mismatch;
[error]  found   : scala.collection.immutable.Set[String]
[error]  required: scala.collection.GenSet[Any]
[error] Note: String <: Any, but trait GenSet is invariant in type A.
[error] You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
[error]         .map(x => x.toSet.intersect(T1DM_DX).size > 0))

Upvotes: 1

Views: 711

Answers (1)

Andronicus
Andronicus

Reputation: 26076

If you already have an array with Boolean objects, then simply change that map into filter in your stream, that will leave only true values:

val grouped_patient_fil_1 = diagnostic
    .groupBy(_.patientID)
    .filter(x => x._2.map(y => y.code).toSet.intersect(T1DM_DX).size>0)
grouped_patient_fil_1.take(100).foreach(println)

Upvotes: 1

Related Questions