Reputation: 348
I have a simulated temperature sensor that sends data to an Apache Kafka topic (I'm using Confluent Platform 6.1.0). I need to read from this topic with Apache Spark (version 3.1.1) and calculate the average temperature over a 5-minute window (the simulated sensor sends data without delay, but I want to allow for a delay of 60 minutes, which is why I use withWatermark). Finally, I want to write this average to another Kafka topic. In my code it seems the average is not calculated on the temperature data, because I get this result:
Batch: 0
-------------------------------------------
+--------------------+--------------+-----------+------------------+
| window|avg(partition)|avg(offset)|avg(timestampType)|
+--------------------+--------------+-----------+------------------+
|{2021-05-12 10:29...| 0.0| 134.5| 0.0|
|{2021-05-12 12:45...| 0.0| 175.5| 0.0|
|{2021-05-12 11:55...| 0.0| 167.0| 0.0|
|{2021-05-12 12:46...| 0.0| 178.5| 0.0|
|{2021-05-12 10:33...| 0.0| 156.5| 0.0|
|{2021-05-12 11:40...| 0.0| 160.0| 0.0|
|{2021-05-12 10:27...| 0.0| 122.5| 0.0|
|{2021-05-12 10:31...| 0.0| 146.5| 0.0|
|{2021-05-12 13:01...| 0.0| 181.5| 0.0|
|{2021-05-12 10:18...| 0.0| 80.0| 0.0|
|{2021-05-12 10:32...| 0.0| 152.5| 0.0|
|{2021-05-12 10:17...| 0.0| 37.5| 0.0|
|{2021-05-12 10:22...| 0.0| 94.0| 0.0|
|{2021-05-12 11:41...| 0.0| 164.0| 0.0|
|{2021-05-12 10:23...| 0.0| 98.5| 0.0|
|{2021-05-12 10:16...| 0.0| 3.5| 0.0|
|{2021-05-12 10:25...| 0.0| 110.5| 0.0|
|{2021-05-12 10:30...| 0.0| 140.5| 0.0|
|{2021-05-12 10:28...| 0.0| 128.5| 0.0|
|{2021-05-12 11:56...| 0.0| 171.5| 0.0|
+--------------------+--------------+-----------+------------------+
only showing top 20 rows
and it doesn't write data to Kafka, because I get this error:
pyspark.sql.utils.AnalysisException: cannot resolve '`partition`' given input columns: [avg(offset), avg(partition), avg(timestampType), window]; line 1 pos 4;
'Project [unresolvedalias('avg('partition), Some(org.apache.spark.sql.Column$$Lambda$1698/0x0000000840d45840@5117787c)), unresolvedalias(cast('key as string), None), unresolvedalias(cast('value as string), None)]
+- Aggregate [window#35], [window#35 AS window#21, avg(cast(partition#10 as bigint)) AS avg(partition)#32, avg(offset#11L) AS avg(offset)#33, avg(cast(timestampType#13 as bigint)) AS avg(timestampType)#34]
+- Filter isnotnull(timestamp#12)
+- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 60000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 60000000) + 0) + 60000000), LongType, TimestampType)) AS window#35, key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12-T60000ms, timestampType#13]
+- EventTimeWatermark timestamp#12: timestamp, 1 minutes
+- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@72bee61d, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@390238cf, [startingOffsets=earliest, kafka.bootstrap.servers=localhost:9092, subscribe=temperature], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5fbd93c5,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> localhost:9092, subscribe -> temperature, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
This is my code (Kafka producer and Spark reader):
#Kafka producer
from confluent_kafka import Producer, KafkaError
import json
import ccloud_lib
import random
from random import randrange
import datetime
import time
if __name__ == '__main__':
    # Read arguments and configurations and initialize
    args = ccloud_lib.parse_args()
    config_file = args.config_file
    topic = args.topic
    conf = ccloud_lib.read_ccloud_config(config_file)

    # Create Producer instance
    producer_conf = ccloud_lib.pop_schema_registry_params_from_config(conf)
    producer = Producer(producer_conf)

    # Create topic if needed
    ccloud_lib.create_topic(conf, topic)

    delivered_records = 0

    # Optional per-message on_delivery handler (triggered by poll() or flush())
    # when a message has been successfully delivered or
    # permanently failed delivery (after retries).
    def acked(err, msg):
        global delivered_records
        """Delivery report handler called on
        successful or failed delivery of message
        """
        if err is not None:
            print("Failed to deliver message: {}".format(err))
        else:
            delivered_records += 1
            print("Produced record to topic {} partition [{}] @ offset {}"
                  .format(msg.topic(), msg.partition(), msg.offset()))

    guid_base = "0-ZZZ12345678-"
    formato = "urn:example:sensor:temp"
    temp_data = {}

    while 1:
        rand_num = str(round(random.randrange(0, 9), 2)) + str(round(random.randrange(0, 9), 2))
        temp_init_weight = round(random.uniform(-5, 5), 2)
        temp_delta = round(random.uniform(10, 20), 2)
        guid = guid_base + rand_num
        temperature = temp_delta
        today = datetime.datetime.today()
        datestr = today.isoformat()
        data = {'eventTime': datestr, 'temperatura': temperature}  # ok
        time.sleep(10)
        record_key = "temperatura"
        ##record_value = json.dumps(temp_data)
        record_value = json.dumps(data)
        print("\n")
        producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)
        producer.poll(0)
        record_value = ''

    producer.flush()
    print("{} messages were produced to topic {}!".format(delivered_records, topic))
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StringType
# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('TemperatureStreamApp')
         # Add kafka package
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
         .getOrCreate())
sc = spark.sparkContext

# Create stream dataframe setting kafka server, topic and offset option
df = (spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  # kafka server
      .option("subscribe", "temperature")  # topic
      .option("startingOffsets", "earliest")  # start from beginning
      .load())

windowedAvg = df \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window(df.timestamp, "5 minutes", "60 minutes")) \
    .avg()

query = windowedAvg \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .option('truncate', 'true') \
    .start()

# write on kafka topic avgtemperature
qk = (windowedAvg
      .selectExpr("avg(partition)", "CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("checkpointLocation", "/home/kafka/Documenti/confluent/examples-6.1.0-post/clients/cloud/python/kafkaStream")
      .option("topic", "avgtemperature")
      .outputMode("complete")
      .start())
query.awaitTermination()
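For reference, df.printSchema() on the raw Kafka stream shows only the Kafka metadata columns plus the binary key/value pair, which is presumably why .avg() aggregates partition, offset and timestampType instead of the temperature inside value:
df.printSchema()
# root
#  |-- key: binary (nullable = true)
#  |-- value: binary (nullable = true)
#  |-- topic: string (nullable = true)
#  |-- partition: integer (nullable = true)
#  |-- offset: long (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- timestampType: integer (nullable = true)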
How can I calculate the average on the temperature column and save the results to the Kafka topic avgtemperature? Thanks.
Upvotes: 0
Views: 454
Reputation: 10035
A few things to note based on the attempt you shared. Before processing the data from the Kafka stream, you should extract the value and cast it to your datatype of choice. Since you are submitting a JSON value, I used a schema temperature_schema to extract eventTime and temperatura and convert them into columns in the streaming dataframe.
It is good that you included a watermark; however, it was applied to the Kafka metadata timestamp column rather than to the event time in your payload, so I continue by using your eventTime, cast to a timestamp.
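As a quick illustration of that extraction step (a minimal, non-streaming sketch with a hand-written sample payload, separate from the full revision further down), from_json can be tried out on a static dataframe first:
from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.master("local[1]").appName("from_json_check").getOrCreate()

# schema matching the JSON keys the producer emits
temperature_schema = T.StructType([
    T.StructField("eventTime", T.StringType(), True),
    T.StructField("temperatura", T.FloatType(), True),
])

# one made-up sample record standing in for a Kafka value
sample = spark.createDataFrame(
    [('{"eventTime": "2021-05-12T10:16:03.123456", "temperatura": 14.2}',)],
    ["value"],
)

parsed = (
    sample.select(F.from_json(F.col("value"), temperature_schema).alias("json_value"))
          .select("json_value.*")
          .withColumn("eventTime", F.col("eventTime").cast("timestamp"))
)
parsed.show(truncate=False)  # one row: eventTime as a timestamp, temperatura as a float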
I also made amendments to the output to your Kafka sink. You received the error because the Kafka sink expects a specific set of columns, namely:
| Column | Type |
| --- | --- |
| key (optional) | string or binary |
| value (required) | string or binary |
| headers (optional) | array |
| topic (*optional) | string |
| partition (optional) | int |
Additional documentation on working with and running Kafka jobs is available in the Structured Streaming + Kafka Integration Guide; be sure to refer to the documentation for your Spark version.
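As a concrete sketch of that contract (illustrative only; the full revision follows right below), the sink only strictly needs a string or binary value column, with key optional and the topic supplied either as a column or via the topic option. This assumes the aggregated windowedAvg dataframe built below and uses a placeholder checkpoint path:
(windowedAvg
 .selectExpr("CAST(avgtemperature AS STRING) AS value")  # value is the only required column
 .writeStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "localhost:9092")
 .option("checkpointLocation", "/tmp/avgtemperature-checkpoint")  # placeholder path
 .option("topic", "avgtemperature")  # topic given as an option instead of a column
 .outputMode("complete")
 .start())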
from pyspark.sql import functions as F
from pyspark.sql import types as T

df = (spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  # kafka server
      .option("subscribe", "temperature")  # topic
      .option("startingOffsets", "earliest")  # start from beginning
      .load())

# create schema for temperature
temperature_schema = T.StructType([
    T.StructField("eventTime", T.StringType(), True),
    T.StructField("temperatura", T.FloatType(), True),
])

# extract temperature data and ensure `eventTime` is timestamp
df = (
    df.selectExpr("CAST(value as string)")
      .select(F.from_json(F.col("value"), temperature_schema).alias("json_value"))
      .selectExpr("json_value.*")  # gives us a dataframe with columns (eventTime, temperatura)
      .select(
          F.expr("CAST(eventTime as timestamp)").alias("eventTime"),
          F.col("temperatura")
      )
)
# when using window you get a struct column with [start, end] fields;
# I have chosen the `start` for this example
windowedAvg = (
    df.withWatermark("eventTime", "5 minutes")
      .groupBy(F.window(F.col("eventTime"), "5 minutes", "60 minutes").alias('eventTimeWindow'))
      .agg(F.avg("temperatura").alias("avgtemperature"))
      .select(
          F.col("eventTimeWindow.start").alias("eventTime"),
          F.col("avgtemperature")
      )
)
# continue with your code to write to your various streams
query = windowedAvg \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .option('truncate', 'true') \
    .start()

# write to the kafka topic avgtemperature
# here I've chosen, as an example, to use eventTime as the key and avgtemperature as the value
qk = (windowedAvg
      .select(
          F.expr("CAST(eventTime AS STRING)").alias("key"),
          F.expr("CAST(avgtemperature AS STRING)").alias("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("checkpointLocation", "/home/kafka/Documenti/confluent/examples-6.1.0-post/clients/cloud/python/kafkaStream")
      .option("topic", "avgtemperature")
      .outputMode("complete")
      .start())
query.awaitTermination()
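To sanity-check what actually lands on avgtemperature (not part of the original answer; this assumes the same localhost:9092 broker and uses a throwaway consumer group), a small confluent_kafka consumer can tail the output topic:
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "avgtemperature-check",  # throwaway group id, chosen just for this check
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["avgtemperature"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        # key = window start time, value = average temperature (both written as strings above)
        print("{} -> {}".format(msg.key().decode("utf-8"), msg.value().decode("utf-8")))
except KeyboardInterrupt:
    pass
finally:
    consumer.close()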
Upvotes: 2