Paweł Głasek

Reputation: 136

Joining two streams

Is it possible to join two separate unbounded PubsubIO PCollections using a key present in both of them? I'm trying to accomplish the task with something like:

Read(FirstStream) & Read(SecondStream) -> Flatten -> Generate key to use in joining -> Use session windowing to gather them together -> Group by key, then re-window with fixed-size windows -> AvroIO write to disk using windowing.

EDIT:

Here is the pipeline code I created. I'm running into two problems:

  1. Nothing gets written to disk.
  2. The pipeline becomes really unstable - it randomly slows down processing of certain steps, especially the group-by. It can't keep up with the ingestion rate even when I use 10 Dataflow workers.

I need to handle ~10,000 sessions a second. Each session comprises 1 or 2 events and then needs to be closed.

    PubsubIO.Read<String> auctionFinishedReader = PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE)
            .fromTopic("projects/authentic-genre-152513/topics/auction_finished");
    PubsubIO.Read<String> auctionAcceptedReader = PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE)
            .fromTopic("projects/authentic-genre-152513/topics/auction_accepted");

    PCollection<String> auctionFinishedStream = p.apply("ReadAuctionFinished", auctionFinishedReader);
    PCollection<String> auctionAcceptedStream = p.apply("ReadAuctionAccepted", auctionAcceptedReader);

    PCollection<String> combinedEvents = PCollectionList.of(auctionFinishedStream)
            .and(auctionAcceptedStream).apply(Flatten.pCollections());

    PCollection<KV<String, String>> keyedAuctionFinishedStream = combinedEvents
            .apply("AddKeysToAuctionFinished", WithKeys.of(new GenerateKeyForEvent()));

    PCollection<KV<String, Iterable<String>>> sessions = keyedAuctionFinishedStream
            .apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardMinutes(1)))
                                            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
            .apply(GroupByKey.create());

    PCollection<SodaSession> values = sessions
            .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, SodaSession> () {
                @ProcessElement
                public void processElement(ProcessContext c, BoundedWindow window) {
                    c.output(new SodaSession("auctionid", "stattedat"));
                }
            }));

    PCollection<SodaSession> windowedEventStream = values
            .apply("ApplyWindowing", Window.<SodaSession>into(FixedWindows.of(Duration.standardMinutes(2)))
                    .triggering(Repeatedly.forever(
                            AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardMinutes(1))
                    ))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes()
            );

    AvroIO.Write<SodaSession> avroWriter = AvroIO
            .write(SodaSession.class)
            .to("gs://storage/")
            .withWindowedWrites()
            .withFilenamePolicy(new EventsToGCS.PerWindowFiles("sessionsoda"))
            .withNumShards(3);

    windowedEventStream.apply("WriteToDisk", avroWriter);

Upvotes: 3

Views: 1486

Answers (1)

Paweł Głasek

Reputation: 136

I've found an efficient solution. Since one of my collections was disproportionately small compared to the other, I used a side input to speed up the grouping operation. Here is an overview of my solution:

  1. Read both event streams.
  2. Flatten them into single PCollection.
  3. Apply a sliding window (size = closable-session duration + maximum session length, sliding every closable-session duration).
  4. Partition the flattened collection back into the two original streams.
  5. Create PCollectionView from smaller PCollection.
  6. Join both streams using sideInput with the view created in the previous step.
  7. Write sessions to disk.
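The window sizing in step 3 is what makes the join in step 6 safe: with size = closable-gap + maximum session length, sliding every closable-gap, any two events belonging to one session land together in at least one window. A small sketch of that arithmetic (the numbers and names are illustrative, not taken from the pipeline; timestamps are in seconds):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowCheck {
    // Returns the start timestamps of all sliding windows of the given
    // size and period that contain the timestamp ts (starts are aligned
    // to multiples of the period, as in Beam's SlidingWindows).
    static List<Long> windowStartsContaining(long ts, long size, long period) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % period); // latest window start at or before ts
        for (long s = lastStart; s > ts - size; s -= period) {
            starts.add(s);
        }
        return starts;
    }
}
```

For example, with a 60 s closable gap and a 120 s maximum session length (window size 180 s, period 60 s), events at t=65 and t=170 of the same session share the windows starting at 0 and 60, so a per-window join sees both.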

It joins a 4000 events/sec stream (the larger one) with a 60 events/sec stream on 1-2 Dataflow workers, versus ~15 workers when using session windowing together with GroupByKey.
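Per window, the side-input join (step 6) reduces to a map lookup: in Beam the smaller collection would be materialized with `View.asMultimap()` and read via `c.sideInput(...)` inside a `DoFn` over the larger stream. A minimal sketch of just that lookup logic in plain Java (class and method names are illustrative placeholders, not the actual pipeline code; events are modeled as `String[] {key, payload}`):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class SideInputJoin {
    // For each element of the larger stream, emit one joined record per
    // matching element of the smaller stream found in the side-input
    // multimap; keys with no match produce no output (inner join).
    static List<String[]> joinByKey(List<String[]> largeStream,
                                    Map<String, List<String>> smallSide) {
        List<String[]> joined = new ArrayList<>();
        for (String[] kv : largeStream) {     // kv[0] = key, kv[1] = payload
            for (String match : smallSide.getOrDefault(kv[0],
                    Collections.emptyList())) {
                joined.add(new String[] { kv[0], kv[1], match });
            }
        }
        return joined;
    }
}
```

Because the lookup is O(1) per element instead of a shuffle-heavy GroupByKey over both streams, the large stream never has to be co-grouped, which is where the worker savings come from.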

Upvotes: 5
