Reputation: 8556
I'm brand new to Akka and I'm just trying to get the hang of it.
As an experiment, I want to read from a Kinesis stream and collect n messages and stop.
The only sink I found that stops reading records is Sink.head(), but it returns only one record and I'd like to get more than that.
I can't quite figure out how to stop reading from the stream after receiving the n messages though.
Here's the code I have tried so far:
public void testReadingFromKinesisNRecords() throws ExecutionException, InterruptedException {
    final ActorSystem system = ActorSystem.create("foo");
    final Materializer materializer = ActorMaterializer.create(system);

    ProfileCredentialsProvider profileCredentialsProvider = ProfileCredentialsProvider.create();
    final KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
        .credentialsProvider(profileCredentialsProvider)
        .build();

    String streamName = "akka-test-stream";
    String shardId = "shardId-000000000000";
    int numberOfRecordsToRead = 3;

    final ShardSettings settings = ShardSettings.create(streamName, shardId)
        .withLimit(numberOfRecordsToRead); // return a maximum of n records (and quit?!)

    final Source<Record, NotUsed> sourceKinesisBasic = KinesisSource.basic(settings, kinesisClient);

    Flow<Record, String, NotUsed> flowMapRecordToString = Flow.of(Record.class).map(record -> extractDataFromRecord(record));
    Flow<String, String, NotUsed> flowPrinter = Flow.of(String.class).map(s -> debugPrint(s));

    // Flow<String, List<String>, NotUsed> flowGroupedWithinMinute =
    //     Flow.of(String.class).groupedWithin(
    //         numberOfRecordsToRead, // group size
    //         Duration.ofSeconds(60) // group time
    //     );

    Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
        .via(flowMapRecordToString)
        .via(flowPrinter);
        // .via(flowGroupedWithinMinute); // nope

    // sink to list of strings
    // Sink<String, CompletionStage<List<String>>> sinkToList = Sink.seq();
    Sink<String, CompletionStage<List<String>>> sink10 = Sink.takeLast(10);
    // Sink<String, CompletionStage<String>> sinkHead = Sink.head(); // only gives you one message

    CompletionStage<List<String>> streamCompletion = sourceStringsFromKinesisRecords
        .runWith(sink10, materializer);
    CompletableFuture<List<String>> completableFuture = streamCompletion.toCompletableFuture();
    completableFuture.join(); // never stops running...
    List<String> result = completableFuture.get();
    int foo = 1;
}
private String extractDataFromRecord(Record record) {
    String encType = record.encryptionTypeAsString();
    Instant arrivalTimestamp = record.approximateArrivalTimestamp();
    String data =;
    return data;
}

private String debugPrint(String s) {
    return s;
}
Thank you for any clues.
Upvotes: 0
Views: 242
Reputation: 32319
Just to add on to the answer you found: it is also possible to express this more directly, by calling map on the source instead of going through via.
Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .map(record -> extractDataFromRecord(record))
    .map(s -> debugPrint(s));
Upvotes: 1
Reputation: 8556
I found out that the answer is to use take(n) at the flow level, so the stream completes after n elements:
Flow<String, String, NotUsed> flowTakeN = Flow.of(String.class).take(numberOfRecordsToRead);
Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .via(flowMapRecordToString)
    .via(flowPrinter)
    .via(flowTakeN);
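For anyone wondering why this helps: as far as I understand it, the Kinesis source keeps polling the shard and never completes on its own, and sinks like Sink.takeLast or Sink.seq only deliver their result once the upstream completes. A take(n) stage is what ends the stream after n elements. Below is a minimal JDK-only analogy, not Akka code, where limit(n) stands in for take(n) and an endless Stream.iterate stands in for the shard. The class name TakeNAnalogy, the helper firstN and the "record-" strings are all made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TakeNAnalogy {

    // Hypothetical helper: pulls at most n items from an endless source.
    static List<String> firstN(int n) {
        // An unbounded stream: like the Kinesis shard, it never ends by itself.
        Stream<String> endless = Stream.iterate(0, i -> i + 1)
                                       .map(i -> "record-" + i);

        // limit(n) plays the role of take(n): it terminates the stream after
        // n items, so collecting into a list can actually finish.
        return endless.limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstN(3)); // prints [record-0, record-1, record-2]
    }
}
```

Without the limit(n) call, the collect would never return, which is roughly what happens to join() in the question when nothing upstream of the sink completes the stream.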
Upvotes: 1