MQ

zan框架MQ采用go语言开源消息队列NSQ,提供操作NSQ的客户端SDK,对应的包名称为nsq-client。

配置

MQ连接配置在项目 resource/config/$ENV/nsq.php下。

<?php
/**
 * 说明:
 * 1. 只有lookup项必填, 其他全部选填
 * 2. 所有时间配置 单位: ms
 */
return [
    // ["必填"]lookup 节点地址
    "lookup" => [
        "http://www.example.com"
    ]
];

另外,workerStart文件夹下需要新增配置文件.config.php

<?php
use Zan\Framework\Components\Nsq\InitializeSQS;

return [
    InitializeSQS::class,
];

接口

NSQ目前提供pub和sub接口,接口规范为

<?php
class SQS {
    /**
     * 订阅
     * @param string $topic
     * @param string $channel
     * @param MsgHandler|callable $msgHandler
     * @param int $maxInFlight
     * @return \Generator yield return Consumer
     * @throws NsqException
     */
    public static function subscribe($topic, $channel, $msgHandler, $maxInFlight = -1);

    /**
     * 取消订阅
     * @param string $topic
     * @param string $channel
     * @return bool
     */
    public static function unSubscribe($topic, $channel);

    /**
     * 发布
     * @param string $topic
     * @param string[] ...$messages
     * @return \Generator yield bool
     * @throws NsqException
     */
    public static function publish($topic, ...$messages);

    /**
     * 统计信息
     * @return array
     */
    public static function stat();
}

使用示例

  • 订阅

$topic = "zan_mqworker_test";
$ch = "ch";
//msgHandler为callable function
yield SQS::subscribe($topic, $ch, function(Message $msg, Consumer $consumer) {});
//msgHandler为interface MsgHandler
yield SQS::subscribe($topic, $ch, new BenchMsgHandler(), 1);
  • 取消订阅

yield SQS::unSubscribe($topic, $ch);
  • 发布

$oneMsg = "hello";
$multiMsgs = [
    "hello",
    "hi",
];
yield SQS::publish($topic, $oneMsg);
yield SQS::publish($topic, "hello", "hi");
yield SQS::publish($topic, ...$multiMsgs);