Jim M.

Does Testcontainers/LocalStack work with DynamoDB Streams and KCL 1.x?

Problem Statement

I have written a program that uses DynamoDB Streams to get notified when an update occurs. The code works fine against AWS services but doesn't seem to work with Testcontainers/LocalStack in my integration tests.

What has been done

Tried reverting to older versions of Testcontainers/LocalStack.

Read this article, which has a comment at the end that seems to imply that KCL 1.x lacks some APIs needed for it to work with LocalStack:

However, Kinesis Client Library-1.x has not provided the capability of giving AWS CloudWatch service endpoint URL as a configuration parameter.

I believe the DynamoDB Streams Kinesis Adapter uses KCL 1.x internally, so I don't think I can switch to KCL 2.x. As an aside, the adapter appears to have been archived, yet the Amazon docs still refer to it, and nothing in that Git repo indicates why it was archived or what to use instead.

What Happens?

My program runs fine; I don't see any errors, but I also don't get any update information from any shards.

App Design

Basically, the integration test starts, creates the required tables in LocalStack (confirmed via the AWS CLI), and then puts 3 items into DynamoDB. The Spring Boot application starts up and reads the data from DynamoDB into a list. The integration test then calls the delete endpoint, which simply calls CrudRepository.delete (implemented via Spring Data DynamoDB). I have confirmed that the actual DynamoDB table has gone from 3 items to 2, with the deleted item removed. The application's cache should be updated when my app receives the updated records from KCL; however, this never happens with Testcontainers/LocalStack.
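The cache-update step described above can be sketched as follows. This is a minimal sketch, not the original application's code: it assumes the record processor extracts each stream record's `eventName` (DynamoDB Streams uses `INSERT`, `MODIFY`, and `REMOVE`), the item key, and the new item data, and passes them to a cache. `ItemCache` and `apply` are hypothetical names.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory cache kept in sync with DynamoDB Streams events.
// A sketch of the idea only; not the original app's implementation.
class ItemCache {
    // ConcurrentHashMap because the KCL processing thread writes
    // while HTTP request threads read.
    private final Map<String, String> items = new ConcurrentHashMap<>();

    // eventName follows DynamoDB Streams' values: INSERT, MODIFY, REMOVE.
    void apply(String eventName, String itemNumber, String newImage) {
        switch (eventName) {
            case "INSERT":
            case "MODIFY":
                items.put(itemNumber, newImage);
                break;
            case "REMOVE":
                items.remove(itemNumber);
                break;
            default:
                throw new IllegalArgumentException("Unknown event: " + eventName);
        }
    }

    // Returns the cached item, or empty if it was never seen or was removed.
    Optional<String> get(String itemNumber) {
        return Optional.ofNullable(items.get(itemNumber));
    }
}
```

If the `REMOVE` event for item 23456 never reaches `apply`, `get("23456")` keeps returning the stale item, which matches the symptom in the test.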

The application is using InitialPositionInStream.LATEST for reading the shards.
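Why the starting position matters: an iterator opened at `LATEST` starts just after the newest record that exists when it is opened, so a change written before the worker initializes its shard is skipped, whereas `TRIM_HORIZON` starts at the oldest record still retained. The toy model below reproduces that behavior in plain Java. `ToyShard` and `Position` are hypothetical stand-ins, not the AWS SDK or KCL API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a stream shard; not the AWS SDK or KCL API.
class ToyShard {
    enum Position { LATEST, TRIM_HORIZON }

    private final List<String> records = new ArrayList<>();

    void append(String record) {
        records.add(record);
    }

    // Returns the index an iterator starts at:
    // LATEST = just past the newest existing record, TRIM_HORIZON = the oldest record.
    int openIterator(Position position) {
        return position == Position.LATEST ? records.size() : 0;
    }

    // Reads everything from the iterator's start index to the current end of the shard.
    List<String> readFrom(int iterator) {
        return new ArrayList<>(records.subList(iterator, records.size()));
    }
}
```

For example, if `shard.append("REMOVE 23456")` runs before `openIterator(Position.LATEST)`, reading from that iterator returns an empty list, while a `TRIM_HORIZON` iterator still sees the delete. Using `TRIM_HORIZON` in a test avoids that race at the cost of replaying older records; whether that trade-off is acceptable depends on the test.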

Application Output

You can see that the application starts up and is ready at 13:42:17.468, and the item is "deleted" at 13:42:24.768. The test then calls Thread.sleep(1000) to give KCL time to process any changes, and at 13:42:25.793 it asks the server whether the element is still present in the cache, which it is.

Suspecting a timing issue, with LocalStack processing slowly, I added an @AfterAll method to my tests that waits for 2 minutes. I used curl to call the GET endpoint at 13:44:38.723 and the element still existed, even though that should have been plenty of time for LocalStack to process the shard updates.

2021-04-05 13:42:07.712  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : KCL shard lease table, test-table, exists and is ACTIVE
2021-04-05 13:42:07.770  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : KCL shard lease table, kcl-shard-lease-lock, exists and is ACTIVE
2021-04-05 13:42:07.772  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : Describing table=test-table
2021-04-05 13:42:07.821  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : Got description for table=test-table
2021-04-05 13:42:07.822  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : Got stream arn (arn:aws:dynamodb:us-east-1:000000000000:table/test-table/stream/2021-04-05T17:41:21.860) for table=test-table with tableArn=arn:aws:dynamodb:us-east-1:000000000000:table/test-table
2021-04-05 13:42:07.904  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : Creating KCL worker
2021-04-05 13:42:07.938  INFO 11414 --- [    Test worker] c.a.s.k.leases.impl.LeaseCoordinator     : With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
2021-04-05 13:42:07.941  INFO 11414 --- [    Test worker] c.a.s.k.clientlibrary.lib.worker.Worker  : Shard sync strategy determined as SHARD_END.
2021-04-05 13:42:07.941  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : KCL Worker created!
2021-04-05 13:42:07.944  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Initialization attempt 1
2021-04-05 13:42:07.945  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Initializing LeaseCoordinator
2021-04-05 13:42:14.525  INFO 11414 --- [    Test worker] c.c.u.d.s.controller.TestItemController  : *** TestItemController Started ***
2021-04-05 13:42:14.650  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Syncing Kinesis shard info
2021-04-05 13:42:14.656  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : syncShardLeases: begin
2021-04-05 13:42:14.656  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : getShardList: begin
2021-04-05 13:42:14.757  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : getShardList: done
2021-04-05 13:42:14.779  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : determineNewLeasesToCreate: begin
2021-04-05 13:42:14.781  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : determineNewLeasesToCreate: done
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : cleanupGarbageLeases: begin
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : cleanupGarbageLeases: done
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : cleanupLeasesOfFinishedShards: begin
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : cleanupLeasesOfFinishedShards: done
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : syncShardLeases: done
2021-04-05 13:42:14.866  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Starting LeaseCoordinator
2021-04-05 13:42:14.900  INFO 11414 --- [cessingThread-0] c.a.s.kinesis.leases.impl.LeaseRenewer   :  Worker Test-application-lMtQuWkZeFmq+ found lease {
  "leaseKey" : "shardId-00000001617600000000-000000000000",
  "leaseOwner" : "Test-application-lMtQuWkZeFmq+",
  "leaseCounter" : 0,
  "concurrencyToken" : null,
  "lastCounterIncrementNanos" : null,
  "checkpoint" : {
    "sequenceNumber" : "LATEST",
    "subSequenceNumber" : 0
  },
  "pendingCheckpoint" : null,
  "ownerSwitchesSinceCheckpoint" : 0,
  "parentShardIds" : [ ]
}
2021-04-05 13:42:14.949  WARN 11414 --- [cessingThread-0] c.a.s.k.metrics.impl.MetricsHelper       : No metrics scope set in thread KCLProcessingThread-0, getMetricsScope returning NullMetricsScope.
2021-04-05 13:42:15.011  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:15.011  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:42:16.709  INFO 11414 --- [    Test worker] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2021-04-05 13:42:17.309  INFO 11414 --- [    Test worker] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2021-04-05 13:42:17.451  INFO 11414 --- [    Test worker] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 50376 (http) with context path ''
2021-04-05 13:42:17.464  INFO 11414 --- [    Test worker] d.d.r.u.Entity2DynamoDBTableSynchronizer : Checking repository classes with DynamoDB tables test-table for ContextRefreshedEvent
2021-04-05 13:42:17.468  INFO 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : Started DynamoDbStreamsTestApp in 54.763 seconds (JVM running for 77.176)
2021-04-05 13:42:17.872  INFO 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : *** ASK SERVER TO DELETE ITEM 23456 ***
2021-04-05 13:42:17.996  INFO 11414 --- [o-auto-1-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-04-05 13:42:17.996  INFO 11414 --- [o-auto-1-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-04-05 13:42:18.019  INFO 11414 --- [o-auto-1-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 23 ms
2021-04-05 13:42:18.068  INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController  : Deleting itemNumber=23456
2021-04-05 13:42:21.670  INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController  : Found inventory item to delete
2021-04-05 13:42:24.768  INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController  : Item deleted
2021-04-05 13:42:24.789  INFO 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : *** SERVER SAYS ITEM 23456 DELETED ***
2021-04-05 13:42:24.954  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Initialization complete. Starting worker loop.
2021-04-05 13:42:24.970  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Created new shardConsumer for : ShardInfo [shardId=shardId-00000001617600000000-000000000000, concurrencyToken=b773dd9e-d385-44cd-8189-3cf330b94351, parentShardIds=[], checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}]
2021-04-05 13:42:24.972  INFO 11414 --- [dProcessor-0000] c.a.s.k.c.l.w.BlockOnParentShardTask     : No need to block on parents [] of shard shardId-00000001617600000000-000000000000
2021-04-05 13:42:25.793  INFO 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : *** VERIFY ITEM 23456 WAS DELETED OR NOT ***
2021-04-05 13:42:31.304  INFO 11414 --- [o-auto-1-exec-2] c.c.u.d.s.controller.TestItemController  : Getting itemNumber=23456
2021-04-05 13:42:34.586  INFO 11414 --- [o-auto-1-exec-2] c.c.u.d.s.controller.TestItemController  : Item=TestItem{itemNumber='23456', description='A doo', price=10.99}
2021-04-05 13:42:34.589  INFO 11414 --- [dProcessor-0000] c.a.s.k.c.lib.worker.KinesisDataFetcher  : Initializing shard shardId-00000001617600000000-000000000000 with LATEST
2021-04-05 13:42:34.611 ERROR 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : *** SERVER STILL HAS ITEM 23456 ***
2021-04-05 13:42:34.663  WARN 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : Giving system 2 MINUTES before shut down
2021-04-05 13:42:35.088  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:35.088  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:42:55.171  INFO 11414 --- [oordinator-0001] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:55.171  INFO 11414 --- [oordinator-0001] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:43:08.731  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Current stream shard assignments: shardId-00000001617600000000-000000000000
2021-04-05 13:43:08.732  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Sleeping ...
2021-04-05 13:43:15.248  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:15.248  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:43:35.322  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:35.323  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:43:55.399  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:55.399  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:44:09.937  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Current stream shard assignments: shardId-00000001617600000000-000000000000
2021-04-05 13:44:09.937  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Sleeping ...
2021-04-05 13:44:15.470  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:44:15.470  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:44:35.550  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:44:35.551  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:44:38.723  INFO 11414 --- [o-auto-1-exec-5] c.c.u.d.s.controller.TestItemController  : Getting itemNumber=23456
2021-04-05 13:44:41.374  INFO 11414 --- [o-auto-1-exec-5] c.c.u.d.s.controller.TestItemController  : Item=TestItem{itemNumber='23456', description='A doo', price=10.99}

Config Info

Client: Docker Engine - Community
 Cloud integration: 1.0.9
 Version:           20.10.5
 API version:       1.41
 Go version:        go1.13.15
 Git commit:        55c4c88
 Built:             Tue Mar  2 20:13:00 2021
 OS/Arch:           darwin/amd64
 Context:           default
 Experimental:      true

Server: Docker Engine - Community
 Engine:
  Version:          20.10.5
  API version:      1.41 (minimum version 1.12)
  Go version:       go1.13.15
  Git commit:       363e9a8
  Built:            Tue Mar  2 20:15:47 2021
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.4.3
  GitCommit:        269548fa27e0089a8b8278fc4fc781d7f65a939b
 runc:
  Version:          1.0.0-rc92
  GitCommit:        ff819c7e9184c13b7c2607fe6c30ae19403a7aff
 docker-init:
  Version:          0.19.0
  GitCommit:        de40ad0
 Kubernetes:
  Version:          Unknown
  StackAPI:         Unknown

Answers (1)

Jim M.

Solution

After many hours of experimenting with this, I finally noticed this message:

2021-04-05 13:42:24.954 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Initialization complete. Starting worker loop.

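After looking at this for a while, it became obvious that with Testcontainers/LocalStack the worker takes about 10 seconds after starting its LeaseCoordinator (13:42:14.866 to 13:42:24.954) to finish initializing and become "ready". In this run the item had already been deleted at 13:42:24.768, and the shard was only initialized with LATEST at 13:42:34.589. Because a LATEST iterator starts after the newest record that exists when it is opened, the delete event most likely landed before that position and was skipped. This was easy enough to solve, as com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker supports a state change listener, so I could set up a listener to determine when the worker was ready and only then let the test continue.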
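The answer doesn't show the listener code. Below is a minimal, stdlib-only sketch of the pattern: a listener callback that releases a latch once the worker reports it has started, and a test that blocks on that latch before writing. The `WorkerState` enum and `ReadyLatch` class are hypothetical stand-ins; check the exact listener interface and state names in the KCL 1.x version you use before wiring this in.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the worker's lifecycle states; not the KCL enum.
enum WorkerState { CREATED, INITIALIZING, STARTED, SHUT_DOWN }

// Releases waiting test threads once the worker reports STARTED.
class ReadyLatch {
    private final CountDownLatch started = new CountDownLatch(1);

    // Forward the worker's state-change callbacks here.
    void onWorkerStateChange(WorkerState newState) {
        if (newState == WorkerState.STARTED) {
            started.countDown(); // further calls after reaching zero are no-ops
        }
    }

    // Blocks until STARTED is seen or the timeout elapses; true means ready.
    boolean awaitStarted(long timeout, TimeUnit unit) {
        try {
            return started.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }
}
```

In a test, you would call `awaitStarted(60, TimeUnit.SECONDS)` before inserting or deleting anything and fail fast if it returns false, so no writes happen before the shard iterator exists.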

Final Issues

Sadly, this did not resolve the problem completely: it turns out that the DynamoDB Streams implementation in Testcontainers/LocalStack is extremely slow. When I created, deleted, or modified a record, it took upwards of 7 seconds for the shard processor to receive the update. So I ended up adding a 10-second sleep between deleting the item and checking that it was actually removed.
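An alternative to a fixed 10-second sleep is a bounded poll: check the condition repeatedly and return as soon as it holds, so fast runs don't pay the full wait while slow runs are still capped by a timeout. `Poll.until` below is a hypothetical helper, not from the original code; libraries such as Awaitility offer the same pattern.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical test helper: poll a condition instead of sleeping a fixed time.
final class Poll {
    private Poll() {}

    // Returns true as soon as the condition holds; false if the timeout elapses first.
    static boolean until(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            try {
                // Sleep for the interval, but never past the deadline (at least 1 ms).
                Thread.sleep(Math.min(interval.toMillis(), Math.max(1, remainingNanos / 1_000_000)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the test, the condition would call the GET endpoint (for example, a hypothetical `itemExists("23456")` helper negated), with a timeout comfortably above the roughly 7 seconds observed here.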

Conclusion

So this turned out to be a timing issue: Testcontainers/LocalStack is just very slow at pushing data to the shards (at least compared with the exact same code running against DynamoDB and DynamoDB Streams).
