我们学习了解了这么多关于PHP的知识,今天教你们如何快速进行php+kafka的安装,如果不会的“童鞋”,那就跟随本篇文章一起继续学习吧 1、 安装java,并设置相关的环境变量 2、安装kafka,这里以0.10.2版本为例 3、安装kafka的C操作库 4、安装php的kafka扩展 ,这里选择php-rdkafka扩展 https://github.com/arnaud-lb/php-rdkafka 修改php.ini,加入 extension=rdkafka.so 5、安装rdkafka的IDE代码提示文件 以phpstrom为例,在你的项目的External Libraries右键,选择Configure PHP Include Paths,把刚刚的路径添加进来。 6、编写php测试代码 producer: low-level consumer: high-level consumer: 推荐学习:《PHP视频教程》 以上就是教你如何快速进行php+kafka的安装的详细内容,更多请关注php中文网其它相关文章!> wget https://download.java.net/openjdk/jdk7u75/ri/openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz
> tar zxvf openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz
> mv java-se-7u75-ri/ /opt/
> export JAVA_HOME=/opt/java-se-7u75-ri
> export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
> export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar
#验证安装
> java -verison
openjdk version "1.7.0_75"
OpenJDK Runtime Environment (build 1.7.0_75-b13)
OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)
> wget http://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
> tar zxvf kafka_2.11-0.10.2.0.tgz
> mv kafka_2.11-0.10.2.0/ /opt/kafka
> cd /opt/kafka
#启动zookeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
#启动kafka
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
#尝试创建一个topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
#生产者写入消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
#消费者消费消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
> wget https://github.com/edenhill/librdkafka/archive/v1.3.0.tar.gz
> tar zxvf v1.3.0.tar.gz
> cd librdkafka-1.3.0
> ./configure
> make && make install
> wget https://github.com/arnaud-lb/php-rdkafka/archive/4.0.2.tar.gz
> tar 4.0.2.tar.gz
> cd php-rdkafka-4.0.2
> /opt/php7/bin/phpize
> ./configure --with-php-config=/opt/php7/bin/php-config
> make && make install
> composer create-project kwn/php-rdkafka-stubs php-rdkafka-stubs
<?php
$conf = new RdKafka/Conf();
$conf->set('log_level', LOG_ERR);
$conf->set('debug', 'admin');
$conf->set('metadata.broker.list', 'localhost:9092');
//If you need to produce exactly once and want to keep the original produce order, uncomment the line below
//$conf->set('enable.idempotence', 'true');
$producer = new RdKafka/Producer($conf);
$topic = $producer->newTopic("test2");
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new /RuntimeException('Was unable to flush, messages might be lost!');
}
<?php
$conf = new RdKafka/Conf();
$conf->set('log_level', LOG_ERR);
$conf->set('debug', 'admin');
// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka/Consumer($conf);
$rk->addBrokers("127.0.0.1");
$topicConf = new RdKafka/TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
// Set the offset store method to 'file'
$topicConf->set('offset.store.method', 'broker');
// Alternatively, set the offset store method to 'none'
// $topicConf->set('offset.store.method', 'none');
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic("test2", $topicConf);
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
print_r($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more/n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out/n";
break;
default:
throw new /Exception($message->errstr(), $message->err);
break;
}
}
<?php
$conf = new RdKafka/Conf();
// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka/KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
var_dump($partitions);
$kafka->assign(NULL);
break;
default:
throw new /Exception($err);
}
});
// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup');
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.1');
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$conf->set('auto.offset.reset', 'smallest');
$consumer = new RdKafka/KafkaConsumer($conf);
// Subscribe to topic 'test2'
$consumer->subscribe(['test2']);
echo "Waiting for partition assignment... (make take some time when/n";
echo "quickly re-joining the group after leaving it.)/n";
while (true) {
$message = $consumer->consume(1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
print_r($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more/n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out/n";
break;
default:
throw new /Exception($message->errstr(), $message->err);
break;
}
sleep(2);
}
声明:本文转载于:CSDN,如有侵犯,请联系admin@php.cn删除
- 上一篇:一文了解PHP中的装饰器模式
- 下一篇:一招解决 PHP 单例模式解析和实战