Randomize

Reputation: 9103

Spark: run an external process in parallel

Is it possible with Spark to "wrap" and run an external process managing its input and output?

The process is a normal C/C++ application that is usually run from the command line. It accepts a plain text file as input and generates another plain text file as output. Since I need to integrate this application's flow into something bigger (still within Spark), I was wondering whether there is a way to do this.

The process can easily be run in parallel (at the moment I use GNU Parallel): I simply split its input into (for example) 10 part files, run 10 instances of it in memory, and re-join the resulting 10 output part files into a single file.

Upvotes: 5

Views: 3139

Answers (2)

Mark Rajcok

Reputation: 364677

If you end up here from a Google search based on the question title, but you don't have the OP's restriction that the external program must read from a file (i.e., if your external program can read from stdin), here is a solution. For my use case, I needed to call an external decryption program for each input file.

import org.apache.commons.io.IOUtils
import sys.process._
import scala.collection.mutable.ArrayBuffer

val showSampleRows = true
val bfRdd = sc.binaryFiles("/some/files/*,/more/files/*")
val rdd   = bfRdd.flatMap{ case(file, pds) => {  // pds is a PortableDataStream
    val rows   = new ArrayBuffer[Array[String]]()
    var errors = List[String]()
    val io     = new ProcessIO (
        in  => {  // "in" is an OutputStream; write the encrypted contents of the 
                  // input file (pds) to this stream
            IOUtils.copy(pds.open(), in)  // open() returns a DataInputStream
            in.close
        },
        out => {  // "out" is an InputStream; read the decrypted data off this stream.
            // Even though this runs in another thread, we can write to rows, since it
            // is part of the closure for this function
            for(line <- scala.io.Source.fromInputStream(out).getLines) {
                // ...decode line here... for my data, it was pipe-delimited
                rows += line.split('|')
            }
            out.close
        },
        err => {  // "err" is an InputStream; read any errors off this stream
            // errors is part of the closure for this function
            errors = scala.io.Source.fromInputStream(err).getLines.toList
            err.close
        }
    )
    val cmd       = List("/my/decryption/program", "--decrypt")
    val exitValue = cmd.run(io).exitValue  // blocks until subprocess finishes
    println(s"-- Results for file $file:")
    if (exitValue != 0) {  
        // TBD write to string accumulator instead, so driver can output errors
        // string accumulator from @zero323: https://stackoverflow.com/a/31496694/215945
        println(s"exit code: $exitValue")
        errors.foreach(println)
    } else {
        // TBD, you'll probably want to move this code to the driver, otherwise
        // unless you're using the shell, you won't see this output
        // because it will be sent to stdout of the executor
        println(s"row count: ${rows.size}")
        if (showSampleRows) {
            println("6 sample rows:")
            rows.slice(0,6).foreach(row => println("  " + row.mkString("|")))
        }
    }
    rows
}}
scala> :paste "test.scala"
Loading test.scala...
...
rdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[62] at flatMap at <console>:294

scala> rdd.count  // action, causes Spark code to actually run
-- Results for file hdfs://path/to/encrypted/file1:  // this file had errors
exit code: 255
ERROR: Error decrypting
my_decryption_program: Bad header data[0]
-- Results for file hdfs://path/to/encrypted/file2:
row count: 416638
6 sample rows:
  <...first row shown here ...>
  ...
  <...sixth row shown here ...>
...
res43: Long = 843039

Upvotes: 1

zero323

Reputation: 330063

The simplest thing you can do is to write a simple wrapper which takes data from standard input, writes it to a file, executes the external program, and writes the results to standard output. After that, all you have to do is use the pipe method:

rdd.pipe("your_wrapper")

The only serious consideration is IO performance. If possible, it would be better to adjust the program you want to call so that it can read and write data directly, without going through disk.
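
If the program can stream over stdin/stdout, no wrapper and no temp files are needed at all. An illustrative sketch, using a standard Unix tool in place of the real binary:

// Illustration only: "tr" stands in for a binary that reads stdin and writes stdout.
val upper = sc.textFile("hdfs:///path/to/input.txt")
    .pipe(Seq("tr", "[:lower:]", "[:upper:]"))  // Seq form avoids shell quoting issues
upper.take(5).foreach(println)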

Alternatively, you can use mapPartitions combined with process and standard IO tools to write to a local file, call your program, and read the output.
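
A rough sketch of that alternative, assuming an RDD[String] and a hypothetical program /my/program that takes an input file path and an output file path as arguments:

import java.io.File
import java.nio.file.Files
import scala.collection.JavaConverters._
import sys.process._

val result = rdd.mapPartitions { iter =>
    // write this partition to a local temp file
    val inFile  = File.createTempFile("spark-in-", ".txt")
    val outFile = File.createTempFile("spark-out-", ".txt")
    Files.write(inFile.toPath, iter.toList.asJava)
    // call the external program (command line is hypothetical)
    val exitCode = Seq("/my/program", inFile.getAbsolutePath, outFile.getAbsolutePath).!
    require(exitCode == 0, s"external program failed with exit code $exitCode")
    // read the program's output back as the new partition
    val lines = Files.readAllLines(outFile.toPath).asScala.toList
    inFile.delete(); outFile.delete()
    lines.iterator
}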

Upvotes: 5
