Reputation: 500
I'm learning Kafka and have made the leap to using Maven.
I have a standalone Kafka instance in AWS and a Maven application on my laptop. I've written a small application that acts as a producer:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        // create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<IP_TO_REMOTE_SERVER>:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // create producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // producer record
        ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "jello there");
        System.out.println("SENDING RECORD");
        // send data - async
        producer.send(record);
        // flush blocks until buffered records have been sent; close releases resources
        producer.flush();
        producer.close();
        System.out.println("complete");
    }
}
When I run this, it appears as though I can't connect to the remote instance. I get the error below.
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/xx.xx.xx.xx:9092) could not be established. Broker may not be available.
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
After looking at Stack Overflow, I updated the listeners section of server.properties to use the private IP of the server:
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.0.1.51:9092
How should I configure Kafka on the server to be accessible and listening remotely?
Upvotes: 1
Views: 3560
Reputation: 500
Seeing the responses got me thinking about how to change my config to make this work. I found a really good blog article addressing this issue here.
My setup
I would stress this is not production.
A single AWS EC2 instance in a public subnet of a VPC, with Kafka installed. I connect to Kafka as a producer remotely from my laptop, using a Maven project.
No changes to zookeeper.properties
Updated server.properties, specifically the listeners and advertised.listeners settings:
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://<PRIVATE_IP_ADDRESS>:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://<PUBLIC_IP_ADDRESS>:9092
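To make the split between the two settings concrete, here is a minimal sketch in plain Java. The class and method names (ListenerSettings, forBroker) are hypothetical and not part of Kafka, and 203.0.113.10 is only a placeholder public IP from a documentation address range. It assumes the setup described above: the broker binds to the private IP, and the public IP is what it hands back to clients.

```java
import java.util.Properties;

// Hypothetical helper (not a Kafka API): builds the two listener settings
// for a single broker that binds privately but is reached publicly.
final class ListenerSettings {

    static Properties forBroker(String privateIp, String publicIp, int port) {
        Properties props = new Properties();
        // Address the broker binds to on the instance itself.
        props.setProperty("listeners", "PLAINTEXT://" + privateIp + ":" + port);
        // Address the broker returns to clients, which they use for later connections.
        props.setProperty("advertised.listeners", "PLAINTEXT://" + publicIp + ":" + port);
        return props;
    }

    public static void main(String[] args) {
        // 10.0.1.51 is the private IP from the question; the public IP is a placeholder.
        Properties props = forBroker("10.0.1.51", "203.0.113.10", 9092);
        System.out.println("listeners=" + props.getProperty("listeners"));
        System.out.println("advertised.listeners=" + props.getProperty("advertised.listeners"));
    }
}
```

The point of the sketch is only that the two values differ: a remote client must be able to reach whatever appears in advertised.listeners, not just the address used for bootstrapping.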
Then in my Maven code, I set BOOTSTRAP_SERVERS_CONFIG to the public IP:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        // create producer properties
        Properties properties = new Properties();
        // bootstrap against the public IP, matching advertised.listeners on the broker
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<PUBLIC_IP_ADDRESS>:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // create producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // producer record
        ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "good pony");
        System.out.println("SENDING RECORD");
        // send data - async
        producer.send(record);
        // flush blocks until buffered records have been sent; close releases resources
        producer.flush();
        producer.close();
        System.out.println("complete");
    }
}
This runs successfully:
SENDING RECORD
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1]
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
complete
The text is then pushed to the consumer.
Upvotes: 2
Reputation: 655
I suppose the main problem you are facing is a configuration one. Please check that you have made all the necessary changes before communicating through the producer. You need to make the following changes:
Kafka change: You need to add configuration in zookeeper.properties for the relevant brokers.
AWS change: When connecting to AWS you need to set up a way to pass the .pem file. You might also need to enable direct access to the AWS instance; by default it blocks all unknown traffic.
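To check whether traffic to the instance is being blocked at all, a plain TCP connection test can help separate a network problem from a Kafka configuration problem. The following is a minimal sketch using only the JDK; the class name PortCheck and the 3-second timeout are my own choices, and it only tests that the port accepts connections, not that a Kafka broker is answering on it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: reports whether a TCP connection to host:port can be opened.
final class PortCheck {

    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Fails with an IOException if the port is closed, filtered, or the host is unknown.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Usage: java PortCheck.java <host> <port>; defaults are placeholders.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If this reports the port as unreachable from your laptop, the instance's inbound rules are the first thing to look at. If it is reachable and the producer still fails, the broker's listener configuration is the more likely cause.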
Other approach:
I would recommend creating a certificate and key file that whitelist your own PC as a valid source.
Add that cert to the keystore and truststore on the AWS server instance.
Change the listeners entry in server.properties to listeners = SSL://your.host.name:9092
and update your BOOTSTRAP_SERVERS_CONFIG to match.
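On the client side, the SSL approach would also need a few extra producer properties. The sketch below builds them with plain java.util.Properties and the string config keys, so it runs without the Kafka client library. The class name SslClientSettings, the truststore path, and the password are placeholders I made up; the serializer settings from the question would still have to be added before passing the result to a KafkaProducer.

```java
import java.util.Properties;

// Hypothetical helper: collects the client-side settings for connecting over SSL.
final class SslClientSettings {

    static Properties forProducer(String bootstrapServers,
                                  String truststorePath,
                                  String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Switch the client from the default PLAINTEXT protocol to SSL.
        props.setProperty("security.protocol", "SSL");
        // Truststore holding the certificate the client should trust for the broker.
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        return props;
    }

    public static void main(String[] args) {
        // All three arguments are placeholders, not values from the thread.
        Properties props = forProducer("your.host.name:9092",
                "/path/to/client.truststore.jks", "changeit");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

If the broker is set up to require client certificates, the client would additionally need ssl.keystore.location and ssl.keystore.password, which this sketch leaves out.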
Upvotes: 2