Akka的RoundRobinRouting实现存在bug

Akka的RoundRobinRouting实现存在bug,当累计执行超过Long最大值之后(变为负数),会导致分散不均,最后一个节点永远分配不上,直到累计值再次变为正数才恢复均匀。代码如下(2.4.9版本):

final class RoundRobinRoutingLogic extends RoutingLogic {
  val next = new AtomicLong

  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.nonEmpty) {
      val size = routees.size
      val index = (next.getAndIncrement % size).asInstanceOf[Int]
      routees(if (index < 0) size + index - 1 else index)
    } else NoRoutee

}

上面select函数里当index为负数时,返回size + index - 1存在问题,用一段java代码验证一下:

import java.util.concurrent.atomic.AtomicLong;

public class Test {

    static final AtomicLong next = new AtomicLong(-2003);
    static final int size = 4;

    static int getNext() {
        int index = (int) (next.getAndIncrement() % size);
        if (index < 0)
            return size + index - 1;
        else
            return index;
    }

    public static void main(String[] args) {
        for (int n = 0; n < 8; n++) {
            System.out.println(getNext());
        }
    }
}

// 运行后输出:
0
1
2
0
0
1
2
0

上面的代码假设有4个节点,当next为负数后,一直在"0,1,2"这三个节点上分配,不会分配给最后一个节点(即索引为3的),修正的方式是当index为负数后,返回其绝对值(即 -index)即可。

再谈AsyncHttpClient

以前曾经说过为何我们不使用AsyncHttpClient,当时主要是从需求出发,性能上不需要。不过在业务做活动推广的场景下,性能上是必须要考虑的;另外在Actor模型里,要调用第三方的http接口,为了避免Actor收发消息的底层线程池被阻塞也必须用异步的方式。在去年年底我们的一次活动,Akka里使用AsyncHttpClient(1.9版本)已经很好的验证过它们的组合以及惊诧的性能了。

最近看到AsyncHttpClient在2.0后被重构了,包名从com.ning变为了org.asynchttpclient,里面的一些类也有所重构,也升级了所依赖的netty的版本。最近正好一些新项目里调用第三方的http接口,尝试了一下新的AsyncHttpClient,结果发现它的readTimeout参数并不work,下面是两个版本的测试程序:

import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.Response;

public class AsyncV1Test {

    static class MyCallback extends AsyncCompletionHandler<Boolean> {

        @Override
        public Boolean onCompleted(Response response) throws Exception {
            System.out.println(response.getResponseBody());
            return true;
        }

        public void onThrowable(Throwable t) {
            t.printStackTrace(System.err);
        }
    }

    public static void main(String[] args) throws Exception {
        AsyncHttpClientConfig cfg = new AsyncHttpClientConfig.Builder().setReadTimeout(100).build();
        com.ning.http.client.AsyncHttpClient cli = new com.ning.http.client.AsyncHttpClient(cfg);
        cli.prepareGet("http://localhost:8080/sleep?time=200").execute(new MyCallback());

        // wait and quit
        Thread.sleep(1000);
        cli.close();
    }
}

上面是老版本的com.ning的客户端的readTimeout测试,设置的是100毫秒,请求的url在server端会sleep 200毫秒,超时的异常符合预期:

java.util.concurrent.TimeoutException: Read timeout to localhost/127.0.0.1:8080 of 100 ms
    at com.ning.http.client.providers.netty.request.timeout.TimeoutTimerTask.expire(TimeoutTimerTask.java:47)
    at com.ning.http.client.providers.netty.request.timeout.ReadTimeoutTimerTask.run(ReadTimeoutTimerTask.java:57)
    at org.jboss.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:556)
    at org.jboss.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:632)
    at org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:369)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at java.lang.Thread.run(Thread.java:745)

在新版本的org.asynchttpclient里测试同样的逻辑:

import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Response;

public class AsyncV2Test {

    static class MyCallback extends AsyncCompletionHandler<Boolean> {

        @Override
        public Boolean onCompleted(Response response) throws Exception {
            System.out.println(response.getResponseBody());
            return true;
        }

        public void onThrowable(Throwable t) {
            t.printStackTrace(System.err);
        }
    }

    public static void main(String[] args) throws Exception{

        DefaultAsyncHttpClientConfig cfg = new DefaultAsyncHttpClientConfig.Builder().setReadTimeout(100).build();
        org.asynchttpclient.AsyncHttpClient cli = new DefaultAsyncHttpClient(cfg);
        cli.prepareGet("http://localhost:8080/sleep?time=200").execute(new MyCallback());

        // wait and quit
        Thread.sleep(1000);
        cli.close();
    }
}       

却没有发生超时的异常,而是正常打印出了最终的请求结果。估计是新版本重构后所带来的bug,我还是回退到老的com.ning.http.client.AsyncHttpClient

Actor里的偏函数与性能

Evan Chan分享的《Akka in Production: Our Story》是一篇非常务实的工程实践分享,里面有很多可借鉴的点子,其中的一个对消息接受的包装逻辑主要预留了扩展点:

trait ActorStack extends Actor {

    def wrappedReceive: Receive // Receive类型是一个偏函数

    def receive: Receive = {
        case x => if (wrappedReceive.isDefinedAt(x)) wrappedReceive(x) else unhandled(x)
    }
}

大部分人在实现业务逻辑时可能如下

class MyActor extends ActorStack {

    def wrappedReceive: Receive = {
        case Something => balabala
    }
}

这种情况下会有个小小的性能浪费,每次接收消息的时候,至少要创建一次偏函数对象。如果消息又正好在wrappedReceive里定义了的话,创建了2次偏函数对象,因为调用了2次wrappedReceive方法;这个是可以避免的,用一个成员把这个偏函数对象保存起来,避免每次都创建:

trait ActorStack extends Actor {

    def wrappedReceive: Receive

    private val logic = wrappedReceive

    def receive: Receive = {
        case x => if (logic.isDefinedAt(x)) logic(x) else unhandled(x)
    }
}

或许看客们又好奇,那么Akka对receive这个函数返回的偏函数又是怎么处理的?是否也存在每次都生产一下(同样的性能浪费),还是保存在某个地方? 没错,它在底层实现里是被放到behaviorStack这个行为栈里的,每次处理逻辑的时候从行为栈里取出这个偏函数进行调用。

Patterns.ask 是使用一个临时创建的actor发消息而非自身

用ask发消息给另一个actor时,在接收方收到消息时对发送者做了watch,但发现并不是目标actor,而是一个临时的actor: Actor[akka://AkkaActorSystem/temp/$a]

发送者actor里的逻辑:

val f = targetActor ? Register
Await.result(f, timeout.duration)

接收者actor里的逻辑:

case Register => {
  this.observable = sender
  watch(sender)
  become(...)
}

后来发现这个是因为ask模式就这么设计的,它使用一个临时创建的actor来转发消息并等待对方返回。

跟踪到ask方法内部,是创建了一个PromiseActorRef:

val a = new PromiseActorRef(provider, result)产生了一个临时的Actor[akka://AkkaActorSystem/temp/$a] 被用来代替当前actor发送消息给目标,当它接收到目标回应的结果后,会被卸载掉。