Reputation: 101
I am trying to use Kafka Streams, but I'm having some issues and it doesn't work. To get started, I have three connectors, but I can't use my own keys, and I need the keys to join the topics. How can I join on two or more keys? I am trying to replicate something like this:
select * from (
    select a.* from users a
    inner join deps b on a.dep = b.dep and a.group = b.group
) a
inner join user_afy b on a.id = b.id
I want to save the result of the first (inner) join in a topic and use it for the second join. This is an example of what I have.
Connector Properties:
....
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=timestamp
query=select id, user, dep, postal, group, time from users
numeric.mapping=best_fit
table.types=TABLE
topic=users
# I tried this with one field and with several fields, but it did not work (see the variant below)
transforms=createKey, extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=dep, group
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=dep, group
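If I read the transform docs correctly, ExtractField$Key only accepts a single field, so one variant I could try is to keep the composite Struct key that ValueToKey produces and drop the extract step entirely:
# sketch: keep the composite key from ValueToKey; ExtractField$Key takes only one field
transforms=createKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=dep,group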
standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=D:/tmp/connect.offsets
plugin.path=D:/connector/lib
Topics:
Topic users
{"id":"0001", "user":"Alex", "dep":"ofi", "postal":170, group="ingen",time:"xxx"}
{"id":"0002", "user":"Emy", "dep":"lab", "postal":170, group="itn",time:"xxx"}
{"id":"0003", "user":"Lea", "dep":"lab", "postal":170, group="itn",time:"xxx"}
{"id":"0004", "user":"Silva", "dep":"cent", "postal":170, group="ingen",time:"xxx"}
{"id":"0005", "user":"Foxy", "dep":"cent", "postal":170, group="ete",time:"xxx"}
Topic user_afy
{"id":"0001", "name":"bask"}
{"id":"0001", "name":"Silf"}
{"id":"0002", "name":"BTT"}
{"id":"0005", "name":"butf"}
Topic deps
{"id_dep":"1", "dep":"ofi", "sind"="worker", "group"="ingen."}
{"id_dep":"2", "dep":"lab", "sind"="worker", "group"="iti."}
{"id_dep":"3", "dep":"cent", "sind"="worker", "group"="etc."}
My code is based on an example from the official website, but I haven't been able to test it:
import java.util.Properties;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(......);

    // build a JSON serde from the Connect JSON (de)serializers
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);

    StreamsBuilder builder = new StreamsBuilder();
    // note: the topic is called "users", not "user"
    final KStream<String, JsonNode> left = builder.stream("users", consumed);
    KTable<String, JsonNode> right = builder.table("deps", consumed);

    KStream<String, String> joined = left.join(right,
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
        Joined.with(Serdes.String(), jsonSerde, jsonSerde)
    );

    // Edit
    joined.foreach((k, v) -> System.out.println("key=" + k + ", val=" + v));

    // the topology has to be started, otherwise nothing is processed
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
}
How would the output look? To write the result to a new topic, is it preferable to build a HashMap with the values I want to save in JSON format? Later I will create custom Serdes.
Upvotes: 3
Views: 4346
Reputation: 62350
What do you mean by "I can't use my own keys"? In Kafka Streams you can always set a new key as required for your processing.
If you want to read data into a KTable, you cannot change the key in a straightforward way though. You would need to read the topic as a KStream, set a new key, and convert the KStream into a KTable (cf. Kafka Streams API: KStream to KTable).
For multiple consecutive joins, you can simply "chain" the corresponding operations together:
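For example, a minimal sketch of that re-keying step, reusing the JsonNode serde from your code (the intermediate topic name "deps-by-dep-group" and the "|" separator are just placeholders):
// write "deps" back with a composite dep+group key, then read it as a KTable
builder.stream("deps", consumed)
       .selectKey((key, value) -> value.get("dep").asText() + "|" + value.get("group").asText())
       .to("deps-by-dep-group", Produced.with(Serdes.String(), jsonSerde));
KTable<String, JsonNode> depsTable = builder.table("deps-by-dep-group", consumed);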
builder.stream("topic-1").selectKey(...).to("table-topic-1");
KTable t1 = builder.table("table-topic-1");
KStream firstJoinResult = builder.stream(...).selectKey(...).join(t1, ...);
builder.stream("topic-2").selectKey(...).to("table-topic-2");
KTable t2 = builder.table("table-topic-2");
firstJoinResult.selectKey(...).join(t2, ...).to("result-topic");
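Applied to your topics, the first of the two joins could look like this (again just a sketch, with the same placeholder key format and the depsTable from above):
// re-key the users stream the same way and join it against the re-keyed deps table
KStream<String, String> firstJoin = builder.stream("users", consumed)
    .selectKey((key, value) -> value.get("dep").asText() + "|" + value.get("group").asText())
    .join(depsTable,
          (user, dep) -> "user=" + user + ", dep=" + dep,
          Joined.with(Serdes.String(), jsonSerde, jsonSerde));
For the second join you would repeat the pattern: re-key this result on id and join it against a KTable built from user_afy.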
Upvotes: 1