Josh Lemer

Reputation: 446

Flink does not checkpoint, and BucketingSink leaves files in pending state, when source is generated from Collection

I'm trying to generate some test data from a collection and write it to S3. Flink doesn't seem to do any checkpointing at all when I do this, but it does checkpoint when the source reads from S3.

For example, this DOES checkpoint and leaves output files in a completed state:

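import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment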
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

val lines: DataStream[String] = {
  val path = "s3a://my_bucket/simple_job/in"
  env
    .readFile(
      inputFormat = new TextInputFormat(new Path(path)),
      filePath = path,
      watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
      interval = 5000L
    )
}

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()

Meanwhile, this DOES NOT checkpoint, and leaves files in a .pending state even after the job has finished:

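import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment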
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

val lines: DataStream[String] = env.fromCollection((1 to 100).map(_.toString))

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()

Upvotes: 1

Views: 845

Answers (1)

Josh Lemer

Reputation: 446

It turns out that this is caused by the issue tracked in https://issues.apache.org/jira/browse/FLINK-2646. The stream from the collection finishes before the app ever has time to make a single checkpoint, and BucketingSink only moves pending files to the finished state when it is notified of a completed checkpoint, so the files stay in .pending.
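
One possible workaround for test jobs, sketched here under assumptions rather than taken from the ticket, is to replace fromCollection with a source that emits the collection and then stays alive until the job is cancelled, so the 2000 ms checkpoint interval has a chance to fire. The class name LingeringCollectionSource is hypothetical and not part of Flink; it uses only the legacy SourceFunction interface.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Hypothetical helper (not part of Flink): emits a fixed collection, then idles
// until the job is cancelled so that periodic checkpoints can still be triggered.
class LingeringCollectionSource[T](elements: Seq[T]) extends SourceFunction[T] {

  // Flipped by cancel(); volatile because cancel() is called from another thread.
  @volatile private var running = true

  override def run(ctx: SourceContext[T]): Unit = {
    val it = elements.iterator
    while (running && it.hasNext) {
      // Emit under the checkpoint lock so emission does not interleave with a checkpoint.
      ctx.getCheckpointLock.synchronized {
        ctx.collect(it.next())
      }
    }
    // All elements are out; keep the source (and therefore the job) running.
    while (running) {
      Thread.sleep(100L)
    }
  }

  override def cancel(): Unit = running = false
}
```

With org.apache.flink.streaming.api.scala._ in scope, the failing example would then build its input as env.addSource(new LingeringCollectionSource((1 to 100).map(_.toString))). The trade-off is that the job no longer finishes on its own: it has to be cancelled manually once at least one checkpoint has completed after the last record was written, and this sketch does not store its read position in checkpointed state, so a restore would emit the collection again.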

Upvotes: 1
