Reputation: 127
I'd like to add some codes here and stdout the protobuf data from Flink.
I am using Flink's Apache Kafka Connector in order to connect Flink to Kafka.
This is my Flink's code.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092").
val producer = FlinkKafkaProducer011(topic, new myProtobufSchema, props)
env.addSink(producer)
env.execute("To Kafka")
Here is my Kafka's code.
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "protobuf-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p
}
val builder: StreamsBuilder = new StreamsBuilder
// TODO: implement here to stdout
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()
sys.ShutdownHookThread {
streams.close(Duration.ofSeconds(10))
}
Upvotes: 0
Views: 168
Reputation: 192023
You need to setup the StreamsBuilder
to consume from a topic
val builder: StreamsBuilder = new StreamsBuilder()
.stream(topic)
.print(Printed.toSysOut());
Upvotes: 1