Reputation: 383
I'll try to simplify the problem I am trying to solve. I have an employee data stream which is being read from a JSON file and has the following schema:
StructType([ \
StructField("timeStamp", TimestampType()),\
StructField("emp_id", LongType()),\
StructField("on_duty", LongType()) ])
# on_duty is an int boolean-> 0,1
Sample:
{"timeStamp": 1514765160, "emp_id": 12471979, "on_duty": 0}
{"timeStamp": 1514765161, "emp_id": 12472154, "on_duty": 1}
I would like to find out 2 things every minute, the total number of employees online and those NOT on duty and process it using structured spark streaming
This is per minute wrt. the timestamp, not the system time.
_producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:
json.dumps(x).encode('utf-8'))
# schedule.every(1).minutes.do(_producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) ) )
with open(filepath, 'r', encoding="utf16") as f:
for item in json_lines.reader(f):
dataDict.update({'timeStamp':item['timestamp'],
'emp_id':item['emp_id'],
'on_duty':item['on_duty']})
_producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) )
sleep(1)
# ^ Threading doesn't work BTW
emp_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "emp_dstream") \
.option("startingOffsets", "latest") \
.load() \
.selectExpr("CAST(value AS STRING)")
emp_data = emp_stream.select([
get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
for c in ["timeStamp", "emp_id", "on_duty"]])
# this query is a filler attempt which is not the end goal of the task
query = emp_data.groupBy(["on_duty"]).count()
emp_data.writeStream \
.outputMode("append") \
.format("console") \
.start() \
.awaitTermination()
I am confused how to proceed. Do I make changes in the kafka producer or while processing the stream with spark? And how should I do that?
Would be grateful for any hints or help!
Update Acc to @Srinivas solution
....----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+
|[1970-01-18 04:46:00, 1970-01-18 04:47:00]|1970-01-18 04:46:05|1070 |[1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,....
-------------------------------------------
Batch: 40
-------------------------------------------
+------------------------------------------+-------------------+--------------+-----------------+
|window |timestamp |Online_emp|Available_emp|
+------------------------------------------+-------------------+--------------+-----------------+
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:53|20 |12 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:44|20 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:47|4 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:27|20 |4 |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:10|4 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:25|4 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:42|12 |4 |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:20|4 |0 |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:49|4 |0 |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:44|12 |8 |
|[2017-12-31 16:02:00, 2017-12-31 16:03:00]|2017-12-31 16:02:19|8 |4 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:15|8 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:08|12 |4 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:50|8 |0 |
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:27|16 |0 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:38|5 |0 |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:13|4 |4 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:36|8 |4 |
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:59|24 |4 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:40|10 |0 |
+------------------------------------------+-------------------+--------------+-----------------+
only showing top 20 rows
-------------------------------------------
Batch: 41
-------------------------------------------
+------------------------------------------+-------------------+--------------+-----------------+
|window |timestamp |Online_emp|Available_emp|
+------------------------------------------+-------------------+--------------+-----------------+
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:53|20 |12 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:44|20 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:47|4 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:27|20 |4 |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:10|4 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:25|4 |0 |
Update 2
How to get output like this:
Time Online_Emp Available_Emp
2019-01-01 00:00:00 52 23
2019-01-01 00:01:00 58 19
2019-01-01 00:02:00 65 28
Upvotes: 2
Views: 2279
Reputation: 10362
Use window
function.
Sample data in Kafka
{"timeStamp": 1592669811475, "emp_id": 12471979, "on_duty": 0}
{"timeStamp": 1592669811475, "emp_id": 12472154, "on_duty": 1}
{"timeStamp": 1592669811475, "emp_id": 12471980, "on_duty": 0}
{"timeStamp": 1592669811475, "emp_id": 12472181, "on_duty": 1}
{"timeStamp": 1592669691475, "emp_id": 12471982, "on_duty": 0}
{"timeStamp": 1592669691475, "emp_id": 12472183, "on_duty": 1}
{"timeStamp": 1592669691475, "emp_id": 12471984, "on_duty": 0}
{"timeStamp": 1592669571475, "emp_id": 12472185, "on_duty": 1}
{"timeStamp": 1592669571475, "emp_id": 12472186, "on_duty": 1}
{"timeStamp": 1592669571475, "emp_id": 12472187, "on_duty": 0}
{"timeStamp": 1592669571475, "emp_id": 12472188, "on_duty": 1}
{"timeStamp": 1592669631475, "emp_id": 12472185, "on_duty": 1}
{"timeStamp": 1592669631475, "emp_id": 12472186, "on_duty": 1}
{"timeStamp": 1592669631475, "emp_id": 12472187, "on_duty": 0}
{"timeStamp": 1592669631475, "emp_id": 12472188, "on_duty": 1}
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, StructField, StructType, LongType, TimestampType
schema = StructType([ \
StructField("timeStamp", LongType()), \
StructField("emp_id", LongType()), \
StructField("on_duty", LongType())])
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe","emp_dstream")\
.option("startingOffsets", "earliest")\
.load()\
.selectExpr("CAST(value AS STRING)")\
.select(F.from_json(F.col("value"), schema).alias("value"))\
.select(F.col("value.*"))\
.withColumn("timestamp",F.from_unixtime(F.col("timestamp") / 1000))\
.groupBy(F.window(F.col("timestamp"), "1 minutes"), F.col("timestamp"))\
.agg(F.count(F.col("timeStamp")).alias("total_employees"),F.collect_list(F.col("on_duty")).alias("on_duty"),F.sum(F.when(F.col("on_duty") == 0, F.lit(1)).otherwise(F.lit(0))).alias("not_on_duty"))\
.writeStream\
.format("console")\
.outputMode("complete")\
.option("truncate", "false")\
.start()\
.awaitTermination()
Output
+---------------------------------------------+-------------------+---------------+------------+-----------+
|window |timestamp |total_employees|on_duty |not_on_duty|
+---------------------------------------------+-------------------+---------------+------------+-----------+
|[2020-06-20 21:42:00.0,2020-06-20 21:43:00.0]|2020-06-20 21:42:51|4 |[1, 1, 0, 1]|1 |
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|3 |[0, 1, 0] |2 |
|[2020-06-20 21:46:00.0,2020-06-20 21:47:00.0]|2020-06-20 21:46:51|4 |[0, 1, 0, 1]|2 |
|[2020-06-20 21:43:00.0,2020-06-20 21:44:00.0]|2020-06-20 21:43:51|4 |[1, 1, 0, 1]|1 |
+---------------------------------------------+-------------------+---------------+------------+-----------+
Spark Batch
spark \
.read \
.schema(schema) \
.json("/tmp/data/emp_data.json") \
.select(F.to_json(F.struct("*")).cast("string").alias("value")) \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "emp_data") \
.save()
Spark Streaming
spark \
.readStream \
.schema(schema) \
.json("/tmp/data/emp_data.json") \
.select(F.to_json(F.struct("*")).cast("string").alias("value")) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "emp_data") \
.start()
JSON data in kafka
/tmp/data> kafka-console-consumer --bootstrap-server localhost:9092 --topic emp_data
{"timeStamp":1592669811475,"emp_id":12471979,"on_duty":0}
{"timeStamp":1592669811475,"emp_id":12472154,"on_duty":1}
{"timeStamp":1592669811475,"emp_id":12471980,"on_duty":0}
{"timeStamp":1592669811475,"emp_id":12472181,"on_duty":1}
{"timeStamp":1592669691475,"emp_id":12471982,"on_duty":0}
{"timeStamp":1592669691475,"emp_id":12472183,"on_duty":1}
{"timeStamp":1592669691475,"emp_id":12471984,"on_duty":0}
{"timeStamp":1592669571475,"emp_id":12472185,"on_duty":1}
{"timeStamp":1592669571475,"emp_id":12472186,"on_duty":1}
{"timeStamp":1592669571475,"emp_id":12472187,"on_duty":0}
{"timeStamp":1592669571475,"emp_id":12472188,"on_duty":1}
{"timeStamp":1592669631475,"emp_id":12472185,"on_duty":1}
{"timeStamp":1592669631475,"emp_id":12472186,"on_duty":1}
{"timeStamp":1592669631475,"emp_id":12472187,"on_duty":0}
{"timeStamp":1592669631475,"emp_id":12472188,"on_duty":1}
^CProcessed a total of 15 messages
Upvotes: 3