分类目录归档:programming

优雅关闭与session draining

最近看到nginx plus收费版里有个session draining的概念,在用nginx做反向代理,要优雅停止后端应用的话,需要先在nginx层控制请求不再发到相应的节点,但与此同时已经建立的链接则要继续保持,一直等这些链接都结束了再停止应用。

其实在免费版的nginx/tengine就能实现这个特性,在以前的这篇nginx反向代理对后端某个节点优雅下线的问题 文章里测试过后端tomcat有一个耗时很长的请求,当tengine端健康监测已发现后端节点不可用的情况下,该请求并不会被tengine中止,而会等后端响应结束后再返回给客户端。

优化关闭应用并不只是在负载均衡或反向代理层面就能完美解决的,后端的应用本身也要考虑很多细节。举一个例子,Actor怎么有序的关闭,可以参考之前分享的Governor模式,所有干活的Actor由Governor创建,在创建时就设定好优先级。

在应用关闭的时候,对master发送PoisonPill,示例代码如下:

// 在你的容器或微容器里监听到应用关闭事件时触发shutdown,最简单的情况下是在shutdownhook里
def shutdown(): Unit = {
    gracefulStopActors
    system.terminate
    Await.result(system.whenTerminated, Duration.Inf)
    springContext.close
}

private def gracefulStopActors {
    master ! PoisonPill
    signal.acquire //wait
}

在governor角色里,当收到Terminated消息后顺序结束子actor

...
case Terminated(actor) if actor == observable => {
  logger.info("===receive terminated message from: " + actor)
  // 先顺序的杀死所有托管的子actor,并将结果发给自己
  stopAll(managedActors.toList.sorted) pipeTo self
}
...

// 顺序的停止所有子actor
protected def stopAll(kids: List[OrderedActorRef]): Future[Any] = {
    kids match {
      case first :: Nil =>
        logger.info("===graceful stop: " + first.actor)
        gracefulStop(first.actor, stopTimeout).flatMap { _ =>
          Future { AllDead }
      }
      case first :: rest =>
        logger.info("===graceful stop: " + first.actor)
        gracefulStop(first.actor, stopTimeout).flatMap { _ =>
        stopAll(rest)
      }
      case Nil =>
        Future { AllDead }
    }
}

再举一个例子,应用里有一个线程一直轮询从redis里获取数据,你在关闭应用的时候如何优雅终止这个线程呢?下面用一段代码简单模拟这个场景;假设这个应用没有任何容器,结束就是靠收到kill信号,那么在ShutdownHook里,要阻塞的等待(一段时间)worker线程结束后再退出才算安全

public class Test {
  static volatile boolean running = true;

  public static void main(String[] args) {
  Thread worker = new Thread() {
      public void run() {
          while (running) {
              try {
                  System.out.println("i'm running.1");
                  Thread.sleep(1000);
                  System.out.println("i'm running.2");
                  Thread.sleep(1000);
                  System.out.println("i'm running.3");
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
  };
  worker.start();
  Runtime.getRuntime().addShutdownHook(new Thread() {
      public void run() {
          System.out.println("receive shutdown signal.");
          running = false;

          while(worker.isAlive()) {
              System.out.println("waiting for worker thread finish.");
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
              }
          }
          System.out.println("wroker thread finished!");
      }
  });
 }
}

shell下精确的定位一个命令

之前在这篇SIGTTIN? 里提出了问题,但没有交代背景,为什么用交互式shell执行”which mvn”命令,是为了更准确的获取用户当前环境里所用的mvn命令到底是哪个。若用户对mvn做过alias,优先使用alias过的指令,然后再选择$PATH路径下的命令;这个函数的完整内容如下:

function get_mvn_cmd() {
  if [[ "$OSTYPE" == *cygwin* ]];then
    ppid=$( ps -ef -p $$ | awk 'NR==2{print $3}' )
    user_shell=$( ps -p $ppid | awk 'NR==2{print $8}' )
    #has some trouble with cygwin, while Ctrl-c cannot terminal
    set -m
  else
    ppid=$( ps -oppid= $$ )
    user_shell=$( ps -ocomm= -p $ppid )
  fi

  # while as login shell, it's -bash not bash
  if [[ "$user_shell" == "-"* ]];then
    user_shell=${user_shell:1}
  fi

  mvn=$( $user_shell -ic "alias mvn" 2>/dev/null | cut -d'=' -f2 | sed "s/'//g" )

  if [ -z "$mvn" ];then
    $user_shell -ic "which mvn" >/dev/null
    if [ $? -eq 0 ];then
      mvn=$( $user_shell -ic "which mvn" | head -1 )
    fi
  fi

  if [ -z "$mvn" ]; then
    echo "mvn command not found" >&2
    kill -s TERM $TOP_PID
  else
    echo $mvn
  fi
}

函数里先获取用户所使用的shell(即当前脚本的父进程),然后以交互式执行alias mvn看看是否又被用户别名过,没有的话再使用which mvn获取mvn命令。

为什么写的那么啰嗦是想兼容好几种用户环境,如果是linux或者使用的gnu-which,可以利用gnu-which的一些参数一次性按优先级依次从”alias, functions, commands”里查找命令

$ (alias; declare -f) | gwhich --read-alias --read-functions mvn

上面利用gnu-which的--read-alias--read-functions参数优先从前边aliasdeclare -f输出的结果里查找,最后再从$PATH里查找。实际上centos7的bash下which命令就被alias过了:

alias which='alias | /usr/bin/which --tty-only --read-alias --show-dot --show-tilde'    

SIGTTIN?

我有一段脚本自己的mac机器默认用zsh,运行时一直很正常,今天给别的同事用,在bash下有些异常,追查了一下这个问题,把问题简化后如下:

#!/bin/bash
zsh -ic "which mvn"
zsh -ic "which mvn"

上面的脚本执行没有问题,但将里面的zsh换位bash,就会出现问题:

#!/bin/bash
bash -ic "which mvn"
bash -ic "which mvn"

在mac上执行,第一次bash -ic "which mvn"是成功的,但第二次执行时就会挂住:

 ➜  ./b.sh
/usr/local/bin/mvn
[1]  + 24649 suspended (tty input)  ./b.sh

/tmp/dd   [23:43:21]
[jobs:1] ➜    

后来想到which在zsh里是一个内置命令,而在bash下则是一个外部命令,可能有所差异,将zsh执行的命令也声明为外部命令:

#!/bin/bash
zsh -ic "/usr/bin/which mvn"
zsh -ic "/usr/bin/which mvn"

执行时会在第二次阻塞住,即使Ctrl-C也无法停止脚本。

这个问题很奇怪,两次以交互式调用shell执行一段命令(必须是外部命令)的话,第一次会成功,第二次则会suspend住。猜测可能是shell在第一次交互式执行结束后改变了上下文的什么状态,导致第二次再执行的时候挂住。在linux上用strace跟踪了一下脚本,看样子是因为SIGTTIN信号量所致,但其中缘由并不清楚,我把代码和strace的信息贴在这里,希望明白的人解释一下

$ cat b.sh
#!/bin/bash
bash -ic "ls"
bash -ic "ls"

$ strace ./b.sh

[hongjiang@localhost dd]$ strace ./b.sh
execve("./b.sh", ["./b.sh"], [/* 31 vars */]) = 0
brk(0)                                  = 0x1175000
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7fa545ddf000
access("/etc/ld.so.preload", R_OK)      = -1 ENOENT (No such file or directory)
open("/etc/ld.so.cache", O_RDONLY|O_CLOEXEC) = 3
fstat(3, {st_mode=S_IFREG|0644, st_size=32094, ...}) = 0
mmap(NULL, 32094, PROT_READ, MAP_PRIVATE, 3, 0) = 0x7fa545dd7000
close(3)                                = 0
open("/lib64/libtinfo.so.5", O_RDONLY|O_CLOEXEC) = 3
read(3, "\177ELF\2\1\1\0\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0@\316\0\0\0\0\0\0"..., 832) = 832
fstat(3, {st_mode=S_IFREG|0755, st_size=174520, ...}) = 0
mmap(NULL, 2268928, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x7fa545995000
mprotect(0x7fa5459ba000, 2097152, PROT_NONE) = 0
mmap(0x7fa545bba000, 20480, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_DENYWRITE, 3, 0x25000) = 0x7fa545bba000
close(3)                                = 0
open("/lib64/libdl.so.2", O_RDONLY|O_CLOEXEC) = 3
read(3, "\177ELF\2\1\1\0\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0\320\16\0\0\0\0\0\0"..., 832) = 832
fstat(3, {st_mode=S_IFREG|0755, st_size=19512, ...}) = 0
mmap(NULL, 2109744, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x7fa545791000
mprotect(0x7fa545794000, 2093056, PROT_NONE) = 0
mmap(0x7fa545993000, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_DENYWRITE, 3, 0x2000) = 0x7fa545993000
close(3)                                = 0
open("/lib64/libc.so.6", O_RDONLY|O_CLOEXEC) = 3
read(3, "\177ELF\2\1\1\3\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0\0\34\2\0\0\0\0\0"..., 832) = 832
fstat(3, {st_mode=S_IFREG|0755, st_size=2107760, ...}) = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7fa545dd6000
mmap(NULL, 3932736, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x7fa5453d0000
mprotect(0x7fa545586000, 2097152, PROT_NONE) = 0
mmap(0x7fa545786000, 24576, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_DENYWRITE, 3, 0x1b6000) = 0x7fa545786000
mmap(0x7fa54578c000, 16960, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_ANONYMOUS, -1, 0) = 0x7fa54578c000
close(3)                                = 0
mmap(NULL, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7fa545dd4000
arch_prctl(ARCH_SET_FS, 0x7fa545dd4740) = 0
mprotect(0x7fa545786000, 16384, PROT_READ) = 0
mprotect(0x7fa545993000, 4096, PROT_READ) = 0
mprotect(0x7fa545bba000, 16384, PROT_READ) = 0
mprotect(0x6dc000, 4096, PROT_READ)     = 0
mprotect(0x7fa545de0000, 4096, PROT_READ) = 0
munmap(0x7fa545dd7000, 32094)           = 0
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
open("/dev/tty", O_RDWR|O_NONBLOCK)     = 3
close(3)                                = 0
brk(0)                                  = 0x1175000
brk(0x1196000)                          = 0x1196000
brk(0)                                  = 0x1196000
open("/usr/lib/locale/locale-archive", O_RDONLY|O_CLOEXEC) = 3
fstat(3, {st_mode=S_IFREG|0644, st_size=106065056, ...}) = 0
mmap(NULL, 106065056, PROT_READ, MAP_PRIVATE, 3, 0) = 0x7fa53eea9000
close(3)                                = 0
brk(0)                                  = 0x1196000
getuid()                                = 1000
getgid()                                = 1000
geteuid()                               = 1000
getegid()                               = 1000
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
open("/proc/meminfo", O_RDONLY|O_CLOEXEC) = 3
fstat(3, {st_mode=S_IFREG|0444, st_size=0, ...}) = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7fa545dde000
read(3, "MemTotal:        1017160 kB\nMemF"..., 1024) = 1024
close(3)                                = 0
munmap(0x7fa545dde000, 4096)            = 0
rt_sigaction(SIGCHLD, {SIG_DFL, [], SA_RESTORER|SA_RESTART, 0x7fa545405650}, {SIG_DFL, [], 0}, 8) = 0
rt_sigaction(SIGCHLD, {SIG_DFL, [], SA_RESTORER|SA_RESTART, 0x7fa545405650}, {SIG_DFL, [], SA_RESTORER|SA_RESTART, 0x7fa545405650}, 8) = 0
rt_sigaction(SIGINT, {SIG_DFL, [], SA_RESTORER, 0x7fa545405650}, {SIG_DFL, [], 0}, 8) = 0
rt_sigaction(SIGINT, {SIG_DFL, [], SA_RESTORER, 0x7fa545405650}, {SIG_DFL, [], SA_RESTORER, 0x7fa545405650}, 8) = 0
rt_sigaction(SIGQUIT, {SIG_DFL, [], SA_RESTORER, 0x7fa545405650}, {SIG_DFL, [], 0}, 8) = 0
rt_sigaction(SIGQUIT, {SIG_DFL, [], SA_RESTORER, 0x7fa545405650}, {SIG_DFL, [], SA_RESTORER, 0x7fa545405650}, 8) = 0
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigaction(SIGQUIT, {SIG_IGN, [], SA_RESTORER, 0x7fa545405650}, {SIG_DFL, [], SA_RESTORER, 0x7fa545405650}, 8) = 0
uname({sys="Linux", node="localhost.localdomain", ...}) = 0
stat("/tmp/dd", {st_mode=S_IFDIR|0775, st_size=4096, ...}) = 0
stat(".", {st_mode=S_IFDIR|0775, st_size=4096, ...}) = 0
getpid()                                = 2945
open("/usr/lib64/gconv/gconv-modules.cache", O_RDONLY) = 3
fstat(3, {st_mode=S_IFREG|0644, st_size=26254, ...}) = 0
mmap(NULL, 26254, PROT_READ, MAP_SHARED, 3, 0) = 0x7fa545dd8000
close(3)                                = 0
getppid()                               = 2942
getpgrp()                               = 2942
rt_sigaction(SIGCHLD, {0x441090, [], SA_RESTORER|SA_RESTART, 0x7fa545405650}, {SIG_DFL, [], SA_RESTORER|SA_RESTART, 0x7fa545405650}, 8) = 0
getrlimit(RLIMIT_NPROC, {rlim_cur=3909, rlim_max=3909}) = 0
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
open("./b.sh", O_RDONLY)                = 3
ioctl(3, SNDCTL_TMR_TIMEBASE or SNDRV_TIMER_IOCTL_NEXT_DEVICE or TCGETS, 0x7fff54edec40) = -1 ENOTTY (Inappropriate ioctl for device)
lseek(3, 0, SEEK_CUR)                   = 0
read(3, "#!/bin/sh\n\nbash -ic \"ls\"\nbash -i"..., 80) = 39
lseek(3, 0, SEEK_SET)                   = 0
getrlimit(RLIMIT_NOFILE, {rlim_cur=1024, rlim_max=4*1024}) = 0
fcntl(255, F_GETFD)                     = -1 EBADF (Bad file descriptor)
dup2(3, 255)                            = 255
close(3)                                = 0
fcntl(255, F_SETFD, FD_CLOEXEC)         = 0
fcntl(255, F_GETFL)                     = 0x8000 (flags O_RDONLY|O_LARGEFILE)
fstat(255, {st_mode=S_IFREG|0775, st_size=39, ...}) = 0
lseek(255, 0, SEEK_CUR)                 = 0
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
read(255, "#!/bin/sh\n\nbash -ic \"ls\"\nbash -i"..., 39) = 39
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
stat(".", {st_mode=S_IFDIR|0775, st_size=4096, ...}) = 0
stat("/home/hongjiang/.local/bin/bash", 0x7fff54ede900) = -1 ENOENT (No such file or directory)
stat("/home/hongjiang/bin/bash", 0x7fff54ede900) = -1 ENOENT (No such file or directory)
stat("/data/program/scala/bin/bash", 0x7fff54ede900) = -1 ENOENT (No such file or directory)
stat("/usr/lib64/qt-3.3/bin/bash", 0x7fff54ede900) = -1 ENOENT (No such file or directory)
stat("/usr/local/bin/bash", 0x7fff54ede900) = -1 ENOENT (No such file or directory)
stat("/usr/bin/bash", {st_mode=S_IFREG|0755, st_size=960384, ...}) = 0
stat("/usr/bin/bash", {st_mode=S_IFREG|0755, st_size=960384, ...}) = 0
geteuid()                               = 1000
getegid()                               = 1000
getuid()                                = 1000
getgid()                                = 1000
access("/usr/bin/bash", X_OK)           = 0
stat("/usr/bin/bash", {st_mode=S_IFREG|0755, st_size=960384, ...}) = 0
geteuid()                               = 1000
getegid()                               = 1000
getuid()                                = 1000
getgid()                                = 1000
access("/usr/bin/bash", R_OK)           = 0
stat("/usr/bin/bash", {st_mode=S_IFREG|0755, st_size=960384, ...}) = 0
stat("/usr/bin/bash", {st_mode=S_IFREG|0755, st_size=960384, ...}) = 0
geteuid()                               = 1000
getegid()                               = 1000
getuid()                                = 1000
getgid()                                = 1000
access("/usr/bin/bash", X_OK)           = 0
stat("/usr/bin/bash", {st_mode=S_IFREG|0755, st_size=960384, ...}) = 0
geteuid()                               = 1000
getegid()                               = 1000
getuid()                                = 1000
getgid()                                = 1000
access("/usr/bin/bash", R_OK)           = 0
rt_sigprocmask(SIG_BLOCK, [INT CHLD], [], 8) = 0
lseek(255, -14, SEEK_CUR)               = 25
clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7fa545dd4a10) = 2946
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
rt_sigaction(SIGINT, {0x43e500, [], SA_RESTORER, 0x7fa545405650}, {SIG_DFL, [], SA_RESTORER, 0x7fa545405650}, 8) = 0
wait4(-1, b.sh
[{WIFEXITED(s) && WEXITSTATUS(s) == 0}], 0, NULL) = 2946
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
--- SIGCHLD {si_signo=SIGCHLD, si_code=CLD_EXITED, si_pid=2946, si_status=0, si_utime=0, si_stime=0} ---
wait4(-1, 0x7fff54ede450, WNOHANG, NULL) = -1 ECHILD (No child processes)
rt_sigreturn()                          = 0
rt_sigaction(SIGINT, {SIG_DFL, [], SA_RESTORER, 0x7fa545405650}, {0x43e500, [], SA_RESTORER, 0x7fa545405650}, 8) = 0
rt_sigprocmask(SIG_BLOCK, NULL, [], 8)  = 0
read(255, "bash -ic \"ls\"\n", 39)      = 14
stat("/usr/bin/bash", {st_mode=S_IFREG|0755, st_size=960384, ...}) = 0
stat("/usr/bin/bash", {st_mode=S_IFREG|0755, st_size=960384, ...}) = 0
geteuid()                               = 1000
getegid()                               = 1000
getuid()                                = 1000
getgid()                                = 1000
access("/usr/bin/bash", X_OK)           = 0
stat("/usr/bin/bash", {st_mode=S_IFREG|0755, st_size=960384, ...}) = 0
geteuid()                               = 1000
getegid()                               = 1000
getuid()                                = 1000
getgid()                                = 1000
access("/usr/bin/bash", R_OK)           = 0
rt_sigprocmask(SIG_BLOCK, [INT CHLD], [], 8) = 0
clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7fa545dd4a10) = 2967
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
rt_sigaction(SIGINT, {0x43e500, [], SA_RESTORER, 0x7fa545405650}, {SIG_DFL, [], SA_RESTORER, 0x7fa545405650}, 8) = 0
wait4(-1, 0x7fff54edea00, 0, NULL)      = ? ERESTARTSYS (To be restarted if SA_RESTART is set)
--- SIGTTIN {si_signo=SIGTTIN, si_code=SI_USER, si_pid=2967, si_uid=1000} ---
--- stopped by SIGTTIN ---

scala雾中风景(26): 变量查找的问题

在Java/Scala的一个方法里,存在与全局变量同名的局部变量的话将会覆盖这个全局变量,但当在这个方法局部变量定义之前就引用这个变量,在Java和Scala的编译器里给出了不同的实现,先看Java里:

➜  cat B.java
public class B {
    static String name = "noname";
    public static void main(String[] args) {
      System.out.println(name);
      String name = "wang";
    }
}

➜  java B
noname

在main方法里第一行引用的name是全局变量,在同名的局部变量定义之前它从全局查找这个变量。而在Scala里:

 ➜  cat A.scala
object A {
    val name: String = "noname"

    def main(args: Array[String]) {
        println(name)
        val name = "wang"
    }
}

 ➜  scalac A.scala
A.scala:5: error: forward reference extends over definition of value name
    println(name)
            ^
one error found

编译时错误,它优先从方法内部的局部变量表类查找了。我更接受Java里的方式,而且这种方式已经深入人心了,对Scala为何这样做我不清楚是出于什么考虑,是有意这样还是实现上的bug?

scala雾中风景(25): try-finally表达式的类型推导

一段实际代码简化后如下:

class A {
  def foo():String = {
    try{
      Thread.sleep(1000)
    }finally{
      "ok"
    }
  }
}

本来期望这个foo方法返回finally块中的”ok”,但编译的时候却给出了类型不匹配的错误:

 ➜  scalac A.scala
A.scala:4: error: type mismatch;
 found   : Unit
 required: String
      Thread.sleep(1000)
                  ^

按说scala类型推断这么强大,不应该推断不出最终的返回值类型,从编译器的错误来看似乎它非要求在try代码块里最后一行表达式必须也是String类型的值,为什么finally里的表达式没有参与类型推断呢?

把上述代码稍作改动,在try代码块里明确的给出一个String结果

def foo():String = {
    try{
        Thread.sleep(1000)
        "res"
    }finally{
        "ok"
    }
}

再编译一下,却给出了一行警告:

 ➜  scalac A.scala
A.scala:7: warning: a pure expression does nothing in statement position; you may be omitting necessary parentheses
    "ok"
    ^
one warning found

分析一下为什么编译器认为这句表达式”does nothing”,而不把它当作返回值对待:

 ➜  scalac -Xprint:typer A.scala
 ...

def foo(): String = try {
  java.this.lang.Thread.sleep(1000L);
  "res"
} finally {
  "ok";
  ()
}

看到编译器在finally块的”ok”表达式后边自行增加了一个返回Unit类型的值(),看上去编译器认为finally块里的逻辑是一个“procedure”,一定要满足Unit

从scala语言规范来看,try-catch-finally表达式也是有返回值的,且返回值主要是取决于trycatch里的最后一行表达式,而finally被认为是做一些收尾的工作的,不应该在里面去改变返回结果。

具体到这个案例,foo方法声明的返回值类型是Stringfoo方法体里的try-finally表达式的值就是最终的返回值,而try-finally表达式的值是取决于try代码块里的最后一行表达式,而非finally块里的。

看几个例子:

scala> val a = try { 100 } finally { 200 }
<console>:7: warning: a pure expression does nothing in statement position; you may be omitting necessary parentheses
   val a = try { 100 } finally { 200 }
                                 ^
a: Int = 100

上面try-finally语句的结果是try里的值。

scala> val a = try { throw new Exception() } catch { case e:Exception => 200 }
a: Int = 200

上面try里发生了异常,最终的结果是catch里的。

scala> val a = try { throw new Exception() } catch { case e:Exception => 200 } finally { 300 }
<console>:7: warning: a pure expression does nothing in statement position; you may be omitting necessary parentheses
   val a = try { throw new Exception() } catch { case e:Exception => 200 } finally { 300 }
                                                                                     ^
a: Int = 200

上面finally里的表达式并不会被当作最终返回值。

当然,在finally块里是可以使用return关键字的,但return关键字在这里并不能改变try-finally表达式的结果类型,我们对原代码增加return再编译:

def foo():String = {
    try{
        Thread.sleep(1000)
    }finally{
      return "ok"
    }
}

 ➜  scalac A.scala
A.scala:4: error: type mismatch;
 found   : Unit
 required: String
      Thread.sleep(1000)
                  ^
one error found

依然编译错误,注意return是面向method的,属于流控层面,并不影响表达式的类型推断。因为在foo方法里try-finally就是最后一句表达式,所以编译器要求这句表达式的类型必须也满足foo的返回值类型签名。如果try-finally不是最后一句,就没有这个约束了,比如:

def foo():String = {
    try{
        Thread.sleep(1000)
    }finally{
      return "ok"
    }

    "no"
}

上面对foo方法在最后一行增加了一句返回”no”的表达式,使得前边的try-finally表达式类型推导不受方法签名的约束,编译可以通过了。当然这个代码逻辑肯定不会走到那里,我更希望编译器给出代码不可达的警告。

如果打开typer-debug编译选项,可以看到编译器总会期待方法里的最后一个表达式满足方法返回值类型,如果最后的这个表达式又是由多个更小粒度的表达式组合成的(比如这个try-finally,我们暂称它为大表达式),则进一步对这个大表达式拆分推导,约束其中的决定整个大表达式类型的小表达式也必须符合方法的返回类型,对于try-finally这个大表达式来说就是其中try块里的最后一行表达式。

对于try-catch-finallyfinally,编译器总是预期它里面的表达式类型为Unit,所以如果在里面的最后一条语句不是一个Unit类型的值,编译会自动给你加上。

注意,return ok这句表达式的类型是Nothing,不要混淆方法返回值类型和表达式自身类型。return, throw等跟流控相关的表达式都是Nothing,它可以满足任何类型,自发可以符合finally里的Unit预期。

其实这个问题是scala类型推导实现的问题,我们期望它更聪明一些,比如:

scala> def bar:String = { return "A";  "B"  }
bar: String

在Java里编译器会报错后边的语句不可达,但Scala里却编译通过。虽然后边的表达式没有意义,不会走到那儿,但并不意味着你能给出任意的值:

scala> def bar:String = { return "A";  200 }
<console>:7: error: type mismatch;
 found   : Int(200)
 required: String
   def bar:String = { return "A";  200 }

尽管后边的表达式不会被执行到,但它在编译时参与类型推导,因为对于该方法来说 { return "A"; 200 }整体是一个大表达式也必须满足String类型才行。

HttpURLConnection在底层是否复用socket的简单验证方式

关于JDK自身的HttpURLConnection在底层是否复用socket的测试方式,可以快速用repl和lsof来检测:

// 本地启动一个 http server,它返回十几个字符
 ➜  curl "http://localhost:8080/sleep?time=1000"
{"code":"ok"}

// 在repl下连续请求这个url 若干次
scala> val is = new java.net.URL("http://localhost:8080/sleep?time=100").openConnection.getInputStream; for(i <- 1 to 15) is.read; is.close
is: java.io.InputStream = sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@1ba9117e

scala> val is = new java.net.URL("http://localhost:8080/sleep?time=100").openConnection.getInputStream; for(i <- 1 to 15) is.read; is.close
is: java.io.InputStream = sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@a82c5f1

与此同时在另一个终端用lsof查看socket,每秒刷新一次,可看到客户端socket是同一个

 ➜  /usr/sbin/lsof -Pan -iTCP -r 1 -p 43280
=======
=======
COMMAND   PID      USER   FD   TYPE             DEVICE SIZE/OFF NODE NAME
java    43280 hongjiang   47u  IPv6 0x43acdfd2ea5b0c01      0t0  TCP 127.0.0.1:57304->127.0.0.1:8080 (ESTABLISHED)
=======
COMMAND   PID      USER   FD   TYPE             DEVICE SIZE/OFF NODE NAME
java    43280 hongjiang   47u  IPv6 0x43acdfd2ea5b0c01      0t0  TCP 127.0.0.1:57304->127.0.0.1:8080 (ESTABLISHED)
=======
COMMAND   PID      USER   FD   TYPE             DEVICE SIZE/OFF NODE NAME
java    43280 hongjiang   47u  IPv6 0x43acdfd2ea5b0c01      0t0  TCP 127.0.0.1:57304->127.0.0.1:8080 (ESTABLISHED)
=======
COMMAND   PID      USER   FD   TYPE             DEVICE SIZE/OFF NODE NAME
java    43280 hongjiang   47u  IPv6 0x43acdfd2ea5b0c01      0t0  TCP 127.0.0.1:57304->127.0.0.1:8080 (ESTABLISHED)  

这个话题是由URLConnection在关闭的时候应该调用close还是disConnect所引起的,关于jdk里keep-alive相关的一些参数不展开了。

验证disconnect方法:

// 执行若干次
scala> val conn = new java.net.URL("http://localhost:8080/sleep?time=100").openConnection.asInstanceOf[java.net.HttpURLConnection]; val is=conn.getInputStream; for(i <- 1 to 15) is.read; conn.disconnect

这时没法用lsof观察了,它最小刷新单位是1秒,因为每次连接立即关闭导致没机会看到,得用tcpdump来观察

 ➜  sudo tcpdump -i lo0  -s 1024 -l -A  port 8080

 ^[[A04:59:57.066577 IP localhost.57355 > localhost.http-alt: Flags [S]
 ...
 -`=.-`=.GET /sleep?time=100 HTTP/1.1
User-Agent: Java/1.8.0_51
Host: localhost:8080
Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
Connection: keep-alive

...

05:00:05.407691 IP localhost.57356 > localhost.http-alt: Flags [P.], seq 1:168, ack 1, win 12759, options [nop,nop,TS val 761290281 ecr 761290281], length 167: HTTP: GET /sleep?time=100 HTTP/1.1
E...LF@.@.........................1........
-`^)-`^)GET /sleep?time=100 HTTP/1.1
User-Agent: Java/1.8.0_51
Host: localhost:8080
Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
Connection: keep-alive 

... 

05:00:07.045830 IP localhost.57357 > localhost.http-alt: Flags [P.], seq 1:168, ack 1, win 12759, options [nop,nop,TS val 761291915 ecr 761291915], length 167: HTTP: GET /sleep?time=100 HTTP/1.1
E.....@.@................l.;.\.,..1........
-`d.-`d.GET /sleep?time=100 HTTP/1.1
User-Agent: Java/1.8.0_51
Host: localhost:8080
Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
Connection: keep-alive

看到三次连接每次客户端socket端口都变了。

再谈AsyncHttpClient

以前曾经说过为何我们不使用AsyncHttpClient,当时主要是从需求出发,性能上不需要。不过在业务做活动推广的场景下,性能上是必须要考虑的;另外在Actor模型里,要调用第三方的http接口,为了避免Actor收发消息的底层线程池被阻塞也必须用异步的方式。在去年年底我们的一次活动,Akka里使用AsyncHttpClient(1.9版本)已经很好的验证过它们的组合以及惊诧的性能了。

最近看到AsyncHttpClient在2.0后被重构了,包名从com.ning变为了org.asynchttpclient,里面的一些类也有所重构,也升级了所依赖的netty的版本。最近正好一些新项目里调用第三方的http接口,尝试了一下新的AsyncHttpClient,结果发现它的readTimeout参数并不work,下面是两个版本的测试程序:

import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.Response;

public class AsyncV1Test {

    static class MyCallback extends AsyncCompletionHandler<Boolean> {

        @Override
        public Boolean onCompleted(Response response) throws Exception {
            System.out.println(response.getResponseBody());
            return true;
        }

        public void onThrowable(Throwable t) {
            t.printStackTrace(System.err);
        }
    }

    public static void main(String[] args) throws Exception {
        AsyncHttpClientConfig cfg = new AsyncHttpClientConfig.Builder().setReadTimeout(100).build();
        com.ning.http.client.AsyncHttpClient cli = new com.ning.http.client.AsyncHttpClient(cfg);
        cli.prepareGet("http://localhost:8080/sleep?time=200").execute(new MyCallback());

        // wait and quit
        Thread.sleep(1000);
        cli.close();
    }
}

上面是老版本的com.ning的客户端的readTimeout测试,设置的是100毫秒,请求的url在server端会sleep 200毫秒,超时的异常符合预期:

java.util.concurrent.TimeoutException: Read timeout to localhost/127.0.0.1:8080 of 100 ms
    at com.ning.http.client.providers.netty.request.timeout.TimeoutTimerTask.expire(TimeoutTimerTask.java:47)
    at com.ning.http.client.providers.netty.request.timeout.ReadTimeoutTimerTask.run(ReadTimeoutTimerTask.java:57)
    at org.jboss.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:556)
    at org.jboss.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:632)
    at org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:369)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at java.lang.Thread.run(Thread.java:745)

在新版本的org.asynchttpclient里测试同样的逻辑:

import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Response;

public class AsyncV2Test {

    static class MyCallback extends AsyncCompletionHandler<Boolean> {

        @Override
        public Boolean onCompleted(Response response) throws Exception {
            System.out.println(response.getResponseBody());
            return true;
        }

        public void onThrowable(Throwable t) {
            t.printStackTrace(System.err);
        }
    }

    public static void main(String[] args) throws Exception{

        DefaultAsyncHttpClientConfig cfg = new DefaultAsyncHttpClientConfig.Builder().setReadTimeout(100).build();
        org.asynchttpclient.AsyncHttpClient cli = new DefaultAsyncHttpClient(cfg);
        cli.prepareGet("http://localhost:8080/sleep?time=200").execute(new MyCallback());

        // wait and quit
        Thread.sleep(1000);
        cli.close();
    }
}       

却没有发生超时的异常,而是正常打印出了最终的请求结果。估计是新版本重构后所带来的bug,我还是回退到老的com.ning.http.client.AsyncHttpClient

scala里模拟javascript/python里的生成器的效果

上一篇关于yield的字面含义,基本有答案了。对于python和javascript不太熟悉,这两个语言里yield都是配合生成器而使用的。比如下面这段 javascript 的例子从mozilla的网站上看到的,稍加改造:

➜  cat test.js

function* foo() {
  var index = 0;

  while (index <= 2 ) {
    console.log("here" + index);
    yield index++;
  }
}

var it = foo();

console.log("generator ready");

console.log(it.next());
console.log(it.next());
console.log(it.next());

它的输出结果是

➜  node test.js
generator ready
here0
{ value: 0, done: false }
here1
{ value: 1, done: false }
here2
{ value: 2, done: false }

在Scala里yield并不等效于javascript/python里的效果,要等效的话,必须通过lazy, Stream等延迟计算特性来实现:

➜  cat Test.scala
object Test {
  def main(args: Array[String]) {

    lazy val c = for( i <- (0 to 2).toStream ) yield {
      println("here" + i);
      i+1
    }

    println("ready")

    for( n <- c) {
      println(n)
    }
  }
}   

运行结果:

➜  scala Test.scala
ready
here0
1
here1
2
here2
3