Mistre83
Mistre83

Reputation: 2827

Spring Integration - Scheduling Job from configuration file

I'm using Spring Integration to parse XML file and i will need to create a thread (and each one have a different rate) for each tag.

Right now (with the help of many users here :)) i'm able to split XML by tag and then route it to the appropiate service-activator.

This works great but i'm not able to redirect to a channel that create "a thread" and then execute the operations. Right now i have the following configuration and in my mind (that i dont know if it is correct...)

Split tag -> Route to the appropiate channel -> Start a thread(from tag configuration) -> Execute the operation

This is my actual configuration that split tag and redirect to the channel. The router should redirect not toward a channel directly, but schedule them.

In first instance will be enought to redirect it in a pool with fixed rate and later i will use XPATH to get the attribute and then replace this "fixed" rate with the correct value.

I've tried many solutions to create this flow but each one fails or do not compile :(

<context:component-scan base-package="it.mypkg" />

<si:channel id="rootChannel" />

<si-xml:xpath-splitter id="mySplitter" input-channel="rootChannel" output-channel="routerChannel" create-documents="true">
    <si-xml:xpath-expression expression="//service" />
</si-xml:xpath-splitter>

<si-xml:xpath-router id="router" input-channel="routerChannel" evaluate-as-string="true">
    <si-xml:xpath-expression expression="concat(name(./node()), 'Channel')" />
</si-xml:xpath-router>

<si:service-activator input-channel="serviceChannel" output-channel="endChannel">
    <bean class="it.mypkg.Service" />
</si:service-activator>

UPDATE: Using this configuration for the service this should run a task every 10 seconds (the id=service1) and every 5 seconds the other (the id=service2). In the same way i can have another tag that is handle by another class (because this will have another behaviour)

<root>
    <service id="service1" interval="10000" />
    <service id="service2" interval="5000" />
    <activity id="activity1" interval="50000" />
<root>

I will have a classe (Service) that is general to handle Service tag and this complete some operation and then "return me" the value so i can redirect to another channel.

public class Service {
    public int execute() {
        // Execute the task and return the value to continue the "chain"
    }
}

Upvotes: 0

Views: 1428

Answers (2)

Mistre83
Mistre83

Reputation: 2827

I dont know if this is the right way to implement the flow, but i've write the follow code:

applicationContext.xml

<context:component-scan base-package="it.mypkg" />

<!-- Expression to extract interval from XML tag -->    
<si-xml:xpath-expression id="selectIntervalXpath" expression="//*/@interval" />

<si:channel id="rootChannel" />

<!-- Split each tag to redirect on router -->
<si-xml:xpath-splitter id="mySplitter" input-channel="rootChannel" output-channel="routerChannel" create-documents="true">
    <si-xml:xpath-expression expression="//service|//activity" />
</si-xml:xpath-splitter>

<!-- Route each tag to the appropiate channel -->
<si-xml:xpath-router id="router" input-channel="routerChannel" evaluate-as-string="true">
    <si-xml:xpath-expression expression="concat(name(./node()), 'Channel')" />
</si-xml:xpath-router>

<!-- Activator for Service Tag -->
<si:service-activator input-channel="serviceChannel" method="schedule">
    <bean class="it.mypkg.Service" />
</si:service-activator>

<!-- Activator for Activity Tag -->
<si:service-activator input-channel="activityChannel" method="schedule">
    <bean class="it.mypkg.Activity" />
</si:service-activator>

<!-- Task scheduler -->
<task:scheduler id="taskScheduler" pool-size="10"/>

Each tag will extend an Operation class (to avoid code duplication on bean injection)

Operation.java

public abstract class Operation {

    protected TaskScheduler taskScheduler;
    protected XPathExpression selectIntervalXpath;

    abstract public void schedule(Node document);

    @Autowired
    public void setTaskScheduler(TaskScheduler taskScheduler) {
        this.taskScheduler= taskScheduler;
    }

    public TaskScheduler getTaskScheduler() {
        return this.taskScheduler;
    }

    @Autowired
    public void setSelectIntervalXpath(XPathExpression selectIntervalXpath) {
        this.selectIntervalXpath = selectIntervalXpath;
    }

    public XPathExpression getSelectIntervalXPath() {
        return this.selectIntervalXpath;
    }
}

And an example of Service class (that handle all tags service provided on .xml)

public class Service extends Operation {

    private static final Logger log = Logger.getLogger(Service.class);

    @Override
    public void schedule(Node document) {
        log.debug("Scheduling Service");
        long interval = Long.parseLong(this.selectIntervalXpath.evaluateAsString(document));

        this.taskScheduler.scheduleAtFixedRate(new ServiceRunner(), interval);
    }

    private class ServiceRunner implements Runnable {

        public void run() {
            log.debug("Running...");

        }
    }
}

Now to continue my flow i will need to find a way to redirect the output of each job to Spring Integration (applicationContext.xml).

Upvotes: 0

Gary Russell
Gary Russell

Reputation: 174554

It's not at all clear what you mean; you split a tag; route it but want to "schedule" it at a rate in the XML. It's not clear what you mean by "schedule" here - normally each message is processed once not multiple times on a schedule.

As I said, I don't understand what you need to do, but a smart poller might be suitable.

Another possibility is the delayer where the amount of the delay can be derived from the message.

EDIT

Since your "services" don't seem to take any input data, it looks like you simply need to configure/start an <inbound-channel-adapter/> for each service, and then start it, based on the arguments in the XML.

<int:inbound-channel-adapter id="service1" channel="foo"
            auto-startup="false"
            ref="service1Bean" method="execute">
     <poller fixed-delay="1000" />
</int:inbound-channel-adapter/>

Note auto-startup="false".

Now, in the code that receives the split

@Autowired
SourcePollingChannelAdapter service1;

...

public void startService1(Node node) {

    ...

    service1.setTrigger(new PeridicTrigger(...));

    service1.start();

    ...
}

Upvotes: 0

Related Questions