银河源码

高品质源码点击这里给我发消息PHP-LotusAdmin官方论坛

PHP处理kafka消息队列

在安装php-kafka 扩展后,就可以开始编写 php 消费消息的脚本了,php-rdkafka 扩展提供了几种消息处理的方式

低级方式(Low level)

这种方式没有消费组的概念

<?php

$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
// 指定 broker 地址,多个地址用"," 分割
$rk->addBrokers("192.168.33.1:9092");


$topic = $rk->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);


while (true) {
    // 第一个参数是分区号
    // 第二个参数是超时时间
    $msg = $topic->consume(0, 1000);
    if ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

高级方式 (High level)

这种方式可以指定消费组,一个消费组内,一个consumer 进程只能读取一个分区

<?php

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
// 当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
});

// 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
$conf->set('group.id', 'myConsumerGroup1');

//添加 kafka集群服务器地址
$conf->set('metadata.broker.list', '192.168.33.1:9092');

$topicConf = new RdKafka\TopicConf();


// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
//当没有初始偏移量时,从哪里开始读取
$topicConf->set('auto.offset.reset', 'smallest');


// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// 让消费者订阅log 主题
$consumer->subscribe(['log']);


while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>

本原创文章未经允许不得转载,为保证支付后正常下载,请把网站加入adblock信任名单

当前页面:银河源码 » PHP处理kafka消息队列

评论