Reputation: 13
I have a streaming Flink job that writes data into MongoDB (Kafka -> Map -> MongoDB). Since there seems to be no MongoDB sink connector currently available for Flink, I created a custom one for writing data into MongoDB.
However, write throughput is quite low. Even though I switched to the MongoDB Reactive Streams driver for asynchronous operations, the result is still not good enough: I get only around 2k writes/sec, whereas my application needs at least 5k writes/sec.
Is there anything else I can do to optimise write performance in this scenario? Below is the code for my custom MongoDB Flink sink.
import com.mongodb.WriteConcern;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoDatabase;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.bson.Document;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Flink sink that writes each incoming {@link Document} to the MongoDB collection
 * {@code "collection"} using the MongoDB Reactive Streams driver.
 *
 * <p>Connection settings are read in {@link #open(Configuration)} from the {@link Configuration}
 * passed to the constructor. The keys are {@code ConfigurationParamUtils.MONGO_URI},
 * {@code MONGO_DATABASE} and {@code CONFIG_COLLECTION}. Writes use
 * {@link WriteConcern#UNACKNOWLEDGED}. Each {@link #invoke} issues one {@code insertOne} and
 * returns without waiting for the server.
 *
 * <p>NOTE(review): {@code mongoClient} and {@code mongoDatabase} are {@code static}. Every
 * parallel instance of this sink in the same JVM therefore overwrites them in {@code open()}, and
 * this class never closes a previously created client itself. They stay static here only because
 * code outside this file (e.g. {@code SinkUtils.closeConnection()}) may reference them. Confirm
 * this and consider making them {@code transient} instance fields.
 *
 * <p>NOTE(review): one request per record is the main throughput limit. The usual remedy is to
 * buffer records and write them with {@code insertMany} or {@code bulkWrite}, flushing on
 * checkpoint and in {@code close()}.
 */
public class MongoDBSinkConfig extends RichSinkFunction<Document> {

    /** Class-scoped logger instead of the JUL global logger. */
    private static final Logger LOG = Logger.getLogger(MongoDBSinkConfig.class.getName());

    Configuration configuration;
    static MongoClient mongoClient;
    static MongoDatabase mongoDatabase;

    /**
     * Creates the sink.
     *
     * @param configuration source of the MongoDB URI, database name and config-collection name,
     *     read in {@link #open(Configuration)}
     */
    public MongoDBSinkConfig(Configuration configuration) {
        this.configuration = configuration;
    }

    /**
     * Reads the connection settings and calls {@code MongoDBMetricConfig.loadMongoDBMetricConfig}.
     * It then creates the client and a database handle with unacknowledged writes.
     *
     * @param parameters Flink-supplied parameters; not used, since settings come from the
     *     constructor argument
     */
    @Override
    public void open(Configuration parameters) {
        String uri = readString(ConfigurationParamUtils.MONGO_URI);
        String database = readString(ConfigurationParamUtils.MONGO_DATABASE);
        String config = readString(ConfigurationParamUtils.CONFIG_COLLECTION);
        MongoDBMetricConfig.loadMongoDBMetricConfig(uri, database, config);
        mongoClient = MongoClients.create(uri);
        mongoDatabase = mongoClient.getDatabase(database).withWriteConcern(WriteConcern.UNACKNOWLEDGED);
    }

    /** Reads a string option with no default from {@link #configuration}. */
    private String readString(String key) {
        return configuration.getString(ConfigOptions.key(key).stringType().noDefaultValue());
    }

    /**
     * Issues an asynchronous, unacknowledged insert of {@code value} and returns immediately.
     * Asynchronous failures are logged and do not fail the Flink task.
     */
    @Override
    public void invoke(Document value, Context context) {
        Publisher<InsertOneResult> insertResult = mongoDatabase.getCollection("collection")
                .withWriteConcern(WriteConcern.UNACKNOWLEDGED)
                .insertOne(value);
        insertResult.subscribe(new InsertResultSubscriber());
    }

    /**
     * Delegates to {@code SinkUtils.closeConnection()}.
     *
     * <p>NOTE(review): whether that helper closes {@code mongoClient} cannot be seen from here.
     * Verify it does, or the client created in {@code open()} leaks.
     */
    @Override
    public void close() {
        SinkUtils.closeConnection();
    }

    /**
     * One-shot subscriber for a single {@code insertOne} publisher.
     *
     * <p>It is static so it does not capture the sink instance. A new instance is needed per
     * publisher, because a Subscriber may be subscribed at most once.
     */
    private static final class InsertResultSubscriber implements Subscriber<InsertOneResult> {

        @Override
        public void onSubscribe(Subscription subscription) {
            // Reactive Streams rule 1.1: no onNext may be signalled beyond requested demand.
            // Without this request the result is never delivered. Publishers that start work
            // only on demand might not perform the insert at all.
            subscription.request(1);
        }

        @Override
        public void onNext(InsertOneResult result) {
            // Logging every record at INFO would itself limit throughput, so keep it at FINE.
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine(result.toString());
            }
        }

        @Override
        public void onError(Throwable t) {
            // Keep the throwable so the stack trace and cause are not lost.
            LOG.log(Level.WARNING, "MongoDB insert failed", t);
        }

        @Override
        public void onComplete() {
            // Nothing to do: the single result (if any) was handled in onNext.
        }
    }
}
Upvotes: 1
Views: 677
Reputation: 2108
I don't know MongoDB well, but an official Flink MongoDB connector is in the works at https://github.com/apache/flink-connector-mongodb
Upvotes: 0