标签归档:scala

Int与Integer的拆箱问题

一段程序从java迁移到scala后遇到的,

private val map = new java.util.concurrent.ConcurrentHashMap[String, Int]

val c = map.get(key)

// 下面的if语句在eclipse里提示
// comparing values of types Int and Null using `==' will always yield false
if (c == null) { 
    ...  
}

问题简化后如下:

scala> val m = new java.util.concurrent.ConcurrentHashMap[String, Int]
m: java.util.concurrent.ConcurrentHashMap[String,Int] = {}

scala> m.get("key")
res0: Int = 0

scala> val m = new java.util.concurrent.ConcurrentHashMap[String, java.lang.Integer]
m: java.util.concurrent.ConcurrentHashMap[String,Integer] = {}

scala> m.get("key")
res2: Integer = null

为何value设置为scala里的Int类型得到0而用java里的Integer确是null? 跟以前遇到的null造型为值类型时为何不抛异常 是相似的事

$ scala -Xprint:jvm -e 'val m = new java.util.concurrent.ConcurrentHashMap[String, Int]; val c = m.get("key")'
...
anon$1.this.m = new java.util.concurrent.ConcurrentHashMap();
anon$1.this.c = scala.Int.unbox(anon$1.this.m().get("key"));
...

$ scala -Xprint:jvm -e 'val n:Int = null.asInstanceOf[Int]' ...
anon$1.this.n = scala.Int.unbox(null);
...

scala> val n:Int = null.asInstanceOf[Int]
n: Int = 0

scala> scala.Int.unbox(null)
res3: Int = 0



scala> null.asInstanceOf[java.lang.Integer]
res5: Integer = null

scala> null.asInstanceOf[Int]
res6: Int = 0

scala.Int.unbox 注释里提到运行时是 scala.runtime.BoxesRunTime里的unboxToInt方法

package scala.runtime.BoxesRunTime;

public static int unboxToInt(Object i) {
    return i == null ? 0 : ((java.lang.Integer)i).intValue();
}

还是在对待Int类型时,两种语言设计上的差异,Scala里统一了primitive类型和引用类型。

Actor里的偏函数与性能

Evan Chan分享的《Akka in Production: Our Story》是一篇非常务实的工程实践分享,里面有很多可借鉴的点子,其中的一个对消息接受的包装逻辑主要预留了扩展点:

trait ActorStack extends Actor {

    def wrappedReceive: Receive // Receive类型是一个偏函数

    def receive: Receive = {
        case x => if (wrappedReceive.isDefinedAt(x)) wrappedReceive(x) else unhandled(x)
    }
}

大部分人在实现业务逻辑时可能如下

class MyActor extends ActorStack {

    def wrappedReceive: Receive = {
        case Something => balabala
    }
}

这种情况下会有个小小的性能浪费,每次接收消息的时候,至少要创建一次偏函数对象。如果消息又正好在wrappedReceive里定义了的话,创建了2次偏函数对象,因为调用了2次wrappedReceive方法;这个是可以避免的,用一个成员把这个偏函数对象保存起来,避免每次都创建:

trait ActorStack extends Actor {

    def wrappedReceive: Receive

    private val logic = wrappedReceive

    def receive: Receive = {
        case x => if (logic.isDefinedAt(x)) logic(x) else unhandled(x)
    }
}

或许看客们又好奇,那么Akka对receive这个函数返回的偏函数又是怎么处理的?是否也存在每次都生产一下(同样的性能浪费),还是保存在某个地方? 没错,它在底层实现里是被放到behaviorStack这个行为栈里的,每次处理逻辑的时候从行为栈里取出这个偏函数进行调用。

scala雾中风景(24): break与异常捕获

这段代码如果把异常捕获部分的Exception替换为Throwable会有什么不同?

import scala.util.control.Breaks.break
import scala.util.control.Breaks.breakable

object Main {

  @throws(classOf[Exception])
  def prepare() = false

  def main(args:Array[String]) {
    var i = 0

    while(i < 3) {
      breakable {

        try{
          i+=1
          if(!prepare()) break
          println("do something here")
        }catch {
          //case e: Throwable => e.printStackTrace
          case e: Exception => e.printStackTrace
        }

        println("done")
      }
    }

  }
}

问题的关键在于scala里的break是通过异常机制实现的流控,与java里完全不同。break是抛出了一个BreakControl的异常,也是从Throwable接口继承下来的。所以当应用在捕获异常时,try代码块里有break流控语句,需要注意对流控异常不要干预,如果这种情况下真要捕获Throwable的话,正确的方式是:

try{
    if (...) break
    ...

}catch {
    case e0: ControlThrowable => throw e0 // 不要干预流控的异常
    case e1: Throwable => e1.printStackTrace
}

null造型为值类型时为何不抛异常

问题简化后看一下代码片段:

// Number引用类型
scala> def foo[T](t: T) = {  t.asInstanceOf[Number] }
foo: [T](t: T)Number

scala> foo(null)
res3: Number = null

// Int值类型
scala> def foo[T](t: T) = {  t.asInstanceOf[Int] }
foo: [T](t: T)Int

scala> foo(null)
res4: Int = 0

// 其它值类型
scala> class V(val i:Int) extends AnyVal
defined class V

scala> null.asInstanceOf[V]
res10: V = V@0

把 null 造型为java.lang.Number这种引用类型的时候没有问题,但造型为scala里的值类型时,为何不抛出异常?与直觉不符。背后是装箱,拆箱所致。

在stackoverflow上也有讨论

// 不涉及到unboxing
scala> val a:Any = null.asInstanceOf[Int]
a: Any = null

// 涉及到unboxing
scala> val a = null.asInstanceOf[Int]
a: Int = 0  

// 在scala.Int伴生对象里定义的运行时拆箱逻辑
def unbox(x: java.lang.Object): Int = x.asInstanceOf[java.lang.Integer].intValue()

另外,有人问为什么不抛出空指针异常,因为规范里是抛异常:

asInstanceOf[T ] returns the “null” object itself if T conforms to scala.AnyRef,and throws a NullPointerException otherwise.
A reference to any other member of the “null” object causes a NullPointerException to be thrown.

这个规范已经更新了,2.10里的规范已经没有要求抛出异常了。

https://issues.scala-lang.org/browse/SI-4437

https://github.com/scala/scala-dist/pull/104

对actor的邮箱计数

假设某个actor里消息有下面几种类型:

def wrappedReceive: PartialFunction[Any, Unit] = {
    case "kafkaReady"     => sendRequest()
    case Some(Task(data)) => assign(data)
    case None             => await()
    case "done"           => next()
}

当我想要查看这个actor的邮箱有没有堆积时,一个简单的方式是通过jmap来看类型的实例数:

$ jmap -histo:live 12662 

num     #instances         #bytes  class name
----------------------------------------------
...
3:   51906   9965952    com.wacai.xxx.dataobject.XX
...
6:   149338  3584112    java.util.concurrent.ConcurrentLinkedQueue$Node
7:   149318  3583632    akka.dispatch.Envelope
...
17:  7266    232512     scala.collection.mutable.ListBuffer
...
21:  7274    116384     scala.Some
22:  7266    116256     com.wacai.xxx.bridge.actor.Task

注意必须以live方式执行jmap触发一次full gc回收掉无用对象,否则计数不准确。上面大致可以看出Some[Task]对象实例有7266个,Envelop有149318个,因为None是一个单例,所以尽管可能堆积多条这样的消息,实例数里却只有1个,只能通过 Envelop的实例数减去其他类型的实例数来判断,但问题是Envelop实例数是所有actor的消息实例数;另外像String类型这样的消息类型也可能存在多个,很难准确判断每个actor的邮箱到底堆积了多少条。

Akka在2.0的时候去掉了mailboxSize方法,参考官方的这篇blog, 里面说了为何不再支持这个方法,也提到如果你真的想要获取mailboxSize可以自己扩展一下,于是按照这个思路,我们对enqueuedequeue做一个计数,以便在某些场景判断mailbox是否有堆积。

class MyMailbox extends akka.dispatch.UnboundedMailbox.MessageQueue {
    private val counter = new java.util.concurrent.atomic.AtomicInteger(0)

    override def dequeue(): Envelope = {
        counter.decrementAndGet()
        super.dequeue()
    }

    override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
        counter.incrementAndGet()
        super.enqueue(receiver, handle)
    }

    override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
        counter.set(0)
        super.cleanUp(owner, deadLetters)
    }

    def getSize(): Int = counter.get()
}

class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {

    override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = 
        (owner, system) match {
        case (Some(o), Some(s)) ⇒
            val mailbox = new MyMailbox
            MailboxExtension(s).register(o, mailbox)
            mailbox
        case _ ⇒ throw new Exception("no mailbox owner or system given")
    }
}

然后放到一个akka的extension里使用:

object MailboxExtension extends ExtensionId[MailboxExtension] with ExtensionIdProvider {
    override def createExtension(s: ExtendedActorSystem) = new MailboxExtension(s)

    override def lookup = this

    def getSize()(implicit context: ActorContext): Int = 
                    MailboxExtension(context.system).getSize()
}


class MailboxExtension(val system: ExtendedActorSystem) extends Extension {

    private val mboxMap = new java.util.concurrent.ConcurrentHashMap[ActorRef, MyMailbox]

    def register(actorRef: ActorRef, mailbox: MyMailbox): Unit = mboxMap.put(actorRef, mailbox)

    def unregister(actorRef: ActorRef): Unit = mboxMap.remove(actorRef)

    def getSize()(implicit context: ActorContext): Int = {
        val mbox = mboxMap.get(context.self)
        if (mbox == null)
            throw new IllegalArgumentException("Mailbox not registered for: " + context.self)
        mbox.getSize()
    }
}

scalastyle工具

scalastyle 是个简单易用的code style检测工具,非常轻巧。有助于团队风格一致。集成在maven里用很方便。

从github里找到一个scalastyle_config.xml,有几个默认开启的选项对我们不太适用,可以关闭:

// 对文件开头的注释(licence)检测,非api代码的话建议关闭
class="org.scalastyle.file.HeaderMatchesChecker" level="warning" enabled="false"

// 强制 if 后边适用花括号
class="org.scalastyle.scalariform.IfBraceChecker" level="warning" enabled="false"

// 结尾必须有换行符
class="org.scalastyle.file.NewLineAtEofChecker" level="warning" enabled="false"

// 注释内容必须在注释符之后有个空格,如果ide格式化能保证的话最好,做不到且觉得心烦可以去掉
class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" level="warning" enabled="false"

// 如果开启,对于返回值为Unit的函数(也称为过程函数)要显式的声明返回值类型 Unit,没必要
class="org.scalastyle.scalariform.ProcedureDeclarationChecker" level="warning" enabled="false"

// 要看情况,如果日志里刻意打印一些连续的字符,可以把这个警告关闭
class="org.scalastyle.scalariform.MultipleStringLiteralsChecker" level="warning" enabled="false"

另外,程序里如果有正常使用println的情况,可以在RegexChecker里去掉值为”println”的”regex”参数

华东地区scala爱好者聚会(2015上海)

昨天下午在上海陆家嘴软件园组织了2015华东地区scala爱好者聚会,原本有6个topic,不巧老高和Intel的钟翔时间冲突没有参与。

报名人数有40人,到场的30人左右。4个topic分别是:

《scala配置与宏》
《scala大数据处理》
《某金融公司的scala使用经验》
《中等创业公司的后端技术选型》

聚会上有几个杭州的创业公司专门赶去的,他们比较进取,产品完全采用scala开发,甚至App端也采用scala。

分享的PPT除了某公司一篇不便公开的,其他都已经放在github上了。

scala雾中风景(23): Nothing类型引发的NullPointerException

这个问题以前遇到过,这次又发生了,记录一下,避免新人再犯类似错误。

一个DAO的方法调用 mybatis 里的 SqlSessionTemplate.selectOne("...") 方法时没有指定类型,大致代码如下:

def check(...): Boolean = {
    ...
    val data = moneySessionTemplate.selectOne("queryXXX", params)
    data != null
}

程序意图是判断数据库里是否有某条记录,存在则返回返回true。 运行的时候上面的代码可能会抛出 NullPointerException 并提示是在 data != null 这一行,让人乍一看感觉很诡异。

这里又是类型系统的一个”陷阱”,因为val data 是一个不存在的值,它是Nothing类型。为何会是Nothing类型,又是因为selectOne方法的泛型参数在运行期类型无法推断所致的。我们模拟一下:

➜  cat -n Test.scala
 1
 2  object Test {
 3
 4    def selectOne[T](): T = { null.asInstanceOf[T] }
 5
 6    def main(args: Array[String]) {
 7      val r = selectOne()
 8      println("ok?")
 9    }
10  } 

上面的代码,编译和执行都没有问题,但当我们增加一行判断r是否为空的语句时:

➜  cat -n Test.scala
 1
 2  object Test {
 3
 4    def selectOne[T](): T = { null.asInstanceOf[T] }
 5
 6    def main(args: Array[String]) {
 7      val r = selectOne()
 8      if ( r != null )  // 运行时异常
 9        println("ok?")
10    }
11  }

运行时在上面的第8行,会抛出空指针异常:

➜  scala Test
java.lang.NullPointerException
    at Test$.main(Test.scala:8)
    at Test.main(Test.scala)
    ...

究其原因是因为r在之前被推导为了Nothing类型,是没有对应任何实例的一个“幽灵”,在访问这种类型的变量时都会抛出NullPointerException。那么问题来了,r的类型是由selectOne方法决定的,在这个方法里我明明是把null造型成结果类型返回的,为啥这里r的类型不是NullAnyRef而是Nothing呢?

因为调用selectOne方法的时候没有显示的声明类型参数T,编译器会对这种情况采用Nothing作为类型参数,比如:

scala> val l = new java.util.ArrayList
        l: java.util.ArrayList[Nothing] = []

所以 val r = selectOne() 这条语句实际被翻译为了(通过-Xprint:jvm)

val r: Nothing = selectOne().asInstanceOf[Nothing]

selectOne()的运行期结果并不是null,而是一个Nothing类型的幽灵,因为没有任何其他类型的值可以在运行期显式造型为Nothing类型,除了它自己:

null.asInstanceOf[Nothing] // 编译通过,运行时抛NPE

"test".asInstanceOf[Nothing]  // 编译通过,运行时抛ClassCastException

因为调用方法时类型参数的缺失,在类型推导时致使val r成了一个“幽灵”: 不可访问的值;后续对它的访问产生了NPE.

Never ever block an actor

遇到一个Akka进程没法被正常停止的情况,程序退出时调用了system.awaitTermination,发现阻塞在了某个actor线程上:

"AkkaActorSystem-akka.actor.default-dispatcher-8" #25 prio=5 os_prio=0 tid=0x00007f319c002800 nid=0x76b2 waiting on condition [0x00007f31fbcfb000]
 java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  <0x00000000c0645828> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
  at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
  at xxx.KafkaMessageReceiver.prepareTask(KafkaMessageReceiver.scala:78)
  ...
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at com.wacai.csw.bridge.service.KafkaMessageReceiver.aroundReceive(KafkaMessageReceiver.scala:21)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
  at akka.dispatch.Mailbox.run(Mailbox.scala:221)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

这个程序里主要有2个Actor,一个负责从kafka读取数据,另一个处理这些数据。为了避免两边的流速不匹配,采用了pull模式,kafka消费者Actor跟DataProcessor之间通过一个BlockingQueue来维持平衡;而ProcessorActor在从BlockingQueue获取数据时因为使用了take方法导致了Actor线程被阻塞,系统关闭时等待这个Actor的消息处理完,也被阻塞住了。

按说KafkaConsumer端在往BlockqingQueue放数据(put方法)时也可能存在类似阻塞的情况,但这个Consumer并未使用Akka Actor的线程调度,而是一个独立的线程,所以并不影响system.awaitTermination

解决的方式是将take替换为poll阻塞一个可以接受的时间依然拿不到数据返回None,在另一方Actor的receive方法里针对None类型数据再做轮询。

使用Actor模型有个原则是“never ever block an actor”,切记。

scala雾中风景(22): var变量与赋值操作符

晚上review代码时发现的一个问题,同事写的一段代码简化后如下:

var map = scala.collection.immutable.Map[Int,Int]()
map += (1->2)
map += (3->4)
...
if (xxx) map.get(..)

这里逻辑上感觉有问题,map这里用的不可变集合,+=方法应该是返回一个新的值,后边的get应该不能生效。另外觉得 += 这个方法似乎应该只在mutable集合下才有,试图在eclipse下点击进入这个+=方法看看,果然也无法进入。

因为代码上下文逻辑较多,不确定是不是上下文有影响,于是在repl下验证一下这个方法。

scala> val m = scala.collection.immutable.Map[Int,Int]()

scala> m += (1->1)
<console>:12: error: value += is not a member of scala.collection.immutable.Map[Int,Int]
          m += (1->1)
            ^

提示说Map没有这个方法,奇怪为何那段代码在eclipse编译没有问题。然后注意到同事用的是var修饰的map

修改为var之后可以使用+=了,看一下编译器怎么翻译的:

scala> import reflect.runtime.universe._

scala> var m = Map[Int,Int]()

scala> reify ( m += (1->1) )
res4: reflect.runtime.universe.Expr[Unit] = 
        Expr[Unit]($read.m_$eq($read.m.$plus(Predef.ArrowAssoc(1).$minus$greater(1))))

原来这里+=是赋值操作符(assignment operator)而不是方法,恍然大悟,在scala下赋值操作符仅对var修饰的变量生效。m += (k->v)相当于 m = m + (k->v)Map里正好定义了+方法:

def + [B1 >: B](kv: (A, B1)): Map[A, B1]

如果没有定义过+方法的话,编译器会采用字符串连接,也就是对+前后的对象进行toString然后拼接;这种情况下会因为类型不匹配而报错。

所以他的代码也能work,只是这里使用+=操作符容易误会为是方法。建议还是采用val修饰一个可变的Map,来存放数据。

补充,对于var变量,即使以.+=方式调用,编译器也会对待为操作符:

scala> reify ( m.+=(1->1) )
res11: reflect.runtime.universe.Expr[Unit] = 
        Expr[Unit]($read.m_$eq($read.m.$plus(Predef.ArrowAssoc(1).$minus$greater(1))))

个人觉得这种情况下编译器报错似乎更合适。