Reputation: 5247
I have a Spring XD component called throttle, in which I store the messages that are posted, throttle them, and send them out slowly. I am running XD in 3 containers. The holding delay for the delayer is set to 0. When one container goes down, I lose messages. Below is my configuration. It looks like the messages are not being stored in PostgreSQL and are instead being lost from memory or the cache.
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-groovy="http://www.springframework.org/schema/integration/groovy"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:ehcache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/integration/jdbc
http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
http://www.springframework.org/schema/integration/groovy
http://www.springframework.org/schema/integration/groovy/spring-integration-groovy.xsd
http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.1.xsd">
<!-- Turns on annotation-driven caching, backed by the "cacheManager" bean.
     The "ehcache" prefix is bound to Spring's cache namespace on the root element. -->
<ehcache:annotation-driven cache-manager="cacheManager"/>

<!-- Resolves ${...} placeholders from the region-specific property files on the classpath. -->
<context:property-placeholder
    location="classpath*:dms-security-${region}.properties, classpath*:dms-jms-${region}.properties, classpath*:dms-mongo-${region}.properties"/>

<!-- Default stereotype filters are off; only @Configuration classes are picked up by the scan. -->
<context:component-scan
    use-default-filters="false"
    base-package="com.testhrottle.api.dms.*">
    <context:include-filter
        expression="org.springframework.context.annotation.Configuration"
        type="annotation"/>
</context:component-scan>
<!-- Module entry channel; it feeds the router and is also the delayer's output channel. -->
<channel id="input">
    <!-- <interceptors>
    <ref bean="messageInterceptor" />
    </interceptors> -->
</channel>

<!-- Module exit channel; both throttle filters send their output here. -->
<channel id="output"/>

<!-- Inputs of the SMS filter, the e-mail filter and the delete service activator respectively. -->
<channel id="sms"/>
<channel id="eml"/>
<channel id="del"/>
<!-- Retired header-based routing, kept for reference. -->
<!-- <header-value-router input-channel="input" header-name="Channel-Type" >
<mapping value="PHN" channel="sms" />
<mapping value="EML" channel="eml" />
<mapping value="DEL" channel="del" />
</header-value-router> -->

<!-- Routing of everything on "input" is delegated to the custom MessageRouter bean,
     which is given the shared configCache. -->
<router input-channel="input">
    <beans:bean class="com.testhrottle.xd.util.MessageRouter" name="messageRouter">
        <beans:property name="cache" ref="configCache"/>
    </beans:bean>
</router>
<!-- Messages on "del" are handed to the delete() method of the "delete" bean.
     No output-channel is configured here. -->
<service-activator
    input-channel="del"
    ref="delete"
    method="delete"/>
<!-- E-mail throttle: messages from "eml" that the Throttle bean accepts continue to "output".
     No discard-channel is configured, so a message rejected by the filter is not forwarded
     by the filter itself.
     NOTE(review): presumably Throttle re-sends held messages to "requeue" on its own;
     confirm in the Java class. -->
<filter input-channel="eml" output-channel="output">
    <beans:bean class="com.testhrottle.xd.throttle.Throttle">
        <!-- Positional constructor arguments: TPS threshold first, then span in seconds. -->
        <beans:constructor-arg value="${tpsThreshold}"/>
        <beans:constructor-arg value="${spanSeconds}"/>
        <beans:property name="cache" ref="configCache"/>
        <beans:property name="dailyLimitCounter" ref="dailyLimitCounter"/>
        <beans:property name="errorPublishing" ref="errorPublishing"/>
    </beans:bean>
</filter>
<!-- SMS throttle: messages from "sms" that the SmsThrottle bean accepts continue to "output".
     No discard-channel is configured, so a message rejected by the filter is not forwarded
     by the filter itself.
     NOTE(review): presumably SmsThrottle re-sends held messages to "requeue" on its own;
     confirm in the Java class. -->
<filter input-channel="sms" output-channel="output">
    <beans:bean class="com.testhrottle.xd.throttle.SmsThrottle">
        <!-- Positional constructor arguments: SMS TPS threshold first, then span in seconds. -->
        <beans:constructor-arg value="${smsTpsThreshold}"/>
        <beans:constructor-arg value="${smsSpanSeconds}"/>
        <beans:property name="cache" ref="configCache"/>
        <beans:property name="dailyLimitCounter" ref="dailyLimitCounter"/>
        <beans:property name="errorPublishing" ref="errorPublishing"/>
    </beans:bean>
</filter>
<!-- "requeue" feeds the delayer. It is a plain direct channel, so the hand-off to the
     delayer happens on the sender's thread (normally the XD Message Bus thread).
     An executor dispatcher on this channel moves the message to another thread; the bus
     thread then returns and the broker delivery is acknowledged before the message has been
     persisted, so a container crash at that moment loses the message.
     NOTE(review): with a direct channel, a message whose delay resolves to 0 travels
     requeue, delayer, input synchronously; verify that the throttle cannot re-queue the
     same message in a tight loop. -->
<channel id="requeue"/>

<!-- No longer used by "requeue"; retained so any other reference to "taskExecutor" still resolves. -->
<task:executor id="taskExecutor" pool-size="1"/>
<!-- Holds messages from "requeue" and releases them back to "input".
     The delay is read from the "delay" message header; ${holdingDelay} is the default delay.
     Delayed messages are kept in the "messageStore" bean.
     NOTE(review): a delayer normally stores only messages whose resolved delay is greater
     than 0; with a delay of 0 the message is expected to pass straight through without
     touching the store. Confirm against the Spring Integration version in use. -->
<delayer id="delayer"
         input-channel="requeue"
         output-channel="input"
         message-store="messageStore"
         expression="headers['delay']"
         default-delay="${holdingDelay}"/>
<!-- Stock JDBC message store, replaced by the custom implementation below. -->
<!-- <int-jdbc:message-store id="messageStore" data-source="dataSource" /> -->

<!-- Message store referenced by the delayer; built on the PostgreSQL "dataSource" bean. -->
<beans:bean id="messageStore" class="com.testhrottle.xd.util.CustomJdbcMessageStore">
    <beans:constructor-arg>
        <beans:ref bean="dataSource"/>
    </beans:constructor-arg>
</beans:bean>
<!-- <beans:bean name="messageInterceptor" class="com.testhrottle.xd.util.MessageInterceptor">
<beans:property name="cache" ref="configCache" />
</beans:bean> -->
<!-- Configuration cache shared by the router and both throttle filters.
     configQuery selects one alrt_type row by alrt_type_cd (single "?" parameter)
     from the schema named by ${schema}. -->
<beans:bean class="com.testhrottle.xd.cache.ConfigCache" name="configCache">
    <beans:property name="jdbcTemplate">
        <beans:ref bean="jdbcTemplate"/>
    </beans:property>
    <beans:property name="holdRetryIntervelInSeconds" value="${holdRetryIntervelInSeconds}"/>
    <beans:property name="configQuery" value="select alrt_type_cd alertType,dlvry_days_of_week daysOfWeek,DLVRY_STRT_TM startTime,DLVRY_END_TM endTime,DLVRY_STRT_DT startDate,DLVRY_END_DT endDate,hold_ind holdFlag, max_msgs_per_day maxMsgPerDay,purge_ind purge from ${schema}.alrt_type where alrt_type_cd = ?"/>
</beans:bean>
<!-- PostgreSQL data source used by jdbcTemplate and messageStore.
     URL, user and password come from property placeholders; the password placeholder is
     passed through encryptedDatum.decryptBase64Encoded(...) via SpEL before being set.
     NOTE(review): DriverManagerDataSource does not pool connections; consider a pooled
     DataSource if this runs under load. -->
<beans:bean id="dataSource"
            class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <beans:property name="driverClassName" value="org.postgresql.Driver"/>
    <beans:property name="url" value="${dbURL}"/>
    <beans:property name="username" value="${userName}"/>
    <beans:property name="password" value="#{encryptedDatum.decryptBase64Encoded('${passWord}')}"/>
</beans:bean>
<!-- JdbcTemplate over the "dataSource" bean; injected into configCache. -->
<beans:bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <beans:property name="dataSource" ref="dataSource"/>
</beans:bean>
<!-- Referenced from the dataSource password SpEL expression to decrypt the stored password. -->
<beans:bean id="encryptedDatum"
            class="com.testhrottle.api.dms.core.security.EncryptedSecuredDatum"/>

<!-- <beans:bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"/> -->

<!-- Error publisher injected into both throttle filters; its rabbitTemplate wiring is disabled. -->
<beans:bean id="errorPublishing" class="com.testhrottle.xd.util.ErrorPublishing">
    <!-- <beans:property name="rabbitTemplate" ref="rabbitTemplate" /> -->
</beans:bean>

<!-- Target of the service activator on the "del" channel. -->
<beans:bean id="delete" class="com.testhrottle.xd.util.Delete"/>
<!--
<beans:bean id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<beans:property name="driverClassName" value="org.postgresql.Driver"/>
<beans:property name="url" value="jdbc:postgresql://localhost:5432/postgres"/>
<beans:property name="username" value="postgres"/>
<beans:property name="password" value="postgres"/>
</beans:bean>
<beans:bean name="configCache"
class="com.testhrottle.xd.cache.ConfigCache">
<beans:property name="jdbcTemplate" ref="jdbcTemplate" />
<beans:property name="holdRetryIntervelInSeconds" value="10" />
<beans:property name="configQuery" value="select alrt_type_cd alertType,dlvry_days_of_week daysOfWeek,DLVRY_STRT_TM startTime,DLVRY_END_TM endTime,DLVRY_STRT_DT startDate,DLVRY_END_DT endDate,hold_ind holdFlag, max_msgs_per_day maxMsgPerDay from dms_wb.alrt_type where alrt_type_cd = ?;
" />
</beans:bean>
<beans:bean name="cacheTest"
class="com.testhrottle.xd.cache.CacheTest">
<beans:property name="cache" ref="configCache" />
</beans:bean> -->
<!-- Explicitly registered post-processor for @Autowired-style annotation injection. -->
<beans:bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>

<!-- Spring cache manager used by the annotation-driven caching; wraps the Ehcache manager below. -->
<beans:bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager">
    <beans:property name="cacheManager">
        <beans:ref bean="ehcache"/>
    </beans:property>
</beans:bean>

<!-- Ehcache manager configured from ehcache.xml on the classpath, with the "shared" flag enabled. -->
<beans:bean id="ehcache"
            class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean">
    <beans:property name="configLocation" value="classpath:ehcache.xml"/>
    <beans:property name="shared" value="true"/>
</beans:bean>

<!-- Daily limit counter injected into both throttle filters and reset by the scheduled task. -->
<beans:bean name="dailyLimitCounter" class="com.testhrottle.xd.throttle.DailyLimitCounter"/>
<!-- Scheduler with a pool of 10 threads.
     NOTE(review): the scheduled-tasks element below has no "scheduler" attribute, so it does
     not reference this bean; confirm whether that is intended. -->
<task:scheduler id="taskSchedulerInbound" pool-size="10"/>

<!-- Calls dailyLimitCounter.clearAlertDailyCount() at 23:59:59 every day
     (cron fields: second minute hour day-of-month month day-of-week). -->
<task:scheduled-tasks>
    <task:scheduled
        ref="dailyLimitCounter"
        method="clearAlertDailyCount"
        cron="59 59 23 * * *"/>
</task:scheduled-tasks>
</beans:beans>
So if I remove this, it should work in the same thread?
<channel id="requeue">
<dispatcher task-executor="taskExecutor" />
</channel>
<task:executor id="taskExecutor" pool-size="1" />
Upvotes: 1
Views: 91
Reputation: 121552
Looks like your <dispatcher task-executor="taskExecutor" /> is guilty.
Since you jump out of the Message Bus thread, that acks (commits) the message on the Broker, and a crash of the node no longer affects that (there is no rollback), because your message has already been moved to a different thread. And there it hasn't yet reached your DB to be committed there, either.
In general, it would be better to do everything in Spring XD within the container's (XD) threads and rely on the Message Bus persistence mechanism. Or commit the messages to the DB manually, but in the same XD thread.
Makes sense?
Upvotes: 2