Thibault Coudert

Reputation: 21

Java - Apache Beam - Control number of connections when writing in MongoDB

I'm currently working on a streaming pipeline in Apache Beam (v2.43) that inserts data into MongoDB. It runs fine on Dataflow, but I'm not able to control the number of connections: when the Pub/Sub input peaks, Dataflow scales up and overwhelms MongoDB with thousands of connections. Even with a limited number of workers (15), I reach more than 20K connections on the MongoDB side.

I use a custom DoFn (the input data is already formatted as KV<Document, Document>: the first document is the search query and the second is the update operation) to upsert records in bulk mode. To improve performance, I accumulate records until a given number of elements is reached and then flush the buffer.

In this class, I create a connection pool during @Setup and pick connections from this pool in the @ProcessElement step. I set a max size of 10 in the ConnectionPoolSettings, so I was expecting around 150 connections at most (15 workers * 10 connections per worker).

Here is a code snippet:

static class BatchUpsertFn extends DoFn<KV<Document, Document>, CustomBulkWriteError> {

  private final ConcurrentHashMap<String, MongoClient> workerHashmap;
  private final String instanceId;
  private List<WriteModel<Document>> batch;

  BatchUpsertFn() {
    this.workerHashmap = new ConcurrentHashMap<>();
    this.instanceId = UUID.randomUUID().toString();
  }

  @Setup
  public void createMongoClient() {
    workerHashmap.putIfAbsent(instanceId, MongoClients
        .create(MongoClientSettings.builder()
            .applyConnectionString(new ConnectionString("myUri"))
            .applyToConnectionPoolSettings(builder -> builder
                .maxConnectionIdleTime(60, SECONDS)
                .maxConnectionLifeTime(60, SECONDS)
                .maxSize(10)
                .minSize(1)
            )
            .applyToSocketSettings(builder -> builder
                .connectTimeout(10, SECONDS)
                .readTimeout(20, SECONDS))
            .build()));
  }

  @StartBundle
  public void startBundle() {
    batch = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext ctx) {

    batch.add(
          new UpdateManyModel<>(
              ctx.element().getKey(),
              ctx.element().getValue(),
              new UpdateOptions().upsert(true)));

    // Flush once the batch reaches its size limit
    if (batch.size() >= 1024L) {
      try {
        flush();
      } catch (MongoBulkWriteException pException) {
        pException.getWriteErrors()
            .forEach(pBulkWriteError ->
                ctx.output(new CustomBulkWriteError(pBulkWriteError)));
      }
    }
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext ctx) {
    try {
      flush();
    } catch (MongoBulkWriteException pException) {
      pException.getWriteErrors()
          .forEach(pBulkWriteError ->
              ctx.output(new CustomBulkWriteError(pBulkWriteError),
                  Instant.now(),
                  GlobalWindow.INSTANCE));
    }
  }

  private void flush() throws MongoBulkWriteException {
    if (batch.isEmpty()) {
      return;
    }
    MongoDatabase mongoDatabase = workerHashmap.get(instanceId).getDatabase("myDatabase");
    MongoCollection<Document> mongoCollection = mongoDatabase.getCollection("myCollection");
    try {
      mongoCollection.bulkWrite(batch, new BulkWriteOptions().ordered(false));
      batch.clear();
    } catch (MongoBulkWriteException pException) {
      batch.clear();
      throw pException;
    }
  }

  @Teardown
  public void closeMongoClient() {
    workerHashmap.get(instanceId).close();
  }
}

Any ideas are more than welcome.

Upvotes: 0

Views: 171

Answers (1)

Bruno Volpato

Reputation: 1428

You don't have a lot of control over how many threads Beam/Dataflow will create.

The usual way to limit parallelism is to key your elements with a small, fixed set of keys: when the keyed PCollection is followed by a keyed operation (e.g., a stateful DoFn or a GroupByKey), all elements with the same key are guaranteed to be processed on the same worker.

The Pub/Sub to Splunk template has a good example of how this can be accomplished: see its CreateKeys and Batching steps.
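A minimal sketch of that idea (not the template's exact code): assign each element one of a small, fixed number of keys, then batch per key with GroupIntoBatches, which is a stateful (keyed) transform. NUM_SHARDS, limitFanOut, and ShardedBatchingSketch below are placeholder names; the batch size of 1024 matches the question.

import java.util.concurrent.ThreadLocalRandom;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.bson.Document;
import org.joda.time.Duration;

public class ShardedBatchingSketch {

  // With 10 shard keys, the keyed step downstream runs on at most 10 keys in
  // parallel, no matter how many workers Dataflow scales up to.
  private static final int NUM_SHARDS = 10;

  static PCollection<KV<Integer, Iterable<KV<Document, Document>>>> limitFanOut(
      PCollection<KV<Document, Document>> upserts) {
    return upserts
        // Assign each element a random key in [0, NUM_SHARDS).
        .apply("AssignShardKey", ParDo.of(
            new DoFn<KV<Document, Document>, KV<Integer, KV<Document, Document>>>() {
              @ProcessElement
              public void processElement(ProcessContext ctx) {
                ctx.output(KV.of(
                    ThreadLocalRandom.current().nextInt(NUM_SHARDS),
                    ctx.element()));
              }
            }))
        // GroupIntoBatches is a stateful, keyed transform: all elements sharing a
        // key are processed on the same worker, and it also takes over the manual
        // buffering currently done in BatchUpsertFn.
        .apply("BatchPerShard",
            GroupIntoBatches.<Integer, KV<Document, Document>>ofSize(1024)
                .withMaxBufferingDuration(Duration.standardSeconds(10)));
  }
}

Each emitted batch (one Iterable per key) can then be written with a single bulkWrite, so the number of MongoClient instances doing work at any moment is bounded by NUM_SHARDS rather than by however many DoFn instances Dataflow spins up.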

Upvotes: 0
