Reputation: 117
Please take the time to read the full question to understand the exact issue. Thank you.
I have a runner/driver program that listens to a Kafka topic and dispatches tasks using a ThreadPoolExecutor
whenever a new message is received on the topic (as shown below):
consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
                         bootstrap_servers=[f"{KAFKA_SERVER_HOST}:{KAFKA_SERVER_PORT}"],
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                         enable_auto_commit=False,
                         auto_offset_reset='latest',
                         max_poll_records=1,
                         max_poll_interval_ms=300000)

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = []
    for message in consumer:
        futures.append(executor.submit(SOME_FUNCTION, ARG1, ARG2))
There is a bunch of code in between but that code is not important here so I have skipped it.
Now, SOME_FUNCTION comes from another Python script that is imported (in fact, there is a hierarchy of imports in later stages). What matters is that at some point in these scripts I call a multiprocessing Pool, because I need to process data in parallel (SIMD - single instruction, multiple data), and I use the apply_async function to do so.
for loop_message_chunk in loop_message_chunks:
    res_list.append(self.pool.apply_async(self.one_matching.match, args=(hash_set, loop_message_chunk, fields)))
Now, I have 2 versions of the runner/driver program:
1. Kafka based (the one shown above):
   Listen to Kafka -> Start a thread -> Start multiprocessing
2. REST based (using Flask to achieve the same task with a REST call):
   Listen to REST endpoint -> Start multiprocessing
Why 2 runner/driver scripts, you ask? This microservice will be used by multiple teams: some want a synchronous REST-based interface, while others want a real-time, asynchronous system based on Kafka.
When I log from the parallelized function (self.one_matching.match in the example above), it works when called through the REST version but not through the Kafka version (basically, when multiprocessing is kicked off by a thread, it does not work).
Also notice that only the logging from the parallelized function fails. The rest of the scripts in the hierarchy, from the runner to the script that calls apply_async (including scripts called from within the thread), log successfully.
Other details:
I use logging.getLogger in every other script called after the runner script to get specific loggers that log to different files.
Logger config (values replaced with generic ones since I cannot share exact names):
version: 1
formatters:
  simple:
    format: '%(asctime)s | %(name)s | %(filename)s : %(funcName)s : %(lineno)d | %(levelname)s :: %(message)s'
  custom1:
    format: '%(asctime)s | %(filename)s :: %(message)s'
  time-message:
    format: '%(asctime)s | %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    level: DEBUG
    formatter: simple
    stream: ext://sys.stdout
  handler1:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 5
    formatter: simple
    level: DEBUG
    filename: logs/logfile1.log
  handler2:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: custom1
    level: INFO
    filename: logs/logfile2.log
  handler3:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: time-message
    level: DEBUG
    filename: logs/logfile3.log
  handler4:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: time-message
    level: DEBUG
    filename: logs/logfile4.log
  handler5:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 5
    formatter: simple
    level: DEBUG
    filename: logs/logfile5.log
loggers:
  logger1:
    level: DEBUG
    handlers: [console, handler1]
    propagate: no
  logger2:
    level: DEBUG
    handlers: [console, handler5]
    propagate: no
  logger3:
    level: INFO
    handlers: [handler2]
    propagate: no
  logger4:
    level: DEBUG
    handlers: [console, handler3]
    propagate: no
  logger5:
    level: DEBUG
    handlers: [console, handler4]
    propagate: no
  kafka:
    level: WARNING
    handlers: [console]
    propogate: no
root:
  level: INFO
  handlers: [console]
  propogate: no
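For context, a config of this shape is usually applied once in the runner through logging.config.dictConfig. The following is only a minimal sketch under assumptions: the dict is a trimmed-down stand-in for the YAML above (in practice it would come from parsing the YAML file), and the configure_logging helper is a hypothetical name, not something from the original scripts.

```python
import logging
import logging.config

# Trimmed-down, hypothetical stand-in for the YAML config: one console
# handler and one named logger. The real dict would be parsed from the file.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "simple": {"format": "%(asctime)s | %(name)s | %(levelname)s :: %(message)s"},
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "DEBUG",
            "formatter": "simple",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        # dictConfig looks up the key spelled "propagate" for each logger.
        "logger1": {"level": "DEBUG", "handlers": ["console"], "propagate": False},
    },
    "root": {"level": "INFO", "handlers": ["console"]},
}


def configure_logging() -> logging.Logger:
    """Apply the dict config once (in the runner) and return a named logger."""
    logging.config.dictConfig(LOGGING_CONFIG)
    return logging.getLogger("logger1")
```

Every other module can then call logging.getLogger("logger1") and receive the same configured logger, as long as it runs in a process where this configuration has been applied.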
Upvotes: 3
Views: 3102
Reputation: 771
First of all, I'm not using exactly the same stack. I'm using FastAPI and Redis pub/sub, and it would be tedious for me to replicate this for Flask and Kafka now. I think in principle it should work the same way. At the least, it might point you to some misconfiguration in your code. Also, I'm hardcoding the logger config.
I'm sorry to paste a lot of code, but I want to provide a complete working example. Maybe I'm missing something in your description, since you haven't provided a minimal working example.
I have four files:
app.py (FastAPI application)
config.py (sets up config variables and the logger)
redis_ps.py (Redis consumer/listener)
utils.py (processing function some_function and the Redis publish function)
and a Redis container:
docker pull redis
docker run --restart unless-stopped --publish 6379:6379 --name redis -d redis
python3 app.py (will run server and pubsub listener)
python3 utils.py (will publish message over pubsub)
curl -X 'POST' \
'http://0.0.0.0:5000/sync' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '[[2,4],[6, 8]]'
[2021-12-08 17:54:32,688] DEBUG in utils: Run some_function, caller: pubsub
[2021-12-08 17:54:32,688] DEBUG in utils: Run some_function, caller: pubsub
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 1, result 1
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 3, result 9
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 5, result 25
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 7, result 49
[2021-12-08 17:54:39,519] DEBUG in utils: Run some_function, caller: rest api
[2021-12-08 17:54:39,520] DEBUG in utils: Run some_function, caller: rest api
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 8, result 64
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 6, result 36
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 2, result 4
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 4, result 16
app.py
from concurrent import futures
from typing import List

import uvicorn
from fastapi import FastAPI, APIRouter

from redis_ps import PubSubWorkerThreadListen
from utils import some_function

router = APIRouter()


@router.post("/sync")
def sync_process(data: List[List[int]]):
    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        future_all = [executor.submit(some_function, loop_message_chunks=d, caller="rest api") for d in data]
        return [future.result() for future in future_all]


def create_app():
    app = FastAPI(title="app", openapi_url="/openapi.json", docs_url="/")
    app.include_router(router)
    thread = PubSubWorkerThreadListen()
    thread.start()
    return app


if __name__ == "__main__":
    _app = create_app()
    uvicorn.run(_app, host="0.0.0.0", port=5000, debug=True, log_level="debug")
config.py
import sys
import logging

COMPONENT_NAME = "test_logger"
REDIS_URL = "redis://localhost:6379"


def setup_logger(logger_name: str, log_level=logging.DEBUG, fmt: logging.Formatter = None):
    fmt = fmt or logging.Formatter("[%(asctime)s] %(levelname)s in %(module)s: %(message)s")
    handler = logging.StreamHandler(sys.stdout)
    handler.name = "h_console"
    handler.setFormatter(fmt)
    handler.setLevel(log_level)
    logger_ = logging.getLogger(logger_name)
    logger_.addHandler(handler)
    logger_.setLevel(log_level)
    return logger_


setup_logger(COMPONENT_NAME)
redis_ps.py
import json
import logging
import threading
import time
from concurrent import futures
from typing import Dict, List, Union

import redis

from config import COMPONENT_NAME, REDIS_URL
from utils import some_function

logger = logging.getLogger(COMPONENT_NAME)


class PubSubWorkerThreadListen(threading.Thread):
    def __init__(self):
        super().__init__()
        self._running = threading.Event()

    @staticmethod
    def connect_pubsub() -> redis.client.PubSub:
        while True:
            try:
                r = redis.Redis.from_url(REDIS_URL)
                p = r.pubsub()
                p.psubscribe(["*:*:*"])
                logger.info("Connected to Redis")
                return p
            except Exception:
                time.sleep(0.1)

    def run(self):
        if self._running.is_set():
            return
        self._running.set()
        while self._running.is_set():
            p = self.connect_pubsub()
            try:
                listen(p)
            except Exception as e:
                logger.error(f"Failed to process Redis message or failed to connect: {e}")
                time.sleep(0.1)

    def stop(self):
        self._running.clear()


def get_data(msg) -> Union[Dict, List]:
    data = msg.get("data")
    if isinstance(data, int):
        # the first message has {'data': 1}
        return []
    try:
        return json.loads(data)
    except Exception as e:
        logger.warning("Failed to parse data in the message (%s) with error %s", msg, e)
        return []


def listen(p_):
    logger.debug("Start listening")
    while True:
        for msg_ in p_.listen():
            data = get_data(msg_)
            if data:
                with futures.ThreadPoolExecutor(max_workers=2) as executor:
                    future_all = [executor.submit(some_function, loop_message_chunks=d, caller="pubsub") for d in data]
                    [future.result() for future in future_all]
utils.py
import json
import logging
from multiprocessing import Pool
from typing import List

import redis

from config import COMPONENT_NAME, REDIS_URL

logger = logging.getLogger(COMPONENT_NAME)


def one_matching(v, caller: str = ""):
    logger.debug(f"caller: {caller}, Processing {v}, result {v*v}")
    return v * v


def some_function(loop_message_chunks: List[int], caller: str):
    logger.debug(f"Run some_function, caller: {caller}")
    with Pool(2) as pool:
        v = [pool.apply_async(one_matching, args=(i, caller)) for i in loop_message_chunks]
        res_list = [res.get(timeout=1) for res in v]
    return res_list


def publish():
    data = [[1, 3], [5, 7]]
    r_ = redis.Redis.from_url(REDIS_URL)
    logger.debug("Published message %s %s", "test", data)
    r_.publish("test:test:test", json.dumps(data).encode())


if __name__ == "__main__":
    publish()
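The example above relies on the pool workers ending up with the same configured logger as the parent, which is typically what happens with the fork start method. If that cannot be relied on, one explicit alternative from the standard library is to send worker records through a queue and let the parent write them. The following is a rough sketch of that pattern, not part of the example above: start_listener, init_worker_logging, square and run_pool_demo are hypothetical names, and the "spawn" context is an assumption made for illustration.

```python
import logging
import logging.handlers
import multiprocessing


def start_listener(log_queue, *handlers):
    """Parent side: drain log_queue in a background thread into the real handlers."""
    listener = logging.handlers.QueueListener(
        log_queue, *handlers, respect_handler_level=True
    )
    listener.start()
    return listener


def init_worker_logging(log_queue, level=logging.DEBUG):
    """Worker side (pool initializer): send every record to the shared queue."""
    root = logging.getLogger()
    root.handlers.clear()
    root.addHandler(logging.handlers.QueueHandler(log_queue))
    root.setLevel(level)


def square(v):
    """Stand-in for the parallelized function; logs through the root QueueHandler."""
    logging.getLogger("worker").debug("Processing %s, result %s", v, v * v)
    return v * v


def run_pool_demo(values):
    """Hypothetical driver; call it under `if __name__ == "__main__":`."""
    ctx = multiprocessing.get_context("spawn")  # assumption: spawn start method
    log_queue = ctx.Queue()
    listener = start_listener(log_queue, logging.StreamHandler())
    pool = ctx.Pool(2, initializer=init_worker_logging, initargs=(log_queue,))
    try:
        return pool.map(square, values)
    finally:
        # close + join lets workers exit normally so queued records are flushed
        pool.close()
        pool.join()
        listener.stop()
```

With this layout the workers never touch files or streams themselves; only the parent's handlers do, so the outcome does not depend on what logging state a worker happens to inherit.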
Upvotes: 0
Reputation: 11075
Possible answer: get rid of the threads and use asyncio instead.
Example pseudocode structure (cobbled together from these examples):
# Pseudocode example structure: probably has bugs...
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from functools import partial

from aiokafka import AIOKafkaConsumer


async def SOME_FUNCTION_CO(executor, **kwargs):
    # loop_message_chunks, hash_set, fields and self.one_matching.match are
    # placeholders taken from the question; they are not defined here.
    res_list = []
    for loop_message_chunk in loop_message_chunks:
        res_list.append(executor.submit(self.one_matching.match, hash_set, loop_message_chunk, fields))
    # Call concurrent.futures.wait on res_list later, and cancel unneeded
    # futures (regarding one of your prior questions).
    return res_list


async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()

    # Global executor:
    # I would also suggest using a "spawn" context unless you really need the
    # performance of "fork".
    ctx = multiprocessing.get_context("spawn")
    # Similar to futures in your example (Task subclasses asyncio.Future,
    # which is similar to concurrent.futures.Future as well).
    tasks = []
    with ProcessPoolExecutor(mp_context=ctx) as executor:
        try:
            # Consume messages
            async for msg in consumer:
                # kwargs is a placeholder for the per-message arguments
                tasks.append(asyncio.create_task(SOME_FUNCTION_CO(executor, **kwargs)))
        finally:
            # Will leave consumer group; perform autocommit if enabled.
            await consumer.stop()


if __name__ == "__main__":
    asyncio.run(consume())
I keep going back and forth on how I should represent SOME_FUNCTION in this example, but the key point is that in the loop over msg in consumer you are scheduling tasks to be completed eventually. If any of these tasks takes a long time, it could block the main loop (which is also running the async for msg in consumer line). Instead, any task that could take a long time should quickly return a future of some type, so you can simply access the result once it's ready.
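To make the "return a future quickly" idea concrete, here is a small self-contained sketch. It is an illustration under assumptions, not the code from the question: slow_square stands in for the blocking matching function, a plain list stands in for the Kafka consumer, and a ThreadPoolExecutor is used so the sketch runs anywhere. A ProcessPoolExecutor created with an mp_context could be passed in instead for CPU-bound work.

```python
import asyncio
from concurrent.futures import Executor, ThreadPoolExecutor


def slow_square(v: int) -> int:
    """Hypothetical stand-in for a blocking, CPU-heavy matching function."""
    return v * v


async def consume(messages, executor: Executor):
    """Schedule one executor job per message without blocking the event loop."""
    loop = asyncio.get_running_loop()
    pending = []
    for msg in messages:  # stands in for `async for msg in consumer`
        # run_in_executor returns an awaitable future right away; the work
        # itself runs in the executor, not on the event loop.
        pending.append(loop.run_in_executor(executor, slow_square, msg))
    # Results are collected only once every job has finished.
    return await asyncio.gather(*pending)


def main():
    with ThreadPoolExecutor(max_workers=2) as executor:
        return asyncio.run(consume([1, 3, 5, 7], executor))
```

Calling main() returns the squared values in the order the messages were scheduled, because asyncio.gather preserves the order of its arguments.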
Upvotes: 1