MQ¶
zan框架MQ采用go语言开源消息队列NSQ,提供操作NSQ的客户端SDK,对应的包名称为nsq-client。
配置¶
MQ连接配置在项目 resource/config/$ENV/nsq.php下。
<?php
/**
* 说明:
* 1. 只有lookup项必填, 其他全部选填
* 2. 所有时间配置 单位: ms
*/
return [
// ["必填"]lookup 节点地址
"lookup" => [
"http://www.example.com"
]
];
另外,workerStart文件夹下需要新增配置文件.config.php
<?php
use Zan\Framework\Components\Nsq\InitializeSQS;
return [
InitializeSQS::class,
];
接口¶
NSQ目前提供pub和sub接口,接口规范为
<?php
class SQS {
/**
* 订阅
* @param string $topic
* @param string $channel
* @param MsgHandler|callable $msgHandler
* @param int $maxInFlight
* @return \Generator yield return Consumer
* @throws NsqException
*/
public static function subscribe($topic, $channel, $msgHandler, $maxInFlight = -1);
/**
* 取消订阅
* @param string $topic
* @param string $channel
* @return bool
*/
public static function unSubscribe($topic, $channel);
/**
* 发布
* @param string $topic
* @param string[] ...$messages
* @return \Generator yield bool
* @throws NsqException
*/
public static function publish($topic, ...$messages);
/**
* 统计信息
* @return array
*/
public static function stat();
}
使用示例¶
订阅
$topic = "zan_mqworker_test";
$ch = "ch";
//msgHandler为callable function
yield SQS::subscribe($topic, $ch, function(Message $msg, Consumer $consumer) {});
//msgHandler为interface MsgHandler
yield SQS::subscribe($topic, $ch, new BenchMsgHandler(), 1);
取消订阅
yield SQS::unSubscribe($topic, $ch);
发布
$oneMsg = "hello";
$multiMsgs = [
"hello",
"hi",
];
yield SQS::publish($topic, $oneMsg);
yield SQS::publish($topic, "hello", "hi");
yield SQS::publish($topic, ...$multiMsgs);