Reputation: 335
I am trying to access Kafka deployed on an AWS server through its public IP. However, when I try to connect and send some data, I receive no response and the server closes the connection. Here is my producer code:
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public SensorDevice() {
    Properties props = new Properties();
    // Settings from the old Scala producer API, kept as in my original config
    props.put("metadata.broker.list", "myip-xyz:9092");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    // Settings for the new Java producer (KafkaProducer)
    props.put("bootstrap.servers", "myip-xyz:9092");
    props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    // props.put("partitioner.class", "example.producer.SimplePartitioner");
    producer = new KafkaProducer<String, String>(props);
}

public void run() {
    Object objectData = new Object();
    // Arguments are (topic, key, value)
    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
            topic, "mytopic", objectData.toString());
    System.out.println(data);
    Future<RecordMetadata> rs = producer.send(data,
            new org.apache.kafka.clients.producer.Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata,
                        Exception exception) {
                    // recordMetadata is null when the send failed
                    if (exception != null) {
                        System.out.println("Send failed: " + exception);
                        return;
                    }
                    System.out.println("Received ack for partition="
                            + recordMetadata.partition() + " offset = "
                            + recordMetadata.offset());
                }
            });
    try {
        // Block until the broker acknowledges the record or the send fails
        RecordMetadata rm = rs.get();
        String msg = " partition = " + rm.partition() + " offset ="
                + rm.offset();
        System.out.println(msg);
    } catch (Exception e) {
        System.out.println(e);
    } finally {
        producer.close();
    }
}
I have also tried adding advertise.host.name to the server.properties config file. The Kafka broker logs the following:
> [2015-04-24 09:06:35,329] INFO Created log for partition [mytopic,0] in /tmp/kafka-logs with properties {segment.index.bytes ->
> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 1073741824,
> flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000,
> index.interval.bytes -> 4096, retention.bytes -> -1,
> min.insync.replicas -> 1, cleanup.policy -> delete,
> unclean.leader.election.enable -> true, segment.ms -> 604800000,
> max.message.bytes -> 1000012, flush.messages -> 9223372036854775807,
> min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000,
> segment.jitter.ms -> 0}. (kafka.log.LogManager)
> [2015-04-24 09:06:35,330] WARN Partition [mytopic,0] on broker 0: No checkpointed highwatermark is found for partition [mytopic,0]
> (kafka.cluster.Partition)
> [2015-04-24 09:07:34,788] INFO Closing socket connection to /50.156.87.157. (kafka.network.Processor)
Please help me resolve this issue!
Upvotes: 1
Views: 1014
Reputation: 554
By default, a Kafka broker on EC2 typically advertises the instance's internal address, which clients outside AWS cannot reach, so you may run into issues when connecting to an EC2 server running Kafka and ZooKeeper. Try setting the `advertised.host.name` and `advertised.port` variables in your `server.properties` file. `advertised.host.name` should be the public IP address of the EC2 server. `advertised.port` should be the Kafka port, which is 9092 by default.
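As a rough sketch, the relevant lines in `server.properties` might look like the following. The value `myip-xyz` is only the placeholder from the question and stands in for the instance's public IP; substitute your own address.

```properties
# Address the broker hands out to clients in metadata responses.
# Placeholder from the question; use the EC2 instance's public IP here.
advertised.host.name=myip-xyz
# Port that clients should connect to (Kafka's default).
advertised.port=9092
```

The broker has to be restarted for the change to take effect. It may also be worth checking that the instance's security group allows inbound TCP traffic on that port. Note that these two settings apply to older brokers such as the 0.8.x release in the question; later Kafka versions replaced them with `advertised.listeners`.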
Upvotes: 1