Reputation: 1819
As shown in the Confluent docs on Writing PAPI Applications, you should close the stores you use in your Processor by overriding the close() method.
The WordCountProcessor example there shows the store being closed inside the close() method.
I've done something similar (I don't obtain the stores in the init() method but through a lazy val in Scala), and I have found that my Processor's close() method gets called just after the store is created, and multiple times.
class EventWindowProcessor(sessionStoreName: String, lastSessionByChannelStoreName: String, lastChannelStoreName: String)
  extends AbstractProcessor[String, String] {

  // example of a store, looked up lazily on first use
  private lazy val lastChannelStore: KeyValueStore[MyKey, Channel] =
    context()
      .getStateStore(lastChannelStoreName)
      .asInstanceOf[KeyValueStore[MyKey, Channel]]

  override def init(context: ProcessorContext) = {
    super.init(context)
  }

  override def close() = {
    logger.info("CLOSING PROCESSOR")
  }

  override def process(key: String, value: String): Unit = {
    // ... my stuff here
  }
}
This is the output I get. It shows that processor.close() is called many times right after the topology starts running, and it also gets called at later points in the application.
[2018-06-08 05:13:16,255] INFO Stream Application starting, name: stream-processor (my.package.StreamProcessorApplication$)
[2018-06-08 05:13:16,760] INFO Topology: Sub-topologies:
Sub-topology: 0
Source: event-source (topics: [events])
--> session-processor
Processor: session-processor (stores: [sessionStoreName, lastSessionByChannelStoreName, lastChannelStoreName])
--> error-event-sink, order-sink, pageviews-sink, session-sink
<-- event-source
Sink: error-event-sink (topic: error-events)
<-- session-processor
Sink: order-sink (topic: orders)
<-- session-processor
Sink: pageviews-sink (topic: pageviews)
<-- session-processor
Sink: session-sink (topic: sessions)
<-- session-processor
Global Stores:
none
(my.package.StreamProcessorApplication$)
[2018-06-08 05:14:01,425] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
[2018-06-08 05:14:01,539] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
[2018-06-08 05:14:01,640] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
... (102 lines like that)
[2018-06-08 05:29:05,548] INFO .... my own application logging here
...so if I close the stores in that close() method, my code in process() later fails with an exception stating that the store is closed.
Why does processor.close() get called right when KafkaStreams starts? And why does it happen so often?
What are the risks of not explicitly closing stores?
Upvotes: 0
Views: 362
Reputation: 62350
The example in the docs is incorrect. You should not close the store: it is managed by Kafka Streams, and Kafka Streams will close it for you. (I'll do a PR to fix the code example. Thanks for pointing this out.)
About the calls to Processor#close(): it's expected that a processor might be closed and re-opened. This happens during a rebalance. Thus, your code must be written in such a way that it works correctly for multiple calls to init() and close(). We recently updated the JavaDocs for this (the improved JavaDocs will be part of the Kafka 2.0 release).
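To illustrate the point about multiple init() and close() calls, here is a minimal sketch of a processor that looks up its store on every init() and leaves closing the store to Kafka Streams. It assumes the AbstractProcessor API from the question (Kafka Streams 1.x/2.x). CountingProcessor, its store name parameter, and the String-to-Long store types are hypothetical names chosen for the example, not something from the question or the docs.

```scala
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext}
import org.apache.kafka.streams.state.KeyValueStore

// Hypothetical processor that counts records per key.
// storeName must match a store attached to this processor in the topology.
class CountingProcessor(storeName: String) extends AbstractProcessor[String, String] {

  // Re-assigned on every init(), so a re-opened processor never keeps
  // a handle to a store obtained before a previous close().
  private var counts: KeyValueStore[String, java.lang.Long] = _

  override def init(context: ProcessorContext): Unit = {
    super.init(context)
    counts = context
      .getStateStore(storeName)
      .asInstanceOf[KeyValueStore[String, java.lang.Long]]
  }

  override def process(key: String, value: String): Unit = {
    // get() returns null for a missing key, hence the Option wrapper.
    val previous = Option(counts.get(key)).map(_.longValue).getOrElse(0L)
    counts.put(key, java.lang.Long.valueOf(previous + 1))
  }

  override def close(): Unit = {
    // Release only resources this processor created itself.
    // The store is owned by Kafka Streams, so it is NOT closed here;
    // dropping the reference keeps close() safe to call repeatedly.
    counts = null
  }
}
```

One design note, under the same assumptions: a lazy val is evaluated only once per instance, so if the same processor instance were ever initialized again, it would keep the first store reference. Assigning the field in init() avoids depending on that.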
Upvotes: 2