smartydoc

Reputation: 85

Apache Camel: Spring multicast + processing

Dear community,

I've tried to add a multicast aggregation strategy to my Spring XML config in several ways that I found on the Apache forums, Stack Overflow, and other sites, converting them from the Java DSL.

I'm missing something. Can you suggest how it should be configured to get the right behaviour in the situation described below?

I get a list of names (all beans are correctly defined):

<to id="_to85" uri="bean:otrsCIApi?method=ConfigItemNameList()"/>

Then I need to make POST calls for each item, split across two different systems:

<split streaming="true">
    <simple>${body}</simple>
    <!-- iterate in JsonArray -->
    <choice>
        <when>
            <simple>${body} != ''</simple>
            <multicast stopOnException="true" parallelProcessing="true" strategyRef="OtrsZabbixCiAggregationStrategy" >
                <to uri="direct:zabbix_multicast"/>
                <to uri="direct:otrs_multicast"/>
                <to uri="log:jms_apachemq3"/>
            </multicast>
            <to id="_to92" uri="log:jms_apachemq4"/>
        </when>
        <when>
            <simple>${body} == ''</simple>

        </when>
    </choice>
</split>

Each direct route consists of POST call methods, which work correctly (and return JSONObject.toString()):

<route>
    <from uri="direct:zabbix_multicast"/>
    <to uri="bean:zabbixApi?method=getHostBody(body)"/>
    <to uri="log:multicast_zab1"/>
</route>

<route>
    <from uri="direct:otrs_multicast"/>
    <to uri="bean:otrsCIApi?method=searchCI(body)"/>
    <to uri="log:multicast_otrs1"/>
    <to uri="bean:otrsCIApi?method=getCIBodyByID(body)"/>
    <to uri="log:multicast_otrs2"/>
</route>

The aggregation strategy is quite simple and should work:

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class OtrsZabbixCiAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange exchange1, Exchange exchange2) {
        String body1 = exchange1.getIn().getBody().toString();
        String body2 = exchange2.getIn().getBody().toString();
        String merged = body1 + "," + body2;
        exchange1.getIn().setBody(merged);
        return exchange1;
    }
}

But I get pretty strange output, as if no aggregation takes place:

[fc8135) thread #39 - Multicast] jms_apachemq3                  INFO  Exchange[ExchangePattern: InOnly, BodyType: String, Body: Zabbix server]
[fc8135) thread #37 - Multicast] multicast_zab1                 INFO  Exchange[ExchangePattern: InOnly, BodyType: String, Body: [{"available":"1","description":"","disable_until":"0","error":"","errors_from":"0","flags":"0","host":"Zabbix server","hostid":"10084","ipmi_authtype":"-1","ipmi_available":"0","ipmi_disable_until":"0","ipmi_error":"","ipmi_errors_from":"0","ipmi_password":"","ipmi_privilege":"2","ipmi_username":"","jmx_available":"0","jmx_disable_until":"0","jmx_error":"","jmx_errors_from":"0","lastaccess":"0","maintenance_from":"0","maintenance_status":"0","maintenance_type":"0","maintenanceid":"0","name":"Zabbix server","proxy_hostid":"0","snmp_available":"0","snmp_disable_until":"0","snmp_error":"","snmp_errors_from":"0","status":"0","templateid":"0","tls_accept":"1","tls_connect":"1","tls_issuer":"","tls_psk":"","tls_psk_identity":"","tls_subject":""}]]
[fc8135) thread #38 - Multicast] multicast_otrs1                INFO  Exchange[ExchangePattern: InOnly, BodyType: com.alibaba.fastjson.JSONArray, Body: 2]
[fc8135) thread #38 - Multicast] multicast_otrs2                INFO  Exchange[ExchangePattern: InOnly, BodyType: String, Body: {"ConfigItem":[{"CurInciState":"Operational","ConfigItemID":"2","InciStateType":"operational","CurInciStateType":"operational","Number":"1022000002","CreateBy":"2","LastVersionID":"2","DeplState":"Production","CurDeplState":"Production","CreateTime":"2016-11-01 13:47:44","DefinitionID":"1","VersionID":"2","DeplStateType":"productive","CIXMLData":{"SerialNumber":"","Ram":"","WarrantyExpirationDate":"2016-11-01","Vendor":"","CPU":"","Model":"","Owner":"","Type":"","HardDisk":{"HardDisk":"","Capacity":""},"GraphicAdapter":"","FQDN":"","OperatingSystem":"","Description":""},"Class":"Computer","InciState":"Operational","CurDeplStateType":"productive","Name":"Zabbix server"}]}]
[hread #32 - JmsConsumer[queue]] jms_apachemq4                  INFO  Exchange[ExchangePattern: InOnly, BodyType: String, Body: Zabbix server]

So I expect a body with the two JSON strings separated by a comma (as in the aggregation strategy class), but I only get the original body, the one that was sent to the additional routes.

Any ideas?

Upvotes: 0

Views: 1767

Answers (1)

lahu89

Reputation: 363

I believe you have a NullPointerException in your AggregationStrategy. When it is called for the first time, exchange1 is null and exchange2 is the exchange from the first reply. Because of stopOnException="true", the exception is ignored and multicasting stops. That is why you end up with your original exchange afterwards.

You only need a null check and it should work:

public Exchange aggregate(Exchange exchange1, Exchange exchange2) {
    // First call: nothing has been aggregated yet, so exchange1 is null.
    if (exchange1 == null) {
        return exchange2;
    }
    // Later calls: append the new reply to the body aggregated so far.
    String body1 = exchange1.getIn().getBody().toString();
    String body2 = exchange2.getIn().getBody().toString();
    String merged = body1 + "," + body2;
    exchange1.getIn().setBody(merged);
    return exchange1;
}
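
To see why the null check matters, here is a minimal plain-Java sketch of the calling convention, with no Camel involved. It assumes the replies are folded pairwise and that the first call receives null as the "old" value, as described above. The class AggregationFoldSketch and its methods fold, unguarded and guarded are hypothetical names made up for this illustration, and plain strings stand in for exchanges.

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class AggregationFoldSketch {

    // Hypothetical stand-in for the multicast: folds the replies pairwise,
    // passing null as the "old" value on the very first call.
    static String fold(List<String> replies, BinaryOperator<String> strategy) {
        String aggregated = null;
        for (String reply : replies) {
            aggregated = strategy.apply(aggregated, reply);
        }
        return aggregated;
    }

    // Mirrors the strategy from the question: dereferences the old value
    // without checking it, so the first call throws NullPointerException.
    static String unguarded(String oldBody, String newBody) {
        return oldBody.toString() + "," + newBody;
    }

    // Mirrors the fixed strategy: the first reply is returned unchanged.
    static String guarded(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody;
        }
        return oldBody + "," + newBody;
    }

    public static void main(String[] args) {
        List<String> replies = List.of("zabbix", "otrs");

        // prints "zabbix,otrs"
        System.out.println(fold(replies, AggregationFoldSketch::guarded));

        try {
            fold(replies, AggregationFoldSketch::unguarded);
        } catch (NullPointerException e) {
            // prints "unguarded strategy failed on the first call"
            System.out.println("unguarded strategy failed on the first call");
        }
    }
}
```

The guarded variant returns the first reply as-is and only starts concatenating from the second call onwards, which is the same shape as the null check in the strategy above.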

Upvotes: 2
