When I bring up my docker-compose for the first time and then start my Faust app, the producer sends messages fine but the consumer doesn't receive any of them. If I restart the app, everything works. My app looks like:
import random
import faust
from datetime import timedelta, datetime
from time import time


class StockTransaction(faust.Record):
    date: datetime
    price: float
    stock: str


class Candlestick(faust.Record):
    start_aggregation_period_timestamp: datetime
    end_aggregation_period_timestamp: datetime
    start_price: float
    high_price: float
    low_price: float
    end_price: float
    aggregation_count: int

    def aggregate_transaction(self, stock_transaction: StockTransaction):
        unit_price = stock_transaction.price
        if self.aggregation_count == 0:
            self.start_aggregation_period_timestamp = stock_transaction.date
            self.end_aggregation_period_timestamp = stock_transaction.date
            self.start_price = unit_price
            self.low_price = unit_price
            self.end_price = unit_price
        if self.start_aggregation_period_timestamp > stock_transaction.date:
            self.start_aggregation_period_timestamp = stock_transaction.date
            self.start_price = unit_price
        if self.end_aggregation_period_timestamp < stock_transaction.date:
            self.end_aggregation_period_timestamp = stock_transaction.date
            self.end_price = unit_price
        self.high_price = max(self.high_price or unit_price, unit_price)
        self.low_price = min(self.low_price or unit_price, unit_price)
        self.aggregation_count += 1


TOPIC = 'raw-event'
SINK = 'agg-event'
TABLE = 'tumbling_table'
KAFKA = 'kafka://localhost:9092'
CLEANUP_INTERVAL = 1.0
WINDOW = 10
WINDOW_EXPIRES = 20
PARTITIONS = 1

app = faust.App(TABLE, broker=KAFKA, topic_partitions=1, version=1)
app.conf.table_cleanup_interval = CLEANUP_INTERVAL

source = app.topic(TOPIC, value_type=StockTransaction, key_type=str)
sink = app.topic(SINK, value_type=Candlestick)


def window_processor(stock, candlestick):
    print(candlestick)
    sink.send_soon(value=candlestick)


candlesticks = app.Table(
    TABLE,
    default=lambda: Candlestick(
        start_aggregation_period_timestamp=None,
        end_aggregation_period_timestamp=None,
        start_price=0.0,
        high_price=0.0,
        low_price=0.0,
        end_price=0.0,
        aggregation_count=0
    ),
    partitions=1,
    on_window_close=window_processor
).tumbling(
    timedelta(seconds=WINDOW),
    expires=timedelta(seconds=WINDOW_EXPIRES)
).relative_to_field(StockTransaction.date)


@app.timer(0.1)
async def produce():
    price = random.uniform(100, 200)
    await source.send(
        key="AAPL",
        value=StockTransaction(stock="AAPL", price=price, date=int(time()))
    )


@app.agent(source)
async def consume(transactions):
    transaction: StockTransaction
    async for transaction in transactions:
        candlestick_window = candlesticks[transaction.stock]
        current_window = candlestick_window.current()
        current_window.aggregate_transaction(transaction)
        candlesticks[transaction.stock] = current_window


if __name__ == '__main__':
    app.main()
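One thing I'm not sure about is whether the agent subscribes before the topics actually exist on a fresh broker. As an experiment I was considering declaring the topics explicitly when the worker starts. This is only a sketch, and it assumes Faust's Topic.declare() and the @app.task startup hook are the right tools for this; I haven't verified it removes the first-run race:

@app.task
async def declare_topics():
    # Sketch only: ask the broker to create the topics up front, before the
    # timer starts producing and the agent starts consuming. Assumes
    # faust.Topic.declare() creates the topic on the server.
    await source.declare()
    await sink.declare()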
And the docker-compose looks like:
version: '3.6'
services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    restart: unless-stopped

  kafka:
    image: wurstmeister/kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_HOST_NAME: kafka:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://kafka:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,OUTSIDE://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CREATE_TOPICS: 'order-books:3:2'
    restart: unless-stopped

  kafdrop:
    image: obsidiandynamics/kafdrop:3.27.0
    depends_on:
      - kafka
      - zookeeper
    ports:
      - 19000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:29092
    restart: unless-stopped
On the first initialization I get:
C:\Users\mclea\anaconda3\envs\faust-candlesticks\python.exe C:\Users\mclea\src\faust-candlesticks\app.py worker -l debug
+ƒaµS† v0.11.2+--------------------------------------------------------------+
| id | tumbling_table |
| transport | [URL('kafka://localhost:9092')] |
| store | memory: |
| web | http://localhost:6066/ |
| log | -stderr- (debug) |
| pid | 11664 |
| hostname | tommclean |
| platform | CPython 3.11.9 (Windows AMD64) |
| + | Cython (MSC v.1916 64 bit (AMD64)) |
| drivers | |
| transport | aiokafka=0.11.0 |
| web | aiohttp=3.10.5 |
| datadir | C:\Users\mclea\src\faust-candlesticks\tumbling_table-data |
| appdir | C:\Users\mclea\src\faust-candlesticks\tumbling_table-data\v1 |
+-------------+--------------------------------------------------------------+
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Topic raw-event is not available during auto-create initialization
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Topic raw-event is not available during auto-create initialization
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Topic raw-event is not available during auto-create initialization
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Topic raw-event is not available during auto-create initialization
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Topic raw-event is not available during auto-create initialization
OK ^
And it sends messages fine to the raw-event topic, but the consumer doesn't read any of them. The second time I run the app, I get:
C:\Users\mclea\anaconda3\envs\faust-candlesticks\python.exe C:\Users\mclea\src\faust-candlesticks\app.py worker -l debug
+ƒaµS† v0.11.2+--------------------------------------------------------------+
| id | tumbling_table |
| transport | [URL('kafka://localhost:9092')] |
| store | memory: |
| web | http://localhost:6066/ |
| log | -stderr- (debug) |
| pid | 9728 |
| hostname | tommclean |
| platform | CPython 3.11.9 (Windows AMD64) |
| + | Cython (MSC v.1916 64 bit (AMD64)) |
| drivers | |
| transport | aiokafka=0.11.0 |
| web | aiohttp=3.10.5 |
| datadir | C:\Users\mclea\src\faust-candlesticks\tumbling_table-data |
| appdir | C:\Users\mclea\src\faust-candlesticks\tumbling_table-data\v1 |
+-------------+--------------------------------------------------------------+
OK ^
<Candlestick: start_aggregation_period_timestamp=1724361217, end_aggregation_period_timestamp=1724361219, start_price=189.76106609658018, high_price=189.76106609658018, low_price=105.2808779884955, end_price=105.2808779884955, aggregation_count=24>
<Candlestick: start_aggregation_period_timestamp=1724361220, end_aggregation_period_timestamp=1724361229, start_price=187.37812934079548, high_price=199.99724165672342, low_price=100.35921078816915, end_price=193.91107473467878, aggregation_count=91>
...
Topic agg-event is not available during auto-create initialization
Topic agg-event is not available during auto-create initialization
So the second run works fine. How can I fix this so that the app works correctly on the first launch?
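Given the GroupCoordinatorNotAvailableError lines, my guess is that the worker starts before the broker (and its group coordinator) is fully ready. The only workaround I've been considering so far is a stdlib-only wait loop before launching the worker; wait_for_broker is just my own helper name, not anything from Faust, and I'd prefer a proper fix:

import socket
import time

def wait_for_broker(host: str = "localhost", port: int = 9092, timeout: float = 60.0) -> None:
    """Block until a TCP connection to the Kafka broker succeeds, or time out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=2):
                return  # broker is accepting connections
        except OSError:
            time.sleep(1)  # broker not up yet; retry
    raise TimeoutError(f"Kafka broker {host}:{port} not reachable after {timeout}s")

# e.g. call wait_for_broker() before app.main() in __main__

But since the first-run log also shows "Topic raw-event is not available during auto-create initialization", I suspect just waiting for the port isn't enough; it only rules out the simplest race.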