Reputation: 454
I'm trying to send messages from Kafka Producer on one PC to Kafka broker on another PC using wifi interfaces, but messages don't appear in the specified topic at Kafka broker.
I connected two PCs using ASUS wireless router and disabled all the firewalls on PCs and router. Both PCs ping successfully each other. When I turn to a wired connection, it works and messages are ingested to the specified topic on kafka broker pc.
Kafka Producer:
public class CarDataProducer {
public static void main(String[] args) {
CarDataProducer fProducer= new CarDataProducer();
Producer<String, CarData> producer= fProducer.initializeKafkaProducer();
String topicName = "IN-DATA";
CSVReaderCarData csvReader = new CSVReaderCarData();
List<CarData> CarDataList = csvReader.readCarDataFromCSV("data/mllib/TrainTest_101.csv");
//read from CSV file and send
for (CarData val : CarDataList) {
producer.send(new ProducerRecord<String, CarData>(topicName, val));
}
}
public KafkaProducer<String, CarData> initializeKafkaProducer() {
// Set the producer configuration properties.
Properties props = ProducerProperties.getInstance();
// Instantiate a producerSampleJDBC
KafkaProducer<String, CarData> producer = new KafkaProducer<String, CarData>(props);
return producer;
}
public class ProducerProperties {
private ProducerProperties() {
}
public static final Properties props = new Properties();
static {
props.put("bootstrap.servers", "192.168.1.124:9092");
props.put("acks", "0");
props.put("retries", 0);
props.put("batch.size", 500);
props.put("linger.ms", 500);
props.put("buffer.memory", 500);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.iov.safety.vehicleproducer.CarDataSerializer");
}
public static Properties getInstance() {
return props;
}
}
Check message reception via Kafka Consumer using console on the server-side:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic IN-DATA
The reception of messages should look like this:
kafka-console-consumer.sh --bootstrap-server 192.168.1.124:9092 --topic IN-DATA
{"instSpeed":19.0,"time":15.0,"label":0.0}
{"instSpeed":64.0,"time":15.0,"label":1.0}
{"instSpeed":10.0,"time":16.0,"label":0.0}
ifconfig on server side
ipconfig on kafka producer side
listeners=PLAINTEXT://:9092
tcp6 0 0 :::9092 :::*
LISTEN off (0.00/0/0) tcp6 0 0 127.0.0.1:53880
127.0.1.1:9092 ESTABLISHED keepalive (6659.53/0/0) tcp6 0 0 127.0.1.1:9092 127.0.0.1:53880 ESTABLISHED keepalive (6659.53/0/0) tcp6 0 0 127.0.1.1:9092
127.0.0.1:53878 ESTABLISHED keepalive (6659.15/0/0) tcp6 0 0 127.0.0.1:53878 127.0.1.1:9092 ESTABLISHED keepalive (6659.15/0/0)
By adding callback to send of kafka producer, I get timeout error:
org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for IN-DATA-0: 30045 ms has passed since last append
Upvotes: 0
Views: 2475
Reputation: 454
I solved it. Each Kafka broker has to advertise its hostname/ip, to be reachable from Kafka Producer on another pc.
kafka-server-start.sh config/server.properties --override advertised.listeners=PLAINTEXT://192.168.1.124:9092
sha
Instead, we can update config/server.properties as follows:
advertised.listeners=PLAINTEXT://your.host.name:9092
or
advertised.listeners=PLAINTEXT://192.168.1.124:9092
Upvotes: 0