Reputation: 1809
I have a test that temperamentally leaves an open producer thread with a continuous error logging.
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
The test works, but sometimes fails like above.
test("My test") {
val topology = Application.getTopology(...)
val streams = new KafkaStreams(topology,properties)
withRunningKafka {
createCustomTopic(eventTopic)
val streamId = UUIDs.newUuid().toString
logger.info(s"Creating stream with Application ID: [$streamId]")
val streams = new KafkaStreams(topology, streamConfig(streamId, PropertiesConfig.asScalaMap(props)))
try {
publishToKafka(eventTopic, key = keyMSite1UID1, message = event11a)
// ... several more publishings
Thread.sleep(publishingDelay) // Give time to initialize
streams.start()
Thread.sleep(deletionDelay)
withConsumer[MyKey, MyEvent, Unit] { consumer =>
val consumedMessages: Stream[(MyKey, MyEvent)] =
consumer.consumeLazily[(MyKey, MyEvent)](eventTopic)
val messages = consumedMessages.take(20).toList
messages.foreach(tuple => logger.info("EVENT END: " + tuple))
messages.size should be(6)
// several assertions here
}
} finally {
streams.close()
}
}(config)
}
A particularity is that the streams application produces deletion events over the same topic it consumes from.
There are two similar tests in this suite. I execute the test suite under sbt, like so:
testOnly *MyTest
Four out of five executions leave a dangling thread posting those errors indefinitely. They appear in groups of 3, but I don't know why either.
I've tried setting delays after calls to close(), but it does not seem to help. How to avoid dangling Producer threads?
Upvotes: 0
Views: 562
Reputation: 62285
In your test, you are creating two KafkaStreams
instances, but you only close()
one. I assume that the lacking Producer
belongs to the instance you don't close. Note, that you need to call KafkaStreams#close()
even if you never called KafkaStreams#start()
.
Upvotes: 1