Patterns.ask 是使用一个临时创建的actor发消息而非自身

用ask发消息给另一个actor时,在接收方收到消息时对发送者做了watch,但发现并不是目标actor,而是一个临时的actor: Actor[akka://AkkaActorSystem/temp/$a]

发送者actor里的逻辑:

val f = targetActor ? Register
Await.result(f, timeout.duration)

接收者actor里的逻辑:

case Register => {
  this.observable = sender
  watch(sender)
  become(...)
}

后来发现这个是因为ask模式就这么设计的,它使用一个临时创建的actor来转发消息并等待对方返回。

跟踪到ask方法内部,是创建了一个PromiseActorRef:

val a = new PromiseActorRef(provider, result)产生了一个临时的Actor[akka://AkkaActorSystem/temp/$a] 被用来代替当前actor发送消息给目标,当它接收到目标回应的结果后,会被卸载掉。

对actor的邮箱计数

假设某个actor里消息有下面几种类型:

def wrappedReceive: PartialFunction[Any, Unit] = {
    case "kafkaReady"     => sendRequest()
    case Some(Task(data)) => assign(data)
    case None             => await()
    case "done"           => next()
}

当我想要查看这个actor的邮箱有没有堆积时,一个简单的方式是通过jmap来看类型的实例数:

$ jmap -histo:live 12662 

num     #instances         #bytes  class name
----------------------------------------------
...
3:   51906   9965952    com.wacai.xxx.dataobject.XX
...
6:   149338  3584112    java.util.concurrent.ConcurrentLinkedQueue$Node
7:   149318  3583632    akka.dispatch.Envelope
...
17:  7266    232512     scala.collection.mutable.ListBuffer
...
21:  7274    116384     scala.Some
22:  7266    116256     com.wacai.xxx.bridge.actor.Task

注意必须以live方式执行jmap触发一次full gc回收掉无用对象,否则计数不准确。上面大致可以看出Some[Task]对象实例有7266个,Envelop有149318个,因为None是一个单例,所以尽管可能堆积多条这样的消息,实例数里却只有1个,只能通过 Envelop的实例数减去其他类型的实例数来判断,但问题是Envelop实例数是所有actor的消息实例数;另外像String类型这样的消息类型也可能存在多个,很难准确判断每个actor的邮箱到底堆积了多少条。

Akka在2.0的时候去掉了mailboxSize方法,参考官方的这篇blog, 里面说了为何不再支持这个方法,也提到如果你真的想要获取mailboxSize可以自己扩展一下,于是按照这个思路,我们对enqueuedequeue做一个计数,以便在某些场景判断mailbox是否有堆积。

class MyMailbox extends akka.dispatch.UnboundedMailbox.MessageQueue {
    private val counter = new java.util.concurrent.atomic.AtomicInteger(0)

    override def dequeue(): Envelope = {
        counter.decrementAndGet()
        super.dequeue()
    }

    override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
        counter.incrementAndGet()
        super.enqueue(receiver, handle)
    }

    override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
        counter.set(0)
        super.cleanUp(owner, deadLetters)
    }

    def getSize(): Int = counter.get()
}

class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {

    override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = 
        (owner, system) match {
        case (Some(o), Some(s)) ⇒
            val mailbox = new MyMailbox
            MailboxExtension(s).register(o, mailbox)
            mailbox
        case _ ⇒ throw new Exception("no mailbox owner or system given")
    }
}

然后放到一个akka的extension里使用:

object MailboxExtension extends ExtensionId[MailboxExtension] with ExtensionIdProvider {
    override def createExtension(s: ExtendedActorSystem) = new MailboxExtension(s)

    override def lookup = this

    def getSize()(implicit context: ActorContext): Int = 
                    MailboxExtension(context.system).getSize()
}


class MailboxExtension(val system: ExtendedActorSystem) extends Extension {

    private val mboxMap = new java.util.concurrent.ConcurrentHashMap[ActorRef, MyMailbox]

    def register(actorRef: ActorRef, mailbox: MyMailbox): Unit = mboxMap.put(actorRef, mailbox)

    def unregister(actorRef: ActorRef): Unit = mboxMap.remove(actorRef)

    def getSize()(implicit context: ActorContext): Int = {
        val mbox = mboxMap.get(context.self)
        if (mbox == null)
            throw new IllegalArgumentException("Mailbox not registered for: " + context.self)
        mbox.getSize()
    }
}

Never ever block an actor

遇到一个Akka进程没法被正常停止的情况,程序退出时调用了system.awaitTermination,发现阻塞在了某个actor线程上:

"AkkaActorSystem-akka.actor.default-dispatcher-8" #25 prio=5 os_prio=0 tid=0x00007f319c002800 nid=0x76b2 waiting on condition [0x00007f31fbcfb000]
 java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  <0x00000000c0645828> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
  at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
  at xxx.KafkaMessageReceiver.prepareTask(KafkaMessageReceiver.scala:78)
  ...
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at com.wacai.csw.bridge.service.KafkaMessageReceiver.aroundReceive(KafkaMessageReceiver.scala:21)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
  at akka.dispatch.Mailbox.run(Mailbox.scala:221)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

这个程序里主要有2个Actor,一个负责从kafka读取数据,另一个处理这些数据。为了避免两边的流速不匹配,采用了pull模式,kafka消费者Actor跟DataProcessor之间通过一个BlockingQueue来维持平衡;而ProcessorActor在从BlockingQueue获取数据时因为使用了take方法导致了Actor线程被阻塞,系统关闭时等待这个Actor的消息处理完,也被阻塞住了。

按说KafkaConsumer端在往BlockqingQueue放数据(put方法)时也可能存在类似阻塞的情况,但这个Consumer并未使用Akka Actor的线程调度,而是一个独立的线程,所以并不影响system.awaitTermination

解决的方式是将take替换为poll阻塞一个可以接受的时间依然拿不到数据返回None,在另一方Actor的receive方法里针对None类型数据再做轮询。

使用Actor模型有个原则是“never ever block an actor”,切记。