Gulfraz

Reputation: 33

Missing last 5-minute candlestick using PySpark

  1. I am working on crypto data: I fetch 1-minute kline data from the Binance exchange using Rust and send it to a Kafka (Redpanda) topic named "WS_Binance_Data_ALL" (an example payload is sketched after this list).

  2. I consume the "WS_Binance_Data_ALL" kline data from Kafka (Redpanda) and aggregate it into 5-minute candlesticks, triggering the processing every minute.

  3. I send this processed, aggregated data to another topic, "WS_Binance_Data_ALL_5min".

  4. The problem is that this processed data is always missing the latest candlestick.
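
For reference, a single 1-minute kline message on "WS_Binance_Data_ALL" looks roughly like the following (the field names come from the schema in my Spark job below; the values are only illustrative):

{
  "symbol": "BTCUSDT",
  "open_time": 1700000040000,
  "close_time": 1700000099999,
  "open_price": "37000.10",
  "high_price": "37010.50",
  "low_price": "36995.00",
  "close_price": "37005.25",
  "volume": "12.345",
  "number_of_trades": "250",
  "base_asset_value": "456789.12",
  "taker_buy_base_asset_volume": "6.789",
  "taker_buy_quote_asset_volume": "251234.56",
  "ignore_value": "0",
  "x": "true"
}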


from pyspark.sql import SparkSession
from pyspark.sql.functions import window, from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, LongType
import logging, os, shutil
import json
from datetime import datetime

# Set up logging
log_file_path = os.path.dirname(os.path.abspath(__file__))
log_file = os.path.join(log_file_path, "tumbleLOG.log")
logging.basicConfig(
    filename=log_file,
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s]: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

# Initialize Spark session
spark = SparkSession.builder \
    .appName("KafkaCandlestick") \
    .config("spark.sql.streaming.stateStore.stateSchemaCheck", "false") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.streaming.backpressure.enabled", "true") \
    .config("spark.streaming.kafka.maxRatePerPartition", "100") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .getOrCreate()
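# Note: as far as I know, the spark.streaming.* settings above target the legacy
# DStream API; for Structured Streaming the Kafka source is rate-limited with the
# maxOffsetsPerTrigger read option instead.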

# Define the schema for the input data
schema = StructType([
    StructField("base_asset_value", StringType()),
    StructField("close_price", StringType()),
    StructField("close_time", LongType()),
    StructField("high_price", StringType()),
    StructField("ignore_value", StringType()),
    StructField("low_price", StringType()),
    StructField("number_of_trades", StringType()),
    StructField("open_price", StringType()),
    StructField("open_time", LongType()),
    StructField("symbol", StringType()),
    StructField("taker_buy_base_asset_volume", StringType()),
    StructField("taker_buy_quote_asset_volume", StringType()),
    StructField("volume", StringType()),
    StructField("x", StringType())
])

# Define Kafka parameters
kafka_bootstrap_servers = "localhost:9092"
input_topic = "WS_Binance_Data_ALL"
output_topic = "WS_Binance_Data_ALL_5min"

# Clear checkpoint directory
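# (deleting it on every start discards the streaming state and committed offsets,
#  so the job re-reads from startingOffsets each run)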
checkpoint_dir = "/tmp/candlestick_checkpoint"
if os.path.exists(checkpoint_dir):
    shutil.rmtree(checkpoint_dir)

# Read data from Kafka topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", input_topic) \
    .option("startingOffsets", "earliest") \
    .option("maxOffsets", "10 minutes ago") \
    .load()

# .option("startingOffsets", "earliest") \
#latest


# Deserialize JSON data and select necessary columns
df = df.selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", schema).alias("data")) \
    .select("data.*") \
    .withColumn("close_time", (col("close_time") / 1000).cast(TimestampType())) \
    .withColumn("open_time", (col("open_time") / 1000).cast(TimestampType())) \
    .withColumn("high_price", col("high_price").cast(DoubleType())) \
    .withColumn("low_price", col("low_price").cast(DoubleType())) \
    .withColumn("open_price", col("open_price").cast(DoubleType())) \
    .withColumn("close_price", col("close_price").cast(DoubleType())) \
    .withColumn("volume", col("volume").cast(DoubleType()))




# Group data into 5-minute candlesticks by symbol
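# As far as I understand, with a 5-minute watermark and the default append output
# mode a window is only emitted after the watermark moves past the window end, so
# the most recent 5-minute window stays open until newer events arrive.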
candlestick_df = df \
    .withWatermark("open_time", "5 minutes") \
    .groupBy("symbol", window("open_time",  "5 minutes", "5 minutes", startTime=0)) \
    .agg(
        expr("first(open_price) as open_price"),
        expr("max(high_price) as high_price"),
        expr("min(low_price) as low_price"),
        expr("last(close_price) as close_price"),
        expr("sum(volume) as volume")
    )
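# Note: first()/last() depend on row order within each group, which Spark does not
# guarantee for this aggregation, so open_price/close_price may not correspond to
# the strictly earliest/latest 1-minute kline.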

candlestick_df.printSchema()

# Adjust the window end to be one second before the actual end
# candlestick_df = candlestick_df \
#     .withColumn("window_end_adjusted", col("window.end") - expr("INTERVAL 1 SECOND"))

# Write aggregated data to Kafka
query = candlestick_df \
    .selectExpr("CAST(symbol AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", output_topic) \
    .option("checkpointLocation", checkpoint_dir) \
    .option("complete",checkpoint_dir) \
    .trigger(processingTime='1 minute') \
    .start()

# .trigger(processingTime='60 seconds') \
# .outputMode("complete") \
# .option("append",checkpoint_dir) \

# Define a function to print the watermark
def print_watermark(query):
    progress = query.lastProgress
    if progress:
        now = datetime.now()
        dt_string = now.strftime("%Y-%m-%d %H:%M:%S")
        print("current datetime :", dt_string)

        watermark = progress['eventTime']['watermark']
        print(f"Current Watermark: {watermark}")
        logging.info(f"Current Watermark: {watermark}")

# Poll the query progress to print the watermark
query.awaitTermination(10)  # Adjust the timeout as needed
print_watermark(query)

# Continuously monitor the watermark
while True:
    print_watermark(query)
    query.awaitTermination(60)  # Check watermark every 60 seconds
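
For debugging, I can also attach a second sink to the same aggregation and print it to the console in update mode, so partially filled windows show up on every trigger (a minimal sketch reusing candlestick_df from above, started before the monitoring loop; the console checkpoint path is just an example):

# Debug sink (sketch): update mode emits in-progress windows on every trigger,
# which helps check whether the newest 5-minute window has simply not been
# finalized yet rather than being lost.
debug_query = candlestick_df \
    .writeStream \
    .format("console") \
    .outputMode("update") \
    .option("truncate", "false") \
    .option("numRows", "50") \
    .option("checkpointLocation", "/tmp/candlestick_checkpoint_console") \
    .trigger(processingTime="1 minute") \
    .start()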

Upvotes: 0

Views: 37

Answers (0)
