Marco Prado
Marco Prado

Reputation: 1298

Spring Cloud Stream function with Spring Cloud Contract

I am trying to test the consumer of my event producer using StubTrigger.

Here is my contract:

import org.springframework.cloud.contract.spec.Contract

Contract.make {
    // Label the consumer test passes to StubTrigger.trigger(...)
    label("sendUserMessage")
    // Producer-side method that causes the message to be emitted
    input {
        triggeredBy "sendUserMessageTriggered()"
    }
    // The message that is published when the contract fires
    outputMessage {
        sentTo("users-out-0")
        body('''{ "email": "[email protected]", "location": [10.0, 20.5], "area": 3.5 }''')
    }
}

The yml from the producer:

# Producer settings that apply to the "local" profile.
spring:
  profiles: "local"
  cloud:
    stream:
      # NOTE(review): presumably declares a "users" source so that the
      # users-out-0 output binding exists — confirm against the Stream docs.
      source: users
      bindings:
        # The users-out-0 binding publishes to the "users" destination.
        users-out-0.destination: users
    # spring.cloud.config and spring.cloud.discovery are disabled.
    config:
      enabled: false
      discovery.enabled: false
    discovery:
      enabled: false

Here is the consumer bean definition:

package br.com.marco.cadeacerva.notification.infra.config;

import br.com.marco.cadeacerva.notification.domain.UsersEventConsumer;
import br.com.marco.cadeacerva.notification.endpoints.dto.UserDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Consumer;

/**
 * Registers the function beans that receive incoming messages and forwards
 * them to the domain layer.
 */
@Configuration
@Slf4j
@RequiredArgsConstructor
public class FunctionsConfig {

    private final UsersEventConsumer usersEventConsumer;

    /**
     * Consumer function bean named {@code users}: logs every received
     * {@link UserDTO} and hands it to the {@link UsersEventConsumer}.
     *
     * @return the consumer that processes incoming user events
     */
    @Bean
    public Consumer<UserDTO> users() {
        return user -> {
            // Hand the object itself to the logger instead of calling toString():
            // formatting is then deferred until INFO is actually enabled, and a
            // null payload is logged as "null" rather than throwing before the
            // domain consumer is invoked.
            log.info("Received user on users function: {}", user);
            usersEventConsumer.consume(user);
        };
    }
}

Here is my consumer test:

package br.com.marco.cadeacerva.notification.infra.config;

import br.com.marco.cadeacerva.notification.domain.UsersEventConsumer;
import br.com.marco.cadeacerva.notification.endpoints.dto.UserDTO;
import br.com.marco.cadeacerva.testcommons.utils.annotation.IntegrationTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.cloud.contract.stubrunner.StubTrigger;
import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
import org.springframework.test.context.junit4.SpringRunner;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

/**
 * Checks that a message triggered from the producer stubs ends up in the
 * {@link UsersEventConsumer}.
 */
@RunWith(SpringRunner.class)
@IntegrationTest
@SpringBootTest
@AutoConfigureStubRunner(
        ids = "br.com.marco.cadeacerva:users:+:stubs:8090",
        stubsMode = StubRunnerProperties.StubsMode.LOCAL)
public class FunctionsConfigTest {

    /** Mock standing in for the domain consumer, so the delivery can be verified. */
    @MockBean
    UsersEventConsumer usersEventConsumer;

    /** Fires messages defined by the producer's contracts. */
    @Autowired
    StubTrigger stubTrigger;

    @Test
    public void shouldConsumeUsersEvents() {
        // Emit the message of the contract labelled "sendUserMessage"
        stubTrigger.trigger("sendUserMessage");

        // The mocked consumer must have been handed a UserDTO
        verify(usersEventConsumer).consume(any(UserDTO.class));
    }
}

And the consumer yml:

# Consumer settings that apply to the "integration-tests" profile.
spring:
  profiles: "integration-tests"
  cloud:
    stream:
      bindings:
        # NOTE(review): presumably the input binding of the `users` function
        # bean — it reads the "users" destination in consumer group "users".
        users-in-0:
          destination: users
          group: users
    # spring.cloud.config and spring.cloud.discovery are disabled.
    config:
      enabled: false
      discovery.enabled: false
    discovery:
      enabled: false

# The Eureka client is disabled as well.
eureka:
  client:
    enabled: false

When I run the test it fails because of:

org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'users-out-0' available

    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeanDefinition(DefaultListableBeanFactory.java:814)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getMergedLocalBeanDefinition(AbstractBeanFactory.java:1282)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:297)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:276)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:207)
    at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1114)
    at org.springframework.cloud.contract.verifier.messaging.stream.StreamFromBinderMappingMessageSender.send(StreamFromBinderMappingMessageSender.java:57)
    at org.springframework.cloud.contract.verifier.messaging.stream.StreamFromBinderMappingMessageSender.send(StreamFromBinderMappingMessageSender.java:51)
    at org.springframework.cloud.contract.verifier.messaging.stream.StreamStubMessages.send(StreamStubMessages.java:44)
    at org.springframework.cloud.contract.stubrunner.spring.LazyMessageVerifier.send(StubRunnerConfiguration.java:212)
    at org.springframework.cloud.contract.stubrunner.StubRunnerExecutor.sendMessage(StubRunnerExecutor.java:260)
    at org.springframework.cloud.contract.stubrunner.StubRunnerExecutor.triggerForDsls(StubRunnerExecutor.java:215)
    at org.springframework.cloud.contract.stubrunner.StubRunnerExecutor.trigger(StubRunnerExecutor.java:200)
    at org.springframework.cloud.contract.stubrunner.StubRunner.trigger(StubRunner.java:163)
    at org.springframework.cloud.contract.stubrunner.BatchStubRunner.trigger(BatchStubRunner.java:136)
    at br.com.marco.cadeacerva.notification.infra.config.FunctionsConfigTest.shouldConsumeUsersEvents(FunctionsConfigTest.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
    at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

I even tried defining the same source configuration from my producer on the consumer, but I got the same exception. I don't know what I'm supposed to do in this case...

Upvotes: 0

Views: 1181

Answers (1)

Marcin Grzejszczak
Marcin Grzejszczak

Reputation: 11189

Here is an example of the new Stream API used with Spring Cloud Contract on the consumer side: https://github.com/spring-cloud-samples/spring-cloud-contract-samples/blob/3.0.x/consumer/src/test/java/com/example/BeerVerificationListenerTest.java

Make sure that you have the stream test dependency on the classpath:

    <!-- spring-cloud-stream test-jar with the "test-binder" classifier; test scope only -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <type>test-jar</type>
        <scope>test</scope>
        <classifier>test-binder</classifier>
    </dependency>

As for your code, your `sentTo` should point to `users` (the destination), not `users-out-0` (the binding name).

Let me copy and paste what's in the sample code.

Consumer side listener

package com.example;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.stereotype.Component;

/**
 * Message consumer registered as the {@code input} bean. It keeps a running
 * count of the verifications it receives, split by eligibility.
 *
 * @author Marcin Grzejszczak
 */
@Component("input")
class BeerVerificationListener implements Consumer<BeerVerificationListener.Verification> {

    private static final Log log = LogFactory.getLog(BeerVerificationListener.class);

    /** Number of received verifications whose {@code eligible} flag was set. */
    AtomicInteger eligibleCounter = new AtomicInteger();

    /** Number of received verifications whose {@code eligible} flag was not set. */
    AtomicInteger notEligibleCounter = new AtomicInteger();

    /**
     * Logs the arrival of a verification and bumps the matching counter.
     *
     * @param verification the received message payload
     */
    @Override
    public void accept(Verification verification) {
        log.info("Received new verification");
        //remove::start[]
        //tag::listener[]
        AtomicInteger counter = verification.eligible
                ? this.eligibleCounter
                : this.notEligibleCounter;
        counter.incrementAndGet();
        //end::listener[]
        //remove::end[]
    }

    /** Payload of a verification message. */
    public static class Verification {
        public boolean eligible;
    }
}

Consumer side setup

# Configuration of the beer-api-consumer application.
spring:
  application.name: beer-api-consumer
  # remove::start[]
  # The input-in-0 binding reads from the "verifications" destination.
  cloud.stream.bindings.input-in-0.destination: verifications
  # remove::end[]
# Port is taken from PORT, falling back to 8081.
server.port: ${PORT:8081}
logging:
  level:
    org.springframework.cloud: debug

Producer side code

package com.example;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;

/**
 * {@link PersonCheckingService} that decides purely on age and publishes each
 * decision through a {@link StreamBridge}.
 *
 * @author Marcin Grzejszczak
 */
@Service
public class AgeCheckingPersonCheckingService implements PersonCheckingService {

    private final StreamBridge source;

    public AgeCheckingPersonCheckingService(StreamBridge source) {
        this.source = source;
    }

    /**
     * Evaluates the person's age against the threshold of 20 and sends the
     * outcome as a {@link Verification} to the {@code output-out-0} binding.
     *
     * @param personToCheck the person whose age is evaluated
     * @return {@code true} when the person's age is 20 or more
     */
    @Override
    public Boolean shouldGetBeer(PersonToCheck personToCheck) {
        //remove::start[]
        //tag::impl[]
        Verification verification = new Verification(personToCheck.age >= 20);
        this.source.send("output-out-0", verification);
        return verification.isEligible();
        //end::impl[]
        //remove::end[return]
    }

    /** Message payload carrying the eligibility decision. */
    public static class Verification {
        boolean eligible;

        public Verification() {
        }

        public Verification(boolean eligible) {
            this.eligible = eligible;
        }

        public boolean isEligible() {
            return eligible;
        }

        public void setEligible(boolean eligible) {
            this.eligible = eligible;
        }
    }
}

Producer side setup

# Configuration of the beer-api-producer application.
spring:
    application.name: beer-api-producer
    # Function definition named "output"; its output-out-0 binding is set up below.
    cloud.function.definition: output
    cloud.stream.bindings.output-out-0:
        # remove::start[]
        # The output-out-0 binding publishes to the "verifications" destination.
        destination: verifications
        # remove::end[]
# Port is taken from PORT, falling back to 8080.
server.port: ${PORT:8080}
logging:
  level:
    org.springframework.cloud: debug

Contract definition

package contracts.beer.messaging

import com.example.ProducerUtils
import org.springframework.cloud.contract.spec.Contract 

Contract.make {
    description("""
Sends a positive verification message when person is eligible to get the beer
```
given:
    client is old enough
when:
    he applies for a beer
then:
    we'll send a message with a positive verification
```
""")
    // Name under which the output message can be triggered
    label('accepted_verification')
    // What sets the contract off
    input {
        // a method call triggers the contract
        triggeredBy 'clientIsOldEnough()'
    }
    // The message produced by the contract
    outputMessage {
        // where the output message is sent
        sentTo('verifications')
        // payload of the output message
        body(eligible: true)
        headers {
            messagingContentType(applicationJson())
        }
    }
}

Upvotes: 1

Related Questions