user12211419

How to stream data with golang query to Cassandra

I have the following code:

cluster := gocql.NewCluster("our-cass")
cass, err := cluster.CreateSession()
if err != nil { ... }
defer cass.Close()
iter := cass.Query(`SELECT * FROM cmuser.users LIMIT 9999999999;`).Iter()
c := iter.Columns()
scanArgs := make([]interface{}, len(c))

for i := 0; i < len(scanArgs); i++ {
    scanArgs[i] = makeType(c[i])
}

for iter.Scan(scanArgs...) { ... }

The problem is that we have way too many rows in that table, but I need to read all of them in order to migrate the data to another DB. Is there a way to stream the data from Cassandra? Unfortunately, we don't have a sequence for the primary key of the table; we are using a UUID for the PK. That means we can't use the simple technique of two for loops, one incrementing a counter and the other going through the rows that way.

Upvotes: 1

Views: 1486

Answers (2)

phonaputer

Reputation: 1530

Gocql has some options for paging (assuming your Cassandra version is at least version 2).

Gocql's Session has a method SetPageSize.

And Gocql's Query has a similar method, PageSize.

This may help you break up your query. Here's what the code would look like:

cluster := gocql.NewCluster("our-cass")
cass, err := cluster.CreateSession()
if err != nil {
    // handle the connection error before using the session
}
defer cass.Close()

iter := cass.Query(`SELECT * FROM cmuser.users;`).PageSize(5000).Iter()

// use the iter as usual to iterate over all results
// this will send additional CQL queries when it needs to get new pages
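
If you'd rather set a default page size once for the whole session instead of per query, a minimal sketch (assuming the same cluster and table as above) could look like this:

cluster := gocql.NewCluster("our-cass")
cass, err := cluster.CreateSession()
if err != nil {
    // handle the connection error before using the session
}
defer cass.Close()

// SetPageSize sets the default page size for all queries on this session,
// so individual queries no longer need to call PageSize themselves.
cass.SetPageSize(5000)

iter := cass.Query(`SELECT * FROM cmuser.users;`).Iter()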

Upvotes: 0

Aaron

Reputation: 57798

For a pure Cassandra way to do this, you could run a range query on the token ranges as they are broken-up per-node.

First, find the token ranges:

$ nodetool ring

Datacenter: dc1
==========
Address   Rack       Status State   Load         Owns                Token
                                                                      8961648479018332584
10.1.4.3  rack3      Up     Normal  1.34 GB      100.00%             -9023369133424793632
10.1.4.1  rack2      Up     Normal  1.56 GB      100.00%             -7946127339777435347
10.1.4.3  rack3      Up     Normal  1.34 GB      100.00%             -7847456805881540087
...

etc. (this output might be large, depending on the number of nodes and tokens on each node)

Then adjust your query to use the token() function on your partition key. As I don't know what your PRIMARY KEY definition is, I'm going to guess and use users_id as the partition key:

SELECT * FROM cmuser.users
WHERE token(users_id) > 8961648479018332584
  AND token(users_id) <= -9023369133424793632;

Once that completes, move to the next token range:

SELECT * FROM cmuser.users
WHERE token(users_id) > -9023369133424793632
  AND token(users_id) <= -7946127339777435347;

Breaking your query down like this helps to ensure that it's only reading from a single node at a time. That should allow the query to read the data sequentially from the cluster (and from the disk), without worrying about timeouts.
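
To drive those range queries from Go, here is a rough gocql sketch. It assumes the cmuser.users table and the users_id partition key guessed above, and that you've copied the ring tokens from nodetool ring into a sorted slice (the token values below are placeholders); it's an illustration, not a drop-in migration tool:

package main

import (
    "fmt"
    "log"

    "github.com/gocql/gocql"
)

func main() {
    cluster := gocql.NewCluster("our-cass")
    cass, err := cluster.CreateSession()
    if err != nil {
        log.Fatal(err)
    }
    defer cass.Close()

    // Tokens taken from `nodetool ring`, sorted ascending.
    // Replace these placeholders with your cluster's actual tokens.
    tokens := []int64{-9023369133424793632, -7946127339777435347, -7847456805881540087}

    // Query one token range at a time so each request reads from a single node.
    // Note: the range that wraps from the highest token back to the lowest
    // (like the first example query above) also has to be covered.
    for i := 0; i < len(tokens)-1; i++ {
        iter := cass.Query(
            `SELECT * FROM cmuser.users WHERE token(users_id) > ? AND token(users_id) <= ?`,
            tokens[i], tokens[i+1],
        ).PageSize(5000).Iter()

        row := map[string]interface{}{}
        for iter.MapScan(row) {
            // write the row to the target DB here
            fmt.Println(row)
            row = map[string]interface{}{}
        }
        if err := iter.Close(); err != nil {
            log.Fatal(err)
        }
    }
}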

Upvotes: 2
