Semo

Reputation: 821

Kafka ConsumerRecord returns null

While implementing a unit test in a Spring Boot application, I can't retrieve a ConsumerRecord, even though the custom Serializer for my own POJO works. I checked this with kafka-console-consumer: every time I run the test, a new message is produced and appears on the console. What do I have to do to get the record instead of null?

@RunWith(SpringRunner.class)
@SpringBootTest
@DisplayName("Testing GlobalMessageTest")
@DirtiesContext
public class NumberPlateSenderTest {

private static Logger log = LogManager.getLogger(NumberPlateSenderTest.class);

@Autowired
KafkaeskAdapterApplication kafkaeskAdapterApplication;

@Autowired
private NumberPlateSender numberPlateSender;

private KafkaMessageListenerContainer<String, NumberPlate> container;
private BlockingQueue<ConsumerRecord<String, NumberPlate>> records;

private static final String SENDER_TOPIC = "numberplate_test_topic";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

@Before
public void setUp() throws Exception {
    // set up the Kafka consumer properties
    Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka);
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberPlateDeserializer.class);


    // create a Kafka consumer factory
    DefaultKafkaConsumerFactory<String, NumberPlate> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerProperties);

    // set the topic that needs to be consumed
    ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);

    // create a Kafka MessageListenerContainer
    container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

    // create a thread safe queue to store the received message
    records = new LinkedBlockingQueue<>();

    // setup a Kafka message listener
    container.setupMessageListener((MessageListener<String, NumberPlate>) record -> {
        log.info("Message Listener received message='{}'", record.toString());
        records.add(record);
    });

    // start the container and underlying message listener
    container.start();

    // wait until the container has the required number of assigned partitions
    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
}

@DisplayName("Should send a Message to a Producer and retrieve it")
@Test
public void TestProducer() throws InterruptedException {
    //Test instance of Numberplate to send
    NumberPlate localNumberplate = new NumberPlate();
    byte[] bytes = "0x33".getBytes();
    localNumberplate.setImageBlob(bytes);
    localNumberplate.setNumberString("ABC123");
    log.info(localNumberplate.toString());

    //Send it
    numberPlateSender.sendNumberPlateMessage(localNumberplate);

    //Retrieve it
    ConsumerRecord<String, NumberPlate> received = records.poll(3, TimeUnit.SECONDS);
    log.info("Received the following content of ConsumerRecord: {}", received);

    if (received == null) {
        assert false;
    } else {
        NumberPlate retrNumberplate = received.value();
        Assert.assertEquals(retrNumberplate, localNumberplate);
    }
}

@After
public void tearDown() {
    // stop the container
    container.stop();
}

}

The complete code is available in my GitHub repository. I have read a lot of different SO questions and searched the web, but I can't figure out what is wrong with my code. Other users have posted similar problems, but to no avail.

The Kafka version running on my Craptop is kafka_2.11-1.0.1.

The Spring Kafka client (spring-kafka) version is 2.1.5.RELEASE.

Upvotes: 2

Views: 3090

Answers (1)

Artem Bilan

Reputation: 121292

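Your problem is that you start the consumer against the embedded Kafka but send the data to the real one. Nothing ever reaches the listener's queue, so records.poll(3, TimeUnit.SECONDS) times out and returns null. I don't know what your goal is, but I made it work against the embedded Kafka like this: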

@BeforeClass
public static void setup() {
    System.setProperty("kafka.bootstrapAddress", embeddedKafka.getBrokersAsString());
}

This overrides your kafka.bootstrapAddress configuration property for the producer with the broker address provided by the embedded Kafka.
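
The effect of that override can be illustrated without Spring. The following is only a minimal sketch of the precedence idea (a JVM system property, when set, wins over a configured default); it is not how Spring resolves properties internally. The `resolve` helper and both addresses are assumptions made up for the illustration.

```java
// Simplified stand-in for property resolution: a JVM system property,
// when present, takes precedence over the configured default.
class BootstrapAddressDemo {

    // Same property key as in the setup() method above.
    static final String KEY = "kafka.bootstrapAddress";

    // Hypothetical helper: returns the system property if set,
    // otherwise the value that would come from the configuration file.
    static String resolve(String configuredDefault) {
        return System.getProperty(KEY, configuredDefault);
    }

    public static void main(String[] args) {
        // Assumed value from application.properties (the "real" broker).
        String configured = "localhost:9092";

        // Without an override, the producer would target the real broker.
        System.out.println("before override: " + resolve(configured));

        // Assumed address; in the test it comes from embeddedKafka.getBrokersAsString().
        System.setProperty(KEY, "127.0.0.1:54321");

        // With the override, producer and consumer point at the same broker.
        System.out.println("after override:  " + resolve(configured));

        System.clearProperty(KEY);
    }
}
```

The property has to be set before the application context is created, which is why the override sits in a static @BeforeClass method.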

With that change, the test fails with:

java.lang.AssertionError: expected: dev.semo.kafkaeskadapter.models.NumberPlate<NumberPlate{numberString='ABC123', imageBlob=[48, 120, 51, 51]}> but was: dev.semo.kafkaeskadapter.models.NumberPlate<NumberPlate{numberString='ABC123', imageBlob=[48, 120, 51, 51]}>
Expected :dev.semo.kafkaeskadapter.models.NumberPlate<NumberPlate{numberString='ABC123', imageBlob=[48, 120, 51, 51]}> 
Actual   :dev.semo.kafkaeskadapter.models.NumberPlate<NumberPlate{numberString='ABC123', imageBlob=[48, 120, 51, 51]}>

But that's just because you use this assertion:

Assert.assertEquals(retrNumberplate, localNumberplate);

Meanwhile, your NumberPlate doesn't provide a proper equals() implementation, so the comparison falls back to object identity. Something like this would do:

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    NumberPlate that = (NumberPlate) o;
    return Objects.equals(numberString, that.numberString) &&
            Arrays.equals(imageBlob, that.imageBlob);
}

@Override
public int hashCode() {
    int result = Objects.hash(numberString);
    result = 31 * result + Arrays.hashCode(imageBlob);
    return result;
}
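
To see why the assertion fails even though both sides print identically, here is a small standalone sketch. PlainPlate and ValuePlate are hypothetical stand-ins for NumberPlate (without and with the overrides above); they are not classes from the project.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

// Hypothetical stand-in for a POJO that does NOT override equals()/hashCode().
class PlainPlate {
    final String numberString;
    final byte[] imageBlob;

    PlainPlate(String numberString, byte[] imageBlob) {
        this.numberString = numberString;
        this.imageBlob = imageBlob;
    }
}

// Hypothetical stand-in with value-based equals()/hashCode().
class ValuePlate {
    final String numberString;
    final byte[] imageBlob;

    ValuePlate(String numberString, byte[] imageBlob) {
        this.numberString = numberString;
        this.imageBlob = imageBlob;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ValuePlate that = (ValuePlate) o;
        // Arrays.equals compares array contents; byte[].equals would compare identity.
        return Objects.equals(numberString, that.numberString)
                && Arrays.equals(imageBlob, that.imageBlob);
    }

    @Override
    public int hashCode() {
        return 31 * Objects.hashCode(numberString) + Arrays.hashCode(imageBlob);
    }
}

class EqualsDemo {
    // Returns a fresh array on every call, like a deserializer building a new object.
    static byte[] blob() {
        return "0x33".getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Inherited Object.equals compares references: prints false.
        System.out.println(new PlainPlate("ABC123", blob())
                .equals(new PlainPlate("ABC123", blob())));

        // Value-based equals compares field contents: prints true.
        System.out.println(new ValuePlate("ABC123", blob())
                .equals(new ValuePlate("ABC123", blob())));
    }
}
```

The sent object and the deserialized one are always two distinct instances, so only a value-based equals() can make assertEquals pass.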

Thank you for providing the whole project to play with and reproduce! With the "question-answer-question-answer" game we would have spent too much time here :-).

Upvotes: 2
