Reputation: 157
I looked at the SO question spark-rdd to dataframe and read my gzipped JSON into an RDD:
rdd1 = sc.textFile('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')
I want to convert it to a Spark DataFrame. The first method from the linked SO question does not work. This is the first row from the file:
{"code_event": "1092406", "code_event_system": "LOTTO", "company_id": "2", "date_event": "2020-05-27 12:00:00.000", "date_event_real": "0001-01-01 00:00:00.000", "ecode_class": "", "ecode_event": "183", "eperiod_event": "", "etl_date": "2020-05-27", "event_no": 1, "group_no": 0, "name_event": "Ungaria Putto - 8/20", "name_event_short": "Ungaria Putto - 8/20", "odd_coefficient": 1, "odd_coefficient_entry": 1, "odd_coefficient_user": 1, "odd_ekey": "11", "odd_name": "11", "odd_status": "", "odd_type": "11", "odd_voidfactor": 0, "odd_win_types": "", "special_bet_value": "", "ticket_id": "899M-E2X93P", "id_update": 8000001036823656, "topic_group": "cwg5", "kafka_key": "899M-E2X93P", "kafka_epoch": 1590580609424, "kafka_partition": 0, "kafka_topic": "tickets-calculated_2"}
How do I infer the schema?
The SO answer says:
schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])
Why range(32)?
Upvotes: 0
Views: 2097
Reputation: 87369
range(32)
in that example is just an example: it generates a schema with 32 columns, each named by its number. If you really want to define a schema, then you need to define every column explicitly:
from pyspark.sql.types import *
schema = StructType([
StructField('code_event', IntegerType(), True),
StructField('code_event_system', StringType(), True),
...
])
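A schema defined this way can then be handed to the reader so Spark skips schema inference. A minimal sketch, assuming the path from the question:
# pass the explicit schema so Spark does not scan the data to infer one
df = spark.read.schema(schema).json('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')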
But a better way is to avoid the RDD API entirely and read the file directly into a DataFrame with the following code (see the documentation):
>>> data = spark.read.json('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')
>>> data.printSchema()
root
|-- code_event: string (nullable = true)
|-- code_event_system: string (nullable = true)
|-- company_id: string (nullable = true)
|-- date_event: string (nullable = true)
|-- date_event_real: string (nullable = true)
|-- ecode_class: string (nullable = true)
|-- ecode_event: string (nullable = true)
|-- eperiod_event: string (nullable = true)
|-- etl_date: string (nullable = true)
....
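As a side note (only a sketch, not something shown in the answer): spark.read.json also accepts an RDD of JSON strings, so the rdd1 from the question could be passed in directly; sc.textFile decompresses the gzip transparently, leaving one JSON object per element:
# rdd1 already holds one JSON document per line
data = spark.read.json(rdd1)
data.printSchema()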
Upvotes: 2
Reputation: 1157
To answer your question: range(32) just sets how many columns StructField is applied to when building the schema; in your case there are 30 columns. Based on your data, I was able to create a DataFrame using the logic below:
from pyspark.sql.functions import *
from pyspark.sql.types import *
data_json = {"code_event": "1092406", "code_event_system": "LOTTO", "company_id": "2", "date_event": "2020-05-27 12:00:00.000",
"date_event_real": "0001-01-01 00:00:00.000", "ecode_class": "", "ecode_event": "183", "eperiod_event": "",
"etl_date": "2020-05-27", "event_no": 1, "group_no": 0, "name_event": "Ungaria Putto - 8/20", "name_event_short": "Ungaria Putto - 8/20",
"odd_coefficient": 1, "odd_coefficient_entry": 1, "odd_coefficient_user": 1, "odd_ekey": "11", "odd_name": "11", "odd_status": "",
"odd_type": "11", "odd_voidfactor": 0, "odd_win_types": "", "special_bet_value": "", "ticket_id": "899M-E2X93P", "id_update": 8000001036823656,
"topic_group": "cwg5", "kafka_key": "899M-E2X93P", "kafka_epoch": 1590580609424, "kafka_partition": 0, "kafka_topic": "tickets-calculated_2"}
# column names and a single row of values, taken from the sample record
column_names = list(data_json.keys())
row_data = [list(data_json.values())]

# build one StructField per column, choosing the Spark type from the Python value
fields = []
for name in column_names:
    value = data_json[name]
    if isinstance(value, str):
        fields.append(StructField(name, StringType(), True))
    elif isinstance(value, int) and len(str(value)) <= 8:
        fields.append(StructField(name, IntegerType(), True))
    else:
        # large integers such as id_update and kafka_epoch need 64 bits
        fields.append(StructField(name, LongType(), True))

schema = StructType(fields)
data = spark.createDataFrame(row_data, schema)
data.show()
Output
# +----------+-----------------+----------+--------------------+--------------------+-----------+-----------+-------------+----------+--------+--------+--------------------+--------------------+---------------+---------------------+--------------------+--------+--------+----------+--------+--------------+-------------+-----------------+-----------+----------------+-----------+-----------+-------------+---------------+--------------------+
# |code_event|code_event_system|company_id| date_event| date_event_real|ecode_class|ecode_event|eperiod_event| etl_date|event_no|group_no| name_event| name_event_short|odd_coefficient|odd_coefficient_entry|odd_coefficient_user|odd_ekey|odd_name|odd_status|odd_type|odd_voidfactor|odd_win_types|special_bet_value| ticket_id| id_update|topic_group| kafka_key| kafka_epoch|kafka_partition| kafka_topic|
# +----------+-----------------+----------+--------------------+--------------------+-----------+-----------+-------------+----------+--------+--------+--------------------+--------------------+---------------+---------------------+--------------------+--------+--------+----------+--------+--------------+-------------+-----------------+-----------+----------------+-----------+-----------+-------------+---------------+--------------------+
# | 1092406| LOTTO| 2|2020-05-27 12:00:...|0001-01-01 00:00:...| | 183| |2020-05-27| 1| 0|Ungaria Putto - 8/20|Ungaria Putto - 8/20| 1| 1| 1| 11| 11| | 11| 0| | |899M-E2X93P|8000001036823656| cwg5|899M-E2X93P|1590580609424| 0|tickets-calculated_2|
# +----------+-----------------+----------+--------------------+--------------------+-----------+-----------+-------------+----------+--------+--------+--------------------+--------------------+---------------+---------------------+--------------------+--------+--------+----------+--------+--------------+-------------+-----------------+-----------+----------------+-----------+-----------+-------------+---------------+--------------------+
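For what it's worth, the same idea can be written more compactly with a type lookup table. This is just a sketch, not part of the answer's logic, and it maps every int to LongType instead of using the digit-length check:
# hypothetical variant: pick the Spark type from the Python type of each sample value
type_map = {str: StringType(), int: LongType()}
fields = [StructField(k, type_map.get(type(v), StringType()), True)
          for k, v in data_json.items()]
schema = StructType(fields)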
Upvotes: 3