slashdottir

Reputation: 8556

How to exit an Akka stream after n elements are received?

I'm brand new to Akka and I'm just trying to get the hang of it.

As an experiment, I want to read from a Kinesis stream and collect n messages and stop.

The only sink I found that stops reading records is Sink.head(), but that returns only one record, and I'd like to get more than that.

I can't quite figure out how to stop reading from the stream after receiving the n messages though.

Here's the code I have tried so far:

  @Test
  public void testReadingFromKinesisNRecords() throws ExecutionException, InterruptedException {
    final ActorSystem system = ActorSystem.create("foo");
    final Materializer materializer = ActorMaterializer.create(system);

    ProfileCredentialsProvider profileCredentialsProvider = ProfileCredentialsProvider.create();

    final KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
        .credentialsProvider(profileCredentialsProvider)
        .region(Region.US_WEST_2)
        .httpClient(AkkaHttpClient.builder()
            .withActorSystem(system).build())
        .build();

    system.registerOnTermination(kinesisClient::close);

    String streamName = "akka-test-stream";
    String shardId = "shardId-000000000000";

    int numberOfRecordsToRead = 3;

    final ShardSettings settings = ShardSettings.create(streamName, shardId)
            .withRefreshInterval(Duration.ofSeconds(1))
            .withLimit(numberOfRecordsToRead) // return a maximum of n records (and quit?!)
            .withShardIterator(ShardIterators.latest());

    final Source<Record, NotUsed> sourceKinesisBasic = KinesisSource.basic(settings, kinesisClient);

    Flow<Record, String, NotUsed> flowMapRecordToString = Flow.of(Record.class).map(record -> extractDataFromRecord(record));
    Flow<String, String, NotUsed> flowPrinter = Flow.of(String.class).map(s -> debugPrint(s));
//    Flow<String, List<String>, NotUsed> flowGroupedWithinMinute =
//        Flow.of(String.class).groupedWithin(
//            numberOfRecordsToRead, // group size
//            Duration.ofSeconds(60) // group time
//        );

    Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
        .via(flowMapRecordToString)
        .via(flowPrinter);
//        .via(flowGroupedWithinMinute); // nope

    // sink to list of strings
//    Sink<String, CompletionStage<List<String>>> sinkToList = Sink.seq();
    Sink<String, CompletionStage<List<String>>> sink10 = Sink.takeLast(10);
//    Sink<String, CompletionStage<String>> sinkHead = Sink.head(); // only gives you one message

    CompletionStage<List<String>> streamCompletion = sourceStringsFromKinesisRecords
        .runWith(sink10, materializer);
    CompletableFuture<List<String>> completableFuture = streamCompletion.toCompletableFuture();
    completableFuture.join(); // never stops running...
    List<String> result = completableFuture.get();
    int foo = 1;
  }

  private String extractDataFromRecord(Record record) {
    String encType = record.encryptionTypeAsString();
    Instant arrivalTimestamp = record.approximateArrivalTimestamp();
    String data = record.data().asString(StandardCharsets.UTF_8);
    return data;
  }

  private String debugPrint(String s) {
    System.out.println(s);
    return s;
  }

Thank you for any clues

Upvotes: 0

Views: 242

Answers (2)

Alec

Reputation: 32319

Just to add on to the answer you found, it is also possible to express things more directly without via:

Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .map(record -> extractDataFromRecord(record))
    .map(s -> debugPrint(s))
    .take(10);

Upvotes: 1

slashdottir

Reputation: 8556

I found out the answer is to add a take(n) stage at the flow level, so the stream completes after n elements:

...
Flow<String, String, NotUsed> flowTakeN = Flow.of(String.class).take(numberOfRecordsToRead);

Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .via(flowMapRecordToString)
    .via(flowPrinter)
    .via(flowTakeN);
...
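
For anyone wondering why the original join() never returned: a collecting sink can only finish once its upstream finishes, and the Kinesis source keeps polling forever. Here is a minimal sketch of the same idea using plain java.util.stream as an analogy. It is not Akka code; the class name TakeNAnalogy, the helper firstN and the "record-" strings are made up for illustration. limit(n) plays roughly the role that take(n) plays in the pipeline above.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: java.util.stream instead of Akka Streams (names are hypothetical).
class TakeNAnalogy {

    // Collects the first n elements of an unbounded stream.
    static List<String> firstN(int n) {
        return Stream.iterate(1, i -> i + 1)   // unbounded, like a source that never completes
                .map(i -> "record-" + i)       // stand-in for mapping a record to a String
                .limit(n)                      // ends the stream after n elements; without it collect() never returns
                .collect(Collectors.toList()); // can finish only because the stream is now finite
    }

    public static void main(String[] args) {
        System.out.println(firstN(3)); // prints [record-1, record-2, record-3]
    }
}
```

In the Akka pipeline the position of take(n) matters in the same way: it has to sit before the sink, so that the sink sees the stream complete after n elements.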

Upvotes: 1
