I am working with crypto data. I fetch 1-minute kline data from the Binance exchange using Rust and send it to a Kafka (Redpanda) topic named "WS_Binance_Data_ALL".
I then consume the kline data from "WS_Binance_Data_ALL" and aggregate it into 5-minute candlestick data (the query triggers once per minute).
The processed, aggregated data is sent to another topic, "WS_Binance_Data_ALL_5min".
The problem is that this processed data is always missing the latest candlestick.
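For reference, a single message on "WS_Binance_Data_ALL" looks roughly like the dict below (field names match the schema in the code; the values are made-up examples, not real market data):

```python
# Example shape of one 1-minute kline message on WS_Binance_Data_ALL
# (illustrative values only; open_time / close_time are epoch milliseconds,
#  numeric fields arrive as strings, "x" is the kline-closed flag)
sample_kline = {
    "symbol": "BTCUSDT",
    "open_time": 1717411200000,
    "close_time": 1717411259999,
    "open_price": "67000.10",
    "high_price": "67050.00",
    "low_price": "66980.50",
    "close_price": "67020.30",
    "volume": "12.345",
    "number_of_trades": "321",
    "base_asset_value": "826000.00",
    "taker_buy_base_asset_volume": "6.789",
    "taker_buy_quote_asset_volume": "454000.00",
    "ignore_value": "0",
    "x": "true",
}
```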
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, LongType
import logging, os, shutil
import json
from datetime import datetime
# Set up logging
log_file_path = os.path.dirname(os.path.abspath(__file__))
log_file = os.path.join(log_file_path, "tumbleLOG.log")
logging.basicConfig(
    filename=log_file,
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s]: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
# Initialize Spark session
spark = SparkSession.builder \
    .appName("KafkaCandlestick") \
    .config("spark.sql.streaming.stateStore.stateSchemaCheck", "false") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.streaming.backpressure.enabled", "true") \
    .config("spark.streaming.kafka.maxRatePerPartition", "100") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .getOrCreate()
# Define the schema for the input data
schema = StructType([
    StructField("base_asset_value", StringType()),
    StructField("close_price", StringType()),
    StructField("close_time", LongType()),
    StructField("high_price", StringType()),
    StructField("ignore_value", StringType()),
    StructField("low_price", StringType()),
    StructField("number_of_trades", StringType()),
    StructField("open_price", StringType()),
    StructField("open_time", LongType()),
    StructField("symbol", StringType()),
    StructField("taker_buy_base_asset_volume", StringType()),
    StructField("taker_buy_quote_asset_volume", StringType()),
    StructField("volume", StringType()),
    StructField("x", StringType())
])
# Define Kafka parameters
kafka_bootstrap_servers = "localhost:9092"
input_topic = "WS_Binance_Data_ALL"
output_topic = "WS_Binance_Data_ALL_5min"
# Clear checkpoint directory
checkpoint_dir = "/tmp/candlestick_checkpoint"
if os.path.exists(checkpoint_dir):
    shutil.rmtree(checkpoint_dir)
# Read data from Kafka topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", input_topic) \
    .option("startingOffsets", "earliest") \
    .load()
# note: the Kafka source has no "maxOffsets" option; "maxOffsetsPerTrigger" caps
# the batch size, and "startingOffsets" can be either "earliest" or "latest"
# Deserialize JSON data and select necessary columns
df = df.selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", schema).alias("data")) \
    .select("data.*") \
    .withColumn("close_time", (col("close_time") / 1000).cast(TimestampType())) \
    .withColumn("open_time", (col("open_time") / 1000).cast(TimestampType())) \
    .withColumn("high_price", col("high_price").cast(DoubleType())) \
    .withColumn("low_price", col("low_price").cast(DoubleType())) \
    .withColumn("open_price", col("open_price").cast(DoubleType())) \
    .withColumn("close_price", col("close_price").cast(DoubleType())) \
    .withColumn("volume", col("volume").cast(DoubleType()))
# Group data into 5-minute candlesticks per symbol (tumbling windows on open_time)
candlestick_df = df \
    .withWatermark("open_time", "5 minutes") \
    .groupBy("symbol", window("open_time", "5 minutes")) \
    .agg(
        expr("first(open_price) as open_price"),
        expr("max(high_price) as high_price"),
        expr("min(low_price) as low_price"),
        expr("last(close_price) as close_price"),
        expr("sum(volume) as volume")
    )
# note: first()/last() do not guarantee event-time order within a group
candlestick_df.printSchema()
# Adjust the window end to be one second before the actual end
# candlestick_df = candlestick_df \
# .withColumn("window_end_adjusted", col("window.end") - expr("INTERVAL 1 SECOND"))
# Write aggregated data to Kafka
query = candlestick_df \
    .selectExpr("CAST(symbol AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", output_topic) \
    .option("checkpointLocation", checkpoint_dir) \
    .outputMode("append") \
    .trigger(processingTime='1 minute') \
    .start()
# also tried:
# .trigger(processingTime='60 seconds') \
# .outputMode("complete") \
# Define a function to print the current watermark from the last query progress
def print_watermark(query):
    progress = query.lastProgress
    if progress:
        now = datetime.now()
        dt_string = now.strftime("%Y-%m-%d %H:%M:%S")
        print("current datetime :", dt_string)
        watermark = progress['eventTime']['watermark']
        print(f"Current Watermark: {watermark}")
        logging.info(f"Current Watermark: {watermark}")
# Poll the query once it has had a chance to make progress
query.awaitTermination(10)  # adjust the timeout as needed
print_watermark(query)
# Continuously monitor the watermark
while True:
    print_watermark(query)
    query.awaitTermination(60)  # check the watermark every 60 seconds
```
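To check which 5-minute windows actually reach the output topic, a plain batch read of the sink topic can be used (a minimal sketch; it assumes the same broker address and topic name as above and needs the same spark-sql-kafka package on the classpath):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Inspect5minTopic").getOrCreate()

# Batch read of everything currently on the aggregated topic
out = (spark.read
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "WS_Binance_Data_ALL_5min")
       .option("startingOffsets", "earliest")
       .load()
       .selectExpr("CAST(key AS STRING) AS symbol",
                   "CAST(value AS STRING) AS candle",
                   "timestamp AS kafka_timestamp"))

# The newest records show which 5-minute window was emitted last
out.orderBy(out["kafka_timestamp"].desc()).show(20, truncate=False)
```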