Reputation: 463
(Sorry this is TL;DR; but I'm desperate and want to be thorough!)
We are moving a service from AWS to GCP and switching from DynamoDB to Cloud Spanner as the back-end data store.
The data store (spanner) contains data that users of the web service query for. In production loads, the data being queried is found between a 1% and 10% of the time. I have a simple multi-threaded Java test client that queries our service, continually adding new threads as long as the average throughput over the last 1 minute is increasing.
My test client is running on a GCE VM (64 CPU) and when using DynamoDB data source, I can get up to 3700 threads, pushing through 50k req/s on average once our service auto-scales up to the configured pod max node count. Each thread reads 100 hashes from Dynamo for every 1000 requests (10% hit rate).
I now need to switch my Java client to query spanner for data used in 10% of the requests. My query generally looks like:
SELECT A, B, C FROM data_table LIMIT 250 OFFSET XXX
Theoretically, I want each thread to SELECT blocks of unique rows. I use the OFFSET to start each thread reading from a unique position and once each block of records has been used up, I increment the OFFSET to startingOffset + totalRows and SELECT another block of data.
I realize this query may not translate to every implementation, but the concept should hold true that every thread can query spanner for a unique dataset over the life of the thread.
I tried using the java-spanner-jdbc with both a c3p0 connection pool and just going through the standard DriverManager.getConnection() route. I played with the min/max Session configuration as well as numChannels, but nothing seemed to help me get this to scale. TBH, I still don't understand the correlation between the sessions and channels.
I also tried the native SpannerDB client with singleUseReadOnlyTransaction(), batchReadOnlyTransaction() and most recently txn.partitionQuery().
Since the partitionQuery() feels a lot like the DynamoDB code, this feels like the right direction, but because my query (based off the "Read data in parallel" example at https://cloud.google.com/spanner/docs/reads) has a LIMIT clause, I'm getting the error:
com.google.cloud.spanner.SpannerException: INVALID_ARGUMENT: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Query is not root partitionable since it does not have a DistributedUnion at the root. Please run EXPLAIN for query plan details.
Removing the LIMIT clause gets past this, but then the queries take an eternity!
So the question is, if the partitionQuery() route is correct, how do I do parallel queries with 'paging' limits? If this is not the best route, what should I use to get the best parallel read throughput with unique data sets for each thread?
[EDIT] Based on the comment below by Knut Olav Loite, partitioned or batch queries is not the right approach so I am back to a single use read-only query.
Here is my code for creating spannerDbClient:
RetrySettings retrySettings = RetrySettings.newBuilder()
.setInitialRpcTimeout(Duration.ofSeconds(SPANNER_INITIAL_TIMEOUT_RETRY_SECONDS))
.setMaxRpcTimeout(Duration.ofSeconds(SPANNER_MAX_TIMEOUT_RETRY_SECONDS))
.setMaxAttempts(SPANNER_MAX_RETRY_ATTEMPTS)
.setTotalTimeout(Duration.ofSeconds(SPANNER_TOTAL_TIMEOUT_RETRY_SECONDS))
.build();
SpannerOptions.Builder builder = SpannerOptions.newBuilder()
.setSessionPoolOption(SessionPoolOptions.newBuilder()
.setFailIfPoolExhausted()
.setMinSessions(SPANNER_MIN_SESSIONS)
.setMaxSessions(SPANNER_MAX_SESSIONS)
.build()
)
.setNumChannels(SPANNER_NUM_CHANNELS);
if (credentials != null) {
builder.setCredentials(credentials);
}
builder.getSpannerStubSettingsBuilder()
.executeSqlSettings()
.setRetryableCodes(StatusCode.Code.DEADLINE_EXCEEDED, StatusCode.Code.UNAVAILABLE)
.setRetrySettings(retrySettings);
spanner = builder.build().getService();
databaseId = DatabaseId.of(
projectName,
instanceName,
databaseName
);
spannerDbClient = spanner.getDatabaseClient(databaseId);
Here is my method for performing the actual query:
List<Entry> entry = new ArrayList<>();
try (ResultSet resultSet = spannerDbClient
.singleUseReadOnlyTransaction(TimestampBound.ofMaxStaleness(5, TimeUnit.SECONDS))
.executeQuery(Statement.newBuilder(String.format("SELECT * from %s LIMIT %d OFFSET %d", tableName, limit, offset)).build())) {
while (resultSet.next()) {
entry.add(getEntryFromResultSet(resultSet));
}
}
I added timer code show how long the queries and this is what it looks like for 50 threads. This is using a shared spannerDbClient instance with maxSession=50,minSession=50,numChannels=4 (default):
--> [0h:00m:00s] Throughput: Total 0, Interval 0 (0 req/s), 0/0 threads reporting
[tId:099][00:00:00.335] Spanner query, LIMIT 250 OFFSET 99000
[tId:146][00:00:00.382] Spanner query, LIMIT 250 OFFSET 146000
[tId:140][00:00:00.445] Spanner query, LIMIT 250 OFFSET 140000
[tId:104][00:00:00.494] Spanner query, LIMIT 250 OFFSET 104000
[tId:152][00:00:00.363] Spanner query, LIMIT 250 OFFSET 152000
[tId:149][00:00:00.643] Spanner query, LIMIT 250 OFFSET 149000
[tId:143][00:00:00.748] Spanner query, LIMIT 250 OFFSET 143000
[tId:163][00:00:00.682] Spanner query, LIMIT 250 OFFSET 163000
[tId:155][00:00:00.799] Spanner query, LIMIT 250 OFFSET 155000
[tId:166][00:00:00.872] Spanner query, LIMIT 250 OFFSET 166000
[tId:250][00:00:00.870] Spanner query, LIMIT 250 OFFSET 250000
[tId:267][00:00:01.319] Spanner query, LIMIT 250 OFFSET 267000
[tId:229][00:00:01.917] Spanner query, LIMIT 250 OFFSET 229000
[tId:234][00:00:02.256] Spanner query, LIMIT 250 OFFSET 234000
[tId:316][00:00:02.401] Spanner query, LIMIT 250 OFFSET 316000
[tId:246][00:00:02.844] Spanner query, LIMIT 250 OFFSET 246000
[tId:312][00:00:02.989] Spanner query, LIMIT 250 OFFSET 312000
[tId:176][00:00:03.497] Spanner query, LIMIT 250 OFFSET 176000
[tId:330][00:00:03.140] Spanner query, LIMIT 250 OFFSET 330000
[tId:254][00:00:03.879] Spanner query, LIMIT 250 OFFSET 254000
[tId:361][00:00:03.816] Spanner query, LIMIT 250 OFFSET 361000
[tId:418][00:00:03.635] Spanner query, LIMIT 250 OFFSET 418000
[tId:243][00:00:04.503] Spanner query, LIMIT 250 OFFSET 243000
[tId:414][00:00:04.006] Spanner query, LIMIT 250 OFFSET 414000
[tId:324][00:00:04.457] Spanner query, LIMIT 250 OFFSET 324000
[tId:498][00:00:03.865] Spanner query, LIMIT 250 OFFSET 498000
[tId:252][00:00:04.945] Spanner query, LIMIT 250 OFFSET 252000
[tId:494][00:00:04.211] Spanner query, LIMIT 250 OFFSET 494000
[tId:444][00:00:04.780] Spanner query, LIMIT 250 OFFSET 444000
[tId:422][00:00:04.951] Spanner query, LIMIT 250 OFFSET 422000
[tId:397][00:00:05.234] Spanner query, LIMIT 250 OFFSET 397000
[tId:420][00:00:05.106] Spanner query, LIMIT 250 OFFSET 420000
[tId:236][00:00:05.985] Spanner query, LIMIT 250 OFFSET 236000
[tId:406][00:00:05.429] Spanner query, LIMIT 250 OFFSET 406000
[tId:449][00:00:05.291] Spanner query, LIMIT 250 OFFSET 449000
[tId:437][00:00:05.929] Spanner query, LIMIT 250 OFFSET 437000
[tId:341][00:00:06.611] Spanner query, LIMIT 250 OFFSET 341000
[tId:475][00:00:06.223] Spanner query, LIMIT 250 OFFSET 475000
[tId:490][00:00:06.186] Spanner query, LIMIT 250 OFFSET 490000
[tId:416][00:00:06.460] Spanner query, LIMIT 250 OFFSET 416000
[tId:328][00:00:07.446] Spanner query, LIMIT 250 OFFSET 328000
[tId:322][00:00:07.679] Spanner query, LIMIT 250 OFFSET 322000
[tId:158][00:00:09.357] Spanner query, LIMIT 250 OFFSET 158000
[tId:496][00:00:08.183] Spanner query, LIMIT 250 OFFSET 496000
[tId:256][00:00:09.250] Spanner query, LIMIT 250 OFFSET 256000
--> [0h:00m:10s] Throughput: Total 9848, Interval +9848 (984 req/s), 44/50 threads reporting
[tId:492][00:00:08.646] Spanner query, LIMIT 250 OFFSET 492000
[tId:390][00:00:09.810] Spanner query, LIMIT 250 OFFSET 390000
[tId:366][00:00:10.142] Spanner query, LIMIT 250 OFFSET 366000
[tId:320][00:00:10.451] Spanner query, LIMIT 250 OFFSET 320000
[tId:318][00:00:10.619] Spanner query, LIMIT 250 OFFSET 318000
--> [0h:00m:20s] Throughput: Total 56051, Interval +46203 (4620 req/s), 50/50 threads reporting
--> [0h:00m:30s] Throughput: Total 102172, Interval +46121 (4612 req/s), 50/50 threads reporting
Note that the query times only increase regardless of the offset and it takes between 10 and 20 seconds for the initial spanner query to return data for all 50 threads before they start reporting results. If I increase the limit to 1000, it takes almost 2 minutes for all 50 threads to get their results back from Spanner.
Compare that to the DynamoDb equivalent (except the limit is 1000) where all queries return in less than 1 second and all 50 threads are reporting results before the 10 second status update is displayed:
--> [0h:00m:00s] Throughput: Total 0, Interval 0 (0 req/s), 0/0 threads reporting
[tId:045] Dynamo query, LIMIT 1000 [00:00:00.851]
[tId:138] Dynamo query, LIMIT 1000 [00:00:00.463]
[tId:183] Dynamo query, LIMIT 1000 [00:00:00.121]
[tId:122] Dynamo query, LIMIT 1000 [00:00:00.576]
[tId:095] Dynamo query, LIMIT 1000 [00:00:00.708]
[tId:072] Dynamo query, LIMIT 1000 [00:00:00.778]
[tId:115] Dynamo query, LIMIT 1000 [00:00:00.619]
[tId:166] Dynamo query, LIMIT 1000 [00:00:00.296]
[tId:058] Dynamo query, LIMIT 1000 [00:00:00.814]
[tId:179] Dynamo query, LIMIT 1000 [00:00:00.242]
[tId:081] Dynamo query, LIMIT 1000 [00:00:00.745]
[tId:106] Dynamo query, LIMIT 1000 [00:00:00.671]
[tId:162] Dynamo query, LIMIT 1000 [00:00:00.348]
[tId:035] Dynamo query, LIMIT 1000 [00:00:00.889]
[tId:134] Dynamo query, LIMIT 1000 [00:00:00.513]
[tId:187] Dynamo query, LIMIT 1000 [00:00:00.090]
[tId:158] Dynamo query, LIMIT 1000 [00:00:00.405]
[tId:191] Dynamo query, LIMIT 1000 [00:00:00.095]
[tId:195] Dynamo query, LIMIT 1000 [00:00:00.096]
[tId:199] Dynamo query, LIMIT 1000 [00:00:00.144]
[tId:203] Dynamo query, LIMIT 1000 [00:00:00.112]
[tId:291] Dynamo query, LIMIT 1000 [00:00:00.102]
[tId:303] Dynamo query, LIMIT 1000 [00:00:00.094]
[tId:312] Dynamo query, LIMIT 1000 [00:00:00.101]
[tId:318] Dynamo query, LIMIT 1000 [00:00:00.075]
[tId:322] Dynamo query, LIMIT 1000 [00:00:00.086]
[tId:326] Dynamo query, LIMIT 1000 [00:00:00.096]
[tId:330] Dynamo query, LIMIT 1000 [00:00:00.085]
[tId:334] Dynamo query, LIMIT 1000 [00:00:00.114]
[tId:342] Dynamo query, LIMIT 1000 [00:00:00.096]
[tId:391] Dynamo query, LIMIT 1000 [00:00:00.081]
[tId:395] Dynamo query, LIMIT 1000 [00:00:00.088]
[tId:406] Dynamo query, LIMIT 1000 [00:00:00.088]
[tId:415] Dynamo query, LIMIT 1000 [00:00:00.078]
[tId:421] Dynamo query, LIMIT 1000 [00:00:00.089]
[tId:425] Dynamo query, LIMIT 1000 [00:00:00.068]
[tId:429] Dynamo query, LIMIT 1000 [00:00:00.088]
[tId:433] Dynamo query, LIMIT 1000 [00:00:00.105]
[tId:437] Dynamo query, LIMIT 1000 [00:00:00.092]
[tId:461] Dynamo query, LIMIT 1000 [00:00:00.110]
[tId:483] Dynamo query, LIMIT 1000 [00:00:00.071]
[tId:491] Dynamo query, LIMIT 1000 [00:00:00.078]
[tId:495] Dynamo query, LIMIT 1000 [00:00:00.075]
[tId:503] Dynamo query, LIMIT 1000 [00:00:00.064]
[tId:499] Dynamo query, LIMIT 1000 [00:00:00.108]
[tId:514] Dynamo query, LIMIT 1000 [00:00:00.163]
[tId:518] Dynamo query, LIMIT 1000 [00:00:00.135]
[tId:529] Dynamo query, LIMIT 1000 [00:00:00.163]
[tId:533] Dynamo query, LIMIT 1000 [00:00:00.079]
[tId:541] Dynamo query, LIMIT 1000 [00:00:00.060]
--> [0h:00m:10s] Throughput: Total 24316, Interval +24316 (2431 req/s), 50/50 threads reporting
--> [0h:00m:20s] Throughput: Total 64416, Interval +40100 (4010 req/s), 50/50 threads reporting
Am I missing something in the config? If I let it autoscale the performance issue is greatly magnified.
Upvotes: 0
Views: 3003
Reputation: 3512
EDIT based on the additional information:
As Panagiotis Voulgaris pointed out below, I don't think the problem in this case is related to the client configuration, but to the query itself. The query seems to be quite slow, especially for higher OFFSET
values. I tried it out with a table with approx 1,000,000 rows, and for an OFFSET
value of 900,000 a single query runs for 4-5 seconds. The reason that the problem is getting worse when you scale up, is probably that you are overwhelming the backend with a lot of parallel queries that take a long time, and not because the client is wrongly configured.
The best would be if you could re-write your query to select a range of rows based on the primary key value instead of using a LIMIT x OFFSET y
construct. So your query would then look something like this:
SELECT A, B, C
FROM data_table
WHERE A >= x AND A < (x+250)
This obviously won't guarantee that you get exactly 250 rows in each partition if your key column contains gaps between the values. You could in that case also increase the +250
value a little to get reasonable partitions.
If the above is not possible because the key values are completely random values (or are not evenly distributed), then I think the following query would be more efficient than your current query:
SELECT A, B, C
FROM data_table
WHERE A >= (
SELECT ANY_VALUE(A)
FROM data_table
GROUP BY A
LIMIT 1 OFFSET y
)
ORDER BY A
LIMIT 250
It's not really clear to me exactly what your end goal is in this case, and that makes a difference when it comes to the concrete question:
...if the partitionQuery() route is correct (?)
The BatchReadOnlyTransaction
and partitionQuery()
route is intended for reading a large dataset at a single point in time. This could for example be when you want to create a dump of all the data in a table. Spanner will partition the query for you and return a list of partitions. Each partition can then be handled by separate threads (or even separate VMs). This so to speak automatically replaces the LIMIT 250 OFFSET xxxx
part of your query, as Spanner creates the different partitions based on the actual data in the table.
However, if your end goal here is to simulate production load, then BatchReadOnlyTransaction
is not the route to follow.
If what you want to do is to efficiently query a data set then you should make sure that you use a single-use read-only transaction for the query. This is what you are already doing with the native client. Also, the JDBC driver will also automatically use single-use read-only transactions for queries as long as the connection is in autocommit mode. If you turn off autocommit, the driver will automatically start a transaction when you execute a query.
Regarding sessions and channels:
Regarding the (example) query:
As mentioned above, it's not really clear to me whether this is just a test setup, or an actual production example. I would however expect the query to contain an explicit ORDER BY
clause to ensure that the data is returned in the expected order, and that ORDER BY
clause should obviously use an indexed column.
Finally: Is the problem caused by the backend responding to slowly on each query? Or is the backend basically idling, and is the client not able to really ramp up the the queries?
Upvotes: 1
Reputation: 31
I suspect that in order to produce accurate results for
SELECT A, B, C FROM data_table LIMIT 250 OFFSET XXX
The backend would need to fetch 250 + XXX rows and then skip XXX of them. So, if XXX is very large, this can be a very expensive query and require scanning a big chunk of data_table
.
Would it make sense to instead restrict the table key(s)? something like:
SELECT A, B, C FROM data_table WHERE TableKey1 > 'key_restriction' LIMIT 250;
This type of query should only read up to 250 rows.
Independently, it would be good to understand how representative such queries would be for your production workload. Can you explain what type of queries you expect in production?
Upvotes: 1