Matthew Albrecht

How to type hint a PTransform that accepts a tuple of PCollections as input in Python Apache Beam?

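import apache_beam as beam

class MyTransform(beam.PTransform):
  def expand(self, pcolls: tuple[beam.PCollection[int], beam.PCollection[str]]) -> beam.PCollection[str]:
    pcoll1, pcoll2 = pcolls
    # Convert the ints to strings so both inputs share one element type.
    strified = pcoll1 | beam.Map(lambda x: str(x))

    # Flatten the stringified ints together with the original strings.
    return (strified, pcoll2) | beam.Flatten()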

with beam.Pipeline() as p:
  pcoll1 = p | beam.Create([1, 2, 3])
  pcoll2 = p | beam.Create(["a", "b", "c"])

  result = (pcoll1, pcoll2) | MyTransform()

This code works, but it produces the following warning:

WARNING:root:This input type hint will be ignored and not used for type-checking purposes. Typically, input type hints for a PTransform are single (or nested) types wrapped by a PCollection, or PBegin. Got: Tuple[apache_beam.pvalue.PCollection[int], apache_beam.pvalue.PCollection[dict[str, typing.Any]]] instead.

I'd love to be able to give type hints that are actually used for type checking purposes.

I looked at the source of beam.Flatten to see how it handles tuple inputs, but sadly it doesn't use type hints at all.
