
A bug in Akka's round-robin routing implementation

Akka's round-robin routing implementation has a bug: once the cumulative counter overflows past Long.MaxValue and turns negative, messages are distributed unevenly and the last routee never gets selected, until the counter becomes positive again. Here is the code (version 2.4.9):

final class RoundRobinRoutingLogic extends RoutingLogic {
  val next = new AtomicLong

  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.nonEmpty) {
      val size = routees.size
      val index = (next.getAndIncrement % size).asInstanceOf[Int]
      routees(if (index < 0) size + index - 1 else index)
    } else NoRoutee

}

The problem is in the select function above: when index is negative it returns size + index - 1, which is wrong. A short Java program demonstrates it:

import java.util.concurrent.atomic.AtomicLong;

public class Test {

    static final AtomicLong next = new AtomicLong(-2003);
    static final int size = 4;

    static int getNext() {
        int index = (int) (next.getAndIncrement() % size);
        if (index < 0)
            return size + index - 1;
        else
            return index;
    }

    public static void main(String[] args) {
        for (int n = 0; n < 8; n++) {
            System.out.println(getNext());
        }
    }
}

// Output:
0
1
2
0
0
1
2
0

The code above assumes 4 routees. Once next has gone negative, messages keep landing on indices 0, 1 and 2 only and never reach the last routee (index 3). The fix is to return the absolute value of index (i.e. -index) when it is negative.
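
A minimal sketch of that fix (my own illustration, not the official Akka patch), reusing the same starting value as the Java test above:

import java.util.concurrent.atomic.AtomicLong

object FixedRoundRobin {
    val next = new AtomicLong(-2003)
    val size = 4

    // after next wraps negative, the remainder lies in [-(size - 1), 0];
    // taking its absolute value keeps every index, including size - 1, reachable
    def getNext(): Int = {
        val index = (next.getAndIncrement() % size).toInt
        if (index < 0) -index else index
    }

    def main(args: Array[String]): Unit =
        (0 until 8).foreach(_ => println(getNext())) // prints 3 2 1 0 3 2 1 0
}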

AsyncHttpClient revisited

I wrote before about why we did not use AsyncHttpClient; that decision was driven purely by our requirements at the time, since we did not need the performance. For campaign-promotion traffic, however, performance has to be considered, and inside the Actor model any call to a third-party HTTP API must be asynchronous, so that the thread pool underlying actor message dispatch is never blocked. During a campaign at the end of last year we used AsyncHttpClient (version 1.9) inside Akka, which validated both the combination and its impressive performance.

Recently I noticed that AsyncHttpClient was refactored for 2.0: the package name changed from com.ning to org.asynchttpclient, some of its classes were reworked, and its Netty dependency was upgraded. A new project of ours happened to call third-party HTTP APIs, so I tried the new AsyncHttpClient and found that its readTimeout parameter simply does not work. Below are test programs for both versions:

import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.Response;

public class AsyncV1Test {

    static class MyCallback extends AsyncCompletionHandler<Boolean> {

        @Override
        public Boolean onCompleted(Response response) throws Exception {
            System.out.println(response.getResponseBody());
            return true;
        }

        public void onThrowable(Throwable t) {
            t.printStackTrace(System.err);
        }
    }

    public static void main(String[] args) throws Exception {
        AsyncHttpClientConfig cfg = new AsyncHttpClientConfig.Builder().setReadTimeout(100).build();
        com.ning.http.client.AsyncHttpClient cli = new com.ning.http.client.AsyncHttpClient(cfg);
        cli.prepareGet("http://localhost:8080/sleep?time=200").execute(new MyCallback());

        // wait and quit
        Thread.sleep(1000);
        cli.close();
    }
}

The test above uses the old com.ning client with readTimeout set to 100 ms; the requested URL sleeps for 200 ms on the server side, and the timeout exception shows up as expected:

java.util.concurrent.TimeoutException: Read timeout to localhost/127.0.0.1:8080 of 100 ms
    at com.ning.http.client.providers.netty.request.timeout.TimeoutTimerTask.expire(TimeoutTimerTask.java:47)
    at com.ning.http.client.providers.netty.request.timeout.ReadTimeoutTimerTask.run(ReadTimeoutTimerTask.java:57)
    at org.jboss.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:556)
    at org.jboss.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:632)
    at org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:369)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at java.lang.Thread.run(Thread.java:745)

Testing the same logic with the new org.asynchttpclient:

import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Response;

public class AsyncV2Test {

    static class MyCallback extends AsyncCompletionHandler<Boolean> {

        @Override
        public Boolean onCompleted(Response response) throws Exception {
            System.out.println(response.getResponseBody());
            return true;
        }

        public void onThrowable(Throwable t) {
            t.printStackTrace(System.err);
        }
    }

    public static void main(String[] args) throws Exception{

        DefaultAsyncHttpClientConfig cfg = new DefaultAsyncHttpClientConfig.Builder().setReadTimeout(100).build();
        org.asynchttpclient.AsyncHttpClient cli = new DefaultAsyncHttpClient(cfg);
        cli.prepareGet("http://localhost:8080/sleep?time=200").execute(new MyCallback());

        // wait and quit
        Thread.sleep(1000);
        cli.close();
    }
}       

This time no timeout exception occurs; the final response is printed normally instead. Presumably this is a bug introduced by the 2.0 refactoring, so I have gone back to the old com.ning.http.client.AsyncHttpClient.
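
For completeness, here is a minimal sketch of the /sleep endpoint the two tests assume (my own stand-in, since the original post does not show the server side), built on the JDK's com.sun.net.httpserver:

import java.net.InetSocketAddress
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}

object SleepServer {
    def main(args: Array[String]): Unit = {
        val server = HttpServer.create(new InetSocketAddress(8080), 0)
        server.createContext("/sleep", new HttpHandler {
            override def handle(exchange: HttpExchange): Unit = {
                // the query is expected to look like "time=200"
                val millis = Option(exchange.getRequestURI.getQuery)
                    .flatMap(_.split("=").lastOption).map(_.toLong).getOrElse(0L)
                Thread.sleep(millis)
                val body = s"slept $millis ms".getBytes("UTF-8")
                exchange.sendResponseHeaders(200, body.length)
                exchange.getResponseBody.write(body)
                exchange.close()
            }
        })
        server.start()
    }
}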

Partial functions and performance inside actors

Evan Chan's talk "Akka in Production: Our Story" is a very down-to-earth account of engineering practice, with plenty of ideas worth borrowing. One of them is a wrapper around message receiving whose main purpose is to leave an extension point:

trait ActorStack extends Actor {

    def wrappedReceive: Receive // Receive is a partial function (PartialFunction[Any, Unit])

    def receive: Receive = {
        case x => if (wrappedReceive.isDefinedAt(x)) wrappedReceive(x) else unhandled(x)
    }
}

Most people would then implement their business logic like this:

class MyActor extends ActorStack {

    def wrappedReceive: Receive = {
        case Something => balabala
    }
}

Written this way there is a small performance waste: every time a message is received, at least one partial-function object is created, and if the message happens to be defined in wrappedReceive, two are created, because wrappedReceive is called twice. This can be avoided by caching the partial function in a field so it is not rebuilt on every message:

trait ActorStack extends Actor {

    def wrappedReceive: Receive

    private val logic = wrappedReceive

    def receive: Receive = {
        case x => if (logic.isDefinedAt(x)) logic(x) else unhandled(x)
    }
}

You might wonder how Akka itself handles the partial function returned by receive: does it also rebuild it every time (the same waste), or keep it somewhere? Indeed, internally it is pushed onto behaviorStack, and for every message the partial function is taken from that behavior stack and invoked.
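
That behavior stack is the same mechanism behind context.become and unbecome; a tiny sketch (a hypothetical actor of my own) to illustrate:

import akka.actor.Actor

class Toggle extends Actor {
    // become(..., discardOld = false) pushes a new Receive onto behaviorStack
    // and unbecome() pops it; each message is handled by the top of the stack
    def off: Receive = { case "flip" => context.become(on, discardOld = false) }
    def on: Receive  = { case "flip" => context.unbecome() }
    def receive: Receive = off
}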

Patterns.ask sends the message from a temporarily created actor, not from self

When sending a message to another actor with ask, the receiver watched the sender upon receiving the message, and found that it was not the sending actor at all but a temporary one: Actor[akka://AkkaActorSystem/temp/$a]

The logic in the sending actor:

val f = targetActor ? Register
Await.result(f, timeout.duration)

The logic in the receiving actor:

case Register => {
  this.observable = sender
  watch(sender)
  become(...)
}

It turns out this is simply how the ask pattern is designed: it uses a temporarily created actor to forward the message and wait for the reply.

Stepping into the ask method, it creates a PromiseActorRef:

val a = new PromiseActorRef(provider, result)

This produces the temporary Actor[akka://AkkaActorSystem/temp/$a], which sends the message to the target in place of the current actor; once it receives the target's reply, it is torn down.
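
So if the receiver needs to watch the real sender, the sender's ActorRef has to travel inside the message itself; a sketch of that variation (the case class below is my own stand-in for the Register message above):

case class Register(replyTo: ActorRef)

// sender side: pass self explicitly, because sender() on the receiving end
// is the temporary PromiseActorRef rather than this actor
val f = targetActor ? Register(self)

// receiver side: watch the ActorRef carried in the message, not sender()
case Register(replyTo) =>
  this.observable = replyTo
  context.watch(replyTo)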

Counting an actor's mailbox

Suppose an actor handles the following kinds of messages:

def wrappedReceive: PartialFunction[Any, Unit] = {
    case "kafkaReady"     => sendRequest()
    case Some(Task(data)) => assign(data)
    case None             => await()
    case "done"           => next()
}

When I want to check whether this actor's mailbox is backing up, a quick way is to look at instance counts per class with jmap:

$ jmap -histo:live 12662 

num     #instances         #bytes  class name
----------------------------------------------
...
3:   51906   9965952    com.wacai.xxx.dataobject.XX
...
6:   149338  3584112    java.util.concurrent.ConcurrentLinkedQueue$Node
7:   149318  3583632    akka.dispatch.Envelope
...
17:  7266    232512     scala.collection.mutable.ListBuffer
...
21:  7274    116384     scala.Some
22:  7266    116256     com.wacai.xxx.bridge.actor.Task

Note that jmap must be run in :live mode so that a full GC is triggered first and dead objects are collected; otherwise the counts are inaccurate. From the output we can roughly see there are 7266 instances of Some[Task] and 149318 of Envelope. Because None is a singleton, there may be many such messages queued yet only one instance shows up, so we can only estimate by subtracting the counts of the other message types from the Envelope count; the trouble is that the Envelope count covers the messages of every actor. Message types such as String may likewise have many instances for unrelated reasons, so it is hard to tell precisely how many messages each actor's mailbox actually holds.

Akka removed the mailboxSize method back in 2.0. The official blog post on the subject explains why it is no longer supported, and mentions that if you really want the mailbox size you can extend things yourself. Following that idea, we count enqueue and dequeue operations so that, in certain scenarios, we can tell whether a mailbox is backing up.

class MyMailbox extends akka.dispatch.UnboundedMailbox.MessageQueue {
    private val counter = new java.util.concurrent.atomic.AtomicInteger(0)

    override def dequeue(): Envelope = {
        val handle = super.dequeue()
        // only count messages actually removed; dequeue() returns null on an empty queue
        if (handle ne null) counter.decrementAndGet()
        handle
    }

    override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
        counter.incrementAndGet()
        super.enqueue(receiver, handle)
    }

    override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
        counter.set(0)
        super.cleanUp(owner, deadLetters)
    }

    def getSize(): Int = counter.get()
}

class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {

    override def create(owner: Option[ActorRef], system: Option[ActorSystem]) =
        (owner, system) match {
            case (Some(o), Some(s)) =>
                val mailbox = new MyMailbox
                MailboxExtension(s).register(o, mailbox)
                mailbox
            case _ => throw new Exception("no mailbox owner or system given")
        }
}

Then expose it through an Akka extension:

object MailboxExtension extends ExtensionId[MailboxExtension] with ExtensionIdProvider {
    override def createExtension(s: ExtendedActorSystem) = new MailboxExtension(s)

    override def lookup = this

    def getSize()(implicit context: ActorContext): Int = 
                    MailboxExtension(context.system).getSize()
}


class MailboxExtension(val system: ExtendedActorSystem) extends Extension {

    private val mboxMap = new java.util.concurrent.ConcurrentHashMap[ActorRef, MyMailbox]

    def register(actorRef: ActorRef, mailbox: MyMailbox): Unit = mboxMap.put(actorRef, mailbox)

    def unregister(actorRef: ActorRef): Unit = mboxMap.remove(actorRef)

    def getSize()(implicit context: ActorContext): Int = {
        val mbox = mboxMap.get(context.self)
        if (mbox == null)
            throw new IllegalArgumentException("Mailbox not registered for: " + context.self)
        mbox.getSize()
    }
}
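
To attach this mailbox to an actor, one possible wiring (the mailbox id counting-mailbox and the package name below are my assumptions) is to declare the MailboxType in configuration and select it with Props.withMailbox:

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object MailboxWiring {
    val config = ConfigFactory.parseString(
        """
        counting-mailbox {
          mailbox-type = "com.example.MyMailboxType"
        }
        """)

    class Worker extends Actor {
        // inside the actor, MailboxExtension.getSize()(context) reports how many
        // messages are currently queued in its MyMailbox
        def receive: Receive = { case _ => println(MailboxExtension.getSize()(context)) }
    }

    def main(args: Array[String]): Unit = {
        val system = ActorSystem("AkkaActorSystem", config)
        system.actorOf(Props(new Worker).withMailbox("counting-mailbox"))
    }
}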

Never ever block an actor

We ran into a case where an Akka process could not be stopped normally: the program called system.awaitTermination on exit and got stuck on one of the actor threads:

"AkkaActorSystem-akka.actor.default-dispatcher-8" #25 prio=5 os_prio=0 tid=0x00007f319c002800 nid=0x76b2 waiting on condition [0x00007f31fbcfb000]
 java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  <0x00000000c0645828> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
  at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
  at xxx.KafkaMessageReceiver.prepareTask(KafkaMessageReceiver.scala:78)
  ...
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at com.wacai.csw.bridge.service.KafkaMessageReceiver.aroundReceive(KafkaMessageReceiver.scala:21)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
  at akka.dispatch.Mailbox.run(Mailbox.scala:221)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

There are two main actors in this program: one reads data from Kafka, the other processes it. To keep the two rates matched, a pull model is used, with a BlockingQueue between the Kafka consumer actor and the DataProcessor to balance them; the processor actor, however, fetched data from the BlockingQueue with take, which blocked the actor's dispatcher thread, so the shutdown, waiting for this actor to finish handling its messages, blocked as well.

In principle the KafkaConsumer side could block the same way when putting data into the BlockingQueue (the put method), but that consumer does not run on Akka's actor dispatcher; it is an independent thread, so it does not affect system.awaitTermination.

The fix is to replace take with poll using an acceptable bounded wait: if no data arrives within that window it yields None, and the actor's receive handles the None case by scheduling another round of polling.
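
A rough sketch of that shape (all names here are my assumptions, not the project's actual code): the actor polls with a bounded timeout and, when nothing arrives, schedules another poll instead of parking the dispatcher thread:

import java.util.concurrent.{BlockingQueue, TimeUnit}
import scala.concurrent.duration._
import akka.actor.Actor

case class Task(data: String)
case object Poll

class DataProcessor(queue: BlockingQueue[Task]) extends Actor {
    import context.dispatcher

    def receive: Receive = {
        case Poll =>
            // bounded wait instead of take(): the dispatcher thread is blocked
            // for at most 100 ms, so shutdown can still make progress
            Option(queue.poll(100, TimeUnit.MILLISECONDS)) match {
                case Some(task) =>
                    process(task)
                    self ! Poll
                case None =>
                    // nothing available: try again a little later
                    context.system.scheduler.scheduleOnce(50.millis, self, Poll)
            }
    }

    private def process(task: Task): Unit = println(task.data)
}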

A guiding rule when working with the Actor model: never ever block an actor. Keep it in mind.