daydreamer
daydreamer

Reputation: 91949

Akka Stream: Could not process file from Test, works fine from Main class

This is what my code looks like

object LogFile {
  def apply(file: File, system: ActorSystem) = new LogFile(file, system)
  val maxBytesPerLine = 1500

  def main(args: Array[String]) {
    val system = ActorSystem("system")
    LogFile(new File("/Users/harit/Downloads/customer issues/bc/bluecoat_proxy_big/Demo_log_004.log"), system).process()
  }
}

class LogFile(file: File, implicit val system: ActorSystem) {
  Predef.assert(file.exists(), "log file must exists")

  implicit val materializer = ActorMaterializer()
  val logger = Logger(LoggerFactory.getLogger(getClass))

  val source: Source[ByteString, Future[Long]] = Source.synchronousFile(file)

  // todo (harit): what should be maximumFrameLength
  val flow: Flow[ByteString, String, Unit] = Flow[ByteString]
    .via(Framing.delimiter(ByteString(System.lineSeparator), maximumFrameLength = LogFile.maxBytesPerLine, allowTruncation = true))
    .map(_.utf8String)

  def process() = {
    logger.debug(s"processing $file")
    source.via(flow).runForeach(println)
  }
}

When I run this, I get output as

16:07:58.062 [main] DEBUG com.learner.processor.LogFile - processing /Users/harit/workspace/LogProcessor/trunk/test-log-files/bc.envision.0001.log
Nov 13 14:50:29 [151.162.175.11] %CACHEFLOWELFF-4-: date="2013-11-13",time="19:49:06",time-taken="1",c-ip="151.162.175.2",s-action="FAILED",s-ip="151.162.175.11",s-hierarchy="-",s-supplier-name="-",s-sitename="SG-SOCKS-Service",cs-user="-",cs-username="-",cs-auth-group="-",cs-categories="unavailable;unavailable;unavailable;unavailable;unavailable",cs-method="unknown",cs-host="-",cs-uri="-",cs-uri-scheme="-",cs-uri-port="0",cs-uri-path="/",cs-uri-query="-",cs-uri-extension="-",cs(Referer)="-",cs(User-Agent)="-",cs-bytes="0",sc-status="0",sc-bytes="0",sc-filter-result="DENIED",sc-filter-category="unavailable",x-virus-id="-",x-exception-id="internal_error",rs(Content-Type)="-",duration="0",s-supplier-ip="-",cs(Cookie)="-",s-computername="ATLTCP-WP01",s-port="1080",cs-uri-stem="-",cs-version="-",
Nov 13 14:50:29 [151.162.175.11] %CACHEFLOWELFF-4-: date="2013-11-13",time="19:49:06",time-taken="6",c-ip="170.227.133.15",s-action="TCP_NC_MISS",s-ip="151.162.175.11",s-hierarchy="-",s-supplier-name="s.youtube.com",s-sitename="SG-HTTP-Service",cs-user="NA\os3281",cs-username="os3281",cs-auth-group="na\SEC-InternetAccess",cs-categories="Mixed Content/Potentially Adult;Audio/Video Clips",cs-method="GET",cs-host="s.youtube.com",cs-uri="http://s.youtube.com/stream_204?el=detailpage&cpn=5IWD86_4oayP2E4P&scoville=1&event=streamingstats&vps=652.509:PL&docid=KFwjibi-JRU&df=652.509:474&fmt=134&ns=yt&ei=_NSDUuDMFMXc6QaTnYCwDA",cs-uri-scheme="http",cs-uri-port="80",cs-uri-path="/stream_204",cs-uri-query="?el=detailpage&cpn=5IWD86_4oayP2E4P&scoville=1&event=streamingstats&vps=652.509:PL&docid=KFwjibi-JRU&df=652.509:474&fmt=134&ns=yt&ei=_NSDUuDMFMXc6QaTnYCwDA",cs-uri-extension="-",cs(Referer)="http://s.ytimg.com/yts/swfbin/player-vfle2Qpz1/watch_as3.swf",cs(User-Agent)="Mozilla/5.0 (Windows NT 5.1; rv:17.0) Gecko/20100101 Firefox/17.0",cs-bytes="1374",sc-status="204",sc-bytes="400",sc-filter-result="OBSERVED",sc-filter-category="Mixed%20Content/Potentially%20Adult",x-virus-id="-",x-exception-id="-",rs(Content-Type)="text/html;%20charset=UTF-8",duration="0",s-supplier-ip="74.125.196.101",cs(Cookie)="dkv=074ff5491b2ac93d89643b66573b1028e3QEAAAAdGxpcGkl1YNSMA==",s-computername="ATLTCP-WP01",s-port="880",cs-uri-stem="http://s.youtube.com/stream_204",cs-version="HTTP/1.0",

Now I write test for it as

class LogFileTest extends UnitTestKit(ActorSystem("logFileTestSystem")) {
  behavior of "LogFile"

  "source with some log lines" should "complete processing successfully" in {

    val bcFile = new File("/Users/harit/Downloads/customer issues/bc/bluecoat_proxy_big/Demo_log_004.log")
    val logFile: LogFile = LogFile(bcFile, system)
    logFile.process()
  }
}

and

class UnitTestKit(_system: ActorSystem) extends TestKit(_system)
with FlatSpecLike
with BeforeAndAfterAll
with ShouldMatchers {
  override protected def afterAll(): Unit = _system.shutdown()
}

When I run this test, I see

16:10:14.849 [ScalaTest-run-running-LogFileTest] DEBUG com.learner.processor.LogFile - processing /Users/harit/Downloads/customer issues/bc/bluecoat_proxy_big/Demo_log_004.log

Process finished with exit code 0

and no output is generated, what am I missing?

Upvotes: 0

Views: 79

Answers (1)

Aldo Stracquadanio
Aldo Stracquadanio

Reputation: 6237

Your problem is that runForeach returns a Future[Unit] and your test is not awaiting on it, so the test terminates right after the process started and no computation is actually run. Using Await.result or the whenReady utility in ScalaTest ScalaFutures trait to consume the result of your logFile.process() call should fix it.

Upvotes: 3

Related Questions