Reputation: 4828
I'd like to use the Apache Beam Python SDK to build an event-driven trading system for backtesting. Many PTransform operations in this system would be of the "Temporal Validity Window" / "Temporal Join" type.
For example, take the working example from Streaming Systems, the book by the Beam folks, concerning currency quotes and trades. A similar example appears in an earlier paper.
Right Table: quotes.txt (Currency Pair, Price, Event Time, Processing Time)
USD/JPY,102.0000,12:00:00,12:04:13
EUR/JPY,114.0000,12:00:30,12:06:23
EUR/JPY,119.0000,12:06:00,12:07:33
EUR/JPY,116.0000,12:03:00,12:09:07
Left Table: orders.txt (Currency Pair, Quantity, Event Time, Processing Time)
USD/JPY,1,12:03:00,12:03:44
EUR/JPY,2,12:02:00,12:05:07
EUR/JPY,5,12:05:00,12:08:00
EUR/JPY,3,12:08:00,12:09:33
USD/JPY,5,12:10:00,12:10:59
Assume that both of these examples are proxies for what could be unbounded collections (e.g. 2 Kafka topics with keys=currency pairs). I'm at a complete loss for how, using the Apache Beam API, to do a Left Join on these two (potentially unbounded) collections to produce the following output.
Output Table With Retractions: trades.txt (Currency Pair, Price*Quantity, Order Event Time, Retraction?, Trade Processing Time)
USD/JPY,102.0000,12:03:00,False,12:03:44
EUR/JPY,000.0000,12:02:00,False,12:05:07
EUR/JPY,000.0000,12:02:00,True,12:06:23
EUR/JPY,228.0000,12:02:00,False,12:06:23
EUR/JPY,570.0000,12:05:00,False,12:08:00
EUR/JPY,570.0000,12:05:00,True,12:09:07
EUR/JPY,580.0000,12:05:00,False,12:09:07
EUR/JPY,357.0000,12:08:00,False,12:09:33
USD/JPY,510.0000,12:10:00,False,12:10:59
"Final" Output Table Without Retractions: trades.txt (Currency Pair, Price*Quantity, Order Event Time, Retraction?, Trade Processing Time)
USD/JPY,102.0000,12:03:00,False,12:03:44
EUR/JPY,228.0000,12:02:00,False,12:06:23
EUR/JPY,580.0000,12:05:00,False,12:09:07
EUR/JPY,357.0000,12:08:00,False,12:09:33
USD/JPY,510.0000,12:10:00,False,12:10:59
How do I implement the above PTransform using Windows, Triggers, and CoGroupByKey?
Current code - just some boilerplate with placeholders
"""Testing Apache beam joins."""
import logging
import datetime
import decimal
import typing
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Schema and Transforms
class Quote(typing.NamedTuple):
    base_currency: str
    quote_currency: str
    price: decimal.Decimal
    time_ms: int


class ConvertQuote(beam.DoFn):
    def process(self, element):
        pair, price_str, time_str, _ = element.rstrip().split(",")
        base_currency, quote_currency = pair.split("/")
        price = decimal.Decimal(price_str)
        time_ms = int(self._time_ms(time_str))
        yield Quote(base_currency, quote_currency, price, time_ms)

    def _time_ms(self, time):
        # strptime defaults the date to 1900-01-01, so the result is negative;
        # only the relative ordering of the timestamps matters here.
        epoch = datetime.datetime.utcfromtimestamp(0)
        dt = datetime.datetime.strptime(time, "%H:%M:%S")
        return (dt - epoch).total_seconds() * 1000


class AddQuoteTimestamp(beam.DoFn):
    def process(self, element):
        # Beam timestamps are in seconds, so convert from milliseconds.
        yield beam.window.TimestampedValue(element, element.time_ms / 1000)
class Order(typing.NamedTuple):
    base_currency: str
    quote_currency: str
    quantity: int
    time_ms: int


class ConvertOrder(beam.DoFn):
    def process(self, element):
        pair, quantity_str, time_str, _ = element.rstrip().split(",")
        base_currency, quote_currency = pair.split("/")
        quantity = int(quantity_str)
        time_ms = int(self._time_ms(time_str))
        yield Order(base_currency, quote_currency, quantity, time_ms)

    def _time_ms(self, time):
        epoch = datetime.datetime.utcfromtimestamp(0)
        dt = datetime.datetime.strptime(time, "%H:%M:%S")
        return (dt - epoch).total_seconds() * 1000


class AddOrderTimestamp(beam.DoFn):
    def process(self, element):
        # Beam timestamps are in seconds, so convert from milliseconds.
        yield beam.window.TimestampedValue(element, element.time_ms / 1000)
PAIRS = ["EUR/JPY", "USD/JPY"] # Maybe pass this in as an option?
def by_pair(item, num_pairs):
    return PAIRS.index(f"{item.base_currency}/{item.quote_currency}")
# Administrative
LOGGING_MSG_FMT = "%(asctime)s - %(levelname)s: %(message)s"
LOGGING_DATE_FMT = "%Y-%m-%d %H:%M:%S%z"
logging.basicConfig(format=LOGGING_MSG_FMT, datefmt=LOGGING_DATE_FMT, level=logging.INFO)
class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--quotes-file", dest="quotes_file", default="quotes.txt")
        parser.add_argument("--orders-file", dest="orders_file", default="orders.txt")
        parser.add_argument("--trades-file", dest="trades_file", default="trades")
options = PipelineOptions()
my_options = options.view_as(MyOptions)
# Main
with beam.Pipeline(options=my_options) as p:
    eurjpy_quotes, usdjpy_quotes = (
        p
        | "ReadQuotes" >> beam.io.ReadFromText(my_options.quotes_file)
        | "ConvertQuotes" >> beam.ParDo(ConvertQuote())
        | "AddQuoteTimestamps" >> beam.ParDo(AddQuoteTimestamp())
        | "PartitionQuotes" >> beam.Partition(by_pair, len(PAIRS))
        # Some kind of windowing/triggering?
    )
    eurjpy_orders, usdjpy_orders = (
        p
        | "ReadOrders" >> beam.io.ReadFromText(my_options.orders_file)
        | "ConvertOrders" >> beam.ParDo(ConvertOrder())
        | "AddOrderTimestamps" >> beam.ParDo(AddOrderTimestamp())
        | "PartitionOrders" >> beam.Partition(by_pair, len(PAIRS))
        # Some kind of windowing/triggering?
    )
    # Something here using CoGroupByKey on eurjpy_quotes and eurjpy_orders
    # This is just a placeholder for now.
    eurjpy_quotes | "WriteEURJPYTrades" >> beam.io.WriteToText(my_options.trades_file)
Upvotes: 1
Views: 198
Reputation: 1256
For dealing with time series, it is often best to make use of the State and Timers API, which lets a DoFn keep per-key state across elements.
Original Blog on State and Timers
State and Timers Documentation
There is also some work in progress in Java on temporal joins: temporal example
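To make the per-key logic concrete, here is a rough sketch in plain Python (not Beam API) of what a stateful join would have to remember and emit. The names `price_at` and `temporal_join` are made up for illustration, and the two dictionaries stand in for what would be per-key state cells in a stateful DoFn. It assumes elements arrive in processing-time order and that event times are zero-padded HH:MM:SS strings, so they compare correctly as text.

```python
import bisect
from collections import defaultdict
from decimal import Decimal

# (pair, price, event_time, processing_time), copied from quotes.txt
QUOTES = [
    ("USD/JPY", Decimal("102"), "12:00:00", "12:04:13"),
    ("EUR/JPY", Decimal("114"), "12:00:30", "12:06:23"),
    ("EUR/JPY", Decimal("119"), "12:06:00", "12:07:33"),
    ("EUR/JPY", Decimal("116"), "12:03:00", "12:09:07"),
]
# (pair, quantity, event_time, processing_time), copied from orders.txt
ORDERS = [
    ("USD/JPY", 1, "12:03:00", "12:03:44"),
    ("EUR/JPY", 2, "12:02:00", "12:05:07"),
    ("EUR/JPY", 5, "12:05:00", "12:08:00"),
    ("EUR/JPY", 3, "12:08:00", "12:09:33"),
    ("USD/JPY", 5, "12:10:00", "12:10:59"),
]


def price_at(quotes, event_time):
    """Price of the latest quote with event time <= event_time, else 0 (left join)."""
    times = [t for t, _ in quotes]
    i = bisect.bisect_right(times, event_time)
    return quotes[i - 1][1] if i else Decimal(0)


def temporal_join(quotes, orders):
    """Return (pair, amount, order_event_time, is_retraction, processing_time) rows."""
    # Merge both inputs into a single stream ordered by processing time.
    events = [(q[3], "quote", q) for q in quotes]
    events += [(o[3], "order", o) for o in orders]
    events.sort(key=lambda e: e[0])

    seen_quotes = defaultdict(list)  # pair -> sorted [(event_time, price)]
    seen_orders = defaultdict(list)  # pair -> [[event_time, quantity, emitted_amount]]
    out = []
    for proc_time, kind, (pair, value, event_time, _) in events:
        if kind == "order":
            # Join against the quotes seen so far and remember what was emitted.
            amount = price_at(seen_quotes[pair], event_time) * value
            seen_orders[pair].append([event_time, value, amount])
            out.append((pair, amount, event_time, False, proc_time))
        else:
            # A new (possibly late) quote may change earlier results:
            # retract the old row and emit the corrected one.
            bisect.insort(seen_quotes[pair], (event_time, value))
            for order in seen_orders[pair]:
                order_time, quantity, old = order
                new = price_at(seen_quotes[pair], order_time) * quantity
                if new != old:
                    out.append((pair, old, order_time, True, proc_time))
                    out.append((pair, new, order_time, False, proc_time))
                    order[2] = new
    return out


for row in temporal_join(QUOTES, ORDERS):
    print(row)
```

For EUR/JPY this gives the same rows as the retraction table in the question. For the first USD/JPY order, the quote's processing time (12:04:13) is later than the order's (12:03:44), so this sketch first emits a zero-priced row and then retracts it at 12:04:13, which differs from the first row of that table. In a real pipeline the state would also need to be garbage-collected, for example with an event-time timer, which this sketch does not attempt.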
Upvotes: 1