Reputation: 31
I am having a problem with Eclipse Ditto. I want to send a command that updates the features of a digital twin over a WebSocket (in Python), and then read the new features from an Apache Kafka topic. This is my WebSocket client:
import asyncio
import json
import random

from websockets import connect


async def func(uri):
    async with connect(uri) as websocket:
        # Subscribe to twin events and print Ditto's reply.
        await websocket.send("START-SEND-EVENTS")
        # await websocket.send("START-SEND-MESSAGES")
        message = await websocket.recv()
        print(message)
        while True:
            msg = {
                "topic": "org.eclipse.ditto/camera01/things/twin/commands/modify",
                "headers": {
                    "content-type": "text/plain"
                },
                "path": "features/coordinates/properties",
                "value": {
                    "x": random.randrange(0, 1000),
                    "y": random.randrange(0, 1000),
                    "z": random.randrange(0, 1000),
                    "x_rotation": 0.0,
                    "y_rotation": 0.0,
                    "z_rotation": 0.0,
                    "w_rotation": 1.0,
                    "thingId": "org.eclipse.ditto:camera01"
                }
            }
            to_send = json.dumps(msg)
            # Pause without blocking the event loop (time.sleep() would block it).
            await asyncio.sleep(1)
            await websocket.send(to_send)
            msg_recv = await websocket.recv()
            print(msg_recv)


uri = "ws://ditto:ditto@localhost:8080/ws/2"
asyncio.run(func(uri))
When I send a message, Ditto updates the digital twin and a second WebSocket receives the new features, but the Kafka topic does not receive anything.
I thought the problem might be the target connection, but it does not report any errors. This is how I set it up:
{
  "targetActorSelection": "/system/sharding/connection",
  "headers": {
    "aggregate": false
  },
  "piggybackCommand": {
    "type": "connectivity.commands:modifyConnection",
    "connection": {
      "id": "kafka-connection-target",
      "connectionType": "kafka",
      "connectionStatus": "open",
      "failoverEnabled": true,
      "uri": "tcp://localhost:9092",
      "specificConfig": {
        "bootstrapServers": "localhost:9092"
      },
      "targets": [{
        "address": "topic_ditto",
        "topics": [
          "_/_/things/twin/events",
          "_/_/things/live/messages"
        ],
        "authorizationContext": ["ditto:unity"],
        "qos": 0
      }],
      "mappingContext": {
        "mappingEngine": "JavaScript",
        "options": {
          "incomingScript": "function mapToDittoProtocolMsg(headers, textPayload, bytePayload, contentType) {return null;}",
          "outgoingScript": "function mapFromDittoProtocolMsg(namespace, id, group, channel, criterion, action, path, dittoHeaders, value, status, extra) {let textPayload = '{\"x\":' + value.coordinates.properties.x + ',\"y\":' + value.coordinates.properties.y + ',\"z\":' + value.coordinates.properties.z + ',\"x_rotation\":' + value.coordinates.properties.x_rotation + ',\"y_rotation\": ' + value.coordinates.properties.y_rotation + ', \"z_rotation\": ' + value.coordinates.properties.z_rotation + ',\"w_rotation\":' + value.coordinates.properties.w_rotation + ',\"idCamera\":\"' + id + '\"}'; let bytePayload = null; let contentType = 'text/plain; charset=UTF-8'; return Ditto.buildExternalMsg(dittoHeaders, textPayload, bytePayload, contentType);}",
          "loadBytebufferJS": "false",
          "loadLongJS": "false"
        }
      }
    }
  }
}
Note: if I update the digital twin through a Kafka topic (specified in a source connection) instead, the topic of the target connection does receive the new features, and so does the second WebSocket.
Upvotes: 0
Views: 300
Reputation: 31
Solved.
The path in the message I sent over the WebSocket did not match what the outgoing mapping function of the target connection expects. That is why I was able to update the digital twin without anything arriving on the "topic_ditto" topic.
The mapping function reads value.coordinates.properties.x and so on, so the value it receives has to be the whole features object.
I passed the path "features/coordinates/properties", but given how I set up the mapping function, the correct path is "/features".
The value I sent had the form {"x": random.randrange(0,1000), ...}, but the correct form in this case is {"coordinates": {"properties": {"x": random.randrange(0,1000), ...}}}.
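For reference, here is a minimal sketch of the corrected message, reusing the topic, headers and field names from the question. The helper names `build_modify_features_message` and `outgoing_payload` are hypothetical and only for illustration; `outgoing_payload` is a rough Python stand-in for the JavaScript outgoing script, not Ditto code, and it only shows why a flat value cannot be mapped. Keep in mind that a modify command on "/features" is expected to replace all features of the thing, not just "coordinates".

```python
import json
import random

# Topic copied from the question (namespace org.eclipse.ditto, thing camera01).
TOPIC = "org.eclipse.ditto/camera01/things/twin/commands/modify"


def build_modify_features_message():
    """Build a modify command whose value is the whole features object."""
    properties = {
        "x": random.randrange(0, 1000),
        "y": random.randrange(0, 1000),
        "z": random.randrange(0, 1000),
        "x_rotation": 0.0,
        "y_rotation": 0.0,
        "z_rotation": 0.0,
        "w_rotation": 1.0,
        "thingId": "org.eclipse.ditto:camera01",
    }
    return {
        "topic": TOPIC,
        # Headers copied unchanged from the question.
        "headers": {"content-type": "text/plain"},
        # Path is "/features", so the value must start at the feature name.
        "path": "/features",
        "value": {"coordinates": {"properties": properties}},
    }


def outgoing_payload(value, thing_id):
    """Illustrative stand-in for the outgoing script: it reads
    value.coordinates.properties.*, so a flat value raises KeyError here."""
    props = value["coordinates"]["properties"]
    fields = ("x", "y", "z", "x_rotation", "y_rotation", "z_rotation", "w_rotation")
    payload = {name: props[name] for name in fields}
    payload["idCamera"] = thing_id
    return json.dumps(payload)


if __name__ == "__main__":
    message = build_modify_features_message()
    print(json.dumps(message))  # what would be passed to websocket.send()
    print(outgoing_payload(message["value"], "camera01"))
```

In the loop from the question, `json.dumps(build_modify_features_message())` would take the place of `to_send`.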
Upvotes: 1