Reputation: 253
I am trying to use the [JCA resoruce-adapters][1] to use an MDB
to connect to kafka
.
Following are the entries in the standalone-full.xml
to configure the kafka resoruce adapter
and the related ejb-mdb
definition :
Note that the i have deploying the mdb
as part of an ejbModule
which is embeded in an ear
While deploying the application i get the below error
22:43:53,073 ERROR [org.jboss.msc.service.fail] (MSC service thread 1-1) MSC000001: Failed to start service jboss.deployment.subunit."kafkatemplatereceiver.ear"."kafkatemplatereceiverEJB.jar".component.KafkaMDB.CREATE: org.jboss.msc.service.StartException in service jboss.deployment.subunit."kafkatemplatereceiver.ear"."kafkatemplatereceiverEJB.jar".component.KafkaMDB.CREATE: Failed to start service
at [email protected]//org.jboss.msc.service.ServiceControllerImpl$StartTask.execute(ServiceControllerImpl.java:1731)
at [email protected]//org.jboss.msc.service.ServiceControllerImpl$ControllerTask.run(ServiceControllerImpl.java:1559)
at [email protected]//org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
at [email protected]//org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:1990)
at [email protected]//org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1486)
at [email protected]//org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1363)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: WFLYEJB0383: No message listener of type fish.payara.cloud.connectors.kafka.api.KafkaListener found in resource adapter kafka
at [email protected]//org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentCreateService.createActivationSpecs(MessageDrivenComponentCreateService.java:142)
at [email protected]//org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentCreateService.createComponent(MessageDrivenComponentCreateService.java:105)
at [email protected]//org.jboss.as.ee.component.BasicComponentCreateService.start(BasicComponentCreateService.java:86)
at [email protected]//org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentCreateService.start(MessageDrivenComponentCreateService.java:93)
at [email protected]//org.jboss.msc.service.ServiceControllerImpl$StartTask.startService(ServiceControllerImpl.java:1739)
at [email protected]//org.jboss.msc.service.ServiceControllerImpl$StartTask.execute(ServiceControllerImpl.java:1701)
... 6 more
22:43:53,076 ERROR [org.jboss.as.controller.management-operation] (External Management Request Threads -- 1) WFLYCTL0013: Operation ("add") failed - address: ([("deployment" => "kafkatemplatereceiver.ear")]) - failure description: {"WFLYCTL0080: Failed services" => {"jboss.deployment.subunit.\"kafkatemplatereceiver.ear\".\"kafkatemplatereceiverEJB.jar\".component.KafkaMDB.CREATE" => "Failed to start service
Caused by: java.lang.IllegalStateException: WFLYEJB0383: No message listener of type fish.payara.cloud.connectors.kafka.api.KafkaListener found in resource adapter kafka"}}
22:43:53,077 ERROR [org.jboss.as.server] (External Management Request Threads -- 1) WFLYSRV0021: Deploy of deployment "kafkatemplatereceiver.ear" was rolled back with the following failure message:
{"WFLYCTL0080: Failed services" => {"jboss.deployment.subunit.\"kafkatemplatereceiver.ear\".\"kafkatemplatereceiverEJB.jar\".component.KafkaMDB.CREATE" => "Failed to start service
Caused by: java.lang.IllegalStateException: WFLYEJB0383: No message listener of type fish.payara.cloud.connectors.kafka.api.KafkaListener found in resource adapter kafka"}}
**standalone-full.xml**
<subsystem xmlns="urn:jboss:domain:ejb3:8.0">
<session-bean>
<stateless>
<bean-instance-pool-ref pool-name="slsb-strict-max-pool"/>
</stateless>
<stateful default-access-timeout="5000" cache-ref="simple" passivation-disabled-cache-ref="simple"/>
<singleton default-access-timeout="5000"/>
</session-bean>
<mdb>
<!--<resource-adapter-ref resource-adapter-name="${ejb.resource-adapter-name:kafka-rar-0.6.0.rar}"/>-->
<resource-adapter-ref resource-adapter-name="kafka"/>
<bean-instance-pool-ref pool-name="mdb-strict-max-pool"/>
</mdb> ...
standalone-full.xml resource adapter configuration
<subsystem xmlns="urn:jboss:domain:resource-adapters:6.0">
<resource-adapters>
<resource-adapter id="kafka">
<archive>
kafka-rar-0.6.0.rar
</archive>
<transaction-support>XATransaction</transaction-support>
<connection-definitions>
<connection-definition class-name="fish.payara.cloud.connectors.kafka.outbound.KafkaManagedConnectionFactory" jndi-name="java:/KafkaConnectionFactory" enabled="true" pool-name="ConnectionFactory">
<xa-pool>
<min-pool-size>1</min-pool-size>
<initial-pool-size>1</initial-pool-size>
<max-pool-size>20</max-pool-size>
<prefill>false</prefill>
<is-same-rm-override>false</is-same-rm-override>
</xa-pool>
</connection-definition>
</connection-definitions>
</resource-adapter>
</resource-adapters>
</subsystem>
MDB Class:
package com.zhun.euon.service;
import java.util.Date;
import javax.ejb.MessageDriven;
import javax.ejb.ActivationConfigProperty;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jboss.ejb3.annotation.ResourceAdapter;
//import com.zhun.euon.service.serializer.Stringo;
import fish.payara.cloud.connectors.kafka.api.KafkaListener;
import fish.payara.cloud.connectors.kafka.api.OnRecord;
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "clientId", propertyValue = "testClient"),
@ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "test-consumer-group"),
@ActivationConfigProperty(propertyName = "topics", propertyValue = "zhun_core_data_service_topic"),
@ActivationConfigProperty(propertyName = "bootstrapServersConfig", propertyValue = "192.168.0.105:9092"),
@ActivationConfigProperty(propertyName = "autoCommitInterval", propertyValue = "100"),
@ActivationConfigProperty(propertyName = "retryBackoff", propertyValue = "1000"),
@ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "1000") ,
// @ActivationConfigProperty(propertyName="resourceAdapterName",propertyValue="kafka-rar-0.6.0.rar"),
// @ActivationConfigProperty(propertyName="resourceAdapterMid",propertyValue="kafka-rar-0.6.0.rar"),
@ActivationConfigProperty(propertyName = "keyDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
@ActivationConfigProperty(propertyName = "valueDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
})
@ResourceAdapter(value="kafka")
public class KafkaMDB implements KafkaListener {
@OnRecord( topics={"zhun_core_data_service_topic"})
public void getMessageTest(ConsumerRecord<String,String> record) {
System.out.println("Got record on topic testing " + record + "at time"+ new Date());
}
}
I am not sure if I am missing something, however i am unable to figure out why the server will look for the Listener's Interface implementation in the rar
file itself and not in my EJB- Jar
which is withing rth
[1]: https://github.com/payara/Cloud-Connectors/tree/master/Kafka/KafkaRAR
Upvotes: 1
Views: 1862
Reputation: 43
I was facing a similar issue. What ended up fixing this problem was by adding a jboss-deployment-structure.xml file under webapp/WEB-INF/. This pointed to the Kafka RAR that had been deployed with the JBoss Server:
<jboss-deployment-structure>
<deployment>
<dependencies>
<module name="deployment.kafka-rar-0.6.1-SNAPSHOT.rar" export="TRUE"/>
</dependencies>
</deployment>
</jboss-deployment-structure>
This was the tutorial I was following -> http://www.mastertheboss.com/other/jboss-stuff/apache-kafka/getting-started-with-apache-kafka-and-wildfly
Upvotes: 1