Aubergine

Reputation: 6042

How does one verify Kafka message content from an automated test?

I have a couple of topics which have consumers and producers. I want to ensure that one distinct application posts the correct messages to Kafka. So this is somewhat between an integration and a unit test, as I only want to test that the producer puts the right content into Kafka.

Test is effectively:

  1. send a message to the topic
  2. app reads the message and does something
  3. app generates and sends another message to another topic with reference to the message in step one.

One option would be to check all the Kafka log files for the messages and find them by their reference id.

But maybe there is some testing tool or pattern that allows intercepting messages from the app to Kafka and asserting their validity? Is there some kind of Kafka impostor or stub, or am I approaching this from entirely the wrong angle? Any ideas are welcome.

Upvotes: 4

Views: 9584

Answers (4)

Elan Thangamani

Reputation: 1

The Cucumblan-message library provides predefined Gherkin step definitions for Kafka message/event testing, which lets testers write feature files for these checks. You need to create an implementation Java class for each message that should be verified; check the tutorial link below for more details and examples. This framework can be integrated with your existing test framework.

Tutorials link:

https://tutorials.virtualan.io/#/Cucumblan-message

Example project and feature file sample:

https://github.com/virtualansoftware/asyncapi-virtualization/tree/master/virtualan-kafka

Scenario: check produce and consume event validation 1
Given Send inline message pets for event MOCK_REQUEST on pet with type JSON
  | {   "category": {     "id": 100,     "name": "Fish-POST"   },   "id": 100,   "name": "GoldFish-POST",   "photoUrls": [     "/fish/"   ],   "status": "available",   "tags": [     {       "id": 100,       "name": "Fish-POST"     }   ] } |
And Pause message PROCESSING for process for 2000 milliseconds
When Verify-by-elements for pets for event MOCK_RESPONSE contains 101 on pet with type JSON
  | id            | i~101           |
  | category.name | german shepherd |
Then Verify for pets for event MOCK_RESPONSE contains 101 on pet with type JSON
  | id,name, category/id:name,status            |
  | i~101,Rocky,i~100:german shepherd,available |
And Verify for pets for event MOCK_RESPONSE contains 101 on pet with type JSON
  | id,name, category/id:name,tags/id:name,status,photoUrls            |
  | i~101,Rocky,i~100:german shepherd,i~101:brown\|,available,string\| |

Upvotes: 0

Nirmal Fleet

Reputation: 36

Running examples are here:

I want to ensure that one distinct application posts the correct messages to Kafka.

To automate this, you can validate/assert that the message has successfully landed in a topic (and a partition). Kafka brokers send the acknowledgement in the form of a "recordMetadata". See the "verify" section below.

Test is effectively:

  1. send a message to the topic
  2. app reads the message and does something
  3. app generates and sends another message to another topic with reference to the message in step one.

  1. send a message to the topic

To automate this, you need to produce a record, e.g. "Order No. 123", to the order-topic and verify that the produce was successful.

Produce:

---
name: load_kafka
url: kafka-topic:order-topic
operation: PRODUCE
request:
  records:
  - key: "${RANDOM.NUMBER}"
    value: "Order No. 123"
    partition: 0
verify:
  status: Ok
  recordMetadata:
    topicPartition:
      partition: 0
      topic: order-topic

In the above step, you may not know the exact partition number where the message actually landed. Hence you can use $IS.NOTNULL as below to assert this (otherwise mention the number).

verify:
  status: Ok
  recordMetadata:
    topicPartition:
      partition: "$IS.NOTNULL"
      topic: order-topic
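
For comparison, this is what that verify block checks in plain Kafka client terms: producer.send() returns a Future<RecordMetadata>, and the acknowledged metadata carries the topic, partition and offset. A minimal JUnit sketch, assuming a local broker on localhost:9092 and the order-topic from the example above (both assumptions, not part of the DSL):

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class OrderProducerTest {

    @Test
    public void producesOrderToOrderTopic() throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The broker acknowledgement comes back as RecordMetadata
            RecordMetadata metadata = producer
                    .send(new ProducerRecord<>("order-topic", "key-123", "Order No. 123"))
                    .get(10, TimeUnit.SECONDS);

            assertEquals("order-topic", metadata.topic());
            assertTrue(metadata.partition() >= 0); // the plain-Java analogue of $IS.NOTNULL
            assertTrue(metadata.offset() >= 0);
        }
    }
}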

  2. app reads the message and does something

At this point the production application reads/consumes the message from "order-topic" and transforms/enriches it, e.g. it adds card information to the order: "Order No. 123, Card No. 456".

  3. app generates and sends another message to another topic with reference to the message in step one.

Here the production application sends this "Order No. 123, Card No. 456" to the "billing-topic".

To automate this:

You can validate the expected transformed message in "billing-topic" by consuming it as below.

Consume:

---
name: consume_message
url: kafka-topic:billing-topic
operation: CONSUME
request: {}
assertions:
  size: 1
  records:
  - key: "$IS.NOTNULL"
    value: "Order No. 123, Card No. 456"

To further automate the test scenario:

i.e. if you know the partition number and the offset of the transformed message, you can read that exact message and validate it, using seek: billing-topic,0,1 where 0 = partition number and 1 = offset.

name: consume_message
url: kafka-topic:billing-topic
operation: CONSUME
request:
  consumerLocalConfigs:
    maxNoOfRetryPollsOrTimeouts: 1
    seek: billing-topic,0,1
assertions:
  size: 1
  records:
  - value: "Order No. 123, Card No. 456"

To enhance the automation test logging:

  • You can mention showRecordsConsumed: true as below.
  • This will make the console/log output slightly verbose, but it helps with analysis and debugging
...
request:
  consumerLocalConfigs:
    showRecordsConsumed: true
    maxNoOfRetryPollsOrTimeouts: 1
    seek: billing-topic,0,1
...

The above example is for PLAINTEXT or a RAW message.

There are a number of handy parameters which you can use in your automation test suites.

e.g. to list a few of them:

  • "recordType": "JSON" or "RAW"`
  • "commitSync": true : Whether you cant to read messages where you last read from or from beginning
  • more >> (See the README docs)

Now, to run the above integration test with both steps, you can use a simple JUnit runner as below.

  • Your scenario YAML file name can be kafka_e2e_validation_test.yaml located under test/resources/kafka
  • Your Kafka broker config kafka_test_server.properties located under test/resources/kafka_servers
package gov.uk.data.processing.e2e.tests;

import org.jsmart.zerocode.core.domain.Scenario;
import org.jsmart.zerocode.core.domain.TargetEnv;
import org.jsmart.zerocode.core.runner.ZeroCodeUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;

@TargetEnv("kafka_servers/kafka_test_server.properties")
@RunWith(ZeroCodeUnitRunner.class)
public class KafkaTest {

    @Test
    @Scenario("kafka/kafka_e2e_validation_test.yaml")
    public void testKafkaMsgTransform_e2e() throws Exception {
    }
}

  • kafka_test_server.properties has the broker and test suite level configs.
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# kafka bootstrap servers comma separated
# e.g. localhost:9092,host2:9093
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
kafka.bootstrap.servers=1govsmrsit.execute-kafka.uk-2.amazonaws.com:9092

kafka.producer.properties=kafka_servers/kafka_producer.properties
kafka.consumer.properties=kafka_servers/kafka_consumer.properties

consumer.commitSync = true
consumer.maxNoOfRetryPollsOrTimeouts = 1
consumer.pollingTime = 5000
  • kafka_producer.properties
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
#          kafka producer properties
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
client.id=zerocode-producer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
#acks=all
#retries=0
#batch.size=16384
#client.id=fcd-producer
#auto.commit.interval.ms=1000
#block.on.buffer.full=true

  • kafka_consumer.properties
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
#          kafka consumer properties
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
group.id=consumerGroup1
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.records=1
enable.auto.commit=false
auto.offset.reset=latest
#max.partition.fetch.bytes=23
#max.poll.interval.ms=2000
#group.id=None
#enable.auto.commit=true

So this is somewhat between integration and unit test as I only want to test producer puts the right content into kafka.

Better to use a containerised Kafka broker via Docker, which gives you real integration testing and makes it easy to run both from a local laptop and in a CI/CD pipeline.

This also gives more confidence for releasing the application into higher environments, as Kafka here is not faked or mocked.
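
One common way to get such a containerised broker from JVM tests is Testcontainers; this is only an illustrative option (the answer does not prescribe it), and the image tag and API may vary with the Testcontainers version:

import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class ContainerisedKafkaTest {

    // Starts a real single-node Kafka broker in Docker for the duration of the test class
    @ClassRule
    public static final KafkaContainer KAFKA =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")); // assumed image tag

    @Test
    public void brokerIsReachable() {
        // Point your producer/consumer (or kafka.bootstrap.servers in the properties above) at this address
        String bootstrapServers = KAFKA.getBootstrapServers();
        System.out.println("Broker running at " + bootstrapServers);
    }
}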

You can find more complex examples of producing/consuming JSON/XML messages in the Zerocode GitHub Wiki

The below HelloWorld GitHub repo has many more flavours of automated Kafka end-to-end scenarios to clone and run locally (Docker is a prerequisite).

P.S.

SDET = Software Developer Engineer in Test

Upvotes: 1

Michal Borowiecki

Reputation: 4324

I usually follow how Confluent have done integration testing for their examples: https://github.com/confluentinc/examples/tree/master/kafka-streams/src/test/java/io/confluent/examples/streams

You'll find some handy utilities there for running embedded Kafka as well as sending and retrieving messages.

Upvotes: 3

codejitsu

Reputation: 3182

I have some experience writing integration tests like this. My approach was as follows:

  1. In your test, start an embedded Kafka broker (it's easy if you are on the JVM; see the sketch after this list).
  2. Start the producer instance and write a message into Kafka.
  3. Here you can use mocking/stubbing or whatever you want to validate the app behaviour.
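
For step 1, one option is the embedded broker from spring-kafka-test; the answer does not name a library, so treat this as an illustrative sketch (topic name, constructor arguments and versions are assumptions):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;

public class EmbeddedBrokerProducerTest {

    // One in-process broker with the test topic pre-created (assumed topic name)
    @ClassRule
    public static final EmbeddedKafkaRule EMBEDDED_KAFKA =
            new EmbeddedKafkaRule(1, true, "order-topic");

    @Test
    public void writesMessageToEmbeddedBroker() throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", EMBEDDED_KAFKA.getEmbeddedKafka().getBrokersAsString());
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("order-topic", "a test message")).get();
        }
        // From here, start the application under test against the same broker
        // and consume/assert on its output topic, e.g. with a plain KafkaConsumer or a mock.
    }
}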

Upvotes: 0
