月度归档:2015年05月

scala雾中风景(23): Nothing类型引发的NullPointerException

这个问题以前遇到过,这次又发生了,记录一下,避免新人再犯类似错误。

一个DAO的方法调用 mybatis 里的 SqlSessionTemplate.selectOne("...") 方法时没有指定类型,大致代码如下:

def check(...): Boolean = {
    ...
    val data = moneySessionTemplate.selectOne("queryXXX", params)
    data != null
}

程序意图是判断数据库里是否有某条记录,存在则返回返回true。 运行的时候上面的代码可能会抛出 NullPointerException 并提示是在 data != null 这一行,让人乍一看感觉很诡异。

这里又是类型系统的一个”陷阱”,因为val data 是一个不存在的值,它是Nothing类型。为何会是Nothing类型,又是因为selectOne方法的泛型参数在运行期类型无法推断所致的。我们模拟一下:

➜  cat -n Test.scala
 1
 2  object Test {
 3
 4    def selectOne[T](): T = { null.asInstanceOf[T] }
 5
 6    def main(args: Array[String]) {
 7      val r = selectOne()
 8      println("ok?")
 9    }
10  } 

上面的代码,编译和执行都没有问题,但当我们增加一行判断r是否为空的语句时:

➜  cat -n Test.scala
 1
 2  object Test {
 3
 4    def selectOne[T](): T = { null.asInstanceOf[T] }
 5
 6    def main(args: Array[String]) {
 7      val r = selectOne()
 8      if ( r != null )  // 运行时异常
 9        println("ok?")
10    }
11  }

运行时在上面的第8行,会抛出空指针异常:

➜  scala Test
java.lang.NullPointerException
    at Test$.main(Test.scala:8)
    at Test.main(Test.scala)
    ...

究其原因是因为r在之前被推导为了Nothing类型,是没有对应任何实例的一个“幽灵”,在访问这种类型的变量时都会抛出NullPointerException。那么问题来了,r的类型是由selectOne方法决定的,在这个方法里我明明是把null造型成结果类型返回的,为啥这里r的类型不是NullAnyRef而是Nothing呢?

因为调用selectOne方法的时候没有显示的声明类型参数T,编译器会对这种情况采用Nothing作为类型参数,比如:

scala> val l = new java.util.ArrayList
        l: java.util.ArrayList[Nothing] = []

所以 val r = selectOne() 这条语句实际被翻译为了(通过-Xprint:jvm)

val r: Nothing = selectOne().asInstanceOf[Nothing]

selectOne()的运行期结果并不是null,而是一个Nothing类型的幽灵,因为没有任何其他类型的值可以在运行期显式造型为Nothing类型,除了它自己:

null.asInstanceOf[Nothing] // 编译通过,运行时抛NPE

"test".asInstanceOf[Nothing]  // 编译通过,运行时抛ClassCastException

因为调用方法时类型参数的缺失,在类型推导时致使val r成了一个“幽灵”: 不可访问的值;后续对它的访问产生了NPE.

Never ever block an actor

遇到一个Akka进程没法被正常停止的情况,程序退出时调用了system.awaitTermination,发现阻塞在了某个actor线程上:

"AkkaActorSystem-akka.actor.default-dispatcher-8" #25 prio=5 os_prio=0 tid=0x00007f319c002800 nid=0x76b2 waiting on condition [0x00007f31fbcfb000]
 java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  <0x00000000c0645828> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
  at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
  at xxx.KafkaMessageReceiver.prepareTask(KafkaMessageReceiver.scala:78)
  ...
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at com.wacai.csw.bridge.service.KafkaMessageReceiver.aroundReceive(KafkaMessageReceiver.scala:21)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
  at akka.dispatch.Mailbox.run(Mailbox.scala:221)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

这个程序里主要有2个Actor,一个负责从kafka读取数据,另一个处理这些数据。为了避免两边的流速不匹配,采用了pull模式,kafka消费者Actor跟DataProcessor之间通过一个BlockingQueue来维持平衡;而ProcessorActor在从BlockingQueue获取数据时因为使用了take方法导致了Actor线程被阻塞,系统关闭时等待这个Actor的消息处理完,也被阻塞住了。

按说KafkaConsumer端在往BlockqingQueue放数据(put方法)时也可能存在类似阻塞的情况,但这个Consumer并未使用Akka Actor的线程调度,而是一个独立的线程,所以并不影响system.awaitTermination

解决的方式是将take替换为poll阻塞一个可以接受的时间依然拿不到数据返回None,在另一方Actor的receive方法里针对None类型数据再做轮询。

使用Actor模型有个原则是“never ever block an actor”,切记。