MMahya RA
MMahya RA

Reputation: 1

PHP TCP Socket Server - esp32

I want to create a TCP socket using PHP code that can handle a large number of clients. In fact, my clients are network modules that want to connect to the server and have a two-way connection.

I have a few goals in mind:

Display a list of connected clients. Ability to send messages to a specific client through an HTML page. Ability to connect multiple clients simultaneously (in this case, I mean 10). I have a shared host, and I run all the server code on it. This host doesn't provide console access, so I have no idea how to use ready-made libraries like ReactPHP.

Currently, I have only managed to create a list of connected clients. Here is the code for that:

<?php
error_reporting(E_ALL);

$host = '0.0.0.0';
$port = 8085;

$clients = [];
$onlineUsersFile = 'online_users.json';

$server = stream_socket_server("tcp://$host:$port", $errno, $errstr);

if (!$server) {
die("Error: $errstr ($errno)\n");
}

echo "Server started on $host:$port\n";

while (true) {
$read = $clients;
$read[] = $server;

if (!stream_select($read, $write, $except, 1)) {
    // Check every 1 second for changes in connections
    continue;
}

if (in_array($server, $read)) {
    $newClient = stream_socket_accept($server);
    socket_set_blocking($newClient, 0);
    $clients[] = $newClient;
    $data = "Welcome to the chat room!\n";
    stream_socket_sendto($newClient, $data);
    echo "New client connected\n";
    broadcastToAll($clients, "New client connected\n");
    $key = array_search($server, $read);
    unset($read[$key]);
}

foreach ($read as $key => $client) {
    stream_set_timeout($client, 1);
    $data = fread($client, 1024);
    if ($data === false || feof($client)) {
        fclose($client);
        unset($clients[$key]);
        echo "Client disconnected\n";
        broadcastToAll($clients, "Client disconnected\n");
        continue;
    }
    if ($data != '') {
        broadcast($client, $data, $clients);
    }
}
updateOnlineUsers($clients, $onlineUsersFile);
}

function broadcast($sender, $message, $clients)
{
foreach ($clients as $client) {
    if ($client !== $sender) {
        stream_socket_sendto($client, $message);
    }
}
}

function broadcastToAll($clients, $message)
{
foreach ($clients as $client) {
    stream_socket_sendto($client, $message);
}
}

function updateOnlineUsers($clients, $file)
{
$onlineUsers = [];
foreach ($clients as $client) {
    $address = stream_socket_get_name($client, true);
    $onlineUsers[] = $address;
}
file_put_contents($file, json_encode($onlineUsers));
}
     $onlineUsers[] = $address;
    }
    file_put_contents($file, json_encode($onlineUsers));
}

Ability to send messages to a specific client through an HTML page.

Upvotes: -2

Views: 193

Answers (1)

Juggernogger93
Juggernogger93

Reputation: 21

I wrote this server for my social network. It uses fcntl_fork to create process for each connected client. The server can communicate with every connected client using stream_socket_pair. Here is the code:

<?php 

////////////////////////////////////////////////////////
//Muti process TCP server///////////////////////////////
//Designed by Sebastian Winbladh////////////////////////
//This program will exicute in a new process////////////
//Created for Talkie (Video Social Network) 2016-11-08//
////////////////////////////////////////////////////////

// Maximum allowed network communication size per package
// This one will also effect the MAX_SIZE_SERVER
// MAX_SIZE_SERVER is the uper limit to what can be sent in each package

define("MAX_SIZE_NETWORK",100000);

// Maximum allowed package size for internal process communication
define("MAX_SIZE_SERVER",100000);

// Read and write stream class for the sockets

class tich_api_socket_read_write{
    
    //Parameter = socket: points to the socket that this instance handles
    //maxbytes: The total number of bytes that can be sent in each package
    
    public $thread = "main";
    public $batch = "";
    public $test = "";
    
    function __construct(&$socket,$maxbytes = 1024,$thread  = "main"){ 
    
        $this->thread = $thread;
        $this->socket = &$socket;
        $this->maxbytes = $maxbytes;
        
        //Reading buffers
        $this->read_buffer = "";
        $this->total = -1; 
        
        $this->callback = NULL;
        $this->process = NULL;
    
    }  
    
    //Read function. Reads data from socket
    //Parameter = bytes: The bytes that we like to read from the socket
    
    private function read($bytes){
        
        if(($read = @stream_socket_recvfrom($this->socket,$bytes)) === false){ 
            return NULL;
        }return $read;
        
    }
    
    //Write function. Writes data to socket
    //Parameter = data: Data that we like to write to the socket
    //No max write length. That is handled on the read part
    
    public function write(){
         
        //This line can crash when a ghost client has astibliched a connection to the server.
        //If the server like to call a client that has closed its connection this line will return -1, meaning no data writen
        $wrote = @stream_socket_sendto($this->socket,$this->batch);
        $this->batch = "";
         
        //Return true when data was writen
        return $wrote;
        
    }
    
    public function writeToBatch($msg,$id = 0,$aug = -1, $alive = false){
    
        $length = strlen($msg);
        
        if($alive){
            //Do nothing
        }else if($aug == -1){ $msg = pack("N",$length).pack("N",$id).$msg; 
        }else{ $msg = pack("N",$length).pack("N",$id).pack("N",$aug).$msg; }
        
        //Write to write buffer  
        $this->batch .= $msg;
        
    }
    
    //This function iterates and reads a message from the 
    //input stream and adds it to the read buffer
    //...........................................
    
    public function iterate(){ 
         
        //Get the max read bytes
        $bytesleft = $this->maxbytes; 
        
        $input = $this->read($bytesleft);
        
         
        //Break if the connections breaks
        if($input == NULL){return -1;} 
        else if(strlen($input)>0){
        
            $this->read_buffer.= $input;   
            
            //Here we take from our query buffer
            while(strlen($this->read_buffer) > 0){ 
                
                //Here we unpacking the size for the message
                //and the category this message is pointing to
                
                //Network order to host order conversion
                ////////////////////////////////////////
                
                
                $offset = 8; 
                $bytes = unpack("NInt", substr($this->read_buffer,0,4))["Int"];
                     
        
                //Ping was retrived from client
                if($bytes == 0){
                    
                    if($this->thread != "main") {call_user_func($this->callback,$this->socket,"PING",0);}
                    $this->read_buffer = substr($this->read_buffer,4);
                    
                //Normal message handling
                } else {
                    
                    
                    $category = unpack("NInt", substr($this->read_buffer,4,4) )["Int"];
                    
                    if($this->thread == "main") {$offset = 12; $aug = unpack("NInt", substr($this->read_buffer,8,4) )["Int"];
                    } 
                     
                    //Call the read message hook callback here  
                    if(strlen($this->read_buffer) - $offset < $bytes)break;  
                    
                    //Here we take a part of the read buffer when we have a complete message
                    $message = substr($this->read_buffer,$offset,$bytes);
                    
                    //Cut the read message from the read buffer so that we can start reading
                    //any new message
                    $this->read_buffer = substr($this->read_buffer,$bytes+$offset);
                     
                    //Call the tcp callback function
                    
                    if($this->thread == "main") {
                         
                        call_user_func($this->callback,$this->socket,$message,$category,$aug);}else{ 
                     
                    call_user_func($this->callback,$this->socket,$message,$category);}
                    
                }
                
            }return 1;
            
        }return 0;
        
    }
    
    //Parameter = complete: callback function that is called when a message has been read
    
    public function setReadBufferCallback($complete){
         
        $this->callback = $complete;
        
    }
    
    //Sends one bytes to the client to check if it is connected
    public function alive(){ 

        $msg = pack("N",1);
        $this->writeToBatch($msg, 0, -1, true);

        //Send alive write to client
        $this->write();
        
    }
    
}

class tich_api_socket_IPC{
    
    public $aug = 0,$errors = array();
    
    function __construct(){ 
        
        $this->pipes = array(); 
        $this->ipcBuffers = array();  
    
    }
    
    public function broadcastToPipeClients($ids,$data = NULL, $callback = NULL){
         
        $this->aug ++;
        $this->errors[$this->aug] = array($callback,$ids,array());
         
        //If no data was set as default, we use the message that is already in the buffer
        if($data == NULL){$data = $this->message;}
        if(is_array($data)){$data = json_encode($data);} 
        
        if(is_numeric($ids)){
             
            //Get the first item in array of ipc buffers
            $buffer = reset($this->ipcBuffers);
            
            //Write only one id
            if($buffer != NULL){ $buffer->writeToBatch($data,$ids,$this->aug);}
            
        }else if(is_array($ids)){ 
            
            //Get the first item in array of ipc buffers
            $buffer = reset($this->ipcBuffers);
            
            //Send data to multi processes
            foreach($ids as $item){if($buffer != NULL){ $buffer->writeToBatch($data,$item,$this->aug);}}
            
        }
        
        return $buffer;
    }
    
    //Needs to be called from the main process loop
    protected function iterate_IPC($pipes){
         
        /////////////////////////////
        //IPC socket handling////////
        //Reading from child process/
        /////////////////////////////
         
        foreach($pipes as $pipe){
                  
            //Iterate pipes 
            $this->ipcBuffers[$pipe]->iterate(); 
             
        }  
        
    }
    
}

//Client socket
class tich_api_buffer extends tich_api_socket_IPC{
    
    public $message,$id,$category,$ghost,
    $pingfuturetime = 0,$pingsenttime = 0,$pingreturntime = 0;
    
    function __construct($socket,$init,$callback,$pipe){
        
        parent::__construct(); 
        
        $this->id = -1;   
        $this->init = $init;  
        $this->client_socket = $socket;
         
        $this->clientBuffer = new tich_api_socket_read_write($this->client_socket,MAX_SIZE_NETWORK,"child"); 
        
        //Data from client app
        $this->clientBuffer->setReadBufferCallback(function($socket,$msg,$category) use ($callback){
         
            //Returned ping from client
            $this->pingReturned();
            if($msg == "PING"){return;}
         
            $this->message = $msg;
            $this->category = $category;
            call_user_func($callback,$this); 
            
        });
        
        $this->pipes = array($pipe);
        
        $this->ipcBuffers[$pipe] = new tich_api_socket_read_write($pipe,MAX_SIZE_SERVER,"child");  
        
        //Data from mother process
        $this->ipcBuffers[$pipe]->setReadBufferCallback(function($socket,$msg,$category){
          
            $this->dataFromPipe($msg,$category);
            
        });
        
        //Address and port numbers
        $this->address = NULL;
        $this->port = NULL; 
        
        //Client options
        $this->options = array();
        
        $adress = explode(":",stream_socket_get_name($this->client_socket, true)); 
        $this->address = $adress[0];
        $this->port = $adress[1];
         
        call_user_func($this->init,$this);
        
        //Init read and write stream for this client
        $this->mainLoop();
        
    }  
    
    //Messages from mother IPC pipe
    private function dataFromPipe($msg,$category){
         
                  
        //No user with the id online 
        if(is_numeric($msg)){
         
            $index = array_search($msg, $this->errors[$category][1]);
            
            
            
            unset($this->errors[$category][1][$index]);
             
            if($index >= 0 && $msg>0){$this->errors[$category][2][] = abs($msg);}
            
            //The cetegory is here the user function id. Not the user id
            if(count($this->errors[$category][1]) == 0){ 
                               
                if(is_object($this->errors[$category][0])){ 
                    
                    call_user_func($this->errors[$category][0],$this->errors[$category][2]);}
                unset($this->errors[$category]);
                
            } 
            
        }else{
            
            if($msg == "exit"){
                  
                $this->broadcast(array("question"=>"error","ret"=>"2"))->write();
                $this->close();
                
            }else{
                //Just retransmit the data to the client here 
                $this->clientBuffer->writeToBatch($msg,$category);
                $this->clientBuffer->write();
            }
        }
                
    }
    
    private function mainLoop(){
        
        //Connected
        //echo "Client ".$this->address." : ".$this->port." is now connected. \n";
        
        //Main client loop
        while(true){  
             
            //Make selection from child sockets so we can read them
            $read = array_merge(array($this->client_socket),$this->pipes); 
            
            // Select object here
            // now call select
            if(false === ($num_changed_streams = stream_select($read, $write, $except, 5))){}
            
            // Iterate Intern process communication 
            if(in_array($this->pipes[0],$read)){
                 
                $this->iterate_IPC($this->pipes);
                 
            }
            
            //Iterate the clients reads
            if(in_array($this->client_socket,$read)){
             
                //Break if client disconnects
                if($this->clientBuffer->iterate() == -1){  break; }
                $this->pingfuturetime = microtime(true) + 60;
                
            }
            
            //Test for client broken pipe. Send alives each minute
            if(microtime(true)  >= $this->pingfuturetime && $this->clientBuffer){ 
                
                //Un comment when the finale version is coming out
                if($this->pingreturntime < $this->pingsenttime){  break; }
                
                //Send alive call to client
                $this->pingsenttime = microtime(true);
                $this->clientBuffer->alive();
                
                $this->pingfuturetime = microtime(true) + 60; 
                
                
            }
            
        }
        
        //Disconnected
        //echo "Client ".$this->address." : ".$this->port." disconnected. Closing program ".getmypid()."... \n";
        
        //Close client socket
        $this->close();
         
    }
    
    //Ping return function from client
    public function pingReturned(){
        
        $this->pingreturntime = microtime(true);
        
    }
    
    public function close(){
        
        //Call the mother to close and clean hear pair socket
        $this->broadcastToPipeClients($this->id,"exit")->write();
        
        stream_socket_shutdown($this->pipes[0],STREAM_SHUT_RDWR);
        fclose($this->pipes[0]);
        stream_socket_shutdown($this->client_socket,STREAM_SHUT_RDWR);
        fclose($this->client_socket);
        
        //As this is a program we need to exit  
        exit();
        
    }
    
    //Function to send some data to this client
    public function broadcast($array){
          
        $query = json_encode($array);
          
        //Append data to the clientBuffer
        $this->clientBuffer->writeToBatch($query);
        return $this->clientBuffer;
        
    } 
    
    public function setProcessId($id){
        
        $this->id = $id;  
         
        //Brodcast id to mother process and register this process
        return $this->broadcastToPipeClients($id,"register");
        
    }  
    
    //Gets a reference to the parameters object, so you can set it by like a normal array
    public function &getOptions(){
         
        return $this->options; 
        
    }
    
    
}
  
//Server socket
class tich_api_server extends tich_api_socket_IPC{
    
    public $slots;
    
    function __construct($port){
         
        parent::__construct();
         
        $this->slots = pow(2,32) / 2;
        $this->address = "85.159.212.83";
        $this->port = $port;   
        $this->r = true;   
        $this->registered_childs = array();
        
        $ctx = stream_context_create(['socket' => ['ipv6_v6only' => true]]);
        $this->sock = stream_socket_server("tcp://".$this->address.":".$this->port, $errno, $errstr,
                           STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $ctx);
        
        if(!$this->sock)
        { 
            die("Couldn't create socket: [$errno] $errstr \n");
        } 
                
    }
    
    //Messages from childs IPC pipe
    private function dataFromPipe($socket,$msg,$category,$aug){
         
        //Handle socket exit
        if($msg == "exit"){
            
            //Close the live socket on the mother process
            //And clear any pointers to it
            fclose($socket);
            $this->pipes = array_diff($this->pipes,array($socket));
            $this->registered_childs = array_diff($this->registered_childs,array((int)$socket));
             
            //Clear the icp buffer for this pipeline
            unset($this->ipcBuffers[(int)$socket]); 
             
        //Register child socket with an id on the main process
        }else if($msg == "register"){
             
            //Register what child process is assosiated with category
            if(!array_key_exists($category,$this->registered_childs)){
                
                $this->registered_childs[$category] = (int)$socket;
            
            //Id is already in the system
            }else{ 
                
                //Select child process and transmit the msg 
                $process_id = $this->registered_childs[$category];
                $this->ipcBuffers[$process_id]->writeToBatch ("exit",$aug);
                $this->ipcBuffers[$process_id]->write();
                    
                $this->registered_childs[$category] = (int)$socket;
                
            }  
             
        }else{
              
            //Select child process and transmit the msg 
            $process_id = $this->registered_childs[$category];
                
            //Check so a process has been found
            if($process_id != NULL){  
                
                //Pointing message to child thread
                $this->ipcBuffers[$process_id]->writeToBatch ($msg);
                $this->ipcBuffers[$process_id]->write();
                
                $this->ipcBuffers[$socket]->writeToBatch     (-$category,$aug);
                $this->ipcBuffers[$socket]->write();
            
            //This client was not online
            }else{
                
                //Error code 1 = not online
                $this->ipcBuffers[$socket]->writeToBatch($category,$aug);
                $this->ipcBuffers[$socket]->write();
                
            }
            
        }
        
    }
    
    //Public methods
    public function listen_and_read($init,$callback){ 
        
        //Whait
        pcntl_signal(SIGCHLD, SIG_IGN);
        
        //Child process, tree process
          
        //start loop to listen for incoming connections and process existing connections
        while (true){
             
            //Make selection from child sockets so we can read them
            $read = array_merge($this->pipes,array($this->sock));
            
            //Select object here
            //now call select - blocking call
            if(false === ($num_changed_streams = stream_select($read, $write, $except, 5)))
            { 
                //Faild to select 
                continue;
            }
             
            //If there is pipe data to read
            if($num_changed_streams > 0){
                  
                $pipes_read = array_diff($read,array($this->sock)); 
                $this->iterate_IPC($pipes_read);
                
            } 
            
            /////////////////////////////
            //TCP socket handling////////
            /////////////////////////////
            
            if(in_array($this->sock,$read)){
            
                $sock = stream_socket_accept($this->sock); 
                 
                if($sock === false){
                    
                    //Do something if the socket accept faild
                        
                //Handle connection
                }else if($sock > 0){
                    
                    $pipe = array();
                    if (!($pipe = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_DGRAM, 0))) {
                        print "Faild to pair socket";
                        continue;
                    }
                
                    //Create new prosses
                    $this->slots ++;
                    if($this->slots >= pow(2,32)){$this->slots = pow(2,32) / 2;}
                    
                    $newp = pcntl_fork();
            
                    if($newp == -1){
                    
                        print "Faild to fork process";
                    
                    }else if($newp){
                        
                         //Mother process, main  
             
                        //Close socket to this process
                        fclose($pipe[0]);  
                        $this->pipes[] = $pipe[1];
                          
                        $this->ipcBuffers[$pipe[1]] = new tich_api_socket_read_write($pipe[1],MAX_SIZE_SERVER);  
                        
                        // Set up a listener for child process communication.
                        // Callback function fires when data can be read from child process
                        $this->ipcBuffers[$pipe[1]]->setReadBufferCallback(function($socket,$msg,$category,$aug){
                            
                            //Send the input to our handling function
                            $this->dataFromPipe($socket,$msg,$category,$aug); 
                            
                        });
                        
                        //Continue to process other incoming connections
                        continue; 
                    
                    
                    }else{ 
                         
                        //Child process, tree process
                        //This process handles the logic for in and out streams from a client
                         
                        //Close socket to this process
                        fclose($pipe[1]);  
                        $this->pipes[0] = $pipe[0];
                    
                        //Create a new client socket that handles this connected client
                        $socket_client = new tich_api_buffer($sock,$init,$callback,$pipe[0]);
                        
                        //Register default client id
                        $socket_client->setProcessId($this->slots);
                         
                    }
                }
            }
        } 
    }  
}

// The client class is only for oneway communication
// It is not meant to be a two way trip.
// Used for communication with the server socket

// When using this client don't forget to call the write function to send the batch of data

class tich_api_client extends tich_api_socket_read_write{
    
    private $address,$port;
    
    function __construct ($address,$port){
        
        $this->address = $address;
        $this->port    = $port; 
         
        // Create a socket
        if(!($this->sock = stream_socket_client("tcp://".$this->address.":".$this->port,$errno, $errstr)))
        {
            $errorcode = $errno;
            $errormsg  = $errstr;
             
            die("Couldn't create socket: [$errorcode] $errormsg \n");
        }   
        
        //Init parent class
        parent::__construct($this->sock); 
        
    }
    
    public function expandToBatch($msg){
        
        //Write to write buffer  
        $this->batch .= $msg.",";
        
    }
    
    //Sends a message to the server
    public function broadcastToPipeClients($ids,$data = NULL,$p = 100){
        
        $data = json_encode(array($ids,$data,$p)); 
         
        //Write data to socket with the to data
        $this->expandToBatch($data);
        
    } 
    
    public function write(){
        
        $this->batch = "[". chop($this->batch,",") ."]";
        
        $length = strlen($this->batch); 
        $msg = pack("N",$length).pack("N",0).$this->batch;
        
        //Write to write buffer  
        $this->batch = $msg;
        
        //Finaly we do the write
        parent::write();
        
    }   
     
}

?>

You probably have to do some refactoring to make it work for you. I initialize it with this code:

$tich_stream = new tich_api_server($port);
            
            //Functon will run on start up. It is the client default setup
            $tich_stream->listen_and_read(function($client){
                 
                //Init function
                $params = &$client->getOptions();
                $params["autherized"] = "0";
                $params["key"] = "";
             
            //Function will only run when client get a complete message
            },function($client) use ($tich_ask,$tich_api,$mysql_collections){
                    
                $encryption = new Encryption();
                $message = $encryption->decrypt($client->message);
                                          
                $tich_ask->parseQustion($message);
                $params = &$client->getOptions();
                
                //Handle incoming messages from public tcp clients
                if($client->address != "85.159.212.83"){
                 
                    if(@($params["autherized"] == "1")){ 
                         
                        $result = $tich_api->apiCall($tich_ask->question,$tich_ask->values,$client);
                        
                        if($result != NULL){
                            $result["question"] = $tich_ask->question . " return"; 
                            
                            $client->broadcast($result)->write();
                        }
                        reset($client->ipcBuffers)->write();
                         
                    }else if($tich_ask->question == "confirm login tcp"){ 
                     
                        $return = $tich_api->apiCall($tich_ask->question,$tich_ask->values);
                         
                        //Set process id to users id, so we can access it later
                        
                        $client->setProcessId($return["ret"])->write();
                        
                        if($return["ret"] > 0){$params["autherized"] = "1";
                        }else{ $params["autherized"] = "0";}
                         
                        $params["key"]        = $return["key"];
                        
                        //Send return message to client
                        $client->broadcast(array("ret" => $params["autherized"],"question"=>"confirm login tcp"))->write(); 
                        
                    }else{
                          
                        $client->broadcast(array("ret" => $params["autherized"], "error"=>"1","question"=>"confirm login tcp"))->write(); 
                    
                    }
                    
                //Handle incoming messages from the private tcp client
                //That is clients that has been connected from this machine(localhost)
                }else if($client->address == "85.159.212.83"){
                     
                    $data = json_decode($client->message,true);  
                     
                    //Parse array of data coming from pipeline
                    foreach($data as $item){
                         
                        //As we dont set any process id, the process will bee initelized as a ghost connection
                        $client->broadcastToPipeClients($item[0],$item[1]); 
                        
                    }
                      
                    // This method includes a write method
                    $client->close();
                }
            });

You need to change ip address in the code to make it work on you sever. You need console access to start the server. So many this won't help. But here you go anyways.

Upvotes: 1

Related Questions