Lannest Caster

Reputation: 13

Optimising write performance for custom mongodb-sink in flink stream job

I have a streaming Flink job which writes data into MongoDB (Kafka -> Map -> MongoDB). Since there doesn't seem to be a MongoDB sink connector currently available for Flink, I created a custom one for writing data into MongoDB.

However, the write throughput is pretty low. Even though I used the MongoDB Reactive Streams driver for asynchronous operations, the result was still not satisfying for my needs: only around 2k writes/sec, when I need at least 5k writes/sec for my application.

Is there anything else I can do to optimise write performance in this scenario? Below is the code for my custom MongoDB Flink sink.

import com.mongodb.WriteConcern;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoDatabase;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.bson.Document;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.logging.Logger;

public class MongoDBSinkConfig extends RichSinkFunction<Document> {

    Configuration configuration;

    static MongoClient mongoClient;

    static MongoDatabase mongoDatabase;

    public MongoDBSinkConfig(Configuration configuration) {
        this.configuration = configuration;
    }

    @Override
    public void open(Configuration parameters) {
        String uri = configuration.getString(ConfigOptions.key(ConfigurationParamUtils.MONGO_URI).stringType().noDefaultValue());
        String database = configuration.getString(ConfigOptions.key(ConfigurationParamUtils.MONGO_DATABASE).stringType().noDefaultValue());
        String config = configuration.getString(ConfigOptions.key(ConfigurationParamUtils.CONFIG_COLLECTION).stringType().noDefaultValue());
        MongoDBMetricConfig.loadMongoDBMetricConfig(uri, database, config);
        mongoClient = MongoClients.create(uri);
        mongoDatabase = mongoClient.getDatabase(database).withWriteConcern(WriteConcern.UNACKNOWLEDGED);
    }

    @Override
    public void invoke(Document value, Context context) {
        Publisher<InsertOneResult> insertResult = mongoDatabase.getCollection("collection").withWriteConcern(WriteConcern.UNACKNOWLEDGED).insertOne(value);
        
        insertResult.subscribe(new Subscriber<InsertOneResult>() {
            @Override
            public void onSubscribe(Subscription s) {
                // Signal demand; a Reactive Streams publisher does nothing
                // until the subscriber requests elements.
                s.request(1);
            }

            @Override
            public void onNext(InsertOneResult insertOneResult) {
                Logger.getGlobal().info(insertOneResult.toString());
            }

            @Override
            public void onError(Throwable t) {
                // Log failures at error level so they are not lost among info logs
                Logger.getGlobal().severe(t.getMessage());
            }

            @Override
            public void onComplete() {
            }


        });

    }

    @Override
    public void close() {
        SinkUtils.closeConnection();
    }
}
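For context on what I mean by batching (which I have not tried yet): instead of issuing one `insertOne` per record, buffer documents and flush them in one `insertMany` call once a threshold is reached. Below is a minimal, self-contained sketch of just the buffering logic; the `Consumer` callback is a stand-in for the actual `collection.insertMany(batch)` call, which depends on the driver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Buffers items and hands off full batches to a flush callback. */
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher; // stand-in for collection.insertMany(batch)
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Flushes any remaining items; call this from the sink's close(). */
    void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}

public class BatchBufferDemo {
    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchBuffer<String> buf = new BatchBuffer<>(3, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 7; i++) {
            buf.add("doc-" + i);
        }
        buf.flush(); // flush the remainder, as close() would
        System.out.println(batchSizes); // prints [3, 3, 1]
    }
}
```

In the sink, `add` would be called from `invoke` and `flush` from `close`, trading a little latency for far fewer round trips to MongoDB.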

Upvotes: 1

Views: 677

Answers (1)

Martijn Visser

Reputation: 2108

I don't know MongoDB but a Flink MongoDB connector is in the works at https://github.com/apache/flink-connector-mongodb
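For reference, the repository's examples show usage roughly like the following. This is a sketch only: the connector is unreleased at the time of writing, and the builder method names are taken from the repository and may change.

```java
// Sketch based on apache/flink-connector-mongodb (unreleased); names may change.
MongoSink<String> sink = MongoSink.<String>builder()
        .setUri("mongodb://localhost:27017")
        .setDatabase("mydb")
        .setCollection("mycollection")
        .setBatchSize(1000) // batches writes instead of one insert per record
        .setSerializationSchema(
                (input, context) -> new InsertOneModel<>(BsonDocument.parse(input)))
        .build();

stream.sinkTo(sink);
```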

Upvotes: 0
