Reputation: 21
I'm currently working on a streaming pipeline in Apache Beam (v2.43) that inserts data into MongoDB. It runs fine on Dataflow, but I'm not able to control the number of connections: during an input peak (Pub/Sub), Dataflow scales up and overwhelms MongoDB with thousands of connections. Even with a limited number of workers (15), I reach more than 20K connections on the MongoDB side.
I use a custom DoFn (the input data is already formatted as KV<Document, Document>: the first document is the search query and the second is the update operation) to upsert records in bulk mode. To improve performance, I accumulate records until a given number of elements is reached and then flush the buffer.
In this class, I create a connection pool during @Setup and pick connections from this pool in the @ProcessElement step. I set a max pool size of 10 in the ConnectionPoolSettings, so I was expecting around 150 connections at most (15 workers * 10 connections/worker).
Here is a code snippet.
static class BatchUpsertFn extends DoFn<KV<Document, Document>, CustomBulkWriteError> {

    private final ConcurrentHashMap<String, MongoClient> workerHashmap;
    private final String instanceId;
    private List<WriteModel<Document>> batch;

    BatchUpsertFn() {
        this.workerHashmap = new ConcurrentHashMap<>();
        this.instanceId = UUID.randomUUID().toString();
    }

    @Setup
    public void createMongoClient() {
        workerHashmap.putIfAbsent(instanceId, MongoClients
            .create(MongoClientSettings.builder()
                .applyConnectionString(new ConnectionString("myUri"))
                .applyToConnectionPoolSettings(builder -> builder
                    .maxConnectionIdleTime(60, SECONDS)
                    .maxConnectionLifeTime(60, SECONDS)
                    .maxSize(10)
                    .minSize(1))
                .applyToSocketSettings(builder -> builder
                    .connectTimeout(10, SECONDS)
                    .readTimeout(20, SECONDS))
                .build()));
    }

    @StartBundle
    public void startBundle() {
        batch = new ArrayList<>();
    }

    @ProcessElement
    public void processElement(ProcessContext ctx) {
        batch.add(
            new UpdateManyModel<>(
                ctx.element().getKey(),
                ctx.element().getValue(),
                new UpdateOptions().upsert(true)));
        // If batch limit is reached, flush the buffer
        if (batch.size() >= 1024L) {
            try {
                flush();
            } catch (MongoBulkWriteException pException) {
                pException.getWriteErrors()
                    .forEach(pBulkWriteError ->
                        ctx.output(new CustomBulkWriteError(pBulkWriteError)));
            }
        }
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext ctx) {
        try {
            flush();
        } catch (MongoBulkWriteException pException) {
            pException.getWriteErrors()
                .forEach(pBulkWriteError ->
                    ctx.output(new CustomBulkWriteError(pBulkWriteError),
                        Instant.now(),
                        GlobalWindow.INSTANCE));
        }
    }

    private void flush() throws MongoBulkWriteException {
        if (batch.isEmpty()) {
            return;
        }
        MongoDatabase mongoDatabase = workerHashmap.get(instanceId).getDatabase("myDatabase");
        MongoCollection<Document> mongoCollection = mongoDatabase.getCollection("myCollection");
        try {
            mongoCollection.bulkWrite(batch, new BulkWriteOptions().ordered(false));
            batch.clear();
        } catch (MongoBulkWriteException pException) {
            batch.clear();
            throw pException;
        }
    }

    @Teardown
    public void closeMongoClient() {
        workerHashmap.get(instanceId).close();
    }
}
Any idea is more than welcome
Upvotes: 0
Views: 171
Reputation: 1428
You don't have a lot of control over how many threads Beam / Dataflow will create.
The correct way to limit parallelism is probably to key your elements with a fixed set of keys: when a keyed operation follows (for example a stateful DoFn or a grouping transform), all elements with the same key are guaranteed to be processed by the same worker, so the number of distinct keys bounds the effective parallelism.
The Pub/Sub to Splunk Dataflow template has a good example of how this can be accomplished: see its CreateKeys and Batching steps.
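As a minimal sketch of that idea (not taken from the template itself), assuming your pipeline has a PCollection<KV<Document, Document>> named input that currently feeds BatchUpsertFn, and that NUM_SHARDS is a cap you choose: assign each element one of a fixed number of keys, then apply the stateful GroupIntoBatches transform. The number of distinct keys then bounds how many bundles, and therefore how many concurrent Mongo writers, can be active at once.

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.bson.Document;
    import org.joda.time.Duration;

    // NUM_SHARDS is a hypothetical knob: the number of distinct keys, and hence
    // the upper bound on how many workers can process batches at the same time.
    final int NUM_SHARDS = 20;

    PCollection<KV<Integer, Iterable<KV<Document, Document>>>> batched =
        input
            // Assign each element a random key from a fixed key space of NUM_SHARDS values.
            .apply("AssignShardKey",
                WithKeys.<Integer, KV<Document, Document>>of(
                    e -> ThreadLocalRandom.current().nextInt(NUM_SHARDS))
                    .withKeyType(TypeDescriptors.integers()))
            // Stateful, keyed batching: elements sharing a key are processed by the
            // same worker and emitted as batches of up to 1024 elements; the buffering
            // duration keeps small batches from waiting forever in streaming mode.
            .apply("BatchPerShard",
                GroupIntoBatches.<Integer, KV<Document, Document>>ofSize(1024)
                    .withMaxBufferingDuration(Duration.standardSeconds(10)));

    // A downstream DoFn would then receive KV<Integer, Iterable<KV<Document, Document>>>
    // and issue one bulkWrite per batch, instead of buffering manually in @ProcessElement.

With this shape, the writer DoFn no longer needs its own batching logic, and MongoDB sees at most one bulk write per shard key in flight at a time.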
Upvotes: 0