Reputation: 557
I am trying to perform an inner join between two KStreams. I have observed that the join doesn't work when the messages from both KStreams have composite keys (e.g. a Java POJO with several attributes), even though the POJO used as the composite key implements both hashCode() and equals(Object o).
UniqueIdKey.java
/**
 * Key type wrapping a single integer {@code id}.
 *
 * <p>The {@code id} property is exposed to Jackson through the annotated accessor pair.
 * Equality and hashing depend only on {@code id}.
 */
public class UniqueIdKey {

    private int id;

    /** Creates a key whose {@code id} keeps the field's default value. */
    public UniqueIdKey() {
    }

    /**
     * Creates a key for the given identifier.
     *
     * @param id the identifier to wrap
     */
    public UniqueIdKey(int id) {
        this.id = id;
    }

    /** @return the wrapped identifier */
    @JsonGetter("id")
    public int getId() {
        return id;
    }

    /** @param id the identifier to wrap */
    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    /**
     * Two keys are equal when they are of exactly the same class and carry the same {@code id}.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (o.getClass() != getClass()) {
            return false;
        }
        UniqueIdKey other = (UniqueIdKey) o;
        return other.id == id;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("UniqueIdKey{id=");
        text.append(id);
        text.append('}');
        return text.toString();
    }
}
The inner join works fine when both KStreams have messages with simple primitive keys (e.g. String, int, double).
I am using the latest spring-cloud-stream (Greenwich.SR1) with kafka-client and kafka-stream version 2.2.1
MainApplication.java
/**
 * Spring Boot application that wires three windowed KStream-KStream joins between the
 * {@code person} and {@code school} inputs, each re-keyed by the {@code id} of the original key.
 *
 * <p>Joins 1 and 3 re-key both sides to a {@link UniqueIdKey} POJO; join 2 re-keys both sides to
 * the plain integer id. Every stage prints what it sees so the behaviour of the three variants
 * can be compared on the console.
 */
@SpringBootApplication
public class KafkaStreamsTableJoin {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsTableJoin.class, args);
    }

    /** Stream listener holding the join topology; bound to {@link KStreamProcessorX}. */
    @EnableBinding(KStreamProcessorX.class)
    public static class KStreamToTableJoinApplication {

        /**
         * Builds the three joins. Each join uses a 5-second {@link JoinWindows} and explicit
         * serdes supplied through {@link Joined#with}.
         *
         * @param persons stream bound to the {@code person} input
         * @param schools stream bound to the {@code school} input
         */
        @StreamListener
        public void process(@Input("person") KStream<PersonKey, Person> persons,
                            @Input("school") KStream<SchoolKey, School> schools) {

            // --- Join 1: composite (POJO) key, inline joiner lambda -------------------------
            KStream<UniqueIdKey, Person> personsByPojoKey1 = persons
                    .selectKey((personKey, person) -> new UniqueIdKey(personKey.getId()))
                    .peek((key, value) ->
                            System.out.println("Personkey1= " + key + ", PersonValue1= " + value));
            KStream<UniqueIdKey, School> schoolsByPojoKey1 = schools
                    .selectKey((schoolKey, school) -> new UniqueIdKey(schoolKey.getId()))
                    .peek((key, value) ->
                            System.out.println("SchoolKey1= " + key + ", SchoolValue1= " + value));
            personsByPojoKey1.join(
                    schoolsByPojoKey1,
                    (person, school) -> {
                        // Reported in the question as never being invoked.
                        System.out.println("person1= " + person + ", school1= " + school);
                        return null;
                    },
                    JoinWindows.of(Duration.ofSeconds(5)),
                    Joined.with(
                            new UniqueIdKeySerde(),
                            new PersonSerde(),
                            new SchoolSerde()));

            // --- Join 2: integer key, inline joiner lambda ----------------------------------
            KStream<Integer, Person> personsByIntKey = persons
                    .selectKey((personKey, person) -> personKey.getId())
                    .peek((key, value) ->
                            System.out.println("Personkey2= " + key + ", PersonValue2= " + value));
            KStream<Integer, School> schoolsByIntKey = schools
                    .selectKey((schoolKey, school) -> schoolKey.getId())
                    .peek((key, value) ->
                            System.out.println("Schoolkey2= " + key + ", SchoolValue2= " + value));
            personsByIntKey.join(
                    schoolsByIntKey,
                    (person, school) -> {
                        // Reported in the question as working.
                        System.out.println("person2= " + person + ", school2= " + school);
                        return null;
                    },
                    JoinWindows.of(Duration.ofSeconds(5)),
                    Joined.with(
                            Serdes.Integer(),
                            new PersonSerde(),
                            new SchoolSerde()));

            // --- Join 3: composite (POJO) key, dedicated Joiner class ------------------------
            KStream<UniqueIdKey, Person> personsByPojoKey3 = persons
                    .selectKey((personKey, person) -> new UniqueIdKey(personKey.getId()))
                    .peek((key, value) ->
                            System.out.println("Personkey3= " + key + ", PersonValue3= " + value));
            KStream<UniqueIdKey, School> schoolsByPojoKey3 = schools
                    .selectKey((schoolKey, school) -> new UniqueIdKey(schoolKey.getId()))
                    .peek((key, value) ->
                            System.out.println("SchoolKey3= " + key + ", SchoolValue3= " + value));
            personsByPojoKey3.join(
                    schoolsByPojoKey3,
                    new Joiner(), // Reported in the question as never being invoked.
                    JoinWindows.of(Duration.ofSeconds(5)),
                    Joined.with(
                            new UniqueIdKeySerde(),
                            new PersonSerde(),
                            new SchoolSerde()));
        }
    }

    /** Binding interface declaring the two KStream inputs used by the listener. */
    interface KStreamProcessorX {

        @Input("person")
        KStream<?, ?> inputPersonKStream();

        @Input("school")
        KStream<?, ?> inputSchoolKStream();
    }
}
Joiner.java
/**
 * {@link ValueJoiner} that prints both sides of a matched person/school pair and produces no
 * joined value (it always returns {@code null}).
 *
 * <p>NOTE(review): the result type {@code Null} is not declared anywhere in the visible source;
 * presumably it is a project type. Confirm it exists, or consider {@code Void}.
 */
public class Joiner implements ValueJoiner<Person, School, Null> {

    /**
     * Logs the pair to standard output.
     *
     * @param person the person-side value
     * @param school the school-side value
     * @return always {@code null}
     */
    @Override
    public Null apply(Person person, School school) {
        final String line = "Joiner person3= " + person + " ,Joiner school3= " + school;
        System.out.println(line);
        return null;
    }
}
Person.java
/**
 * Message value for the person stream, carrying a single {@code age} property that is exposed
 * to Jackson through the annotated accessor pair.
 *
 * <p>This class does not override {@code equals} or {@code hashCode}.
 */
public class Person {

    private double age;

    /** Creates a person whose {@code age} keeps the field's default value. */
    public Person() {
    }

    /**
     * Creates a person with the given age.
     *
     * @param age the age to store
     */
    public Person(double age) {
        this.age = age;
    }

    /** @return the stored age */
    @JsonGetter("age")
    public double getAge() {
        return age;
    }

    /** @param age the age to store */
    @JsonSetter("age")
    public void setAge(double age) {
        this.age = age;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Person{age=");
        text.append(age);
        text.append('}');
        return text.toString();
    }
}
PersonKey.java
/**
 * Message key for the person stream: first name, last name and an integer {@code id}, each
 * exposed to Jackson through an annotated accessor pair.
 *
 * <p>Equality and hashing take all three fields into account.
 */
public class PersonKey {

    private String firstName;
    private String lastName;
    private int id;

    /** Creates a key with all fields left at their defaults. */
    public PersonKey() {
    }

    /**
     * Creates a fully populated key.
     *
     * @param firstName the person's first name
     * @param lastName  the person's last name
     * @param id        the person's identifier
     */
    public PersonKey(String firstName, String lastName, int id) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.id = id;
    }

    @JsonGetter("firstName")
    public String getFirstName() {
        return firstName;
    }

    @JsonSetter("firstName")
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    @JsonGetter("lastName")
    public String getLastName() {
        return lastName;
    }

    @JsonSetter("lastName")
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    /**
     * Two keys are equal when they are of exactly the same class and agree on {@code id},
     * {@code firstName} and {@code lastName} (null-safe comparison for the strings).
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        PersonKey other = (PersonKey) o;
        if (id != other.id) {
            return false;
        }
        if (!Objects.equals(firstName, other.firstName)) {
            return false;
        }
        return Objects.equals(lastName, other.lastName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(firstName, lastName, id);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("PersonKey{firstName='");
        text.append(firstName);
        text.append("', lastName='").append(lastName);
        text.append("', id=").append(id);
        text.append('}');
        return text.toString();
    }
}
School.java
/**
 * Message value for the school stream, carrying a single {@code address} property that is
 * exposed to Jackson through the annotated accessor pair.
 *
 * <p>This class does not override {@code equals} or {@code hashCode}.
 */
public class School {

    private String address;

    /** Creates a school whose {@code address} is left unset. */
    public School() {
    }

    /**
     * Creates a school with the given address.
     *
     * @param address the address to store
     */
    public School(String address) {
        this.address = address;
    }

    /** @return the stored address */
    @JsonGetter("address")
    public String getAddress() {
        return address;
    }

    /** @param address the address to store */
    @JsonSetter("address")
    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("School{address='");
        text.append(address);
        text.append("'}");
        return text.toString();
    }
}
SchoolKey.java
/**
 * Message key for the school stream: name, country, city and an integer {@code id}, each
 * exposed to Jackson through an annotated accessor pair.
 *
 * <p>Equality and hashing take all four fields into account.
 */
public class SchoolKey {

    private String name;
    private String country;
    private String city;
    private int id;

    /** Creates a key with all fields left at their defaults. */
    public SchoolKey() {
    }

    /**
     * Creates a fully populated key.
     *
     * @param name    the school's name
     * @param country the school's country
     * @param city    the school's city
     * @param id      the school's identifier
     */
    public SchoolKey(String name, String country, String city, int id) {
        this.name = name;
        this.country = country;
        this.city = city;
        this.id = id;
    }

    @JsonGetter("name")
    public String getName() {
        return name;
    }

    @JsonSetter("name")
    public void setName(String name) {
        this.name = name;
    }

    @JsonGetter("country")
    public String getCountry() {
        return country;
    }

    @JsonSetter("country")
    public void setCountry(String country) {
        this.country = country;
    }

    @JsonGetter("city")
    public String getCity() {
        return city;
    }

    @JsonSetter("city")
    public void setCity(String city) {
        this.city = city;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    /**
     * Two keys are equal when they are of exactly the same class and agree on {@code id},
     * {@code name}, {@code country} and {@code city} (null-safe comparison for the strings).
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        SchoolKey other = (SchoolKey) o;
        if (id != other.id) {
            return false;
        }
        if (!Objects.equals(name, other.name)) {
            return false;
        }
        if (!Objects.equals(country, other.country)) {
            return false;
        }
        return Objects.equals(city, other.city);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, country, city, id);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("SchoolKey{name='");
        text.append(name);
        text.append("', country='").append(country);
        text.append("', city='").append(city);
        text.append("', id=").append(id);
        text.append('}');
        return text.toString();
    }
}
Both KStreams are fed with data from the 'person' and 'school' topics respectively. Person and school messages share the same 'id', on which the inner join is performed.
person.topic
CreateTime:1559902106959-{"firstName":"JONH","lastName":"wICK","id":1}-{"age":34.0}
CreateTime:1559902106986-{"firstName":"Harley","lastName":"valla","id":2}-{"age":42.0}
CreateTime:1559902106991-{"firstName":"Mike","lastName":"PENCE","id":3}-{"age":23.0}
CreateTime:1559902106996-{"firstName":"Ali","lastName":"Akbar","id":4}-{"age":53.0}
CreateTime:1559902107000-{"firstName":"Arslan","lastName":"Akhtar","id":5}-{"age":53.0}
CreateTime:1559902107005-{"firstName":"Will","lastName":"David","id":6}-{"age":13.0}
CreateTime:1559902107009-{"firstName":"Beoionca","lastName":"Christ","id":7}-{"age":64.0}
school.topic
CreateTime:1559902107055-{"name":"BMIA","country":"PK","city":"Islamabad","id":1}-{"address":"Sector F/8"}
CreateTime:1559902107068-{"name":"CMII","country":"Hk","city":"Rawalpindi","id":2}-{"address":"Sector G/8"}
CreateTime:1559902107073-{"name":"SCSV","country":"USA","city":"Lahore","id":3}-{"address":"Sector H/8"}
CreateTime:1559902107079-{"name":"NVS","country":"SW","city":"Faisalbad","id":4}-{"address":"Sector J/8"}
CreateTime:1559902107082-{"name":"SNVJ","country":"CH","city":"Shikarpur","id":5}-{"address":"Sector C/8"}
CreateTime:1559902107088-{"name":"DBJ","country":"CN","city":"Talaqand","id":6}-{"address":"Sector Z/8"}
CreateTime:1559902107092-{"name":"SCNJ","country":"SE","city":"Karachi","id":7}-{"address":"Sector S/8"}
Console output result
Personkey1= UniqueIdKey{id=1}, PersonValue1= Person{age=34.0}
Personkey2= 1, PersonValue2= Person{age=34.0}
Personkey3= UniqueIdKey{id=1}, PersonValue3= Person{age=34.0}
SchoolKey1= UniqueIdKey{id=1}, SchoolValue1= School{address='Sector F/8'}
Schoolkey2= 1, SchoolValue2= School{address='Sector F/8'}
SchoolKey3= UniqueIdKey{id=1}, SchoolValue3= School{address='Sector F/8'}
Personkey1= UniqueIdKey{id=2}, PersonValue1= Person{age=42.0}
Personkey2= 2, PersonValue2= Person{age=42.0}
Personkey3= UniqueIdKey{id=2}, PersonValue3= Person{age=42.0}
SchoolKey1= UniqueIdKey{id=2}, SchoolValue1= School{address='Sector G/8'}
Schoolkey2= 2, SchoolValue2= School{address='Sector G/8'}
SchoolKey3= UniqueIdKey{id=2}, SchoolValue3= School{address='Sector G/8'}
Personkey1= UniqueIdKey{id=3}, PersonValue1= Person{age=23.0}
Personkey2= 3, PersonValue2= Person{age=23.0}
Personkey3= UniqueIdKey{id=3}, PersonValue3= Person{age=23.0}
SchoolKey1= UniqueIdKey{id=3}, SchoolValue1= School{address='Sector H/8'}
Schoolkey2= 3, SchoolValue2= School{address='Sector H/8'}
SchoolKey3= UniqueIdKey{id=3}, SchoolValue3= School{address='Sector H/8'}
Personkey1= UniqueIdKey{id=4}, PersonValue1= Person{age=53.0}
Personkey2= 4, PersonValue2= Person{age=53.0}
Personkey3= UniqueIdKey{id=4}, PersonValue3= Person{age=53.0}
SchoolKey1= UniqueIdKey{id=4}, SchoolValue1= School{address='Sector J/8'}
Schoolkey2= 4, SchoolValue2= School{address='Sector J/8'}
SchoolKey3= UniqueIdKey{id=4}, SchoolValue3= School{address='Sector J/8'}
Personkey1= UniqueIdKey{id=5}, PersonValue1= Person{age=53.0}
Personkey2= 5, PersonValue2= Person{age=53.0}
Personkey3= UniqueIdKey{id=5}, PersonValue3= Person{age=53.0}
SchoolKey1= UniqueIdKey{id=5}, SchoolValue1= School{address='Sector C/8'}
Schoolkey2= 5, SchoolValue2= School{address='Sector C/8'}
SchoolKey3= UniqueIdKey{id=5}, SchoolValue3= School{address='Sector C/8'}
Personkey1= UniqueIdKey{id=6}, PersonValue1= Person{age=13.0}
Personkey2= 6, PersonValue2= Person{age=13.0}
Personkey3= UniqueIdKey{id=6}, PersonValue3= Person{age=13.0}
SchoolKey1= UniqueIdKey{id=6}, SchoolValue1= School{address='Sector Z/8'}
Schoolkey2= 6, SchoolValue2= School{address='Sector Z/8'}
SchoolKey3= UniqueIdKey{id=6}, SchoolValue3= School{address='Sector Z/8'}
Personkey1= UniqueIdKey{id=7}, PersonValue1= Person{age=64.0}
Personkey2= 7, PersonValue2= Person{age=64.0}
Personkey3= UniqueIdKey{id=7}, PersonValue3= Person{age=64.0}
SchoolKey1= UniqueIdKey{id=7}, SchoolValue1= School{address='Sector S/8'}
Schoolkey2= 7, SchoolValue2= School{address='Sector S/8'}
SchoolKey3= UniqueIdKey{id=7}, SchoolValue3= School{address='Sector S/8'}
person2= Person{age=34.0}, school2= School{address='Sector F/8'}
person2= Person{age=42.0}, school2= School{address='Sector G/8'}
person2= Person{age=23.0}, school2= School{address='Sector H/8'}
person2= Person{age=53.0}, school2= School{address='Sector J/8'}
person2= Person{age=53.0}, school2= School{address='Sector C/8'}
person2= Person{age=13.0}, school2= School{address='Sector Z/8'}
person2= Person{age=64.0}, school2= School{address='Sector S/8'}
UniqueIdKeySerde.java
import kafka.streams.join.UniqueIdKey;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
/**
 * Serde for {@link UniqueIdKey} that pairs a Spring Kafka {@link JsonSerializer} with a
 * {@link JsonDeserializer} targeting {@code UniqueIdKey}.
 *
 * <p>NOTE(review): neither delegate is explicitly configured as a key (de)serializer here.
 * Since this serde is used for join keys, confirm that the bytes (and any type-info headers)
 * the serializer emits are what the deserializer on the other side expects.
 */
public class UniqueIdKeySerde extends Serdes.WrapperSerde<UniqueIdKey> {

    /** Builds the serde from a fresh JSON serializer/deserializer pair. */
    public UniqueIdKeySerde() {
        super(
                new JsonSerializer<>(),
                new JsonDeserializer<>(UniqueIdKey.class));
    }
}
Sample spring-cloud-stream app with reproducible steps to debug
Upvotes: 4
Views: 1656
Reputation: 1026
Serde example:
/**
 * JSON serde for {@link StateProvinceKey}, obtained by extending {@code JsonSerde} and passing
 * the key class to the superclass constructor.
 */
public class StateProvinceKeySerde extends JsonSerde<StateProvinceKey> {
/** Creates the serde, handing {@code StateProvinceKey.class} to {@code JsonSerde}. */
public StateProvinceKeySerde() {
super(StateProvinceKey.class);
}
}
Key example:
public class StateProvinceKey {
private String stateCode;
private String countryCodeAlpha2;
public IBMStateProvinceKey() {
}
public StateProvinceKey(String stateCode, String countryCodeAlpha2) {
this.stateCode = stateCode;
this.countryCodeAlpha2 = countryCodeAlpha2;
}
public String getStateCode() {
return stateCode;
}
public void setStateCode(String stateCode) {
this.stateCode = stateCode;
}
public String getCountryCodeAlpha2() {
return countryCodeAlpha2;
}
public void setCountryCodeAlpha2(String countryCodeAlpha2) {
this.countryCodeAlpha2 = countryCodeAlpha2;
}
public byte[] serialize(){
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsBytes(this);
} catch (JsonProcessingException e) {
}
return new byte[0];
}
}
Upvotes: 0
Reputation: 62330
When Kafka Streams computes an aggregation or a join, it does not compare Java objects when it compares keys; instead, it compares the key byte[] arrays, i.e., the serialized keys. Hence, equals() and hashCode() are not used.
You will need to make sure that the serializer you use writes matching byte[] arrays for the keys to make the join work.
Upvotes: 1