daesu

Reputation: 620

Connect to Amazon MSK cluster

I'm trying to set up an Amazon MSK cluster and connect to it from a Lambda function. The Lambda function will be a producer of messages, not a consumer.

I am using the Serverless Framework to provision everything, and in my serverless.yml I have added the following, which seems to be working fine:

    MSK:
      Type: AWS::MSK::Cluster
      Properties:
        ClusterName: kafkaOne
        KafkaVersion: 2.2.1
        NumberOfBrokerNodes: 3
        BrokerNodeGroupInfo:
          InstanceType: kafka.t3.small
          ClientSubnets:
            - Ref: PrivateSubnet1
            - Ref: PrivateSubnet2
            - Ref: PrivateSubnet3

But when trying to connect to this cluster to actually send messages, I am unsure how to get the connection string. I presume it should be the ZookeeperConnectString? I'm new to Kafka/MSK, so maybe I am not seeing something obvious.

Any advice much appreciated. Cheers.

Upvotes: 1

Views: 1890

Answers (2)

MGB

Reputation: 81

The connection string, which is called the broker bootstrap string, can be found by making an API call like aws kafka get-bootstrap-brokers --cluster-arn ClusterArn.

See an example here: https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html

There is also a step-by-step walkthrough of how to produce/consume data here: https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html
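
If you would rather resolve the brokers at runtime instead of pasting them into config, something like the following should work with the AWS SDK for Go v1. This is a rough sketch: the getBootstrapBrokers helper and the KAFKA_ARN variable are my own names, and the caller's IAM role needs kafka:GetBootstrapBrokers.

package main

import (
    "fmt"
    "os"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/kafka"
)

// getBootstrapBrokers fetches the broker bootstrap string for a cluster.
// Sketch only: the helper name and env var are assumptions.
func getBootstrapBrokers(clusterArn string) (string, error) {
    sess := session.Must(session.NewSession())
    svc := kafka.New(sess)

    out, err := svc.GetBootstrapBrokers(&kafka.GetBootstrapBrokersInput{
        ClusterArn: aws.String(clusterArn),
    })
    if err != nil {
        return "", err
    }

    // Prefer the TLS endpoints (port 9094) when in-transit encryption is
    // enabled; otherwise fall back to the plaintext endpoints (port 9092).
    if out.BootstrapBrokerStringTls != nil {
        return aws.StringValue(out.BootstrapBrokerStringTls), nil
    }
    return aws.StringValue(out.BootstrapBrokerString), nil
}

func main() {
    brokers, err := getBootstrapBrokers(os.Getenv("KAFKA_ARN"))
    if err != nil {
        panic(err)
    }
    fmt.Println(brokers) // comma-separated host:port list
}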

Upvotes: 0

Valor_

Reputation: 3591

I don't know what kind of code base you are using, so I will add my code, which I wrote in Go.

In essence, you connect to an MSK cluster the same way you would connect to a standalone Kafka instance: you use the broker addresses to connect to, and write to, the cluster.

I'm using the segmentio/kafka-go library. My function for sending an event to the MSK cluster looks like this:

// Imports used by addEvent
import (
    "context"
    "encoding/json"
    "fmt"
    "os"
    "time"

    "github.com/segmentio/kafka-go"
)

// Add event
func addEvent(ctx context.Context, requestBody RequestBodyType) (bool, error) {

    // Prepare dialer
    dialer := &kafka.Dialer{
        Timeout:   2 * time.Second,
        DualStack: true,
    }

    // Broker addresses from the cluster's bootstrap-broker string
    brokers := []string{os.Getenv("KAFKA_BROKER_1"), os.Getenv("KAFKA_BROKER_2"), os.Getenv("KAFKA_BROKER_3"), os.Getenv("KAFKA_BROKER_4")}

    // Prepare writer config
    kafkaConfig := kafka.WriterConfig{
        Brokers:  brokers,
        Topic:    os.Getenv("KAFKA_TOPIC"),
        Balancer: &kafka.Hash{},
        Dialer:   dialer,
    }

    // Prepare writer; make sure it is flushed and closed on return
    w := kafka.NewWriter(kafkaConfig)
    defer w.Close()

    // Convert struct to a JSON byte slice
    event, err := json.Marshal(requestBody)
    if err != nil {
        fmt.Println("Convert struct to JSON for writing to Kafka failed")
        return false, err
    }

    // Write message
    writeError := w.WriteMessages(ctx,
        kafka.Message{
            Key:   []byte(requestBody.Event),
            Value: event, // json.Marshal already returns []byte
        },
    )
    if writeError != nil {
        fmt.Println("ERROR WRITING EVENT TO KAFKA")
        return false, writeError
    }

    return true, nil
}
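
For completeness, here is a rough sketch of how addEvent could be wired into the postEvent handler; the handler shape and response codes are my assumptions, not part of the original code:

package main

import (
    "context"
    "encoding/json"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

// handlePostEvent decodes the API Gateway request body and hands it to
// addEvent. Assumed wiring, not the author's original code.
func handlePostEvent(ctx context.Context, req events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
    var body RequestBodyType
    if err := json.Unmarshal([]byte(req.Body), &body); err != nil {
        return events.APIGatewayProxyResponse{StatusCode: 400, Body: "bad request"}, nil
    }

    if _, err := addEvent(ctx, body); err != nil {
        return events.APIGatewayProxyResponse{StatusCode: 500, Body: "failed to write event"}, nil
    }

    return events.APIGatewayProxyResponse{StatusCode: 200, Body: "ok"}, nil
}

func main() {
    lambda.Start(handlePostEvent)
}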

My serverless.yml

The code above (addEvent) belongs to functions -> postEvent in serverless.yml. If you are consuming from Kafka, then you should check functions -> processEvent (a minimal consumer sketch follows the config below). Consuming events is fairly simple, but setting everything up for producing to Kafka is crazy. We have been working on this for about a month and a half and are still figuring out how everything should be set up. Sadly, serverless does not do everything for you, so you will have to "click through" some things manually in AWS, but we compared it to other frameworks and serverless is still the best right now.

provider:
  name: aws
  runtime: go1.x
  stage: dev
  profile: ${env:AWS_PROFILE}
  region: ${env:REGION}
  apiName: my-app-${sls:stage}
  lambdaHashingVersion: 20201221
  environment:
    ENV: ${env:ENV}
    KAFKA_TOPIC: ${env:KAFKA_TOPIC}
    KAFKA_BROKER_1: ${env:KAFKA_BROKER_1}
    KAFKA_BROKER_2: ${env:KAFKA_BROKER_2}
    KAFKA_BROKER_3: ${env:KAFKA_BROKER_3}
    KAFKA_BROKER_4: ${env:KAFKA_BROKER_4}
    KAFKA_ARN: ${env:KAFKA_ARN}
    ACCESS_CONTROL_ORIGINS: ${env:ACCESS_CONTROL_ORIGINS}
    ACCESS_CONTROL_HEADERS: ${env:ACCESS_CONTROL_HEADERS}
    ACCESS_CONTROL_METHODS: ${env:ACCESS_CONTROL_METHODS}
    BATCH_SIZE: ${env:BATCH_SIZE}
    SLACK_API_TOKEN: ${env:SLACK_API_TOKEN}
    SLACK_CHANNEL_ID: ${env:SLACK_CHANNEL_ID}
  httpApi:
    cors: true
  apiGateway:
    resourcePolicy:
      - Effect: Allow
        Action: '*'
        Resource: '*'
        Principal: '*'
  vpc:
    securityGroupIds:
      - sg-*********
    subnetIds:
      - subnet-******
      - subnet-*******

functions:
  postEvent:
    handler: bin/postEvent
    package:
      patterns:
        - bin/postEvent
    events:
      - http:
          path: event
          method: post
          cors:
            origin: ${env:ACCESS_CONTROL_ORIGINS}
            headers:
              - Content-Type
              - Content-Length
              - Accept-Encoding
              - Origin
              - Referer
              - Authorization
              - X-CSRF-Token
              - X-Amz-Date
              - X-Api-Key
              - X-Amz-Security-Token
              - X-Amz-User-Agent
            allowCredentials: false
            methods:
              - OPTIONS
              - POST
  processEvent:
    handler: bin/processEvent
    package:
      patterns:
        - bin/processEvent
    events:
      - msk:
          arn: ${env:KAFKA_ARN}
          topic: ${env:KAFKA_TOPIC}
          batchSize: ${env:BATCH_SIZE}
          startingPosition: LATEST
resources:
  Resources:
    GatewayResponseDefault4XX:
      Type: 'AWS::ApiGateway::GatewayResponse'
      Properties:
        ResponseParameters:
          gatewayresponse.header.Access-Control-Allow-Origin: "'*'"
          gatewayresponse.header.Access-Control-Allow-Headers: "'*'"
        ResponseType: DEFAULT_4XX
        RestApiId:
          Ref: 'ApiGatewayRestApi'
    myDefaultRole:
      Type: AWS::IAM::Role
      Properties:
        Path: /
        RoleName: my-app-dev-eu-serverless-lambdaRole-${sls:stage} # required if you want to use 'serverless deploy --function' later on
        AssumeRolePolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - lambda.amazonaws.com
              Action: sts:AssumeRole
        # note that these rights are needed if you want your function to be able to communicate with resources within your vpc
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
          - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
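
On the consuming side (processEvent), the MSK event source invokes the function with batches of records whose keys and values are base64-encoded. A minimal handler sketch, assuming aws-lambda-go's events.KafkaEvent type:

package main

import (
    "context"
    "encoding/base64"
    "fmt"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

// handleProcessEvent walks each topic-partition batch and decodes the
// record values. Sketch only; a real handler would do actual work here.
func handleProcessEvent(ctx context.Context, event events.KafkaEvent) error {
    for _, records := range event.Records {
        for _, record := range records {
            value, err := base64.StdEncoding.DecodeString(record.Value)
            if err != nil {
                return err
            }
            fmt.Printf("topic=%s partition=%d offset=%d value=%s\n",
                record.Topic, record.Partition, record.Offset, value)
        }
    }
    return nil
}

func main() {
    lambda.Start(handleProcessEvent)
}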

I must warn you that we spent a lot of time figuring out how to properly set up the VPC and other networking/permission stuff. My colleague will write a blog post once he gets back from vacation. :) I hope this helps you somehow. Best of luck ;)

UPDATE

If you are using JavaScript, then you would connect to Kafka similar to this:

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'order-app',
  brokers: [
    'broker1:port',
    'broker2:port',
  ],
  ssl: true, // or false, depending on the cluster's encryption settings
})

// Producing could then look like this (my sketch, inside an async function):
const producer = kafka.producer()
await producer.connect()
await producer.send({
  topic: 'my-topic', // assumed topic name
  messages: [{ key: 'key1', value: JSON.stringify({ hello: 'world' }) }],
})
await producer.disconnect()

Upvotes: 2
