How to connect to the rmohr/activemq Docker image using php-amqplib

Question  Votes: 0  Answers: 1

I am trying to create a custom command that consumes messages from an ActiveMQ queue. I am using php-amqplib, and I have created a custom connector and an ActiveMQServiceProvider. I get this error when I run it:

sail artisan horizon:consume-activemq

 PhpAmqpLib\Exception\AMQPInvalidFrameException 

  Invalid frame type 65

...

  7   app/Queue/Connectors/ActiveMQConnector.php:15
      PhpAmqpLib\Connection\AMQPStreamConnection::__construct()

  8   app/Console/Commands/ConsumeActiveMQMessages.php:29
      App\Queue\Connectors\ActiveMQConnector::connect()

I checked the Docker logs, and it seems that php-amqplib does not support AMQP v1.0:

Connection attempt from non AMQP v1.0 client. AMQP,0,0,9,1
2024-02-22 13:24:44  WARN | Transport Connection to: tcp://192.168.65.1:32999 failed: org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from the client using unsupported AMQP attempted

Am I misunderstanding something, or is this a misconfiguration?

These are my configurations:

queue.php

    'connections' => [

...
        'activemq' => [
            'driver' => 'activemq',
            'host' => env('ACTIVEMQ_HOST', 'localhost'),
            'port' => env('ACTIVEMQ_PORT', 61613),
            'username' => env('ACTIVEMQ_USERNAME', 'guest'),
            'password' => env('ACTIVEMQ_PASSWORD', 'guest'),
            'queue' => env('ACTIVEMQ_QUEUE', ''),
            'exchange_name' => env('ACTIVEMQ_EXCHANGE_NAME', ''),
        ],

mylocal.env

ACTIVEMQ_HOST=host.docker.internal
ACTIVEMQ_PORT=5672
ACTIVEMQ_USERNAME=admin
ACTIVEMQ_PASSWORD=admin
ACTIVEMQ_QUEUE=activemqTest

ActiveMQServiceProvider.php

<?php

namespace App\Providers;

use App\Queue\Connectors\ActiveMQConnector;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\ServiceProvider;

class ActiveMQServiceProvider extends ServiceProvider
{
    /**
     * Register services.
     */
    public function register(): void
    {
    }

    /**
     * Bootstrap services.
     */
    public function boot(): void
    {
        $this->app->make(QueueManager::class)->addConnector('activemq', function () {
            return new ActiveMQConnector();
        });
    }
}

ActiveMQConnector.php

<?php

namespace App\Queue\Connectors;

use Illuminate\Queue\Connectors\ConnectorInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ActiveMQConnector implements ConnectorInterface
{
    /**
     * @throws \Exception
     */
    public function connect(array $config)
    {
        return new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['username'],
            $config['password'],
            $config['vhost']
        );
    }
}

ConsumeActiveMQMessages.php

<?php

namespace App\Console\Commands;

use App\Queue\Connectors\ActiveMQConnector;
use Illuminate\Console\Command;
use PhpAmqpLib\Message\AMQPMessage;

class ConsumeActiveMQMessages extends Command
{
    protected $signature = 'horizon:consume-activemq';

    protected $description = 'Consume messages from ActiveMQ and process them within Horizon';

    /**
     * @throws \Exception
     */
    public function handle()
    {
        $connector = new ActiveMQConnector();
        $config = [
            'host' => config('queue.connections.activemq.host'),
            'port' => config('queue.connections.activemq.port'),
            'username' => config('queue.connections.activemq.username'),
            'password' => config('queue.connections.activemq.password'),
            'vhost' => config('queue.connections.activemq.vhost') ?? '/',
        ];

        $connection = $connector->connect($config);

        $channel = $connection->channel();

        $callback = function (AMQPMessage $message) {
            $this->processMessage($message);
        };

        $channel->basic_consume(config('queue.connections.activemq.queue'), '', false, true, false, false, $callback);

        while ($channel->is_consuming()) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }

    protected function processMessage(AMQPMessage $message)
    {
        $this->info('Received message: ' . $message->getBody());
    }
}

Laravel: v10.10, PHP: v8.1

I tried changing the host and port, but nothing changed.

php laravel docker activemq
1 Answer
2 votes

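ActiveMQ (both Classic and Artemis) implements AMQP 1.0, the ISO-standardized version of the AMQP specification, whereas php-amqplib appears to support only the 0.9.1 draft standard, so you cannot use that client to connect to ActiveMQ. If you need to stay on that runtime, you will need to find a PHP client that supports AMQP 1.0.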
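
To illustrate the version mismatch, here is a minimal sketch that needs no broker or library. It builds the eight-byte protocol headers that AMQP 0-9-1 and AMQP 1.0 peers exchange when a connection opens. It assumes the broker answers an unsupported header with its own AMQP 1.0 header before closing the socket, as the AMQP specifications describe; if so, a 0-9-1 client that reads the first byte of that reply as a frame type would see the letter "A", which is probably where "Invalid frame type 65" comes from. The helper name describeHeader is made up for this example.

```php
<?php

// Header a 0-9-1 client such as php-amqplib sends on connect:
// the literal "AMQP" followed by the bytes 0, 0, 9, 1.
$header091 = 'AMQP' . pack('C4', 0, 0, 9, 1);

// Header an AMQP 1.0 peer uses: "AMQP", protocol id 0, version 1.0.0.
$header10 = 'AMQP' . pack('C4', 0, 1, 0, 0);

// Hypothetical helper: render a header the way the broker log does.
function describeHeader(string $header): string
{
    $versionBytes = unpack('C*', substr($header, 4));

    return substr($header, 0, 4) . ',' . implode(',', $versionBytes);
}

echo describeHeader($header091), PHP_EOL; // AMQP,0,0,9,1 (matches the broker log)
echo describeHeader($header10), PHP_EOL;  // AMQP,0,1,0,0

// Assumption: the client interprets the first byte of the broker's reply
// as a frame type. That byte is "A", whose ASCII code is 65.
$frameType = ord($header10[0]);
echo "First reply byte read as frame type: {$frameType}", PHP_EOL;
```

The first line of output matches the "AMQP,0,0,9,1" in the broker log above, which confirms the client is announcing AMQP 0-9-1. Changing the host or port cannot fix this, because the mismatch is in the protocol version itself.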
