Kenny Weeler
Kenny Weeler

Reputation: 163

Error Handling in Spring Cloud Stream - Kafka Binder

I am trying to Implement Error Handling functionality for SCS with Kafka Binder and I am currently having issues getting Errors into Errors Topic.

1) Is there anything specific that has to be given for Errors in the .yml file such as group or content-type

2) How can I do a retry when a msg flows into Kafka topic ?

Thank you.

Details as follows:-

1) Producer which generates JSON every few seconds:-

@EnableBinding(Source.class)
public class LoggingProducer {

      @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "7000", maxMessagesPerPoll = "1"))
        public LoggingObject pumpSource() {

          LoggingObject loggingObject = new LoggingObject();

          String loggingNumber = UUID.randomUUID().toString().toUpperCase().replaceAll("-", "");

          System.out.println(loggingNumber);

          loggingObject.setLoggingId(loggingNumber);
          Random rand = new Random();
          int randint = rand.nextInt(100000);

          if      (randint % 3 == 0) {
              loggingObject.setLoggingMessageStatus("SENT");
          }
          else if (randint % 4 == 0) {
              loggingObject.setLoggingMessageStatus("REVIEW");
          }
          else {
              loggingObject.setLoggingMessageStatus("ERROR");
          }

            System.out.println(loggingObject.toString());

            return loggingObject;
        }   
}

2) application.yml

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: Processortopic
          group: myGroup
          producer:
            header-mode: embeddedHeaders
          content-type: application/json

3) Consumer App:-

@EnableBinding(Sink.class)
@Configuration
public class LoggingObjectProcessor {


    @StreamListener(Sink.INPUT) // destination name 'input.myGroup'
    public void handle(LoggingObject loggingObject) {
        System.out.println("In the Consumer---->>>>><<<<<<");
        throw new RuntimeException("BOOM!");
    }

    /*@ServiceActivator(inputChannel = "Sourcetopic.myGroup.errors")
    public void error(Message<?> message) {
        System.out.println("Handling ERROR: " + message);
    }*/

    @StreamListener("errorChannel")
    public void errorGlobal(Message<?> message) {
        System.out.println("Handling ERROR: " + message);
    }


}

4) Consumer Application.yml

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: Processortopic
          group: myGroup
          consumer:
            header-mode: embeddedHeaders
          content-type: application/json
        error:
          destination: myErrors
          content-type: application/json

5) LoggingObject POJO

public class LoggingObject {

    private String loggingId;
    private String loggingMessageStatus;



                public String getLoggingId() {
                    return loggingId;
                }
                public void setLoggingId(String loggingId) {
                    this.loggingId = loggingId;
                }
                public LoggingObject() {

                }
                public String getLoggingMessageStatus() {
                    return loggingMessageStatus;
                }
                public void setLoggingMessageStatus(String loggingMessageStatus) {
                    this.loggingMessageStatus = loggingMessageStatus;
                }
}

6) Here's the POM

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.BUILD-SNAPSHOT</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

Updated Consumer App and Logs

@ServiceActivator(inputChannel = "Processortopic.myGroup.errors")
public void error(Message<?> message) {
    System.out.println("Handling ERROR: " + message);
}

Logs:-

2018-05-23 10:21:36.178  INFO 76939 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
Handling ERROR: ErrorMessage [payload=org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.LoggingObjectProcessor#handle[1 args]; nested exception is java.lang.RuntimeException: BOOM!, failedMessage=GenericMessage [payload=byte[191], headers={kafka_offset=4721, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@29a9cb9d, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, id=feda8595-5ef6-35dd-b43f-4940a90017ba, kafka_receivedPartitionId=0, contentType=application/json;charset=UTF-8, kafka_receivedTopic=Processortopic, kafka_receivedTimestamp=1526991171269, timestamp=1527088896173}], headers={kafka_data=ConsumerRecord(topic = Processortopic, partition = 0, offset = 4721, CreateTime = 1526991171269, serialized key size = -1, serialized value size = 277, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@538f1f04), id=37e72560-5e63-db90-27f3-2ff2e04e1778, timestamp=1527088896174}] for original GenericMessage [payload=byte[277], headers={kafka_offset=4721, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@29a9cb9d, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=Processortopic, kafka_receivedTimestamp=1526991171269}]
In the Consumer---->>>>><<<<<<
In the Consumer---->>>>><<<<<<
In the Consumer---->>>>><<<<<<

Upvotes: 1

Views: 4901

Answers (1)

Oleg Zhurakousky
Oleg Zhurakousky

Reputation: 6106

The one issue i see right away is that you are using @StreamListener for global 'errorChannel'. It is not going to work. As documentation states:

The use of @StreamListener annotation is intended specifically to define bindings that bridge internal channels and external destinations. Given that the destination specific error channel does NOT have an associated external destination, such channel is a prerogative of Spring Integration (SI). This means that the handler for such destination must be defined using one of the SI handler annotations (i.e., @ServiceActivator, @Transformer etc.).

Please fix and let us know.

Upvotes: 2

Related Questions