Reputation: 1402
I am trying to use Beam to read a CSV and send the data to Postgres, but the pipeline is failing due to a conversion mismatch. Note that the pipeline works when both columns are of type int and fails when one of the columns contains a string.
Here is one of the things that I tried:
import typing

import apache_beam as beam
import apache_beam.dataframe.io
from apache_beam.dataframe import convert
from apache_beam.io.jdbc import WriteToJdbc
from past.builtins import unicode

ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', unicode)])

beam_df = (
    pipeline
    | 'Read CSV' >> beam.dataframe.io.read_csv('path.csv').with_output_types(ExampleRow))

beam_df2 = (
    convert.to_pcollection(beam_df)
    | beam.Map(print)
    | WriteToJdbc(
        table_name=table_name,
        jdbc_url=jdbc_url,
        driver_class_name='org.postgresql.Driver',
        statement="insert into tablr values(?,?);",
        username=username,
        password=password,
    ))

result = pipeline.run()
result.wait_until_finish()
I also tried to register a logical type (URN) to map the Python str type to varchar / unicode, but that doesn't seem to work either:
from apache_beam.typehints.schemas import LogicalType


@LogicalType.register_logical_type
class db_str(LogicalType):
    @classmethod
    def urn(cls):
        return "beam:logical_type:javasdk:v1"

    @classmethod
    def language_type(cls):
        return unicode

    def to_language_type(self, value):
        return unicode(value)

    def to_representation_type(self, value):
        return unicode(value)
EDIT: here is the print output:
BeamSchema_f0d95d64_95c7_43ba_8a04_ac6a0b7352d9(id=21, nom='nom21')
BeamSchema_f0d95d64_95c7_43ba_8a04_ac6a0b7352d9(id=22, nom='nom22')
BeamSchema_f0d95d64_95c7_43ba_8a04_ac6a0b7352d9(id=21, nom='nom21')
BeamSchema_f0d95d64_95c7_43ba_8a04_ac6a0b7352d9(id=22, nom='nom22')
The problem comes from the WriteToJdbc transform and the 'nom' column.
Any idea how to make this work?
Upvotes: 1
Views: 956
Reputation: 6572
I think your problem is due to having a BeamSchema structure instead of the expected NamedTuple in your output PCollection.
Also, according to the documentation, one instruction is missing from your example, coders.registry.register_coder(ExampleRow, coders.RowCoder):
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.testing.test_pipeline import TestPipeline
from past.builtins import unicode

ExampleRow = typing.NamedTuple('ExampleRow',
                               [('id', int), ('name', unicode)])
coders.registry.register_coder(ExampleRow, coders.RowCoder)

with TestPipeline() as p:
    _ = (
        p
        | beam.Create([ExampleRow(1, 'abc')])
            .with_output_types(ExampleRow)
        | 'Write to jdbc' >> WriteToJdbc(
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:postgresql://localhost:5432/example',
            username='postgres',
            password='postgres',
            statement='INSERT INTO example_table VALUES(?, ?)',
        ))
The following code in your snippet doesn't produce the ExampleRow NamedTuple as expected, because your print output indicates that the element type is BeamSchema:
beam_df = (pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('path.csv').with_output_types(ExampleRow))
Try with this code, registering the coder before the read:
from past.builtins import unicode

ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', unicode)])
# Register coder here
coders.registry.register_coder(ExampleRow, coders.RowCoder)

beam_df = (
    pipeline
    | 'Read CSV' >> beam.dataframe.io.read_csv('path.csv').with_output_types(ExampleRow))
If it doesn't work, you need to find a way to transform your BeamSchema into the ExampleRow NamedTuple, for example in a Map or DoFn:
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.dataframe import convert
from apache_beam.io.jdbc import WriteToJdbc
from past.builtins import unicode

ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', unicode)])
coders.registry.register_coder(ExampleRow, coders.RowCoder)


def convert_beam_schema_to_named_tuple(beam_schema) -> ExampleRow:
    # Your logic to transform the BeamSchema row into an ExampleRow NamedTuple
    ...


beam_df = (
    pipeline
    | 'Read CSV' >> beam.dataframe.io.read_csv('path.csv').with_output_types(ExampleRow))

beam_df2 = (
    convert.to_pcollection(beam_df)
    | beam.Map(convert_beam_schema_to_named_tuple)
    | WriteToJdbc(
        table_name=table_name,
        jdbc_url=jdbc_url,
        driver_class_name='org.postgresql.Driver',
        statement="insert into tablr values(?,?);",
        username=username,
        password=password,
    ))

result = pipeline.run()
result.wait_until_finish()
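For illustration, a minimal sketch of that conversion function, assuming the generated BeamSchema row exposes the CSV columns as attributes named id and nom (as in your print output):

# Hypothetical sketch: map the generated BeamSchema row onto the registered
# NamedTuple. The attribute names 'id' and 'nom' are assumptions based on
# the printed rows from the question.
def convert_beam_schema_to_named_tuple(row) -> ExampleRow:
    return ExampleRow(id=int(row.id), name=str(row.nom))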
Finally, if you still have issues with Beam schemas and the Beam DataFrame API, you can read the CSV file directly with Beam IO and work with a plain PCollection.
You can check this link: csv-into-a-dictionary-in-apache-beam
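For illustration, a minimal sketch of that approach, assuming a two-column id,name CSV with a header row and reusing the table_name, jdbc_url, username and password variables from your snippet:

import csv
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import WriteToJdbc

ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', str)])
coders.registry.register_coder(ExampleRow, coders.RowCoder)


def parse_csv_line(line: str) -> ExampleRow:
    # Assumes a simple two-column CSV line: id,name (no embedded newlines)
    fields = next(csv.reader([line]))
    return ExampleRow(id=int(fields[0]), name=fields[1])


with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | 'Read CSV lines' >> beam.io.ReadFromText('path.csv', skip_header_lines=1)
        | 'Parse lines' >> beam.Map(parse_csv_line).with_output_types(ExampleRow)
        | 'Write to jdbc' >> WriteToJdbc(
            table_name=table_name,
            jdbc_url=jdbc_url,
            driver_class_name='org.postgresql.Driver',
            statement="insert into tablr values(?,?);",
            username=username,
            password=password,
        ))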
Upvotes: 4