theShadow89

Reputation: 1549

NoNodeAvailableException after some insert request to cassandra

I am trying to insert data into a local Cassandra cluster using asynchronous execution and version 4 of the driver (the same major version as my Cassandra instance).

I have instantiated the cql session in this way:

CqlSession cqlSession = CqlSession.builder()
    .addContactEndPoint(new DefaultEndPoint(
        InetSocketAddress.createUnresolved("localhost", 9042)))
    .build();

Then I create a statement in an async way:

return session.prepareAsync(
        "insert into table (p1, p2, p3, p4) values (?, ?, ?, ?)")
    .thenComposeAsync(
        (ps) -> {
          CompletableFuture<AsyncResultSet>[] result = data.stream()
              .map((d) -> session.executeAsync(ps.bind(d.p1, d.p2, d.p3, d.p4))
                  .toCompletableFuture())
              .toArray(CompletableFuture[]::new);
          return CompletableFuture.allOf(result);
        });

data is a dynamic list filled with user data.

When I execute the code I get the following exception:

Caused by: com.datastax.oss.driver.api.core.NoNodeAvailableException: No node was available to execute the query

    at com.datastax.oss.driver.api.core.AllNodesFailedException.fromErrors(AllNodesFailedException.java:53)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.sendRequest(CqlPrepareHandler.java:210)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.onThrottleReady(CqlPrepareHandler.java:167)
    at com.datastax.oss.driver.internal.core.session.throttling.PassThroughRequestThrottler.register(PassThroughRequestThrottler.java:52)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.<init>(CqlPrepareHandler.java:153)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor.process(CqlPrepareAsyncProcessor.java:66)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor.process(CqlPrepareAsyncProcessor.java:33)
    at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:210)
    at com.datastax.oss.driver.api.core.cql.AsyncCqlSession.prepareAsync(AsyncCqlSession.java:90)

The node is active, and some data is inserted before the exception is raised. I have also tried setting a datacenter name on the session builder, without any result.

Why is this exception raised if the node is up and running? I only have a single local node; could that be the problem?

Upvotes: 1

Views: 3316

Answers (2)

theShadow89

Reputation: 1549

Finally, I found a solution using BatchStatement and a little custom code to create a chunked list.

    final int chunkSize = 100;

    return session.prepareAsync(
            "insert into table (p1, p2, p3, p4) values (?, ?, ?, ?)")
        .thenComposeAsync(
            (ps) -> {
              AtomicInteger counter = new AtomicInteger();

              // bind every row, then group the bound statements into
              // chunks of at most chunkSize elements each
              final List<CompletionStage<AsyncResultSet>> batchInsert = data.stream()
                  .map((d) -> ps.bind(d.p1, d.p2, d.p3, d.p4))
                  .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / chunkSize))
                  .values().stream()
                  .map(boundStatements -> BatchStatement.newInstance(
                      BatchType.LOGGED,
                      boundStatements.toArray(new BatchableStatement[0])))
                  .map(session::executeAsync)
                  .collect(Collectors.toList());

              return CompletableFutures.allSuccessful(batchInsert);
            });
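The chunking step can be sanity-checked on a plain list, independent of Cassandra. A minimal sketch of the same AtomicInteger/groupingBy idea (the class name `Chunking` and chunk size 3 are arbitrary, for illustration only):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class Chunking {

  // Split a list into consecutive groups of at most chunkSize elements.
  public static Map<Integer, List<Integer>> chunk(List<Integer> data, int chunkSize) {
    AtomicInteger counter = new AtomicInteger();
    // Consecutive elements share the same key (index / chunkSize),
    // so each group holds at most chunkSize elements.
    return data.stream()
        .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / chunkSize));
  }

  public static void main(String[] args) {
    System.out.println(chunk(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3));
    // {0=[1, 2, 3], 1=[4, 5, 6], 2=[7, 8, 9], 3=[10]}
  }
}
```

Note that this relies on a sequential stream: with a parallel stream the counter no longer matches the element order, so the groups would be wrong.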

Upvotes: 1

Aaron
Aaron

Reputation: 57843

The biggest thing that I don't see is a way to limit the number of concurrently active async requests.

Basically, if that (mapped) data stream gets hit hard enough, it will keep creating new in-flight requests that it is awaiting. If the writes coming in from those requests create more back-pressure than the node can keep up with, the node becomes overwhelmed and stops accepting requests.
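Note that the stack trace above goes through PassThroughRequestThrottler, which applies no limit at all. Driver 4.x ships an alternative, ConcurrencyLimitingRequestThrottler, that can be enabled in the application.conf picked up by the driver; a sketch (both values are illustrative, not tuned recommendations):

```
datastax-java-driver.advanced.throttler {
  class = ConcurrencyLimitingRequestThrottler
  max-concurrent-requests = 1024
  max-queue-size = 10000
}
```

With this in place, requests beyond the concurrency limit queue up inside the driver instead of piling onto the node.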

Take a look at this post by Ryan Svihla of DataStax:

Cassandra: Batch Loading Without the Batch — The Nuanced Edition

Its code is from the 3.x version of the driver, but the concepts are the same: provide some way to throttle down the writes, or limit the number of requests "in flight" at any given time, and that should help greatly.
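One way to cap in-flight work with nothing but the JDK is a Semaphore acquired before each submission and released on completion. A minimal, self-contained sketch: the class name, MAX_IN_FLIGHT value, and the runAsync call (which stands in for session.executeAsync) are all illustrative assumptions, not driver API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottledWrites {

  // Maximum number of writes outstanding at once; 32 is an arbitrary example.
  static final int MAX_IN_FLIGHT = 32;

  public static int runThrottled(int totalWrites) throws InterruptedException {
    Semaphore inFlight = new Semaphore(MAX_IN_FLIGHT);
    AtomicInteger completed = new AtomicInteger();
    List<CompletableFuture<Void>> futures = new ArrayList<>();

    for (int i = 0; i < totalWrites; i++) {
      // Block until a permit is free, so at most MAX_IN_FLIGHT
      // requests are in flight at any moment.
      inFlight.acquire();
      CompletableFuture<Void> f = CompletableFuture
          .runAsync(completed::incrementAndGet)            // stand-in for the real async write
          .whenComplete((res, err) -> inFlight.release()); // free the permit on success or failure
      futures.add(f);
    }

    // Wait for everything submitted so far to finish.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    return completed.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runThrottled(1000)); // prints 1000
  }
}
```

Releasing the permit in whenComplete rather than inside the task is deliberate: it runs on failure too, so a failed write cannot leak a permit and slowly strangle throughput.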

Upvotes: 1
