月度归档:2015年06月

对actor的邮箱计数

假设某个actor里消息有下面几种类型:

def wrappedReceive: PartialFunction[Any, Unit] = {
    case "kafkaReady"     => sendRequest()
    case Some(Task(data)) => assign(data)
    case None             => await()
    case "done"           => next()
}

当我想要查看这个actor的邮箱有没有堆积时,一个简单的方式是通过jmap来看类型的实例数:

$ jmap -histo:live 12662 

num     #instances         #bytes  class name
----------------------------------------------
...
3:   51906   9965952    com.wacai.xxx.dataobject.XX
...
6:   149338  3584112    java.util.concurrent.ConcurrentLinkedQueue$Node
7:   149318  3583632    akka.dispatch.Envelope
...
17:  7266    232512     scala.collection.mutable.ListBuffer
...
21:  7274    116384     scala.Some
22:  7266    116256     com.wacai.xxx.bridge.actor.Task

注意必须以live方式执行jmap触发一次full gc回收掉无用对象,否则计数不准确。上面大致可以看出Some[Task]对象实例有7266个,Envelop有149318个,因为None是一个单例,所以尽管可能堆积多条这样的消息,实例数里却只有1个,只能通过 Envelop的实例数减去其他类型的实例数来判断,但问题是Envelop实例数是所有actor的消息实例数;另外像String类型这样的消息类型也可能存在多个,很难准确判断每个actor的邮箱到底堆积了多少条。

Akka在2.0的时候去掉了mailboxSize方法,参考官方的这篇blog, 里面说了为何不再支持这个方法,也提到如果你真的想要获取mailboxSize可以自己扩展一下,于是按照这个思路,我们对enqueuedequeue做一个计数,以便在某些场景判断mailbox是否有堆积。

class MyMailbox extends akka.dispatch.UnboundedMailbox.MessageQueue {
    private val counter = new java.util.concurrent.atomic.AtomicInteger(0)

    override def dequeue(): Envelope = {
        counter.decrementAndGet()
        super.dequeue()
    }

    override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
        counter.incrementAndGet()
        super.enqueue(receiver, handle)
    }

    override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
        counter.set(0)
        super.cleanUp(owner, deadLetters)
    }

    def getSize(): Int = counter.get()
}

class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {

    override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = 
        (owner, system) match {
        case (Some(o), Some(s)) ⇒
            val mailbox = new MyMailbox
            MailboxExtension(s).register(o, mailbox)
            mailbox
        case _ ⇒ throw new Exception("no mailbox owner or system given")
    }
}

然后放到一个akka的extension里使用:

object MailboxExtension extends ExtensionId[MailboxExtension] with ExtensionIdProvider {
    override def createExtension(s: ExtendedActorSystem) = new MailboxExtension(s)

    override def lookup = this

    def getSize()(implicit context: ActorContext): Int = 
                    MailboxExtension(context.system).getSize()
}


class MailboxExtension(val system: ExtendedActorSystem) extends Extension {

    private val mboxMap = new java.util.concurrent.ConcurrentHashMap[ActorRef, MyMailbox]

    def register(actorRef: ActorRef, mailbox: MyMailbox): Unit = mboxMap.put(actorRef, mailbox)

    def unregister(actorRef: ActorRef): Unit = mboxMap.remove(actorRef)

    def getSize()(implicit context: ActorContext): Int = {
        val mbox = mboxMap.get(context.self)
        if (mbox == null)
            throw new IllegalArgumentException("Mailbox not registered for: " + context.self)
        mbox.getSize()
    }
}

fastjson与awt

晚上在repl下模拟一个json的解析时发现,当调用fastjson时会在我的dock上弹出一个Java应用的icon,如下图:

这通常是程序里引用awt/swing之类的情况下才会发生,我很奇怪fastjson怎么会引起,对repl添加启动参数-J-verbose:class看看到底是否有加载awt先关的类:

看来温少考虑到了这种类型的解析场景,在代码里有相关的引用。

觉得心烦的话,启动repl时加一个java.awt.headless=true参数吧。

Georgia的多重翻译

在西溪福地创业园的时候,电梯里的广告曾打过一款可口可乐公司的咖啡“乔雅”,对应的英文”Georgia”。翻译这个品牌的人可能想要一个中国人容易记住的名字,猜测可能与“布尔乔亚”有关?

Georgia这个单词作为地名也有好几个意思,可能是美国的影响力太大了,以至于大多提到Georgia这个地方大家先想到的就是佐治亚州。可口可乐公司大本营就在佐治亚州,所以乔雅咖啡可能就是佐治亚州本地的一种咖啡?

几个月前看过一部电影《西伯利亚教育》,里面也出现了Georgia这个地方,但字幕组的翻译者直接翻译成了佐治亚州

这让斯大林同志出生地的格鲁吉亚人民很难堪

scalastyle工具

scalastyle 是个简单易用的code style检测工具,非常轻巧。有助于团队风格一致。集成在maven里用很方便。

从github里找到一个scalastyle_config.xml,有几个默认开启的选项对我们不太适用,可以关闭:

// 对文件开头的注释(licence)检测,非api代码的话建议关闭
class="org.scalastyle.file.HeaderMatchesChecker" level="warning" enabled="false"

// 强制 if 后边适用花括号
class="org.scalastyle.scalariform.IfBraceChecker" level="warning" enabled="false"

// 结尾必须有换行符
class="org.scalastyle.file.NewLineAtEofChecker" level="warning" enabled="false"

// 注释内容必须在注释符之后有个空格,如果ide格式化能保证的话最好,做不到且觉得心烦可以去掉
class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" level="warning" enabled="false"

// 如果开启,对于返回值为Unit的函数(也称为过程函数)要显式的声明返回值类型 Unit,没必要
class="org.scalastyle.scalariform.ProcedureDeclarationChecker" level="warning" enabled="false"

// 要看情况,如果日志里刻意打印一些连续的字符,可以把这个警告关闭
class="org.scalastyle.scalariform.MultipleStringLiteralsChecker" level="warning" enabled="false"

另外,程序里如果有正常使用println的情况,可以在RegexChecker里去掉值为”println”的”regex”参数

华东地区scala爱好者聚会(2015上海)

昨天下午在上海陆家嘴软件园组织了2015华东地区scala爱好者聚会,原本有6个topic,不巧老高和Intel的钟翔时间冲突没有参与。

报名人数有40人,到场的30人左右。4个topic分别是:

《scala配置与宏》
《scala大数据处理》
《某金融公司的scala使用经验》
《中等创业公司的后端技术选型》

聚会上有几个杭州的创业公司专门赶去的,他们比较进取,产品完全采用scala开发,甚至App端也采用scala。

分享的PPT除了某公司一篇不便公开的,其他都已经放在github上了。

kafka异步发送也存在消息重复的可能

遇到一个业务上的情况,在压力很大的情况下,某个topic的消息出现了小部分的冗余,发送端采用async模式。不确定kafka的async模式发送消息是否也会重试,文档里提到async模式发送增加了丢消息的可能,不知道message.send.max.retries参数是否也会起作用,群里讨论了一下,晚上跟踪了一下代码发现也是有 retry 的。

kafka.producer.async.DefaultEventHandlerhandle方法里

while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
  ......
}

看来不论同步还是异步发送,消息重复的可能都是存在的。