kolko

Reputation: 300

Erlang: making a gen_server call with many responses

I'm trying to use OTP style in my project and have a question about OTP interface design. Which solution is more common or more elegant?

What I have:

  1. A web server built with mochiweb.
  2. One process that spawns many (1000-2000) children. Each child holds state (netflow speed). The process proxies messages to the children and creates new children when needed.

In mochiweb I have one page that shows the speed of all actors. Here is how it is built:

    nf_collector ! {get_abonents_speed, self()},
    receive
        {abonents_speed_count, AbonentsCount} ->
            ok
    end,
    %% write the HTTP header (chunked),
    %% then, while AbonentsCount is not 0, receive a speed and write it as a chunk

As far as I understand, this is not OTP style. Possible solutions:

  1. A synchronous API function that collects the speeds from all children and returns them as one list. But I want to write each speed to the client as soon as it arrives.
  2. Pass a callback as an argument to the API function:

    nf_collector:get_all_speeds(fun (Speed) -> Resp:write_chunk(templater(Speed)) end)
    
  3. Return an iterator: one of the results of get_all_speeds is a function wrapping a receive block. Each call to it returns {ok, Speed}, and at the end it returns {end}.

get_all_speeds() ->
    nf_collector ! {get_abonents_speed, self()},
    receive
        {abonents_speed_count, AbonentsCount} ->
            ok
    end,
    {ok, fun() -> 
        create_receive_fun(AbonentsCount)
    end}.

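create_receive_fun(0)->
    %% 'end' is a reserved word in Erlang, so the atom has to be quoted.
    {'end'};

create_receive_fun(Count)->
        receive
            {abonent_speed, Speed} ->
                %% Return the next step as a fun so that each call
                %% receives exactly one message instead of draining all of them.
                {ok, Speed, fun() -> create_receive_fun(Count-1) end}
        end.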

Upvotes: 2

Views: 742

Answers (1)

Michael

Reputation: 3729

Spawn your 'children' from a supervisor:

-module(ch_sup).
-behaviour(supervisor).
-export([start_link/0, init/1, start_child/1]).
start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).
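%% The restart intensity (5 restarts within 10 seconds) is a placeholder; tune it for your application.
init([]) -> {ok, {{simple_one_for_one, 5, 10}, [{ch, {ch, start_link, []}, transient, 1000, worker, [ch]}]}}.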
start_child(Data) -> supervisor:start_child(?MODULE, [Data]).

Start them with ch_sup:start_child/1 (Data is whatever).

Implement your children as a gen_server:

-module(ch).
-behaviour(gen_server).
-record(?MODULE, {speed}).

...

get_speed(Pid, Timeout) ->
    try
        gen_server:call(Pid, get, Timeout)
    catch
        exit:{timeout, _} -> timeout;
        exit:{noproc, _} -> died
    end
.

...

handle_call(get, _From, St) -> {reply, {ok, St#?MODULE.speed}, St}.
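
For reference, here is a minimal sketch of how the elided parts of such a child could look as a complete module. This is not the original code: the module name ch_sketch, the state record, and the set_speed/2 setter are all hypothetical, and I'm assuming the Data passed to start_child/1 is simply the initial speed.

```erlang
-module(ch_sketch).
-behaviour(gen_server).
-export([start_link/1, get_speed/2, set_speed/2]).
-export([init/1, handle_call/3, handle_cast/2]).

-record(state, {speed}).

%% Start a child. InitialSpeed stands in for whatever Data the
%% supervisor passes along (an assumption for this sketch).
start_link(InitialSpeed) ->
    gen_server:start_link(?MODULE, InitialSpeed, []).

%% Same contract as get_speed/2 above: {ok, Speed} | timeout | died.
get_speed(Pid, Timeout) ->
    try
        gen_server:call(Pid, get, Timeout)
    catch
        exit:{timeout, _} -> timeout;
        exit:{noproc, _} -> died
    end.

%% Hypothetical setter, present only so the example can change state.
set_speed(Pid, Speed) ->
    gen_server:cast(Pid, {set, Speed}).

init(InitialSpeed) ->
    {ok, #state{speed = InitialSpeed}}.

handle_call(get, _From, St) ->
    {reply, {ok, St#state.speed}, St}.

handle_cast({set, Speed}, St) ->
    {noreply, St#state{speed = Speed}}.
```

With this in place, `{ok, Pid} = ch_sketch:start_link(42)` followed by `ch_sketch:get_speed(Pid, 1000)` should give `{ok, 42}`.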

You can now use the supervisor to get the list of running children and query them, though you have to accept the possibility of a child dying between getting the list of children and calling them, and obviously a child could for some reason be alive but not respond, or respond with an error, etc.

The get_speed/2 function above returns either {ok, Speed}, died, or timeout. It remains for you to filter appropriately according to your application's needs; this is easy with a list comprehension. Here are a few.

Just the speeds:

[Speed || {ok, Speed} <- [ch:get_speed(Pid, 1000) || Pid <-
    [Pid || {undefined, Pid, worker, [ch]} <-
        supervisor:which_children(ch_sup)
        ]
    ]].

Pid and speed tuples:

[{Pid, Speed} || {Pid, {ok, Speed}} <-
    [{Pid, ch:get_speed(Pid, 1000)} || Pid <-
        [Pid || {undefined, Pid, worker, [ch]} <-
                supervisor:which_children(ch_sup)]
        ]
    ].

All results, including timeouts and 'died' results for children that died before you got to them:

[{Pid, Any} || {Pid, Any} <-
    [{Pid, ch:get_speed(Pid, 1000)} || Pid <-
        [Pid || {undefined, Pid, worker, [ch]} <-
                supervisor:which_children(ch_sup)]
        ]
    ].

In most situations you almost certainly don't want anything other than the speeds, because what are you going to do about deaths and timeouts? You want those that die to be respawned by the supervisor, so the problem is more or less fixed by the time you know about it, and timeouts, as with any fault, are a separate problem, to be dealt with in whatever way you see fit... There's no need to mix the fault fixing logic with the data retrieval logic though.

Now, the problem with all of these, which I think is what you were getting at in your post, though I'm not quite sure, is that the timeout of 1000 ms applies to each call, and the calls are made synchronously, one after the other. So for 1000 children with a 1 second timeout, it could take 1000 seconds to produce no results. Making the timeout 1 ms might be the answer, but doing it properly is a bit more complicated:

get_speeds() ->
    ReceiverPid = self(),
    Ref = make_ref(),
    Pids = [Pid || {undefined, Pid, worker, [ch]} <-
            supervisor:which_children(ch_sup)],
    lists:foreach(
        fun(Pid) -> spawn(
            fun() -> ReceiverPid ! {Ref, ch:get_speed(Pid, 1000)} end
            ) end,
        Pids),
    receive_speeds(Ref, length(Pids), os_milliseconds(), 1000)
.

receive_speeds(_Ref, 0, _StartTime, _Timeout) ->
    [];
receive_speeds(Ref, Remaining, StartTime, Timeout) ->
    Time = os_milliseconds(),
    %% Clamp at 0: a negative value in 'after' raises badarg.
    TimeLeft = max(0, Timeout - Time + StartTime),
    receive
        {Ref, acc_timeout} ->
            [];
        {Ref, {ok, Speed}} ->
            [Speed | receive_speeds(Ref, Remaining-1, StartTime, Timeout)];
        {Ref, _} ->
            receive_speeds(Ref, Remaining-1, StartTime, Timeout)
    after TimeLeft ->
        []
    end
.

os_milliseconds() ->
    %% os:timestamp() returns {MegaSecs, Secs, MicroSecs}.
    {OsMegSecs, OsSecs, OsMicroSecs} = os:timestamp(),
    (OsMegSecs * 1000000 + OsSecs) * 1000 + OsMicroSecs div 1000
.

Here each call is spawned in a different process and the replies are collected until the 'master timeout' expires or all of them have been received.
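
The same fan-out and collect idea can be sketched as a small standalone module that is not tied to ch or ch_sup. This is only an illustration under my own assumptions: the module name fanout_sketch and the function collect/2 are hypothetical, and it uses erlang:monotonic_time(millisecond) for the deadline rather than os:timestamp(), which needs a reasonably recent OTP release.

```erlang
-module(fanout_sketch).
-export([collect/2]).

%% Run every fun in Funs in its own process and gather the {ok, Value}
%% results that arrive before the overall Timeout (in milliseconds).
collect(Funs, Timeout) ->
    Parent = self(),
    Ref = make_ref(),
    lists:foreach(
        fun(F) -> spawn(fun() -> Parent ! {Ref, F()} end) end,
        Funs),
    Deadline = erlang:monotonic_time(millisecond) + Timeout,
    gather(Ref, length(Funs), Deadline).

%% All replies accounted for.
gather(_Ref, 0, _Deadline) ->
    [];
gather(Ref, Remaining, Deadline) ->
    %% Never pass a negative value to 'after'.
    TimeLeft = max(0, Deadline - erlang:monotonic_time(millisecond)),
    receive
        {Ref, {ok, Value}} ->
            [Value | gather(Ref, Remaining - 1, Deadline)];
        {Ref, _Other} ->
            %% timeout, died, or anything else: count it and skip it.
            gather(Ref, Remaining - 1, Deadline)
    after TimeLeft ->
        []
    end.
```

It could be called as `fanout_sketch:collect([fun() -> ch:get_speed(Pid, 1000) end || Pid <- Pids], 1000)`. Results come back in arrival order, not in the order of Pids, and replies that arrive after the deadline stay in the caller's mailbox, so a long-lived caller may want to flush them.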

Code has largely been cut-n-pasted from various works I have lying round, and edited manually and by search replace, to anonymise it and remove surplus, so it's probably mostly compilable quality, but I don't promise I didn't break anything.

Upvotes: 1
