xksa

Reputation: 127

How can I forward Protobuf data from Flink to Kafka and stdout?

I'd like to add some code here that prints the Protobuf data coming from Flink to stdout.

I am using Flink's Apache Kafka Connector in order to connect Flink to Kafka.

This is my Flink code:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")

val producer = new FlinkKafkaProducer011(topic, new myProtobufSchema, props)
// the sink is attached to the DataStream of Protobuf messages (call it protoStream;
// its source is omitted here), not to the environment itself
protoStream.addSink(producer)
env.execute("To Kafka")
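
For reference, myProtobufSchema is roughly a Flink SerializationSchema that writes out the message's Protobuf bytes (simplified here; MyProto stands in for my actual generated Protobuf class):

import org.apache.flink.api.common.serialization.SerializationSchema

// Simplified sketch; MyProto stands in for the generated Protobuf message class
class myProtobufSchema extends SerializationSchema[MyProto] {
  override def serialize(element: MyProto): Array[Byte] =
    element.toByteArray // Protobuf binary encoding
}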

Here is my Kafka Streams code:

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "protobuf-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p
  }

  val builder: StreamsBuilder = new StreamsBuilder
  // TODO: print the records from the topic to stdout here

  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()

  sys.ShutdownHookThread {
     streams.close(Duration.ofSeconds(10))
  }

Upvotes: 0

Views: 168

Answers (1)

OneCricketeer

Reputation: 192023

You need to set up the StreamsBuilder to consume from the topic. Note that print is called on the KStream returned by stream(), not on the builder itself; the builder is still what you pass to new KafkaStreams:

val builder: StreamsBuilder = new StreamsBuilder()

builder.stream(topic)
    .print(Printed.toSysOut())
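
Putting it together with your existing setup, a complete version might look roughly like this. I'm assuming the topic carries raw Protobuf bytes with String keys; MyProto is a placeholder for your generated Protobuf class, and the explicit serdes are just one reasonable choice:

import java.time.Duration
import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{Consumed, Printed, ValueMapper}
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

val props: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, "protobuf-application")
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  p
}

val builder: StreamsBuilder = new StreamsBuilder()

// Consume the topic as (String key, raw Protobuf bytes), parse each value,
// and print it to stdout. MyProto is a placeholder for the generated class.
builder
  .stream(topic, Consumed.`with`(Serdes.String(), Serdes.ByteArray()))
  .mapValues(new ValueMapper[Array[Byte], String] {
    override def apply(bytes: Array[Byte]): String =
      MyProto.parseFrom(bytes).toString
  })
  .print(Printed.toSysOut[String, String]())

val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()

sys.ShutdownHookThread {
  streams.close(Duration.ofSeconds(10))
}

If you instead register default key/value serdes in the StreamsConfig, you can drop the Consumed argument and call stream(topic) as in the snippet above.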

Upvotes: 1
