jcadcell

Reputation: 811

Multiline Spark sliding window

I am learning Apache Spark with Scala and would like to use it to process a DNA data set that spans multiple lines like this:

ATGTAT
ACATAT
ATATAT

I want to map this into overlapping groups of a fixed size k and count the groups. So for k=3, we would get a group for each character together with the next two characters:

ATG TGT GTA TAT ATA TAC 
ACA CAT ATA TAT ATA TAT 
ATA TAT ATA TAT

...then count the groups (like word count):

(ATA,5), (TAT,5), (TAC,1), (ACA,1), (CAT,1), (ATG,1), (TGT,1), (GTA,1)

The problem is that the "words" can span multiple lines, as TAC does in the example above, where it straddles the line break. I don't want to count the groups within each line only, but across the whole file, ignoring line endings.

In other words, I want to slide a window of width k over the entire file as though there were no line breaks. The problem is looking ahead (or back) to the next RDD row to complete a window when I reach the end of a line.
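For reference, on a plain in-memory string with the line breaks already removed, the computation I am after is just a sliding window followed by a group count, e.g. for k=3:

val seq = "ATGTATACATATATATAT" // the example above with the line breaks removed
seq.sliding(3).toList.groupBy(identity).mapValues(_.size)
// Map(ATA -> 5, TAT -> 5, TAC -> 1, ACA -> 1, CAT -> 1, ATG -> 1, TGT -> 1, GTA -> 1)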

Two ideas I had were:

  1. Append k-1 characters from the next line:
ATGTATAC
ACATATAT
ATATAT

I tried this with the Spark SQL lead() function, but when I tried executing a flatMap, I got a NotSerializableException for WindowSpec. Is there any other way to reference the next line? Would I need to write a custom input format?

  2. Read the entire sequence in as a single line (or join lines after reading):
ATGTATACATATATATAT

Is there a way to read multiple lines so they can be processed as one? If so, would it all need to fit into the memory of a single machine?
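For what it's worth, the closest I have found for idea 2 so far is sc.wholeTextFiles, which reads each file as a single (path, content) record; as far as I can tell, each file's content then has to fit in the memory of a single executor, which is what worries me:

// one (path, content) pair per file; the content is a single string
val whole = sc.wholeTextFiles("gene.txt").values
  .map(_.replaceAll("\\s+", "")) // drop the line breaks
whole.flatMap(_.sliding(3).toList).collect
// Array(ATG, TGT, GTA, TAT, ATA, TAC, ACA, CAT, ATA, TAT, ATA, TAT, ATA, TAT, ATA, TAT)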

I realize either of these could be done as a pre-processing step, but I was wondering what the best way is to do it within Spark. Once I have it in either of these formats, I know how to do the rest; I am just stuck here.

Upvotes: 1

Views: 1193

Answers (1)

akuiper

Reputation: 214927

You can make an RDD of single-character strings instead of joining the lines into one, since a join would produce a single string that cannot be distributed:

val rdd = sc.textFile("gene.txt")
// rdd: org.apache.spark.rdd.RDD[String] = gene.txt MapPartitionsRDD[4] at textFile at <console>:24

So simply use flatMap to split each line into single-character strings:

rdd.flatMap(_.split("")).collect
// res4: Array[String] = Array(A, T, G, T, A, T, A, C, A, T, A, T, A, T, A, T, A, T)
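As an aside, on JVMs older than Java 8, split("") also emits a leading empty string; mapping over the characters directly avoids that, if it is a concern:

rdd.flatMap(_.map(_.toString)).collect
// res: Array[String] = Array(A, T, G, T, A, T, A, C, A, T, A, T, A, T, A, T, A, T)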

A more complete solution borrowed from this answer:

val rdd = sc.textFile("gene.txt")

// create the sliding 3 grams for each partition and record the edges
val rdd1 = rdd.flatMap(_.split("")).mapPartitionsWithIndex((i, iter) => {
  val slideList = iter.toList.sliding(3).toList
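  // note: assumes each partition is non-empty (head/last on an empty slideList would throw)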
  Iterator((slideList, (slideList.head, slideList.last)))
})

// collect the edge values, concatenate edges from adjacent partitions and broadcast it
val edgeValues = rdd1.values.collect

val sewedEdges = edgeValues.zip(edgeValues.tail).map { case (x, y) =>
  (x._2 ++ y._1).drop(1).dropRight(1).sliding(3).toList
}

val sewedEdgesMap = sc.broadcast(
  ((0 until rdd1.partitions.size) zip sewedEdges).toMap
)

// sew the edge values back to the result
rdd1.keys.mapPartitionsWithIndex((i, iter) => iter ++ List(sewedEdgesMap.value.getOrElse(i, Nil))).
  flatMap(_.map(_.mkString(""))).collect

// res54: Array[String] = Array(ATG, TGT, GTA, TAT, ATA, TAC, ACA, CAT, ATA, TAT, ATA, TAT, ATA, TAT, ATA, TAT)
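From there, the counts the question asks for are just the usual word-count step; a minimal sketch (the name kmers is introduced here for illustration, it is simply the RDD above before .collect):

val kmers = rdd1.keys.mapPartitionsWithIndex((i, iter) =>
    iter ++ List(sewedEdgesMap.value.getOrElse(i, Nil))
  ).flatMap(_.map(_.mkString("")))

kmers.map((_, 1)).reduceByKey(_ + _).collect
// Array((ATA,5), (TAT,5), (ATG,1), (TGT,1), (GTA,1), (TAC,1), (ACA,1), (CAT,1)) -- order may vary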

Upvotes: 1
