Reputation: 1260
I am trying to run a stateful aggregating DoFn on Google Cloud Dataflow, which lists stateful DoFns in its capability matrix, but I get the following error:
Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns.
The error is raised by the following code:
from typing import Dict, TypeVar

import apache_beam as beam
from apache_beam.coders import PickleCoder
from apache_beam.transforms.core import CombineFn
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    BagStateSpec, CombiningValueStateSpec, TimerSpec, on_timer)
from apache_beam.typehints import with_input_types, with_output_types

K = TypeVar('K')
V = TypeVar('V')


@with_input_types(Dict[K, V])
@with_output_types(Dict[K, V])
class StatefulCombineDoFn(beam.DoFn):
    BUFFER = BagStateSpec('buffer', PickleCoder())
    # Note: CombineFn is abstract; a concrete subclass is needed here.
    STATE = CombiningValueStateSpec('state', PickleCoder(), CombineFn())
    EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)

    def process(
        self,
        element,
        w=beam.DoFn.WindowParam,
        buffer=beam.DoFn.StateParam(BUFFER),
        state=beam.DoFn.StateParam(STATE),
        expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER),
    ):
        expiry_timer.set(w.end + self.allowed_lateness)
        buffer.add(element)  # was `event`, which is undefined here
        state.add(element)

    @on_timer(EXPIRY_TIMER)
    def expiry(
        self,
        state=beam.DoFn.StateParam(STATE),
        buffer=beam.DoFn.StateParam(BUFFER),
    ):
        events = buffer.read()
        info = state.read()
        yield [(info, events)]
How does one work around this?
Upvotes: 2
Views: 516
Reputation: 11031
Dataflow now supports user state for streaming pipelines in Python. This is a new feature that you can activate with --experiments use_runner_v2, which uses a new architecture for parts of the Dataflow worker.
The Runner V2 feature will soon be the default, and all pipelines will support user state by default.
Note: To enable user state in batch pipelines, contact Dataflow support and ask to have the feature enabled for your project.
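For illustration, a launch command with the experiment flag might look like the sketch below. The script name, project, region, and bucket are hypothetical placeholders; only the --experiments flag is the part this answer is about:

```shell
# Hypothetical invocation -- substitute your own script name,
# project, region, and temp bucket. The key addition is the
# --experiments flag that activates Runner V2.
python pipeline.py \
  --runner=DataflowRunner \
  --project=my-gcp-project \
  --region=us-central1 \
  --temp_location=gs://my-bucket/tmp \
  --streaming \
  --experiments=use_runner_v2
```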
Upvotes: 1