lapinkoira
lapinkoira

Reputation: 8988

GenStage.from_enumerable hangs with an intermitent stream

I have a stream which doesn't produce data as fast as its consumed.

So I have a producer defined like this:

def start_link() do
  create_stream
  |> GenStage.from_enumerable(name: Producer)
end

Then my producer-consumer subscribes to it

  def init(:ok) do
    {:producer_consumer, :the_state_does_not_matter, subscribe_to: [Producer]}
  end

And my consumer subscribes to mu producer-consumer

  def init(:ok) do
    {:consumer, :the_state_does_not_matter, subscribe_to: [ProducerConsumer]}
  end

The issue I am having is the consumer hangs, I think because at some point the producer didn't manage to get new data and as stated in the docs:

When the enumerable finishes or halts, the stage will exit with :normal reason. This means that, if a consumer subscribes to the enumerable stage and the: cancel option is set to: permanent, which is the default, the consumer will also exit with: the normal reason

So I read more and it suggests to add the option cancel:: transient to don't finish the stage. I added it like this but it's not working, am I missing something?

|> GenStage.from_enumerable(name: Producer, cancel: :transient)

Originally I was using a Flow.into_stages(flow, [ProducerConsumer]) but I cant do that because I cant reference (or I don't know how) the ProducerConsumer from my supervisor tree

children = [
  {Producer, []},
  {ProducerConsumer, []},
  {Consumer, []}
]

Update

Updating passing reference to Flow.into_stages from the child definition

children = [
  {Producer, [name: ProducerConsumer]},
  {ProducerConsumer, []},
  {Consumer, []}
]

def start_link(producer_consumer) do
  create_stream
  |> Flow.into_stages(producer_consumer)
end

** (Mix) Could not start application test: Application.start(:normal, []) returned an error: shutdown: failed to start child: Producer ** (EXIT) exited in: GenServer.call({:name, ProducerConsumer}, {:"$subscribe", nil, #PID<0.2031.0>, [cancel: :transient]}, 5000) ** (EXIT) no connection to Elixir.ProducerConsumer

Upvotes: 0

Views: 150

Answers (1)

lapinkoira
lapinkoira

Reputation: 8988

The error:

** (Mix) Could not start application test: Application.start(:normal, []) returned an error: shutdown: failed to start child: Producer ** (EXIT) exited in: GenServer.call({:name, ProducerConsumer}, {:"$subscribe", nil, #PID<0.2031.0>, [cancel: :transient]}, 5000) ** (EXIT) no connection to Elixir.ProducerConsumer

Just means when the Flow.into_stages trying to sync to the provided consumer that consumer must already be running.

So, the order is important when supervising, something like this:

children = [
  Consumer,
  FlowProducerWorker # worker which implements Flow.into_stages(flow, [Consumer])
]

Upvotes: 0

Related Questions