MaatDeamon

Reputation: 9761

Reading binary files with Spark Streaming

Does anyone know how to set up

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

so that it actually consumes binary files?

Could someone clarify the use of this method?

EDIT1

I have tried the following:

val bfiles = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/xxxxxxxxx/Casalini_streamed")

However, the compiler complains:

[error] /xxxxxxxxx/src/main/scala/EstimatorStreamingApp.scala:14: type arguments [org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat] conform to the bounds of none of the overloaded alternatives of
[error]  value fileStream: [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$10: scala.reflect.ClassTag[K], implicit evidence$11: scala.reflect.ClassTag[V], implicit evidence$12: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean)(implicit evidence$7: scala.reflect.ClassTag[K], implicit evidence$8: scala.reflect.ClassTag[V], implicit evidence$9: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$4: scala.reflect.ClassTag[K], implicit evidence$5: scala.reflect.ClassTag[V], implicit evidence$6: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)]
[error]   val bfiles = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/xxxxxxxxx/Casalini_streamed")

What am I doing wrong?

Upvotes: 2

Views: 1314

Answers (2)

Rahul Sharma

Reputation: 5834

Follow this link to read about all Hadoop input formats.

I found a well-documented answer about the sequence file format here.

You are facing the compilation issue because of an import mismatch: Hadoop org.apache.hadoop.mapred (the legacy API) vs. org.apache.hadoop.mapreduce (the new API).

E.g.

Java

JavaPairInputDStream<BytesWritable, BytesWritable> dstream =
        sc.fileStream("/somepath",
                org.apache.hadoop.io.BytesWritable.class,
                org.apache.hadoop.io.BytesWritable.class,
                org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat.class);

Note that the key class must be BytesWritable, not Text: SequenceFileAsBinaryInputFormat is fixed to BytesWritable for both key and value, so any other key type fails the F extends InputFormat<K, V> bound.

I didn't try it in Scala, but it should be something similar (the Scala fileStream takes type parameters rather than Class objects):

val dstream = ssc.fileStream[
    org.apache.hadoop.io.BytesWritable,
    org.apache.hadoop.io.BytesWritable,
    org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat]("/somepath")
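
A minimal sketch of consuming the resulting stream, assuming the dstream above; each record is a (BytesWritable, BytesWritable) pair, so copyBytes() extracts the raw byte arrays (the per-batch count is just illustrative):

// Extract the raw bytes of each record's value
val payloads = dstream.map { case (_, value) => value.copyBytes() }
payloads.foreachRDD { rdd =>
  // Do something with the binary payloads, e.g. count them per batch
  println(s"Received ${rdd.count()} binary records in this batch")
}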

Upvotes: 2

MaatDeamon

Reputation: 9761

I finally got it to compile.

The compilation problem was in the import. I used

  • import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat

I replaced it with

  • import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat

Then it works. However, I have no idea why. I don't understand the difference between the two hierarchies; the two files seem to have the same content, so it's hard to say. If someone could help clarify that here, I think it would help a lot.
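EDIT: looking again at the compiler error above, the type bound seems to explain it: every fileStream overload requires F <: org.apache.hadoop.mapreduce.InputFormat[K,V], i.e. the "new" MapReduce API, whereas org.apache.hadoop.mapred is the legacy API hierarchy, and its SequenceFileAsBinaryInputFormat does not extend the new-API base class, so it fails the bound even though the two classes look alike. A minimal sketch of the combination that satisfies the bound (the directory path is a placeholder):

import org.apache.hadoop.io.BytesWritable
// The mapreduce (new-API) variant satisfies F <: org.apache.hadoop.mapreduce.InputFormat[K, V]
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat

val bfiles = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/path/to/dir")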

Upvotes: 0
