Reputation: 141
I have created sample Kafka Streams application from the tutorial:
public static void main(String[] args) throws Exception {
Logger log = Logger.getLogger("Name");
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordprint");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final KStreamBuilder builder = new KStreamBuilder();
builder.stream("onecon_postgres").print();
final KafkaStreams streams = new KafkaStreams(builder, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
log.info("After Start");
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
Unfortunately this application does not read input stream. I have a JDBC source connector from PostgreSQL and it's working fine streaming data from one database (I can see on Kafka Connect UI data within this topic).
The problem I have is even though I have changed IP in BOOTSTRAP_SERVERS_CONFIG in Properties IP is localhost I don't know why.
[main] INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
application.id = streams-linesplit
application.server =
**bootstrap.servers = [localhost:9092]**
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
key.serde = null
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =
To overcome this I have used netsh to forward traffic but I cannot see this application to consume my stream.
netsh interface portproxy add v4tov4 listenport=9092 listenaddress=127.0.0.1 connectport=9092 connectaddress=192.168.99.100
Upvotes: 0
Views: 4703
Reputation: 15067
Unfortunately this application does not read input stream.
You seem to have a networking problem between your Kafka Streams application and your Kafka broker. It is rather unlikely that "Kafka Streams does not work".
Also, it's hard to help you without you providing more information:
192.168.99.100:9092
?ERROR
or WARN
log messages?The problem I have is even though I have changed IP in BOOTSTRAP_SERVERS_CONFIG in Properties IP is localhost I don't know why.
I don't understand -- why do you think changing the BOOTSTRAP_SERVERS_CONFIG
to localhost:9092
will fix your original problem? I understood that the Kafka broker actually listens on 192.168.99.100:9092
?
To overcome this I have used netsh to forward traffic but I cannot see this application to consume my stream.
The port forwarding will most probably not help. Without updating the configuration of your Kafka broker, the broker will by default only communicate on its "real" IP + port. Slightly simplified: the broker, configured to listen on 192.168.99.100:9092
, will not respond to localhost:9092
request that your Kafka Streams application sends, even though you are doing port forwarding from localhost:9092 -> 192.168.99.100:9092
on the machine that runs your Kafka Streams application.
Hope this helps a bit!
Upvotes: 1