Bercovici Adrian
Bercovici Adrian

Reputation: 9360

Understanding why process stops receiving messages

I have a situation where I have 3 processes: a server, a monitor, and a worker.

The server sends a request to the monitor, and the monitor first checks whether the worker is busy. If the worker is busy, the monitor enqueues the message (if capacity has not been reached); otherwise it forwards the message to the worker. When the worker finishes processing, it notifies both the client and the monitor.

My problem is that my worker process stops responding after processing the first message

%% Message queue built from three processes: a server that accepts client
%% messages, a monitor that queues them, and a worker that processes them.
-module(mq).
-compile(export_all).

%% State carried by the monitor process.
-record(monstate,{
    %% pending {From,Message} requests (created with queue:new/0)
    queue,
    %% number of requests currently held in queue
    qc,
    %% pid of the worker process
    wpid,
    %% written by the monitor loop (false after dispatching, true when the
    %% queue is found empty) but never read when a request arrives
    free=true,
    %% monitor reference for the worker process
    wref,
    %% false until the initialising clause of monitor/1 has run
    init=false,
    %% set to true by the initialising clause of monitor/1 and reset to false
    %% as soon as the first request has been handled
    frun=false
}).
%% State carried by the server process.
-record(sstate,{
    %% false until the monitor process has been spawned
    init=false,
    %% pid of the monitor process
    mpid=null,
    %% monitor reference for the monitor process
    mref=null
}).

%% Maximum number of requests tryEnqueue/2 will hold in the queue.
-define(QUEUE_SIZE,5).
%% Time, in milliseconds, the worker sleeps for each message.
-define(PROC_SLEEP,2000).


%% Spawns M:F(A) (A is passed as the single argument) and starts monitoring
%% the new process. Returns {Pid,Ref}.
createProcess({M,F,A})->
    Pid=spawn(M,F,[A]),
    Ref=erlang:monitor(process,Pid),
    {Pid,Ref}.

%% Spawns the server process with an uninitialised state and returns its pid.
start()->
    spawn(?MODULE,server,[#sstate{init=false}]).

%% First entry: spawn the monitor process, remember its pid and monitor
%% reference, then continue with the receive loop below.
server(State=#sstate{init=I})when I=:=false ->
    {MPid,MRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
    server(State#sstate{init=true,mpid=MPid,mref=MRef});

%% Server loop.
%%   {From,state}   -> replies to From with the #sstate{} record
%%   {From,Message} -> forwarded to the monitor as {request,{From,Message}}
%%   'DOWN' for the monitor -> a replacement monitor is spawned
%%   anything else  -> the server exits with invalid_message
server(State=#sstate{mpid=MPid,mref=MRef})->
    receive
           {From,state}->From ! State,
                            server(State);
           {From,Message}-> MPid ! {request,{From,Message}},
                            server(State);
                
            {'DOWN',MRef,process,MPid,_}-> {NewMPid,NewMRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
                                            server(State#sstate{mpid=NewMPid,mref=NewMRef});
            _ ->exit(invalid_message)
                                    
    end.
  

%% Adds Message to the monitor's queue when fewer than ?QUEUE_SIZE requests
%% are held. Returns {queued,NewState}, or {queue_full,State} with the state
%% unchanged when the queue is at capacity.
tryEnqueue(Message,MState=#monstate{queue=Q,qc=C}) when C<?QUEUE_SIZE->
    NewQueue=queue:in(Message,Q),
    {queued,MState#monstate{qc=C+1,queue=NewQueue}};
tryEnqueue(_,MState)->{queue_full,MState}.

%% First entry: spawn the worker (it is given the monitor's pid), create an
%% empty queue and set frun so that the first request starts the cycle.
monitor(MState=#monstate{wpid=_,wref=_,init=I}) when I=:= false ->
    {WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
    monitor(MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,qc=0,queue=queue:new(),frun=true});

%% Monitor loop.
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=R})->
    receive
        
        %% Every request is enqueued (or rejected with {queue_full,Message}).
        %% Only while frun is true does the monitor send itself a
        %% {worker,{finished,_}} message to trigger a dequeue.
        %% NOTE: once frun is false nothing in this clause forwards the
        %% request to the worker, and free is not consulted. A request that
        %% arrives after the worker has already reported finished on an empty
        %% queue therefore stays queued, because no further
        %% {worker,{finished,_}} message will arrive to dequeue it.
        {request,{From ,Message}} ->  
                                       {Result,NewState}=tryEnqueue({From,Message},MState),
                                        case Result of 
                                            queue_full -> From ! {queue_full,Message};
                                            _ -> ok
                                        end,
                                        case R of
                                            true -> self() ! {worker,{finished,R}},
                                                    monitor(NewState#monstate{frun=false});
                                            false -> monitor(NewState#monstate{frun=false})
                                        end;
                                       

        %% The worker (or the monitor itself on the first run) signals that
        %% the next request may be dispatched. The `_` in {_,Element} matches
        %% the `value` tag returned by queue:out/1.
        {worker,{finished,_}}-> case queue:out(Q) of
                                    {{_,Element},Rest} -> W ! Element,
                                                    monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
                                    {empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
                                end;

        %% The worker terminated: spawn a replacement and mark it free.
        {'DOWN',Ref,process,_,_}->
             {NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
             monitor(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef,free=true});

        %% Any other message terminates the monitor.
        _->exit(invalid_message)

    end.

%% Worker loop. For each {From,MSG} it sleeps ?PROC_SLEEP milliseconds,
%% replies {processed,MSG} to From and reports {worker,{finished,MSG}} to the
%% monitor MPid. Any other message makes the worker exit with bad_msg.
worker(MPid)->
    receive 
        {From,MSG} ->
            timer:sleep(?PROC_SLEEP),
            From ! {processed,MSG},
            MPid ! {worker,{finished,MSG}},
            worker(MPid);
        _ ->exit(bad_msg)
    end.

Usage

2> A=mq:start().
<0.83.0>
3> A ! {self(),aa}.
{<0.76.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.76.0>,aa}
6> flush().
ok

I have added a tracer to see what is happening:

10> dbg:tracer().
{ok,<0.96.0>}
11> dbg:p(new,[sos,m]).
{ok,[{matched,nonode@nohost,0}]}

First run:

14> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa}     // message received by server
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}}   //message forwarded by server to monitor
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}         
15> (<0.101.0>) <0.101.0> ! {worker,{finished,true}} //monitor starting the cycle
15> (<0.101.0>) << {worker,{finished,true}}  
15> (<0.101.0>) <0.102.0> ! {<0.76.0>,aa}  // monitor sending message to worker
15> (<0.102.0>) << {<0.76.0>,aa}
15> (<0.105.0>) <0.62.0> ! {io_request,<0.105.0>,
                           #Ref<0.3226054513.2760638467.167990>,
                           {get_until,unicode,
                               ["15",62,32],
                               erl_scan,tokens,
                               [1,[text]]}}
15> (<0.102.0>) << timeout                      //worker getting timeout ??
15> (<0.102.0>) <0.76.0> ! {processed,aa}    //worker sends the message to the shell (the caller's self())
15> (<0.102.0>) <0.101.0> ! {worker,{finished,aa}}   //worker notifies monitor to update state
15> (<0.101.0>) << {worker,{finished,aa}}

Second run:

15> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa}
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}}   //monitor receiving message
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}
16> (<0.106.0>) <0.62.0> ! {io_request,<0.106.0>,
                           #Ref<0.3226054513.2760638467.168007>,
                           {get_until,unicode,
                               ["16",62,32],
                               erl_scan,tokens,
                               [1,[text]]}}

As you can see from my trace, I do not understand what happens in the first call. Does my worker time out, and if so, why?

P.S The frun variable is used as a flag that is true only at the first monitor iteration so that when the first item arrives the process will call itself to process it (send it to the worker) since the worker is free of duty. After the first run the monitor will dequeue items from the queue whenever the worker signals he is free.

Update

So, after the helpful comments, I have changed the logic in the monitor a bit so that the worker gets a message either on the first run or, after it has finished and notified the monitor, when there are still items in the monitor's queue. I still can't make it work. Where is the deadlock?

%% Revised monitor loop (excerpt): the first request is sent straight to the
%% worker; later requests are only enqueued and are dispatched when the worker
%% reports that it has finished.
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=FirstRun})->
        receive
            %% frun is cleared by the first request, so every later request
            %% takes the `false` branch and is enqueued (or rejected with
            %% {queue_full,Message}). The free flag is not consulted here, so
            %% a request arriving while the worker is idle is not forwarded.
            {request,{From ,Message}} -> case FirstRun of
                                            true ->  W ! {From,Message},
                                                     monitor(MState#monstate{frun=false,free=false});                                                     
                                            false -> 
                                                     St=case tryEnqueue({From,Message},MState) of 
                                                           {queue_full,S} -> From ! {queue_full,Message},
                                                                             S;
                                                           {queued,S} -> S
                                                        end,
                                                     monitor(St)
                                             end;
                                                                        
            %% The worker is done: dispatch the next queued request, or mark
            %% the worker free when the queue is empty. Nothing is dispatched
            %% in the empty case, so a request enqueued afterwards waits for a
            %% finished message that never comes.
            {worker,{finished,_}}-> case queue:out(Q) of
                                        {{_,Element},Rest} -> W ! Element,
                                                        monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
                                        {empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
                                    end;
            %% NOTE(review): the ';' above is followed directly by the closing
            %% `end`; the remaining receive clauses appear to have been left
            %% out of this excerpt.

        end.

Upvotes: 0

Views: 74

Answers (2)

The monitor's behavior does not need to depend on frun. It just needs to depend on whether the worker is free. I have updated the monitor function to reflect this in the following code.

-module(mq).
-compile(export_all).

%% Bounded work queue built from three cooperating processes:
%%
%%   client  --{From,Message}-------------> server
%%   server  --{request,{From,Message}}---> monitor
%%   monitor --{From,Message}-------------> worker
%%   worker  --{processed,Message}--------> client (From)
%%   worker  --{worker,{finished,Message}}-> monitor
%%
%% The monitor hands a request straight to the worker when the worker is
%% free. Otherwise it queues the request, holding at most ?QUEUE_SIZE of
%% them, and replies {queue_full,Message} to the client when the queue is
%% full.

%% State of the monitor process.
-record(monstate,{
    queue,        % pending {From,Message} requests (queue module)
    qc,           % number of requests currently in queue
    wpid,         % pid of the worker process
    free=true,    % true when the worker has no request in progress
    wref,         % monitor reference for the worker process
    init=false    % false until the worker has been spawned
}).
%% State of the server process.
-record(sstate,{
    init=false,   % false until the monitor has been spawned
    mpid=null,    % pid of the monitor process
    mref=null     % monitor reference for the monitor process
}).

%% Maximum number of queued requests.
-define(QUEUE_SIZE,5).
%% Milliseconds the worker spends on each request.
-define(PROC_SLEEP,2000).


%% Spawns M:F(A) and monitors it in a single atomic step.
%% Returns {Pid,Ref}.
createProcess({M,F,A})->
    spawn_monitor(M,F,[A]).

%% Starts the server process and returns its pid.
start()->
    spawn(?MODULE,server,[#sstate{init=false}]).

%% Server process.
%% The first clause spawns the monitor; the second clause is the receive loop:
%%   {From,state}   -> replies to From with the #sstate{} record
%%   {From,Message} -> forwards {request,{From,Message}} to the monitor
%%   'DOWN' for the monitor -> spawns a replacement monitor
%%   anything else  -> exits with invalid_message
server(State=#sstate{init=false})->
    {MPid,MRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
    server(State#sstate{init=true,mpid=MPid,mref=MRef});
server(State=#sstate{mpid=MPid,mref=MRef})->
    receive
        {From,state}->
            From ! State,
            server(State);
        {From,Message}->
            MPid ! {request,{From,Message}},
            server(State);
        {'DOWN',MRef,process,MPid,_Reason}->
            {NewMPid,NewMRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
            server(State#sstate{mpid=NewMPid,mref=NewMRef});
        _->
            exit(invalid_message)
    end.


%% Appends Message to the queue when fewer than ?QUEUE_SIZE requests are held.
%% Returns {queued,NewState}, or {queue_full,State} with the state unchanged.
tryEnqueue(Message,MState=#monstate{queue=Q,qc=C}) when C<?QUEUE_SIZE->
    {queued,MState#monstate{qc=C+1,queue=queue:in(Message,Q)}};
tryEnqueue(_Message,MState)->
    {queue_full,MState}.

%% Sends the oldest queued request to the worker and marks the worker busy,
%% or marks the worker free when nothing is queued. Returns the new state.
dispatchNext(MState=#monstate{wpid=W,queue=Q,qc=C})->
    case queue:out(Q) of
        {{value,Job},Rest}->
            W ! Job,
            MState#monstate{free=false,queue=Rest,qc=C-1};
        {empty,_}->
            MState#monstate{free=true}
    end.

%% Monitor process.
%% The first clause spawns the worker and creates the empty queue; the second
%% clause is the receive loop.
monitor(MState=#monstate{init=false})->
    {WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
    monitor(MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,qc=0,queue=queue:new()});
monitor(MState=#monstate{wpid=W,free=Free,wref=Ref})->
    receive
        %% Worker is free: hand the request over immediately.
        {request,{From,Message}} when Free->
            W ! {From,Message},
            monitor(MState#monstate{free=false});

        %% Worker is busy: queue the request or reject it.
        {request,{From,Message}}->
            case tryEnqueue({From,Message},MState) of
                {queued,Queued}->
                    monitor(Queued);
                {queue_full,Unchanged}->
                    From ! {queue_full,Message},
                    monitor(Unchanged)
            end;

        {worker,{finished,_}}->
            monitor(dispatchNext(MState));

        %% The worker died. The request it was handling is lost, but the
        %% replacement must pick up whatever is still queued. Otherwise the
        %% queued requests would wait for a finished message that never
        %% arrives, and be overtaken by the next incoming request.
        {'DOWN',Ref,process,_Pid,_Reason}->
            {NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
            monitor(dispatchNext(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef}));

        _->
            exit(invalid_message)
    end.

%% Worker process. For each {From,MSG} it works for ?PROC_SLEEP milliseconds,
%% replies {processed,MSG} to From and reports {worker,{finished,MSG}} to the
%% monitor MPid. Any other message makes it exit with bad_msg.
worker(MPid)->
    receive
        {From,MSG}->
            timer:sleep(?PROC_SLEEP),
            From ! {processed,MSG},
            MPid ! {worker,{finished,MSG}},
            worker(MPid);
        _->
            exit(bad_msg)
    end.

Usage

Eshell V10.5  (abort with ^G)
1> c(mq).
mq.erl:2: Warning: export_all flag enabled - all functions will be exported
{ok,mq}
2> A=mq:start().
<0.92.0>
3> A ! {self(),aa}.
{<0.85.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.85.0>,aa}
6> flush().
Shell got {processed,aa}
ok
7> A ! {self(), aa}, A ! {self(), bb}.
{<0.85.0>,bb}
8> flush().                           
Shell got {processed,aa}
Shell got {processed,bb}
ok
9> 

Upvotes: 1

José M
José M

Reputation: 3509

In your code, it seems that frun is always false after the first run:

            case R of
                true -> self() ! {worker,{finished,R}},
                        monitor(NewState#monstate{frun=false});
                false -> monitor(NewState#monstate{frun=false})
            end;

Once it reaches false, no {worker, {finished, R}} message will be delivered and so no element will be extracted from the queue.

Update: Deadlock sequence:

  1. Monitor receives first job and forwards it to the worker
  2. Monitor's frun is false now
  3. Worker performs the job
  4. Worker notifies monitor that the job has finished
  5. Since monitor's queue is empty, nothing happens.
  6. Monitor receives second job, since frun is false, the job is not forwarded to the worker

Upvotes: 1

Related Questions