Reputation: 671
I am attempting to port the "slowly updating global window side inputs" example in the documentation from Java to Python, and I am stuck on what the Python equivalent of AfterProcessingTime.pastFirstElementInPane() is. For the map, I've done something like this:
from typing import Dict, Iterable

import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicSequence
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterProcessingTime, Always, Repeatedly)
from apache_beam.transforms.window import TimestampedValue


class ApiKeys(beam.DoFn):
    def process(self, elm) -> Iterable[Dict[str, str]]:
        yield TimestampedValue(
            {"<api_key_1>": "<account_id_1>", "<api_key_2>": "<account_id_2>",},
            elm,
        )


map = beam.pvalue.AsSingleton(
    p
    | "trigger pipeline" >> beam.Create([None])
    | "define schedule"
    >> beam.Map(
        lambda _: (
            0,  # would be timestamp.Timestamp.now() in production
            20,  # would be timestamp.MAX_TIMESTAMP in production
            1,  # would be around 1 hour or so in production
        )
    )
    | "GenSequence"
    >> PeriodicSequence()
    | "ApplyWindowing"
    >> beam.WindowInto(
        beam.window.GlobalWindows(),
        trigger=Repeatedly(Always(), AfterProcessingTime(???)),
        accumulation_mode=AccumulationMode.DISCARDING,
    )
    | "api_keys" >> beam.ParDo(ApiKeys())
)
I am hoping to use this as a Dict[str, str] side input to a downstream function that has 60-second windows, while this side input is updated on an hourly basis.
The point is to run this on Google Cloud Dataflow (where we currently just re-release the pipeline to update the api_keys).
I've pasted the Java example from the documentation below for convenience's sake:
public static void sideInputPatterns() {
  // This pipeline uses View.asSingleton for a placeholder external service.
  // Run in debug mode to see the output.
  Pipeline p = Pipeline.create();

  // Create a side input that updates every five seconds.
  PCollectionView<Map<String, String>> map =
      p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
          .apply(
              Window.<Long>into(new GlobalWindows())
                  .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                  .discardingFiredPanes())
          .apply(
              ParDo.of(
                  new DoFn<Long, Map<String, String>>() {
                    @ProcessElement
                    public void process(
                        @Element Long input, OutputReceiver<Map<String, String>> o) {
                      // Replace map with test data from the placeholder external service.
                      // Add external reads here.
                      o.output(PlaceholderExternalService.readTestData());
                    }
                  }))
          .apply(View.asSingleton());

  // Consume side input. GenerateSequence generates test data.
  // Use a real source (like PubSubIO or KafkaIO) in production.
  p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
      .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
      .apply(Sum.longsGlobally().withoutDefaults())
      .apply(
          ParDo.of(
                  new DoFn<Long, KV<Long, Long>>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      Map<String, String> keyMap = c.sideInput(map);
                      c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
                      LOG.debug(
                          "Value is {}, key A is {}, and key B is {}.",
                          c.element(),
                          keyMap.get("Key_A"),
                          keyMap.get("Key_B"));
                    }
                  })
              .withSideInputs(map));
}
/** Placeholder class that represents an external service generating test data. */
public static class PlaceholderExternalService {
  public static Map<String, String> readTestData() {
    Map<String, String> map = new HashMap<>();
    Instant now = Instant.now();
    // Joda-Time pattern: "mm" is minutes and "ss" is seconds ("MM" would be the month).
    DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:mm:ss");
    map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
    map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());
    return map;
  }
}
Any ideas on how to emulate this example would be enormously appreciated; I've spent literally days on this issue now :(
Update #2, based on @AlexanderMoraes's answer:
So, I've tried changing it according to my understanding of your suggestions:
main_window_size = 5
trigger_interval = 30

side_input = beam.pvalue.AsSingleton(
    p
    | "trigger pipeline" >> beam.Create([None])
    | "define schedule"
    >> beam.Map(
        lambda _: (
            0,  # timestamp.Timestamp.now().__float__(),
            60,  # timestamp.Timestamp.now().__float__() + 30.0,
            trigger_interval,  # fire_interval
        )
    )
    | "GenSequence" >> PeriodicSequence()
    | "api_keys" >> beam.ParDo(ApiKeys())
    | "window"
    >> beam.WindowInto(
        beam.window.GlobalWindows(),
        trigger=Repeatedly(AfterProcessingTime(window_size)),
        accumulation_mode=AccumulationMode.DISCARDING,
    )
)
But when I combine this with another pipeline whose windowing is set to something smaller than trigger_interval, I can't use the dictionary as a singleton, because for some reason it is duplicated:
ValueError: PCollection of size 2 with more than one element accessed as a singleton view. First two elements encountered are "{'<api_key_1>': '<account_id_1>', '<api_key_2>': '<account_id_2>'}", "{'<api_key_1>': '<account_id_1>', '<api_key_2>': '<account_id_2>'}". [while running 'Pair with AccountIDs']
Is there some way to specify that the singleton should use only the latest output and ignore whatever came before it?
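One thing worth checking is how many impulses the schedule produces, since each impulse becomes one dictionary after ApiKeys. The stdlib-only sketch below is a toy model of the (start, stop, interval) schedule fed to PeriodicSequence, not Beam code. The function name is hypothetical, and the element count ceil((stop - start) / interval) is an assumption based on my reading of the SDK source rather than documented behavior. Under that assumption, the (0, 60, 30) values in the update produce two impulses, which would be consistent with the two identical dictionaries in the error.

```python
import math


def periodic_schedule(start, stop, interval):
    """Hypothetical toy model of the timestamps PeriodicSequence emits.

    Assumption: the number of impulses is ceil((stop - start) / interval),
    each one `interval` seconds after the previous, starting at `start`.
    """
    if interval <= 0:
        raise ValueError("interval must be positive")
    count = math.ceil((stop - start) / interval)
    return [start + i * interval for i in range(count)]


# Development values from the first snippet: one impulse per second for 20 seconds.
print(periodic_schedule(0, 20, 1))   # 20 timestamps: 0 .. 19
# Values from the update: two impulses, so ApiKeys would yield two dictionaries.
print(periodic_schedule(0, 60, 30))  # [0, 30]
```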
Upvotes: 3
Views: 1661
Reputation: 368
For those who are mere mortals like me and agree that Beam's documentation is like talking to a brick wall, here's the Python solution that worked for me (Python 3.11, Beam SDK 2.61.0).
import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window
from apache_beam.transforms import trigger
from apache_beam.transforms.window import TimestampedValue


class GetFromBQ(beam.DoFn):
    def process(self, element):
        # Your enrichment logic here; this placeholder passes elements through unchanged.
        yield element


class HandleSideInput(beam.DoFn):
    def process(self, element, side_input):
        # Your side input usage logic here; side_input arrives as a list (AsList).
        yield element


def run():
    '''Setting up the Beam pipeline options'''
    pipeline_options = PipelineOptions(
        streaming=True,
        save_main_session=True
    )
    input_subscription = "your-pubsub-subscription"  # unbounded source

    with beam.Pipeline(options=pipeline_options) as pipeline:
        side_input = (
            pipeline
            | 'GenerateSequence' >> PeriodicImpulse(fire_interval=3600, apply_windowing=True)  # 1 hour
            | 'ReadFromBigQuery' >> beam.ParDo(GetFromBQ())  # bounded source
            | "WindowSideInput" >> beam.WindowInto(
                windowfn=window.GlobalWindows(),
                trigger=trigger.Repeatedly(trigger.AfterProcessingTime(1800)),  # 30 minutes
                accumulation_mode=trigger.AccumulationMode.DISCARDING
            )
            # | 'GetLatestElement' >> beam.combiners.Latest.PerKey()
        )

        main_input = (
            pipeline
            | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription, with_attributes=True)
            | "WindowIntoFixedWindow" >> beam.WindowInto(
                windowfn=window.FixedWindows(600),  # 10 minutes
                trigger=trigger.AfterWatermark(),
                accumulation_mode=trigger.AccumulationMode.DISCARDING
            )
            | 'SideInputUsageAsList' >> beam.ParDo(HandleSideInput(), side_input=beam.pvalue.AsList(side_input))
            | "PrintFinalData" >> beam.Map(print)  # print each output element
        )


if __name__ == '__main__':
    run()
Upvotes: 0
Reputation: 4051
The title of the question, "slowly updating side inputs", refers to a section of the documentation that already has a Python version of the code. However, the code you provided is from "updating global window side inputs", which only has a Java version. So my answer addresses the second one.
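For context on that Python version: as I understand it, the documented pattern windows the side input with PeriodicImpulse(..., apply_windowing=True), which places each impulse in a fixed window of length fire_interval, and the default side-input window mapping picks the side-input window that contains the main window's last timestamp. The stdlib-only sketch below models that lookup with integer microseconds. The helper name is hypothetical and it assumes fixed windows with no offset; it is an illustration, not Beam's implementation. It shows that every 60-second main window within an hour would read the same hourly side-input value.

```python
def side_window_for(main_start_us, main_end_us, side_size_us):
    """Hypothetical helper: which fixed side-input window a main window reads.

    Takes the main window's last timestamp (end minus one microsecond) and
    returns the [start, end) bounds of the fixed side-input window of length
    side_size_us (offset 0) that contains it.
    """
    if main_end_us <= main_start_us:
        raise ValueError("main window must be non-empty")
    max_ts = main_end_us - 1
    side_start = (max_ts // side_size_us) * side_size_us
    return side_start, side_start + side_size_us


SECOND = 1_000_000  # microseconds
HOUR = 3600 * SECOND

# The last 60-second main window of the first hour still reads hour 0's value.
print(side_window_for(3540 * SECOND, 3600 * SECOND, HOUR))  # (0, 3600000000)
# The first main window of the second hour reads the next hourly value.
print(side_window_for(3600 * SECOND, 3660 * SECOND, HOUR))  # (3600000000, 7200000000)
```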
You cannot call AfterProcessingTime.pastFirstElementInPane() in Python; that method does not exist in the Python SDK. It builds a trigger, and triggers determine when to emit the results of each window (each emitted result is referred to as a pane). In your case, AfterProcessingTime.pastFirstElementInPane() creates a trigger that fires when the current processing time passes the processing time at which the trigger saw the first element in a pane. In Python, this is achieved with AfterProcessingTime(), whose delay is measured from the first element in the pane, while AfterWatermark is the corresponding event-time trigger.
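To make the "fires a delay after the first element in the pane" rule concrete, here is a stdlib-only toy model of Repeatedly(AfterProcessingTime(delay)) within a single window. It is not Beam's implementation: it ignores watermarks, bundling, and timer granularity, and it assumes an element arriving exactly at the firing time opens the next pane. A delay of 0 would correspond to Java's bare pastFirstElementInPane(), and a non-zero delay to pastFirstElementInPane().plusDelayOf(...).

```python
def repeated_processing_time_panes(arrivals, delay):
    """Toy model of Repeatedly(AfterProcessingTime(delay)) in one window.

    arrivals: processing times (seconds) at which elements arrive.
    Returns a list of (fire_time, elements_in_pane) tuples. A pane opens with
    its first element and fires `delay` seconds later; elements arriving
    before that firing time join the pane, later ones open the next pane.
    """
    panes = []
    fire_time = None
    current = []
    for t in sorted(arrivals):
        if fire_time is not None and t >= fire_time:
            panes.append((fire_time, current))  # timer fired: emit the pane
            fire_time, current = None, []
        if fire_time is None:
            fire_time = t + delay  # first element in the pane starts the clock
        current.append(t)
    if fire_time is not None:
        panes.append((fire_time, current))  # flush the last pending pane
    return panes


print(repeated_processing_time_panes([0, 10, 50, 65, 200], delay=60))
# [(60, [0, 10, 50]), (125, [65]), (260, [200])]
```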
Below are two pieces of code, one in Java and one in Python, so you can compare how each is used. Both examples set a processing-time trigger that emits results one minute after the first element of the window has been processed. In both, the accumulation mode is set to discard fired panes rather than accumulate them (Java: discardingFiredPanes(); Python: accumulation_mode=AccumulationMode.DISCARDING).
1- Java:
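PCollection<String> pc = ...;
pc.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
    .triggering(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(1)))
    // Outside GlobalWindows, a custom trigger also requires an allowed lateness.
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());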
2- Python (the trigger configuration is the same as in point 1):
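from apache_beam import WindowInto
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime
from apache_beam.transforms.window import FixedWindows

# pcollection is an existing PCollection
pcollection | WindowInto(
    FixedWindows(1 * 60),
    trigger=AfterProcessingTime(1 * 60),
    accumulation_mode=AccumulationMode.DISCARDING)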
The examples above were taken from the documentation.
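The accumulation mode decides what each successive firing of the same window contains. The stdlib-only sketch below models that difference for groups of elements that arrived between firings; it illustrates the semantics and is not Beam code (the function name is hypothetical). It also hints at why DISCARDING is the usual choice for a refreshing side input: each pane carries only the newest snapshot instead of every snapshot seen so far.

```python
def fired_panes(groups, mode):
    """Model of pane contents for successive firings of one window.

    groups: elements that arrived between consecutive firings.
    mode: "DISCARDING" emits only the new elements of each firing;
          "ACCUMULATING" re-emits everything seen so far in the window.
    """
    if mode not in ("DISCARDING", "ACCUMULATING"):
        raise ValueError(f"unknown mode: {mode}")
    panes, seen = [], []
    for group in groups:
        seen = seen + list(group)
        panes.append(list(group) if mode == "DISCARDING" else list(seen))
    return panes


arrivals = [["keys_v1"], ["keys_v2"], ["keys_v3"]]  # one snapshot per firing
print(fired_panes(arrivals, "DISCARDING"))
# [['keys_v1'], ['keys_v2'], ['keys_v3']]
print(fired_panes(arrivals, "ACCUMULATING"))
# [['keys_v1'], ['keys_v1', 'keys_v2'], ['keys_v1', 'keys_v2', 'keys_v3']]
```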
Upvotes: 1