I want to understand in a little more detail the relationship between StreamThread and StreamTask, and how many Processor instances are created when we have a source topic with six partitions and a single StreamThread (num.stream.threads=1). I am keeping a simple processor topology:
source_topic --> Processor1 --> Processor2 --> Processor3 --> sink_topic
Each processor simply forwards the record to the next processor in the chain. I am using the low-level Java Processor API. Here is a snippet of one of the processors:
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class Processor1 implements Processor<String, String> {

    private ProcessorContext context;

    public Processor1() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // Keep the context so process() can forward records downstream.
        this.context = context;
    }

    @Override
    public void punctuate(long timestamp) {
        // No scheduled work in this example.
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void process(String key, String value) {
        System.out.println("Inside Processor1#process() method");
        // Pass the record unchanged to the next processor in the chain.
        context.forward(key, value);
    }
}
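Snippet of the main driver application:
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
Topology topology = new Topology();
topology.addSource("SOURCE", "source-topic-data");
topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
topology.addSink("SINK", "sink-topic-data", "Processor3");
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application"); // required; value matches the printed appId
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // required; broker address assumed
// The processors are typed <String, String>, so String default serdes are assumed here.
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
StreamsConfig config = new StreamsConfig(settings);
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();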
With this arrangement, I have the following questions:
1. How many instances of processors (Processor1, Processor2, Processor3) will be created?
2. As per my understanding, there will be SIX stream tasks. Is a new instance of each processor created for each stream task, or do they "share" the same Processor instance?
3. When a stream thread is created, does it create a new instance of each processor?
4. Are stream tasks created as part of stream thread creation?
5. (New question added to original list) In this scenario a single stream thread will have SIX stream tasks. Does a stream thread execute these stream tasks one by one, sort of "in a loop"? Do stream tasks run as separate "threads"? Basically, I am not able to understand how a single stream thread runs multiple stream tasks at the same time/in parallel.
Below is the topology that gets printed:
KafkaStreams processID: 1602fe25-57ab-4620-99df-fd0c15d96e42
  StreamsThread appId: my-first-streams-application
    StreamsThread clientId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42
    StreamsThread threadId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42-StreamThread-1
    Active tasks:
      Running:
        StreamsTask taskId: 0_0
          ProcessorTopology:
            SOURCE:
              topics: [source-topic-data]
              children: [Processor1]
            Processor1:
              children: [Processor2]
            Processor2:
              children: [Processor3]
            Processor3:
              children: [SINK]
            SINK:
              topic: sink-topic-data
          Partitions [source-topic-data-0]
        StreamsTask taskId: 0_1
          ProcessorTopology:
            SOURCE:
              topics: [source-topic-data]
              children: [Processor1]
            Processor1:
              children: [Processor2]
            Processor2:
              children: [Processor3]
            Processor3:
              children: [SINK]
            SINK:
              topic: sink-topic-data
          Partitions [source-topic-data-1]
        StreamsTask taskId: 0_2
          ProcessorTopology:
            SOURCE:
              topics: [source-topic-data]
              children: [Processor1]
            Processor1:
              children: [Processor2]
            Processor2:
              children: [Processor3]
            Processor3:
              children: [SINK]
            SINK:
              topic: sink-topic-data
          Partitions [source-topic-data-2]
        StreamsTask taskId: 0_3
          ProcessorTopology:
            SOURCE:
              topics: [source-topic-data]
              children: [Processor1]
            Processor1:
              children: [Processor2]
            Processor2:
              children: [Processor3]
            Processor3:
              children: [SINK]
            SINK:
              topic: sink-topic-data
          Partitions [source-topic-data-3]
        StreamsTask taskId: 0_4
          ProcessorTopology:
            SOURCE:
              topics: [source-topic-data]
              children: [Processor1]
            Processor1:
              children: [Processor2]
            Processor2:
              children: [Processor3]
            Processor3:
              children: [SINK]
            SINK:
              topic: sink-topic-data
          Partitions [source-topic-data-4]
        StreamsTask taskId: 0_5
          ProcessorTopology:
            SOURCE:
              topics: [source-topic-data]
              children: [Processor1]
            Processor1:
              children: [Processor2]
            Processor2:
              children: [Processor3]
            Processor3:
              children: [SINK]
            SINK:
              topic: sink-topic-data
          Partitions [source-topic-data-5]
      Suspended:
      Restoring:
      New:
    Standby tasks:
      Running:
      Suspended:
      Restoring:
      New:
How many instances of processors (Processor1, Processor2, Processor3) will be created?
In your example, six each. Each task will instantiate a full copy of the Topology (cf. https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L355; note: a Topology is the logical representation of the program, and it is instantiated as a ProcessorTopology at runtime).
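To make the "six each" arithmetic concrete, here is a toy model in plain Java with no Kafka dependency. It is not Kafka's internal code: `ToyTask`, `createTasks` and `distinctProcessorInstances` are hypothetical names, and the only idea borrowed from the answer is that every task calls each registered processor supplier (the `() -> new Processor1()` lambdas) once when it builds its own copy of the topology.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Toy model (NOT Kafka's internals): each task asks every supplier for a fresh processor.
public class SupplierPerTaskDemo {

    // Hypothetical stand-ins for Processor1..Processor3 from the question.
    static class Processor1 {}
    static class Processor2 {}
    static class Processor3 {}

    // Hypothetical stand-in for a stream task holding its own copy of the processor chain.
    static class ToyTask {
        final String taskId;
        final List<Object> processors = new ArrayList<>();

        ToyTask(String taskId, Map<String, Supplier<Object>> suppliers) {
            this.taskId = taskId;
            // One get() call per supplier per task -> one instance per processor per task.
            for (Supplier<Object> supplier : suppliers.values()) {
                processors.add(supplier.get());
            }
        }
    }

    // Builds one task per partition, all from the same set of suppliers.
    static List<ToyTask> createTasks(int partitions) {
        Map<String, Supplier<Object>> suppliers = new LinkedHashMap<>();
        suppliers.put("Processor1", Processor1::new);
        suppliers.put("Processor2", Processor2::new);
        suppliers.put("Processor3", Processor3::new);

        List<ToyTask> tasks = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            tasks.add(new ToyTask("0_" + p, suppliers)); // ids mirror the printout: 0_0 .. 0_5
        }
        return tasks;
    }

    // Counts distinct processor objects (by identity) across all tasks.
    static int distinctProcessorInstances(List<ToyTask> tasks) {
        Map<Object, Boolean> seen = new IdentityHashMap<>();
        for (ToyTask task : tasks) {
            for (Object processor : task.processors) {
                seen.put(processor, Boolean.TRUE);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        List<ToyTask> tasks = createTasks(6);
        System.out.println(tasks.size() + " tasks, "
            + distinctProcessorInstances(tasks) + " processor instances");
    }
}
```

With six partitions this prints 6 tasks and 18 processor instances, i.e. six per processor class.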
As per my understanding, there will be SIX stream tasks. Is a new instance of processor created for each Stream task or do they "share" the same Processor instance?
Each task has its own Processor instance; they are not shared.
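Because the instances are not shared, any field on a processor (for example a counter) only ever sees the records of its own task. The toy below is plain Java with a hypothetical `CountingProcessor`; it contrasts a supplier that returns a fresh object per call with one that hands out a single shared object. The second is the pattern to avoid, since all tasks would then mutate the same state (as far as I know, the `ProcessorSupplier` javadoc in newer Kafka releases spells out the same rule).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy illustration (not Kafka code): per-task processor instances keep per-task state.
public class PerTaskStateDemo {

    // Hypothetical processor that counts how many records it has seen.
    static class CountingProcessor {
        int seen = 0;

        void process(String key, String value) {
            seen++;
        }
    }

    // Simulates `tasks` tasks, each asking the supplier for its processor,
    // then feeds `recordsPerTask` records into every task's processor.
    static List<CountingProcessor> run(Supplier<CountingProcessor> supplier, int tasks, int recordsPerTask) {
        List<CountingProcessor> perTask = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            perTask.add(supplier.get());
        }
        for (CountingProcessor processor : perTask) {
            for (int r = 0; r < recordsPerTask; r++) {
                processor.process("key-" + r, "value-" + r);
            }
        }
        return perTask;
    }

    public static void main(String[] args) {
        // Correct: a fresh instance per task, so each task counts only its own records.
        List<CountingProcessor> fresh = run(CountingProcessor::new, 6, 10);
        System.out.println("fresh instances, task 0 saw: " + fresh.get(0).seen); // prints 10

        // Anti-pattern: the supplier returns one shared object, so all tasks mix their state.
        CountingProcessor shared = new CountingProcessor();
        List<CountingProcessor> sharedRun = run(() -> shared, 6, 10);
        System.out.println("shared instance, task 0 saw: " + sharedRun.get(0).seen); // prints 60
    }
}
```

With one shared object, the records of all six simulated tasks land in the same counter, which is exactly what per-task instances prevent.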
When a Stream Thread is created, does it create a new instance of processor?
No. When a task is created, it will create new Processor instances.
Are Stream Tasks created as part of Stream Threads creation?
No. Tasks are created during a rebalance, according to the partition/task assignment. KafkaStreams registers a StreamsRebalanceListener on its internal consumer that calls TaskManager#createTasks().
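A toy sketch of that order of events: constructing a thread creates no tasks and no processors; tasks, and with them the processor instances, only appear once partitions are assigned. `ToyStreamThread`, `ToyTask` and `onPartitionsAssigned(Collection<Integer>)` are hypothetical stand-ins (in Kafka the real hook is a `ConsumerRebalanceListener` on the thread's consumer), and the task-id format `0_<partition>` is simply copied from the printout in the question.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Kafka's TaskManager): a thread starts with no tasks; tasks, and the
// processor instances inside them, are only created when partitions are assigned.
public class RebalanceTaskCreationDemo {

    // Hypothetical task: builds its own processor chain when constructed.
    static class ToyTask {
        final String id;
        final List<Object> processors;

        ToyTask(String id) {
            this.id = id;
            // Stand-ins for new Processor1(), new Processor2(), new Processor3().
            this.processors = List.of(new Object(), new Object(), new Object());
        }
    }

    // Hypothetical stream thread.
    static class ToyStreamThread {
        final Map<String, ToyTask> activeTasks = new LinkedHashMap<>();

        // The constructor creates no tasks and no processors.
        ToyStreamThread() {
        }

        // Stand-in for the rebalance callback (in Kafka: a ConsumerRebalanceListener).
        void onPartitionsAssigned(Collection<Integer> partitions) {
            for (int partition : partitions) {
                String taskId = "0_" + partition; // sub-topology 0, partition N, as in the printout
                activeTasks.computeIfAbsent(taskId, ToyTask::new); // no duplicate on re-assignment
            }
        }

        int processorCount() {
            int count = 0;
            for (ToyTask task : activeTasks.values()) {
                count += task.processors.size();
            }
            return count;
        }
    }

    public static void main(String[] args) {
        ToyStreamThread thread = new ToyStreamThread();
        System.out.println("after thread creation: " + thread.activeTasks.size()
            + " tasks, " + thread.processorCount() + " processors");

        thread.onPartitionsAssigned(List.of(0, 1, 2, 3, 4, 5));
        System.out.println("after rebalance: " + thread.activeTasks.keySet()
            + ", " + thread.processorCount() + " processors");
    }
}
```

The real TaskManager also deals with suspended, restoring and standby tasks (see the state lists in the printout), none of which is modeled here.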
Update (as the question was extended):
In this scenario a single stream thread will have SIX stream tasks. Does a stream thread execute these stream tasks one by one, sort of "in a loop"? Do stream tasks run as separate "threads"? Basically, I am not able to understand how a single stream thread runs multiple stream tasks at the same time/in parallel.
Yes, the StreamThread will execute the tasks in a loop. There are no other threads. Hence, tasks that are assigned to the same thread are not executed at the same time/in parallel, but one after another. (Cf. https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java#L472; each StreamThread uses exactly one TaskManager, which uses AssignedStreamsTasks and AssignedStandbyTasks internally.)
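A toy sketch of "one thread, several tasks, in a loop": a single thread walks over its tasks and lets each one process a few buffered records before moving on to the next, so no two tasks ever run at the same time. `ToyTask`, `runLoop` and the `maxRecordsPerTask` knob are hypothetical; Kafka's real StreamThread loop also polls the consumer, runs punctuations and commits, which this model leaves out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model (not Kafka's StreamThread run loop): one thread, several tasks, processed in turn.
public class SingleThreadLoopDemo {

    // Hypothetical task with a buffer of records from its own partition.
    static class ToyTask {
        final String id;
        final Deque<String> buffered = new ArrayDeque<>();

        ToyTask(String id, List<String> records) {
            this.id = id;
            buffered.addAll(records);
        }

        // Processes at most maxRecords buffered records; returns how many were processed.
        int process(int maxRecords, List<String> log) {
            int done = 0;
            while (done < maxRecords && !buffered.isEmpty()) {
                log.add(id + ":" + buffered.poll());
                done++;
            }
            return done;
        }
    }

    // Loops over all tasks on the calling thread until every buffer is drained.
    static List<String> runLoop(List<ToyTask> tasks, int maxRecordsPerTask) {
        List<String> log = new ArrayList<>();
        boolean progress = true;
        while (progress) {
            progress = false;
            for (ToyTask task : tasks) { // one task after another, never concurrently
                if (task.process(maxRecordsPerTask, log) > 0) {
                    progress = true;
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<ToyTask> tasks = new ArrayList<>();
        for (int p = 0; p < 6; p++) {
            tasks.add(new ToyTask("0_" + p, List.of("r1", "r2")));
        }
        // Everything runs on the calling thread; no extra threads are started.
        for (String entry : runLoop(tasks, 1)) {
            System.out.println(entry + " on " + Thread.currentThread().getName());
        }
    }
}
```

The records of different tasks are interleaved, but strictly sequentially on one thread. To get real parallelism you raise num.stream.threads (or run more application instances), so that the tasks are spread across several threads, each running its own loop like this one.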