Minalcar

Reputation: 53

Cassandra cluster is not scaling: 3 nodes are even a little faster than 6 nodes (code and data provided)

I am using DataStax Enterprise 4.8 for testing purposes in my bachelor thesis. I am loading weather data into the cluster (about 33 million rows). The data looks something like the following:

//id;unix timestamp; validity; station info; temp in °C; humidity in %
3;1950040101;5;24; 5.7000;83.0000
3;1950040102;5;24; 5.6000;83.0000
3;1950040103;5;24; 5.5000;83.0000

I know my data model is not very clean (I use decimal for the timestamp but I just wanted to try it this way).

CREATE TABLE temp (
    id int,
    timestamp decimal,
    validity decimal,
    structure decimal,
    temperature float,
    humidity float,
    PRIMARY KEY ((id), timestamp)
);
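
A cleaner variant would use a native timestamp column instead of decimal. This is just a sketch, not what I actually tested: the int types for validity and structure are guesses based on the sample data, and the numeric values from the CSV would have to be converted to real timestamps first.

CREATE TABLE temp (
    id int,
    timestamp timestamp,
    validity int,
    structure int,
    temperature float,
    humidity float,
    PRIMARY KEY ((id), timestamp)
);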

I roughly based the data model on an article on the DataStax website: https://academy.datastax.com/resources/getting-started-time-series-data-modeling The insertion code is based on the often-cited article on Lostechies:

https://lostechies.com/ryansvihla/2016/04/29/cassandra-batch-loading-without-the-batch%E2%80%8A-%E2%80%8Athe-nuanced-edition/

This is my insertion code:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.extras.codecs.jdk8.InstantCodec;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;

public class BulkLoader {

    private final int threads;
    private final String[] contactHosts;
    private final Cluster cluster;
    private final Session session;
    private final ExecutorService executor;

    public BulkLoader(int threads, String... contactHosts) {
        this.threads = threads;
        this.contactHosts = contactHosts;
        this.cluster = Cluster.builder().addContactPoints(contactHosts).build();

        cluster.getConfiguration().getCodecRegistry()
                .register(InstantCodec.instance);
        session = cluster.newSession();
        // fixed thread pool that closes on app exit
        executor = MoreExecutors
                .getExitingExecutorService((ThreadPoolExecutor) Executors
                        .newFixedThreadPool(threads));
    }

    public static class IngestCallback implements FutureCallback<ResultSet> {

        public void onSuccess(ResultSet result) {
        }

        public void onFailure(Throwable t) {
            throw new RuntimeException(t);
        }
    }

    public void ingest(Iterator<Object[]> boundItemsIterator, String insertCQL)
            throws InterruptedException {
        final PreparedStatement statement = session.prepare(insertCQL);
        while (boundItemsIterator.hasNext()) {
            BoundStatement boundStatement = statement.bind(boundItemsIterator
                    .next());
            boundStatement.setConsistencyLevel(ConsistencyLevel.QUORUM);
            ResultSetFuture future = session.executeAsync(boundStatement);
            Futures.addCallback(future, new IngestCallback(), executor);
        }
    }

    public void stop() {
        session.close();
        cluster.close();
    }


    public static List<Object[]> readCSV(File csv) {
        BufferedReader fileReader = null;
        List<Object[]> result = new LinkedList<Object[]>();
        try {
            fileReader = new BufferedReader(new FileReader(csv));
            String line = "";
            while ((line = fileReader.readLine()) != null) {
                String[] tokens = line.split(";");
                if (tokens.length < 6) {
                    System.out.println("Unvollständig");
                    continue;
                }
                Object[] tmp = new Object[6];
                tmp[0] = Integer.parseInt(tokens[0]);                  // id
                tmp[1] = new BigDecimal(Integer.parseInt(tokens[1]));  // timestamp
                tmp[2] = new BigDecimal(Integer.parseInt(tokens[2]));  // validity
                tmp[3] = new BigDecimal(Integer.parseInt(tokens[3]));  // station info
                tmp[4] = Float.parseFloat(tokens[4]);                  // temperature
                tmp[5] = Float.parseFloat(tokens[5]);                  // humidity
                result.add(tmp);
            }
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            try {
                if (fileReader != null) {
                    fileReader.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        return result;
    }

    public static void main(String[] args) {
        Stopwatch watch = Stopwatch.createStarted();
        File folder = new File(
                "C:/VirtualMachines/Kiosk/BachelorarbeitStraubinger/workspace/bulk/src/main/resources");
        List<Object[]> data = new LinkedList<Object[]>();
        BulkLoader loader = new BulkLoader(16, "10.2.57.38", "10.2.57.37",
                "10.2.57.36", "10.2.57.35", "10.2.57.34", "10.2.57.33");
        int cnt = 0;
        File[] listOfFiles = folder.listFiles();
        for (File file : listOfFiles) {
            if (file.isFile() && file.getName().contains(".th")) {
                data = readCSV(file);
                cnt += data.size();
                try {

                    loader.ingest(
                            data.iterator(),
                            "INSERT INTO wheather.temp (id, timestamp, validity, structure, temperature, humidity) VALUES(?,?,?,?,?,?)");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(file.getName()
                            + " -> Datasets importet: " + cnt);
                }
            }
        }
        System.out.println("total time seconds = "
                + watch.elapsed(TimeUnit.SECONDS));
        watch.stop();
        loader.stop();
    }
}

The replication factor is 3 and I run the test on either 6 or 3 nodes, with vNodes enabled and num_tokens = 256. I get roughly the same insert times on either cluster. Any ideas why that is?
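
For reference, a keyspace with that replication factor would be created like this (the keyspace name wheather is taken from the INSERT statement above; SimpleStrategy is only assumed for the sketch, a multi-datacenter setup would use NetworkTopologyStrategy):

CREATE KEYSPACE wheather
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};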

Upvotes: 1

Views: 155

Answers (2)

Jeff Jirsa

Reputation: 4426

It is likely that you're maxing out the client application / client server. If you're reading from a static file, you may benefit from breaking it up into a few pieces and running them in parallel, or even looking at Brian Hess' loader ( https://github.com/brianmhess/cassandra-loader ) or the real Cassandra bulk loader ( http://www.datastax.com/dev/blog/using-the-cassandra-bulk-loader-updated ), which turns the data into a series of sstables and streams those in directly. Both are likely faster than your existing code.
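
One quick way to check whether the unbounded executeAsync calls are part of the problem is to cap the number of in-flight writes on the client. This is only a sketch built around the ingest method from the question; the permit count of 256 is an arbitrary assumption, and it needs an additional import of java.util.concurrent.Semaphore:

    public void ingest(Iterator<Object[]> boundItemsIterator, String insertCQL)
            throws InterruptedException {
        final PreparedStatement statement = session.prepare(insertCQL);
        // Allow at most 256 outstanding writes at any time (tuning knob).
        final Semaphore inFlight = new Semaphore(256);
        while (boundItemsIterator.hasNext()) {
            inFlight.acquire(); // blocks while 256 writes are still in flight
            BoundStatement boundStatement = statement.bind(boundItemsIterator.next());
            boundStatement.setConsistencyLevel(ConsistencyLevel.QUORUM);
            ResultSetFuture future = session.executeAsync(boundStatement);
            Futures.addCallback(future, new FutureCallback<ResultSet>() {
                public void onSuccess(ResultSet result) {
                    inFlight.release();
                }

                public void onFailure(Throwable t) {
                    inFlight.release();
                    t.printStackTrace();
                }
            }, executor);
        }
        // Wait until all remaining writes have completed before returning.
        inFlight.acquire(256);
        inFlight.release(256);
    }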

Upvotes: 2

MarcintheCloud

Reputation: 1653

Physics.

You're probably maxing out the throughput your app is capable of. Normally the answer would be to have multiple clients/app servers, but it looks like you are reading from a CSV. I suggest either cutting the CSV into pieces and running multiple instances of your app, or generating fake data and running multiple instances against that (see the sketch below).

Edit: I also think it's worth noting that with a data model like that, a payload size that small, and proper hardware, I'd imagine each node could be capable of 15-20K inserts/second (not accounting for node density/compaction).
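
A rough sketch of the "multiple instances" idea, based on the main method from the question. The instanceId/instanceCount command-line arguments are made up for illustration; each JVM only loads its share of the files, so several copies of the app can run on different machines against the same directory:

    public static void main(String[] args) {
        // e.g. run "java BulkLoader 0 4" on the first machine, "java BulkLoader 1 4" on the second, ...
        int instanceId = Integer.parseInt(args[0]);
        int instanceCount = Integer.parseInt(args[1]);

        File folder = new File(
                "C:/VirtualMachines/Kiosk/BachelorarbeitStraubinger/workspace/bulk/src/main/resources");
        BulkLoader loader = new BulkLoader(16, "10.2.57.38", "10.2.57.37",
                "10.2.57.36", "10.2.57.35", "10.2.57.34", "10.2.57.33");
        File[] listOfFiles = folder.listFiles();

        for (int i = 0; i < listOfFiles.length; i++) {
            File file = listOfFiles[i];
            // Round-robin split: this instance only loads every instanceCount-th file.
            if (i % instanceCount != instanceId) {
                continue;
            }
            if (file.isFile() && file.getName().contains(".th")) {
                try {
                    loader.ingest(readCSV(file).iterator(),
                            "INSERT INTO wheather.temp (id, timestamp, validity, structure, temperature, humidity) VALUES(?,?,?,?,?,?)");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        loader.stop();
    }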

Upvotes: 1
