abkf12
abkf12

Reputation: 221

How to handle timestamp in Pyspark Structured Streaming

I'm trying to parse the datetime to later do group by at certain hours in structured streaming.

Currently I have code like this:

distinct_table = service_table\
        .select(psf.col('crime_id'),
                psf.col('original_crime_type_name'),
                psf.to_timestamp(psf.col('call_date_time')).alias('call_datetime'),
                psf.col('address'),
                psf.col('disposition'))

Which gives output in console:

+---------+------------------------+-------------------+--------------------+------------+
| crime_id|original_crime_type_name|      call_datetime|             address| disposition|
+---------+------------------------+-------------------+--------------------+------------+
|183652852|                Burglary|2018-12-31 18:52:00|600 Block Of Mont...|         HAN|
|183652839|            Passing Call|2018-12-31 18:51:00|500 Block Of Clem...|         HAN|
|183652841|                  22500e|2018-12-31 18:51:00|2600 Block Of Ale...|         CIT|

When I try to apply this udf to convert the timestamp (call_datetime column):

import pyspark.sql.functions as psf
from dateutil.parser import parse as parse_date

@psf.udf(StringType())
def udf_convert_time(timestamp):
    d = parse_date(timestamp)
    return str(d.strftime('%y%m%d%H'))

I get a Nonetype error..

File "/Users/dev/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/Users/dev/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/dev/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 149, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "<string>", line 1, in <lambda>
  File "/Users/dev/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 74, in <lambda>
    return lambda *a: f(*a)
  File "/Users/PycharmProjects/data-streaming-project/solution/streaming/data_stream.py", line 29, in udf_convert_time
    d = parse_date(timestamp)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/dateutil/parser.py", line 697, in parse
    return DEFAULTPARSER.parse(timestr, **kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/dateutil/parser.py", line 301, in parse
    res = self._parse(timestr, **kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/dateutil/parser.py", line 349, in _parse
    l = _timelex.split(timestr)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/dateutil/parser.py", line 143, in split
    return list(cls(s))
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/dateutil/parser.py", line 137, in next
    token = self.get_token()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/dateutil/parser.py", line 68, in get_token
    nextchar = self.instream.read(1)
AttributeError: 'NoneType' object has no attribute 'read'

This is the query plan:

pyspark.sql.utils.StreamingQueryException: u'Writing job aborted.\n=== Streaming Query ===\nIdentifier: [id = 958a6a46-f718-49c4-999a-661fea2dc564, runId = fc9a7a78-c311-42b7-bbed-7718b4cc1150]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {KafkaSource[Subscribe[service-calls]]: {"service-calls":{"0":200}}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nProject [crime_id#25, original_crime_type_name#26, call_datetime#53, address#33, disposition#32, udf_convert_time(call_datetime#53) AS parsed_time#59]\n+- Project [crime_id#25, original_crime_type_name#26, to_timestamp(\'call_date_time, None) AS call_datetime#53, address#33, disposition#32]\n   +- Project [SERVICE_CALLS#23.crime_id AS crime_id#25, SERVICE_CALLS#23.original_crime_type_name AS original_crime_type_name#26, SERVICE_CALLS#23.report_date AS report_date#27, SERVICE_CALLS#23.call_date AS call_date#28, SERVICE_CALLS#23.offense_date AS offense_date#29, SERVICE_CALLS#23.call_time AS call_time#30, SERVICE_CALLS#23.call_date_time AS call_date_time#31, SERVICE_CALLS#23.disposition AS disposition#32, SERVICE_CALLS#23.address AS address#33, SERVICE_CALLS#23.city AS city#34, SERVICE_CALLS#23.state AS state#35, SERVICE_CALLS#23.agency_id AS agency_id#36, SERVICE_CALLS#23.address_type AS address_type#37, SERVICE_CALLS#23.common_location AS common_location#38]\n      +- Project [jsontostructs(StructField(crime_id,StringType,true), StructField(original_crime_type_name,StringType,true), StructField(report_date,StringType,true), StructField(call_date,StringType,true), StructField(offense_date,StringType,true), StructField(call_time,StringType,true), StructField(call_date_time,StringType,true), StructField(disposition,StringType,true), StructField(address,StringType,true), StructField(city,StringType,true), StructField(state,StringType,true), StructField(agency_id,StringType,true), StructField(address_type,StringType,true), StructField(common_location,StringType,true), value#21, Some(America/Los_Angeles)) AS SERVICE_CALLS#23]\n         +- Project [cast(value#8 as string) AS value#21]\n            +- StreamingExecutionRelation KafkaSource[Subscribe[service-calls]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]\n'

I'm using StringType for all columns and using to_timestamp for timestamp columns (which seems to work).

I verified and all the data I'm using (just like 100 rows) do have values. Any idea how to debug this?

EDIT

Input is coming from Kafka - Schema is shown above in the error log (all StringType())

Upvotes: 0

Views: 1362

Answers (1)

thePurplePython
thePurplePython

Reputation: 2767

Best not to use udf because they don't use spark catalyst optimizer and especially when the spark.sql.functions modules have functions available. This code will transform your timestamp.

import pyspark.sql.functions as F
import pyspark.sql.types as T

rawData = [(183652852, "Burglary", "2018-12-31 18:52:00", "600 Block Of Mont", "HAN"),
           (183652839, "Passing Call", "2018-12-31 18:51:00", "500 Block Of Clem", "HAN"),
           (183652841, "22500e", "2018-12-31 18:51:00", "2600 Block Of Ale", "CIT")]

df = spark.createDataFrame(rawData).toDF("crime_id",\
                                         "original_crime_type_name",\
                                         "call_datetime",\
                                         "address",\
                                         "disposition")

date_format_source="yyyy-MM-dd HH:mm:ss"
date_format_target="yyyy-MM-dd HH"

df.select("*")\
.withColumn("new_time_format",\
            F.from_unixtime(F.unix_timestamp(F.col("call_datetime"),\
                                             date_format_source),\
                            date_format_target)\
            .cast(T.TimestampType()))\
.withColumn("time_string", F.date_format(F.col("new_time_format"), "yyyyMMddHH"))\
.select("call_datetime", "new_time_format", "time_string")\
.show(truncate=True)

+-------------------+-------------------+-----------+
|      call_datetime|    new_time_format|time_string|
+-------------------+-------------------+-----------+
|2018-12-31 18:52:00|2018-12-31 18:00:00| 2018123118|
|2018-12-31 18:51:00|2018-12-31 18:00:00| 2018123118|
|2018-12-31 18:51:00|2018-12-31 18:00:00| 2018123118|
+-------------------+-------------------+-----------+

Upvotes: 2

Related Questions