PHP实现RabbitMQ消息队列的示例代码
RabbitMQ是一个开源的消息队列系统,支持多种消息递协议。在PHP中使用RabbitMQ实现消息队列非常简单,本文将详细介绍如何使用PHP和RabbitMQ实现消息队列,并提供两个示例说明。
环境准备
在开始之前,需要确保已安装了以下环境:
- PHP 5.4 或以上版本
- RabbitMQ 服务器
安装php-amqplib
在使用PHP和RabbitMQ实现消息队列之前,需要安装php-amqplib。可以通过Composer安装php-amqplib,也可以手动下载并安装。
示例一:使用PHP和RabbitMQ实现消息队列
在本例中,我们将使用PHP和RabbitMQ实现消息队列。具体步骤如下:
- 创建一个生产者并发送消息。
- 创建一个消费者并接收消息。
1. 创建一个生产者并发送消息
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
?>
在上述代码中,我们创建了一个生产者并发送了一条消息。在$channel->queue_declare
方法中,我们创建了一个名为hello
的队列。在$channel->basic_publish
方法中,我们将消息发送到队列中。
2. 创建一个消费者并接收消息
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
在上述代码中,我们创建了一个消费者并接收了一条消息。在$channel->queue_declare
方法中,我们创建了一个名为hello
的队列。在$channel->basic_consume
方法中,我们注册了一个回调函数,用于处理接收到的消息。
示例二:使用PHP和RabbitMQ实现RPC调用
在本例中,我们将使用PHP和RabbitMQ实现RPC调用。具体步骤如下:
- 创建一个RPC客户端并发送请求。
- 创建一个RPC服务器并处理请求。
1. 创建一个RPC客户端并发送请求
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class FibonacciRpcClient {
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;
public function __construct() {
$this->connection = new AMQPStreamConnection(
'localhost', 5672, 'guest', 'guest'
);
$this->channel = $this->connection->channel();
list($this->callback_queue, ,) = $this->channel->queue_declare(
"", false, false, true, false
);
$this->channel->basic_consume(
$this->callback_queue, '', false, true, false, false,
array($this, 'on_response')
);
}
public function on_response($rep) {
if($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
public function call($n) {
$this->response = null;
$this->corr_id = uniqid();
$msg = new AMQPMessage(
(string) $n,
array('correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while(!$this->response) {
$this->channel->wait();
}
return intval($this->response);
}
};
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";
?>
在上述代码中,我们创建了一个RPC客户端并发送了一条请求。在$this->channel->queue_declare
方法中,我们创建了一个随机的、独占的队列。在$msg
中,我们设置了请求的相关ID和回复队列。在$this->channel->basic_publish
方法中,我们将请求发送到队列中。在while
循环中,我们等待接收到回复。
2. 创建一个RPC服务器并处理请求
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
function fib($n) {
if ($n == 0) {
return 0;
}
if ($n == 1) {
return 1;
}
return fib($n-1) + fib($n-2);
}
class FibonacciRpcServer {
private $connection;
private $channel;
public function __construct() {
$this->connection = new AMQPStreamConnection(
'localhost', 5672, 'guest', 'guest'
);
$this->channel = $this->connection->channel();
$this->channel->queue_declare('rpc_queue', false, false, false, false);
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
'rpc_queue', '', false, false, false, false,
array($this, 'on_request')
);
}
public function on_request($request) {
$n = intval($request->body);
echo " [.] fib(", $n, ")\n";
$response = fib($n);
$msg = new AMQPMessage(
(string) $response,
array('correlation_id' => $request->get('correlation_id'))
);
$request->delivery_info['channel']->basic_publish(
$msg, '', $request->get('reply_to')
);
$request->delivery_info['channel']->basic_ack(
$request->delivery_info['delivery_tag']
);
}
public function start() {
echo " [x] Awaiting RPC requests\n";
while(count($this->channel->callbacks)) {
$this->channel->wait();
}
}
};
$fibonacci_rpc = new FibonacciRpcServer();
$fibonacci_rpc->start();
?>
在上述代码中,我们创建了一个RPC服务器并处理了一条请求。在$this->channel->queue_declare
方法中,我们创建了一个名为rpc_queue
的队列。在$this->channel->basic_consume
方法中,我们注册了一个回调函数,用于处理接收到的请求。在$msg
中,我们设置了回复的相关ID。在$request->delivery_info['channel']->basic_publish
方法中,我们将回复发送到回复队列中。在$request->delivery_info['channel']->basic_ack
方法中,我们确认已经处理了请求。
总结
本文介绍了如何使用PHP和RabbitMQ实现消息队列和RPC调用,并提供了两个示例说明。RabbitMQ是一个功能强大的消息队列系统,可以用于构建高可用性、高性能的分布式应用程序。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:PHP实现RabbitMQ消息列队的示例代码 - Python技术站