kolosy

Reputation: 3099

Asynchronous process cleanup in Elixir

I'm building an Elixir app that consumes messages from RabbitMQ. Using the amqp library, I've set up a GenServer that subscribes to Rabbit and handles incoming messages. I want each incoming message to be processed asynchronously. That part is easy: I can grab the message, stick it in a Task and fire it off. What complicates things is that I also want to ack or nack the message once the task processing it finishes. To do that, I need to supervise the task (either through an actual Task.Supervisor or just Process.monitor), and then wait for an EXIT signal.

Here's where things get a bit confusing for me. I can't seem to find a way to asynchronously wait for a process exit. I either have to use Task.yield to catch the exit signal, or run a receive message loop (which is blocking). The way out seems to be to run an intermediate task/supervisor to wrap the cleanup logic, but it seems ugly.

So, as I understand it, I need:

:my_app -> parent Task.Supervisor -> cleanup task -> some form of supervision -> work Task

Is that right? If not, what is?

Upvotes: 2

Views: 483

Answers (3)

Aleksei Matiushkin

Reputation: 121000

Recommending external packages is discouraged on SO. I wanted to post this as a comment, but I'm posting it as an answer for the sake of formatting.

There is a package that enhances Task.Supervisor functionality, called Tarearbol. It lets you delegate the handling of the task's outcome to a third party by specifying callbacks and the default behavior:

Tarearbol.ensure fn ->
  result = PROCESS
  {:ok, result}
end, accept_not_ok: false,
     attempts: 3,
     on_success: fn -> ACK end,
     on_fail: fn -> NACK end

It handles crashes internally, retries up to attempts times, and then calls the matching callback.

Upvotes: 0

Aleksei Matiushkin

Reputation: 121000

If you don’t want to deal with handling exit info from the tasks, the easiest approach is to provide your own helper that wraps the task, executes it, and then acks. It might look like this:

In your Rabbit GenServer:

# Assumes `alias AMQP.Basic` at the top of the module.
def ack_me(channel, tag), do: Basic.ack(channel, tag)

The wrapper:

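defmodule TaskAckHelper do
  # Runs a zero-arity function in a task and acks the message once it returns.
  # The guard keeps this clause from also matching the {mod, fun, args} form below.
  @spec async_ack((() -> any()), {AMQP.Channel.t(), term()}) :: Task.t()
  def async_ack(fun, {channel, tag}) when is_function(fun, 0) do
    Task.async(fn ->
      fun.()
      Rabbit.ack_me(channel, tag)
    end)
  end

  # Same as above, but takes the work as a {module, function, args} tuple.
  @spec async_ack({atom(), atom(), [any()]}, {AMQP.Channel.t(), term()}) :: Task.t()
  def async_ack({mod, fun, args}, {channel, tag}) do
    Task.async(fn ->
      apply(mod, fun, args)
      Rabbit.ack_me(channel, tag)
    end)
  end
end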

Now, in your consume handler, you can simply call:

TaskAckHelper.async_ack(fn -> IO.puts(:ok) end, {channel, tag})

This example is a bit contrived. In real life you would probably want to implement and start a Task.Supervisor to orchestrate the tasks created from TaskAckHelper, but it works as expected even without one (though having a supervisor to trap exits is advised).
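As a rough illustration of the supervised variant mentioned above, here is a minimal sketch. The module name SupervisedAck and the ack/nack arguments are hypothetical: they are plain zero-arity functions standing in for whatever calls your Rabbit module exposes, so the snippet runs without the amqp library. It only covers work that returns, raises, throws, or exits on its own; a task killed from the outside never reaches either callback, which is where monitoring comes in.

```elixir
defmodule SupervisedAck do
  # Hypothetical helper, not part of any library.
  # `ack` and `nack` are assumed to be zero-arity functions supplied by the caller.
  def start(supervisor, fun, ack, nack) when is_function(fun, 0) do
    Task.Supervisor.start_child(supervisor, fn ->
      try do
        fun.()
      catch
        # Catches raised errors, throws, and exits raised inside `fun`.
        _kind, _reason -> nack.()
      else
        _result -> ack.()
      end
    end)
  end
end

# Usage: the callbacks just report back to the calling process.
{:ok, sup} = Task.Supervisor.start_link()
me = self()

{:ok, _pid} =
  SupervisedAck.start(sup, fn -> :done end, fn -> send(me, :acked) end, fn -> send(me, :nacked) end)
```

Because the task is started with start_child, the caller is neither linked to it nor sent a reply, so nothing needs to be awaited.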

Upvotes: 0

Mike Buhot

Reputation: 4885

Assuming your RabbitMQ consumer is a GenServer, you can start tasks with Task.Supervisor.async_nolink and handle their completion or failure in handle_info.

From the docs:

Compatibility with OTP behaviours

If you create a task using async_nolink inside an OTP behaviour like GenServer, you should match on the message coming from the task inside your GenServer.handle_info/2 callback.

The reply sent by the task will be in the format {ref, result}, where ref is the monitor reference held by the task struct and result is the return value of the task function.

Keep in mind that, regardless of how the task created with async_nolink terminates, the caller’s process will always receive a :DOWN message with the same ref value that is held by the task struct. If the task terminates normally, the reason in the :DOWN message will be :normal.
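A minimal sketch of how this could look in practice, under some assumptions: AckingConsumer, process/3, and the :ack / :nack options are hypothetical names, and the two callbacks are one-arity functions that stand in for the real ack and nack calls so the example runs without a broker. The GenServer remembers which delivery tag belongs to which task ref, acks when the {ref, result} reply arrives, and nacks when only a :DOWN message arrives.

```elixir
defmodule AckingConsumer do
  use GenServer

  # Hypothetical API: `ack` and `nack` are one-arity functions taking the delivery tag.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Stands in for the place where a message is received from RabbitMQ.
  def process(server, tag, fun), do: GenServer.cast(server, {:process, tag, fun})

  @impl true
  def init(opts) do
    {:ok, sup} = Task.Supervisor.start_link()
    state = %{sup: sup, ack: Keyword.fetch!(opts, :ack), nack: Keyword.fetch!(opts, :nack), pending: %{}}
    {:ok, state}
  end

  @impl true
  def handle_cast({:process, tag, fun}, state) do
    # Not linked to this process, but monitored: task.ref is the monitor reference.
    task = Task.Supervisor.async_nolink(state.sup, fun)
    {:noreply, %{state | pending: Map.put(state.pending, task.ref, tag)}}
  end

  @impl true
  def handle_info({ref, _result}, state) when is_reference(ref) do
    # The task replied, so it succeeded; drop the :DOWN message that would follow.
    Process.demonitor(ref, [:flush])
    {:noreply, settle(state, ref, state.ack)}
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    # A :DOWN for a still-pending ref means the task died before replying.
    {:noreply, settle(state, ref, state.nack)}
  end

  def handle_info(_other, state), do: {:noreply, state}

  defp settle(state, ref, callback) do
    case Map.fetch(state.pending, ref) do
      {:ok, tag} ->
        callback.(tag)
        %{state | pending: Map.delete(state.pending, ref)}

      :error ->
        state
    end
  end
end

# Usage: the callbacks just report back to the calling process.
me = self()
{:ok, consumer} = AckingConsumer.start_link(ack: &send(me, {:ack, &1}), nack: &send(me, {:nack, &1}))
AckingConsumer.process(consumer, 1, fn -> :ok end)
AckingConsumer.process(consumer, 2, fn -> raise "boom" end)
```

Nothing here blocks the GenServer: it keeps handling new messages while tasks run, and the cleanup happens whenever the corresponding message shows up in its mailbox.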

Upvotes: 4
