Reputation: 454
The Kafka server and producer are running on System A. I have a Kafka consumer, which is a Java application. If I run it on System A, it receives the logs properly. If I run it on another system, say B, I get the exception shown below and no logs are received. I checked ping and telnet, and both work fine.
0 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [192.168.95.217:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test-consumer-group
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 30000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
4 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-consumer-group] Initializing the Kafka consumer
118 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.95.217:9092 (id: -1 rack: null)], partitions = [])
138 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time
157 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:
163 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:
164 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-authentication:
164 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name failed-authentication:
165 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:
166 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:
168 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:
169 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:
171 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:
198 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency
198 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name join-latency
199 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name sync-latency
202 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name commit-latency
209 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched
210 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-fetched
210 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-latency
211 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-lag
220 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.0
220 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
223 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-consumer-group] Kafka consumer initialized
properties loaded
224 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-consumer-group] Subscribed to topic(s): test
224 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer-group] Sending GroupCoordinator request to broker 192.168.95.217:9092 (id: -1 rack: null)
386 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer-group] Initiating connection to node 192.168.95.217:9092 (id: -1 rack: null)
488 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
489 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
490 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
491 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, groupId=test-consumer-group] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
492 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer-group] Completed connection to node -1. Fetching API versions.
492 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer-group] Initiating API versions fetch from node -1.
506 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer-group] Recorded API versions for node -1: (Produce(0): 0 to 5 [usable: 5], Fetch(1): 0 to 6 [usable: 6], ListOffsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 5], LeaderAndIsr(4): 0 to 1 [usable: 1], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 4], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0], AlterReplicaLogDirs(34): 0 [usable: 0], DescribeLogDirs(35): 0 [usable: 0], SaslAuthenticate(36): 0 [usable: 0], CreatePartitions(37): 0 [usable: 0])
506 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer-group] Sending metadata request (type=MetadataRequest, topics=test) to node 192.168.95.217:9092 (id: -1 rack: null)
514 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(id = jm5HyHZbT1mlgdZxinD8oA, nodes = [localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = test, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])])
516 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer-group] Received GroupCoordinator response ClientResponse(receivedTimeMs=1515749439145, latencyMs=134, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=0), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=localhost:9092 (id: 0 rack: null)))
516 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer-group] Discovered coordinator localhost:9092 (id: 2147483647 rack: null)
516 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer-group] Initiating connection to node localhost:9092 (id: 2147483647 rack: null)
520 [kafka-coordinator-heartbeat-thread | test-consumer-group] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer-group] Heartbeat thread started
520 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer-group] Sending synchronous auto-commit of offsets {}
520 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer-group] Revoking previously assigned partitions []
520 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer-group] Disabling heartbeat thread
520 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer-group] (Re-)joining group
523 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer-group] Sending JoinGroup ((type: JoinGroupRequest, groupId=test-consumer-group, sessionTimeout=30000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@19d481b)) to coordinator localhost:9092 (id: 2147483647 rack: null)
525 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-sent
526 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-received
527 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.latency
528 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, groupId=test-consumer-group] Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444)
at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
at com.myCompany.kafka.subscription.KafkaSubscription.main(KafkaSubscription.java:30)
533 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer-group] Node 2147483647 disconnected.
534 [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer-group] Connection to node 2147483647 could not be established. Broker may not be available.
534 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer-group] Cancelled JOIN_GROUP request RequestHeader(apiKey=JOIN_GROUP, apiVersion=2, clientId=consumer-1, correlationId=3) with correlation id 3 due to node 2147483647 being disconnected
534 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer-group] Marking the coordinator localhost:9092 (id: 2147483647 rack: null) dead
635 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer-group] Sending GroupCoordinator request to broker localhost:9092 (id: 0 rack: null)
635 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer-group] Initiating connection to node localhost:9092 (id: 0 rack: null)
635 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent
636 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-received
637 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.latency
638 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, groupId=test-consumer-group] Connection with localhost/127.0.0.1
What else do I need to check to receive logs in the remote Kafka consumer (Java application)?
My Java application is given below.
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaSubscription {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Only used for the first connection; later connections use the
        // addresses the broker advertises in its metadata response.
        props.put("bootstrap.servers", "192.168.95.217:9092");
        // props.put("zookeeper.connect", "192.168.95.217:2181");
        props.put("group.id", "test-consumer-group1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        System.out.println("properties loaded");
        kafkaConsumer.subscribe(Arrays.asList("test1"));
        // No partitions are assigned before the first poll(), so this call
        // most likely has no effect at this point.
        kafkaConsumer.seekToBeginning(Collections.emptyList());
        while (true) {
            // poll(long) takes a timeout in milliseconds
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1);
            System.out.println("Records Length : " + records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
            Thread.sleep(30000);
        }
    }
}
Upvotes: 0
Views: 2742
Reputation: 39
Make the following changes in the server.properties file:
Change listeners=PLAINTEXT://localhost:9092 to listeners=PLAINTEXT://xx.xx.xx.xx:9092
and uncomment the advertised.listeners line:
advertised.listeners=PLAINTEXT://your.host.name:9092
where your.host.name is xx.xx.xx.xx (the Kafka broker IP).
Then restart the Kafka server.
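To double-check the edited file before restarting, a small standard-library sketch like the one below can help. It reads the text of a server.properties file, pulls the host out of the first advertised.listeners entry, and reports whether that host is a loopback address. The class and method names (AdvertisedListenerCheck, advertisedHost, isLoopback) are made up for illustration and are not part of Kafka, and the parsing assumes the simple protocol://host:port form shown above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;

// Hypothetical helper, not a Kafka API: inspects server.properties text.
class AdvertisedListenerCheck {

    // Returns the host of the first advertised.listeners entry,
    // or null when the key is missing or still commented out.
    static String advertisedHost(String serverProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("advertised.listeners");
        if (value == null || value.trim().isEmpty()) {
            return null;
        }
        String first = value.split(",")[0].trim();
        int schemeEnd = first.indexOf("://");
        int portSep = first.lastIndexOf(':');
        if (schemeEnd < 0 || portSep <= schemeEnd + 2) {
            throw new IllegalArgumentException("Expected protocol://host:port but got: " + first);
        }
        return first.substring(schemeEnd + 3, portSep);
    }

    // True when the host resolves to a loopback address such as 127.0.0.1.
    static boolean isLoopback(String host) {
        try {
            return InetAddress.getByName(host).isLoopbackAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Cannot resolve host: " + host, e);
        }
    }

    public static void main(String[] args) {
        // Example content; the IP is the broker address from the question.
        String config = "listeners=PLAINTEXT://192.168.95.217:9092\n"
                + "advertised.listeners=PLAINTEXT://192.168.95.217:9092\n";
        String host = advertisedHost(config);
        System.out.println("advertised host: " + host + ", loopback: " + isLoopback(host));
    }
}
```

If the advertised host turns out to be a loopback address, remote consumers will be told to connect to their own machine, which matches the "Connection refused" seen on System B.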
Upvotes: 2
Reputation: 10075
From the first part of the log, it seems that the client can reach the remote machine, request metadata, and find the coordinator, but the address it receives back is "localhost:9092", so it then tries to open a connection to that address to get messages. Can you post the broker configuration properties file? Something may be wrong in the "advertised.*" part of the configuration.
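One way to confirm this from System B, independent of the Kafka client, is to probe both addresses with a plain TCP connect. The sketch below is only an illustration using the standard library; BrokerReachability and canConnect are hypothetical names, and the addresses are the ones that appear in the log above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not a Kafka API: a plain TCP reachability probe.
class BrokerReachability {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Bootstrap address from the consumer configuration.
        System.out.println("192.168.95.217:9092 reachable: "
                + canConnect("192.168.95.217", 9092, 2000));
        // Address the broker advertised in its metadata response.
        System.out.println("localhost:9092 reachable: "
                + canConnect("localhost", 9092, 2000));
    }
}
```

If the first probe succeeds and the second fails when run on System B, the network is fine and the problem is the advertised address, not connectivity.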
Upvotes: 2