php-mqtt 在发布者离线一段时间后停止接收消息

问题描述 投票:0回答:1

我设置了一个 systemd 服务(在 Ubuntu 22.04 中),以便使用 php-mqtt 库处理来自 Mosquitto 的消息。这是我的整个代码:

<?php header("Access-Control-Allow-Origin: *");
require 'database.php'; 
require('vendor/autoload.php');

use PhpMqtt\Client\MqttClient;
use PhpMqtt\Client\ConnectionSettings;

ini_set('display_errors', 1);
error_reporting(E_ALL);

$server   = '<server>';
$port     = 1883;
$clientId = '<id>';
$username = '<user>';
$password = '<password>';

$connectionSettings = (new ConnectionSettings)
  ->setUsername($username)
  ->setPassword($password)
  ->setKeepAliveInterval(60)
  ->setLastWillQualityOfService(1)
  ->setConnectTimeout(60)
  ->setMaxReconnectAttempts(PHP_INT_MAX)
  ->setReconnectAutomatically(true);

$mqtt = new MqttClient($server, $port, $clientId, MqttClient::MQTT_3_1);
$mqtt->connect($connectionSettings, false);

function append($db, $sample) {
  try {
    $stmt = $db->prepare("INSERT INTO sensors (sampled, temperature, humidity) VALUES (CURRENT_TIMESTAMP(), :temperature, :humidity);");
    $stmt->execute($sample);
  } catch (PDOException $e) {
    echo "Error: " . $e->getMessage();
  }    
}

$mqtt->subscribe('<topic1>', function ($topic, $message) use (&$data, &$dbApp) {
  printf("Received message on topic [%s]: %s\n", $topic, $message);

  if ($message === 'Offline') {
    $sample = [
      'temperature' => NULL,
      'humidity' => NULL
    ];

    append($dbApp, $sample);
  }

}, 0);

$mqtt->subscribe('<topic2>', function ($topic, $message) use (&$data, &$dbApp) {
  printf("Received message on topic [%s]: %s\n", $topic, $message);
  $obj = json_decode($message);

  $sample = [
    'temperature' => floatval($obj->Temperature),
    'humidity' => floatval($obj->Humidity)
  ];
  
  append($dbApp, $sample);
}, 0);
   
$mqtt->loop(true);
$mqtt->close();
$dbApp = null;      

这里是 systemd 单元:

[Unit]
Description=MQTT receiver
 
[Service]
Type=simple
ExecStart=/usr/bin/php /usr/share/nginx/html/mqtt.php
Restart=always
RestartSec=10
 
[Install]
WantedBy=multi-user.target

这里观察到的行为:

  1. 启动经纪人
  2. 开始
    mqtt.service
  3. 启动发布者(物联网设备),以 LWT 形式发送
    <topic1>
    离线消息并发送
    <topic2>
    实际数据
  4. 检查一切正常:好的
  5. 关闭发布器电源并等待断开连接(收到LWT消息):OK
  6. 一段时间后再次打开并确认它仍然按预期工作:好的

如果第 6 点中的“某个时间”的持续时间是几个小时(例如过夜),就会出现问题。第二天早上,我打开传感器电源,但 PHP 页面不再接收消息(即不再显示“已收到主题消息...”)。

使用

mosquitto_sub
订阅主题会收到消息 - 因此代理正在运行。

systemd 服务处于活动状态并且没有显示错误,我看到了前一天的最后一行。重新启动服务会再次收到消息。

这似乎是一个很好的指标,我在代码中犯了错误,但我找不到错误所在。

php mqtt systemd mosquitto
1个回答
0
投票

根据用户Namoshek的好建议,我解决了更改代码如下:

database.php

<?php

$SERVER_APP = "localhost";
$DATABASE_APP = "<database>";
$USERNAME_APP = "<user>";
$PASSWORD_APP = "<password>";

function db_connect() {
    global $SERVER_APP, $DATABASE_APP, $USERNAME_APP, $PASSWORD_APP;

    try {
        $dbApp = new PDO("mysql:host=$SERVER_APP;dbname=$DATABASE_APP", $USERNAME_APP, $PASSWORD_APP);
        $dbApp->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        $dbApp->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);
        return $dbApp;
    } catch (PDOException $e) {
        echo "Connection to APP database failed:" . PHP_EOL . $e->getMessage();
    }
}

mqtt.php

<?php header("Access-Control-Allow-Origin: *");
require 'database.php';
require('vendor/autoload.php');

use PhpMqtt\Client\MqttClient;
use PhpMqtt\Client\ConnectionSettings;

ini_set('display_errors', 1);
error_reporting(E_ALL);

$server   = '<address>';
$port     = 1883;
$clientId = '<id>';
$username = '<user>';
$password = '<password>';

$connectionSettings = (new ConnectionSettings)
  ->setUsername($username)
  ->setPassword($password)
  ->setKeepAliveInterval(60)
  ->setConnectTimeout(60)
  ->setMaxReconnectAttempts(PHP_INT_MAX)
  ->setReconnectAutomatically(true);

$mqtt = new MqttClient($server, $port, $clientId, MqttClient::MQTT_3_1_1);
$mqtt->connect($connectionSettings, false);

function append($sample) {
  $dbApp = db_connect();   
  try {
    $stmt = $dbApp->prepare("INSERT INTO sensors (sampled, temperature, humidity) VALUES (CURRENT_TIMESTAMP(), :temperature, :humidity);");
    $stmt->execute($sample);
  } catch (PDOException $e) {
    echo "Error: " . $e->getMessage();
  }    
  $dbApp = null;      
}

$mqtt->subscribe('<topic>', function ($topic, $message) {
  printf("Received message on topic [%s]: %s\n", $topic, $message);
  $obj = json_decode($message);

  $sample = [
    'temperature' => floatval($obj->Temperature),
    'humidity' => floatval($obj->Humidity)
  ];
  
  append($sample);
});
   
$mqtt->loop(true);
$mqtt->close();

现在每次我需要追加记录时都会创建与数据库的连接,并在之后立即关闭。

© www.soinside.com 2019 - 2024. All rights reserved.