Curtis Olson
Curtis Olson

Reputation: 49

Spring Integration DSL - JdbcPollingChannelAdapter results not queueing

I swear I had this working, but when I can back to it after a few months (and an upgrade to Boot 1.5.9), I am having issues.

I set up a JdbcPollingChannelAdapter that I can do a receive() on just fine, but when I put the adapter in a flow that does nothing more than queue the result of the adapter, running .receive on the queue always returns a null (I can see in the console log that the adapter's SQL getting executed, though).

Tests below. Why can I get results from the adapter, but not queue the results? Thank you in advance for any assistance.

@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureTestDatabase
@JdbcTest
public class JdbcpollingchanneladapterdemoTests {

  @Autowired
  @Qualifier("dataSource")
  DataSource dataSource;

  private static PollableChannel outputQueue;

    @BeforeClass
    public static void setupClass() {
    outputQueue = MessageChannels.queue().get();
        return;
    }

    @Test
    public void Should_HaveQueue() {
        assertThat(outputQueue, instanceOf(QueueChannel.class));
    }

    @Test
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Create Table DEMO (CODE VARCHAR(5));")
  @Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
      statements = "Drop Table DEMO ;")
    public void Should_Not_HaveMessageOnTheQueue_When_No_DemosAreInTheDatabase() {
        Message<?> message = outputQueue.receive(5000);
        assertThat(message, nullValue()) ;
    }

  @Test
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Create Table DEMO (CODE VARCHAR(5));")
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Insert into DEMO (CODE) VALUES ('12345');")
  @Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
      statements = "Drop Table DEMO ;")
  public void Should_HaveMessageOnTheQueue_When_DemosIsInTheDatabase() {
    assertThat(outputQueue, instanceOf(QueueChannel.class));
    Message<?> message = outputQueue.receive(5000);
    assertThat(message, notNullValue());
    assertThat(message.getPayload().toString(), equalTo("15317")) ;
  }

  @Test
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Create Table DEMO (CODE VARCHAR(5));")
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Insert into DEMO (CODE) VALUES ('12345');")
  @Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
      statements = "Drop Table DEMO ;")
  public void get_message_directly_from_adapter() {
    JdbcPollingChannelAdapter adapter =
        new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
    adapter.setRowMapper(new DemoRowMapper());
    adapter.setMaxRowsPerPoll(1);
    Message<?> message = adapter.receive();
    assertThat(message, notNullValue());
  }


  private static class Demo {

    private String demo;

    String getDemo() {
      return demo;
    }

    void setDemo(String value) {
      this.demo = value;
    }

    @Override
    public String toString() {
      return "Demo [value=" + this.demo + "]";
    }
  }

  public static class DemoRowMapper implements RowMapper<Demo> {

    @Override
    public Demo mapRow(ResultSet rs, int rowNum) throws SQLException {
      Demo demo = new Demo();
      demo.setDemo(rs.getString("CODE"));
      return demo;
    }
  }

  @Component
  public static class MyFlowAdapter extends IntegrationFlowAdapter {

    @Autowired
    @Qualifier("dataSource")
    DataSource dataSource;

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {

      JdbcPollingChannelAdapter adapter =
          new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
      adapter.setRowMapper(new DemoRowMapper());
      adapter.setMaxRowsPerPoll(1);

      return from(adapter,
          c -> c.poller(Pollers.fixedRate(1000L, 2000L)
              .maxMessagesPerPoll(1)
              .get()))
          .channel(outputQueue);
    }
  }
}

EDIT I've simplified it as much as I can, refactoring to code below. The test passes a flow with a generic message source, and fails on a flow with JdbcPollingChannelAdapter message source. It's just not evident to me how I should configure the second message source so that it will suceed like the first message source.

  @Test
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Create Table DEMO (CODE VARCHAR(5));")
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Insert into DEMO (CODE) VALUES ('12345');")
  public void Should_HaveMessageOnTheQueue_When_UnsentDemosIsInTheDatabase() {

this.genericFlowContext.registration(new GenericFlowAdapter()).register();

PollableChannel genericChannel = this.beanFactory.getBean("GenericFlowAdapterOutput",
    PollableChannel.class);

this.jdbcPollingFlowContext.registration(new JdbcPollingFlowAdapter()).register();

PollableChannel jdbcPollingChannel = this.beanFactory.getBean("JdbcPollingFlowAdapterOutput",
    PollableChannel.class);

assertThat(genericChannel.receive(5000).getPayload(), equalTo("15317"));

assertThat(jdbcPollingChannel.receive(5000).getPayload(), equalTo("15317"));
  }

  private static class GenericFlowAdapter extends IntegrationFlowAdapter {

@Override
protected IntegrationFlowDefinition<?> buildFlow() {
  return from(getObjectMessageSource(),
      e -> e.poller(Pollers.fixedRate(100)))
      .channel(c -> c.queue("GenericFlowAdapterOutput"));
}

private MessageSource<Object> getObjectMessageSource() {
  return () -> new GenericMessage<>("15317");
}
}

private static class JdbcPollingFlowAdapter extends IntegrationFlowAdapter {

@Autowired
@Qualifier("dataSource")
DataSource dataSource;

@Override
protected IntegrationFlowDefinition<?> buildFlow() {
  return from(getObjectMessageSource(),
      e -> e.poller(Pollers.fixedRate(100)))
      .channel(c -> c.queue("JdbcPollingFlowAdapterOutput"));
}

private MessageSource<Object> getObjectMessageSource() {
  JdbcPollingChannelAdapter adapter =
      new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
  adapter.setRowMapper(new DemoRowMapper());
  adapter.setMaxRowsPerPoll(1);
  return adapter;
}
  }

Upvotes: 0

Views: 949

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121247

Looks like you need to add @EnableIntegration to your test configuration. When you use Spring Boot slices for testing, not all auto-configurations are loaded:

https://docs.spring.io/spring-boot/docs/1.5.9.RELEASE/reference/htmlsingle/#test-auto-configuration

UPDATE

The problem that JdbcPollingChannelAdapter is run in the separate, scheduled thread, already out of the original transaction around test method, where those @Sqls are performed.

The fix for you is like this:

@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Insert into DEMO (CODE) VALUES ('12345');",
      config = @SqlConfig(transactionMode = SqlConfig.TransactionMode.ISOLATED))

Pay attention to that SqlConfig.TransactionMode.ISOLATED. This way the INSERT transaction is committed and the data is available for that separate polling thread for the JdbcPollingChannelAdapter.

Also pay attention that this JdbcPollingChannelAdapter always returns a List of records. So, your assertThat(jdbcPollingChannel.receive(5000).getPayload(), ...); should be against a List<String> even if there is only one record in the table.

Upvotes: 1

Related Questions