Max

Reputation: 37

Memgraph Broker Transport Failure registering Kafka Stream Consumer

I started Memgraph with the Confluent Kafka Platform according to the docker-compose.yml file. After that, I created a Kafka topic called myTopic, to which the data is streamed. Now I want to implement a consumer called my_stream with a transformation module, following the documentation. The stream settings look like this: [Stream Settings screenshot]
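For reference, the UI settings above correspond to a stream definition roughly like the following. This is only a sketch of what I configured: it runs the equivalent Cypher over Bolt with the neo4j Python driver, and the module/procedure name (transfer_module.transfer) and the bootstrap server are placeholders for whatever is entered in the UI.

    from neo4j import GraphDatabase  # Memgraph accepts Bolt connections

    # Rough equivalent of the UI stream settings; module name and address are placeholders.
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("", ""))
    with driver.session() as session:
        session.run(
            "CREATE KAFKA STREAM my_stream "
            "TOPICS myTopic "
            "TRANSFORM transfer_module.transfer "
            "BOOTSTRAP_SERVERS 'localhost:9092'"
        )
        session.run("START STREAM my_stream")
    driver.close()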

When I add my transformation module, attach it to the stream, and try to connect the stream, the following error is thrown:

    Transformation Failed
    Failed to initialize Kafka consumer my_stream : Local: Broker transport failure

A more detailed description of the error from the terminal where the docker-compose.yml file is running:

[MemgraphError terminal output screenshot]

Looking at the Confluent Cluster Overview, the broker is running just fine. Does anyone know what the problem is? It doesn't seem like my transformation module is the problem, since the movielens stream from the documentation connects with the same transformation module without problems (obviously an error is thrown as soon as the transformation starts, but the stream can be connected and runs). For the sake of completeness, here is my transformation module:

    import mgp 
    import json
    
    @mgp.transformation
    def transfer(messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
      result_queries = []
    
      for i in range(messages.total_messages()):
        message = messages.message_at(i)
        transfer_dict = json.loads(message.payload().decode('utf8'))
        result_queries.append(
          mgp.Record(
            query=(
              """
            MERGE (fromAccount:Account {accountId: $fromId, createTime: $fCreateTime, isBlocked: $fIsBlocked, accountType: $fAccountType, nickname: $fNickname, phonenum: $fPhonenum, 
            email: $fEmail, freqLoginType: $fFreqLoginType, lastLoginTime: $fLastLoginTime, accountLevel: $fAccountLevel})
            MERGE (toAccount:Account {accountId: $toId, createTime: $tCreateTime, isBlocked: $tIsBlocked, accountType: $tAccountType, nickname: $tNickname, phonenum: $tPhonenum, 
            email: $tEmail, freqLoginType: $tFreqLoginType, lastLoginTime: $tLastLoginTime, accountLevel: $tAccountLevel})
            MERGE (fromAccount)-[:TRANSFER {amount: $amount, createTime: $eCreateTime, orderNum: $orderNum, comment: $comment, payType: $payType, goodsType: $goodsType, 
            insertTime: $insertTime, exportTime: timestamp()}]->(toAccount)
            """),
            parameters={
              "fromId": transfer_dict["fromAccount"]["fromId"], "fCreateTime": transfer_dict["fromAccount"]["createTime"], "fIsBlocked": transfer_dict["fromAccount"]["isBlocked"],
              "fAccountType": transfer_dict["fromAccount"]["accountType"], "fNickname": transfer_dict["fromAccount"]["nickname"], "fPhonenum": transfer_dict["fromAccount"]["phonenum"], 
              "fEmail": transfer_dict["fromAccount"]["email"], "fFreqLoginType": transfer_dict["fromAccount"]["freqLoginType"], "fLastLoginTime": transfer_dict["fromAccount"]["lastLoginTime"], 
              "fAccountLevel": transfer_dict["fromAccount"]["accountLevel"],
              "toId": transfer_dict["toAccount"]["toId"], "tCreateTime": transfer_dict["toAccount"]["createTime"], "tIsBlocked": transfer_dict["toAccount"]["isBlocked"],
              "tAccountType": transfer_dict["toAccount"]["accountType"], "tNickname": transfer_dict["toAccount"]["nickname"], "tPhonenum": transfer_dict["toAccount"]["phonenum"],
              "tEmail": transfer_dict["toAccount"]["email"], "tFreqLoginType": transfer_dict["toAccount"]["freqLoginType"], "tLastLoginTime": transfer_dict["toAccount"]["lastLoginTime"],
              "tAccountLevel": transfer_dict["toAccount"]["accountLevel"],
              "amount": transfer_dict["amount"], "eCreateTime": transfer_dict["createTime"], "orderNum": transfer_dict["orderNum"], "comment": transfer_dict["comment"], 
              "payType": transfer_dict["payType"], "goodsType": transfer_dict["goodsType"], "insertTime": transfer_dict["insertTime"]
            }
          )
        )
      # return once, after every message in the batch has been processed (not inside the loop)
      return result_queries
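For reference, based on the keys the module reads, a message on myTopic is presumably shaped like this (the values are made-up placeholders; only the nesting and key names come from the code above):

    # Placeholder payload matching the keys accessed in the transformation above.
    sample_payload = {
        "fromAccount": {
            "fromId": "acc-1", "createTime": "2022-01-01T00:00:00", "isBlocked": False,
            "accountType": "personal", "nickname": "alice", "phonenum": "000",
            "email": "alice@example.com", "freqLoginType": "app",
            "lastLoginTime": "2022-01-02T00:00:00", "accountLevel": "1"
        },
        "toAccount": {
            "toId": "acc-2", "createTime": "2022-01-01T00:00:00", "isBlocked": False,
            "accountType": "personal", "nickname": "bob", "phonenum": "000",
            "email": "bob@example.com", "freqLoginType": "web",
            "lastLoginTime": "2022-01-02T00:00:00", "accountLevel": "1"
        },
        "amount": 10.0, "createTime": "2022-01-03T00:00:00", "orderNum": "o-1",
        "comment": "test", "payType": "card", "goodsType": "digital",
        "insertTime": "2022-01-03T00:00:01"
    }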

Upvotes: 0

Views: 400

Answers (2)

Max

Reputation: 37

It turned out that the Broker Transport Failure was thrown because the Docker container could not reach port 9092 on localhost. The point is that the docker-compose.yml file creates its own network, in which each container can be reached through its service name, as described in the Docker documentation. In the docker-compose.yml from the Memgraph docs, the following line

    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092

defines that the broker listens at broker:29092 inside the Compose network and at localhost:9092 on the host machine. So the correct server address to configure the stream from the Memgraph UI is

    broker:29092
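If you want to double-check the address before reconfiguring the stream, a quick connectivity test from inside the Memgraph container (e.g. via docker exec) is to list the broker's topics. This is just a sketch and assumes kafka-python can be installed in the container:

    from kafka import KafkaConsumer  # pip install kafka-python (assumed to be installable)

    # localhost:9092 fails from inside the compose network with NoBrokersAvailable;
    # broker:29092 should connect and list the topics, including myTopic.
    consumer = KafkaConsumer(bootstrap_servers="broker:29092")
    print(consumer.topics())
    consumer.close()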

If you are using docker (not docker-compose!) and a local Kafka cluster is running, the following command lets the Memgraph container access Kafka through localhost:9092:

    docker run -p 7687:7687 -p 7444:7444 -p 3000:3000 --network="host" --name memgraph memgraph/memgraph-platform

Note that the Docker container and the Docker host then share the same network, as described here. So if Kafka runs locally, the container can access port 9092 on localhost.

Upvotes: 1

Ante Javor

Reputation: 371

What you are trying to achieve is to connect the Source (Broker) to a Sink (Consumer).

In the linked docker-compose from the docs, the guide uses the Neo4j Kafka connector.

Notice these lines in docker-compose:

    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:latest
        /etc/confluent/docker/run

So, if you have set the stream's Source to the Neo4j Connector, you should also set the Sink to the Neo4j Connector, based on the JSON schema.

You probably set the Neo4j Connector as the Source (Broker) and are now trying to connect via the Python transformation module (Consumer). Technically, that should be possible, but I guess we never tested this combination, and the Neo4j Source is probably doing some pre-processing there that is not compatible with our Python query modules; they work with pure Kafka brokers.

That is the reason why the native Kafka stream (movie_lens) works with a Python QM but not with the Neo4j Kafka Connector.

My advice would be: if you use the Neo4j Kafka Connector, use it for both Source and Sink; otherwise use native Kafka + a Python QM.
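For the second option (native Kafka + a Python QM), any plain Kafka producer that writes JSON to the topic will do, since the Python transformation module then receives the raw message bytes without any connector-specific pre-processing. A minimal sketch with kafka-python (topic name and payload are just examples):

    import json
    from kafka import KafkaProducer  # pip install kafka-python

    # Plain Kafka producer: the transformation module receives exactly these JSON bytes.
    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda d: json.dumps(d).encode("utf-8"),
    )
    producer.send("myTopic", {"example": "payload"})
    producer.flush()
    producer.close()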

We will probably explain this in the docs as well to avoid further confusion. Hope this helps!

Upvotes: 1
