Reputation: 620
I’m trying to set up an Amazon MSK cluster and connect to it from a Lambda function. The Lambda function will be a producer of messages, not a consumer.
I am using the Serverless Framework to provision everything, and in my serverless.yml I have added the following, which seems to be working fine.
MSK:
  Type: AWS::MSK::Cluster
  Properties:
    ClusterName: kafkaOne
    KafkaVersion: 2.2.1
    NumberOfBrokerNodes: 3
    BrokerNodeGroupInfo:
      InstanceType: kafka.t3.small
      ClientSubnets:
        - Ref: PrivateSubnet1
        - Ref: PrivateSubnet2
        - Ref: PrivateSubnet3
But when trying to connect to this cluster to actually send messages, I am unsure how to get the connection string. Should it be the ZookeeperConnectString? I’m new to Kafka/MSK, so maybe I am not seeing something obvious.
Any advice much appreciated. Cheers.
Upvotes: 1
Views: 1890
Reputation: 81
The connection string, which is called the broker bootstrap string, can be found by making an API call like aws kafka get-bootstrap-brokers --cluster-arn ClusterArn
See example here: https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html
Also, here is a step-by-step walkthrough on how to produce/consume data: https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html
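If you need the bootstrap string at runtime (for example inside the Lambda itself), the same API is available through the SDK. Here is a minimal sketch in Go using aws-sdk-go; the KAFKA_ARN environment variable name is an assumption:

package main

import (
    "fmt"
    "log"
    "os"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/kafka"
)

func main() {
    sess := session.Must(session.NewSession())
    svc := kafka.New(sess)

    // KAFKA_ARN is an assumed environment variable holding the cluster ARN
    out, err := svc.GetBootstrapBrokers(&kafka.GetBootstrapBrokersInput{
        ClusterArn: aws.String(os.Getenv("KAFKA_ARN")),
    })
    if err != nil {
        log.Fatal(err)
    }
    // Plaintext and TLS bootstrap strings; pass one of these to your Kafka client
    fmt.Println(aws.StringValue(out.BootstrapBrokerString))
    fmt.Println(aws.StringValue(out.BootstrapBrokerStringTls))
}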
Upvotes: 0
Reputation: 3591
I don't know what kind of code base you are using, so I will add my code, which I wrote in Go.
In essence, you should connect to an MSK cluster the same way you would connect to a standalone Kafka instance. We use the broker endpoints for connecting to, or rather writing to, the MSK cluster.
I'm using the segmentio/kafka-go library. My function for sending an event to the MSK cluster looks like this:
import (
    "context"
    "encoding/json"
    "fmt"
    "os"
    "time"

    kafka "github.com/segmentio/kafka-go"
)

// Add event
func addEvent(ctx context.Context, requestBody RequestBodyType) (bool, error) {
    // Prepare dialer
    dialer := &kafka.Dialer{
        Timeout:   2 * time.Second,
        DualStack: true,
    }
    brokers := []string{os.Getenv("KAFKA_BROKER_1"), os.Getenv("KAFKA_BROKER_2"), os.Getenv("KAFKA_BROKER_3"), os.Getenv("KAFKA_BROKER_4")}
    // Prepare writer config
    kafkaConfig := kafka.WriterConfig{
        Brokers:  brokers,
        Topic:    os.Getenv("KAFKA_TOPIC"),
        Balancer: &kafka.Hash{}, // messages with the same key go to the same partition
        Dialer:   dialer,
    }
    // Prepare writer and close it when the function returns
    w := kafka.NewWriter(kafkaConfig)
    defer w.Close()
    // Convert struct to JSON
    event, err := json.Marshal(requestBody)
    if err != nil {
        fmt.Println("Convert struct to json for writing to KAFKA failed")
        panic(err)
    }
    // Write message
    writeError := w.WriteMessages(ctx,
        kafka.Message{
            Key:   []byte(requestBody.Event),
            Value: event, // already a []byte from json.Marshal
        },
    )
    if writeError != nil {
        fmt.Println("ERROR WRITING EVENT TO KAFKA")
        // use writeError here; err is nil at this point
        panic("could not write message " + writeError.Error())
    }
    return true, nil
}
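For completeness, here is a minimal sketch of how the function above could be wired into the postEvent Lambda handler, assuming github.com/aws/aws-lambda-go and an API Gateway proxy integration (the response bodies and status codes are illustrative, not the author's actual code):

package main

import (
    "context"
    "encoding/json"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

// handler parses the API Gateway request body and forwards it to addEvent
func handler(ctx context.Context, req events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
    var body RequestBodyType
    if err := json.Unmarshal([]byte(req.Body), &body); err != nil {
        return events.APIGatewayProxyResponse{StatusCode: 400, Body: "invalid request body"}, nil
    }
    if _, err := addEvent(ctx, body); err != nil {
        return events.APIGatewayProxyResponse{StatusCode: 500, Body: "could not write event"}, nil
    }
    return events.APIGatewayProxyResponse{StatusCode: 200, Body: "event accepted"}, nil
}

func main() {
    lambda.Start(handler)
}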
My serverless.yml
The code above (addEvent) belongs to functions -> postEvent in serverless.yml. If you are consuming from Kafka, then you should check functions -> processEvent. Consuming events is fairly simple, but setting everything up for producing to Kafka is crazy. We have probably been working on this for a month and a half and are still figuring out how everything should be set up. Sadly, serverless does not do everything for you, so you will have to "click through" some things manually in AWS, but we compared it to other frameworks and serverless is still the best right now.
provider:
  name: aws
  runtime: go1.x
  stage: dev
  profile: ${env:AWS_PROFILE}
  region: ${env:REGION}
  apiName: my-app-${sls:stage}
  lambdaHashingVersion: 20201221
  environment:
    ENV: ${env:ENV}
    KAFKA_TOPIC: ${env:KAFKA_TOPIC}
    KAFKA_BROKER_1: ${env:KAFKA_BROKER_1}
    KAFKA_BROKER_2: ${env:KAFKA_BROKER_2}
    KAFKA_BROKER_3: ${env:KAFKA_BROKER_3}
    KAFKA_BROKER_4: ${env:KAFKA_BROKER_4}
    KAFKA_ARN: ${env:KAFKA_ARN}
    ACCESS_CONTROL_ORIGINS: ${env:ACCESS_CONTROL_ORIGINS}
    ACCESS_CONTROL_HEADERS: ${env:ACCESS_CONTROL_HEADERS}
    ACCESS_CONTROL_METHODS: ${env:ACCESS_CONTROL_METHODS}
    BATCH_SIZE: ${env:BATCH_SIZE}
    SLACK_API_TOKEN: ${env:SLACK_API_TOKEN}
    SLACK_CHANNEL_ID: ${env:SLACK_CHANNEL_ID}
  httpApi:
    cors: true
  apiGateway:
    resourcePolicy:
      - Effect: Allow
        Action: '*'
        Resource: '*'
        Principal: '*'
  vpc:
    securityGroupIds:
      - sg-*********
    subnetIds:
      - subnet-******
      - subnet-*******

functions:
  postEvent:
    handler: bin/postEvent
    package:
      patterns:
        - bin/postEvent
    events:
      - http:
          path: event
          method: post
          cors:
            origin: ${env:ACCESS_CONTROL_ORIGINS}
            headers:
              - Content-Type
              - Content-Length
              - Accept-Encoding
              - Origin
              - Referer
              - Authorization
              - X-CSRF-Token
              - X-Amz-Date
              - X-Api-Key
              - X-Amz-Security-Token
              - X-Amz-User-Agent
            allowCredentials: false
            methods:
              - OPTIONS
              - POST
  processEvent:
    handler: bin/processEvent
    package:
      patterns:
        - bin/processEvent
    events:
      - msk:
          arn: ${env:KAFKA_ARN}
          topic: ${env:KAFKA_TOPIC}
          batchSize: ${env:BATCH_SIZE}
          startingPosition: LATEST

resources:
  Resources:
    GatewayResponseDefault4XX:
      Type: 'AWS::ApiGateway::GatewayResponse'
      Properties:
        ResponseParameters:
          gatewayresponse.header.Access-Control-Allow-Origin: "'*'"
          gatewayresponse.header.Access-Control-Allow-Headers: "'*'"
        ResponseType: DEFAULT_4XX
        RestApiId:
          Ref: 'ApiGatewayRestApi'
    myDefaultRole:
      Type: AWS::IAM::Role
      Properties:
        Path: /
        RoleName: my-app-dev-eu-serverless-lambdaRole-${sls:stage} # required if you want to use 'serverless deploy --function' later on
        AssumeRolePolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - lambda.amazonaws.com
              Action: sts:AssumeRole
        # note that these rights are needed if you want your function to be able to communicate with resources within your vpc
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
          - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
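Since consuming is the simpler side, here is a minimal sketch of what a processEvent handler could look like with github.com/aws/aws-lambda-go (this is an assumption about the handler shape, not the author's actual code). Note that the MSK trigger delivers record keys and values base64-encoded:

package main

import (
    "context"
    "encoding/base64"
    "fmt"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

// handler receives batches of records from the MSK trigger declared
// under functions -> processEvent in serverless.yml
func handler(ctx context.Context, event events.KafkaEvent) error {
    for _, records := range event.Records {
        for _, record := range records {
            // record.Value arrives base64-encoded
            value, err := base64.StdEncoding.DecodeString(record.Value)
            if err != nil {
                return err
            }
            fmt.Printf("topic=%s partition=%d offset=%d value=%s\n",
                record.Topic, record.Partition, record.Offset, value)
        }
    }
    return nil
}

func main() {
    lambda.Start(handler)
}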
I must warn you that we spent a lot of time figuring out how to properly set up the VPC and other networking/permission stuff. My colleague will write a blog post once he arrives back from vacation. :) I hope this helps you somehow. Best of luck ;)
UPDATE
If you are using JavaScript, then you would connect to Kafka similarly to this:
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'order-app',
  brokers: [
    'broker1:port',
    'broker2:port',
  ],
  ssl: true, // or false, depending on which listener you target
})
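On the Go side, the equivalent of ssl: true with segmentio/kafka-go is setting the TLS field on the dialer. A minimal sketch, assuming you point the brokers at MSK's TLS listener (port 9094 by default; an empty tls.Config uses the system trust store, which trusts the Amazon-issued broker certificates):

package main

import (
    "crypto/tls"
    "time"

    kafka "github.com/segmentio/kafka-go"
)

// newTLSDialer returns a dialer for MSK's TLS listener (typically port 9094)
func newTLSDialer() *kafka.Dialer {
    return &kafka.Dialer{
        Timeout:   2 * time.Second,
        DualStack: true,
        TLS:       &tls.Config{}, // default trust store is enough for MSK's certs
    }
}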
Upvotes: 2