Reputation: 644
I am trying to create a function that will accept a dict and schema as input and returns a data frame automatically filling unspecified fields as nulls. This is my below code
def get_element(name, row_dict):
value = None
if name in row_dict:
value = row_dict[name]
return value
def create_row(schema, row_dict):
row_tuple = ()
for fields in schema:
element = get_element(fields.name, row_dict)
row_tuple = (*row_tuple, element)
return row_tuple
def fill(schema, values):
spark = (
SparkSession
.builder
.master("local[*]")
.appName("pysparktest")
.getOrCreate()
)
return \
spark.createDataFrame(
spark.sparkContext.parallelize(
[(Row(create_row(schema.fields, row_dict)) for row_dict in values)]
),
schema
)
This is how I'm calling the function:
schema = T.StructType([T.StructField("base_currency", T.StringType()),
T.StructField("target_currency", T.StringType()),
T.StructField("valid_from", T.StringType()),
T.StructField("valid_until", T.StringType())])
values = [
{"base_currency": "USD", "target_currency": "EUR", "valid_from": "test",
"valid_until": "test"},
{"base_currency": "USD1", "target_currency": "EUR2"}
]
fill(schema, values).show()
Error message:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
test_utilities/create_df_from_schema.py:37: in fill
[(Row(create_row(schema.fields, row_dict)) for row_dict in values)]
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/context.py:566: in parallelize
jrdd = self._serialize_to_jvm(c, serializer, reader_func, createRDDServer)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/context.py:603: in _serialize_to_jvm
serializer.dump_stream(data, tempFile)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:211: in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:133: in dump_stream
self._write_with_length(obj, stream)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:143: in _write_with_length
serialized = self.dumps(obj)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = PickleSerializer()
obj = [<generator object fill.<locals>.<genexpr> at 0x1091b9350>]
def dumps(self, obj):
> return pickle.dumps(obj, pickle_protocol)
E TypeError: can't pickle generator objects
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:427: TypeError
Somehow the syntax to construct the data frame is not right.
Upvotes: 0
Views: 200
Reputation: 32640
You are already returning tuples from create_row
function, you don't need to create Row
object, simply pass the list of tuples to spark.createDataFrame
like this:
def fill(schema, values):
return spark.createDataFrame(
[create_row(schema.fields, row_dict) for row_dict in values],
schema
)
Now you can call:
fill(schema, values).show()
#+-------------+---------------+----------+-----------+
#|base_currency|target_currency|valid_from|valid_until|
#+-------------+---------------+----------+-----------+
#| USD| EUR| test| test|
#| USD1| EUR2| null| null|
#+-------------+---------------+----------+-----------+
Moreover, you can actually simplify your code to a one line list-comprehension without having to define those functions:
spark.createDataFrame(
[[row.get(f.name) for f in schema.fields] for row in values],
schema
).show()
Calling .get(key)
on a dict object returns None if the key
does not exist.
Upvotes: 1