bill

Reputation: 722

Beam Python silently drops type checking for unsupported types?

I had the following pipeline:

from typing import Sequence, List

import apache_beam as beam


def add_type(x) -> int:
    return x

# no type error with Sequence, type error with List.
def print_with_type(x: Sequence[int]):
    print(x)


with beam.Pipeline(argv=["--type_check_additional", "all"]) as pipeline:
    lines = (
            pipeline
            | beam.Create([1, 2])
            | beam.Map(add_type)
            # removing this line should trigger type error
            # | beam.combiners.ToList()
            | beam.Map(print_with_type))

I expected a type checking error when building the pipeline, but did not get it. Only after much debugging did I realize that I should use List instead of Sequence.

Is this expected, as Sequence is one of the supported types (doc)? Is it possible to have a warning in such cases?

Upvotes: 0

Views: 483

Answers (2)

CaptainNabla

Reputation: 1166

You should use List instead of Sequence. The documentation you linked lists List but not Sequence. Running your code with Sequence in the Apache Beam playground produces an info message (screenshot not preserved here) that explains why no type hint error is raised. If you switch to List, everything works as expected.

Btw, I would recommend using with_input_types and with_output_types. As a pipeline gets more complex, I find this approach more readable, since the types are visible at each step and you do not have to look through all of your custom classes and functions to find them, e.g.

from typing import Sequence, List
import apache_beam as beam

def add_type(x) -> int:
  return x

# no type error with Sequence, type error with List.
def print_with_type(x: Sequence[int]):  # <- is ignored
  print(x)
 
with beam.Pipeline(argv=["--type_check_additional", "all"]) as pipeline:
  lines = (
    pipeline
    | beam.Create([1, 2])
    | beam.Map(add_type).with_output_types(int)   # <- is checked
    # removing this line should trigger type error
    # | beam.combiners.ToList()
    | beam.Map(print_with_type).with_input_types(List[int])  # <- is checked
  )

Upvotes: 2

Mazlum Tosun

Reputation: 6572

In your case, the type of the current element in the PCollection is not a List but an int:

from typing import Sequence, List

import apache_beam as beam


def add_type(x) -> int:
    return x

# The expected type is int
def print_with_type(x: int):
    print(x)


with beam.Pipeline(argv=["--type_check_additional", "all"]) as pipeline:
    lines = (
            pipeline
            | beam.Create([1, 2])
            | beam.Map(add_type)
            # | beam.combiners.ToList()
            | beam.Map(print_with_type))

When I test with str instead of int, I get the expected type hint error:

from typing import Sequence, List

import apache_beam as beam


def add_type(x) -> int:
    return x

# Test with the bad type
def print_with_type(x: str):
    print(x)


with beam.Pipeline(argv=["--type_check_additional", "all"]) as pipeline:
    lines = (
            pipeline
            | beam.Create([1, 2])
            | beam.Map(add_type)
            # | beam.combiners.ToList()
            | beam.Map(print_with_type))

The error is:

 raise TypeCheckError(
                'Type hint violation for \'{label}\': requires {hint} but got '
                '{actual_type} for {arg}\nFull type hint:\n{debug_str}'.format(
                    label=self.label,
                    hint=hint,
                    actual_type=bindings[arg],
                    arg=arg,
                    debug_str=type_hints.debug_str()))
E           apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 'Map(print_with_type)': requires <class 'str'> but got <class 'int'> for x
E           Full type hint:
E           IOTypeHints[inputs=((<class 'str'>,), {}), outputs=((Any,), {})]

When I test with a List of int, I again get the expected error:

from typing import Sequence, List

import apache_beam as beam


def add_type(x) -> int:
    return x

# Test with the bad type
def print_with_type(x: List[int]):
    print(x)


with beam.Pipeline(argv=["--type_check_additional", "all"]) as pipeline:
    lines = (
            pipeline
            | beam.Create([1, 2])
            | beam.Map(add_type)
            # | beam.combiners.ToList()
            | beam.Map(print_with_type))

The error is:

E           apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 'Map(print_with_type)': requires List[int] but got <class 'int'> for x

But when I test with Sequence, no error is raised.

According to the documentation, the following types are supported for type checking:

Tuple[T, U]
Tuple[T, ...]
List[T]
KV[T, U]
Dict[T, U]
Set[T]
FrozenSet[T]
Iterable[T]
Iterator[T]
Generator[T]
PCollection[T]

Sequence is not part of this list, which is why the hint was ignored.
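As for getting a warning in such cases, one option is to check the annotations yourself before building the pipeline. The following is only a minimal sketch and not a Beam API: `unsupported_hints` and `SUPPORTED_ORIGINS` are hypothetical names, and the set simply mirrors the list above (without KV and PCollection, which are Beam-specific, so that the snippet runs without Beam installed). It may not match what a given Beam version actually accepts.

```python
import collections.abc
import typing
from typing import List, Sequence

# Assumption: runtime origins of the composite hints in the list quoted above.
# KV and PCollection are Beam-specific and deliberately left out.
SUPPORTED_ORIGINS = {
    tuple,
    list,
    dict,
    set,
    frozenset,
    collections.abc.Iterable,
    collections.abc.Iterator,
    collections.abc.Generator,
}


def unsupported_hints(fn):
    """Return {name: hint} for composite hints whose origin is not in SUPPORTED_ORIGINS."""
    flagged = {}
    for name, hint in typing.get_type_hints(fn).items():
        origin = typing.get_origin(hint)
        # Plain classes such as int or str have no origin and are not flagged.
        if origin is not None and origin not in SUPPORTED_ORIGINS:
            flagged[name] = hint
    return flagged


def takes_sequence(x: Sequence[int]):
    print(x)


def takes_list(x: List[int]):
    print(x)
```

Calling `unsupported_hints(takes_sequence)` flags the parameter `x`, while `unsupported_hints(takes_list)` returns an empty dict. Other composite hints that are not in the quoted list would also be flagged, so the set would need extending for real use.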

Either way, it makes no sense to annotate with List or Sequence in your example, because each element passed to print_with_type is an int, not a List.

Upvotes: 1
