Reputation: 6042
I have a couple of topics with consumers and producers. I want to ensure that one specific application posts the correct messages to Kafka. So this is somewhere between an integration test and a unit test, as I only want to verify that the producer puts the right content into Kafka.
Test is effectively:
- send a message to the topic
- app reads the message and does something
- app generates and sends another message to another topic with reference to message in step one
One option would be to check all the Kafka log files for the messages and find them by ref id.
But maybe there is some testing tool/pattern which allows me to intercept messages from the app to Kafka and assert their validity? Is there some Kafka impostor or stub, or am I approaching this from entirely the wrong angle? Any ideas are welcome.
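For illustration, the kind of interception I am imagining is something like the MockProducer that ships with kafka-clients, which records sends in memory instead of talking to a broker (the topic, key, and payload below are made-up):

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class ProducerContentTest {

    @Test
    public void publishesExpectedPayload() {
        // records every send in memory instead of talking to a broker
        MockProducer<String, String> producer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());

        // in a real test the code under test would be handed this producer;
        // this send stands in for whatever the app does internally
        producer.send(new ProducerRecord<>("order-topic", "ref-42", "Order No. 123"));

        ProducerRecord<String, String> sent = producer.history().get(0);
        assertEquals("order-topic", sent.topic());
        assertEquals("ref-42", sent.key());
        assertEquals("Order No. 123", sent.value());
    }
}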
Upvotes: 4
Views: 9584
Reputation: 1
The Cucumblan-message library provides predefined Gherkin step definitions for Kafka message/event testing, which lets testers write feature files and run them as tests. You need to create an implementation Java class for each message that should be verified; check the tutorial link below for more details and examples. This framework can be integrated with your existing test framework.
Tutorials link:
https://tutorials.virtualan.io/#/Cucumblan-message
Example project and feature file sample:
https://github.com/virtualansoftware/asyncapi-virtualization/tree/master/virtualan-kafka
Scenario: check produce and consume event validation 1
  Given Send inline message pets for event MOCK_REQUEST on pet with type JSON
    | { "category": { "id": 100, "name": "Fish-POST" }, "id": 100, "name": "GoldFish-POST", "photoUrls": [ "/fish/" ], "status": "available", "tags": [ { "id": 100, "name": "Fish-POST" } ] } |
  And Pause message PROCESSING for process for 2000 milliseconds
  When Verify-by-elements for pets for event MOCK_RESPONSE contains 101 on pet with type JSON
    | id            | i~101           |
    | category.name | german shepherd |
  Then Verify for pets for event MOCK_RESPONSE contains 101 on pet with type JSON
    | id,name, category/id:name,status |
    | i~101,Rocky,i~100:german shepherd,available |
  And Verify for pets for event MOCK_RESPONSE contains 101 on pet with type JSON
    | id,name, category/id:name,tags/id:name,status,photoUrls |
    | i~101,Rocky,i~100:german shepherd,i~101:brown\|,available,string\| |
Upvotes: 0
Reputation: 36
Running examples are linked at the bottom of this answer.
I want to ensure that one distinct application posts correct messages to kafka.
To automate this:
You can validate/assert that the message has successfully landed in a topic (and a partition). Kafka brokers send the acknowledgement in the form of "recordMetadata"; see the "verify" section below.
Test is effectively:
- send a message to the topic
To automate this:
You need to produce a record, e.g. "Order No. 123", to the topic, e.g. order-topic, and verify that the send was successful.
Produce:
---
name: load_kafka
url: kafka-topic:order-topic
operation: PRODUCE
request:
  records:
    - key: "${RANDOM.NUMBER}"
      value: "Order No. 123"
      partition: 0
verify:
  status: Ok
  recordMetadata:
    topicPartition:
      partition: 0
      topic: order-topic
In the above step you may not know the exact partition number where the message has actually landed, so you can use $IS.NOTNULL as below to assert it (otherwise state the exact number):
verify:
  status: Ok
  recordMetadata:
    topicPartition:
      partition: "$IS.NOTNULL"
      topic: order-topic
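(For comparison, the same acknowledgement check in plain kafka-clients Java would look roughly like this; the bootstrap address is an assumption, and the snippet belongs inside a test method that declares throws Exception:)

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumption: local test broker
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // send() returns a Future whose RecordMetadata is the broker acknowledgement
    RecordMetadata meta = producer
            .send(new ProducerRecord<>("order-topic", "123", "Order No. 123"))
            .get();
    assertEquals("order-topic", meta.topic());
    assertTrue(meta.partition() >= 0); // the $IS.NOTNULL idea in plain Java
}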
- app reads the message and does something
At this point the production application reads/consumes the message from "order-topic" and transforms/enriches it, e.g. it adds card information to the order: "Order No. 123, Card No. 456".
- app generates and sends another message to another topic with reference to message in step one.
Here the production application sends this "Order No. 123, Card No. 456" to the "billing-topic".
To automate this:
You can validate that the expected transformed message arrived in "billing-topic" by consuming it as below.
Consume:
---
name: consume_message
url: kafka-topic:billing-topic
operation: CONSUME
request: {}
assertions:
  size: 1
  records:
    - key: "$IS.NOTNULL"
      value: "Order No. 123, Card No. 456"
To further automate the test scenario: if you know the partition number and the offset of the transformed message, you can read exactly that message and validate it by using seek: billing-topic,0,1, where 0 = partition number and 1 = offset.
name: consume_message
url: kafka-topic:billing-topic
operation: CONSUME
request:
  consumerLocalConfigs:
    maxNoOfRetryPollsOrTimeouts: 1
    seek: billing-topic,0,1
assertions:
  size: 1
  records:
    - value: "Order No. 123, Card No. 456"
To enhance the automation test logging, set showRecordsConsumed: true as below. It makes the console/log output slightly verbose, but that helps for analysis and debugging.
request:
  consumerLocalConfigs:
    showRecordsConsumed: true
    maxNoOfRetryPollsOrTimeouts: 1
    seek: billing-topic,0,1
...
The above example is for a PLAINTEXT or RAW message. There are a number of handy parameters which you can use in your automation test suites, e.g. "recordType": "JSON" or "RAW". Now, to run the above integration test with both steps, you can use the simple JUnit runner below.
The scenario file kafka_e2e_validation_test.yaml is located under test/resources/kafka, and kafka_test_server.properties under test/resources/kafka_servers.
package gov.uk.data.processing.e2e.tests;

import org.jsmart.zerocode.core.domain.Scenario;
import org.jsmart.zerocode.core.domain.TargetEnv;
import org.jsmart.zerocode.core.runner.ZeroCodeUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;

@TargetEnv("kafka_servers/kafka_test_server.properties")
@RunWith(ZeroCodeUnitRunner.class)
public class KafkaTest {

    @Test
    @Scenario("kafka/kafka_e2e_validation_test.yaml")
    public void testKafkaMsgTransform_e2e() throws Exception {
    }
}
kafka_test_server.properties has the broker and test-suite-level configs.
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# kafka bootstrap servers comma separated
# e.g. localhost:9092,host2:9093
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
kafka.bootstrap.servers=1govsmrsit.execute-kafka.uk-2.amazonaws.com:9092
kafka.producer.properties=kafka_servers/kafka_producer.properties
kafka.consumer.properties=kafka_servers/kafka_consumer.properties
consumer.commitSync = true
consumer.maxNoOfRetryPollsOrTimeouts = 1
consumer.pollingTime = 5000
kafka_producer.properties
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# kafka producer properties
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
client.id=zerocode-producer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
#acks=all
#retries=0
#batch.size=16384
#client.id=fcd-producer
#auto.commit.interval.ms=1000
#block.on.buffer.full=true
kafka_consumer.properties
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# kafka consumer properties
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
group.id=consumerGroup1
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.records=1
enable.auto.commit=false
auto.offset.reset=latest
#max.partition.fetch.bytes=23
#max.poll.interval.ms=2000
#group.id=None
#enable.auto.commit=true
So this is somewhat between integration and unit test as I only want to test producer puts the right content into kafka.
It is better to use a containerised Kafka via Docker, which gives you real integration testing and makes things easy to run both on a local laptop and in a CI/CD pipeline.
This also gives more confidence to release the application to higher environments, as Kafka here is not faked or mocked.
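One convenient way to get such a containerised broker straight from JUnit is the Testcontainers library; a minimal sketch, assuming the org.testcontainers:kafka module is on the classpath (the image tag is also an assumption):

import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import static org.junit.Assert.assertNotNull;

public class ContainerisedKafkaTest {

    // one throwaway broker per test class, started by Testcontainers via Docker
    @ClassRule
    public static final KafkaContainer KAFKA =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    @Test
    public void brokerIsReachable() {
        // feed this into kafka.bootstrap.servers instead of a hardcoded host
        assertNotNull(KAFKA.getBootstrapServers());
    }
}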
You can find more complex examples of producing/consuming JSON/XML messages in the Zerocode GitHub wiki.
The HelloWorld GitHub repo below has many more flavours of automated Kafka end-to-end scenarios to clone and run locally (Docker is a prerequisite).
Produce examples:
https://github.com/authorjapps/zerocode/tree/master/kafka-testing/src/test/resources/kafka/produce
Consume examples:
https://github.com/authorjapps/zerocode/tree/master/kafka-testing/src/test/resources/kafka/consume
JUnit runners
P.S.
SDET = Software Developer Engineer in Test
Upvotes: 1
Reputation: 4324
I usually follow how Confluent has done integration testing in their examples: https://github.com/confluentinc/examples/tree/master/kafka-streams/src/test/java/io/confluent/examples/streams
You'll find some handy utilities there for running an embedded Kafka cluster, as well as for sending and retrieving messages.
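For reference, the skeleton of how those utilities are typically wired up looks like this; EmbeddedSingleNodeKafkaCluster is a test utility you copy in from that repo rather than a published artifact, so treat the exact API as approximate:

import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class StreamsIntegrationTest {

    // in-process ZooKeeper + Kafka broker, from the Confluent examples repo
    @ClassRule
    public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
            new EmbeddedSingleNodeKafkaCluster();

    @BeforeClass
    public static void createTopics() throws Exception {
        CLUSTER.createTopic("order-topic"); // topic name is an assumption
    }

    @Test
    public void processesMessagesEndToEnd() {
        // point producer/consumer configs at CLUSTER.bootstrapServers()
    }
}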
Upvotes: 3
Reputation: 3182
I have some experience writing integration tests like this in the past, and my approach was as follows:
Upvotes: 0