Egidijus K

Reputation: 57

Cassandra parallel insert performance in C#

When testing Cassandra insert performance from C# on a single node (i7, 8 cores), I got only 100 inserts/sec. Are there any code improvements possible with the DataStax Cassandra driver? I tried both async and sync Session.Execute, but performance is very poor.

[TestMethod]
public void TestMethod2()
{
    //   CREATE TABLE table1(
    //   col1 text,
    //   col2 timestamp,
    //   col3 int,
    //   col4 text,
    //   PRIMARY KEY(col1, col2, col3)
    //   );

    PoolingOptions poolingOptions = new PoolingOptions();
    poolingOptions
        .SetCoreConnectionsPerHost(HostDistance.Local, 1280)
        .SetMaxConnectionsPerHost(HostDistance.Local, 1280)
        .SetCoreConnectionsPerHost(HostDistance.Remote, 1280)
        .SetMaxConnectionsPerHost(HostDistance.Remote, 1280);

    poolingOptions
        .SetMaxSimultaneousRequestsPerConnectionTreshold(HostDistance.Local, 32768)
        .SetMinSimultaneousRequestsPerConnectionTreshold(HostDistance.Remote, 2000);

    var cluster = Cluster.Builder()
        .AddContactPoints("localhost")
        .WithPoolingOptions(poolingOptions)
        .WithQueryOptions(new QueryOptions().SetConsistencyLevel(ConsistencyLevel.One))
        .WithLoadBalancingPolicy(new RoundRobinPolicy())
        .Build();

    var options = new ParallelOptions();
    options.MaxDegreeOfParallelism = 50;

    using (var session = cluster.Connect("keyspace1"))
    {
        var ps = session.Prepare("INSERT INTO table1(col1,col2,col3,col4) VALUES (?,?,?,?)");

        var r = Parallel.For(0, 1000, options, (x) =>
        {
            var statement = ps.Bind("123456", DateTime.UtcNow, x, "1234 some log message goes here. Hello world. 123334556567586978089-==00");
            var t = session.ExecuteAsync(statement);
            t.Wait();
        });
    }
}

Upvotes: 2

Views: 1018

Answers (1)

jorgebg

Reputation: 6600

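The number of connections (more than 1,000) is far too large. With modern Cassandra versions (2.1+), a single connection per host should be enough, because native protocol v3 allows up to 32,768 simultaneous requests on one connection.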

The fastest way to run the operations asynchronously is to launch n operations up front and, each time one of them completes, launch a new one, so that n requests are always in flight.
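A minimal sketch of that pattern, using only the .NET base class library. InFlightRunner and RunAsync are hypothetical names, not part of the DataStax driver; the operation is passed in as a `Func<int, Task>`. Each of the `maxInFlight` worker loops claims the next index, awaits that operation, and only then claims another, so up to `maxInFlight` requests stay outstanding until the work runs out.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the DataStax driver): keeps up to
// maxInFlight operations running, starting a new one whenever one finishes.
public static class InFlightRunner
{
    public static Task RunAsync(int total, int maxInFlight, Func<int, Task> operation)
    {
        if (total < 0) throw new ArgumentOutOfRangeException(nameof(total));
        if (maxInFlight < 1) throw new ArgumentOutOfRangeException(nameof(maxInFlight));
        if (operation == null) throw new ArgumentNullException(nameof(operation));

        int next = -1; // last index handed out; shared by all workers

        // Each worker loops: claim an index, await its operation, repeat.
        async Task Worker()
        {
            while (true)
            {
                int i = Interlocked.Increment(ref next);
                if (i >= total) return; // no work left to start
                await operation(i).ConfigureAwait(false);
            }
        }

        // Launch n workers up front; together they keep n operations in flight.
        var workers = new Task[Math.Min(maxInFlight, total)];
        for (int w = 0; w < workers.Length; w++)
        {
            workers[w] = Worker();
        }
        return Task.WhenAll(workers);
    }
}
```

For the inserts in the question, the operation could be `i => session.ExecuteAsync(ps.Bind("123456", DateTime.UtcNow, i, message))`, where `message` stands for the log text, awaited from an `async Task` test method instead of calling `t.Wait()` inside `Parallel.For`. Unlike the blocking version, no thread sits idle while a request is outstanding.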

A SemaphoreSlim can be used to cap the number of concurrent operations; alternatively, you can do it manually using continuations.
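A minimal sketch of the SemaphoreSlim approach, again with hypothetical names (SemaphoreThrottle, RunAsync) and a `Func<int, Task>` standing in for the driver call. Every operation must acquire one of `maxConcurrency` slots before it starts and releases it in a `finally` block when it finishes, so the loop never has more than `maxConcurrency` operations outstanding.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the DataStax driver): a SemaphoreSlim
// with maxConcurrency slots gates how many operations may run at once.
public static class SemaphoreThrottle
{
    public static async Task RunAsync(int total, int maxConcurrency, Func<int, Task> operation)
    {
        if (total < 0) throw new ArgumentOutOfRangeException(nameof(total));
        if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
        if (operation == null) throw new ArgumentNullException(nameof(operation));

        // Disposed when the method returns, after every operation has completed.
        using var slots = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        var tasks = new List<Task>(total);

        async Task RunOne(int index)
        {
            try
            {
                await operation(index).ConfigureAwait(false);
            }
            finally
            {
                slots.Release(); // free the slot so the loop can start another operation
            }
        }

        for (int i = 0; i < total; i++)
        {
            // Asynchronously wait for a free slot instead of blocking a thread with Wait().
            await slots.WaitAsync().ConfigureAwait(false);
            tasks.Add(RunOne(i));
        }

        // Wait for the remaining in-flight operations; rethrows the first failure, if any.
        await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```

With a limit such as 512, this would replace the Parallel.For loop in the question. Note that this sketch keeps one Task per operation in the list, so for very large runs (millions of inserts) it may be worth holding only the in-flight tasks instead.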

If you don't want to write that code yourself, you can use ConcurrentUtils, a small utility package I created:

// Execute 1,000,000 times
// limiting the maximum amount of parallel async operations to 512
await ConcurrentUtils.Times(1000000, 512, _ => 
  session.ExecuteAsync(ps.Bind(parameters)));

Upvotes: 1
