Muhammad Arslan Akhtar

Reputation: 557

KStream-KStream inner join based on messages with matching composite-key

I am trying to perform an inner join between two KStreams. I have observed that the join doesn't work when the messages from both KStreams have composite keys (e.g. a Java POJO with several attributes), even though the POJO used as the composite key implements both hashCode() and equals(Object o).

UniqueIdKey.java

public class UniqueIdKey {

    private int id;

    public UniqueIdKey() {
    }

    public UniqueIdKey(int id) {
        this.id = id;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "UniqueIdKey{" +
                "id=" + id +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        UniqueIdKey that = (UniqueIdKey) o;
        return id == that.id;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}

The inner join works fine when both KStreams have messages with simple keys (e.g. String, int, double).

I am using the latest spring-cloud-stream (Greenwich.SR1) with kafka-clients and kafka-streams version 2.2.1.

MainApplication.java

@SpringBootApplication
public class KafkaStreamsTableJoin {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsTableJoin.class, args);
    }

    @EnableBinding(KStreamProcessorX.class)
    public static class KStreamToTableJoinApplication {

        @StreamListener
        public void process(@Input("person") KStream<PersonKey, Person> persons,
                                             @Input("school") KStream<SchoolKey, School> schools) {

            //Messages with composite-keys e.g pojo UniqueIdKey.java
            persons.selectKey((PersonKey, Person) -> new UniqueIdKey(PersonKey.getId())).peek((key, value) -> System.out.println("Personkey1= " + key + ", PersonValue1= " + value))
                    .join(
                            schools.selectKey((SchoolKey, School) -> new UniqueIdKey(SchoolKey.getId())).peek((key, value) -> System.out.println("SchoolKey1= " + key + ", SchoolValue1= " + value)),
                            (person, school) -> {
                                System.out.println("person1= " + person + ", school1= " + school); //**This never gets called**
                                return null;
                            },
                            JoinWindows.of(Duration.ofSeconds(5)),
                            Joined.with(
                                    new UniqueIdKeySerde(),
                                    new PersonSerde(),
                                    new SchoolSerde())
            );
            //Messages with primitive keys e.g String
            persons.selectKey((PersonKey, Person) -> PersonKey.getId()).peek((key, value) -> System.out.println("Personkey2= " + key + ", PersonValue2= " + value))
                    .join(
                            schools.selectKey((SchoolKey, School) -> SchoolKey.getId()).peek((key, value) -> System.out.println("Schoolkey2= " + key + ", SchoolValue2= " + value)),
                            (person, school) -> {
                                System.out.println("person2= " + person + ", school2= " + school); //**This one works fine**
                                return null;
                            },
                            JoinWindows.of(Duration.ofSeconds(5)),
                            Joined.with(
                                    Serdes.Integer(),
                                    new PersonSerde(),
                                    new SchoolSerde())
                    );
            //Messages with composite-keys e.g pojo UniqueIdKey.java
            persons.selectKey((PersonKey, Person) -> new UniqueIdKey(PersonKey.getId())).peek((key, value) -> System.out.println("Personkey3= " + key + ", PersonValue3= " + value))
                    .join(
                            schools.selectKey((SchoolKey, School) -> new UniqueIdKey(SchoolKey.getId())).peek((key, value) -> System.out.println("SchoolKey3= " + key + ", SchoolValue3= " + value)),
                            new Joiner(),                           //**This never gets called**
                            JoinWindows.of(Duration.ofSeconds(5)),
                            Joined.with(
                                    new UniqueIdKeySerde(),
                                    new PersonSerde(),
                                    new SchoolSerde())

                    );
        }
    }
    interface KStreamProcessorX {

        @Input("person")
        KStream<?, ?> inputPersonKStream();

        @Input("school")
        KStream<?, ?> inputSchoolKStream();
    }
}

Joiner.java

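import org.apache.kafka.streams.kstream.ValueJoiner;

// The join result is discarded, so the joiner's result type is Void.
public class Joiner implements ValueJoiner<Person, School, Void> {

    @Override
    public Void apply(Person person, School school) {
        System.out.println("Joiner person3= " + person + " ,Joiner school3= " + school);
        return null;
    }
}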

Person.java

public class Person {

    private double age;

    public Person() {
    }

    public Person(double age) {
        this.age = age;
    }

    @JsonGetter("age")
    public double getAge() {
        return age;
    }

    @JsonSetter("age")
    public void setAge(double age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "age=" + age +
                '}';
    }
}

PersonKey.java

public class PersonKey {

    private String firstName;
    private String lastName;
    private int id;

    public PersonKey() {
    }

    public PersonKey(String firstName, String lastName, int id) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.id = id;
    }

    @JsonGetter("firstName")
    public String getFirstName() {
        return firstName;
    }

    @JsonSetter("firstName")
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    @JsonGetter("lastName")
    public String getLastName() {
        return lastName;
    }

    @JsonSetter("lastName")
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "PersonKey{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                ", id=" + id +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PersonKey personKey = (PersonKey) o;
        return id == personKey.id &&
                Objects.equals(firstName, personKey.firstName) &&
                Objects.equals(lastName, personKey.lastName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(firstName, lastName, id);
    }
}

School.java

public class School {

    private String address;

    public School() {
    }

    public School(String address) {
        this.address = address;
    }

    @JsonGetter("address")
    public String getAddress() {
        return address;
    }

    @JsonSetter("address")
    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "School{" +
                "address='" + address + '\'' +
                '}';
    }
}

SchoolKey.java

public class SchoolKey {

    private String name;
    private String country;
    private String city;
    private int id;

    public SchoolKey() {
    }

    public SchoolKey(String name, String country, String city, int id) {
        this.name = name;
        this.country = country;
        this.city = city;
        this.id = id;
    }

    @JsonGetter("name")
    public String getName() {
        return name;
    }

    @JsonSetter("name")
    public void setName(String name) {
        this.name = name;
    }

    @JsonGetter("country")
    public String getCountry() {
        return country;
    }

    @JsonSetter("country")
    public void setCountry(String country) {
        this.country = country;
    }

    @JsonGetter("city")
    public String getCity() {
        return city;
    }

    @JsonSetter("city")
    public void setCity(String city) {
        this.city = city;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "SchoolKey{" +
                "name='" + name + '\'' +
                ", country='" + country + '\'' +
                ", city='" + city + '\'' +
                ", id=" + id +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SchoolKey schoolKey = (SchoolKey) o;
        return id == schoolKey.id &&
                Objects.equals(name, schoolKey.name) &&
                Objects.equals(country, schoolKey.country) &&
                Objects.equals(city, schoolKey.city);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, country, city, id);
    }
}

Both KStreams are fed with data from the 'person' and 'school' topics respectively. The person and school messages share the same 'id', on which the inner join is performed.

person.topic

CreateTime:1559902106959-{"firstName":"JONH","lastName":"wICK","id":1}-{"age":34.0}
CreateTime:1559902106986-{"firstName":"Harley","lastName":"valla","id":2}-{"age":42.0}
CreateTime:1559902106991-{"firstName":"Mike","lastName":"PENCE","id":3}-{"age":23.0}
CreateTime:1559902106996-{"firstName":"Ali","lastName":"Akbar","id":4}-{"age":53.0}
CreateTime:1559902107000-{"firstName":"Arslan","lastName":"Akhtar","id":5}-{"age":53.0}
CreateTime:1559902107005-{"firstName":"Will","lastName":"David","id":6}-{"age":13.0}
CreateTime:1559902107009-{"firstName":"Beoionca","lastName":"Christ","id":7}-{"age":64.0}

school.topic

CreateTime:1559902107055-{"name":"BMIA","country":"PK","city":"Islamabad","id":1}-{"address":"Sector F/8"}
CreateTime:1559902107068-{"name":"CMII","country":"Hk","city":"Rawalpindi","id":2}-{"address":"Sector G/8"}
CreateTime:1559902107073-{"name":"SCSV","country":"USA","city":"Lahore","id":3}-{"address":"Sector H/8"}
CreateTime:1559902107079-{"name":"NVS","country":"SW","city":"Faisalbad","id":4}-{"address":"Sector J/8"}
CreateTime:1559902107082-{"name":"SNVJ","country":"CH","city":"Shikarpur","id":5}-{"address":"Sector C/8"}
CreateTime:1559902107088-{"name":"DBJ","country":"CN","city":"Talaqand","id":6}-{"address":"Sector Z/8"}
CreateTime:1559902107092-{"name":"SCNJ","country":"SE","city":"Karachi","id":7}-{"address":"Sector S/8"}

Console output result

Personkey1= UniqueIdKey{id=1}, PersonValue1= Person{age=34.0}
Personkey2= 1, PersonValue2= Person{age=34.0}
Personkey3= UniqueIdKey{id=1}, PersonValue3= Person{age=34.0}
SchoolKey1= UniqueIdKey{id=1}, SchoolValue1= School{address='Sector F/8'}
Schoolkey2= 1, SchoolValue2= School{address='Sector F/8'}
SchoolKey3= UniqueIdKey{id=1}, SchoolValue3= School{address='Sector F/8'}
Personkey1= UniqueIdKey{id=2}, PersonValue1= Person{age=42.0}
Personkey2= 2, PersonValue2= Person{age=42.0}
Personkey3= UniqueIdKey{id=2}, PersonValue3= Person{age=42.0}
SchoolKey1= UniqueIdKey{id=2}, SchoolValue1= School{address='Sector G/8'}
Schoolkey2= 2, SchoolValue2= School{address='Sector G/8'}
SchoolKey3= UniqueIdKey{id=2}, SchoolValue3= School{address='Sector G/8'}
Personkey1= UniqueIdKey{id=3}, PersonValue1= Person{age=23.0}
Personkey2= 3, PersonValue2= Person{age=23.0}
Personkey3= UniqueIdKey{id=3}, PersonValue3= Person{age=23.0}
SchoolKey1= UniqueIdKey{id=3}, SchoolValue1= School{address='Sector H/8'}
Schoolkey2= 3, SchoolValue2= School{address='Sector H/8'}
SchoolKey3= UniqueIdKey{id=3}, SchoolValue3= School{address='Sector H/8'}
Personkey1= UniqueIdKey{id=4}, PersonValue1= Person{age=53.0}
Personkey2= 4, PersonValue2= Person{age=53.0}
Personkey3= UniqueIdKey{id=4}, PersonValue3= Person{age=53.0}
SchoolKey1= UniqueIdKey{id=4}, SchoolValue1= School{address='Sector J/8'}
Schoolkey2= 4, SchoolValue2= School{address='Sector J/8'}
SchoolKey3= UniqueIdKey{id=4}, SchoolValue3= School{address='Sector J/8'}
Personkey1= UniqueIdKey{id=5}, PersonValue1= Person{age=53.0}
Personkey2= 5, PersonValue2= Person{age=53.0}
Personkey3= UniqueIdKey{id=5}, PersonValue3= Person{age=53.0}
SchoolKey1= UniqueIdKey{id=5}, SchoolValue1= School{address='Sector C/8'}
Schoolkey2= 5, SchoolValue2= School{address='Sector C/8'}
SchoolKey3= UniqueIdKey{id=5}, SchoolValue3= School{address='Sector C/8'}
Personkey1= UniqueIdKey{id=6}, PersonValue1= Person{age=13.0}
Personkey2= 6, PersonValue2= Person{age=13.0}
Personkey3= UniqueIdKey{id=6}, PersonValue3= Person{age=13.0}
SchoolKey1= UniqueIdKey{id=6}, SchoolValue1= School{address='Sector Z/8'}
Schoolkey2= 6, SchoolValue2= School{address='Sector Z/8'}
SchoolKey3= UniqueIdKey{id=6}, SchoolValue3= School{address='Sector Z/8'}
Personkey1= UniqueIdKey{id=7}, PersonValue1= Person{age=64.0}
Personkey2= 7, PersonValue2= Person{age=64.0}
Personkey3= UniqueIdKey{id=7}, PersonValue3= Person{age=64.0}
SchoolKey1= UniqueIdKey{id=7}, SchoolValue1= School{address='Sector S/8'}
Schoolkey2= 7, SchoolValue2= School{address='Sector S/8'}
SchoolKey3= UniqueIdKey{id=7}, SchoolValue3= School{address='Sector S/8'}
person2= Person{age=34.0}, school2= School{address='Sector F/8'}
person2= Person{age=42.0}, school2= School{address='Sector G/8'}
person2= Person{age=23.0}, school2= School{address='Sector H/8'}
person2= Person{age=53.0}, school2= School{address='Sector J/8'}
person2= Person{age=53.0}, school2= School{address='Sector C/8'}
person2= Person{age=13.0}, school2= School{address='Sector Z/8'}
person2= Person{age=64.0}, school2= School{address='Sector S/8'}

UniqueIdKeySerde.java

import kafka.streams.join.UniqueIdKey;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class UniqueIdKeySerde extends Serdes.WrapperSerde<UniqueIdKey> {
    public UniqueIdKeySerde () {
        super(new JsonSerializer<UniqueIdKey>(), new JsonDeserializer<UniqueIdKey>(UniqueIdKey.class));
    }
}

Sample spring-cloud-stream app with reproducible steps to debug

Upvotes: 4

Views: 1656

Answers (2)

todaynowork

Reputation: 1026

Serde example:

public class StateProvinceKeySerde extends JsonSerde<StateProvinceKey> {

  public StateProvinceKeySerde() {
    super(StateProvinceKey.class);
  }
}

Key example:

public class StateProvinceKey {

  private String stateCode;
  private String countryCodeAlpha2;

  public StateProvinceKey() {

  }

  public StateProvinceKey(String stateCode, String countryCodeAlpha2) {
    this.stateCode = stateCode;
    this.countryCodeAlpha2 = countryCodeAlpha2;
  }

  public String getStateCode() {
    return stateCode;
  }

  public void setStateCode(String stateCode) {
    this.stateCode = stateCode;
  }

  public String getCountryCodeAlpha2() {
    return countryCodeAlpha2;
  }

  public void setCountryCodeAlpha2(String countryCodeAlpha2) {
    this.countryCodeAlpha2 = countryCodeAlpha2;
  }

  public byte[] serialize(){
    ObjectMapper objectMapper = new ObjectMapper();
    try {
      return objectMapper.writeValueAsBytes(this);
    } catch (JsonProcessingException e) {
    }
    return new byte[0];
  }
}

Upvotes: 0

Matthias J. Sax

Reputation: 62330

When Kafka Streams computes an aggregation or join, it does not compare Java objects when it compares keys; it compares the key byte[] arrays, i.e., the serialized keys. Hence, equals() and hashCode() are not used.

You need to make sure that the serializer you use writes matching byte[] arrays for the keys to make the join work.
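To illustrate the point with plain Java (no Kafka involved), here is a minimal sketch. The RegionKey class and the two hand-written serializers are hypothetical stand-ins, not part of the question's code or of any Kafka API. They only show that two keys can be equal according to equals() while their serialized bytes differ, which is the comparison that matters for the join.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

public class KeyBytesDemo {

    // Hypothetical composite key, standing in for a POJO such as UniqueIdKey.
    static final class RegionKey {
        final String country;
        final int id;

        RegionKey(String country, int id) {
            this.country = country;
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof RegionKey)) return false;
            RegionKey that = (RegionKey) o;
            return id == that.id && Objects.equals(country, that.country);
        }

        @Override
        public int hashCode() {
            return Objects.hash(country, id);
        }
    }

    // Hypothetical serializer A: writes the "country" field first.
    static byte[] serializeCountryFirst(RegionKey key) {
        String json = "{\"country\":\"" + key.country + "\",\"id\":" + key.id + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical serializer B: same information, but the "id" field first.
    static byte[] serializeIdFirst(RegionKey key) {
        String json = "{\"id\":" + key.id + ",\"country\":\"" + key.country + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        RegionKey left = new RegionKey("PK", 1);
        RegionKey right = new RegionKey("PK", 1);

        // The Java objects are equal ...
        System.out.println("equals(): " + left.equals(right));
        // ... and the bytes match when both sides use the same serializer ...
        System.out.println("same serializer: "
                + Arrays.equals(serializeCountryFirst(left), serializeCountryFirst(right)));
        // ... but not when the two sides serialize the key differently.
        System.out.println("different serializers: "
                + Arrays.equals(serializeCountryFirst(left), serializeIdFirst(right)));
    }
}
```

A similar check can be run against the real key serde: serialize the key produced on each side of the join and compare the resulting arrays with Arrays.equals. If they differ, the join will not match those records.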

Upvotes: 1
