salty

Reputation: 91

Streaming to parquet files not happy with flink 1.6.1

I'm very new to flink (and parquet/hadoop for that matter) so I'm most certainly doing something really stupid. I'm trying to create a sink which will dump my data source to a parquet file.

My code looks like:

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
streamEnv.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE)

val sink = StreamingFileSink
  .forBulkFormat(outputPath, ParquetAvroWriters.forReflectRecord(classOf[MyClass]))
  .build()
testSource.addSink(sink)

Unfortunately, I'm no longer getting the exception I was getting earlier, but the sink is still not producing correct output. I'm currently getting a single .part-xxx file with only 4 bytes of data in it, yet there are about 20,000 records in this stream, so that doesn't seem right.

Before I started writing this question I was getting a method not found exception from ParquetAvroWriters.java at line 84.

That code looks like:

return AvroParquetWriter.<T>builder(out)
    .withSchema(schema)
    .withDataModel(dataModel)
    .build();

The signature of the AvroParquetWriter.builder method is:

public static <T> Builder<T> builder(Path file)

But at the point where ParquetAvroWriters.java calls it, the argument is a StreamOutputFile, hence the no-such-method error.

I'm using flink 1.6.1 and parquet-hadoop/parquet-avro 1.10.0. How exactly should I be setting things up to write a parquet file?

This is getting most frustrating - I can't even find an example that compiles.
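For reference, a setup like this would presumably need dependencies along these lines (a sketch only; the hadoop-client version is an assumption, and flink-parquet is the artifact that contains ParquetAvroWriters):

// build.sbt (sketch, not a verified configuration)
libraryDependencies ++= Seq(
  "org.apache.flink"   %% "flink-streaming-scala" % "1.6.1",
  "org.apache.flink"   %  "flink-parquet"         % "1.6.1",
  "org.apache.parquet" %  "parquet-avro"          % "1.10.0",
  "org.apache.hadoop"  %  "hadoop-client"         % "2.8.3" // assumed version
)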

Upvotes: 2

Views: 2211

Answers (2)

user644265

Reputation: 301

After reading the comments here, I have created a project with the same (or very similar) code, which you can compile and execute.

import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._

import scala.util.Random

object CustomSource {

  case class TextOut(data: String)

  // Unbounded source: keeps emitting records so that checkpoints can complete.
  def generateRandomStringSource(out: SourceContext[TextOut]): Unit = {
    val lines = Array("how are you", "you are how", " i am fine")
    while (true) {
      val index = Random.nextInt(3)
      Thread.sleep(200)
      out.collect(TextOut(lines(index)))
    }
  }

  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    streamEnv.setParallelism(1)
    // Bulk formats roll part files on checkpoints, so checkpointing must be enabled.
    streamEnv.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE)

    val sink = StreamingFileSink
      .forBulkFormat(
        new Path("file:///tmp/test2"),
        ParquetAvroWriters.forReflectRecord(classOf[TextOut]))
      .build()

    val customSource = streamEnv.addSource(generateRandomStringSource _)

    customSource.print()
    customSource.addSink(sink)

    streamEnv.execute()
  }
}

I have created a project that shows how this runs, along with the minimal dependencies (jars, etc.) that are necessary.

This is the link : https://github.com/jose1003/flinkparquet

BR

Jose

Upvotes: 4

Till Rohrmann

Reputation: 13346

Flink's StreamingFileSink, when used with a bulk format, automatically applies the OnCheckpointRollingPolicy. This means that results are only materialized whenever a checkpoint completes. This is required to provide exactly-once processing guarantees.

I assume that you are using a CollectionSource as a test input and that processing this input takes less than the specified 100 ms. Consequently, no checkpoint can complete and no results will be written. Flink won't trigger a checkpoint once the input has been completely consumed. Thus, all events after the last completed checkpoint won't be visible.

Try decreasing the checkpoint interval, increasing the number of elements in your CollectionSource, or writing your own TestingSource extends SourceFunction which runs at least as long as a single checkpoint interval (e.g. with a sleep); a sketch of such a source follows below. That way, Flink should be able to complete a checkpoint and, thus, write out the results to the specified directory.
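For illustration, here is a minimal sketch of what such a testing source could look like (the class name TestingSource, the record payload, and the timings are assumptions for the example, not part of Flink's API):

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Keeps emitting records long enough for at least one checkpoint to complete
// before the job finishes.
class TestingSource(numRecords: Int, sleepMillis: Long) extends SourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceContext[String]): Unit = {
    var i = 0
    while (running && i < numRecords) {
      ctx.collect(s"record-$i")
      Thread.sleep(sleepMillis) // keeps the source alive past the checkpoint interval
      i += 1
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

With a 100 ms checkpoint interval, something like streamEnv.addSource(new TestingSource(1000, 10)) keeps the source running for roughly ten seconds, which gives Flink plenty of time to complete several checkpoints and roll the part files.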

Upvotes: 2
