Reputation: 9360
I have a situation where I have 3 processes:
server
monitor
worker
The server
sends a request to monitor and the monitor
first checks if worker
is busy. If busy, monitor
enqueues the message (if capacity is not reached) or else forwards it to worker
.
When the worker finishes processing, it notifies both the client and the monitor.
My problem is that my worker process stops responding after processing the first message.
-module(mq).
-compile(export_all).
%% State carried through the monitor process loop.
-record(monstate, {
    queue,          % pending {From, Message} requests (built with the queue module)
    qc,             % number of requests currently held in queue
    wpid,           % pid of the worker process
    free = true,    % true while the worker is considered idle
    wref,           % monitor reference for the worker process
    init = false,   % false until the worker has been spawned
    frun = false    % set to true by the init clause, cleared on the first request
}).
%% State carried through the server process loop.
-record(sstate, {
    init = false,   % false until the monitor process has been spawned
    mpid = null,    % pid of the monitor process
    mref = null     % monitor reference for the monitor process
}).
%% Upper bound on the number of requests tryEnqueue/2 keeps in the queue.
-define(QUEUE_SIZE, 5).
%% Milliseconds the worker sleeps (timer:sleep/1) before answering a request.
-define(PROC_SLEEP, 2000).
%% Spawns M:F(A) as a new process and monitors it.
%% A is passed as the single argument of F.
%% Returns {Pid, Ref}; Ref is the monitor reference carried by the 'DOWN'
%% message that is delivered to the caller when the process terminates.
createProcess({M, F, A}) ->
    %% spawn_monitor/3 creates the process and the monitor in one atomic
    %% step, so the 'DOWN' message always carries the real exit reason
    %% rather than noproc when the child dies before a separate
    %% erlang:monitor/2 call could be made.
    spawn_monitor(M, F, [A]).
%% Launches the server process with a fresh, uninitialised state and returns
%% its pid. The new process is neither linked to nor monitored by the caller.
start() ->
    InitialState = #sstate{init = false},
    spawn(?MODULE, server, [InitialState]).
%% Server loop.
%% First clause: while init is false, spawn and monitor the monitor process,
%% record its pid and reference, and enter the receive loop.
%% Second clause: reply to {From, state} with the current state, forward any
%% other {From, Message} pair to the monitor process as a request, and spawn
%% a replacement when the current monitor process goes down. Any other
%% message terminates the server with reason invalid_message.
server(#sstate{init = false} = State) ->
    {MPid, MRef} = createProcess({?MODULE, monitor, #monstate{init = false}}),
    server(State#sstate{init = true, mpid = MPid, mref = MRef});
server(#sstate{mpid = MPid, mref = MRef} = State) ->
    NextState =
        receive
            {From, state} ->
                From ! State,
                State;
            {From, Message} ->
                MPid ! {request, {From, Message}},
                State;
            {'DOWN', MRef, process, MPid, _Reason} ->
                {NewMPid, NewMRef} =
                    createProcess({?MODULE, monitor, #monstate{init = false}}),
                State#sstate{mpid = NewMPid, mref = NewMRef};
            _Unexpected ->
                exit(invalid_message)
        end,
    server(NextState).
%% Appends Message to the queue held in the monitor state when fewer than
%% ?QUEUE_SIZE items are queued.
%% Returns {queued, NewState} with the counter incremented, or
%% {queue_full, MState} with the state untouched otherwise.
tryEnqueue(Message, #monstate{queue = Pending, qc = Count} = MState)
  when Count < ?QUEUE_SIZE ->
    Grown = MState#monstate{queue = queue:in(Message, Pending), qc = Count + 1},
    {queued, Grown};
tryEnqueue(_Message, MState) ->
    {queue_full, MState}.
%% Monitor loop (version posted in the question).
%% First clause: on first entry (init = false) spawn and monitor a worker,
%% start with an empty queue and a zero counter, and set frun = true.
%% Second clause: the receive loop; see the comments on each branch.
monitor(MState=#monstate{wpid=_,wref=_,init=I}) when I=:= false ->
{WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
monitor(MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,qc=0,queue=queue:new(),frun=true});
%% F (the free flag) is bound here but never read in this clause.
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=R})->
receive
%% Every request is enqueued (or rejected with queue_full when the queue is
%% at capacity); no request is sent to the worker from this branch.
{request,{From ,Message}} ->
{Result,NewState}=tryEnqueue({From,Message},MState),
case Result of
queue_full -> From ! {queue_full,Message};
_ -> ok
end,
case R of
%% Only while frun is still true does the monitor send itself a
%% {worker,{finished,_}} message to trigger the dequeue branch below.
true -> self() ! {worker,{finished,R}},
monitor(NewState#monstate{frun=false});
%% From here on a dequeue happens only when a finished message arrives.
%% NOTE(review): a request that arrives while the worker is idle and the
%% queue is empty is therefore queued and never dispatched - this appears
%% to be the stall described in the question.
false -> monitor(NewState#monstate{frun=false})
end;
%% Hand the oldest queued request to the worker, or mark the worker free when
%% nothing is queued. The tag in the first tuple returned by queue:out/1 is
%% ignored by the pattern.
{worker,{finished,_}}-> case queue:out(Q) of
{{_,Element},Rest} -> W ! Element,
monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
{empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
end;
%% The worker went down: spawn a replacement and mark it free. Queued
%% requests are not dispatched from this branch.
{'DOWN',Ref,process,_,_}->
{NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
monitor(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef,free=true});
%% Any other message terminates the monitor process.
_->exit(invalid_message)
end.
%% Worker loop. MPid is the pid that receives the completion notices (the
%% monitor passes self() when it spawns the worker).
%% For each {From, MSG} request: sleep for ?PROC_SLEEP milliseconds, send
%% {processed, MSG} to From, send {worker, {finished, MSG}} to MPid, then
%% wait for the next request. Any other message makes the worker exit with
%% reason bad_msg.
worker(MPid) ->
    receive
        {Client, Payload} ->
            timer:sleep(?PROC_SLEEP),
            Client ! {processed, Payload},
            MPid ! {worker, {finished, Payload}},
            worker(MPid);
        _Other ->
            exit(bad_msg)
    end.
Usage
2> A=mq:start().
<0.83.0>
3> A ! {self(),aa}.
{<0.76.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.76.0>,aa}
6> flush().
ok
I have added a tracer to see what is happening:
10> dbg:tracer().
{ok,<0.96.0>}
11> dbg:p(new,[sos,m]).
{ok,[{matched,nonode@nohost,0}]}
First run:
14> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa} // message received by server
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}} //message forwarded by server to monitor
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}
15> (<0.101.0>) <0.101.0> ! {worker,{finished,true}} //monitor starting the cycle
15> (<0.101.0>) << {worker,{finished,true}}
15> (<0.101.0>) <0.102.0> ! {<0.76.0>,aa} // monitor sending message to worker
15> (<0.102.0>) << {<0.76.0>,aa}
15> (<0.105.0>) <0.62.0> ! {io_request,<0.105.0>,
#Ref<0.3226054513.2760638467.167990>,
{get_until,unicode,
["15",62,32],
erl_scan,tokens,
[1,[text]]}}
15> (<0.102.0>) << timeout //worker getting timeout ??
15> (<0.102.0>) <0.76.0> ! {processed,aa} //worker sends to self() the message
15> (<0.102.0>) <0.101.0> ! {worker,{finished,aa}} //worker notifies monitor to update state
15> (<0.101.0>) << {worker,{finished,aa}}
Second run:
15> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa}
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}} //monitor receiving message
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}
16> (<0.106.0>) <0.62.0> ! {io_request,<0.106.0>,
#Ref<0.3226054513.2760638467.168007>,
{get_until,unicode,
["16",62,32],
erl_scan,tokens,
[1,[text]]}}
As you can see from my trace, I do not understand what happens in the first call. Does my worker
get timed out, and if so, why?
P.S The frun
variable is used as a flag that is true only at the first monitor
iteration so that when the first item arrives the process will call itself to process it (send it to the worker) since the worker is free of duty.
After the first run the monitor
will dequeue items from the queue whenever the worker
signals he is free.
Update
So after the helpful comments I have changed my logic a bit in the monitor
so that the worker
gets a message either on the first run or when, after it has finished and notified the monitor
, there are still items in the queue of the monitor
.
I still can't make it work. Where is the deadlock?
%% Revised receive clause of monitor/1 from the question's update.
%% The request seen while frun is true is sent straight to the worker; every
%% later request is only enqueued (or rejected with queue_full), and the
%% queue is drained one item at a time whenever the worker reports
%% {worker, {finished, _}}.
%% Only the two branches shown in the excerpt are handled here; the stray
%% `;' that followed the last branch has been removed so the clause parses.
%% NOTE(review): dispatch is still keyed on frun rather than on free, so a
%% request that arrives after the worker has gone idle with an empty queue
%% is enqueued and never handed to the worker - this looks like the deadlock
%% being asked about. The logic is intentionally left as posted.
monitor(MState = #monstate{wpid = W, queue = Q, qc = C, frun = FirstRun}) ->
    receive
        {request, {From, Message}} ->
            case FirstRun of
                true ->
                    W ! {From, Message},
                    monitor(MState#monstate{frun = false, free = false});
                false ->
                    St = case tryEnqueue({From, Message}, MState) of
                             {queue_full, S} ->
                                 From ! {queue_full, Message},
                                 S;
                             {queued, S} ->
                                 S
                         end,
                    monitor(St)
            end;
        {worker, {finished, _}} ->
            case queue:out(Q) of
                {{_, Element}, Rest} ->
                    W ! Element,
                    monitor(MState#monstate{free = false, queue = Rest, qc = C - 1});
                {empty, Rest} ->
                    monitor(MState#monstate{free = true, queue = Rest})
            end
    end.
Upvotes: 0
Views: 74
Reputation: 391
The monitor
behavior does not need to depend on frun
. It just needs to depend on whether the worker
is free
. I have updated the monitor
function to reflect this in the following code.
-module(mq).
-compile(export_all).
%% Loop state of the monitor process.
-record(monstate, {
    queue,          % queued {From, Message} requests waiting for the worker
    qc,             % how many requests are currently in queue
    wpid,           % worker pid
    free = true,    % true when the worker is idle and can take a request directly
    wref,           % monitor reference of the worker
    init = false    % false until the worker has been spawned
}).
%% Loop state of the server process.
-record(sstate, {
    init = false,   % false until the monitor process has been spawned
    mpid = null,    % monitor process pid
    mref = null     % monitor reference of the monitor process
}).
%% Capacity limit checked by tryEnqueue/2 before it queues a request.
-define(QUEUE_SIZE, 5).
%% Time in milliseconds the worker spends in timer:sleep/1 per request.
-define(PROC_SLEEP, 2000).
%% Starts M:F(A) in a new process and sets up a monitor on it.
%% A is handed to F as its only argument.
%% Returns {Pid, Ref}; Ref identifies the 'DOWN' message the caller receives
%% when the process terminates.
createProcess({M, F, A}) ->
    %% Spawning and monitoring happen in a single atomic operation, so the
    %% 'DOWN' message reports the child's actual exit reason even when the
    %% child terminates immediately (a separate erlang:monitor/2 call made
    %% after the child has died would report noproc instead).
    spawn_monitor(M, F, [A]).
%% Spawns the server process in its uninitialised state and returns its pid.
%% The caller is neither linked to nor monitoring the new process.
start() ->
    Fresh = #sstate{init = false},
    spawn(?MODULE, server, [Fresh]).
%% Server loop.
%% First clause: while init is false, spawn and monitor the monitor process,
%% store its pid and reference, and continue with the receive loop.
%% Second clause: answer {From, state} with the current state, wrap any other
%% {From, Message} pair in a request for the monitor process, and spawn a
%% replacement when the current monitor process goes down. Any other message
%% terminates the server with reason invalid_message.
server(#sstate{init = false} = State) ->
    {MPid, MRef} = createProcess({?MODULE, monitor, #monstate{init = false}}),
    server(State#sstate{init = true, mpid = MPid, mref = MRef});
server(#sstate{mpid = MPid, mref = MRef} = State) ->
    NextState =
        receive
            {From, state} ->
                From ! State,
                State;
            {From, Message} ->
                MPid ! {request, {From, Message}},
                State;
            {'DOWN', MRef, process, MPid, _Reason} ->
                {NewMPid, NewMRef} =
                    createProcess({?MODULE, monitor, #monstate{init = false}}),
                State#sstate{mpid = NewMPid, mref = NewMRef};
            _Unexpected ->
                exit(invalid_message)
        end,
    server(NextState).
%% Queues Message in the monitor state if fewer than ?QUEUE_SIZE requests are
%% already waiting.
%% Returns {queued, NewState} (queue extended, counter incremented) or
%% {queue_full, MState} (state returned unchanged).
tryEnqueue(Message, #monstate{queue = Pending, qc = Count} = MState)
  when Count < ?QUEUE_SIZE ->
    Grown = MState#monstate{queue = queue:in(Message, Pending), qc = Count + 1},
    {queued, Grown};
tryEnqueue(_Message, MState) ->
    {queue_full, MState}.
%% Monitor loop.
%% First clause: on first entry (init = false) spawn and monitor a worker and
%% start with an empty queue and a zero counter.
%% Second clause:
%%   {request, {From, Message}}  - sent straight to the worker when it is
%%                                 free, otherwise queued; From is told
%%                                 {queue_full, Message} when the queue is
%%                                 at capacity.
%%   {worker, {finished, _}}     - the next queued request (if any) is handed
%%                                 to the worker, otherwise it is marked free.
%%   {'DOWN', Ref, process, _, _} - the worker died: a replacement is spawned
%%                                 and immediately given the next queued
%%                                 request, so queued work neither stalls nor
%%                                 gets overtaken by a later request. The
%%                                 request the dead worker was processing is
%%                                 not tracked in the state and is not retried.
%%   anything else               - the monitor exits with invalid_message.
monitor(#monstate{init = false} = MState) ->
    {WorkerPid, WorkerRef} = createProcess({?MODULE, worker, self()}),
    monitor(MState#monstate{wpid = WorkerPid, wref = WorkerRef, init = true,
                            qc = 0, queue = queue:new()});
monitor(#monstate{wpid = W, free = Free, wref = Ref} = MState) ->
    receive
        {request, {From, Message}} when Free =:= true ->
            W ! {From, Message},
            monitor(MState#monstate{free = false});
        {request, {From, Message}} ->
            case tryEnqueue({From, Message}, MState) of
                {queued, Queued} ->
                    monitor(Queued);
                {queue_full, Unchanged} ->
                    From ! {queue_full, Message},
                    monitor(Unchanged)
            end;
        {worker, {finished, _}} ->
            monitor(dispatchNext(MState));
        {'DOWN', Ref, process, _, _} ->
            {NewWorkerPid, NewWorkerRef} = createProcess({?MODULE, worker, self()}),
            Restarted = MState#monstate{wpid = NewWorkerPid, wref = NewWorkerRef},
            monitor(dispatchNext(Restarted));
        _ ->
            exit(invalid_message)
    end.

%% Sends the oldest queued request to the worker recorded in the state and
%% marks the worker busy; marks the worker free when nothing is queued.
%% Returns the updated state.
dispatchNext(#monstate{wpid = W, queue = Q, qc = C} = MState) ->
    case queue:out(Q) of
        {{value, Job}, Rest} ->
            W ! Job,
            MState#monstate{free = false, queue = Rest, qc = C - 1};
        {empty, Rest} ->
            MState#monstate{free = true, queue = Rest}
    end.
%% Worker loop. MPid is the pid that is told when a request is done (the
%% monitor passes self() when it spawns the worker).
%% Each {From, MSG} request is handled by sleeping ?PROC_SLEEP milliseconds,
%% sending {processed, MSG} to From and {worker, {finished, MSG}} to MPid,
%% and then waiting for the next request. Any other message makes the worker
%% exit with reason bad_msg.
worker(MPid) ->
    receive
        {Client, Payload} ->
            timer:sleep(?PROC_SLEEP),
            Client ! {processed, Payload},
            MPid ! {worker, {finished, Payload}},
            worker(MPid);
        _Other ->
            exit(bad_msg)
    end.
Usage
Eshell V10.5 (abort with ^G)
1> c(mq).
mq.erl:2: Warning: export_all flag enabled - all functions will be exported
{ok,mq}
2> A=mq:start().
<0.92.0>
3> A ! {self(),aa}.
{<0.85.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.85.0>,aa}
6> flush().
Shell got {processed,aa}
ok
7> A ! {self(), aa}, A ! {self(), bb}.
{<0.85.0>,bb}
8> flush().
Shell got {processed,aa}
Shell got {processed,bb}
ok
9>
Upvotes: 1
Reputation: 3509
In your code, it seems that frun
is always false after the first run:
case R of
true -> self() ! {worker,{finished,R}},
monitor(NewState#monstate{frun=false});
false -> monitor(NewState#monstate{frun=false})
end;
Once it reaches false
, no {worker, {finished, R}}
message will be delivered and so no element will be extracted from the queue.
Update: Deadlock sequence:
frun
is false now
frun
is false, the job is not forwarded to the worker
Upvotes: 1