PHP TCP 套接字服务器 - esp32

问题描述 投票:0回答:1

我想使用 PHP 代码创建一个可以处理大量客户端的 TCP 套接字。事实上,我的客户端是想要连接到服务器并具有双向连接的网络模块。

我心中有几个目标:

显示已连接客户端的列表。 能够通过 HTML 页面向特定客户端发送消息。 能够同时连接多个客户端(在本例中,我的意思是 10 个)。 我有一个共享主机,我在上面运行所有服务器代码。该主机不提供控制台访问,所以我不知道如何使用像 ReactPHP 这样的现成库。

目前,我只能创建一个已连接客户端的列表。这是代码:

<?php
error_reporting(E_ALL);

$host = '0.0.0.0';
$port = 8085;

$clients = [];
$onlineUsersFile = 'online_users.json';

$server = stream_socket_server("tcp://$host:$port", $errno, $errstr);

if (!$server) {
die("Error: $errstr ($errno)\n");
}

echo "Server started on $host:$port\n";

while (true) {
$read = $clients;
$read[] = $server;

if (!stream_select($read, $write, $except, 1)) {
    // Check every 1 second for changes in connections
    continue;
}

if (in_array($server, $read)) {
    $newClient = stream_socket_accept($server);
    socket_set_blocking($newClient, 0);
    $clients[] = $newClient;
    $data = "Welcome to the chat room!\n";
    stream_socket_sendto($newClient, $data);
    echo "New client connected\n";
    broadcastToAll($clients, "New client connected\n");
    $key = array_search($server, $read);
    unset($read[$key]);
}

foreach ($read as $key => $client) {
    stream_set_timeout($client, 1);
    $data = fread($client, 1024);
    if ($data === false || feof($client)) {
        fclose($client);
        unset($clients[$key]);
        echo "Client disconnected\n";
        broadcastToAll($clients, "Client disconnected\n");
        continue;
    }
    if ($data != '') {
        broadcast($client, $data, $clients);
    }
}
updateOnlineUsers($clients, $onlineUsersFile);
}

function broadcast($sender, $message, $clients)
{
foreach ($clients as $client) {
    if ($client !== $sender) {
        stream_socket_sendto($client, $message);
    }
}
}

function broadcastToAll($clients, $message)
{
foreach ($clients as $client) {
    stream_socket_sendto($client, $message);
}
}

function updateOnlineUsers($clients, $file)
{
$onlineUsers = [];
foreach ($clients as $client) {
    $address = stream_socket_get_name($client, true);
    $onlineUsers[] = $address;
}
file_put_contents($file, json_encode($onlineUsers));
}
     $onlineUsers[] = $address;
    }
    file_put_contents($file, json_encode($onlineUsers));
}

能够通过 HTML 页面向特定客户端发送消息。

iot tcpserver php-socket
1个回答
0
投票

我为我的社交网络编写了这个服务器。它使用 fcntl_fork 为每个连接的客户端创建进程。服务器可以使用stream_socket_pair与每个连接的客户端进行通信。这是代码:

<?php 

////////////////////////////////////////////////////////
//Muti process TCP server///////////////////////////////
//Designed by Sebastian Winbladh////////////////////////
//This program will exicute in a new process////////////
//Created for Talkie (Video Social Network) 2016-11-08//
////////////////////////////////////////////////////////

//Maximum allowed network comunication size per package
//This one will also effect the MAX_SIZE_SERVER
//MAX_SIZE_SERVER is the uper limit to what can be sent in each package

define("MAX_SIZE_NETWORK",100000);

//Maximum allowed package size for internal process comunication
define("MAX_SIZE_SERVER",100000);

//Read and write stream class for the sockets

class tich_api_socket_read_write{
    
    //Parameter = socket: points to the socket that this instance handles
    //maxbytes: The total number of bytes that can be sent in each package
    
    public $thread = "main";
    public $batch = "";
    public $test = "";
    
    function __construct(&$socket,$maxbytes = 1024,$thread  = "main"){ 
    
        $this->thread = $thread;
        $this->socket = &$socket;
        $this->maxbytes = $maxbytes;
        
        //Reading buffers
        $this->read_buffer = "";
        $this->total = -1; 
        
        $this->callback = NULL;
        $this->process = NULL;
    
    }  
    
    //Read function. Reads data from socket
    //Parameter = bytes: The bytes that we like to read from the socket
    
    private function read($bytes){
        
        if(($read = @stream_socket_recvfrom($this->socket,$bytes)) === false){ 
            return NULL;
        }return $read;
        
    }
    
    //Write function. Writes data to socket
    //Parameter = data: Data that we like to write to the socket
    //No max write length. That is handled on the read part
    
    public function write(){
         
        //This line can crash when a ghost client has astibliched a connection to the server.
        //If the server like to call a client that has closed its connection this line will return -1, meaning no data writen
        $wrote = @stream_socket_sendto($this->socket,$this->batch);
        $this->batch = "";
         
        //Return true when data was writen
        return $wrote;
        
    }
    
    public function writeToBatch($msg,$id = 0,$aug = -1, $alive = false){
    
        $length = strlen($msg);
        
        if($alive){
            //Do nothing
        }else if($aug == -1){ $msg = pack("N",$length).pack("N",$id).$msg; 
        }else{ $msg = pack("N",$length).pack("N",$id).pack("N",$aug).$msg; }
        
        //Write to write buffer  
        $this->batch .= $msg;
        
    }
    
    //This function iterates and reads a message from the 
    //input stream and adds it to the read buffer
    //...........................................
    
    public function iterate(){ 
         
        //Get the max read bytes
        $bytesleft = $this->maxbytes; 
        
        $input = $this->read($bytesleft);
        
         
        //Break if the connections breaks
        if($input == NULL){return -1;} 
        else if(strlen($input)>0){
        
            $this->read_buffer.= $input;   
            
            //Here we take from our query buffer
            while(strlen($this->read_buffer) > 0){ 
                
                //Here we unpacking the size for the message
                //and the category this message is pointing to
                
                //Network order to host order conversion
                ////////////////////////////////////////
                
                
                $offset = 8; 
                $bytes = unpack("NInt", substr($this->read_buffer,0,4))["Int"];
                     
        
                //Ping was retrived from client
                if($bytes == 0){
                    
                    if($this->thread != "main") {call_user_func($this->callback,$this->socket,"PING",0);}
                    $this->read_buffer = substr($this->read_buffer,4);
                    
                //Normal message handling
                } else {
                    
                    
                    $category = unpack("NInt", substr($this->read_buffer,4,4) )["Int"];
                    
                    if($this->thread == "main") {$offset = 12; $aug = unpack("NInt", substr($this->read_buffer,8,4) )["Int"];
                    } 
                     
                    //Call the read message hook callback here  
                    if(strlen($this->read_buffer) < $bytes)break;  
                    
                    //Here we take a part of the read buffer when we have a complete message
                    $message = substr($this->read_buffer,$offset,$bytes);
                    
                    //Cut the read message from the read buffer so that we can start reading
                    //any new message
                    $this->read_buffer = substr($this->read_buffer,$bytes+$offset);
                     
                    //Call the tcp callback function
                    
                    if($this->thread == "main") {
                         
                        call_user_func($this->callback,$this->socket,$message,$category,$aug);}else{ 
                     
                    call_user_func($this->callback,$this->socket,$message,$category);}
                    
                }
                
            }return 1;
            
        }return 0;
        
    }
    
    //Parameter = complete: callback function that is called when a message has been read
    
    public function setReadBufferCallback($complete){
         
        $this->callback = $complete;
        
    }
    
    //Sends one bytes to the client to check if it is connected
    public function alive(){ 

        $msg = pack("N",1);
        $this->writeToBatch($msg, 0, -1, true);

        //Send alive write to client
        $this->write();
        
    }
    
}

class tich_api_socket_IPC{
    
    public $aug = 0,$errors = array();
    
    function __construct(){ 
        
        $this->pipes = array(); 
        $this->ipcBuffers = array();  
    
    }
    
    public function broadcastToPipeClients($ids,$data = NULL, $callback = NULL){
         
        $this->aug ++;
        $this->errors[$this->aug] = array($callback,$ids,array());
         
        //If no data was set as default, we use the message that is already in the buffer
        if($data == NULL){$data = $this->message;}
        if(is_array($data)){$data = json_encode($data);} 
        
        if(is_numeric($ids)){
             
            //Get the first item in array of ipc buffers
            $buffer = reset($this->ipcBuffers);
            
            //Write only one id
            if($buffer != NULL){ $buffer->writeToBatch($data,$ids,$this->aug);}
            
        }else if(is_array($ids)){ 
            
            //Get the first item in array of ipc buffers
            $buffer = reset($this->ipcBuffers);
            
            //Send data to multi processes
            foreach($ids as $item){if($buffer != NULL){ $buffer->writeToBatch($data,$item,$this->aug);}}
            
        }
        
        return $buffer;
    }
    
    //Needs to be called from the main process loop
    protected function iterate_IPC($pipes){
         
        /////////////////////////////
        //IPC socket handling////////
        //Reading from child process/
        /////////////////////////////
         
        foreach($pipes as $pipe){
                  
            //Iterate pipes 
            $this->ipcBuffers[$pipe]->iterate(); 
             
        }  
        
    }
    
}

//Client socket
class tich_api_buffer extends tich_api_socket_IPC{
    
    public $message,$id,$category,$ghost,
    $pingfuturetime = 0,$pingsenttime = 0,$pingreturntime = 0;
    
    function __construct($socket,$init,$callback,$pipe){
        
        parent::__construct(); 
        
        $this->id = -1;   
        $this->init = $init;  
        $this->client_socket = $socket;
         
        $this->clientBuffer = new tich_api_socket_read_write($this->client_socket,MAX_SIZE_NETWORK,"child"); 
        
        //Data from client app
        $this->clientBuffer->setReadBufferCallback(function($socket,$msg,$category) use ($callback){
         
            //Returned ping from client
            $this->pingReturned();
            if($msg == "PING"){return;}
         
            $this->message = $msg;
            $this->category = $category;
            call_user_func($callback,$this); 
            
        });
        
        $this->pipes = array($pipe);
        
        $this->ipcBuffers[$pipe] = new tich_api_socket_read_write($pipe,MAX_SIZE_SERVER,"child");  
        
        //Data from mother process
        $this->ipcBuffers[$pipe]->setReadBufferCallback(function($socket,$msg,$category){
          
            $this->dataFromPipe($msg,$category);
            
        });
        
        //Address and port numbers
        $this->address = NULL;
        $this->port = NULL; 
        
        //Client options
        $this->options = array();
        
        $adress = explode(":",stream_socket_get_name($this->client_socket, true)); 
        $this->address = $adress[0];
        $this->port = $adress[1];
         
        call_user_func($this->init,$this);
        
        //Init read and write stream for this client
        $this->mainLoop();
        
    }  
    
    //Messages from mother IPC pipe
    private function dataFromPipe($msg,$category){
         
                  
        //No user with the id online 
        if(is_numeric($msg)){
         
            $index = array_search($msg, $this->errors[$category][1]);
            
            
            
            unset($this->errors[$category][1][$index]);
             
            if($index >= 0 && $msg>0){$this->errors[$category][2][] = abs($msg);}
            
            //The cetegory is here the user function id. Not the user id
            if(count($this->errors[$category][1]) == 0){ 
                               
                if(is_object($this->errors[$category][0])){ 
                    
                    call_user_func($this->errors[$category][0],$this->errors[$category][2]);}
                unset($this->errors[$category]);
                
            } 
            
        }else{
            
            if($msg == "exit"){
                  
                $this->broadcast(array("question"=>"error","ret"=>"2"))->write();
                $this->close();
                
            }else{
                //Just retransmit the data to the client here 
                $this->clientBuffer->writeToBatch($msg,$category);
                $this->clientBuffer->write();
            }
        }
                
    }
    
    private function mainLoop(){
        
        //Connected
        //echo "Client ".$this->address." : ".$this->port." is now connected. \n";
        
        //Main client loop
        while(true){  
             
            //Make selection from child sockets so we can read them
            $read = array_merge(array($this->client_socket),$this->pipes); 
            
            //Select object here
            //now call select
            if(false === ($num_changed_streams = stream_select($read, $write, $except, 5))){}
            
            //Iterate Intern process comunication 
            if(in_array($this->pipes[0],$read)){
                 
                $this->iterate_IPC($this->pipes);
                 
            }
            
            //Iterate the clients reads
            if(in_array($this->client_socket,$read)){
             
                //Break if client disconnects
                if($this->clientBuffer->iterate() == -1){  break; }
                $this->pingfuturetime = microtime(true) + 60;
                
            }
            
            //Test for client broken pipe. Send alives each minute
            if(microtime(true)  >= $this->pingfuturetime && $this->clientBuffer){ 
                
                //Un comment when the finale version is coming out
                if($this->pingreturntime < $this->pingsenttime){  break; }
                
                //Send alive call to client
                $this->pingsenttime = microtime(true);
                $this->clientBuffer->alive();
                
                $this->pingfuturetime = microtime(true) + 60; 
                
                
            }
            
        }
        
        //Disconnected
        //echo "Client ".$this->address." : ".$this->port." disconnected. Closing program ".getmypid()."... \n";
        
        //Close client socket
        $this->close();
         
    }
    
    //Ping return function from client
    public function pingReturned(){
        
        $this->pingreturntime = microtime(true);
        
    }
    
    public function close(){
        
        //Call the mother to close and clean hear pair socket
        $this->broadcastToPipeClients($this->id,"exit")->write();
        
        stream_socket_shutdown($this->pipes[0],STREAM_SHUT_RDWR);
        fclose($this->pipes[0]);
        stream_socket_shutdown($this->client_socket,STREAM_SHUT_RDWR);
        fclose($this->client_socket);
        
        //As this is a program we need to exit  
        exit();
        
    }
    
    //Function to send some data to this client
    public function broadcast($array){
          
        $query = json_encode($array);
          
        //Append data to the clientBuffer
        $this->clientBuffer->writeToBatch($query);
        return $this->clientBuffer;
        
    } 
    
    public function setProcessId($id){
        
        $this->id = $id;  
         
        //Brodcast id to mother process and register this process
        return $this->broadcastToPipeClients($id,"register");
        
    }  
    
    //Gets a reference to the parameters object, so you can set it by like a normal array
    public function &getOptions(){
         
        return $this->options; 
        
    }
    
    
}
  
//Server socket
class tich_api_server extends tich_api_socket_IPC{
    
    public $slots;
    
    function __construct($port){
         
        parent::__construct();
         
        $this->slots = pow(2,32) / 2;
        $this->address = "85.159.212.83";
        $this->port = $port;   
        $this->r = true;   
        $this->registered_childs = array();
        
        $ctx = stream_context_create(['socket' => ['ipv6_v6only' => true]]);
        $this->sock = stream_socket_server("tcp://".$this->address.":".$this->port, $errno, $errstr,
                           STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $ctx);
        
        if(!$this->sock)
        { 
            die("Couldn't create socket: [$errno] $errstr \n");
        } 
                
    }
    
    //Messages from childs IPC pipe
    private function dataFromPipe($socket,$msg,$category,$aug){
         
        //Handle socket exit
        if($msg == "exit"){
            
            //Close the live socket on the mother process
            //And clear any pointers to it
            fclose($socket);
            $this->pipes = array_diff($this->pipes,array($socket));
            $this->registered_childs = array_diff($this->registered_childs,array((int)$socket));
             
            //Clear the icp buffer for this pipeline
            unset($this->ipcBuffers[(int)$socket]); 
             
        //Register child socket with an id on the main process
        }else if($msg == "register"){
             
            //Register what child process is assosiated with category
            if(!array_key_exists($category,$this->registered_childs)){
                
                $this->registered_childs[$category] = (int)$socket;
            
            //Id is already in the system
            }else{ 
                
                //Select child process and transmit the msg 
                $process_id = $this->registered_childs[$category];
                $this->ipcBuffers[$process_id]->writeToBatch ("exit",$aug);
                $this->ipcBuffers[$process_id]->write();
                    
                $this->registered_childs[$category] = (int)$socket;
                
            }  
             
        }else{
              
            //Select child process and transmit the msg 
            $process_id = $this->registered_childs[$category];
                
            //Check so a process has been found
            if($process_id != NULL){  
                
                //Pointing message to child thread
                $this->ipcBuffers[$process_id]->writeToBatch ($msg);
                $this->ipcBuffers[$process_id]->write();
                
                $this->ipcBuffers[$socket]->writeToBatch     (-$category,$aug);
                $this->ipcBuffers[$socket]->write();
            
            //This client was not online
            }else{
                
                //Error code 1 = not online
                $this->ipcBuffers[$socket]->writeToBatch($category,$aug);
                $this->ipcBuffers[$socket]->write();
                
            }
            
        }
        
    }
    
    //Public methods
    public function listen_and_read($init,$callback){ 
        
        //Whait
        pcntl_signal(SIGCHLD, SIG_IGN);
        
        //Child process, tree process
          
        //start loop to listen for incoming connections and process existing connections
        while (true){
             
            //Make selection from child sockets so we can read them
            $read = array_merge($this->pipes,array($this->sock));
            
            //Select object here
            //now call select - blocking call
            if(false === ($num_changed_streams = stream_select($read, $write, $except, 5)))
            { 
                //Faild to select 
                continue;
            }
             
            //If there is pipe data to read
            if($num_changed_streams > 0){
                  
                $pipes_read = array_diff($read,array($this->sock)); 
                $this->iterate_IPC($pipes_read);
                
            } 
            
            /////////////////////////////
            //TCP socket handling////////
            /////////////////////////////
            
            if(in_array($this->sock,$read)){
            
                $sock = stream_socket_accept($this->sock); 
                 
                if($sock === false){
                    
                    //Do something if the socket accept faild
                        
                //Handle connection
                }else if($sock > 0){
                    
                    $pipe = array();
                    if (!($pipe = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_DGRAM, 0))) {
                        print "Faild to pair socket";
                        continue;
                    }
                
                    //Create new prosses
                    $this->slots ++;
                    if($this->slots >= pow(2,32)){$this->slots = pow(2,32) / 2;}
                    
                    $newp = pcntl_fork();
            
                    if($newp == -1){
                    
                        print "Faild to fork process";
                    
                    }else if($newp){
                        
                         //Mother process, main  
             
                        //Close socket to this process
                        fclose($pipe[0]);  
                        $this->pipes[] = $pipe[1];
                          
                        $this->ipcBuffers[$pipe[1]] = new tich_api_socket_read_write($pipe[1],MAX_SIZE_SERVER);  
                        
                        //Set up a listener for child process comunication.
                        //Callback function fires when data can be read from child process
                        $this->ipcBuffers[$pipe[1]]->setReadBufferCallback(function($socket,$msg,$category,$aug){
                            
                            //Send the input to our handling function
                            $this->dataFromPipe($socket,$msg,$category,$aug); 
                            
                        });
                        
                        //Continue to process other incoming connections
                        continue; 
                    
                    
                    }else{ 
                         
                        //Child process, tree process
                        //This process handles the logic for in and out streams from a client
                         
                        //Close socket to this process
                        fclose($pipe[1]);  
                        $this->pipes[0] = $pipe[0];
                    
                        //Create a new client socket that handles this connected client
                        $socket_client = new tich_api_buffer($sock,$init,$callback,$pipe[0]);
                        
                        //Register default client id
                        $socket_client->setProcessId($this->slots);
                         
                    }
                }
            }
        } 
    }  
}

//The client class is only for oneway communication
//It is not ment to be a two way trip.
//Used for comunication with the server socket

//When using this client dont forget to call the write function to send the batch of data

class tich_api_client extends tich_api_socket_read_write{
    
    private $address,$port;
    
    function __construct ($address,$port){
        
        $this->address = $address;
        $this->port    = $port; 
         
        // Create a socket
        if(!($this->sock = stream_socket_client("tcp://".$this->address.":".$this->port,$errno, $errstr)))
        {
            $errorcode = $errno;
            $errormsg  = $errstr;
             
            die("Couldn't create socket: [$errorcode] $errormsg \n");
        }   
        
        //Init parent class
        parent::__construct($this->sock); 
        
    }
    
    public function expandToBatch($msg){
        
        //Write to write buffer  
        $this->batch .= $msg.",";
        
    }
    
    //Sends a message to the server
    public function broadcastToPipeClients($ids,$data = NULL,$p = 100){
        
        $data = json_encode(array($ids,$data,$p)); 
         
        //Write data to socket with the to data
        $this->expandToBatch($data);
        
    } 
    
    public function write(){
        
        $this->batch = "[". chop($this->batch,",") ."]";
        
        $length = strlen($this->batch); 
        $msg = pack("N",$length).pack("N",0).$this->batch;
        
        //Write to write buffer  
        $this->batch = $msg;
        
        //Finaly we do the write
        parent::write();
        
    }   
     
}

?>

您可能需要进行一些重构才能使其适合您。 我用这段代码初始化它:

$tich_stream = new tich_api_server($port);
            
            //Functon will run on start up. It is the client default setup
            $tich_stream->listen_and_read(function($client){
                 
                //Init function
                $params = &$client->getOptions();
                $params["autherized"] = "0";
                $params["key"] = "";
             
            //Function will only run when client get a complete message
            },function($client) use ($tich_ask,$tich_api,$mysql_collections){
                    
                $encryption = new Encryption();
                $message = $encryption->decrypt($client->message);
                                          
                $tich_ask->parseQustion($message);
                $params = &$client->getOptions();
                
                //Handle incoming messages from public tcp clients
                if($client->address != "85.159.212.83"){
                 
                    if(@($params["autherized"] == "1")){ 
                         
                        $result = $tich_api->apiCall($tich_ask->question,$tich_ask->values,$client);
                        
                        if($result != NULL){
                            $result["question"] = $tich_ask->question . " return"; 
                            
                            $client->broadcast($result)->write();
                        }
                        reset($client->ipcBuffers)->write();
                         
                    }else if($tich_ask->question == "confirm login tcp"){ 
                     
                        $return = $tich_api->apiCall($tich_ask->question,$tich_ask->values);
                         
                        //Set process id to users id, so we can access it later
                        
                        $client->setProcessId($return["ret"])->write();
                        
                        if($return["ret"] > 0){$params["autherized"] = "1";
                        }else{ $params["autherized"] = "0";}
                         
                        $params["key"]        = $return["key"];
                        
                        //Send return message to client
                        $client->broadcast(array("ret" => $params["autherized"],"question"=>"confirm login tcp"))->write(); 
                        
                    }else{
                          
                        $client->broadcast(array("ret" => $params["autherized"], "error"=>"1","question"=>"confirm login tcp"))->write(); 
                    
                    }
                    
                //Handle incoming messages from the private tcp client
                //That is clients that has been connected from this machine(localhost)
                }else if($client->address == "85.159.212.83"){
                     
                    $data = json_decode($client->message,true);  
                     
                    //Parse array of data coming from pipeline
                    foreach($data as $item){
                         
                        //As we dont set any process id, the process will bee initelized as a ghost connection
                        $client->broadcastToPipeClients($item[0],$item[1]); 
                        
                    }
                      
                    //This method includes a write medthod
                    $client->close();
                    
                }
                
            });

您需要更改代码中的IP地址才能使其在您的服务器上运行。

© www.soinside.com 2019 - 2024. All rights reserved.