注意:不是与使用this question的MessageComponentInterface
相同。我改用WampServerInterface
,因此此问题专门与该部分有关。我需要一个带有代码示例和解释的答案,因为我可以看到这对将来的其他人有所帮助。
尝试对单个用户进行循环推送
我正在使用Ratchet和ZeroMQ的WAMP部分,并且目前有push integration tutorial的工作版本。
我正在尝试执行以下操作:
- zeromq服务器已启动并正在运行,已准备好记录订阅者和取消订阅者
- 用户通过websocket协议在其浏览器中进行连接
- 一个循环已启动,它将数据发送给请求该数据的特定用户
- 当用户断开连接时,该用户数据的循环将停止
我有要点(1)和(2)起作用,但是我遇到的问题是第三个问题:
首先:如何将数据仅发送给每个特定用户?广播将数据发送给所有人,除非“主题”可能最终成为单个用户ID?
第二:我有一个很大的安全问题。如果我要发送哪个用户ID想要从客户端进行订阅,这似乎是我需要的,那么用户可以将变量更改为另一个用户的ID及其数据将返回。
第三:我必须运行一个单独的PHP脚本,其中包含zeromq的代码以开始实际的循环。我不确定这是否是最好的方法,我宁愿让它完全在代码库中工作,而不是单独的php文件。这是我需要排序的主要领域。
以下代码显示了我目前拥有的。
仅从控制台运行的服务器
我从字面上键入php bin/push-server.php
来运行它。订阅和取消订阅将输出到此终端以进行调试。
$loop = React\EventLoop\Factory::create();
$pusher = Pusher;
$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:5555');
$pull->on('message', array($pusher, 'onMessage'));
$webSock = new React\Socket\Server($loop);
$webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
$webServer = new Ratchet\Server\IoServer(
new Ratchet\WebSocket\WsServer(
new Ratchet\Wamp\WampServer(
$pusher
)
),
$webSock
);
$loop->run();
通过WebSocket发送数据的推送器
我已经省略了无用的内容,并专注于onMessage()
和onSubscribe()
方法。
public function onSubscribe(ConnectionInterface $conn, $topic)
{
$subject = $topic->getId();
$ip = $conn->remoteAddress;
if (!array_key_exists($subject, $this->subscribedTopics))
{
$this->subscribedTopics[$subject] = $topic;
}
$this->clients[] = $conn->resourceId;
echo sprintf("New Connection: %s" . PHP_EOL, $conn->remoteAddress);
}
public function onMessage($entry) {
$entryData = json_decode($entry, true);
var_dump($entryData);
if (!array_key_exists($entryData['topic'], $this->subscribedTopics)) {
return;
}
$topic = $this->subscribedTopics[$entryData['topic']];
// This sends out everything to multiple users, not what I want!!
// I can't send() to individual connections from here I don't think :S
$topic->broadcast($entryData);
}
开始循环使用上述Pusher代码的脚本
这是我的问题-这是一个单独的php文件,希望将来可以集成到其他代码中,但是目前我不确定如何正确使用它。我是否可以从会话中获取用户的ID?我仍然需要从客户端发送它...
// Thought sessions might work here but they don't work for subscription
session_start();
$userId = $_SESSION['userId'];
$loop = React\EventLoop\Factory::create();
$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");
$i = 0;
$loop->addPeriodicTimer(4, function() use ($socket, $loop, $userId, &$i) {
$entryData = array(
'topic' => 'subscriptionTopicHere',
'userId' => $userId
);
$i++;
// So it doesn't go on infinitely if run from browser
if ($i >= 3)
{
$loop->stop();
}
// Send stuff to the queue
$socket->send(json_encode($entryData));
});
最后,要订阅的客户端js
$(document).ready(function() {
var conn = new ab.Session(
'ws://localhost:8080'
, function() {
conn.subscribe('topicHere', function(topic, data) {
console.log(topic);
console.log(data);
});
}
, function() {
console.warn('WebSocket connection closed');
}
, {
'skipSubprotocolCheck': true
}
);
});
结论
上面的方法有效,但是我确实需要弄清楚以下几点: