Amarjit Dhillon
Amarjit Dhillon

Reputation: 2816

How to send events from siddhi event simulator to android siddhi app

I have a siddhi cep application running on Android. Now I want to send events from event simulator from stream processing editor to android app via a socket connection. Till now I have been successful in making android server socket which listens to python client simulator made by me. But to ease the process, is it possible that I can use event simulator to send events to android siddhi app?

I was wondering if I can change some configurations such that event simulator sends events to android socket, so I looked at setting in deployment.yaml file

enter image description here

but the sending configurations are defined for HTTP

senderConfigurations:
    -
      id: "http-sender"

  # Configuration used for the databridge communication
databridge.config:
    # No of worker threads to consume events
    # THIS IS A MANDATORY FIELD
  workerThreads: 10
    # Maximum amount of messages that can be queued internally in MB
    # THIS IS A MANDATORY FIELD
  maxEventBufferCapacity: 10000000
    # Queue size; the maximum number of events that can be stored in the queue
    # THIS IS A MANDATORY FIELD
  eventBufferSize: 2000
    # Keystore file path
    # THIS IS A MANDATORY FIELD
  keyStoreLocation : ${sys:carbon.home}/resources/security/wso2carbon.jks
    # Keystore password
    # THIS IS A MANDATORY FIELD
  keyStorePassword : wso2carbon
    # Session Timeout value in mins
    # THIS IS A MANDATORY FIELD
  clientTimeoutMin: 30
    # Data receiver configurations
    # THIS IS A MANDATO

Thanks in advance. If you need some more details please let me know


Edit 1

I actually found a way around to do it but it's having some issues. So basically I redirected output sink of event generator to port such that sink has all the data streams. The code for Stream Processor Studio editor is

    @App:name("PatternMatching")

@App:description('Identify event patterns based on the order of event arrival')

define stream RoomTemperatureStream(roomNo string, temp double);

@sink(type="tcp",  url='tcp://localhost:5001/abc', sync='false', tcp.no.delay='true', keep.alive='true', worker.threads="10", @map(type='text'))
define stream RoomTemperatureAlertStream(roomNo string, initialTemp double, finalTemp double);

--Capture a pattern where the temperature of a room increases by 5 degrees within 2 minutes
@info(name='query1')
from RoomTemperatureStream
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into RoomTemperatureAlertStream;

it sends the streams as text to python server, which needs to be started first, code of which is

#!/usr/bin/env python
# Author : Amarjit Singh
import pickle
import socket

import pandas
from pandas import json

if __name__ == "__main__":


    # ------------------ create a socket object-----------------------#
    try:
        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except socket.error as err:
        serversocket.close()
        print("socket creation failed with error %s" % (err))

    except KeyboardInterrupt:
        serversocket.close()
        print("KeyboardInterrupt - but server socket was closed ")



    host = "127.0.0.1"
    Server_port = 5001

    # ------------------ binding to the port -----------------------#

    try:

        serversocket.bind((host, Server_port))
        serversocket.listen(50)  # queue up to 5 requests

        print("\n Server has started and waiting for connection request   ..... ")

        # bind to the port
        while True:  # extra while is created  so that server runs even if there is no data

            running = True
            clientsocket, addr = serversocket.accept()  # accept a connection from client



            print("Got a connection from Server%s" % str(addr))  # show connection success message


            while running:

                receivedData = clientsocket.recv(2048)
                # json = receivedData

                if receivedData:

                    print(receivedData)
                    print(receivedData[0])
                    print(receivedData[1])
                    print(receivedData[2])


                    # roomNo = str(receivedData[0])
                    # temp = int(client_tuple[1])  # from unicode to int
                    #
                    # print(" roomNo = %d:  UUID = %s temp = %d" % (roomNo, temp))


    except socket.error as err:
        serversocket.close()
        print("socket creation failed with error %s" % (err))

    except KeyboardInterrupt:
        serversocket.close()
        print("KeyboardInterrupt - but server socket was closed ")

Initially, I was sending json data from simulator, but pickle.loads and json.loads did not work. but the problem with text is that data is displayed as

b'\x02\x00\x00\x00t\x00\x00\x003bed14d31-6697-4a74-8a3f-30dc012914ad-localhost:5001\x00\x00\x00\x03abc\x00\x00\x002roomNo:"X0ZYp",\ninitialTemp:15.97,\nfinalTemp:17.22'

b'\x02\x00\x00\x00t\x00\x00\x003bed14d31-6697-4a74-8a3f-30dc012914ad-localhost:5001\x00\x00\x00\x03abc\x00\x00\x002roomNo:"2X951",\ninitialTemp:13.42,\nfinalTemp:10.76'

b'\x02\x00\x00\x00t\x00\x00\x003bed14d31-6697-4a74-8a3f-30dc012914ad-localhost:5001\x00\x00\x00\x03abc\x00\x00\x002roomNo:"PUaJA",\ninitialTemp:15.46,\nfinalTemp:16.26'

b'\x02\x00\x00\x00t\x00\x00\x003bed14d31-6697-4a74-8a3f-30dc012914ad-localhost:5001\x00\x00\x00\x03abc\x00\x00\x002roomNo:"pnz0i",\ninitialTemp:10.42,\nfinalTemp:15.82'

how to remove this extra data?

Upvotes: 0

Views: 308

Answers (1)

suho
suho

Reputation: 912

Siddhi has a WebSocket connector[1] and it's still WIP. Using this dependency you will be able to add a WebSocket sink to your app and send events from that.

Unfortunately, you cannot directly send events from Stream Processor[2] Studio/Editor but if you have an app running in Stream Processor Editor and if it has a WebSocket sink then you can send events to App's sink stream from the Simulator which will intern send that message via WebSocket to the Siddhi app in android.

You can only simulate apps running inside the editor via the Event Simulator or simulate apps deployed in Stream Processor worker nodes via Event Simulator API.

[1]https://github.com/wso2-extensions/siddhi-io-websocket

[2]https://wso2.com/analytics

Upvotes: 2

Related Questions