我想使用 PHP 代码创建一个可以处理大量客户端的 TCP 套接字。事实上,我的客户端是想要连接到服务器并具有双向连接的网络模块。
我心中有几个目标:
显示已连接客户端的列表。 能够通过 HTML 页面向特定客户端发送消息。 能够同时连接多个客户端(在本例中,我的意思是 10 个)。 我有一个共享主机,我在上面运行所有服务器代码。该主机不提供控制台访问,所以我不知道如何使用像 ReactPHP 这样的现成库。
目前,我只能创建一个已连接客户端的列表。这是代码:
<?php
error_reporting(E_ALL);
$host = '0.0.0.0';
$port = 8085;
$clients = [];
$onlineUsersFile = 'online_users.json';
$server = stream_socket_server("tcp://$host:$port", $errno, $errstr);
if (!$server) {
die("Error: $errstr ($errno)\n");
}
echo "Server started on $host:$port\n";
while (true) {
$read = $clients;
$read[] = $server;
if (!stream_select($read, $write, $except, 1)) {
// Check every 1 second for changes in connections
continue;
}
if (in_array($server, $read)) {
$newClient = stream_socket_accept($server);
socket_set_blocking($newClient, 0);
$clients[] = $newClient;
$data = "Welcome to the chat room!\n";
stream_socket_sendto($newClient, $data);
echo "New client connected\n";
broadcastToAll($clients, "New client connected\n");
$key = array_search($server, $read);
unset($read[$key]);
}
foreach ($read as $key => $client) {
stream_set_timeout($client, 1);
$data = fread($client, 1024);
if ($data === false || feof($client)) {
fclose($client);
unset($clients[$key]);
echo "Client disconnected\n";
broadcastToAll($clients, "Client disconnected\n");
continue;
}
if ($data != '') {
broadcast($client, $data, $clients);
}
}
updateOnlineUsers($clients, $onlineUsersFile);
}
function broadcast($sender, $message, $clients)
{
foreach ($clients as $client) {
if ($client !== $sender) {
stream_socket_sendto($client, $message);
}
}
}
function broadcastToAll($clients, $message)
{
foreach ($clients as $client) {
stream_socket_sendto($client, $message);
}
}
function updateOnlineUsers($clients, $file)
{
$onlineUsers = [];
foreach ($clients as $client) {
$address = stream_socket_get_name($client, true);
$onlineUsers[] = $address;
}
file_put_contents($file, json_encode($onlineUsers));
}
$onlineUsers[] = $address;
}
file_put_contents($file, json_encode($onlineUsers));
}
能够通过 HTML 页面向特定客户端发送消息。
我为我的社交网络编写了这个服务器。它使用 fcntl_fork 为每个连接的客户端创建进程。服务器可以使用stream_socket_pair与每个连接的客户端进行通信。这是代码:
<?php
////////////////////////////////////////////////////////
//Muti process TCP server///////////////////////////////
//Designed by Sebastian Winbladh////////////////////////
//This program will exicute in a new process////////////
//Created for Talkie (Video Social Network) 2016-11-08//
////////////////////////////////////////////////////////
//Maximum allowed network comunication size per package
//This one will also effect the MAX_SIZE_SERVER
//MAX_SIZE_SERVER is the uper limit to what can be sent in each package
define("MAX_SIZE_NETWORK",100000);
//Maximum allowed package size for internal process comunication
define("MAX_SIZE_SERVER",100000);
//Read and write stream class for the sockets
class tich_api_socket_read_write{
//Parameter = socket: points to the socket that this instance handles
//maxbytes: The total number of bytes that can be sent in each package
public $thread = "main";
public $batch = "";
public $test = "";
function __construct(&$socket,$maxbytes = 1024,$thread = "main"){
$this->thread = $thread;
$this->socket = &$socket;
$this->maxbytes = $maxbytes;
//Reading buffers
$this->read_buffer = "";
$this->total = -1;
$this->callback = NULL;
$this->process = NULL;
}
//Read function. Reads data from socket
//Parameter = bytes: The bytes that we like to read from the socket
private function read($bytes){
if(($read = @stream_socket_recvfrom($this->socket,$bytes)) === false){
return NULL;
}return $read;
}
//Write function. Writes data to socket
//Parameter = data: Data that we like to write to the socket
//No max write length. That is handled on the read part
public function write(){
//This line can crash when a ghost client has astibliched a connection to the server.
//If the server like to call a client that has closed its connection this line will return -1, meaning no data writen
$wrote = @stream_socket_sendto($this->socket,$this->batch);
$this->batch = "";
//Return true when data was writen
return $wrote;
}
public function writeToBatch($msg,$id = 0,$aug = -1, $alive = false){
$length = strlen($msg);
if($alive){
//Do nothing
}else if($aug == -1){ $msg = pack("N",$length).pack("N",$id).$msg;
}else{ $msg = pack("N",$length).pack("N",$id).pack("N",$aug).$msg; }
//Write to write buffer
$this->batch .= $msg;
}
//This function iterates and reads a message from the
//input stream and adds it to the read buffer
//...........................................
public function iterate(){
//Get the max read bytes
$bytesleft = $this->maxbytes;
$input = $this->read($bytesleft);
//Break if the connections breaks
if($input == NULL){return -1;}
else if(strlen($input)>0){
$this->read_buffer.= $input;
//Here we take from our query buffer
while(strlen($this->read_buffer) > 0){
//Here we unpacking the size for the message
//and the category this message is pointing to
//Network order to host order conversion
////////////////////////////////////////
$offset = 8;
$bytes = unpack("NInt", substr($this->read_buffer,0,4))["Int"];
//Ping was retrived from client
if($bytes == 0){
if($this->thread != "main") {call_user_func($this->callback,$this->socket,"PING",0);}
$this->read_buffer = substr($this->read_buffer,4);
//Normal message handling
} else {
$category = unpack("NInt", substr($this->read_buffer,4,4) )["Int"];
if($this->thread == "main") {$offset = 12; $aug = unpack("NInt", substr($this->read_buffer,8,4) )["Int"];
}
//Call the read message hook callback here
if(strlen($this->read_buffer) < $bytes)break;
//Here we take a part of the read buffer when we have a complete message
$message = substr($this->read_buffer,$offset,$bytes);
//Cut the read message from the read buffer so that we can start reading
//any new message
$this->read_buffer = substr($this->read_buffer,$bytes+$offset);
//Call the tcp callback function
if($this->thread == "main") {
call_user_func($this->callback,$this->socket,$message,$category,$aug);}else{
call_user_func($this->callback,$this->socket,$message,$category);}
}
}return 1;
}return 0;
}
//Parameter = complete: callback function that is called when a message has been read
public function setReadBufferCallback($complete){
$this->callback = $complete;
}
//Sends one bytes to the client to check if it is connected
public function alive(){
$msg = pack("N",1);
$this->writeToBatch($msg, 0, -1, true);
//Send alive write to client
$this->write();
}
}
class tich_api_socket_IPC{
public $aug = 0,$errors = array();
function __construct(){
$this->pipes = array();
$this->ipcBuffers = array();
}
public function broadcastToPipeClients($ids,$data = NULL, $callback = NULL){
$this->aug ++;
$this->errors[$this->aug] = array($callback,$ids,array());
//If no data was set as default, we use the message that is already in the buffer
if($data == NULL){$data = $this->message;}
if(is_array($data)){$data = json_encode($data);}
if(is_numeric($ids)){
//Get the first item in array of ipc buffers
$buffer = reset($this->ipcBuffers);
//Write only one id
if($buffer != NULL){ $buffer->writeToBatch($data,$ids,$this->aug);}
}else if(is_array($ids)){
//Get the first item in array of ipc buffers
$buffer = reset($this->ipcBuffers);
//Send data to multi processes
foreach($ids as $item){if($buffer != NULL){ $buffer->writeToBatch($data,$item,$this->aug);}}
}
return $buffer;
}
//Needs to be called from the main process loop
protected function iterate_IPC($pipes){
/////////////////////////////
//IPC socket handling////////
//Reading from child process/
/////////////////////////////
foreach($pipes as $pipe){
//Iterate pipes
$this->ipcBuffers[$pipe]->iterate();
}
}
}
//Client socket
class tich_api_buffer extends tich_api_socket_IPC{
public $message,$id,$category,$ghost,
$pingfuturetime = 0,$pingsenttime = 0,$pingreturntime = 0;
function __construct($socket,$init,$callback,$pipe){
parent::__construct();
$this->id = -1;
$this->init = $init;
$this->client_socket = $socket;
$this->clientBuffer = new tich_api_socket_read_write($this->client_socket,MAX_SIZE_NETWORK,"child");
//Data from client app
$this->clientBuffer->setReadBufferCallback(function($socket,$msg,$category) use ($callback){
//Returned ping from client
$this->pingReturned();
if($msg == "PING"){return;}
$this->message = $msg;
$this->category = $category;
call_user_func($callback,$this);
});
$this->pipes = array($pipe);
$this->ipcBuffers[$pipe] = new tich_api_socket_read_write($pipe,MAX_SIZE_SERVER,"child");
//Data from mother process
$this->ipcBuffers[$pipe]->setReadBufferCallback(function($socket,$msg,$category){
$this->dataFromPipe($msg,$category);
});
//Address and port numbers
$this->address = NULL;
$this->port = NULL;
//Client options
$this->options = array();
$adress = explode(":",stream_socket_get_name($this->client_socket, true));
$this->address = $adress[0];
$this->port = $adress[1];
call_user_func($this->init,$this);
//Init read and write stream for this client
$this->mainLoop();
}
//Messages from mother IPC pipe
private function dataFromPipe($msg,$category){
//No user with the id online
if(is_numeric($msg)){
$index = array_search($msg, $this->errors[$category][1]);
unset($this->errors[$category][1][$index]);
if($index >= 0 && $msg>0){$this->errors[$category][2][] = abs($msg);}
//The cetegory is here the user function id. Not the user id
if(count($this->errors[$category][1]) == 0){
if(is_object($this->errors[$category][0])){
call_user_func($this->errors[$category][0],$this->errors[$category][2]);}
unset($this->errors[$category]);
}
}else{
if($msg == "exit"){
$this->broadcast(array("question"=>"error","ret"=>"2"))->write();
$this->close();
}else{
//Just retransmit the data to the client here
$this->clientBuffer->writeToBatch($msg,$category);
$this->clientBuffer->write();
}
}
}
private function mainLoop(){
//Connected
//echo "Client ".$this->address." : ".$this->port." is now connected. \n";
//Main client loop
while(true){
//Make selection from child sockets so we can read them
$read = array_merge(array($this->client_socket),$this->pipes);
//Select object here
//now call select
if(false === ($num_changed_streams = stream_select($read, $write, $except, 5))){}
//Iterate Intern process comunication
if(in_array($this->pipes[0],$read)){
$this->iterate_IPC($this->pipes);
}
//Iterate the clients reads
if(in_array($this->client_socket,$read)){
//Break if client disconnects
if($this->clientBuffer->iterate() == -1){ break; }
$this->pingfuturetime = microtime(true) + 60;
}
//Test for client broken pipe. Send alives each minute
if(microtime(true) >= $this->pingfuturetime && $this->clientBuffer){
//Un comment when the finale version is coming out
if($this->pingreturntime < $this->pingsenttime){ break; }
//Send alive call to client
$this->pingsenttime = microtime(true);
$this->clientBuffer->alive();
$this->pingfuturetime = microtime(true) + 60;
}
}
//Disconnected
//echo "Client ".$this->address." : ".$this->port." disconnected. Closing program ".getmypid()."... \n";
//Close client socket
$this->close();
}
//Ping return function from client
public function pingReturned(){
$this->pingreturntime = microtime(true);
}
public function close(){
//Call the mother to close and clean hear pair socket
$this->broadcastToPipeClients($this->id,"exit")->write();
stream_socket_shutdown($this->pipes[0],STREAM_SHUT_RDWR);
fclose($this->pipes[0]);
stream_socket_shutdown($this->client_socket,STREAM_SHUT_RDWR);
fclose($this->client_socket);
//As this is a program we need to exit
exit();
}
//Function to send some data to this client
public function broadcast($array){
$query = json_encode($array);
//Append data to the clientBuffer
$this->clientBuffer->writeToBatch($query);
return $this->clientBuffer;
}
public function setProcessId($id){
$this->id = $id;
//Brodcast id to mother process and register this process
return $this->broadcastToPipeClients($id,"register");
}
//Gets a reference to the parameters object, so you can set it by like a normal array
public function &getOptions(){
return $this->options;
}
}
//Server socket
class tich_api_server extends tich_api_socket_IPC{
public $slots;
function __construct($port){
parent::__construct();
$this->slots = pow(2,32) / 2;
$this->address = "85.159.212.83";
$this->port = $port;
$this->r = true;
$this->registered_childs = array();
$ctx = stream_context_create(['socket' => ['ipv6_v6only' => true]]);
$this->sock = stream_socket_server("tcp://".$this->address.":".$this->port, $errno, $errstr,
STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $ctx);
if(!$this->sock)
{
die("Couldn't create socket: [$errno] $errstr \n");
}
}
//Messages from childs IPC pipe
private function dataFromPipe($socket,$msg,$category,$aug){
//Handle socket exit
if($msg == "exit"){
//Close the live socket on the mother process
//And clear any pointers to it
fclose($socket);
$this->pipes = array_diff($this->pipes,array($socket));
$this->registered_childs = array_diff($this->registered_childs,array((int)$socket));
//Clear the icp buffer for this pipeline
unset($this->ipcBuffers[(int)$socket]);
//Register child socket with an id on the main process
}else if($msg == "register"){
//Register what child process is assosiated with category
if(!array_key_exists($category,$this->registered_childs)){
$this->registered_childs[$category] = (int)$socket;
//Id is already in the system
}else{
//Select child process and transmit the msg
$process_id = $this->registered_childs[$category];
$this->ipcBuffers[$process_id]->writeToBatch ("exit",$aug);
$this->ipcBuffers[$process_id]->write();
$this->registered_childs[$category] = (int)$socket;
}
}else{
//Select child process and transmit the msg
$process_id = $this->registered_childs[$category];
//Check so a process has been found
if($process_id != NULL){
//Pointing message to child thread
$this->ipcBuffers[$process_id]->writeToBatch ($msg);
$this->ipcBuffers[$process_id]->write();
$this->ipcBuffers[$socket]->writeToBatch (-$category,$aug);
$this->ipcBuffers[$socket]->write();
//This client was not online
}else{
//Error code 1 = not online
$this->ipcBuffers[$socket]->writeToBatch($category,$aug);
$this->ipcBuffers[$socket]->write();
}
}
}
//Public methods
public function listen_and_read($init,$callback){
//Whait
pcntl_signal(SIGCHLD, SIG_IGN);
//Child process, tree process
//start loop to listen for incoming connections and process existing connections
while (true){
//Make selection from child sockets so we can read them
$read = array_merge($this->pipes,array($this->sock));
//Select object here
//now call select - blocking call
if(false === ($num_changed_streams = stream_select($read, $write, $except, 5)))
{
//Faild to select
continue;
}
//If there is pipe data to read
if($num_changed_streams > 0){
$pipes_read = array_diff($read,array($this->sock));
$this->iterate_IPC($pipes_read);
}
/////////////////////////////
//TCP socket handling////////
/////////////////////////////
if(in_array($this->sock,$read)){
$sock = stream_socket_accept($this->sock);
if($sock === false){
//Do something if the socket accept faild
//Handle connection
}else if($sock > 0){
$pipe = array();
if (!($pipe = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_DGRAM, 0))) {
print "Faild to pair socket";
continue;
}
//Create new prosses
$this->slots ++;
if($this->slots >= pow(2,32)){$this->slots = pow(2,32) / 2;}
$newp = pcntl_fork();
if($newp == -1){
print "Faild to fork process";
}else if($newp){
//Mother process, main
//Close socket to this process
fclose($pipe[0]);
$this->pipes[] = $pipe[1];
$this->ipcBuffers[$pipe[1]] = new tich_api_socket_read_write($pipe[1],MAX_SIZE_SERVER);
//Set up a listener for child process comunication.
//Callback function fires when data can be read from child process
$this->ipcBuffers[$pipe[1]]->setReadBufferCallback(function($socket,$msg,$category,$aug){
//Send the input to our handling function
$this->dataFromPipe($socket,$msg,$category,$aug);
});
//Continue to process other incoming connections
continue;
}else{
//Child process, tree process
//This process handles the logic for in and out streams from a client
//Close socket to this process
fclose($pipe[1]);
$this->pipes[0] = $pipe[0];
//Create a new client socket that handles this connected client
$socket_client = new tich_api_buffer($sock,$init,$callback,$pipe[0]);
//Register default client id
$socket_client->setProcessId($this->slots);
}
}
}
}
}
}
//The client class is only for oneway communication
//It is not ment to be a two way trip.
//Used for comunication with the server socket
//When using this client dont forget to call the write function to send the batch of data
class tich_api_client extends tich_api_socket_read_write{
private $address,$port;
function __construct ($address,$port){
$this->address = $address;
$this->port = $port;
// Create a socket
if(!($this->sock = stream_socket_client("tcp://".$this->address.":".$this->port,$errno, $errstr)))
{
$errorcode = $errno;
$errormsg = $errstr;
die("Couldn't create socket: [$errorcode] $errormsg \n");
}
//Init parent class
parent::__construct($this->sock);
}
public function expandToBatch($msg){
//Write to write buffer
$this->batch .= $msg.",";
}
//Sends a message to the server
public function broadcastToPipeClients($ids,$data = NULL,$p = 100){
$data = json_encode(array($ids,$data,$p));
//Write data to socket with the to data
$this->expandToBatch($data);
}
public function write(){
$this->batch = "[". chop($this->batch,",") ."]";
$length = strlen($this->batch);
$msg = pack("N",$length).pack("N",0).$this->batch;
//Write to write buffer
$this->batch = $msg;
//Finaly we do the write
parent::write();
}
}
?>
您可能需要进行一些重构才能使其适合您。 我用这段代码初始化它:
$tich_stream = new tich_api_server($port);
//Functon will run on start up. It is the client default setup
$tich_stream->listen_and_read(function($client){
//Init function
$params = &$client->getOptions();
$params["autherized"] = "0";
$params["key"] = "";
//Function will only run when client get a complete message
},function($client) use ($tich_ask,$tich_api,$mysql_collections){
$encryption = new Encryption();
$message = $encryption->decrypt($client->message);
$tich_ask->parseQustion($message);
$params = &$client->getOptions();
//Handle incoming messages from public tcp clients
if($client->address != "85.159.212.83"){
if(@($params["autherized"] == "1")){
$result = $tich_api->apiCall($tich_ask->question,$tich_ask->values,$client);
if($result != NULL){
$result["question"] = $tich_ask->question . " return";
$client->broadcast($result)->write();
}
reset($client->ipcBuffers)->write();
}else if($tich_ask->question == "confirm login tcp"){
$return = $tich_api->apiCall($tich_ask->question,$tich_ask->values);
//Set process id to users id, so we can access it later
$client->setProcessId($return["ret"])->write();
if($return["ret"] > 0){$params["autherized"] = "1";
}else{ $params["autherized"] = "0";}
$params["key"] = $return["key"];
//Send return message to client
$client->broadcast(array("ret" => $params["autherized"],"question"=>"confirm login tcp"))->write();
}else{
$client->broadcast(array("ret" => $params["autherized"], "error"=>"1","question"=>"confirm login tcp"))->write();
}
//Handle incoming messages from the private tcp client
//That is clients that has been connected from this machine(localhost)
}else if($client->address == "85.159.212.83"){
$data = json_decode($client->message,true);
//Parse array of data coming from pipeline
foreach($data as $item){
//As we dont set any process id, the process will bee initelized as a ghost connection
$client->broadcastToPipeClients($item[0],$item[1]);
}
//This method includes a write medthod
$client->close();
}
});
您需要更改代码中的IP地址才能使其在您的服务器上运行。