Tom McLean
Tom McLean

Reputation: 6349

Faust consumer/agent doesn't run on the first initialization

When I run my docker-compose for the first time and run my faust app for the first time after that, the producer sends messages ok but the consumer doesn't get the messages. If I restart the app, it works fine. My app looks like:

import random

import faust
from datetime import timedelta, datetime
from time import time


class StockTransaction(faust.Record):
    """A single trade event; used as the value type of the source topic."""

    # Declared as datetime, but produce() in this file passes int(time()).
    # NOTE(review): confirm which representation is intended.
    date: datetime
    # Price of the trade.
    price: float
    # Stock symbol; the agent uses it as the table key.
    stock: str


class Candlestick(faust.Record):
    """Running open/high/low/close summary of the transactions seen so far.

    ``aggregation_count`` is the number of transactions folded in; a value of
    0 marks a candlestick whose other fields are still placeholders.
    """

    # Earliest / latest transaction date aggregated so far.
    # NOTE(review): annotated as datetime, but this file creates the table
    # default with None and the producer sends int(time()) -- confirm type.
    start_aggregation_period_timestamp: datetime
    end_aggregation_period_timestamp: datetime
    # Price of the earliest transaction.
    start_price: float
    # Highest and lowest prices seen.
    high_price: float
    low_price: float
    # Price of the latest transaction.
    end_price: float
    # Number of transactions aggregated.
    aggregation_count: int

    def aggregate_transaction(self, stock_transaction: StockTransaction):
        """Fold one transaction into this candlestick, in place.

        The first transaction (``aggregation_count == 0``) seeds every
        timestamp and price field. Later transactions widen the time range
        (updating the start/end price with it) and the high/low extremes.

        Args:
            stock_transaction: The transaction to aggregate; its ``date`` and
                ``price`` attributes are read.
        """
        unit_price = stock_transaction.price
        traded_at = stock_transaction.date

        if self.aggregation_count == 0:
            self.start_aggregation_period_timestamp = traded_at
            self.end_aggregation_period_timestamp = traded_at
            self.start_price = unit_price
            # Seed both extremes here so that the placeholder values are
            # never compared against a real price below.
            self.high_price = unit_price
            self.low_price = unit_price
            self.end_price = unit_price

        if self.start_aggregation_period_timestamp > traded_at:
            self.start_aggregation_period_timestamp = traded_at
            self.start_price = unit_price

        if self.end_aggregation_period_timestamp < traded_at:
            self.end_aggregation_period_timestamp = traded_at
            self.end_price = unit_price

        # Only None means "unset". A truthiness test would also discard a
        # genuine extreme of 0.0 and replace it with the current price.
        if self.high_price is None or unit_price > self.high_price:
            self.high_price = unit_price
        if self.low_price is None or unit_price < self.low_price:
            self.low_price = unit_price
        self.aggregation_count += 1


# Topic names, table/app name and broker address.
TOPIC = 'raw-event'
SINK = 'agg-event'
TABLE = 'tumbling_table'
KAFKA = 'kafka://localhost:9092'

# Value assigned to app.conf.table_cleanup_interval below.
CLEANUP_INTERVAL = 1.0
# Window size and expiry, both passed to timedelta(seconds=...) by the table.
WINDOW = 10
WINDOW_EXPIRES = 20
PARTITIONS = 1


# The app id doubles as the table name.
app = faust.App(
    id=TABLE,
    broker=KAFKA,
    topic_partitions=PARTITIONS,
    version=1,
)
app.conf.table_cleanup_interval = CLEANUP_INTERVAL


# Raw transactions come in on `source`; closed candlesticks go out on `sink`.
source = app.topic(TOPIC, key_type=str, value_type=StockTransaction)
sink = app.topic(SINK, value_type=Candlestick)


def window_processor(stock, candlestick):
    """Print a candlestick and queue it for sending to the sink topic.

    Registered as the table's ``on_window_close`` callback.

    Args:
        stock: Key supplied by the table for the window; unused here.
        candlestick: The aggregated value, printed and sent as the message
            value.
    """
    print(candlestick)
    sink.send_soon(value=candlestick)


def _empty_candlestick():
    """Build the placeholder Candlestick used as the table default."""
    return Candlestick(
        start_aggregation_period_timestamp=None,
        end_aggregation_period_timestamp=None,
        start_price=0.0,
        high_price=0.0,
        low_price=0.0,
        end_price=0.0,
        aggregation_count=0,
    )


# Tumbling-window table of candlesticks, windowed relative to the
# transaction's own `date` field; window_processor runs on window close.
candlesticks = (
    app.Table(
        TABLE,
        default=_empty_candlestick,
        partitions=PARTITIONS,
        on_window_close=window_processor,
    )
    .tumbling(
        timedelta(seconds=WINDOW),
        expires=timedelta(seconds=WINDOW_EXPIRES),
    )
    .relative_to_field(StockTransaction.date)
)


@app.timer(0.1)
async def produce():
    """Send one randomly priced "AAPL" transaction to the source topic."""
    symbol = "AAPL"
    transaction = StockTransaction(
        stock=symbol,
        price=random.uniform(100, 200),
        date=int(time()),
    )
    await source.send(key=symbol, value=transaction)


@app.agent(source)
async def consume(transactions):
    """Aggregate each incoming transaction into its stock's current window."""
    async for transaction in transactions:
        symbol = transaction.stock
        candle = candlesticks[symbol].current()
        candle.aggregate_transaction(transaction)
        # Write the updated candlestick back into the table.
        candlesticks[symbol] = candle


# Script entry point: hand control to the Faust app's command-line interface
# (the logs in this post invoke it as `app.py worker -l debug`).
if __name__ == '__main__':
    app.main()

And the docker-compose looks like:

# Local development stack: ZooKeeper, a single Kafka broker, and Kafdrop.
version: '3.6'

services:

  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    restart: unless-stopped

  kafka:
    image: wurstmeister/kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      # NOTE(review): the value carries a port although the variable is named
      # HOST_NAME, and advertised addresses are also set through
      # KAFKA_ADVERTISED_LISTENERS below -- confirm this entry is needed.
      KAFKA_ADVERTISED_HOST_NAME: kafka:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: PLAINTEXT on kafka:29092 (used by kafdrop below) and
      # OUTSIDE, advertised as localhost:9092 (the address the app connects to).
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://kafka:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,OUTSIDE://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # NOTE(review): this pre-creates 'order-books', not the 'raw-event' /
      # 'agg-event' topics the app uses. Presumably the format is
      # name:partitions:replicas, which would ask for 2 replicas while only
      # one broker is defined in this file -- verify against the image docs.
      KAFKA_CREATE_TOPICS: 'order-books:3:2'
    restart: unless-stopped

  kafdrop:
    image: obsidiandynamics/kafdrop:3.27.0
    depends_on:
      - kafka
      - zookeeper
    ports:
      - 19000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:29092
    restart: unless-stopped

On the first initialization I get:

C:\Users\mclea\anaconda3\envs\faust-candlesticks\python.exe C:\Users\mclea\src\faust-candlesticks\app.py worker -l debug 
+ƒaµS† v0.11.2+--------------------------------------------------------------+
| id          | tumbling_table                                               |
| transport   | [URL('kafka://localhost:9092')]                              |
| store       | memory:                                                      |
| web         | http://localhost:6066/                                       |
| log         | -stderr- (debug)                                             |
| pid         | 11664                                                        |
| hostname    | tommclean                                                    |
| platform    | CPython 3.11.9 (Windows AMD64)                               |
|        +    | Cython (MSC v.1916 64 bit (AMD64))                           |
| drivers     |                                                              |
|   transport | aiokafka=0.11.0                                              |
|   web       | aiohttp=3.10.5                                               |
| datadir     | C:\Users\mclea\src\faust-candlesticks\tumbling_table-data    |
| appdir      | C:\Users\mclea\src\faust-candlesticks\tumbling_table-data\v1 |
+-------------+--------------------------------------------------------------+
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Topic raw-event is not available during auto-create initialization
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Topic raw-event is not available during auto-create initialization
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Topic raw-event is not available during auto-create initialization
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Topic raw-event is not available during auto-create initialization
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Topic raw-event is not available during auto-create initialization
 OK ^

And it sends messages fine to the raw-event topic, but the consumer doesn't read any messages. The second time I run the app, I get:

C:\Users\mclea\anaconda3\envs\faust-candlesticks\python.exe C:\Users\mclea\src\faust-candlesticks\app.py worker -l debug 
+ƒaµS† v0.11.2+--------------------------------------------------------------+
| id          | tumbling_table                                               |
| transport   | [URL('kafka://localhost:9092')]                              |
| store       | memory:                                                      |
| web         | http://localhost:6066/                                       |
| log         | -stderr- (debug)                                             |
| pid         | 9728                                                         |
| hostname    | tommclean                                                    |
| platform    | CPython 3.11.9 (Windows AMD64)                               |
|        +    | Cython (MSC v.1916 64 bit (AMD64))                           |
| drivers     |                                                              |
|   transport | aiokafka=0.11.0                                              |
|   web       | aiohttp=3.10.5                                               |
| datadir     | C:\Users\mclea\src\faust-candlesticks\tumbling_table-data    |
| appdir      | C:\Users\mclea\src\faust-candlesticks\tumbling_table-data\v1 |
+-------------+--------------------------------------------------------------+
 OK ^
<Candlestick: start_aggregation_period_timestamp=1724361217, end_aggregation_period_timestamp=1724361219, start_price=189.76106609658018, high_price=189.76106609658018, low_price=105.2808779884955, end_price=105.2808779884955, aggregation_count=24>
<Candlestick: start_aggregation_period_timestamp=1724361220, end_aggregation_period_timestamp=1724361229, start_price=187.37812934079548, high_price=199.99724165672342, low_price=100.35921078816915, end_price=193.91107473467878, aggregation_count=91>
...
Topic agg-event is not available during auto-create initialization
Topic agg-event is not available during auto-create initialization

So the second run works fine. How can I fix this so that the app works successfully on the first launch?

Upvotes: 0

Views: 42

Answers (0)

Related Questions