Reputation: 3099
I'm building an Elixir app that consumes messages from RabbitMQ. Using the amqp library, I've got a gen_server set up that subscribes to Rabbit and handles incoming messages. What I want is for each incoming message to be processed asynchronously. That's easy: I can grab the message, stick it in a Task and fire it off. What's making it a bit more complicated is that I also want to ack or nack the message after the task that's processing it finishes. To do that, I need to supervise the task (either through an actual Task.Supervisor or just Process.monitor), and then wait for an EXIT signal.
Here's where things get a bit confusing for me. I can't seem to find a way to asynchronously wait for a process exit. I either have to use Task.yield to catch the exit signal, or run a receive message loop (which is blocking). The way out seems to be to run an intermediate task/supervisor to wrap the cleanup logic, but it seems ugly.
So, as I understand it, I need:
:my_app -> parent Task.Supervisor -> cleanup task -> some form of supervision -> work Task
Is that right? If not, what is?
Upvotes: 2
Views: 483
Reputation: 121000
Recommending external packages is discouraged on SO; I wanted to post this as a comment, but I'm posting it as an answer for the sake of formatting.
There is a package called Tarearbol that enhances Task.Supervisor functionality. It lets you delegate the handling of the task's outcome to a third party by specifying callbacks and the default behavior:
    Tarearbol.ensure fn ->
      result = PROCESS
      {:ok, result}
    end, accept_not_ok: false,
         attempts: 3,
         on_success: fn -> ACK end,
         on_fail: fn -> NACK end
It handles crashes internally, retries up to attempts times, and then calls the matching callback.
Upvotes: 0
Reputation: 121000
If you don’t want to mess with handling exit info from the tasks, the easiest approach is to provide your own helper that wraps the task: it executes the work and then acks. It might look like this:
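    # In your Rabbit GenServer:
    def ack_me(channel, tag), do: AMQP.Basic.ack(channel, tag)

    # Wrapper:
    defmodule TaskAckHelper do
      # Runs an anonymous function, then acks the message.
      # The guard keeps this clause from also matching {mod, fun, args} tuples.
      @spec async_ack((() -> any()), {binary(), binary()}) :: Task.t()
      def async_ack(fun, {channel, tag}) when is_function(fun, 0) do
        Task.async(fn ->
          fun.()
          Rabbit.ack_me(channel, tag)
        end)
      end

      # Same as above, but the work is given as a {module, function, args} tuple.
      @spec async_ack({atom(), atom(), [any()]}, {binary(), binary()}) :: Task.t()
      def async_ack({mod, fun, args}, {channel, tag}) do
        Task.async(fn ->
          apply(mod, fun, args)
          Rabbit.ack_me(channel, tag)
        end)
      end
    end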
Now in consume you can simply call:

    TaskAckHelper.async_ack(fn -> IO.puts(:ok) end, {channel, tag})
This example is a bit contrived. In real life you would probably want to implement and spawn a Task.Supervisor to orchestrate the tasks created from TaskAckHelper, but it would work as expected even without one (though having a supervisor to trap exits is advised).
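A supervised variant of the wrapper idea could look roughly like the sketch below. It is only an illustration under stated assumptions: the module name SupervisedAck is hypothetical, and ack and nack are zero-arity functions supplied by the caller that stand in for the real AMQP.Basic.ack/2 and AMQP.Basic.nack/2 calls. Unlike the helper above, it also nacks when the work raises, throws, or exits.

```elixir
defmodule SupervisedAck do
  # Hypothetical helper, not part of any library.
  # `work`, `ack` and `nack` are zero-arity functions supplied by the caller;
  # `ack`/`nack` stand in for AMQP.Basic.ack/2 and AMQP.Basic.nack/2.
  def start(supervisor, work, ack, nack) do
    Task.Supervisor.start_child(supervisor, fn ->
      outcome =
        try do
          work.()
          :ok
        rescue
          # Exceptions raised by the work
          _error -> :error
        catch
          # Throws and exits originating inside the work
          _kind, _reason -> :error
        end

      # Ack or nack outside the try block so that a failing ack
      # is not mistaken for a failure of the work itself.
      if outcome == :ok, do: ack.(), else: nack.()
    end)
  end
end
```

Because the ack or nack happens inside the task, the consumer never has to wait on it. The trade-off is that a task killed from outside (for example with Process.exit(pid, :kill)) never reaches the nack, which is the case a monitor in the consumer process would cover.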
Upvotes: 0
Reputation: 4885
Assuming your RabbitMQ consumer is a GenServer, you can start tasks with Task.Supervisor.async_nolink and handle their completion or failure in handle_info.
From the docs:
Compatibility with OTP behaviours
If you create a task using async_nolink inside an OTP behaviour like GenServer, you should match on the message coming from the task inside your GenServer.handle_info/2 callback.
The reply sent by the task will be in the format {ref, result}, where ref is the monitor reference held by the task struct and result is the return value of the task function.
Keep in mind that, regardless of how the task created with async_nolink terminates, the caller’s process will always receive a :DOWN message with the same ref value that is held by the task struct. If the task terminates normally, the reason in the :DOWN message will be :normal.
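Put together, the approach described in the docs might be sketched as follows. This is a minimal illustration, not a drop-in consumer: the module name Consumer, the {:deliver, payload, tag} message (a stand-in for the delivery message the amqp library sends to a subscribed process), and the work, ack and nack callbacks passed as options are all assumptions made so the example runs without a broker.

```elixir
defmodule Consumer do
  use GenServer

  # `work` is a hypothetical one-arity function that processes a payload.
  # `ack` and `nack` are hypothetical one-arity callbacks taking a delivery tag;
  # a real consumer would call AMQP.Basic.ack/2 and AMQP.Basic.nack/2 instead.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    # For brevity the Task.Supervisor is started here; in a real app it
    # would normally live in the application's supervision tree.
    {:ok, sup} = Task.Supervisor.start_link()
    {:ok, %{sup: sup, work: opts[:work], ack: opts[:ack], nack: opts[:nack], tasks: %{}}}
  end

  # Stand-in for an incoming delivery: start the work without blocking
  # and remember which delivery tag belongs to which task reference.
  @impl true
  def handle_info({:deliver, payload, tag}, state) do
    work = state.work
    task = Task.Supervisor.async_nolink(state.sup, fn -> work.(payload) end)
    {:noreply, put_in(state.tasks[task.ref], tag)}
  end

  # The task finished: its reply arrives as {ref, result}.
  def handle_info({ref, _result}, state) when is_reference(ref) do
    # Drop the monitor and flush the :DOWN message that would follow.
    Process.demonitor(ref, [:flush])
    {tag, tasks} = Map.pop(state.tasks, ref)
    if not is_nil(tag), do: state.ack.(tag)
    {:noreply, %{state | tasks: tasks}}
  end

  # The task died before replying: only the :DOWN message arrives.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    {tag, tasks} = Map.pop(state.tasks, ref)
    if not is_nil(tag), do: state.nack.(tag)
    {:noreply, %{state | tasks: tasks}}
  end
end
```

The GenServer never blocks in receive or Task.yield: it keeps handling new deliveries while tasks run, and acks or nacks whenever the corresponding message shows up in its mailbox.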
Upvotes: 4