Jake Abramson
Jake Abramson

Reputation: 31

PHP Text Stream/Buffer to share real-time data between scripts

PHP stream_socket_server/client with local file access issue.

I'm using a modification of this script: php: How to save the client socket (not closed), so a further script may retrieve it to send an answer? but I cannot get the local file portion to work properly.

What I'm trying to do is essentially stream data between PHP processes/scripts by using a file as a middle-man, essentially streaming data.

I'm having trouble with the existing script where I'm opening/adding to the existing file.

On the stream_socket_server side, it'll work once (file doesn't exist), but then throws the error below on any subsequent attempt to run;

PHP Warning: stream_socket_server(): unable to connect to unix://./temp.sock (Unknown error)

Seems that when the stream_socket_server creates the file, it makes it as read only with details in the snippet below;

rwxrwxr-x 1 xxx xxx    0 Jun 13 20:05 temp.sock

I've tried adjusting the permissions to something more forgiving, but no luck.

On the socket client side, I cannot ever get it to open the file, existing or not.

$socket = stream_socket_server('unix://./temp.sock', $errno, $errstr);
$sock = stream_socket_client('unix:///./temp.sock', $errno, $errstr);

PHP Warning: stream_socket_server(): unable to connect to unix://./temp.sock (Unknown error) (server when file already exists)

PHP Warning: stream_socket_client(): unable to connect to unix://./temp.sock (Connection refused) (client)

Upvotes: 3

Views: 1979

Answers (2)

hanshenrik
hanshenrik

Reputation: 21463

let me just preface this with: are you sure you need unix sockets? are you sure proc_open()'s pipes aren't sufficient for your goal? proc_open() is significantly easier to use than unix sockets.. moving on,

caveats: don't trust fread() to read all your data in 1 go, especially when sending lager amounts of data (like megabytes), you'll need some way to communicate how big your message is going to be, that can be achieved by starting all your messages with a message length header, for example a little-endian uint64 string, you can generate that with

/**
 * convert a native php int to a little-endian uint64_t (binary) string
 *
 * @param int $i
 * @return string
 */
function to_little_uint64_t(int $i): string
{
    return pack('P', $i);
}

and you can parse it with

/**
 * convert a (binary) string containing a little-endian uint64_t
 * to a native php int
 *
 * @param string $i
 * @return int
 */
function from_little_uint64_t(string $i): int
{
    $arr = unpack('Puint64_t', $i);
    return $arr['uint64_t'];
}

sometimes fread() will not return all the data in the first call, and you'll have to keep calling fread() and append the data to get the full message, here's an implementation of such a fread()-loop:

/**
 * read X bytes from $handle,
 * or throw an exception if that's not possible.
 *
 * @param mixed $handle
 * @param int $bytes
 * @throws \RuntimeException
 * @return string
 */
function fread_all($handle, int $bytes): string
{
    $ret = "";
    if ($bytes < 1) {
        // ...
        return $ret;
    }
    $bytes_remaining = $bytes;
    for (;;) {
        $read_now = fread($handle, $bytes_remaining);
        $read_now_bytes = (is_string($read_now) ? strlen($read_now) : 0);
        if ($read_now_bytes > 0) {
            $ret .= $read_now;
            if ($read_now_bytes === $bytes_remaining) {
                return $ret;
            }
            $bytes_remaining -= $read_now_bytes;
        } else {
            throw new \RuntimeException("could only read " . strlen($ret) . "/{$bytes} bytes!");
        }
    }
}

furthermore, when sending larger amounts of data, you can't trust fwrite() either, sometimes you'll need to call fwrite, see how many bytes it wrote, then substr()-cut off the bytes that were actually written, and send the rest in a 2nd fwrite(), and so on, here's an implementation of an fwrite()-loop that keeps writing until everything is written (or throws an exception if it couldn't write everything):

/**
 * write the full string to $handle,
 * or throw a RuntimeException if that's not possible
 *
 * @param mixed $handle
 * @param string $data
 * @throws \RuntimeException
 */
function fwrite_all($handle, string $data): void
{
    $len = $original_len = strlen($data);
    $written_total = 0;
    while ($len > 0) {
        $written_now = fwrite($handle, $data);
        if ($written_now === $len) {
            return;
        }
        if ($written_now <= 0) {
            throw new \RuntimeException("could only write {$written_total}/{$original_len} bytes!");
        }
        $written_total += $written_now;
        $data = substr($data, $written_now);
        $len -= $written_now;
        assert($len > 0);
    }
}

.. with that out of the way, you can create your server like

$server_errno = null;
$server_errstr = "";
$server_path = __FILE__ . ".socket";
$server = stream_socket_server("unix://" . $server_path, $server_errno, $server_errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
if (! $server || ! ! $server_errno) {
    throw new \RuntimeException("failed to create server {$server_path} - errno: {$server_errno} errstr: {$server_errstr}");
}
register_shutdown_function(function () use (&$server_path, &$server) {
    // cleanup
    fclose($server);
    unlink($server_path);
});
var_dump("listening on {$server_path}", $server);

now if you just need to support talking to 1 client, with just a single message, one could do

echo "waiting for connection...";
$client = stream_socket_accept($server);
echo "connection!\n";
echo "reading message size header..";
stream_set_blocking($client, true);
// size header is a little-endian 64-bit (8-byte) unsigned integer
$size_header = fread_all($client, 8);
$size_header = from_little_uint64_t($size_header);
echo "got size header, message size: {$size_header}\n";
echo "reading message...";
$message = fread_all($client, $size_header);
echo "message recieved: ";
var_dump($message);
$reply = "did you know that the hex-encoded sha1-hash of your message is " . bin2hex(hash("sha1", $message, true)) . " ?";
echo "sending reply: {$reply}\n";
fwrite_all($client, to_little_uint64_t(strlen($reply)) . $reply);
echo "reply sent!\n";

the client could then look like

$unix_socket_path = __DIR__ . "/unixserver.php.socket";
$conn_errno = 0;
$conn_errstr = "";
echo "connecting to unix socket..";
$conn = stream_socket_client("unix://" . $unix_socket_path, $conn_errno, $conn_errstr, (float) ($timeout ?? ini_get("default_socket_timeout")), STREAM_CLIENT_CONNECT);
if (! $conn || ! ! $conn_errno) {
    throw new \RuntimeException("unable to connect to unix socket path at {$unix_socket_path} - errno: {$conn_errno} errstr: {$conn_errstr}");
}
stream_set_blocking($conn, true);
echo "connected!\n";
$message = "Hello World";
echo "sending message: {$message}\n";
fwrite_all($conn, to_little_uint64_t(strlen($message)) . $message);
echo "message sent! waitinf for reply..";
$reply_length_header = fread_all($conn, 8);
$reply_length_header = from_little_uint64_t($reply_length_header);
echo "got reply header, length: {$reply_length_header}\n";
echo "reciving reply..";
$reply = fread_all($conn, $reply_length_header);
echo "recieved reply: ";
var_dump($reply);

now running the server we get:

hans@dev2020:~/projects/misc$ php unixserver.php 
string(59) "listening on /home/hans/projects/misc/unixserver.php.socket"
resource(5) of type (stream)
waiting for connection...

then running the client,

hans@dev2020:~/projects/misc$ php unixclient.php 
connecting to unix socket..connected!
sending message: Hello World
message sent! waitinf for reply..got reply header, length: 105
reciving reply..recieved reply: string(105) "did you know that the hex-encoded sha1-hash of your message is 0a4d55a8d778e5022fab701977c5d840bbc486d0 ?"

now looking back at our server, we'll see:

hans@dev2020:~/projects/misc$ php unixserver.php 
string(59) "listening on /home/hans/projects/misc/unixserver.php.socket"
resource(5) of type (stream)
waiting for connection...connection!
reading message size header..got size header, message size: 11
reading message...message recieved: string(11) "Hello World"
sending reply: did you know that the hex-encoded sha1-hash of your message is 0a4d55a8d778e5022fab701977c5d840bbc486d0 ?
reply sent!

this only works for 1 client at a time, with just a single reply/response, but at least it uses fread/fwrite loops correctly, and ensures the the entire message, no matter how big it is, is always sent/received in full..

lets do something more interesting: creating a server that can talk with an unlimited amount of clients asynchronously

// clients key is the client-id, and the value is the client socket
$clients = [];

stream_set_blocking($server, false);
$check_for_client_activity = function () use (&$clients, &$server): void {
    $select_read_arr = $clients;
    $select_read_arr[] = $server;
    $select_except_arr = [];
    $empty_array = [];
    $activity_count = stream_select($select_read_arr, $empty_array, $empty_array, 0, 0);
    if ($activity_count < 1) {
        // no activity.
        return;
    }
    foreach ($select_read_arr as $sock) {
        if ($sock === $server) {
            echo "new connections! probably..";
            // stream_set_blocking() has no effect on stream_socket_accept,
            // and stream_socket_accept will still block when the socket is non-blocking,
            // unless timeout is 0, but if timeout is 0 and there is no waiting connections,
            // php will throw PHP Warning: stream_socket_accept(): accept failed: Connection timed
            // so it seems using @ to make php stfu is the easiest way here
            $peername = "";
            while ($new_connection = @stream_socket_accept($server, 0, $peername)) {
                socket_set_blocking($new_connection, true);
                $clients[] = $new_connection;
                echo "new client! id: " . array_key_last($clients) . " peername: {$peername}\n";
            }
        } else {

            $client_id = array_search($sock, $clients, true);
            assert(! ! $client_id);
            echo "new message from client id {$client_id}\n";
            try {
                $message_length_header = fread_all($sock, 8);
                $message_length_header = from_little_uint64_t($message_length_header);
                $message = fread_all($sock, $message_length_header);
                echo "message: ";
                var_dump($message);
            } catch (Throwable $ex) {
                echo "could not read the full message, probably means the client has been disconnected. removing client..\n";
                // removing client
                stream_socket_shutdown($sock, STREAM_SHUT_RDWR);
                fclose($sock);
                unset($clients[$client_id]);
            }
        }
    }
};
for (;;) {
    // pretend we're doing something else..
    sleep(1);
    echo "checking for client activity again!\n";
    $check_for_client_activity();
}

now just call $check_for_client_activity(); whenever convinient, to see if you have a message from any of your clients. and if you have nothing to do and want to wait until you get a message from any of your clients, you can do like

$empty_array = [];
$select_read_arr=$clients;
$select_read_arr[]=$server;
$activity_count = stream_select($select_read_arr, $empty_array, $empty_array, null, null);

warning though, with the 2 last arguments to stream_select() being null, stream_select can block indefinitely if you dont get any new connections and nothing happens with any of your clients. (you can set another timeout, like 1 second or whatever, to set a timeout. null means "wait forever")

Upvotes: 3

maxleroy
maxleroy

Reputation: 126

There is actually many reasons why you cannot achieve this.

  • First to make things easier use a file that doesn't start with '.' so it's not hidden in your finder/terminal.
  • Then make sure to delete the socket file after each time you run your scripts. You can do so in your script with unlink() or by hand rm temp.sock If you don't then the socket server can't be created as it exists already.

You might think it doesn't work but in reality it does: - !preg_match('/\r?\n\r?\n/', $buffer) this condition prevents the buffer to be outputted in your persist script because it waits for this double carriage return to arrive in the socket to print everything up. So data might go in the socket and be read in persist script but not echoed to the response.

Couldn't spend too much time on it but here is a version of the two files that work. Make sure you run persist.php before senddata.php

persist.php

<?php 

$socket = stream_socket_server('unix://unique.sock', $errno, $errstr);

if (!$socket) {
  echo "$errstr ($errno)<br />\n";
} else {
  while ($conn = stream_socket_accept($socket)) {
    $buffer = "";
    while (false === strpos($buffer, 'QUIT')) {
        $buffer .= fread($conn, 2046); 
    }
    echo $buffer;      
    flush();

    // Respond to socket client
    fwrite($conn,  "200 OK HTTP/1.1\r\n\r\n");
    fclose($conn);
    break;
  }
  fclose($socket);
  unlink('unique.sock');
}

senddata.php

<?php 

$sock = stream_socket_client('unix://unique.sock', $errno, $errstr);
if (false == $sock) {
    die('error');
}
while ($data = fgets(STDIN)) {
    fwrite($sock, $data);
    fflush($sock);  
}

fclose($sock);  

Not sure exactly in which context you wish to use this. But this can help you understand how to use sockets. If you don't need crazy fast performances, Or if you're more in a web context, I'd advise you to switch and use WebSockets.

There's a great library you can find here : http://socketo.me/

This is modern and Object oriented. Hope it helps.

Upvotes: 2

Related Questions