Reputation: 1186
I wanted to set up a basic producer-consumer pipeline with Flink on Kafka, but I am having trouble producing data to an existing consumer via Java.
CLI solution
I set up a Kafka broker using the kafka_2.11-2.4.0 zip from https://kafka.apache.org/downloads with the commands
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
I create a topic called transactions1 using
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions1
Now I can use a producer and consumer on the command line to see that the topic has been created and works.
To set up the consumer I run
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic transactions1 --from-beginning
Now if any producer sends data to the topic transactions1, I will see it in the consumer console.
I test that the consumer is working by running
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic transactions1
and entering the following data lines in the producer CLI, which also show up in the consumer CLI.
{"txnID":1,"amt":100.0,"account":"AC1"}
{"txnID":2,"amt":10.0,"account":"AC2"}
{"txnID":3,"amt":20.0,"account":"AC3"}
Now I want to replicate that last step, i.e. the producer and consumer, in Java code, which is the core problem of this question.
...
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
compile group: 'com.twitter', name: 'chill-thrift', version: '0.7.6'
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.11.0'
compile group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6'
compile group: 'com.google.protobuf', name: 'protobuf-java', version: '3.7.0'
}
...
Transaction.class, where you can suggest changes to the serialization logic using Kryo, Protobuf or TBaseSerializer by extending the relevant Flink classes:
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
public class Transaction {
public final int txnID;
public final float amt;
public final String account;
public Transaction(int txnID, float amt, String account) {
this.txnID = txnID;
this.amt = amt;
this.account = account;
}
public String toJSONString() {
Gson gson = new Gson();
return gson.toJson(this);
}
public static Transaction fromJSONString(String some) {
Gson gson = new Gson();
return gson.fromJson(some, Transaction.class);
}
public static MapFunction<String, String> mapTransactions() {
MapFunction<String, String> map = new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if (value != null && value.trim().length() > 0) {
try {
return fromJSONString(value).toJSONString();
} catch (Exception e) {
return "";
}
}
return "";
}
};
return map;
}
@Override
public String toString() {
return "Transaction{" +
"txnID=" + txnID +
", amt=" + amt +
", account='" + account + '\'' +
'}';
}
}
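To make that invitation concrete, here is a rough sketch of what a custom Kryo serializer for Transaction might look like (TransactionKryoSerializer is hypothetical, not in my repo); it could be registered the same way the commented-out addDefaultKryoSerializer line in SetupSpike below does, i.e. env.getConfig().addDefaultKryoSerializer(Transaction.class, TransactionKryoSerializer.class);

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Sketch only: a Kryo Serializer<Transaction> that writes the three fields
// explicitly instead of relying on reflection.
public class TransactionKryoSerializer extends Serializer<Transaction> {
    @Override
    public void write(Kryo kryo, Output output, Transaction t) {
        output.writeInt(t.txnID);
        output.writeFloat(t.amt);
        output.writeString(t.account);
    }

    @Override
    public Transaction read(Kryo kryo, Input input, Class<Transaction> type) {
        return new Transaction(input.readInt(), input.readFloat(), input.readString());
    }
}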
SetupSpike.class, where I try to produce and consume data from the topic transactions1:
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
public class SetupSpike {
public static void main(String[] args) throws Exception {
System.out.println("begin");
List<Transaction> txns = new ArrayList<Transaction>(){{
add(new Transaction(1, 100, "AC1"));
add(new Transaction(2, 10, "AC2"));
add(new Transaction(3, 20, "AC3"));
}};
// This list txns needs to be serialized in Flink as Transaction.class->String->ByteArray
//via producer and then to the topic in Kafka broker
//and deserialized as ByteArray->String->Transaction.class from the Consumer in Flink reading Kafka broker.
String topic = "transactions1";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", topic);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//env.getConfig().addDefaultKryoSerializer(Transaction.class, TBaseSerializer.class);
// working Consumer logic below which needs edit if you change serialization
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
DataStream<String> stream = env.addSource(myConsumer).map(Transaction.mapTransactions());
//working Producer logic below which works if you are sinking a pre-existing DataStream
//but needs editing to work with Java List<Transaction> datatype.
System.out.println("sinking expanded stream");
MapFunction<String, String> etl = new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if (value != null && value.trim().length() > 0) {
try {
return Transaction.fromJSONString(value).toJSONString();
} catch (Exception e) {
return "";
}
}
return "";
}
};
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(topic,
new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
try {
System.out.println(element);
return new ProducerRecord<byte[], byte[]>(topic, etl.map(element).getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}, properties, Semantic.EXACTLY_ONCE);
// stream.timeWindowAll(Time.minutes(1));
stream.addSink(myProducer);
JobExecutionResult execute = env.execute();
}
}
As you can see, I am not able to do this with the List<Transaction> txns provided. The above is working code I could gather from the Flink documentation to redirect topic stream data and send data manually via the CLI producer. The problem is writing KafkaProducer code in Java that sends data to the topic, which is further compounded by the issues mentioned below.
Can someone who has worked with Flink please help me with how to produce the txns list to the transactions1 topic in Flink, and then verify that it works with the consumer?
Also, any help on the issues of adding timestamps or doing some processing before sinking would be of great help. You can find the source code at https://github.com/devssh/kafkaFlinkSpike; the intent is to generate Flink boilerplate that adds details for "AC1" from an in-memory store and joins them with the Transaction events coming in real time, to send expanded output to the user.
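For illustration, a rough sketch of the kind of enrichment I have in mind (EnrichTransaction and its hard-coded details are hypothetical placeholders):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Sketch only: join each Transaction with static account details held in an
// in-memory map before sinking the expanded output.
public class EnrichTransaction extends RichMapFunction<Transaction, String> {
    private transient Map<String, String> accountDetails;

    @Override
    public void open(Configuration parameters) {
        // Placeholder in-memory store; real details would be loaded here.
        accountDetails = new HashMap<>();
        accountDetails.put("AC1", "{\"owner\":\"unknown\"}");
    }

    @Override
    public String map(Transaction t) {
        return t.toJSONString() + " | details: " + accountDetails.getOrDefault(t.account, "{}");
    }
}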
Upvotes: 0
Views: 2031
Reputation: 43524
Several points, in no particular order:
It would be better not to mix Flink version 1.9.2 with version 1.9.0, as you've done here:
compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'
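One way to align them is to pin everything to a single Flink release and a single Scala suffix, e.g. (assuming you standardize on 1.9.2 and Scala 2.11):
compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.2'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.2'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.9.2'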
For tutorials on how to work with timestamps, watermarks, keyBy, windows, etc., see the online training materials from Ververica.
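As a small taste of what those cover, a sketch of assigning event-time timestamps in the Flink 1.9 API, given a DataStream<Transaction> transactions (e.g. from fromCollection below); the eventTimeMillis accessor is hypothetical, since Transaction has no time field:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch only: event-time timestamps with a 5-second out-of-orderness bound.
DataStream<Transaction> timestamped = transactions.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Transaction>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(Transaction t) {
            return eventTimeMillis(t); // hypothetical: read your own event-time field
        }
    });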
To use List<Transaction> txns as an input stream, you can do this (docs):
DataStream<Transaction> transactions = env.fromCollection(txns);
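From there, a minimal sketch of getting that stream into the topic (assuming the env, txns, and properties from the question; SimpleStringSchema sidesteps the custom KafkaSerializationSchema entirely):

// Sketch: Transaction stream -> JSON strings -> topic transactions1.
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
    "transactions1",          // target topic
    new SimpleStringSchema(), // String -> byte[]
    properties);

transactions
    .map(Transaction::toJSONString) // Transaction -> JSON string
    .addSink(producer);

env.execute("produce txns to transactions1");

You can then verify the result with the console consumer from the question.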
For an example of how to handle serialization/deserialization when working with Flink and Kafka, see the Flink Operations Playground; in particular, look at ClickEventDeserializationSchema and ClickEventStatisticsSerializationSchema, which are used in ClickEventCount.java and defined here. (Note: this playground has not yet been updated for Flink 1.10.)
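Adapted to this question, rough sketches of the analogous pair for Transaction might look like this (both class names are hypothetical, each would live in its own file, and they reuse the Gson helpers from the question):

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// byte[] -> String -> Transaction on the consuming side.
public class TransactionDeserializationSchema implements DeserializationSchema<Transaction> {
    @Override
    public Transaction deserialize(byte[] message) {
        return Transaction.fromJSONString(new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(Transaction nextElement) {
        return false; // unbounded Kafka stream
    }

    @Override
    public TypeInformation<Transaction> getProducedType() {
        return TypeInformation.of(Transaction.class);
    }
}

// Transaction -> String -> byte[] on the producing side.
public class TransactionSerializationSchema implements SerializationSchema<Transaction> {
    @Override
    public byte[] serialize(Transaction element) {
        return element.toJSONString().getBytes(StandardCharsets.UTF_8);
    }
}

These would plug into new FlinkKafkaConsumer<>(topic, new TransactionDeserializationSchema(), properties) and new FlinkKafkaProducer<>(topic, new TransactionSerializationSchema(), properties).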
Upvotes: 1