PHP TCP 套接字服务器 - esp32

问题描述 投票:0回答:1

我想使用 PHP 代码创建一个可以处理大量客户端的 TCP 套接字。事实上,我的客户端是想要连接到服务器并具有双向连接的网络模块。


我需要实现以下功能:1)显示已连接客户端的列表;2)能够通过 HTML 页面向特定客户端发送消息;3)能够同时连接多个客户端(在本例中为 10 个)。我使用的是共享主机,所有服务器代码都运行在上面。该主机不提供控制台访问,所以我不知道如何使用像 ReactPHP 这样的现成库。



// Chat-style TCP server: accepts clients, relays every received message to the
// other clients, and mirrors the list of connected peers into a JSON file.
// NOTE(review): $host is empty in this listing (possibly redacted); '0.0.0.0'
// would listen on every IPv4 interface -- confirm the intended bind address.
$host = '';
$port = 8085;

$clients = [];
$onlineUsersFile = 'online_users.json';

$server = stream_socket_server("tcp://$host:$port", $errno, $errstr);

if (!$server) {
    die("Error: $errstr ($errno)\n");
}

echo "Server started on $host:$port\n";

// Start with an empty list so a stale file from a previous run is not displayed.
updateOnlineUsers($clients, $onlineUsersFile);

while (true) {
    $read = $clients;
    $read[] = $server;
    $write = null;
    $except = null;

    // Wait up to 1 second for activity. Both false (error) and 0 (timeout)
    // mean there is nothing to process in this round.
    if (!stream_select($read, $write, $except, 1)) {
        continue;
    }

    // Set whenever the client list changes, so the JSON file is only rewritten when needed.
    $changed = false;

    $serverKey = array_search($server, $read, true);
    if ($serverKey !== false) {
        // The listening socket must not reach the fread() loop below.
        unset($read[$serverKey]);

        $newClient = stream_socket_accept($server);
        if ($newClient !== false) {
            stream_set_blocking($newClient, false);
            $clients[] = $newClient;
            $data = "Welcome to the chat room!\n";
            stream_socket_sendto($newClient, $data);
            echo "New client connected\n";
            broadcastToAll($clients, "New client connected\n");
            $changed = true;
        }
    }

    foreach ($read as $client) {
        stream_set_timeout($client, 1);
        $data = fread($client, 1024);

        // Relay whatever arrived, even if the peer closed right after sending it.
        if (is_string($data) && $data !== '') {
            broadcast($client, $data, $clients);
        }

        if ($data === false || feof($client)) {
            // Drop the client before announcing, so nothing is written to the dead socket.
            $clientKey = array_search($client, $clients, true);
            if ($clientKey !== false) {
                unset($clients[$clientKey]);
            }
            fclose($client);
            echo "Client disconnected\n";
            broadcastToAll($clients, "Client disconnected\n");
            $changed = true;
        }
    }

    if ($changed) {
        updateOnlineUsers($clients, $onlineUsersFile);
    }
}

/**
 * Sends a message to every client except the one it came from.
 *
 * @param resource        $sender  Stream of the client that produced the message.
 * @param string          $message Raw bytes to forward.
 * @param array<resource> $clients Connected client streams.
 */
function broadcast($sender, string $message, array $clients): void
{
    foreach ($clients as $client) {
        if ($client !== $sender) {
            stream_socket_sendto($client, $message);
        }
    }
}

/**
 * Sends a message to every connected client, including the originator.
 *
 * @param array<resource> $clients Connected client streams.
 * @param string          $message Raw bytes to send.
 */
function broadcastToAll(array $clients, string $message): void
{
    foreach ($clients as $client) {
        stream_socket_sendto($client, $message);
    }
}

/**
 * Writes the remote addresses of all connected clients to a JSON file.
 *
 * The file always holds a JSON list (an empty list when nobody is connected).
 *
 * @param array<resource> $clients Connected client streams.
 * @param string          $file    Path of the JSON file to overwrite.
 */
function updateOnlineUsers(array $clients, string $file): void
{
    $onlineUsers = [];
    foreach ($clients as $client) {
        $address = stream_socket_get_name($client, true);
        // stream_socket_get_name() returns false when the peer name is unavailable.
        if ($address !== false) {
            $onlineUsers[] = $address;
        }
    }
    file_put_contents($file, json_encode($onlineUsers));
}

能够通过 HTML 页面向特定客户端发送消息。

iot tcpserver php-socket

我为我的社交网络编写了这个服务器。它使用 pcntl_fork 为每个连接的客户端创建一个进程。服务器可以通过 stream_socket_pair 与每个已连接客户端对应的进程进行通信。这是代码:


//Multi-process TCP server//////////////////////////////
//Designed by Sebastian Winbladh////////////////////////
//This program will execute in a new process////////////
//Created for Talkie (Video Social Network) 2016-11-08//

//Maximum allowed network communication size per package
//This one will also affect MAX_SIZE_SERVER
//MAX_SIZE_SERVER is the upper limit to what can be sent in each package


//Maximum allowed package size for internal process communication

//Read and write stream class for the sockets

//Length-prefixed read/write wrapper around one stream socket.
//Outgoing messages are queued in $batch by writeToBatch() and sent by write();
//incoming bytes are appended to read_buffer by iterate() and cut into messages there.
//NOTE(review): this listing has lost lines (closing braces and some conditions are
//absent), so it does not parse as shown -- restore them from the original before use.
class tich_api_socket_read_write{
    //Parameter = socket: points to the socket that this instance handles
    //maxbytes: The number of bytes requested from the socket on each read
    //thread: "main" or another label; iterate() parses an extra header field when it is "main"
    public $thread = "main";
    public $batch = "";
    public $test = "";
    function __construct(&$socket,$maxbytes = 1024,$thread  = "main"){ 
        $this->thread = $thread;
        $this->socket = &$socket;
        $this->maxbytes = $maxbytes;
        //Reading buffers
        $this->read_buffer = "";
        $this->total = -1; 
        $this->callback = NULL;
        $this->process = NULL;
    //Read function. Reads data from socket
    //Parameter = bytes: The number of bytes that we would like to read from the socket
    //Returns the received data, or NULL when stream_socket_recvfrom() returns false
    private function read($bytes){
        //Warnings are suppressed with @; failure is reported through the NULL return only
        if(($read = @stream_socket_recvfrom($this->socket,$bytes)) === false){ 
            return NULL;
        }return $read;
    //Write function. Writes the queued batch to the socket
    //Takes no parameters: the data comes from $this->batch
    //No max write length. That is handled on the read part
    public function write(){
        //This line can fail when a ghost client has established a connection to the server.
        //If the server tries to call a client that has closed its connection this line will return -1, meaning no data written
        $wrote = @stream_socket_sendto($this->socket,$this->batch);
        //The batch is cleared whether or not the send succeeded
        $this->batch = "";
        //Return the result of stream_socket_sendto()
        return $wrote;
    //Appends one framed message to the batch.
    //Frame layout built below: pack("N") length, pack("N") id, optionally pack("N") aug, then the payload
    //NOTE(review): the opening condition of the if/else chain is missing from this listing
    //(presumably a test on $alive that leaves $msg unframed) -- confirm against the original
    public function writeToBatch($msg,$id = 0,$aug = -1, $alive = false){
        $length = strlen($msg);
            //Do nothing
        }else if($aug == -1){ $msg = pack("N",$length).pack("N",$id).$msg; 
        }else{ $msg = pack("N",$length).pack("N",$id).pack("N",$aug).$msg; }
        //Write to write buffer  
        $this->batch .= $msg;
    //This function iterates and reads a message from the 
    //input stream and adds it to the read buffer
    //Returns -1 when the read yields nothing usable, 1 after processing input, 0 otherwise
    public function iterate(){ 
        //Get the max read bytes
        $bytesleft = $this->maxbytes; 
        $input = $this->read($bytesleft);
        //Break if the connection breaks
        //Loose comparison: an empty string also equals NULL here, so empty reads return -1 too
        if($input == NULL){return -1;} 
        else if(strlen($input)>0){
            $this->read_buffer.= $input;   
            //Here we take from our query buffer
            while(strlen($this->read_buffer) > 0){ 
                //Here we unpack the size of the message
                //and the category this message is pointing to
                //Network order to host order conversion
                $offset = 8; 
                $bytes = unpack("NInt", substr($this->read_buffer,0,4))["Int"];
                //Ping was retrieved from client (a zero length field marks a ping)
                if($bytes == 0){
                    if($this->thread != "main") {call_user_func($this->callback,$this->socket,"PING",0);}
                    $this->read_buffer = substr($this->read_buffer,4);
                //Normal message handling
                } else {
                    $category = unpack("NInt", substr($this->read_buffer,4,4) )["Int"];
                    //On the "main" thread the header carries a third 4-byte field (aug), so the payload starts at 12
                    if($this->thread == "main") {$offset = 12; $aug = unpack("NInt", substr($this->read_buffer,8,4) )["Int"];
                    //Stop and wait for more data when the message is not complete yet
                    //NOTE(review): compares against $bytes only, not $bytes + $offset -- confirm this is intended
                    if(strlen($this->read_buffer) < $bytes)break;  
                    //Here we take a part of the read buffer when we have a complete message
                    $message = substr($this->read_buffer,$offset,$bytes);
                    //Cut the read message from the read buffer so that we can start reading
                    //any new message
                    $this->read_buffer = substr($this->read_buffer,$bytes+$offset);
                    //Call the tcp callback function
                    //NOTE(review): the body of this branch (the callback invocation) is missing from the listing
                    if($this->thread == "main") {
            }return 1;
        }return 0;
    //Parameter = complete: callback function that is called when a message has been read
    public function setReadBufferCallback($complete){
        $this->callback = $complete;
    //Queues a 4-byte keep-alive marker (pack("N",1)) to check whether the client is connected
    public function alive(){ 

        $msg = pack("N",1);
        $this->writeToBatch($msg, 0, -1, true);

        //Send alive write to client

//Base class for inter-process communication over pipes.
//Keeps the list of pipes, one read/write buffer per pipe (ipcBuffers), and a
//per-request record in $errors keyed by the running counter $aug.
//NOTE(review): this listing has lost lines (closing braces and at least one
//condition), so it does not parse as shown -- restore them before use.
class tich_api_socket_IPC{
    public $aug = 0,$errors = array();
    function __construct(){ 
        $this->pipes = array(); 
        $this->ipcBuffers = array();  
    //Queues $data for one id or for an array of ids on the first IPC buffer.
    //Parameter = ids: a single id or an array of ids
    //data: payload; arrays are JSON encoded, NULL falls back to $this->message
    //callback: stored in $errors together with the ids for this request
    //Returns the buffer that was written to (the caller may then call write() on it)
    public function broadcastToPipeClients($ids,$data = NULL, $callback = NULL){
        $this->aug ++;
        $this->errors[$this->aug] = array($callback,$ids,array());
        //If no data was set as default, we use the message that is already in the buffer
        if($data == NULL){$data = $this->message;}
        if(is_array($data)){$data = json_encode($data);} 
        //NOTE(review): the condition opening this branch is missing from the listing
        //(presumably the single-id case, given the else-if on is_array below) -- confirm
            //Get the first item in array of ipc buffers
            $buffer = reset($this->ipcBuffers);
            //Write only one id
            if($buffer != NULL){ $buffer->writeToBatch($data,$ids,$this->aug);}
        }else if(is_array($ids)){ 
            //Get the first item in array of ipc buffers
            $buffer = reset($this->ipcBuffers);
            //Send data to multiple processes: one framed message per id, all on the same buffer
            foreach($ids as $item){if($buffer != NULL){ $buffer->writeToBatch($data,$item,$this->aug);}}
        return $buffer;
    //Needs to be called from the main process loop
    protected function iterate_IPC($pipes){
        //IPC socket handling////////
        //Reading from child process/
        foreach($pipes as $pipe){
            //Iterate pipes 
            //NOTE(review): the loop body is missing from this listing

//Client socket
//Client socket
//Per-client handler: wraps the client socket and one IPC pipe in
//tich_api_socket_read_write buffers and tracks ping timestamps.
//NOTE(review): this listing has lost many lines (closing braces, loop headers,
//conditions, most of close()/broadcast()), so it does not parse as shown.
class tich_api_buffer extends tich_api_socket_IPC{
    public $message,$id,$category,$ghost,
    $pingfuturetime = 0,$pingsenttime = 0,$pingreturntime = 0;
    //Parameter = socket: the accepted client stream
    //init, callback: user callables (init is stored; callback is captured by the read callback)
    //pipe: the IPC stream for this process
    function __construct($socket,$init,$callback,$pipe){
        $this->id = -1;   
        $this->init = $init;  
        $this->client_socket = $socket;
        $this->clientBuffer = new tich_api_socket_read_write($this->client_socket,MAX_SIZE_NETWORK,"child"); 
        //Data from client app
        $this->clientBuffer->setReadBufferCallback(function($socket,$msg,$category) use ($callback){
            //Returned ping from client
            if($msg == "PING"){return;}
            $this->message = $msg;
            $this->category = $category;
        $this->pipes = array($pipe);
        //A resource used as an array key is cast to its integer id by PHP
        $this->ipcBuffers[$pipe] = new tich_api_socket_read_write($pipe,MAX_SIZE_SERVER,"child");  
        //Data from mother process
        //Address and port numbers
        $this->address = NULL;
        $this->port = NULL; 
        //Client options
        $this->options = array();
        //Peer name is split on ":" into address and port
        $adress = explode(":",stream_socket_get_name($this->client_socket, true)); 
        $this->address = $adress[0];
        $this->port = $adress[1];
        //Init read and write stream for this client
    //Messages from mother IPC pipe
    private function dataFromPipe($msg,$category){
        //No user with the id online 
        //NOTE(review): the condition guarding this branch is missing from the listing
            $index = array_search($msg, $this->errors[$category][1]);
            //NOTE(review): array_search() returns false when nothing is found, and false >= 0 is true in PHP -- confirm intent
            if($index >= 0 && $msg>0){$this->errors[$category][2][] = abs($msg);}
            //The category is here the user function id. Not the user id
            if(count($this->errors[$category][1]) == 0){ 
            if($msg == "exit"){
                //Just retransmit the data to the client here 
    private function mainLoop(){
        //echo "Client ".$this->address." : ".$this->port." is now connected. \n";
        //Main client loop
            //Make selection from child sockets so we can read them
            $read = array_merge(array($this->client_socket),$this->pipes); 
            //Select object here
            //now call select (waits up to 5 seconds; a false result is ignored)
            if(false === ($num_changed_streams = stream_select($read, $write, $except, 5))){}
            //Iterate internal process communication 
            //Iterate the client's reads
                //Break if client disconnects
                if($this->clientBuffer->iterate() == -1){  break; }
                //Push the next ping 60 seconds into the future
                $this->pingfuturetime = microtime(true) + 60;
            //Test for client broken pipe. Send alives each minute
            if(microtime(true)  >= $this->pingfuturetime && $this->clientBuffer){ 
                //Uncomment when the final version is coming out
                //Leave the loop when the last ping sent has not been answered
                if($this->pingreturntime < $this->pingsenttime){  break; }
                //Send alive call to client
                $this->pingsenttime = microtime(true);
                $this->pingfuturetime = microtime(true) + 60; 
        //echo "Client ".$this->address." : ".$this->port." disconnected. Closing program ".getmypid()."... \n";
        //Close client socket
    //Ping return function from client
    public function pingReturned(){
        $this->pingreturntime = microtime(true);
    public function close(){
        //Call the mother to close and clean her pair socket
        //As this is a program we need to exit  
    //Function to send some data to this client
    //Returns the client buffer so the caller can chain ->write()
    public function broadcast($array){
        $query = json_encode($array);
        //Append data to the clientBuffer
        //NOTE(review): the line that queues $query on the buffer is missing from the listing
        return $this->clientBuffer;
    public function setProcessId($id){
        $this->id = $id;  
        //Broadcast id to mother process and register this process
        return $this->broadcastToPipeClients($id,"register");
    //Gets a reference to the options array, so you can set it like a normal array
    public function &getOptions(){
        return $this->options; 
//Server socket
//Server socket
//Listening process: accepts TCP clients, forks one child per client and talks
//to each child over a socket pair; routes "register"/"exit"/data messages between children.
//NOTE(review): this listing has lost many lines (closing braces, conditions,
//else branches), so it does not parse as shown -- restore them before use.
class tich_api_server extends tich_api_socket_IPC{
    public $slots;
    //Parameter = port: TCP port to listen on (the address is left empty)
    function __construct($port){
        $this->slots = pow(2,32) / 2;
        $this->address = "";
        $this->port = $port;   
        $this->r = true;   
        $this->registered_childs = array();
        $ctx = stream_context_create(['socket' => ['ipv6_v6only' => true]]);
        $this->sock = stream_socket_server("tcp://".$this->address.":".$this->port, $errno, $errstr,
                           STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $ctx);
        //NOTE(review): the check that guards this die() is missing from the listing
            die("Couldn't create socket: [$errno] $errstr \n");
    //Messages from childs IPC pipe
    private function dataFromPipe($socket,$msg,$category,$aug){
        //Handle socket exit
        if($msg == "exit"){
            //Close the live socket on the mother process
            //And clear any pointers to it
            $this->pipes = array_diff($this->pipes,array($socket));
            $this->registered_childs = array_diff($this->registered_childs,array((int)$socket));
            //Clear the ipc buffer for this pipeline
        //Register child socket with an id on the main process
        }else if($msg == "register"){
            //Register which child process is associated with category
            //(children are identified by the integer id of their pipe resource)
                $this->registered_childs[$category] = (int)$socket;
            //Id is already in the system
                //Select the previously registered child process and tell it to exit,
                //then point the category at the new child
                $process_id = $this->registered_childs[$category];
                $this->ipcBuffers[$process_id]->writeToBatch ("exit",$aug);
                $this->registered_childs[$category] = (int)$socket;
            //Select child process and transmit the msg 
            $process_id = $this->registered_childs[$category];
            //Check so a process has been found
            if($process_id != NULL){  
                //Pointing message to child thread
                $this->ipcBuffers[$process_id]->writeToBatch ($msg);
                //Reply to the sending child with the negated category
                $this->ipcBuffers[$socket]->writeToBatch     (-$category,$aug);
            //This client was not online
                //Error code 1 = not online
    //Public methods
    //Parameter = init, callback: passed on to each tich_api_buffer created for a client
    public function listen_and_read($init,$callback){ 
        //Ignore SIGCHLD (presumably so exited children need no explicit wait -- confirm)
        pcntl_signal(SIGCHLD, SIG_IGN);
        //Child process, tree process
        //start loop to listen for incoming connections and process existing connections
        while (true){
            //Make selection from child sockets so we can read them
            $read = array_merge($this->pipes,array($this->sock));
            //Select object here
            //now call select - waits up to 5 seconds
            if(false === ($num_changed_streams = stream_select($read, $write, $except, 5)))
                //Failed to select 
            //If there is pipe data to read
            if($num_changed_streams > 0){
                $pipes_read = array_diff($read,array($this->sock)); 
            //TCP socket handling////////
                $sock = stream_socket_accept($this->sock); 
                if($sock === false){
                    //Do something if the socket accept failed
                //Handle connection
                }else if($sock > 0){
                    $pipe = array();
                    //Datagram unix socket pair: one end per process after the fork
                    if (!($pipe = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_DGRAM, 0))) {
                        print "Faild to pair socket";
                    //Create new process
                    $this->slots ++;
                    //Wrap the slot counter back to 2^31 once it reaches 2^32
                    if($this->slots >= pow(2,32)){$this->slots = pow(2,32) / 2;}
                    $newp = pcntl_fork();
                    if($newp == -1){
                        print "Faild to fork process";
                    }else if($newp){
                         //Mother process, main  
                        //Close socket to this process
                        $this->pipes[] = $pipe[1];
                        $this->ipcBuffers[$pipe[1]] = new tich_api_socket_read_write($pipe[1],MAX_SIZE_SERVER);  
                        //Set up a listener for child process communication.
                        //Callback function fires when data can be read from child process
                            //Send the input to our handling function
                        //Continue to process other incoming connections
                        //Child process, tree process
                        //This process handles the logic for in and out streams from a client
                        //Close socket to this process
                        $this->pipes[0] = $pipe[0];
                        //Create a new client socket that handles this connected client
                        $socket_client = new tich_api_buffer($sock,$init,$callback,$pipe[0]);
                        //Register default client id

//The client class is only for one-way communication
//It is not meant to be a two-way trip.
//Used for communication with the server socket

//When using this client don't forget to call the write function to send the batch of data

//TCP client that collects comma-separated entries in $batch and frames them
//as one length-prefixed JSON-array message in write().
//NOTE(review): this listing has lost lines (closing braces, the parent
//constructor call, the final send), so it does not parse as shown.
class tich_api_client extends tich_api_socket_read_write{
    private $address,$port;
    //Parameter = address, port: server endpoint to connect to
    function __construct ($address,$port){
        $this->address = $address;
        $this->port    = $port; 
        // Create a socket
        //NOTE(review): the connection is stored in $this->sock while the parent class reads
        //$this->socket; the parent-constructor call that would link them is not visible here
        if(!($this->sock = stream_socket_client("tcp://".$this->address.":".$this->port,$errno, $errstr)))
            $errorcode = $errno;
            $errormsg  = $errstr;
            die("Couldn't create socket: [$errorcode] $errormsg \n");
        //Init parent class
    //Appends one entry followed by a comma; write() strips the trailing comma later
    public function expandToBatch($msg){
        //Write to write buffer  
        $this->batch .= $msg.",";
    //Sends a message to the server
    //Encodes [ids, data, p] as JSON
    public function broadcastToPipeClients($ids,$data = NULL,$p = 100){
        $data = json_encode(array($ids,$data,$p)); 
        //Write data to socket with the to data
        //NOTE(review): the line that queues $data is missing from the listing
    //Wraps the batch in "[...]" and prefixes pack("N") length and pack("N") 0
    public function write(){
        $this->batch = "[". chop($this->batch,",") ."]";
        $length = strlen($this->batch); 
        $msg = pack("N",$length).pack("N",0).$this->batch;
        //Write to write buffer  
        $this->batch = $msg;
        //Finally we do the write


您可能需要进行一些重构才能使其适合您。 我用这段代码初始化它:

//Example start-up code: creates the server and supplies two closures
//(a per-client init function and a per-message handler).
//NOTE(review): the call that receives the closures (presumably
//$tich_stream->listen_and_read(function($client){ ...) and many closing braces
//are missing from this listing, so it does not parse as shown.
$tich_stream = new tich_api_server($port);
            //Function will run on start up. It is the client default setup
                //Init function
                $params = &$client->getOptions();
                $params["autherized"] = "0";
                $params["key"] = "";
            //Function will only run when client gets a complete message
            },function($client) use ($tich_ask,$tich_api,$mysql_collections){
                $encryption = new Encryption();
                $message = $encryption->decrypt($client->message);
                $params = &$client->getOptions();
                //Handle incoming messages from public tcp clients
                if($client->address != ""){
                    //Already authorized clients: forward the question to the API
                    if(@($params["autherized"] == "1")){ 
                        $result = $tich_api->apiCall($tich_ask->question,$tich_ask->values,$client);
                        if($result != NULL){
                            $result["question"] = $tich_ask->question . " return"; 
                    //Not authorized yet: only the login question is handled
                    }else if($tich_ask->question == "confirm login tcp"){ 
                        $return = $tich_api->apiCall($tich_ask->question,$tich_ask->values);
                        //Set process id to user's id, so we can access it later
                        if($return["ret"] > 0){$params["autherized"] = "1";
                        }else{ $params["autherized"] = "0";}
                        $params["key"]        = $return["key"];
                        //Send return message to client
                        //NOTE(review): the two sends below were presumably in separate branches
                        //(success / error); the branching lines are missing from the listing
                        $client->broadcast(array("ret" => $params["autherized"],"question"=>"confirm login tcp"))->write(); 
                        $client->broadcast(array("ret" => $params["autherized"], "error"=>"1","question"=>"confirm login tcp"))->write(); 
                //Handle incoming messages from the private tcp client
                //That is, clients that have connected from this machine (localhost)
                }else if($client->address == ""){
                    $data = json_decode($client->message,true);  
                    //Parse array of data coming from pipeline
                    foreach($data as $item){
                        //As we don't set any process id, the process will be initialized as a ghost connection
                    //This method includes a write method


© 2019 - 2024. All rights reserved.