标签归档:kafka

记录几个实践中的问题

1) nginx禁止对写操作timeout时retry

以前遇到的一个case,业务那边说一笔请求从nginx端发送给后端tomcat了2次(落在两个不同的tomcat节点上)。后来发现是nginx发给后端节点timeout,然后做了重试,发给了另一个节点。默认情况下nginx对后端error 和 timeout 都会做retry,可以明确的禁止在timeout的情况下禁止retry。当然如果集群读写分离的话,对于只读集群retry是无所谓的,但对于写确实存在问题。

2) kafka重启时因为数据日志文件名被人重命名过而导致启动失败

启动kafka broker的时候,会重新load之前的每个topic的数据,正常情况下会提示每个topic恢复完成。

INFO Recovering unflushed segment 588022 in log xxx-topic-0. (kafka.log.Log)
INFO Completed load of log xxx-topic-0 with log end offset 590676 (kafka.log.Log)

但当有些topic下的数据恢复失败的时候,会导致broker关闭,异常如下

ERROR There was an error in one of the threads during logs loading: java.lang.NumberFormatException: For input string: "test" (kafka.log.LogManager)
FATAL [Kafka Server 3], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)

java.lang.NumberFormatException: For input string: "test"
      at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
      at java.lang.Long.parseLong(Long.java:589)
      at java.lang.Long.parseLong(Long.java:631)
      at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:251)
      at scala.collection.immutable.StringOps.toLong(StringOps.scala:30)
      at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:152)
      at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:141)
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
      at kafka.log.Log.loadSegments(Log.scala:141)
      at kafka.log.Log.<init>(Log.scala:67)
      at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142)
      at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

这是因为某个目录下,存在一个 test.log 的文件

$ ls mytopic-0/
00000000000000000485.index  00000000000000000485.log  00000000000000000568.index  00000000000000000568.log  test.log

看上去这个 test.log 当时是把 00…log 给拷贝了一个,然后用编辑器去查看内容。而事后忘了清理掉,导致重启时把这个文件当成一个畸形文件了。因为kafka broker要求所有数据文件名称都是Long类型的。

3) 又一个actor阻塞的例子

在我自己的mac上测试的时候,一切正常,部署到dev环境就严重超时。jstack观察发现又是误用阻塞操作导致所有actor的线程都被阻塞所致,当时 EventProcessor 这个 Router 背后的实例数设置的是40,而这台dev环境的linux只有2核,根据当时akka的配置里的并发因子算出并发线程数是32,所以32个线程基本都被 eventProcessor 的40个actor全给占用了,因为它是不断发消息轮询的(我的mac是8核,运行时的线程数要大于40不会发生全部被阻塞的情况)。解决方式,一方面调大并发因子,把线城数提升上去,另一方面控制 eventProcessor 的实例数,不让它的阻塞操作影响到其他actor。(其实根上是没设计好,没有隔离阻塞操作,只不过这正好是个小应用,不需要过多考虑。)

kafka的advertised.host.name参数

机器有两块网卡,kafka的host.name开始只绑定在了内部IP上,另一块对外网卡无法访问,把值设置为空的话会kafka监听端口在所有的网卡上绑定,但是在外网访问时,客户端又遇到了java.nio.channels.ClosedChannelException异常信息,server端用tcpdump分析的时候发现客户端有传递kafka所在机器的机器名过来。在client里断点跟踪一下发现是findLeader的时候返回的元信息是机器名而不是IP。客户端无法解析这个机器名所以出现了前面的异常。

在server.properties 里还有另一个参数是解决这个问题的, advertised.host.name参数用来配置返回的host.name值,把这个参数配置为外网IP地址即可。

这个参数默认没有启用,默认是返回的java.net.InetAddress.getCanonicalHostName的值,在我的mac上这个值并不等于hostname的值而是返回IP,但在linux上这个值就是hostname的值。

kafka异步发送也存在消息重复的可能

遇到一个业务上的情况,在压力很大的情况下,某个topic的消息出现了小部分的冗余,发送端采用async模式。不确定kafka的async模式发送消息是否也会重试,文档里提到async模式发送增加了丢消息的可能,不知道message.send.max.retries参数是否也会起作用,群里讨论了一下,晚上跟踪了一下代码发现也是有 retry 的。

kafka.producer.async.DefaultEventHandlerhandle方法里

while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
  ......
}

看来不论同步还是异步发送,消息重复的可能都是存在的。

卡夫卡的炼狱

在kafka的代码里,会看到Purgatory(炼狱), Reaper(死神) 等字眼,可见作者的文艺范儿十足,除了是卡夫卡迷之外,还可能是个但丁迷。借用他比较文学的字眼做一次标题党,这篇blog用来记录使用kafka过程中遇到的一些问题。

1) 测试环境单机多实例时log4j的问题

在同一台机器上启用多个kafka实例的话,除了server.properties要各自设置,还需要注意一下它们的log4j配置。虽然多个实例使用同一个日志配置运行也没有问题,但对于排查问题很不方便。所以各个实例,也需要自己的log4j配置。

但是在对每个实例指定了各自的log4j.properties之后,发现里面的kafka.logs.dir变量不管怎么定义,都是用的默认的路径。开启log4j的-Dlog4j.debug发现properties是被正确加载的,但并没有使用properties里的kafka.logs.dir这个变量的值,有可能是被系统环境变量给覆盖了,因为log4j.properties里的变量值,优先用系统环境变量,环境变量没有定义才用properties里变量的值。grep了一下果然,是在”kafka-run-class.sh”脚本里也声明了这个变量:

KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"

要对各个实例设置不同的log4j日志路径,需要修改掉这里。

2) 关闭实例时zkClient线程异常,无法退出的情况

kafka server停止时,触发各个模块/线程的shutdown行为:

1) 停止 SocketServer
2) 停止 RequestHandlerPool  线程池
3) 停止 ReplicaManager
4) 停止 zkClient 线程
ok, shutdown completed

在zkClient的停止过程,如果遇到zookeeper端异常(挂掉)的情况,有可能陷入尝试重连的死循环,日志里看到它会一直尝试连接zookeeper失败,无法停止(可能需要多次kill甚至kill -9)。这个问题偶然发生,并不是必然,还没有仔细分析,暂没精力去跟踪。

WARN Session 0x14b498f4bda0006 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)

java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)

3) 基于SimpleConsumer轮询时的问题

在采用基于SimpleConsumer的消费端实现时,当消息已经处理完最新的一条时(消费端能力大于生产端),轮询的策略需要注意一下。在测试环境下,我们遇到过一个情况是大量的轮询导致整个测试环境网络的流量异常,原因是该topic一直没有新消息,consumer端的轮询没有设置等待参数,也没有在client线程里判断进行一个短暂的sleep。几乎是以死循环的方式不断跟server端通讯,尽管每次的数据包很小,但只要有几个这样的消费端足以引起网络流量的异常。

巨石聊了一下,发现这里有个参数maxWait可以设置(默认是0),当服务器端没有新的消息时判断是否阻塞直到maxWait。不过在尝试的过程中发现这个参数单独使用并不work,还需要对minBytes也设置(设置一个大于0的值)才行,参见KafkaApis的handleFetchRequest方法:

val dataRead = readMessageSets(fetchRequest)
val bytesReadable = dataRead.values.map(_.messages.sizeInBytes).sum

if(fetchRequest.maxWait <= 0 ||
   bytesReadable >= fetchRequest.minBytes ||
   fetchRequest.numPartitions <= 0) {
  ...
  requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
} else {
  ...
  fetchRequestPurgatory.watch(delayedFetch)
}

这两个参数同时设置的情况下才有效:

val fetchReq = new FetchRequestBuilder().clientId(getClientId)
            .addFetch(kafkaTopic, kafkaPartition, offset, fetchSize)
            .maxWait(5000) 
            .minBytes(1)
            .build()

//2015.4.22 补充
在最新的0.8.2.1 版本里,kafka-run-class.sh脚本里做了改进,判断LOG_DIR变量为空才设置默认值,不像以前那样是写死的,这样可以在执行这个脚本之前声明日志路径。

用strings命令查看kafka-log内容

kafka的log内容格式还不没怎么了解,想快速浏览消息内容的话,除了使用它自带的kafka-console-consumer.sh脚本,还可以直接去看log文件本身,不过内容里有部分二进制字符,通过命令看的话会有乱码。

strings 命令可以过滤掉二进制编码,但默认它也会过滤掉中文字符,只留有英文字符。要用它的-e S参数可以同时过滤出中文或英文字符,但仍会包含了小部分的二进制编码,可以在通过iconv去掉一下,能大致看到消息内容:

$ cat log-strings.sh
#!/bin/bash

PROG_NAME=$0
LOG_FILE=$1

if [ -z "$LOG_FILE" ];then
  echo "Usage: $PROG_NAME logfile"
  exit 1
fi

strings -e S "$LOG_FILE" | iconv -c -f "UTF-8" -t "UTF-8"