constantlearner
constantlearner

Reputation: 5247

Configuration to avoid losing messages

I have a Spring XD component called throttle, in which I store the messages that are posted, throttle them, and send them on slowly. I am running XD in 3 containers. The holding delay for the delayer is 0. When one container goes down, I lose messages. Below is my configuration. It looks like the messages are not being stored in PostgreSQL and are instead being lost from memory or the cache.

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int-groovy="http://www.springframework.org/schema/integration/groovy"
    xmlns:task="http://www.springframework.org/schema/task" 
    xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:ehcache="http://www.springframework.org/schema/cache"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
        http://www.springframework.org/schema/integration/jdbc 
        http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
        http://www.springframework.org/schema/integration/groovy
        http://www.springframework.org/schema/integration/groovy/spring-integration-groovy.xsd
        http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.1.xsd">



    <!-- Turns on annotation-driven caching backed by the "cacheManager" bean.
         The "ehcache" prefix used here is bound to Spring's generic cache
         namespace (http://www.springframework.org/schema/cache) on the root element. -->
    <ehcache:annotation-driven cache-manager="cacheManager" />

    <!-- Resolves property placeholders from the dms-security, dms-jms and dms-mongo
         property files whose names carry the region suffix. The classpath*: prefix
         matches every copy found on the classpath.
         NOTE(review): "region" must already be resolvable when these locations are
         parsed (e.g. as a system or environment property); confirm how it is supplied. -->
    <context:property-placeholder
        location="classpath*:dms-security-${region}.properties, classpath*:dms-jms-${region}.properties, classpath*:dms-mongo-${region}.properties" />

    <!-- Scans com.testhrottle.api.dms.* with the default stereotype filters switched
         off, so only classes annotated with @Configuration are picked up. -->
    <context:component-scan base-package="com.testhrottle.api.dms.*"
        use-default-filters="false">
        <context:include-filter type="annotation"
            expression="org.springframework.context.annotation.Configuration" />
    </context:component-scan>

    <!-- Message channels. Each is declared without a queue or dispatcher child element. -->
    <!-- Entry channel: consumed by the router; also the delayer's output-channel. -->
    <channel id="input">
        <!-- <interceptors>
            <ref bean="messageInterceptor" />
        </interceptors> -->
    </channel>

    <!-- Exit channel: both throttle filters send accepted messages here. -->
    <channel id="output" />

    <!-- Consumed by the SmsThrottle filter. -->
    <channel id="sms" />

    <!-- Consumed by the Throttle filter. -->
    <channel id="eml" />

    <!-- Consumed by the "delete" service-activator. -->
    <channel id="del" />

    <!-- <header-value-router input-channel="input" header-name="Channel-Type" >
        <mapping value="PHN" channel="sms" />
        <mapping value="EML" channel="eml" />
        <mapping value="DEL" channel="del" />
    </header-value-router> -->

    <!-- Custom router on "input": the inner MessageRouter bean (given the configCache
         bean) decides the target channel.
         NOTE(review): MessageRouter's source is not part of this file; presumably it
         routes to the sms, eml and del channels; confirm. -->
    <router input-channel="input">
        <beans:bean name="messageRouter" class="com.testhrottle.xd.util.MessageRouter">
            <beans:property name="cache" ref="configCache" />
        </beans:bean>
    </router>

    <!-- Messages arriving on "del" invoke the "delete" method of the "delete" bean.
         No output-channel is configured on this endpoint. -->
    <service-activator input-channel="del" ref="delete" method="delete"/>

    <!-- Filter for the "eml" channel: messages accepted by the Throttle bean go to
         "output". Throttle is built from the tpsThreshold and spanSeconds placeholders.
         No discard-channel is declared, so a message the selector rejects is not
         forwarded anywhere by this filter.
         NOTE(review): presumably Throttle itself hands rejected messages to "requeue";
         confirm, otherwise they are dropped. -->
    <filter input-channel="eml" output-channel="output">
        <beans:bean class="com.testhrottle.xd.throttle.Throttle">
            <beans:constructor-arg value="${tpsThreshold}" />
            <beans:constructor-arg value="${spanSeconds}" />
            <beans:property name="cache" ref="configCache" />
            <beans:property name="dailyLimitCounter" ref="dailyLimitCounter" />
            <beans:property name="errorPublishing" ref="errorPublishing" />
        </beans:bean>
    </filter>

    <!-- Filter for the "sms" channel: messages accepted by the SmsThrottle bean go to
         "output". SmsThrottle is built from the smsTpsThreshold and smsSpanSeconds
         placeholders. No discard-channel is declared, so a message the selector rejects
         is not forwarded anywhere by this filter.
         NOTE(review): presumably SmsThrottle itself hands rejected messages to
         "requeue"; confirm, otherwise they are dropped. -->
    <filter input-channel="sms" output-channel="output">
        <beans:bean class="com.testhrottle.xd.throttle.SmsThrottle">
            <beans:constructor-arg value="${smsTpsThreshold}" />
            <beans:constructor-arg value="${smsSpanSeconds}" />
            <beans:property name="cache" ref="configCache" />
            <beans:property name="dailyLimitCounter" ref="dailyLimitCounter" />
            <beans:property name="errorPublishing" ref="errorPublishing" />
        </beans:bean>
    </filter>

    <!-- "requeue" feeds the delayer. Its dispatcher hands each message to taskExecutor,
         so the delayer runs on that executor's single pool thread rather than on the
         sender's thread.
         NOTE(review): because of this thread hand-off the sending thread returns (and
         the message bus can ack the message) before the message has been persisted in
         messageStore; a container crash in that window loses the message. Removing the
         dispatcher keeps the work on the caller's thread. -->
    <channel id="requeue">
        <dispatcher task-executor="taskExecutor" />
    </channel>

    <!-- Single-thread executor used only by the "requeue" dispatcher in this file. -->
    <task:executor id="taskExecutor" pool-size="1" />

    <!-- Holds messages from "requeue" and later releases them to "input". The delay is
         taken from the "delay" message header; default-delay (the holdingDelay
         placeholder) applies when the expression yields no value. Held messages are
         kept in the messageStore bean. -->
    <delayer id="delayer" input-channel="requeue" default-delay="${holdingDelay}"
        expression="headers['delay']" message-store="messageStore"
        output-channel="input" />


    <!-- <int-jdbc:message-store id="messageStore" data-source="dataSource" /> --> 

    <!-- Message store used by the delayer: a project-specific class constructed with
         the dataSource bean.
         NOTE(review): CustomJdbcMessageStore is not shown in this file; presumably it
         customises the stock JDBC message store; confirm that it writes synchronously
         to the database. -->
    <beans:bean id="messageStore" class="com.testhrottle.xd.util.CustomJdbcMessageStore">
      <beans:constructor-arg ref="dataSource"/>
   </beans:bean>

    <!-- <beans:bean name="messageInterceptor" class="com.testhrottle.xd.util.MessageInterceptor">
        <beans:property name="cache" ref="configCache" />
    </beans:bean> -->

    <!-- ConfigCache bean shared by the router and both throttle filters. It receives
         the jdbcTemplate, a hold retry interval (holdRetryIntervelInSeconds placeholder)
         and a parameterised SQL query that selects one alrt_type row by alrt_type_cd
         from the schema named by the schema placeholder. -->
    <beans:bean name="configCache"
        class="com.testhrottle.xd.cache.ConfigCache">
        <beans:property name="jdbcTemplate" ref="jdbcTemplate" />
        <beans:property name="holdRetryIntervelInSeconds" value="${holdRetryIntervelInSeconds}" />
        <beans:property name="configQuery" value="select alrt_type_cd alertType,dlvry_days_of_week daysOfWeek,DLVRY_STRT_TM startTime,DLVRY_END_TM endTime,DLVRY_STRT_DT startDate,DLVRY_END_DT endDate,hold_ind holdFlag, max_msgs_per_day maxMsgPerDay,purge_ind purge from ${schema}.alrt_type where alrt_type_cd = ?" />
    </beans:bean>

    <!-- PostgreSQL DataSource used by jdbcTemplate and messageStore. The password is a
         SpEL expression that passes the passWord placeholder value to
         encryptedDatum.decryptBase64Encoded(...).
         NOTE(review): DriverManagerDataSource does no connection pooling and opens a
         new connection on each request; consider a pooled DataSource under real load. -->
    <beans:bean id="dataSource" 
    class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <beans:property name="driverClassName" value="org.postgresql.Driver"/>
        <beans:property name="url" value="${dbURL}"/>
        <beans:property name="username" value="${userName}"/>
        <beans:property name="password" value="#{encryptedDatum.decryptBase64Encoded('${passWord}')}"/>
    </beans:bean>

    <!-- JdbcTemplate bound to the dataSource bean; injected into configCache. -->
    <beans:bean id="jdbcTemplate"
        class="org.springframework.jdbc.core.JdbcTemplate">
        <beans:property name="dataSource" ref="dataSource" />
    </beans:bean>

    <!-- Helper bean called from the dataSource password SpEL expression
         (decryptBase64Encoded). -->
    <beans:bean id="encryptedDatum" class="com.testhrottle.api.dms.core.security.EncryptedSecuredDatum"/>

    <!-- <beans:bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"/> -->

    <!-- Injected into both throttle filters as "errorPublishing". Its rabbitTemplate
         property wiring is currently commented out. -->
    <beans:bean id="errorPublishing" class="com.testhrottle.xd.util.ErrorPublishing">
        <!-- <beans:property name="rabbitTemplate" ref="rabbitTemplate" /> -->
    </beans:bean>

    <!-- Target of the service-activator on the "del" channel (method "delete"). -->
    <beans:bean id="delete" class="com.testhrottle.xd.util.Delete"/>

    <!-- 
    <beans:bean id="dataSource" 
    class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <beans:property name="driverClassName" value="org.postgresql.Driver"/>
        <beans:property name="url" value="jdbc:postgresql://localhost:5432/postgres"/>
        <beans:property name="username" value="postgres"/>
        <beans:property name="password" value="postgres"/>
    </beans:bean>
    <beans:bean name="configCache"
        class="com.testhrottle.xd.cache.ConfigCache">
        <beans:property name="jdbcTemplate" ref="jdbcTemplate" />
        <beans:property name="holdRetryIntervelInSeconds" value="10" />
        <beans:property name="configQuery" value="select alrt_type_cd alertType,dlvry_days_of_week daysOfWeek,DLVRY_STRT_TM startTime,DLVRY_END_TM endTime,DLVRY_STRT_DT startDate,DLVRY_END_DT endDate,hold_ind holdFlag, max_msgs_per_day maxMsgPerDay from dms_wb.alrt_type where alrt_type_cd = ?; 
        " />
    </beans:bean>

    <beans:bean name="cacheTest"
        class="com.testhrottle.xd.cache.CacheTest">
        <beans:property name="cache" ref="configCache" />
    </beans:bean> -->

    <!-- Explicitly registers processing of @Autowired injection points.
         NOTE(review): context:component-scan enables annotation config by default,
         which already registers this post-processor; this bean is likely redundant;
         confirm before removing. -->
    <beans:bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>


    <!-- Spring cache manager referenced by the annotation-driven caching element; it
         wraps the EhCache manager produced by the "ehcache" bean. -->
    <beans:bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager">
        <beans:property name="cacheManager" ref="ehcache" />
    </beans:bean>

    <!-- Factory bean that builds the EhCache manager from classpath:ehcache.xml, with
         the "shared" flag set to true. -->
    <beans:bean id="ehcache"
        class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean">
        <beans:property name="configLocation" value="classpath:ehcache.xml" />
        <beans:property name="shared" value="true" />
    </beans:bean>

    <!-- Injected into both throttle filters; its clearAlertDailyCount method is invoked
         by the scheduled task declared in this file. -->
    <beans:bean name="dailyLimitCounter" class="com.testhrottle.xd.throttle.DailyLimitCounter"/>

    <!-- Scheduler with a pool of 10 threads.
         NOTE(review): nothing in this file refers to taskSchedulerInbound by id;
         confirm where it is used. -->
    <task:scheduler id="taskSchedulerInbound" pool-size="10" />

    <!-- Calls dailyLimitCounter.clearAlertDailyCount() every day at 23:59:59
         (cron fields: second minute hour day-of-month month day-of-week).
         No scheduler attribute is set, so taskSchedulerInbound is not used here. -->
    <task:scheduled-tasks>
        <task:scheduled ref="dailyLimitCounter" method="clearAlertDailyCount" cron="59 59 23 * * *" />
    </task:scheduled-tasks>

</beans:beans>

So if I remove this, should it work in the same thread?

<channel id="requeue">
   <dispatcher task-executor="taskExecutor" />
</channel>

<task:executor id="taskExecutor" pool-size="1" />

Upvotes: 1

Views: 91

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121552

Looks like your <dispatcher task-executor="taskExecutor" /> is guilty.

Since you jump out of the Message Bus thread, the message is acked (committed) on the Broker, and a crash of the node no longer affects that (there is no rollback), because your message has already been moved to a different thread. And there it hasn't yet reached your DB to be committed, either.

In general it would be better to do everything in Spring XD within the container's (XD) threads and rely on the Message Bus persistence mechanism. Or commit messages to the DB manually, but in the same XD thread.

Makes sense?

Upvotes: 2

Related Questions