Thiago Sayão

Reputation: 2357

Spring Cloud Data Flow stream does not flow

I have a stream DSL definition (shown below) that ends with "log", so the JSON produced by the JDBC source should be logged, but it is not.

The Supplier reads a database queue and produces a JSON array for the rows.

If I turn on debug logging, the output of SybaseSupplierConfiguration.this.logger.debug("Json: {}", json); is printed, so the JSON is being generated.

Why is it not flowing to "log"?

So far I have tried:

None worked.

The Docker setup:

export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0
docker-compose -f ./docker-compose.yml -f ./docker-compose-prometheus.yml up -d

The pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>company-cloud-dataflow-apps</artifactId>
        <groupId>br.com.company.cloud.dataflow.apps</groupId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>jdbc-sybase-supplier</artifactId>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud.stream.app</groupId>
            <artifactId>app-starters-micrometer-common</artifactId>
            <version>${app-starters-micrometer-common.version}</version>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer.prometheus</groupId>
            <artifactId>prometheus-rsocket-spring</artifactId>
            <version>${prometheus-rsocket.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>net.sourceforge.jtds</groupId>
            <artifactId>jtds</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jdk8</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-parameter-names</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-jaxb-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

The configuration:

....    
spring.cloud.stream.function.bindings.jsonSupplier-out-0=output
spring.cloud.function.definition=jsonSupplier

The implementation:

@SpringBootApplication
@EnableConfigurationProperties(SybaseSupplierProperties.class)
public class SybaseSupplierConfiguration {
    private final DataSource dataSource;
    private final SybaseSupplierProperties properties;
    private final ObjectMapper objectMapper;
    private final JdbcTemplate jdbcTemplate;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    public SybaseSupplierConfiguration(DataSource dataSource,
                                       JdbcTemplate jdbcTemplate,
                                       SybaseSupplierProperties properties) {
        this.dataSource = dataSource;
        this.jdbcTemplate = jdbcTemplate;
        this.properties = properties;

        objectMapper = new ObjectMapper().registerModule(new ParameterNamesModule())
                .registerModule(new Jdk8Module())
                .registerModule(new JavaTimeModule())
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    }

    public static void main(String[] args) {
        SpringApplication.run(SybaseSupplierConfiguration.class, args);
    }

    @Value
    static class IntControle {
        long cdIntControle;
        String deTabela;
    }

    @Bean
    public MessageSource<Object> jdbcMessageSource() {
        String query = "select cd_int_controle, de_tabela from int_controle rowlock readpast " +
                    "where id_status = 0 order by cd_int_controle";

        JdbcPollingChannelAdapter adapter =
                new JdbcPollingChannelAdapter(dataSource, query) {
                    @Override
                    protected Object doReceive() {
                        Object object = super.doReceive();

                        if (object == null) {
                            return null;
                        }

                        @SuppressWarnings("unchecked")
                        List<IntControle> ints = (List<IntControle>) object;

                        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                            try (JsonGenerator jen = objectMapper.createGenerator(out)) {
                                jen.writeStartArray();
                                for (IntControle itm : ints) {
                                    String qry = String.format("select * from vw_integ_%s where cd_int_controle = %d",
                                            itm.getDeTabela(), itm.getCdIntControle());
                                    List<Map<String, Object>> maps = jdbcTemplate.queryForList(qry);
                                    for (Map<String, Object> l : maps) {
                                        jen.writeStartObject();
                                        for (Map.Entry<String, Object> entry : l.entrySet()) {
                                            String k = entry.getKey();
                                            Object v = entry.getValue();

                                            jen.writeFieldName(k);
                                            if (v == null) {
                                                jen.writeNull();
                                            } else {
                                                // if a specific item type needs special handling, see:
                                                // https://stackoverflow.com/questions/6514876/most-efficient-conversion-of-resultset-to-json
                                                jen.writeObject(v);
                                            }
                                        }
                                        jen.writeEndObject();
                                    }
                                }
                                jen.writeEndArray();
                            }

                            String json = out.toString();

                            SybaseSupplierConfiguration.this.logger.debug("Json: {}", json);

                            return json;
                        } catch (IOException e) {
                            throw new IllegalArgumentException("Error converting to JSON", e);
                        }
                    }
                };

        adapter.setMaxRows(properties.getPollSize());
        adapter.setUpdatePerRow(true);
        adapter.setRowMapper((RowMapper<IntControle>) (rs, i) -> new IntControle(rs.getLong(1), rs.getString(2)));
        adapter.setUpdateSql("update int_controle set id_status = 1 where cd_int_controle = :cdIntControle");

        return adapter;
    }

    @Bean
    public Supplier<Message<?>> jsonSupplier() {
        return jdbcMessageSource()::receive;
    }
}

The shell setup:

app register --name jdbc-postgresql-sink --type sink --uri maven://br.com.company.cloud.dataflow.apps:jdbc-postgresql-sink:1.0.0-SNAPSHOT --force
app register --name jdbc-sybase-supplier --type source --uri maven://br.com.company.cloud.dataflow.apps:jdbc-sybase-supplier:1.0.0-SNAPSHOT --force

stream create --name sybase_to_pgsql --definition "jdbc-sybase-supplier | log "
stream deploy --name sybase_to_pgsql

The log:

....
2020-08-02 00:40:18.644  INFO 81 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 0 endpoint(s) beneath base path '/actuator'
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {router} as a subscriber to the 'jsonSupplier_integrationflow.channel#0' channel
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.jsonSupplier_integrationflow.channel#0' has 1 subscriber(s).
2020-08-02 00:40:18.794  INFO 81 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'jsonSupplier_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-02 00:40:18.795  INFO 81 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: kafka
2020-08-02 00:40:19.235  INFO 81 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: kafka
2020-08-02 00:40:19.235  INFO 81 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: kafka
2020-08-02 00:40:19.362  INFO 81 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: sybase_to_pgsql.jdbc-sybase-supplier
2020-08-02 00:40:19.364  INFO 81 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [PLAINTEXT://kafka-broker:9092]
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

2020-08-02 00:40:19.572  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-02 00:40:19.572  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-02 00:40:19.572  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1596328819571
2020-08-02 00:40:20.403  INFO 81 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [PLAINTEXT://kafka-broker:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2020-08-02 00:40:20.477  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-02 00:40:20.477  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-02 00:40:20.477  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1596328820477
2020-08-02 00:40:20.573  INFO 81 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: um9lJtXTQUmURh9cwOkqxA
2020-08-02 00:40:20.574  INFO 81 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2020-08-02 00:40:20.622  INFO 81 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.output' has 1 subscriber(s).
2020-08-02 00:40:20.625  INFO 81 --- [           main] o.s.i.e.SourcePollingChannelAdapter      : started bean 'jsonSupplier_integrationflow.org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'
2020-08-02 00:40:20.654  INFO 81 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 20031 (http) with context path ''
2020-08-02 00:40:20.674  INFO 81 --- [           main] b.c.c.d.a.s.SybaseSupplierConfiguration  : Started SybaseSupplierConfiguration in 12.982 seconds (JVM running for 14.55)
2020-08-02 00:40:21.160  INFO 81 --- [ask-scheduler-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [PLAINTEXT://kafka-broker:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-2
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2020-08-02 00:40:21.189  INFO 81 --- [ask-scheduler-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-02 00:40:21.189  INFO 81 --- [ask-scheduler-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-02 00:40:21.189  INFO 81 --- [ask-scheduler-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1596328821189
2020-08-02 00:40:21.271  INFO 81 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: um9lJtXTQUmURh9cwOkqxA

Upvotes: 1

Views: 792

Answers (1)

Sabby Anandan

Reputation: 5651

If you are using function-based applications in SCDF, you have to supply extra configuration when deploying streams. Please have a look at the recipe that walks through the function-based deployment scenario.

Specifically, look at the application-specific function bindings and the property override for the time-source and the log-sink applications.

app.time-source.spring.cloud.stream.function.bindings.timeSupplier-out-0=output
app.log-sink.spring.cloud.stream.function.bindings.logConsumer-in-0=input

The input/output channel bindings require an explicit mapping to the function binding that you have in your custom source. You will have to override your custom source's function binding so that it maps to the output channel, and everything should come together then.
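
Applied to the stream in the question, the override might look like the sketch below. It is not a verified configuration. It assumes the app labels are jdbc-sybase-supplier and log (taken from the stream definition "jdbc-sybase-supplier | log"), that the supplier bean is named jsonSupplier (as in the question's code), and that the registered log sink is a function-based version that exposes logConsumer as in the recipe. If the log sink is an older, annotation-based version, the second property should not be needed.

```properties
# Assumption: the source's app label is "jdbc-sybase-supplier" and its function bean is "jsonSupplier"
app.jdbc-sybase-supplier.spring.cloud.stream.function.bindings.jsonSupplier-out-0=output
# Assumption: the log sink is function-based and its consumer function is "logConsumer"
app.log.spring.cloud.stream.function.bindings.logConsumer-in-0=input
```

These are deployment properties, so they would be passed at deploy time, for example through the --properties option of stream deploy, rather than placed only in the application's own configuration file.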

In v2.6, we are attempting to automate this explicit binding in SCDF, so there will be one less thing to configure in the future.

Upvotes: 2
