George C

Reputation: 1223

How can I improve Cassandra 3.0 read performance and throughput using async queries?

I have a table:

CREATE TABLE my_table (
    user_id text,
    ad_id text,
    date timestamp,
    PRIMARY KEY (user_id, ad_id)
);

The user_id and ad_id values I use are at most 15 characters long.

I query the table like this:

Set<String> users = ... // filled elsewhere
Session session = ... // built elsewhere
BoundStatement boundQuery = ... // built elsewhere
// (using the query "SELECT * FROM my_table WHERE user_id=?")

List<Row> rowAds = 
      users.stream()
          .map(user -> session.executeAsync(boundQuery.bind(user)))
          .map(ResultSetFuture::getUninterruptibly)
          .map(ResultSet::all)
          .flatMap(List::stream)
          .collect(toList());

The Set of users has approximately 3000 elements, and each user has approximately 300 ads.

This code is executed in 50 threads on the same machine, each with different users and all sharing the same Session object.

The algorithm takes between 2 and 3 seconds to complete.

The Cassandra cluster has 3 nodes, with a replication factor of 2. Each node has 6 cores and 12 GB of RAM.

The Cassandra nodes run at 60% of their CPU capacity and use 33% of RAM (66% including page cache).
The querying machine runs at 50% of its CPU capacity and uses 50% of RAM.

How do I improve the read time to less than 1 second?

Thanks!

UPDATE:

After some answers (thank you very much), I realized that I wasn't running the queries in parallel, so I changed the code to:

List<Row> rowAds = 
     users.stream()
       .map(user ->  session.executeAsync(boundQuery.bind(user)))
       .collect(toList())
       .stream()
       .map(ResultSetFuture::getUninterruptibly)
       .map(ResultSet::all)
       .flatMap(List::stream)
       .collect(toList());

Now the queries run in parallel, which brought the time down to approximately 300 milliseconds, a great improvement.
But my question remains: can it be faster? Again, thanks!
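
Why the extra collect(toList()) matters: a sequential Java stream is lazy and pushes one element through the whole pipeline before it touches the next, so in the first version each query was awaited before the following one was even submitted. The sketch below illustrates that ordering without a Cassandra cluster. It assumes the JDK's CompletableFuture as a stand-in for the driver's ResultSetFuture, and fakeQuery, onePipeline and twoPhases are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class LazyStreamDemo {

    // Hypothetical stand-in for session.executeAsync(...): returns a future immediately.
    static CompletableFuture<String> fakeQuery(String user) {
        return CompletableFuture.supplyAsync(() -> "rows-for-" + user);
    }

    // Single pipeline: each future is awaited before the next query is submitted.
    static List<String> onePipeline(List<String> users, List<String> log) {
        return users.stream()
                .map(u -> { log.add("submit " + u); return fakeQuery(u); })
                .map(f -> { log.add("wait"); return f.join(); })
                .collect(Collectors.toList());
    }

    // Two phases: every query is submitted first, then the results are awaited.
    static List<String> twoPhases(List<String> users, List<String> log) {
        List<CompletableFuture<String>> futures = users.stream()
                .map(u -> { log.add("submit " + u); return fakeQuery(u); })
                .collect(Collectors.toList());
        return futures.stream()
                .map(f -> { log.add("wait"); return f.join(); })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("a", "b", "c");

        List<String> log1 = new ArrayList<>();
        onePipeline(users, log1);
        System.out.println(log1); // [submit a, wait, submit b, wait, submit c, wait]

        List<String> log2 = new ArrayList<>();
        twoPhases(users, log2);
        System.out.println(log2); // [submit a, submit b, submit c, wait, wait, wait]
    }
}
```

The first log interleaves submit and wait, so only one query is in flight at a time. The second log shows all submissions happening before the first wait, which is the behaviour the updated code relies on.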

Upvotes: 1

Views: 1237

Answers (1)

doanduyhai

Reputation: 8812

users.stream()
          .map(user -> session.executeAsync(boundQuery.bind(user)))
          .map(ResultSetFuture::getUninterruptibly)
          .map(ResultSet::all)
          .flatMap(List::stream)
          .collect(toList());

A remark: in the second map() you're calling ResultSetFuture::getUninterruptibly. It's a blocking call, so you don't benefit much from asynchronous execution.

Instead, try to transform the list of Futures returned by the driver into a single Future of a List (hint: ResultSetFuture implements Guava's ListenableFuture interface).

See: http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concurrent/Futures.html#successfulAsList(java.lang.Iterable)
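
To make the "list of Futures into a Future of List" idea concrete, here is a minimal sketch that can run without Guava or a cluster. It swaps in the JDK's CompletableFuture.allOf for Guava's Futures helpers, so it is an analogue rather than the driver's actual API; fakeQuery, allAsList and fetchAll are hypothetical names. One difference to keep in mind: this version fails as a whole if any single query fails, whereas Guava's successfulAsList puts null in the position of a failed future.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FutureOfListDemo {

    // Hypothetical stand-in for an async query that yields two "rows" per user.
    static CompletableFuture<List<String>> fakeQuery(String user) {
        return CompletableFuture.supplyAsync(
                () -> Arrays.asList(user + "-ad1", user + "-ad2"));
    }

    // Turns a list of futures into one future holding the list of results, in order.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(done -> futures.stream()
                        .map(CompletableFuture::join) // already complete, does not block
                        .collect(Collectors.toList()));
    }

    // Submits every query, then flattens the per-user rows once all have completed.
    static CompletableFuture<List<String>> fetchAll(List<String> users) {
        List<CompletableFuture<List<String>>> futures = users.stream()
                .map(FutureOfListDemo::fakeQuery)
                .collect(Collectors.toList());
        return allAsList(futures)
                .thenApply(lists -> lists.stream()
                        .flatMap(List::stream)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // The only blocking call is this final join().
        System.out.println(fetchAll(Arrays.asList("u1", "u2")).join());
        // prints [u1-ad1, u1-ad2, u2-ad1, u2-ad2]
    }
}
```

With this shape the calling thread blocks at most once, at the end, and the flattening step runs as a callback when the last query completes.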

Upvotes: 1
