在kafka的代码里,会看到Purgatory(炼狱), Reaper(死神) 等字眼,可见作者的文艺范儿十足,除了是卡夫卡迷之外,还可能是个但丁迷。借用他比较文学的字眼做一次标题党,这篇blog用来记录使用kafka过程中遇到的一些问题。
1) 测试环境单机多实例时log4j的问题
在同一台机器上启用多个kafka实例的话,除了server.properties要各自设置,还需要注意一下它们的log4j配置。虽然多个实例使用同一个日志配置运行也没有问题,但对于排查问题很不方便。所以各个实例,也需要自己的log4j配置。
但是在对每个实例指定了各自的log4j.properties之后,发现里面的kafka.logs.dir
变量不管怎么定义,都是用的默认的路径。开启log4j的-Dlog4j.debug
发现properties是被正确加载的,但并没有使用properties里的kafka.logs.dir
这个变量的值,有可能是被系统环境变量给覆盖了,因为log4j.properties里的变量值,优先用系统环境变量,环境变量没有定义才用properties里变量的值。grep了一下果然,是在”kafka-run-class.sh”脚本里也声明了这个变量:
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
要对各个实例设置不同的log4j日志路径,需要修改掉这里。
2) 关闭实例时zkClient线程异常,无法退出的情况
kafka server停止时,触发各个模块/线程的shutdown行为:
1) 停止 SocketServer
2) 停止 RequestHandlerPool 线程池
3) 停止 ReplicaManager
4) 停止 zkClient 线程
ok, shutdown completed
在zkClient的停止过程,如果遇到zookeeper端异常(挂掉)的情况,有可能陷入尝试重连的死循环,日志里看到它会一直尝试连接zookeeper失败,无法停止(可能需要多次kill甚至kill -9)。这个问题偶然发生,并不是必然,还没有仔细分析,暂没精力去跟踪。
WARN Session 0x14b498f4bda0006 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)
3) 基于SimpleConsumer轮询时的问题
在采用基于SimpleConsumer的消费端实现时,当消息已经处理完最新的一条时(消费端能力大于生产端),轮询的策略需要注意一下。在测试环境下,我们遇到过一个情况是大量的轮询导致整个测试环境网络的流量异常,原因是该topic一直没有新消息,consumer端的轮询没有设置等待参数,也没有在client线程里判断进行一个短暂的sleep。几乎是以死循环的方式不断跟server端通讯,尽管每次的数据包很小,但只要有几个这样的消费端足以引起网络流量的异常。
跟巨石聊了一下,发现这里有个参数maxWait
可以设置(默认是0),当服务器端没有新的消息时判断是否阻塞直到maxWait
。不过在尝试的过程中发现这个参数单独使用并不work,还需要对minBytes
也设置(设置一个大于0的值)才行,参见KafkaApis的handleFetchRequest
方法:
val dataRead = readMessageSets(fetchRequest)
val bytesReadable = dataRead.values.map(_.messages.sizeInBytes).sum
if(fetchRequest.maxWait <= 0 ||
bytesReadable >= fetchRequest.minBytes ||
fetchRequest.numPartitions <= 0) {
...
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
} else {
...
fetchRequestPurgatory.watch(delayedFetch)
}
这两个参数同时设置的情况下才有效:
val fetchReq = new FetchRequestBuilder().clientId(getClientId)
.addFetch(kafkaTopic, kafkaPartition, offset, fetchSize)
.maxWait(5000)
.minBytes(1)
.build()
//2015.4.22 补充
在最新的0.8.2.1 版本里,kafka-run-class.sh
脚本里做了改进,判断LOG_DIR
变量为空才设置默认值,不像以前那样是写死的,这样可以在执行这个脚本之前声明日志路径。
你们kafka用在什么业务场景下?
1) 业务之间,发送消息/交换数据。
2) 数据转存
3) 日志收集