MikeRand

Reputation: 4828

Apache Beam - Streaming Join on Temporal Relation

I'd like to use the Apache Beam Python SDK to build an event-driven trading system for backtesting. Many PTransform operations in this system would be of the "Temporal Validity Window" / "Temporal Join" type.

For example, take the working example from Streaming Systems, the book by the Beam folks, concerning currency quotes and trades. A similar example appears in an earlier paper.

Right Table: quotes.txt (Currency Pair, Price, Event Time, Processing Time)
USD/JPY,102.0000,12:00:00,12:04:13
EUR/JPY,114.0000,12:00:30,12:06:23
EUR/JPY,119.0000,12:06:00,12:07:33
EUR/JPY,116.0000,12:03:00,12:09:07

Left Table: orders.txt (Currency Pair, Quantity, Event Time, Processing Time)
USD/JPY,1,12:03:00,12:03:44
EUR/JPY,2,12:02:00,12:05:07
EUR/JPY,5,12:05:00,12:08:00
EUR/JPY,3,12:08:00,12:09:33
USD/JPY,5,12:10:00,12:10:59

Assume that both of these files are proxies for what could be unbounded collections (e.g. two Kafka topics keyed by currency pair). I'm at a complete loss as to how, using the Apache Beam API, to do a left join on these two (potentially unbounded) collections to produce the following output.

Output Table With Retractions: trades.txt (Currency Pair, Price*Quantity, Order Event Time, Retraction?, Trade Processing Time)
USD/JPY,102.0000,12:03:00,False,12:03:44
EUR/JPY,000.0000,12:02:00,False,12:05:07
EUR/JPY,000.0000,12:02:00,True,12:06:23
EUR/JPY,228.0000,12:02:00,False,12:06:23
EUR/JPY,570.0000,12:05:00,False,12:08:00
EUR/JPY,570.0000,12:05:00,True,12:09:07
EUR/JPY,580.0000,12:05:00,False,12:09:07
EUR/JPY,357.0000,12:08:00,False,12:09:33
USD/JPY,510.0000,12:10:00,False,12:10:59


"Final" Output Table Without Retractions: trades.txt (Currency Pair, Price*Quantity, Order Event Time, Retraction?, Trade Processing Time)
USD/JPY,102.0000,12:03:00,False,12:03:44
EUR/JPY,228.0000,12:02:00,False,12:06:23
EUR/JPY,580.0000,12:05:00,False,12:09:07
EUR/JPY,357.0000,12:08:00,False,12:09:33
USD/JPY,510.0000,12:10:00,False,12:10:59
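
To be explicit about how the second table relates to the first: a retraction row repeats the value of an earlier emission and withdraws it, so the "final" table keeps only the emissions that were never retracted. A plain-Python sketch of that rule (assuming the (pair, value, order event time) triple identifies the row being retracted):

def final_table(rows):
    # rows: (pair, value, order_event_time, is_retraction, processing_time)
    # Keep every non-retraction row whose (pair, value, order event time)
    # never appears in a retraction row.
    retracted = {(pair, value, order_time)
                 for pair, value, order_time, is_retraction, _ in rows
                 if is_retraction}
    return [row for row in rows
            if not row[3] and (row[0], row[1], row[2]) not in retracted]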

How do I implement the above PTransform using Windows, Triggers, and CoGroupByKey?

Current code - just some boilerplate with placeholders

"""Testing Apache beam joins."""

import logging
import datetime
import decimal
import typing

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


# Schema and Transforms

class Quote(typing.NamedTuple):
    base_currency: str
    quote_currency: str
    price: decimal.Decimal
    time_ms: int


class ConvertQuote(beam.DoFn):

    def process(self, element):
        pair, price_str, time_str, _ = element.rstrip().split(",")
        base_currency, quote_currency = pair.split("/") 
        price = decimal.Decimal(price_str)
        time_ms = int(self._time_ms(time_str))
        yield Quote(base_currency, quote_currency, price, time_ms)

    def _time_ms(self, time):
        # Parse "HH:MM:SS" and return milliseconds since midnight.
        dt = datetime.datetime.strptime(time, "%H:%M:%S")
        return (dt.hour * 3600 + dt.minute * 60 + dt.second) * 1000


class AddQuoteTimestamp(beam.DoFn):

    def process(self, element):
        # TimestampedValue expects the event timestamp in seconds.
        yield beam.window.TimestampedValue(element, element.time_ms / 1000.0)


class Order(typing.NamedTuple):
    base_currency: str
    quote_currency: str
    quantity: int
    time_ms: int


class ConvertOrder(beam.DoFn):

    def process(self, element):
        pair, quantity_str, time_str, _ = element.rstrip().split(",")
        base_currency, quote_currency = pair.split("/") 
        quantity = int(quantity_str)
        time_ms = int(self._time_ms(time_str))
        yield Order(base_currency, quote_currency, quantity, time_ms)

    def _time_ms(self, time):
        # Parse "HH:MM:SS" and return milliseconds since midnight.
        dt = datetime.datetime.strptime(time, "%H:%M:%S")
        return (dt.hour * 3600 + dt.minute * 60 + dt.second) * 1000


class AddOrderTimestamp(beam.DoFn):

    def process(self, element):
        # TimestampedValue expects the event timestamp in seconds.
        yield beam.window.TimestampedValue(element, element.time_ms / 1000.0)


PAIRS = ["EUR/JPY", "USD/JPY"]  # Maybe pass this in as an option?

def by_pair(item, num_pairs):
    # Partition function for beam.Partition: route each element to the index
    # of its currency pair. The second argument (number of partitions) is
    # supplied by beam.Partition and unused here.
    return PAIRS.index(f"{item.base_currency}/{item.quote_currency}")


# Administrative

LOGGING_MSG_FMT = "%(asctime)s - %(levelname)s: %(message)s"
LOGGING_DATE_FMT = "%Y-%m-%d %H:%M:%S%z"
logging.basicConfig(format=LOGGING_MSG_FMT, datefmt=LOGGING_DATE_FMT, level=logging.INFO)    


class MyOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--quotes-file", dest="quotes_file", default="quotes.txt")
        parser.add_argument("--orders-file", dest="orders_file", default="orders.txt")
        parser.add_argument("--trades-file", dest="trades_file", default="trades")

options = PipelineOptions()
my_options = options.view_as(MyOptions)


# Main

with beam.Pipeline(options=my_options) as p:

    eurjpy_quotes, usdjpy_quotes = (
        p
        | "ReadQuotes" >> beam.io.ReadFromText(my_options.quotes_file)
        | "ConvertQuotes" >> beam.ParDo(ConvertQuote())
        | "AddQuoteTimestamps" >> beam.ParDo(AddQuoteTimestamp())
        | "PartitionQuotes" >> beam.Partition(by_pair, len(PAIRS))
        # Some kind of windowing/triggering?
        )

    eurjpy_orders, usdjpy_orders = (
        p
        | "ReadOrders" >> beam.io.ReadFromText(my_options.orders_file)
        | "ConvertOrders" >> beam.ParDo(ConvertOrder())
        | "AddOrderTimestamps" >> beam.ParDo(AddOrderTimestamp())
        | "PartitionOrders" >> beam.Partition(by_pair, len(PAIRS))
        # Some kind of windowing/triggering?
        )

    # Something here using CoGroupByKey on eurjpy_quotes and eurjpy_orders
    # This is just a placeholder for now.
    eurjpy_quotes | "WriteEURJPYTrades" >> beam.io.WriteToText(my_options.trades_file)

Upvotes: 1

Views: 198

Answers (1)

Reza Rokni

Reputation: 1256

For dealing with time series data, it is often best to make use of the State and Timers API.

Original Blog on State and Timers

State and Timers Documentation

There is also some current work in progress in Java on temporal joins: temporal example
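
Not the linked example, just a rough sketch in Python of the general shape: key both streams by currency pair, flatten them together, and use a stateful DoFn that buffers quotes and prices each order against the latest quote at or before the order's event time. This ignores watermarks, late data and retractions; a fuller version would also buffer orders and use event-time timers to decide when a result is final or needs to be retracted.

import apache_beam as beam
from apache_beam.coders import FastPrimitivesCoder
from apache_beam.transforms.userstate import BagStateSpec


class TemporalJoinFn(beam.DoFn):
    """Buffers quotes per key and prices each order against the most recent
    quote whose event time is <= the order's event time (sketch only)."""

    QUOTES = BagStateSpec("quotes", FastPrimitivesCoder())

    def process(self, element, quotes=beam.DoFn.StateParam(QUOTES)):
        pair, (kind, record) = element
        if kind == "quote":
            quotes.add(record)
            return
        # record is an order: look up the latest quote at or before its event time.
        candidates = [q for q in quotes.read() if q.time_ms <= record.time_ms]
        if candidates:
            best = max(candidates, key=lambda q: q.time_ms)
            yield pair, (best.price * record.quantity, record.time_ms)


# Usage, given the Quote/Order PCollections from the question:
# keyed_quotes = quotes | beam.Map(lambda q: (f"{q.base_currency}/{q.quote_currency}", ("quote", q)))
# keyed_orders = orders | beam.Map(lambda o: (f"{o.base_currency}/{o.quote_currency}", ("order", o)))
# trades = (keyed_quotes, keyed_orders) | beam.Flatten() | beam.ParDo(TemporalJoinFn())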

Upvotes: 1
