M. Gopal

Reputation: 454

Kafka Consumer (Java application) - getting "Connection refused" when run remotely; works fine on localhost

The Kafka server and producer are running on System A. I have a Kafka consumer, which is a Java application. When I run it on System A, it receives the logs properly. When I run it on another system, say B, I get the exception indicated below and no logs are received. I have checked that ping and telnet to System A both work fine.

 0    [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values: 
    auto.commit.interval.ms = 1000
    auto.offset.reset = earliest
    bootstrap.servers = [192.168.95.217:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = test-consumer-group
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 30000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

4    [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Initializing the Kafka consumer
118  [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.95.217:9092 (id: -1 rack: null)], partitions = [])
138  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name fetch-throttle-time
157  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-closed:
163  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-created:
164  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name successful-authentication:
164  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name failed-authentication:
165  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent-received:
166  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent:
168  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-received:
169  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name select-time:
171  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name io-time:
198  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name heartbeat-latency
198  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name join-latency
199  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name sync-latency
202  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name commit-latency
209  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-fetched
210  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name records-fetched
210  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name fetch-latency
211  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name records-lag
220  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 1.0.0
220  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : aaa7af6d4a11b29d
223  [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Kafka consumer initialized
properties loaded
224  [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Subscribed to topic(s): test
224  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Sending GroupCoordinator request to broker 192.168.95.217:9092 (id: -1 rack: null)
386  [main] DEBUG org.apache.kafka.clients.NetworkClient  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Initiating connection to node 192.168.95.217:9092 (id: -1 rack: null)
488  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-sent
489  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-received
490  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.latency
491  [main] DEBUG org.apache.kafka.common.network.Selector  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
492  [main] DEBUG org.apache.kafka.clients.NetworkClient  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Completed connection to node -1. Fetching API versions.
492  [main] DEBUG org.apache.kafka.clients.NetworkClient  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Initiating API versions fetch from node -1.
506  [main] DEBUG org.apache.kafka.clients.NetworkClient  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Recorded API versions for node -1: (Produce(0): 0 to 5 [usable: 5], Fetch(1): 0 to 6 [usable: 6], ListOffsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 5], LeaderAndIsr(4): 0 to 1 [usable: 1], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 4], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0], AlterReplicaLogDirs(34): 0 [usable: 0], DescribeLogDirs(35): 0 [usable: 0], SaslAuthenticate(36): 0 [usable: 0], CreatePartitions(37): 0 [usable: 0])
506  [main] DEBUG org.apache.kafka.clients.NetworkClient  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Sending metadata request (type=MetadataRequest, topics=test) to node 192.168.95.217:9092 (id: -1 rack: null)
514  [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 2 to Cluster(id = jm5HyHZbT1mlgdZxinD8oA, nodes = [localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = test, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])])
516  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Received GroupCoordinator response ClientResponse(receivedTimeMs=1515749439145, latencyMs=134, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=0), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=localhost:9092 (id: 0 rack: null)))
516  [main] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Discovered coordinator localhost:9092 (id: 2147483647 rack: null)
516  [main] DEBUG org.apache.kafka.clients.NetworkClient  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Initiating connection to node localhost:9092 (id: 2147483647 rack: null)
520  [kafka-coordinator-heartbeat-thread | test-consumer-group] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Heartbeat thread started
520  [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Sending synchronous auto-commit of offsets {}
520  [main] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Revoking previously assigned partitions []
520  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Disabling heartbeat thread
520  [main] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer-group] (Re-)joining group
523  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Sending JoinGroup ((type: JoinGroupRequest, groupId=test-consumer-group, sessionTimeout=30000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@19d481b)) to coordinator localhost:9092 (id: 2147483647 rack: null)
525  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-2147483647.bytes-sent
526  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-2147483647.bytes-received
527  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-2147483647.latency
528  [main] DEBUG org.apache.kafka.common.network.Selector  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
    at com.myCompany.kafka.subscription.KafkaSubscription.main(KafkaSubscription.java:30)
533  [main] DEBUG org.apache.kafka.clients.NetworkClient  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Node 2147483647 disconnected.
534  [main] WARN  org.apache.kafka.clients.NetworkClient  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Connection to node 2147483647 could not be established. Broker may not be available.
534  [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Cancelled JOIN_GROUP request RequestHeader(apiKey=JOIN_GROUP, apiVersion=2, clientId=consumer-1, correlationId=3) with correlation id 3 due to node 2147483647 being disconnected
534  [main] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Marking the coordinator localhost:9092 (id: 2147483647 rack: null) dead
635  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Sending GroupCoordinator request to broker localhost:9092 (id: 0 rack: null)
635  [main] DEBUG org.apache.kafka.clients.NetworkClient  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Initiating connection to node localhost:9092 (id: 0 rack: null)
635  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-0.bytes-sent
636  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-0.bytes-received
637  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-0.latency
638  [main] DEBUG org.apache.kafka.common.network.Selector  - [Consumer clientId=consumer-1, groupId=test-consumer-group] Connection with localhost/127.0.0.1 

What else do I need to check to receive logs at the remote Kafka consumer (Java application)?

My Java application is given below.

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaSubscription {

    public static void main(String args[]) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.95.217:9092");
        // props.put("zookeeper.connect", "192.168.95.217:2181");
        props.put("group.id", "test-consumer-group1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        System.out.println("properties loaded");
        kafkaConsumer.subscribe(Arrays.asList("test1"));
        kafkaConsumer.seekToBeginning(Collections.emptyList());
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1);
            System.out.println("Records Length : " + records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
            Thread.sleep(30000);
        }
    }
}

Upvotes: 0

Views: 2742

Answers (2)

Abdul Khadeer

Reputation: 39

Make the following changes in the server.properties file. Change

listeners=PLAINTEXT://localhost:9092

to

listeners=PLAINTEXT://xx.xx.xx.xx:9092

and uncomment the advertised.listeners line:

advertised.listeners=PLAINTEXT://your.host.name:9092

where your.host.name is xx.xx.xx.xx (the Kafka broker IP).

Then restart the Kafka server.
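
A minimal sketch of the relevant server.properties entries, using the broker IP from the question (192.168.95.217) as an example:

```properties
# Bind the broker socket to the machine's network interface, not localhost
listeners=PLAINTEXT://192.168.95.217:9092

# Address the broker advertises to clients in metadata responses;
# remote consumers connect to this host:port after the initial bootstrap
advertised.listeners=PLAINTEXT://192.168.95.217:9092
```

If advertised.listeners is left commented out, it defaults to the value of listeners, which is why a localhost binding ends up being handed back to remote clients.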

Upvotes: 2

ppatierno

Reputation: 10075

From the first part of the log, it seems that the client is able to reach the remote machine, ask for metadata, and find the coordinator, but the address it then receives is "localhost:9092", so it tries to open a connection to that address to fetch messages. Can you post the broker configuration properties file? Maybe something is wrong in the "advertised.*" part of the configuration.

Upvotes: 2
