Reputation: 3955
I am wondering what the best way is to check the length of the output produced by a Beam pipeline.
I have some test code like this:
test_data = [
    {'kind': 'storage#object', 'name': 'file1.doc', 'contentType': 'application/octet-stream', 'bucket': 'bucket123'},
    {'kind': 'storage#object', 'name': 'file2.pdf', 'contentType': 'application/pdf', 'bucket': 'bucket234'},
    {'kind': 'storage#object', 'name': 'file3.msg', 'contentType': 'message/rfc822', 'bucket': 'bucket345'}
]
with TestPipeline() as p:
    output = (p
              | beam.Create(test_data)
              | beam.ParDo(DoFn_To_Test()).with_outputs('ok', 'error')
              )
I want to make sure that all elements in the test_data list go to 'output.ok'. I think one way to do it is to count them like this:
with TestPipeline() as p:
    output = (p
              | beam.Create(test_data)
              | beam.ParDo(DoFn_To_Test()).with_outputs('ok', 'error')
              )
    okay_count = (output.ok
                  | beam.Map(lambda x: ('dummy_key', x))
                  | beam.GroupByKey()  # This gets ('dummy_key', [element1, element2, ...])
                  | beam.Map(lambda x: len(x[1]))  # Drop the key and get the length of the list
                  )
    # And finally assert the count is correct:
    assert_that(okay_count, equal_to([len(test_data)]))
This works, but I don't feel this is the best way to do it, and I am sure there are other ways.
The best option suggested so far is to use beam.combiners.Count.Globally():
with TestPipeline() as p:
    output = (p
              | beam.Create(test_data)
              | beam.ParDo(DoFn_To_Test()).with_outputs('ok', 'error')
              )
    # Count only the elements tagged 'ok'
    okay_count = output.ok | beam.combiners.Count.Globally()
    assert_that(okay_count, equal_to([len(test_data)]))
Upvotes: 1
Views: 1285
Reputation: 6023
You answered your own question in the question. Writing it here as an answer:
with TestPipeline() as p:
    output = (p
              | beam.Create(test_data)
              | beam.ParDo(DoFn_To_Test()).with_outputs('ok', 'error')
              )
    # Count only the elements tagged 'ok'
    okay_count = output.ok | beam.combiners.Count.Globally()
    assert_that(okay_count, equal_to([len(test_data)]))
Upvotes: 2