Reputation: 21956
There are many discussions on SO about database abstraction. I'm a bit surprised there aren't similar discussions about message queue abstraction. Rather than designing to a specific MQ implementation (RabbitMQ, IBM MQ Series, IronMQ etc.), we would prefer to use a neutral / abstracting layer, so the underlying implementation could be changed without major surgery later.
Can you recommend a solution?
Upvotes: 3
Views: 1394
Reputation: 101
The most neutral way to handle a message queue is to just use a database to store your messages. If your app get's more complex you can switch to something more sophisticated.
Right now I'm using PeeWee ORM to handle the messages. During testing the database is set to use an SQLite instance. In production the postgres server is set. You could just use the sqlite if the app doesn't use a lot of write concurrency.
from enum import Enum
from peewee import ( # type: ignore
Model,
CharField,
DateTimeField,
ForeignKeyField,
Proxy,
UUIDField,
ModelSelect,
)
from peewee_extra_fields import EnumField # type: ignore
PRODUCTION_DB_URL = "postgres://test_db_z9du_user:b5TWChOlCpKQRHcAg8u64kUpVIjNba5O@dpg-cc8ltahgp3jhesr2u9u0-a.oregon-postgres.render.com/test_db_z9du" # pylint: disable=line-too-long
TEST_DB_PATH = str(HERE.parent / "data/test.db")
SQLITE_URL = f"sqlite:////{TEST_DB_PATH}".replace("\\", "/")
DB_URL = SQLITE_URL if IS_TESTING else PRODUCTION_DB_URL
if IS_TESTING:
db_dir = os.path.dirname(TEST_DB_PATH)
if not os.path.exists(db_dir):
print(f"Creating test db directory {db_dir}")
os.makedirs(db_dir, exist_ok=True)
os.makedirs(db_dir, exist_ok=True)
database_proxy = Proxy()
database_proxy.initialize(connect(DB_URL))
class BaseModel(Model):
"""A base model that will use our Sqlite database."""
class Meta: # pylint: disable=too-few-public-methods
"""Meta class."""
database = database_proxy
database_proxy = Proxy()
database_proxy.initialize(connect(DB_URL))
class MessageStatusEnum(Enum):
"""Enum class for message status"""
QUEUED = 1
PROCESSING = 2
ERROR = 3
class Message(BaseModel):
"""Message model."""
user = ForeignKeyField(User, backref="messages", primary_key=True)
type = CharField(index=True)
message = CharField(null=True)
last_error_msg = CharField(null=True)
created = DateTimeField(index=True, default=datetime.now)
status = EnumField(
index=True, enum=MessageStatusEnum, default=MessageStatusEnum.QUEUED
) # MessageStatusEnum.QUEUED or MessageStatusEnum.PROCESSING
Example of how to use message queue:
def pop_next_email_drm_task_for_processing(self) -> Message | None:
"""Pop the next email drm task."""
try:
with database_proxy.atomic():
# get the first one
cursor = (
Message.select()
.where(
Message.status == MessageStatusEnum.QUEUED and Message.type == "email_drm"
)
.order_by(Message.created)
.limit(1)
)
message = cursor.first()
if message is None:
return None
message.status = MessageStatusEnum.PROCESSING
message.save()
return message
Upvotes: 0
Reputation: 21956
The most promising option so far appears to be Kombu which is described as a high-level abstraction of the AMQ protocol (e.g. RabbitMQ) but supports several messaging back-ends that don't require AMQP, including REDIS, Beanstalk, Amazon SQS, CouchDB, MongoDB and Zookeeper -- in varying degrees.
I will give this a try and report back. Kind of don't like answering and accepting my own question :) -- will look at other answers to and change the accepted one if yours is better.
Upvotes: 3