Justin
Justin

Reputation: 4853

Buffer size in Erlang / Golang port example

I have a crude Erlang-to-Golang port example, passing data from Erlang to Golang and echoing the response.

The problem is that the amount of data I can transfer seems to be limited to 2^8 bytes (see below). I thought the problem was probably on the Golang side (not creating a big enough buffer), but replacing bufio.NewReader with bufio.NewReaderSize didn't help. So I am now thinking the problem may be on the Erlang side.

What do I need to do to increase the buffer size, i.e. to be able to echo a message larger than 2^8 bytes?

TIA

justin@justin-ThinkPad-X240:~/work/erlang_golang_port$ erl -pa ebin
Erlang/OTP 17 [erts-6.4.1] [source] [64-bit] [smp:4:4] [async-threads:10] [kernel-poll:false]

Eshell V6.4.1  (abort with ^G)
1> port:start("./echo").
<0.35.0>
2> port:ping(65000).
65000
3> port:ping(66000).
** exception error: bad argument
     in function  port:call_port/1 (port.erl, line 20)
4> port:start("./echo").
<0.40.0>
5> port:ping(66000).    
65536

Go

package main

import (
    "bufio"
    "os"
)

// Delimiter terminates each request the Erlang side writes to stdin.
const Delimiter = '\n'

// main reads one Delimiter-terminated request from stdin, echoes it to
// stdout without the delimiter, and exits.
func main() {
    // 1 << 24 bytes (16 MiB). The previous literal 1677216 was a typo for
    // 2**24 = 16777216. ReadBytes keeps reading past a full buffer, so this
    // is only the initial buffer size, not a limit on the message length.
    reader := bufio.NewReaderSize(os.Stdin, 1<<24)

    // On EOF or a read error ReadBytes returns whatever it read (possibly
    // nothing) without the delimiter; that data is still echoed below.
    line, _ := reader.ReadBytes(Delimiter)

    // Strip the delimiter only if it is really there: slicing len-1
    // unconditionally panicked on empty input and dropped a real byte
    // from an unterminated last line.
    if n := len(line); n > 0 && line[n-1] == Delimiter {
        line = line[:n-1]
    }
    if _, err := os.Stdout.Write(line); err != nil {
        os.Exit(1)
    }
}

Erlang

-module(port).

-export([start/1, stop/0, init/1]).

-export([ping/1]).

-define(DELIMITER, [10]).

%% Spawn the port-owner process running init/1 with ExtPrg and return its pid.
%% init/1 registers the new process as `myname` asynchronously, so a call made
%% immediately after start/1 can race that registration.
start(ExtPrg) ->
    InitArgs = [ExtPrg],
    erlang:spawn(?MODULE, init, InitArgs).

%% Ask the process registered as `myname` to close its port and exit.
%% Returns the message sent; raises badarg if no such process is registered.
stop() ->
    erlang:send(myname, stop).

%% Send a random message of N uppercase letters ($A..$Z) through the port and
%% return the length of the reply, as computed by call_port/1.
%%
%% rand:uniform(26) is uniform on 1..26, so $A - 1 + it is uniform on $A..$Z.
%% The previous round(65 + 26 * random:uniform()) could produce 91 ($[) and
%% gave the two end values half the weight of the others. It also used the
%% deprecated `random` module; `rand` requires OTP 18+.
%% Letters only, so the payload never contains the newline delimiter.
ping(N) ->
    Msg = [$A - 1 + rand:uniform(26) || _ <- lists:seq(1, N)],
    call_port(Msg).

%% Send Msg to the port-owner process registered as `myname` and wait for its
%% {myname, Result} reply. Returns length(Result).
%%
%% The owner is monitored for the duration of the call. If it dies before
%% replying, this function exits with the owner's exit reason instead of
%% blocking forever. As with a plain `myname ! ...`, it raises badarg when no
%% process is registered under that name.
call_port(Msg) ->
    case whereis(myname) of
        undefined ->
            erlang:error(badarg);
        Owner ->
            MRef = erlang:monitor(process, Owner),
            Owner ! {call, self(), Msg},
            receive
                {myname, Result} ->
                    %% Drop the monitor and any DOWN message it may already
                    %% have queued.
                    erlang:demonitor(MRef, [flush]),
                    length(Result);
                {'DOWN', MRef, process, Owner, Reason} ->
                    exit(Reason)
            end
    end.

%% Body of the port-owner process. It:
%%   - registers itself as `myname`;
%%   - traps exits, so a dying port arrives as an {'EXIT', Port, Reason}
%%     message;
%%   - opens ExtPrg with an empty option list (no {packet, N} or {line, L}
%%     framing);
%%   - then serves requests in loop/1.
init(ExtPrg) ->
    true = register(myname, self()),
    process_flag(trap_exit, true),
    loop(open_port({spawn, ExtPrg}, [])).

%% Request loop of the port-owner process.
%%
%% {call, Caller, Msg}
%%     Writes Msg ++ ?DELIMITER to the port and forwards the next
%%     {Port, {data, Data}} message to Caller as {myname, Data}.
%%     NOTE: the port was opened without {packet, N} or {line, L} framing,
%%     so a large reply can arrive split over several {data, _} messages.
%%     Only the first is forwarded here; the remainder is picked up as the
%%     "reply" to the next call.
%% stop
%%     Closes the port, waits for {Port, closed}, and exits normally.
%% {'EXIT', Port, _}
%%     The port died; the process exits with port_terminated.
loop(Port) ->
    receive
        {call, Caller, Msg} ->
            Port ! {self(), {command, Msg ++ ?DELIMITER}},
            receive
                {Port, {data, Data}} ->
                    Caller ! {myname, Data};
                {'EXIT', Port, _Reason} ->
                    %% The port died before answering. Without this clause the
                    %% process would block here forever, with the EXIT message
                    %% stuck unmatched in its mailbox.
                    exit(port_terminated)
            end,
            loop(Port);
        stop ->
            Port ! {self(), close},
            receive
                {Port, closed} ->
                    exit(normal)
            end;
        {'EXIT', Port, _Reason} ->
            exit(port_terminated)
    end.

Upvotes: 4

Views: 1113

Answers (3)

Anton Prokofiev
Anton Prokofiev

Reputation: 957

I have been struggling with a similar problem. Here is the complete code of my pipe module.

It allows sending text data to a port and reading all replies.

-module(apr_pipe).

-export([open_pipe/2,send/2,close/1]).

-export([loop/1,status/1,init/1]).

-include_lib("kernel/include/logger.hrl").

-define(MAX_LINE_LEN,4096).

%% Spawn a pipe process that will run the executable Path/Cmd.
%% Returns the pid of the new process; the port itself is opened later,
%% inside init/1.
open_pipe(Path, Cmd) ->
    spawn(?MODULE, init, [#{path => Path, cmd => Cmd}]).

%% Process body started by open_pipe/2. It launches filename:join(Path, Cmd)
%% as an external program and enters loop/1, with the port stored under
%% `port` in the state map.
%% With {line, ?MAX_LINE_LEN}, output is delivered line by line; lines longer
%% than the limit arrive as several {noeol, _} pieces.
%% NOTE(review): the `data` key looks unused; loop/1 accumulates partial lines
%% under `cur_data` instead.
init(State) ->
    #{path := Path, cmd := Cmd} = State,
    Executable = filename:join(Path, Cmd),
    PortOpts = [{line, ?MAX_LINE_LEN}, use_stdio, stderr_to_stdout, hide,
                binary, exit_status],
    Port = erlang:open_port({spawn_executable, Executable}, PortOpts),
    loop(State#{port => Port, data => #{}}).


%% Client API. Each function sends one tagged request to the pipe process Pid
%% and returns the message that was sent. Replies arrive asynchronously in
%% the caller's mailbox.
%% Note that close/1 is encoded as sending the data `close`, so
%% send(Pid, close) has the same effect.
send(Pid, Data) -> erlang:send(Pid, {self(), send, Data}).
close(Pid)      -> erlang:send(Pid, {self(), send, close}).
status(Pid)     -> erlang:send(Pid, {self(), status}).

%% Line terminator written to the port after every payload.
get_eol() -> <<$\n>>.

%% Main receive loop of the pipe process.
%%
%% State is a map holding `port`. It also tracks:
%%   - `client`: who receives the next complete line;
%%   - `status`;
%%   - `cur_data`: the {noeol, _} pieces of the line being assembled,
%%     newest first.
%%
%% Messages handled:
%%   {_, send, close}                 close the port and exit normally.
%%   {Pid, send, Data}                write Data then get_eol() to the port;
%%                                    Pid becomes the client for the next
%%                                    line.
%%   {Pid, status}                    reply {status, Port, State} to Pid.
%%   {Port, {data, {noeol, Chunk}}}   buffer part of an over-long line.
%%   {Port, {data, {eol, Chunk}}}     finish the line and send
%%                                    {reply, Binary} to the client (logged
%%                                    if there is no client).
%%   {Port, {exit_status, Status}}    the program exited; stop the process.
%%   {_, closed} / {'EXIT', Port, R}  the port went away; stop the process.
%%
%% Any other message crashes the process with
%% {error, {unexpected_message, Msg}}.
loop(State) ->
    receive
        {_Pid, send, close} ->
            ?LOG(notice, "got cmd: Close", []),
            Port = maps:get(port, State),
            port_close(Port),
            exit(normal);
        {Pid, send, Data} ->
            ?LOG(notice, "Send Data ...", []),
            Port = maps:get(port, State),
            port_command(Port, Data),
            port_command(Port, get_eol()),
            loop(State#{status => data_sent, client => Pid});
        {Pid, status} ->
            Port = maps:get(port, State),
            ?LOG(notice, "Status: Port: ~p State: ~p", [Port, State]),
            Pid ! {status, Port, State},
            loop(State);
        %% port messages.
        {Port, {data, {noeol, Data}}} ->
            ?LOG(notice, "Port: ~p Data: ~p", [Port, Data]),
            CurData = maps:get(cur_data, State, []),
            loop(State#{cur_data => [Data | CurData]});
        {Port, {data, {eol, Data}}} ->
            ?LOG(notice, "Port: ~p Data: ~p", [Port, Data]),
            CurData = [Data | maps:get(cur_data, State, [])],
            Reply = list_to_binary(lists:reverse(CurData)),
            Client = maps:get(client, State, undefined),
            State2 = State#{cur_data => [], client => undefined},
            case Client of
                undefined ->
                    ?LOG(error, "can not sent reply. Client: ~p Reply: ~p",
                         [Client, Reply]);
                _ ->
                    Client ! {reply, Reply}
            end,
            loop(State2);
        {Port, {exit_status, Status}} ->
            %% init/1 opens the port with the exit_status option, so this
            %% message is expected. It used to fall through to the catch-all
            %% below and crash the process with unexpected_message.
            ?LOG(notice, "Port: ~p exited with status ~p", [Port, Status]),
            case Status of
                0 -> exit(normal);
                _ -> exit({exit_status, Status})
            end;
        {Port, closed} ->
            %% The format string has one ~p, so the port must be passed as
            %% its argument (it was previously called with []).
            ?LOG(warning, "Port: ~p closed", [Port]),
            exit(normal);
        {'EXIT', Port, Reason} ->
            ?LOG(notice, "Port: ~p exit. Reason: ~p", [Port, Reason]),
            exit(Reason);
        Other ->
            ?LOG(error, "unexpected message: ~p", [Other]),
            exit({error, {unexpected_message, Other}})
    end.

Upvotes: 1

Adam Lindberg
Adam Lindberg

Reputation: 16587

If you use start_link instead, you'll see that the port crashes after the first command:

1> port:start('go run port.go').
<0.118.0>
2> port:ping(65000).
65000
** exception error: port_terminated

If you change the Go code to run in a loop, this crash can be avoided:

// main echoes every Delimiter-terminated request read from stdin back to
// stdout (without the delimiter) until stdin is closed.
func main() {
    // Create the reader once, outside the loop. A bufio.Reader may buffer
    // bytes beyond the delimiter, and a fresh reader per iteration would
    // silently discard them.
    reader := bufio.NewReaderSize(os.Stdin, 1<<24) // 2**24 bytes
    for {
        line, err := reader.ReadBytes(Delimiter)
        // Strip the delimiter only when present. At EOF the slice may be
        // empty or lack it, and line[:len(line)-1] would panic or drop a
        // real byte.
        if n := len(line); n > 0 && line[n-1] == Delimiter {
            line = line[:n-1]
        }
        if len(line) > 0 {
            if _, werr := os.Stdout.Write(line); werr != nil {
                return
            }
        }
        if err != nil {
            // EOF (the Erlang side closed the port) or a read error:
            // stop cleanly instead of panicking on an empty slice.
            return
        }
    }
}

Now we can see another interesting result:

33> c(port).
{ok,port}
40> port:ping(66000).
65536
41> port:ping(66000).
464
42> port:ping(66000).
65536
43> port:ping(66000).
464

Now we can see that no data is actually lost; it is just buffered in the port. Since you have not specified a framing protocol (using {packet, N} or {line, N}), you are responsible for collecting the data yourself. It also seems that the internal buffer size of an Erlang port is 64K (although I found no documentation of this and no way to change it).

If you change your receive to collect all the data before returning, you'll get every byte each time:

%% Request loop of the port-owner process.
%%
%% A {call, Caller, Msg} request writes Msg plus the delimiter to the port.
%% It then gathers every chunk the port emits until it has been quiet for
%% 10 ms (see receive_all/2), and sends the joined result to Caller as
%% {myname, Reply}.
%% `stop` closes the port and exits normally once {Port, closed} arrives.
%% An {'EXIT', Port, _} message ends the process with port_terminated.
loop(Port) ->
    receive
        {call, Caller, Msg} ->
            Port ! {self(), {command, Msg ++ ?DELIMITER}},
            Reply = receive_all(Port, 10),
            Caller ! {myname, Reply},
            loop(Port);
        stop ->
            Port ! {self(), close},
            receive {Port, closed} -> exit(normal) end;
        {'EXIT', Port, _} ->
            exit(port_terminated)
    end.

%% Collect every {Port, {data, Chunk}} message until none arrives for Timeout
%% milliseconds, and return the chunks flattened in arrival order.
%% The quiet-period cut-off is a heuristic: a producer that pauses longer than
%% Timeout will have its output split across calls.
receive_all(Port, Timeout) ->
    receive_all(Port, Timeout, []).

%% Chunks accumulates newest-first and is reversed once at the end.
receive_all(Port, Timeout, Chunks) ->
    receive
        {Port, {data, Chunk}} ->
            receive_all(Port, Timeout, [Chunk | Chunks])
    after Timeout ->
        Ordered = lists:reverse(Chunks),
        lists:flatten(Ordered)
    end.

Running this, we get:

1> c(port).
{ok,port}
2>
3> port:start('go run port.go').
<0.311.0>
4> port:ping(66000).
66000
5> port:ping(66000).
66000
6> port:ping(66000).
66000

Upvotes: 2

Lol4t0
Lol4t0

Reputation: 12547

  1. 2^8 is 256, not 65536; 65536 is 2^16 (the number of values a 2-byte integer can hold).
  2. To rule out the Go program, you can simply replace your echo with GNU cat.
  3. The default maximum message size for port communication is 64K, so when your port receives data, the first message contains only the leading 64K of the string. You can read from the port again to get the remaining data, but your code simply ignores it.
  4. If you really want to communicate using a line-based protocol, you should configure your port accordingly:

{line, L}

Messages are delivered on a per line basis. Each line (delimited by the OS-dependent newline sequence) is delivered in one single message. The message data format is {Flag, Line}, where Flag is either eol or noeol and Line is the actual data delivered (without the newline sequence).

L specifies the maximum line length in bytes. Lines longer than this will be delivered in more than one message, with the Flag set to noeol for all but the last message. If end of file is encountered anywhere else than immediately following a newline sequence, the last line will also be delivered with the Flag set to noeol. In all other cases, lines are delivered with Flag set to eol.

The {packet, N} and {line, L} settings are mutually exclusive.

So your code would be

%% NOTE(review): ?PACKET_SIZE is not defined in this snippet; define it first,
%% e.g. -define(PACKET_SIZE, 65536). With {line, L}, an {eol, _} message only
%% arrives once the external program writes a newline after its reply.
Port = open_port({spawn, ExtPrg}, [{line, ?PACKET_SIZE}]),
%%...
{call, Caller, Msg} ->
    Port ! {self(), {command, Msg ++ ?DELIMITER}},
    D = read_data(Port, []),
    Caller ! {myname, D},
    loop(Port);
%%...
%% Read one line from a port opened with {line, L}.
%% The {noeol, Chunk} pieces of an over-long line are gathered until the
%% {eol, Chunk} piece arrives. Returns Prefix followed by all pieces, in
%% arrival order.
%% Pieces are kept in a reversed accumulator and joined once at the end,
%% instead of re-copying the growing prefix with ++ for every piece (O(n^2)).
%% NOTE(review): this blocks until an eol piece arrives, so the external
%% program must terminate each reply with a newline.
read_data(Port, Prefix) ->
    read_line_chunks(Port, [Prefix]).

%% Accumulate line pieces newest-first; join them on the final eol piece.
read_line_chunks(Port, Acc) ->
    receive
        {Port, {data, {noeol, Chunk}}} ->
            read_line_chunks(Port, [Chunk | Acc]);
        {Port, {data, {eol, Chunk}}} ->
            lists:append(lists:reverse([Chunk | Acc]))
    end.

Upvotes: 1

Related Questions