Reputation: 2040
I am trying to build a class that starts a system process which waits for stdin. The class should have another method which takes a string, inputs that into the system process, and return the process' output.
The reason is that starting the process involves loading a lot of data and hence takes a while.
I am trying to dummy-test this with bc
, so that bc
is started and waits for input. I would envision an interface like this:
case class BcWrapper(executable: File) {
var bc: Option[???] = None
def startBc(): Unit = bc = Some(???)
def calc(input: String): String = bc.get.???
def stopBc(): Unit = bc.get.???
}
I would like to be able to use it like this:
val wrapper = BcWrapper(new File("/usr/bin/bc"))
wrapper.startBc()
val result1 = wrapper.calc("1 + 1") // should be "2"
val result2 = wrapper.calc(???)
[...]
wrapper.stopBc()
This topic has been touched in multiple questions, but never fully answered for a use case like this one. This question or this one seems to come close. However, I am not sure how to implement the ProcessLogger
, nor whether to use one in the first place.
Unfortunately, the Scala documentation is not very elaborate either.
Note that I do not want to read from stdin, but want to call a function.
The background is that I want to read a large file, read it line by line, preprocess the lines, pass them to the external process, and post-process the output.
Upvotes: 0
Views: 590
Reputation: 51271
You can get something similar, but simpler, like so.
import sys.process._
import util.Try
class StdInReader(val reader :String) {
def send(input :String) :Try[String] =
Try(s"/bin/echo $input".#|(reader).!!.trim)
}
usage:
val bc = new StdInReader("/usr/bin/bc")
bc.send("2 * 8") //res0: scala.util.Try[String] = Success(16)
bc.send("12 + 8") //res1: scala.util.Try[String] = Success(20)
bc.send("22 - 8") //res2: scala.util.Try[String] = Success(14)
Programs that send a non-zero exit-code (bc
doesn't) will result with a Failure()
.
If you need more fine-grained control you might start with something like this and expand on it.
import sys.process._
class ProcHandler(val cmnd :String) {
private val resbuf = collection.mutable.Buffer.empty[String]
def run(data :Seq[String]) :Unit = {
cmnd.run(new ProcessIO(
in => {
val writer = new java.io.PrintWriter(in)
data.foreach(writer.println)
writer.close()
},
out => {
val src = io.Source.fromInputStream(out)
src.getLines().foreach(resbuf += _)
src.close()
},
_.close() //maybe create separate buffer for stderr?
)).exitValue()
}
def results() :Seq[String] = {
val rs = collection.mutable.Buffer.empty[String]
resbuf.copyToBuffer(rs)
resbuf.clear()
rs
}
}
usage:
val bc = new ProcHandler("/usr/bin/bc")
bc.run(List("4+5","6-2","2*5"))
bc.run(List("99/3","11*77"))
bc.results() //res0: Seq[String] = ArrayBuffer(9, 4, 10, 33, 847)
OK, I did some more research and found this. It appears to get at what you want but there are limitations. In particular, the process stays open for input until you want to get output. At that point IO streams are closed to insure all buffers are flushed.
import sys.process._
import util.Try
class ProcHandler(val cmnd :String) {
private val procInput = new java.io.PipedOutputStream()
private val procOutput = new java.io.PipedInputStream()
private val proc = cmnd.run( new ProcessIO(
{ in => // attach to the process's internal input stream
val istream = new java.io.PipedInputStream(procInput)
val buf = Array.fill(100)(0.toByte)
Iterator.iterate(istream.read(buf)){ br =>
in.write(buf, 0, br)
istream.read(buf)
}.takeWhile(_>=0).toList
in.close()
},
{ out => // attach to the process's internal output stream
val ostream = new java.io.PipedOutputStream(procOutput)
val buf = Array.fill(100)(0.toByte)
Iterator.iterate(out.read(buf)){ br =>
ostream.write(buf, 0, br)
out.read(buf)
}.takeWhile(_>=0).toList
out.close()
},
_ => () // ignore stderr
))
private val procO = new java.io.BufferedReader(new java.io.InputStreamReader(procOutput))
private val procI = new java.io.PrintWriter(procInput, true)
def feed(str :String) :Unit = procI.println(str)
def feed(ss :Seq[String]) :Unit = ss.foreach(procI.println)
def read() :List[String] = {
procI.close() //close input before reading output
val lines = Stream.iterate(Try(procO.readLine)){_ =>
Try(procO.readLine)
}.takeWhile(_.isSuccess).map(_.get).toList
procO.close()
lines
}
}
usage:
val bc = new ProcHandler("/usr/bin/bc")
bc.feed(List("9*3","4+11")) //res0: Unit = ()
bc.feed("4*13") //res1: Unit = ()
bc.read() //res2: List[String] = List(27, 15, 52)
bc.read() //res3: List[String] = List()
OK, this is my final word on the subject. I think this ticks every item on your wish list: start the process only once, it stays alive until actively closed, allows alternating the writing and reading.
import sys.process._
class ProcHandler(val cmnd :Seq[String]) {
private var os: java.io.OutputStream = null
private var is: java.io.InputStream = null
private val pio = new ProcessIO(os = _, is = _, _.close())
private val proc = cmnd.run(pio)
def feed(ss :String*) :Unit = {
ss.foreach(_.foreach(os.write(_)))
os.flush()
}
def ready :Boolean = is.available() > 0
def read() :String = {
Seq.fill[Char](is.available())(is.read().toChar).mkString
}
def close() :Unit = {
proc.exitValue()
os.close()
is.close()
}
}
There are still issues and much room for improvement. IO is handled at a basic level (streams) and I'm not sure what I'm doing here is completely safe and correct. The input, feed()
, is required to supply the necessary NewLine terminations, and the output, read()
, is just a raw String
and not separated into a nice collection of string results.
Note that this will bleed system resources if the client code fails to close()
all processes.
Note also that reading doesn't wait for content (i.e. no blocking). After writing the response might not be immediately available.
usage:
val bc = new ProcHandler(Seq("/usr/bin/bc","-q"))
bc.feed("44-21\n", "21*4\n")
bc.feed("67+11\n")
if (bc.ready) bc.read() else "not ready" // "23\n84\n78\n"
bc.feed("67-11\n")
if (bc.ready) bc.read() else "not ready" // "56\n"
bc.feed("67*11\n", "1+2\n")
if (bc.ready) bc.read() else "not ready" // "737\n3\n"
if (bc.ready) bc.read() else "not ready" // "not ready"
bc.close()
Upvotes: 2