Pavlic Morozov

Reputation: 153

REST WSO2 Kafka

I've installed WSO2 Integration Studio version 6.5.0 on my Windows workstation and created a project using the built-in Kafka Consumer and Producer template.

api.xml:

<?xml version="1.0" encoding="UTF-8"?>
<api context="/publishweatherdata" name="WeatherDataPublishAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <kafkaTransport.init>
                <bootstrapServers>localhost:9092</bootstrapServers>
                <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
                <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
                <acks>all</acks>
                <requestTimeout>10000</requestTimeout>
                <timeout>8000</timeout>
                <metadataFetchTimeout>5000</metadataFetchTimeout>
                <maxPoolSize>50</maxPoolSize>
            </kafkaTransport.init>
            <kafkaTransport.publishMessages>
                <topic>weatherdatatopic</topic>
            </kafkaTransport.publishMessages>
            <payloadFactory media-type="json">
                <format>
                    {"topic":"$1", "partition":"$2", "offset":"$3"}
                </format>
                <args>
                    <arg evaluator="xml" expression="$ctx:topic"/>
                    <arg evaluator="xml" expression="$ctx:partition"/>
                    <arg evaluator="xml" expression="$ctx:offset"/>
                </args>
            </payloadFactory>
            <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

weatherdatatransmitinboundEP.xml:

<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" name="WeatherDataTransmitInboundEP" onError="WeatherDataErrorSeq" sequence="WeatherDataProcessSeq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="sequential">true</parameter>
        <parameter name="interval">10</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="inbound.behavior">polling</parameter>
        <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="topic.name">weatherdatatopic</parameter>
        <parameter name="poll.timeout">100</parameter>
        <parameter name="bootstrap.servers">localhost:9092</parameter>
        <parameter name="group.id">hello</parameter>
        <parameter name="contentType">application/json</parameter>
        <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="class">org.wso2.carbon.inbound.kafka.KafkaMessageConsumer</parameter>
    </parameters>
    
</inboundEndpoint>

WeatherDataPublishService.xml:

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="WeatherDataPublishService" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <kafkaTransport.init>
                <bootstrapServers>localhost:9092</bootstrapServers>
                <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
                <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
                <acks>all</acks>
                <requestTimeout>10000</requestTimeout>
                <timeout>8000</timeout>
                <metadataFetchTimeout>5000</metadataFetchTimeout>
                <maxPoolSize>50</maxPoolSize>
            </kafkaTransport.init>
            <kafkaTransport.publishMessages>
                <topic>weatherdatatopic</topic>
            </kafkaTransport.publishMessages>
            <payloadFactory media-type="json">
                <format>
                    {"topic":"$1", "partition":"$2", "offset":"$3"}
                </format>
                <args>
                    <arg evaluator="xml" expression="$ctx:topic"/>
                    <arg evaluator="xml" expression="$ctx:partition"/>
                    <arg evaluator="xml" expression="$ctx:offset"/>
                </args>
            </payloadFactory>
            <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
</proxy>

Now I can send a POST request to http://localhost:8290/publishweatherdata and the message arrives in the Kafka topic. WSO2 also receives the messages produced to Kafka. How can I send a message from WSO2 to an external service? I think I should use

<endpoint [name="string"] [key="string"]>
        address-endpoint | default-endpoint | wsdl-endpoint | load-balanced-endpoint | fail-over-endpoint
</endpoint>

but I have no idea where it must be added or how it should be configured.

Upvotes: 2

Views: 331

Answers (2)

Pavlic Morozov

Reputation: 153

I just added

<send>
    <endpoint>
        <http method="post" statistics="enable" trace="enable" uri-template="http://localhost:8081/api">
            <property name="name" scope="axis2" value="messageValue"/>
            <suspendOnFailure>
                <initialDuration>-1</initialDuration>
                <progressionFactor>-1</progressionFactor>
                <maximumDuration>0</maximumDuration>
            </suspendOnFailure>
            <markForSuspension>
                <retriesBeforeSuspension>0</retriesBeforeSuspension>
            </markForSuspension>
        </http>
    </endpoint>
</send>

to the sequence and got what I wanted.

Upvotes: 2

Dilan Premarathna

Reputation: 1294

You can use a call mediator [1] or a send mediator [2] to achieve your use case. Within the mediator, you can define the desired endpoint you want to invoke. Please refer to the following sample configuration. Here we have used a call mediator to invoke the external endpoint http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd.

     <call>
        <endpoint>
           <address uri="http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd"/>
        </endpoint>
     </call>
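
The `<endpoint [name="string"] [key="string"]>` syntax from the question can also be used to define the endpoint once and reference it by key. A minimal sketch, assuming a hypothetical endpoint artifact named WeatherServiceEP and a placeholder URI (neither comes from the original project):

```xml
<!-- WeatherServiceEP.xml: a named endpoint artifact (name and URI are placeholders) -->
<endpoint name="WeatherServiceEP" xmlns="http://ws.apache.org/ns/synapse">
    <address uri="http://localhost:8081/api"/>
</endpoint>

<!-- Inside the inSequence: invoke the named endpoint by its key -->
<call>
    <endpoint key="WeatherServiceEP"/>
</call>
```

Keeping the endpoint in its own artifact means the URI can be changed in one place if several sequences call the same service.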

In your use case, once you have finished building the desired payload with the payload factory mediator, you can place the call mediator right after it to invoke the external endpoint. The payload built by the payload factory mediator is then sent to the external endpoint.

    <payloadFactory media-type="json">
        <format>
            {"topic":"$1", "partition":"$2", "offset":"$3"}
        </format>
        <args>
            <arg evaluator="xml" expression="$ctx:topic"/>
            <arg evaluator="xml" expression="$ctx:partition"/>
            <arg evaluator="xml" expression="$ctx:offset"/>
        </args>
    </payloadFactory>
    <call>
        <endpoint>
            <address uri="http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd"/>
        </endpoint>
    </call>
    <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
    <respond/>

Further, the inbound endpoint WeatherDataTransmitInboundEP reads the messages published to Kafka and passes each one to the sequence defined in the inbound endpoint. If you want to send the messages consumed by WeatherDataTransmitInboundEP to an external endpoint, you have to follow a different approach. In your case, WeatherDataProcessSeq is invoked after a message is read from Kafka, so if your requirement is to forward the messages consumed from Kafka, you need to define the call or send mediator in WeatherDataProcessSeq.
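
As a rough illustration of that second approach, WeatherDataProcessSeq could look something like the sketch below. The sequence name comes from the inbound endpoint configuration in the question; the log and drop mediators and the target URI (borrowed from the other answer) are assumptions and should be adapted to the real service:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="WeatherDataProcessSeq" xmlns="http://ws.apache.org/ns/synapse">
    <!-- Optional: log the message consumed from the Kafka topic -->
    <log level="full"/>
    <!-- Forward the consumed message to the external service (placeholder URI) -->
    <call>
        <endpoint>
            <http method="post" uri-template="http://localhost:8081/api"/>
        </endpoint>
    </call>
    <!-- There is no client waiting for a reply, so discard the response -->
    <drop/>
</sequence>
```

Because the inbound endpoint is a one-way consumer, there is nothing to respond to, which is why this sketch ends with a drop mediator rather than respond.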

If you want further clarification regarding the call and send mediators, please refer to the blog post [3].

[1] https://docs.wso2.com/display/EI6xx/Call+Mediator
[2] https://docs.wso2.com/display/EI600/Send+Mediator
[3] https://www.yenlo.com/blog/wso2torial-to-send-or-not-to-send-that-is-your-choice

Upvotes: 2
