Reputation: 125
I am writing an embedded Kafka broker and a consumer to catch messages sent by my application. When I try to read messages with the consumer, the following error occurs:
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:540)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:303)
at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:280)
On the application side (the producer), there is also a connection error:
2020-03-25 12:29:33.689 WARN 25786 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1, transactionalId=tx0] Connection to node -1 (<here broker hostname>:9092) could not be established. Broker may not be available.
My project has the following dependencies:
compile "org.springframework.kafka:spring-kafka-test:2.4.4.RELEASE"
compile "org.springframework.kafka:spring-kafka:2.4.4.RELEASE"
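Code of my Kafka broker:
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

public class KafkaServer {
    // kafkaPorts(int...) takes int ports, so the port is declared as an int
    private static final int BROKERPORT = 9092;
    private static final String BROKERHOST = "localhost";
    public static final String TOPIC1 = "fss-fsstransdata";
    public static final String TOPIC2 = "fss-fsstransscores";
    public static final String TOPIC3 = "fss-fsstranstimings";
    public static final String TOPIC4 = "fss-fssdevicedata";

    @Getter
    private Consumer<String, String> consumer;
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    public void run() {
        String[] topics = {TOPIC1, TOPIC2, TOPIC3, TOPIC4};
        // One broker, no controlled shutdown, one partition per topic
        this.embeddedKafkaBroker = new EmbeddedKafkaBroker(
                1,
                false,
                1,
                topics
        ).kafkaPorts(BROKERPORT);
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", this.embeddedKafkaBroker));
        this.consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
        this.consumer.subscribe(Arrays.asList(topics));
    }
}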
Please help me resolve this. I am not familiar with Kafka's architecture or with how it is used from Spring.
Upvotes: 3
Views: 3839
Reputation: 174484
The EmbeddedKafkaBroker is designed to be used from a Spring application context, by a JUnit4 @Rule or @ClassRule, or by a JUnit5 Condition.
To use it outside those environments, you must call afterPropertiesSet() to initialize it and destroy() to shut it down.
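As a rough sketch of that advice applied to the code in the question, the class below calls afterPropertiesSet() before creating the consumer and destroy() on shutdown. The class name ManagedKafkaServer and the start()/stop() method names are made up for illustration; the topic names and the port are taken from the question. Both methods declare throws Exception only as a precaution, and port 9092 is assumed to be free on the machine.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

// Hypothetical wrapper that manages the embedded broker lifecycle by hand.
public class ManagedKafkaServer {

    private static final int BROKER_PORT = 9092; // assumed to be free
    private static final String[] TOPICS = {
        "fss-fsstransdata", "fss-fsstransscores",
        "fss-fsstranstimings", "fss-fssdevicedata"
    };

    private EmbeddedKafkaBroker broker;
    private Consumer<String, String> consumer;

    public void start() throws Exception {
        broker = new EmbeddedKafkaBroker(1, false, 1, TOPICS).kafkaPorts(BROKER_PORT);
        // Outside Spring and JUnit nothing calls this for us; it actually
        // starts the embedded ZooKeeper and the broker and creates the topics.
        broker.afterPropertiesSet();

        Map<String, Object> configs =
                new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", broker));
        consumer = new DefaultKafkaConsumerFactory<>(
                configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
        consumer.subscribe(Arrays.asList(TOPICS));
    }

    public void stop() throws Exception {
        if (consumer != null) {
            consumer.close();
        }
        if (broker != null) {
            // Shuts down the broker and ZooKeeper started in start().
            broker.destroy();
        }
    }

    public Consumer<String, String> getConsumer() {
        return consumer;
    }
}
```

Without the afterPropertiesSet() call the constructor only records the configuration, so nothing listens on the port, which matches the "Connection refused" seen by both the consumer and the producer.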
Upvotes: 2
Reputation: 9492
If you are using Spring, annotate your test class with @EmbeddedKafka and then inject the EmbeddedKafkaBroker with @Autowired.
Example embedded Kafka annotation configuration:
@EmbeddedKafka(
    partitions = 1,
    controlledShutdown = false,
    brokerProperties = {
        // place your properties here
    })
I would create a Spring bean, KafkaServerConfig, and put all the configuration and bean-construction logic inside it.
PS: it should be noted that EmbeddedKafkaBroker is intended for unit tests.
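In line with that, a minimal JUnit 5 test using the annotation might look like the sketch below. It assumes spring-test and JUnit Jupiter are on the test classpath in addition to spring-kafka-test; the class name EmbeddedKafkaSketchTest, the empty nested Config class, and the single topic are placeholders for illustration.

```java
import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

// Hypothetical test class: the annotation starts the broker with the
// test application context and stops it when the context closes.
@SpringJUnitConfig
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false,
        topics = {"fss-fsstransdata"})
class EmbeddedKafkaSketchTest {

    // Empty placeholder configuration so the test context has something to load.
    @Configuration
    static class Config {
    }

    // The broker is registered as a bean in the test context by @EmbeddedKafka.
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    void brokerAddressIsAvailable() {
        // The host:port list that clients can use as bootstrap servers.
        assertNotNull(embeddedKafkaBroker.getBrokersAsString());
    }
}
```

Here the lifecycle calls are made by the test framework, so there is no need to invoke afterPropertiesSet() or destroy() manually.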
Upvotes: 1