Akka的RoundRobinRouting实现存在bug

Akka的RoundRobinRouting实现存在bug,当累计执行超过Long最大值之后(变为负数),会导致分散不均,最后一个节点永远分配不上,直到累计值再次变为正数才恢复均匀。代码如下(2.4.9版本):

final class RoundRobinRoutingLogic extends RoutingLogic {
  val next = new AtomicLong

  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.nonEmpty) {
      val size = routees.size
      val index = (next.getAndIncrement % size).asInstanceOf[Int]
      routees(if (index < 0) size + index - 1 else index)
    } else NoRoutee

}

上面select函数里当index为负数时,返回size + index - 1存在问题,用一段java代码验证一下:

import java.util.concurrent.atomic.AtomicLong;

public class Test {

    static final AtomicLong next = new AtomicLong(-2003);
    static final int size = 4;

    static int getNext() {
        int index = (int) (next.getAndIncrement() % size);
        if (index < 0)
            return size + index - 1;
        else
            return index;
    }

    public static void main(String[] args) {
        for (int n = 0; n < 8; n++) {
            System.out.println(getNext());
        }
    }
}

// 运行后输出:
0
1
2
0
0
1
2
0

上面的代码假设有4个节点,当next为负数后,一直在"0,1,2"这三个节点上分配,不会分配给最后一个节点(即索引为3的),修正的方式是当index为负数后,返回其绝对值(即 -index)即可。

记录几个实践中的问题

1) nginx禁止对写操作timeout时retry

以前遇到的一个case,业务那边说一笔请求从nginx端发送给后端tomcat了2次(落在两个不同的tomcat节点上)。后来发现是nginx发给后端节点timeout,然后做了重试,发给了另一个节点。默认情况下nginx对后端error 和 timeout 都会做retry,可以明确的禁止在timeout的情况下禁止retry。当然如果集群读写分离的话,对于只读集群retry是无所谓的,但对于写确实存在问题。

2) kafka重启时因为数据日志文件名被人重命名过而导致启动失败

启动kafka broker的时候,会重新load之前的每个topic的数据,正常情况下会提示每个topic恢复完成。

INFO Recovering unflushed segment 588022 in log xxx-topic-0. (kafka.log.Log)
INFO Completed load of log xxx-topic-0 with log end offset 590676 (kafka.log.Log)

但当有些topic下的数据恢复失败的时候,会导致broker关闭,异常如下

ERROR There was an error in one of the threads during logs loading: java.lang.NumberFormatException: For input string: "test" (kafka.log.LogManager)
FATAL [Kafka Server 3], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)

java.lang.NumberFormatException: For input string: "test"
      at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
      at java.lang.Long.parseLong(Long.java:589)
      at java.lang.Long.parseLong(Long.java:631)
      at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:251)
      at scala.collection.immutable.StringOps.toLong(StringOps.scala:30)
      at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:152)
      at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:141)
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
      at kafka.log.Log.loadSegments(Log.scala:141)
      at kafka.log.Log.<init>(Log.scala:67)
      at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142)
      at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

这是因为某个目录下,存在一个 test.log 的文件

$ ls mytopic-0/
00000000000000000485.index  00000000000000000485.log  00000000000000000568.index  00000000000000000568.log  test.log

看上去这个 test.log 当时是把 00…log 给拷贝了一个,然后用编辑器去查看内容。而事后忘了清理掉,导致重启时把这个文件当成一个畸形文件了。因为kafka broker要求所有数据文件名称都是Long类型的。

3) 又一个actor阻塞的例子

在我自己的mac上测试的时候,一切正常,部署到dev环境就严重超时。jstack观察发现又是误用阻塞操作导致所有actor的线程都被阻塞所致,当时 EventProcessor 这个 Router 背后的实例数设置的是40,而这台dev环境的linux只有2核,根据当时akka的配置里的并发因子算出并发线程数是32,所以32个线程基本都被 eventProcessor 的40个actor全给占用了,因为它是不断发消息轮询的(我的mac是8核,运行时的线程数要大于40不会发生全部被阻塞的情况)。解决方式,一方面调大并发因子,把线城数提升上去,另一方面控制 eventProcessor 的实例数,不让它的阻塞操作影响到其他actor。(其实根上是没设计好,没有隔离阻塞操作,只不过这正好是个小应用,不需要过多考虑。)

优雅关闭与session draining

最近看到nginx plus收费版里有个session draining的概念,在用nginx做反向代理,要优雅停止后端应用的话,需要先在nginx层控制请求不再发到相应的节点,但与此同时已经建立的链接则要继续保持,一直等这些链接都结束了再停止应用。

其实在免费版的nginx/tengine就能实现这个特性,在以前的这篇nginx反向代理对后端某个节点优雅下线的问题 文章里测试过后端tomcat有一个耗时很长的请求,当tengine端健康监测已发现后端节点不可用的情况下,该请求并不会被tengine中止,而会等后端响应结束后再返回给客户端。

优化关闭应用并不只是在负载均衡或反向代理层面就能完美解决的,后端的应用本身也要考虑很多细节。举一个例子,Actor怎么有序的关闭,可以参考之前分享的Governor模式,所有干活的Actor由Governor创建,在创建时就设定好优先级。

在应用关闭的时候,对master发送PoisonPill,示例代码如下:

// 在你的容器或微容器里监听到应用关闭事件时触发shutdown,最简单的情况下是在shutdownhook里
def shutdown(): Unit = {
    gracefulStopActors
    system.terminate
    Await.result(system.whenTerminated, Duration.Inf)
    springContext.close
}

private def gracefulStopActors {
    master ! PoisonPill
    signal.acquire //wait
}

在governor角色里,当收到Terminated消息后顺序结束子actor

...
case Terminated(actor) if actor == observable => {
  logger.info("===receive terminated message from: " + actor)
  // 先顺序的杀死所有托管的子actor,并将结果发给自己
  stopAll(managedActors.toList.sorted) pipeTo self
}
...

// 顺序的停止所有子actor
protected def stopAll(kids: List[OrderedActorRef]): Future[Any] = {
    kids match {
      case first :: Nil =>
        logger.info("===graceful stop: " + first.actor)
        gracefulStop(first.actor, stopTimeout).flatMap { _ =>
          Future { AllDead }
      }
      case first :: rest =>
        logger.info("===graceful stop: " + first.actor)
        gracefulStop(first.actor, stopTimeout).flatMap { _ =>
        stopAll(rest)
      }
      case Nil =>
        Future { AllDead }
    }
}

再举一个例子,应用里有一个线程一直轮询从redis里获取数据,你在关闭应用的时候如何优雅终止这个线程呢?下面用一段代码简单模拟这个场景;假设这个应用没有任何容器,结束就是靠收到kill信号,那么在ShutdownHook里,要阻塞的等待(一段时间)worker线程结束后再退出才算安全

public class Test {
  static volatile boolean running = true;

  public static void main(String[] args) {
  Thread worker = new Thread() {
      public void run() {
          while (running) {
              try {
                  System.out.println("i'm running.1");
                  Thread.sleep(1000);
                  System.out.println("i'm running.2");
                  Thread.sleep(1000);
                  System.out.println("i'm running.3");
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
  };
  worker.start();
  Runtime.getRuntime().addShutdownHook(new Thread() {
      public void run() {
          System.out.println("receive shutdown signal.");
          running = false;

          while(worker.isAlive()) {
              System.out.println("waiting for worker thread finish.");
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
              }
          }
          System.out.println("wroker thread finished!");
      }
  });
 }
}

Actor里的偏函数与性能

Evan Chan分享的《Akka in Production: Our Story》是一篇非常务实的工程实践分享,里面有很多可借鉴的点子,其中的一个对消息接受的包装逻辑主要预留了扩展点:

trait ActorStack extends Actor {

    def wrappedReceive: Receive // Receive类型是一个偏函数

    def receive: Receive = {
        case x => if (wrappedReceive.isDefinedAt(x)) wrappedReceive(x) else unhandled(x)
    }
}

大部分人在实现业务逻辑时可能如下

class MyActor extends ActorStack {

    def wrappedReceive: Receive = {
        case Something => balabala
    }
}

这种情况下会有个小小的性能浪费,每次接收消息的时候,至少要创建一次偏函数对象。如果消息又正好在wrappedReceive里定义了的话,创建了2次偏函数对象,因为调用了2次wrappedReceive方法;这个是可以避免的,用一个成员把这个偏函数对象保存起来,避免每次都创建:

trait ActorStack extends Actor {

    def wrappedReceive: Receive

    private val logic = wrappedReceive

    def receive: Receive = {
        case x => if (logic.isDefinedAt(x)) logic(x) else unhandled(x)
    }
}

或许看客们又好奇,那么Akka对receive这个函数返回的偏函数又是怎么处理的?是否也存在每次都生产一下(同样的性能浪费),还是保存在某个地方? 没错,它在底层实现里是被放到behaviorStack这个行为栈里的,每次处理逻辑的时候从行为栈里取出这个偏函数进行调用。