

Can't find "window" function in Spark Structured Streaming

I'm coding a small example in Spark Structured Streaming where I'm trying to process the output of the netstat command, and I can't figure out how to invoke the window function.

These are the relevant lines of my build.sbt:

scalaVersion := "2.11.4"
scalacOptions += "-target:jvm-1.8"

libraryDependencies ++= {
  val sparkVer = "2.3.0"
  Seq(
    "org.apache.spark" %% "spark-streaming" % sparkVer % "provided",
    "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVer % "provided",
    "org.apache.spark" %% "spark-core" % sparkVer % "provided" withSources(),
    "org.apache.spark" %% "spark-hive" % sparkVer % "provided"
  )
}

And the code:

case class NetEntry(timeStamp: java.sql.Timestamp, sourceHost: String, targetHost: String, status: String)

def convertToNetEntry(x: String): NetEntry = {
    // tcp        0      0 eselivpi14:icl-twobase1 TIME_WAIT
    val array = x.replaceAll("\\s+", " ").split(" ").slice(3, 6)
    NetEntry(java.sql.Timestamp.valueOf(, array(0), array(1), array(2))
}

def main(args: Array[String]): Unit = {

    // Initialize the Spark session
    val spark: SparkSession = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()

    // Read lines from a socket; host and port come from the command line
    val lines = spark.readStream
      .format("socket")
      .option("host", args(0))
      .option("port", args(1).toInt)
      .load()

    import spark.implicits._
    val df = lines.as[String].map(x => convertToNetEntry(x))

    val wordsArr: Dataset[NetEntry] = df.as[NetEntry]

    // Never get past this point
    val windowColumn = window($"timestamp", "10 minutes", "5 minutes")

    val windowedCounts = wordsArr.groupBy(windowColumn, $"targetHost").count()

    val query = windowedCounts.writeStream.outputMode("complete").format("console").start()
}

I have tried with Spark 2.1, 2.2 and 2.3, with the same results. What is really bizarre is that when I log in to the Spark shell on my Spark cluster and paste the same lines, it works! Any idea what I am doing wrong?

The error at compilation time:

[error] C:\code_legacy\edos-dp-mediation-spark-consumer\src\main\scala\com\ericsson\streaming\structured\StructuredStreamingMain.scala:39: not found: value window
[error]     val windowColumn = window($"timestamp", "10 minutes", "5 minutes")
[error]                        ^
[warn] 5 warnings found
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 19 s, completed 16-mar-2018 20:13:40

Update: To make things weirder, I have checked the API docs and could not find a valid reference here either:$implicits$

Upvotes: 1

Views: 4380

Answers (1)

Lokesh Yadav

Reputation: 998

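The window function is not in scope, so the compiler reports "not found: value window". In a compiled application you need to import it explicitly; spark-shell imports org.apache.spark.sql.functions._ automatically at startup, which is why the same lines work there.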

Add this import statement:

import org.apache.spark.sql.functions.window
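
To check the import in isolation, here is a minimal sketch that applies the same window expression to a small static Dataset instead of a stream. The object name WindowImportSketch, the helper windowedCounts, the local[1] master and the sample rows are all made up for illustration; only the NetEntry fields and the window arguments come from the question. It assumes spark-sql is on the classpath (not marked "provided") when run outside spark-submit.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.window // the import missing from the question

object WindowImportSketch {

  // Same fields as the question's NetEntry
  case class NetEntry(timeStamp: Timestamp, sourceHost: String, targetHost: String, status: String)

  // Hypothetical helper: counts entries per sliding window and target host
  def windowedCounts(spark: SparkSession, entries: Seq[NetEntry]): DataFrame = {
    import spark.implicits._ // provides toDS() and the $"..." column syntax
    entries.toDS()
      .groupBy(window($"timeStamp", "10 minutes", "5 minutes"), $"targetHost")
      .count()
  }

  def main(args: Array[String]): Unit = {
    // local[1] is an assumption so the sketch can run without a cluster
    val spark = SparkSession.builder.master("local[1]").appName("WindowImportSketch").getOrCreate()

    // Made-up sample rows standing in for parsed netstat output
    val entries = Seq(
      NetEntry(Timestamp.valueOf("2018-03-16 20:01:00"), "hostA:1001", "hostB:80", "TIME_WAIT"),
      NetEntry(Timestamp.valueOf("2018-03-16 20:03:00"), "hostA:1002", "hostB:80", "ESTABLISHED")
    )

    windowedCounts(spark, entries).show(truncate = false)
    spark.stop()
  }
}
```

Because the window is 10 minutes long and slides every 5 minutes, each entry falls into two overlapping windows. If you use several functions from that package, the wildcard import org.apache.spark.sql.functions._ (the one spark-shell applies) is a common alternative.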

Upvotes: 7
