国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁技術(shù)文章
文章詳情頁

利用擴展的方式在PHP中使用Kafka的教程分享

瀏覽:3日期:2022-06-14 10:34:01
目錄前言安裝1. 下載2. 目錄使用1. 生產(chǎn)2. 消費(從指定的 partition 消費)其他解決方法前言

由于之前在 PHP 中使用 Kafka 是通過 composer 包的方式,由于 nmred/kafka-php 很久沒有維護(hù),并且網(wǎng)上相關(guān)問題的文章也比較少。所以我這次換成 PHP 擴展 RdKafka 繼續(xù)使用,主要介紹擴展安裝和這種方式的基本操作。

安裝1. 下載

地址(找到與自己環(huán)境匹配的就可以)

2. 目錄

由于 php-rdkafka 依賴 librdkafka,linux 就需要先安裝 librdkafka 后安裝 php-rdkafka,而 windows 版本是如下幾個文件,安裝方法如下:

(1). 將 librdkafka.dll 和 librdkafka.pdb 放入 PHP 安裝的根目錄下,而 php_rdkafka.dll 和 php_rdkafka.pdb 放入 PHP 安裝目錄的 ext 下。

(2). php.ini 配置文件添加 extension=php_rdkafka.dll,最后重啟 PHP。

(3). php-m 或這 phpinfo (); 就可以查看到擴展了。

通過 get_declared_classes() 也可以查看到擴展里預(yù)設(shè)的函數(shù)了。

使用1. 生產(chǎn)public function kafkaTest(){$rk = new \RdKafka\Producer();$rk->addBrokers('127.0.0.1:9092');$topic = $rk->newTopic('shop');$ret = [];for ($i = 0; $i < 5; $i++) {$content = '第' . $i . '次發(fā)送失敗';$message = ['mobile' => '15623652142', 'content' => $content];$payload = json_encode($message);// 指定向0號partition生產(chǎn)數(shù)據(jù)$ret[]['produce_res'] = $topic->produce(0, 0, $payload, 'sms_$i');// 隨機選擇partition//$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);if ($rk->getOutQLen() > 0) {$ret[]['produce_poll'] = $rk->poll(500);} else {$ret[]['produce_poll'] = $rk->poll(0);}}dump($ret);}

2. 消費(從指定的 partition 消費)protected function execute(Input $input, Output $output){$output->writeln('!!!hello kafka!!!');$conf = new \RdKafka\Conf();$conf->set('group.id', 'sms-consumer-group');$rk = new \RdKafka\Consumer($conf);$rk->addBrokers('127.0.0.1:9092');$topicConf = new \RdKafka\TopicConf();$topicConf->set('auto.commit.interval.ms', 100);$topicConf->set('offset.store.method', 'file');$topicConf->set('offset.store.path', sys_get_temp_dir());$topicConf->set('auto.offset.reset', 'smallest');$topic = $rk->newTopic('shop', $topicConf);$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);while(true) {// 設(shè)置消費時的時間間隔,單位毫秒,以下表示5秒消費一個$message = $topic->consume(0, 5000);if ($message) {echo '讀取到消息\n\r';// 消息對象,包括消息主題,消息創(chuàng)建時間戳,消息分區(qū)編號,消息主體,消息鍵名,消息長度等var_dump($message);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:echo '讀取消息成功:\n\r';var_dump($message->payload);break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:echo '讀取消息失敗\n\r';break;case RD_KAFKA_RESP_ERR__TIMED_OUT:echo '請求超時\n\r';break;default:throw new \Exception($message->errstr(), $message->err);break;}} else {echo '未讀取到消息\n\r';}}$output->writeln('!!!the end!!!');}

其他

在執(zhí)行消費過程中,發(fā)現(xiàn) kafka 停止服務(wù),拋出的異常:ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed。

解決方法

刪除 kafka-logs 下的所有日志,再重新啟動 Kafaka, kafka-server-start.bat ....\config\server.properties &

到此這篇關(guān)于利用擴展的方式在PHP中使用Kafka的教程分享的文章就介紹到這了,更多相關(guān)PHP使用Kafka內(nèi)容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!

標(biāo)簽: PHP
主站蜘蛛池模板: 欧美久久精品 | 97国产精品视频观看一 | 欧美一区二区精品 | 成年女人毛片免费播放视频m | 性欧美高清极品xx | 狠狠色丁香久久综合网 | 一本伊大人香蕉高清在线观看 | 亚洲欧美一区二区三区在饯 | 国产精品久久久久激情影院 | 国产美女在线精品亚洲二区 | 久久黄色精品视频 | 啪视| 成人综合在线观看 | 亚洲欧美视频在线观看 | 日本国产欧美 | 美女视频黄a全部 | 亚洲成在人 | 精品国产成人综合久久小说 | 中国美女隐私无遮挡免费视频 | 狠狠色噜噜狠狠狠米奇9999 | 中文字幕精品一区二区绿巨人 | 一机毛片 | 精品视频自拍 | 在线观看免费黄色网址 | 女人张开腿给男人捅 | 欧洲国产伦久久久久久久 | 国产农村乱子伦精品视频 | 国产微拍精品福利视频 | 黄毛片一级毛片 | 一区二区三区在线 | 欧 | 久久久99精品免费观看 | 毛片在线高清免费观看 | 精品国产免费观看一区 | 毛片a级三毛片免费播放 | 久久久全国免费视频 | 欧美亚洲另类久久综合 | 伊人色综合久久成人 | 成人三级在线播放 | 日本久久免费 | 久久综合一区二区三区 | 在线黄|