Reputation: 37
I started Memgraph with the Confluent Kafka Platform according to the docker-compose.yml file. After that, I created a Kafka topic called myTopic, to which the data is streamed. Now I want to implement a consumer called my_stream with a transformation module according to the documentation. The stream settings look like this:
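(The exact settings were entered through the UI; as a rough sketch, the equivalent Cypher would look like the snippet below, run here through the neo4j Bolt driver. transfer_module is a placeholder for my module's file name, and the values are not guaranteed to match the screenshot.)

# Rough Cypher equivalent of the UI stream settings (sketch, not exact values).
# Assumes the neo4j Python driver and Memgraph's default no-auth setup.
from neo4j import GraphDatabase

with GraphDatabase.driver("bolt://localhost:7687", auth=("", "")) as driver:
    with driver.session() as session:
        session.run(
            "CREATE KAFKA STREAM my_stream "
            "TOPICS myTopic "
            "TRANSFORM transfer_module.transfer "
            "BOOTSTRAP_SERVERS 'localhost:9092'"
        )
        session.run("START STREAM my_stream")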
When I add my transformation module, attach it to the stream, and try to connect the stream, the following error is thrown:
Transformation Failed
Failed to initialize Kafka consumer my_stream : Local: Broker transport failure
A more detailed description of the error from the terminal where the docker-compose.yml file is running:
Looking at the Confluent Cluster Overview, the broker is running just fine. Does anyone know what the problem is? It doesn't seem like my transformation module is the problem, since the movielens-stream from the documentation connects with the same transformation module without problems (obviously an error is thrown as soon as the transformation starts, but the stream can be connected and runs). But for the sake of completeness, here is my transformation module:
import mgp
import json


@mgp.transformation
def transfer(messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []
    for i in range(messages.total_messages()):
        message = messages.message_at(i)
        # Each Kafka message payload is a JSON document describing one transfer.
        transfer_dict = json.loads(message.payload().decode('utf8'))
        result_queries.append(
            mgp.Record(
                query=(
                    """
                    MERGE (fromAccount:Account {accountId: $fromId, createTime: $fCreateTime, isBlocked: $fIsBlocked, accountType: $fAccountType, nickname: $fNickname, phonenum: $fPhonenum,
                        email: $fEmail, freqLoginType: $fFreqLoginType, lastLoginTime: $fLastLoginTime, accountLevel: $fAccountLevel})
                    MERGE (toAccount:Account {accountId: $toId, createTime: $tCreateTime, isBlocked: $tIsBlocked, accountType: $tAccountType, nickname: $tNickname, phonenum: $tPhonenum,
                        email: $tEmail, freqLoginType: $tFreqLoginType, lastLoginTime: $tLastLoginTime, accountLevel: $tAccountLevel})
                    MERGE (fromAccount)-[:TRANSFER {amount: $amount, createTime: $eCreateTime, orderNum: $orderNum, comment: $comment, payType: $payType, goodsType: $goodsType,
                        insertTime: $insertTime, exportTime: timestamp()}]->(toAccount)
                    """),
                # Parameter values are pulled straight out of the decoded JSON.
                parameters={
                    "fromId": transfer_dict["fromAccount"]["fromId"], "fCreateTime": transfer_dict["fromAccount"]["createTime"], "fIsBlocked": transfer_dict["fromAccount"]["isBlocked"],
                    "fAccountType": transfer_dict["fromAccount"]["accountType"], "fNickname": transfer_dict["fromAccount"]["nickname"], "fPhonenum": transfer_dict["fromAccount"]["phonenum"],
                    "fEmail": transfer_dict["fromAccount"]["email"], "fFreqLoginType": transfer_dict["fromAccount"]["freqLoginType"], "fLastLoginTime": transfer_dict["fromAccount"]["lastLoginTime"],
                    "fAccountLevel": transfer_dict["fromAccount"]["accountLevel"],
                    "toId": transfer_dict["toAccount"]["toId"], "tCreateTime": transfer_dict["toAccount"]["createTime"], "tIsBlocked": transfer_dict["toAccount"]["isBlocked"],
                    "tAccountType": transfer_dict["toAccount"]["accountType"], "tNickname": transfer_dict["toAccount"]["nickname"], "tPhonenum": transfer_dict["toAccount"]["phonenum"],
                    "tEmail": transfer_dict["toAccount"]["email"], "tFreqLoginType": transfer_dict["toAccount"]["freqLoginType"], "tLastLoginTime": transfer_dict["toAccount"]["lastLoginTime"],
                    "tAccountLevel": transfer_dict["toAccount"]["accountLevel"],
                    "amount": transfer_dict["amount"], "eCreateTime": transfer_dict["createTime"], "orderNum": transfer_dict["orderNum"], "comment": transfer_dict["comment"],
                    "payType": transfer_dict["payType"], "goodsType": transfer_dict["goodsType"], "insertTime": transfer_dict["insertTime"]
                }
            )
        )
    return result_queries
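For reference, the messages on myTopic are structured like this (a sketch with made-up values; only the key structure matches what the module reads), produced here with the confluent-kafka Python client:

# Sample message matching the keys the transformation reads.
# All values are invented examples; only the key layout comes from the module.
import json
from confluent_kafka import Producer

sample = {
    "fromAccount": {"fromId": 1, "createTime": "2022-01-01", "isBlocked": False,
                    "accountType": "personal", "nickname": "alice", "phonenum": "000",
                    "email": "a@example.com", "freqLoginType": "app",
                    "lastLoginTime": "2022-01-02", "accountLevel": 1},
    "toAccount": {"toId": 2, "createTime": "2022-01-01", "isBlocked": False,
                  "accountType": "personal", "nickname": "bob", "phonenum": "111",
                  "email": "b@example.com", "freqLoginType": "web",
                  "lastLoginTime": "2022-01-03", "accountLevel": 2},
    "amount": 42.0, "createTime": "2022-01-04", "orderNum": "o-1", "comment": "",
    "payType": "card", "goodsType": "digital", "insertTime": "2022-01-04",
}

producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("myTopic", json.dumps(sample).encode("utf8"))
producer.flush()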
Upvotes: 0
Views: 400
Reputation: 37
So it turned out that the Broker Transport Failure was thrown because the Docker container could not reach port 9092 on localhost. The point is that the docker-compose.yml file creates its own network, in which each container can be reached through its name, as described in the Docker documentation. In the docker-compose.yml from the Memgraph docs, the following line
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
defines that the broker listens at broker:29092 inside the compose network and at localhost:9092 on the host machine. So the correct server address for configuring the stream from the Memgraph UI is
broker:29092
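Concretely, that means dropping and recreating the stream with the in-network address. A minimal sketch, assuming the neo4j Bolt driver and a transformation module named transfer_module (a placeholder):

# Recreate the stream against the address reachable inside the compose network.
from neo4j import GraphDatabase

with GraphDatabase.driver("bolt://localhost:7687", auth=("", "")) as driver:
    with driver.session() as session:
        session.run("DROP STREAM my_stream")
        session.run(
            "CREATE KAFKA STREAM my_stream "
            "TOPICS myTopic "
            "TRANSFORM transfer_module.transfer "
            "BOOTSTRAP_SERVERS 'broker:29092'"
        )
        session.run("START STREAM my_stream")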
If you are using plain Docker (not docker-compose!), the following command works to access Kafka through localhost:9092 from within the Memgraph container, provided a local Kafka cluster is running:
docker run -p 7687:7687 -p 7444:7444 -p 3000:3000 --network="host" --name memgraph memgraph/memgraph-platform
Note that the Docker container and the Docker host now share the same network, as described here. So if Kafka runs locally, the Docker container can access port 9092 on localhost.
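To see which address is actually reachable from a given network context (host or container), a quick probe with the confluent-kafka Python client can help; a sketch, assuming the package is installed where you run it:

# Probe both advertised listener addresses; whichever answers is the one
# to use from that network context.
from confluent_kafka.admin import AdminClient

for addr in ("localhost:9092", "broker:29092"):
    client = AdminClient({"bootstrap.servers": addr})
    try:
        metadata = client.list_topics(timeout=5)
        print(addr, "-> reachable, topics:", sorted(metadata.topics))
    except Exception as exc:
        print(addr, "-> not reachable:", exc)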
Upvotes: 1
Reputation: 371
What you are trying to achieve is to connect the Source (Broker) to a Sink (Consumer).
In the linked docker-compose from the docs, the guide uses the Neo4j Kafka connector.
Notice these lines in docker-compose:
command:
- bash
- -c
- |
confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:latest
/etc/confluent/docker/run
So, if you have set the stream's Source to the Neo4j Connector, you should also set the Sink to the Neo4j Connector, based on the JSON schema.
You probably set the Neo4j connector as the Source (Broker) and are now trying to connect via the Python transformation module (Consumer). Technically, that should be possible, but I guess we never tested this combination, and the Neo4j Source is probably doing some pre-processing there that is not compatible with our Python query modules; they work with pure Kafka brokers.
That is the reason why the native Kafka stream (movie_lens) works with a Python QM but not with the Neo4j Kafka Connector.
My advice would be: if you use the Neo4j Kafka Connector, use it for both Source and Sink; otherwise, use native Kafka + a Python QM.
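One way to test a given combination before committing anything is CHECK STREAM, which dry-runs the transformation on incoming messages. A minimal sketch, assuming the neo4j Bolt driver and a Memgraph version that supports CHECK STREAM:

# Dry-run the transformation on one batch without committing offsets or data.
from neo4j import GraphDatabase

with GraphDatabase.driver("bolt://localhost:7687", auth=("", "")) as driver:
    with driver.session() as session:
        for record in session.run("CHECK STREAM my_stream BATCH_LIMIT 1"):
            print(record)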
We will probably explain this in the docs as well, to avoid further confusion. Hope this helps!
Upvotes: 1