分类目录归档:programming

Akka的RoundRobinRouting实现存在bug

Akka的RoundRobinRouting实现存在bug,当累计执行超过Long最大值之后(变为负数),会导致分散不均,最后一个节点永远分配不上,直到累计值再次变为正数才恢复均匀。代码如下(2.4.9版本):

final class RoundRobinRoutingLogic extends RoutingLogic {
  val next = new AtomicLong

  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.nonEmpty) {
      val size = routees.size
      val index = (next.getAndIncrement % size).asInstanceOf[Int]
      routees(if (index < 0) size + index - 1 else index)
    } else NoRoutee

}

上面select函数里当index为负数时,返回size + index - 1存在问题,用一段java代码验证一下:

import java.util.concurrent.atomic.AtomicLong;

public class Test {

    static final AtomicLong next = new AtomicLong(-2003);
    static final int size = 4;

    static int getNext() {
        int index = (int) (next.getAndIncrement() % size);
        if (index < 0)
            return size + index - 1;
        else
            return index;
    }

    public static void main(String[] args) {
        for (int n = 0; n < 8; n++) {
            System.out.println(getNext());
        }
    }
}

// 运行后输出:
0
1
2
0
0
1
2
0

上面的代码假设有4个节点,当next为负数后,一直在"0,1,2"这三个节点上分配,不会分配给最后一个节点(即索引为3的),修正的方式是当index为负数后,返回其绝对值(即 -index)即可。

spring-boot 1.4.x遇到的cpu高的问题

如果你的spring-boot应用里tomcat线程耗cpu较高,并主要耗在做读取jar的操作上(堆栈类似下面),可能跟我们遇到同样的问题。

    CRC32.update(byte[], int, int) line: 76
    JarInputStream(ZipInputStream).read(byte[], int, int) line: 200
    JarInputStream.read(byte[], int, int) line: 207
    JarInputStream(ZipInputStream).closeEntry() line: 140
    JarInputStream(ZipInputStream).getNextEntry() line: 118
    JarInputStream.getNextEntry() line: 142
    JarInputStream.getNextJarEntry() line: 179
    JarWarResourceSet.getArchiveEntries(boolean) line: 112
    JarWarResourceSet(AbstractArchiveResourceSet).getResource(String) line: 256
    StandardRoot.getResourceInternal(String, boolean) line: 280
    CachedResource.validateResource(boolean) line: 95
    Cache.getResource(String, boolean) line: 69
    StandardRoot.getResource(String, boolean, boolean) line: 215
    StandardRoot.getResource(String) line: 205
    Mapper.internalMapWrapper(Mapper$ContextVersion, CharChunk, MappingData) line: 1027
    Mapper.internalMap(CharChunk, CharChunk, String, MappingData) line: 842
    Mapper.map(MessageBytes, MessageBytes, String, MappingData) line: 698
    CoyoteAdapter.postParseRequest(Request, Request, Response, Response) line: 672
    CoyoteAdapter.service(Request, Response) line: 344
    Http11Processor.service(SocketWrapperBase<?>) line: 784
    Http11Processor(AbstractProcessorLight).process(SocketWrapperBase<?>, SocketEvent) line: 66
    AbstractProtocol$ConnectionHandler<S>.process(SocketWrapperBase<S>, SocketEvent) line: 802
    NioEndpoint$SocketProcessor.doRun() line: 1410
    NioEndpoint$SocketProcessor(SocketProcessorBase<S>).run() line: 49
    ThreadPoolExecutor(ThreadPoolExecutor).runWorker(ThreadPoolExecutor$Worker) line: 1142
    ThreadPoolExecutor$Worker.run() line: 617
    TaskThread$WrappingRunnable.run() line: 61
    TaskThread(Thread).run() line: 745  

这种情况只发生在 spring-boot 1.4.x版本(及1.3.x版本,更早的没有确认),1.5.x已经没有这个问题。

主要的改变在org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainerFactory的内部类StoreMergedWebXmlListeneronStart方法:

// TomcatEmbeddedContext 启动时触发了该监听器
private void onStart(Context context) {
    ServletContext servletContext = context.getServletContext();
    if (servletContext.getAttribute(MERGED_WEB_XML) == null) {
        servletContext.setAttribute(MERGED_WEB_XML, getEmptyWebXml());
    }
    // 注意最后这句,1.5.3版本已经去掉了这句,它导致变慢
    TomcatResources.get(context).addClasspathResources(); 
}

addClasspathResources方法里对于jar资源的处理,不同的tomcat版本方式有所不同,spring-boot 中如果使用嵌入式的 tomcat8 的话这些jar资源会记录到StandardRoot里的jarResources集合里,它们会被定时清理。

tomcat容器的后台线程(ContainerBackgroundProcessor)会触发StandardRoot里的清理逻辑

    public void backgroundProcess() {
        cache.backgroundProcess();
        gc();
    }

    public void gc() {
        for (List<WebResourceSet> list : allResources) {
            for (WebResourceSet webResourceSet : list) {
                webResourceSet.gc();
            }
        }
    }
    
    // JarWarResourceSet里的gc方法
    public void gc() {
        synchronized (archiveLock) {
            if (archive != null && archiveUseCount == 0) {
                try {
                    archive.close();
                } catch (IOException e) {
                    // Log at least WARN
                }
                archive = null;
                archiveEntries = null;
            }
        }
    }

请求过来时,Mapper阶段会根据请求路径去找映射的资源,Cache不管命中还是未命中,都会对资源进行validate,在validateResource时要去遍历WebResourceRoot里所有的资源(包括所有的jar资源),若应用依赖的jar比较多时,会导致cpu较高。

spring-boot 1.5 版本里不会再将 BOOT-INF/lib 下的所有jar添加到tomcat的WebResourceRoot里,升级到1.5.3后这个情况没有再发生。

scala雾中风景(28): private?public?

记录前些天遇到的一个问题,scala里protectedprivate修饰的方法可能在编译为class时变成了public,这已经不是第一次遇到,最早遇到是在写一个java子类时要覆盖一些父类方法,父类是scala写的一个trait,里面的方法修饰为protected,当时IDE提示我override的方法必须声明为public感到奇怪反编译了一下父trait果然被声明为了public

而这次遇到的稍有不同,跟继承没有关系,用下面的demo举例:

 ➜  cat A.scala
class A {
  private[this] def foo() = {
    List(1,2,3).map(i => bar(i))
  }

  private[this] def bar(i:Int):String = {
    "str:" + i
  }
}

当我们编译上面类之后,里面的foobar方法的修饰符最终在class里会有所不同,反编译后可看到bar修饰符变成了public:

 ➜  cfr-decompiler A
 ...
 public class A {
    private List<String> foo() {
        return (List)List..MODULE$.apply((Seq)Predef..MODULE$.wrapIntArray(new int[]{1, 2, 3})).map((Function1)new scala.Serializable(this){
            public static final long serialVersionUID = 0;
            private final /* synthetic */ A $outer;

            public final String apply(int i) {
                return this.$outer.A$$bar(i);
            }
        }, List..MODULE$.canBuildFrom());
    }

    public String A$$bar(int i) {
        return new StringBuilder().append((Object)"str:").append((Object)BoxesRunTime.boxToInteger((int)i)).toString();
    }
}

终归scala在jvm上要做一些妥协,按上面的实现,foo里面以闭包的方式使用bar的时候,如果保持scala private[this]的控制粒度,底层的匿名类其实已经无法访问bar了。所以scala在编译器的explicitouter环节做了一些向现实妥协的事情

 ➜  scalac -Xshow-phases
    phase name  id  description
    ----------  --  -----------
        parser   1  parse source into ASTs, perform simple desugaring
         namer   2  resolve names, attach symbols to named trees
packageobjects   3  load package objects
         typer   4  the meat and potatoes: type the trees
        patmat   5  translate match expressions
superaccessors   6  add super accessors in traits and nested classes
    extmethods   7  add extension methods for inline classes
       pickler   8  serialize symbol tables
     refchecks   9  reference/override checking, translate nested objects
       uncurry  10  uncurry, translate function values to anonymous classes
     tailcalls  11  replace tail calls by jumps
    specialize  12  @specialized-driven class and method specialization
 explicitouter  13  this refs to outer pointers
       erasure  14  erase types, add interfaces for traits
   posterasure  15  clean up erased inline classes
      lazyvals  16  allocate bitmaps, translate lazy vals into lazified defs
    lambdalift  17  move nested functions to top level
  constructors  18  move field definitions into constructors
       flatten  19  eliminate inner classes
         mixin  20  mixin composition
       cleanup  21  platform-specific cleanups, generate reflective calls
    delambdafy  22  remove lambdas
         icode  23  generate portable intermediate code
           jvm  24  generate JVM bytecode
      terminal  25  the last phase during a compilation run

在这个阶段,当编译器发现一些private的方法会被内部类访问的话,就删除这些private修饰符:

 ➜  scalac -Xprint:explicitouter A.scala
[[syntax trees at end of             explicitouter]] // A.scala
package <empty> {
  class A extends Object {
    def <init>(): A = {
      A.super.<init>();
      ()
    };
    private[this] def foo(): List[String] = immutable.this.List.apply[Int](scala.this.Predef.wrapIntArray(Array[Int]{1, 2, 3})).map[String, List[String]]({
      @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractFunction1[Int,String] with Serializable {
        def <init>($outer: A.this.type): <$anon: Int => String> = {
          $anonfun.super.<init>();
          ()
        };
        final def apply(i: Int): String = $anonfun.this.$outer.bar(i);
        <synthetic> <paramaccessor> <artifact> private[this] val $outer: A.this.type = _;
        <synthetic> <stable> <artifact> def $outer(): A.this.type = $anonfun.this.$outer
      };
      (new <$anon: Int => String>(A.this): Int => String)
    }, immutable.this.List.canBuildFrom[String]());
    
    final def bar(i: Int): String = "str:".+(i)
  }
}

上面barprivate[this]在这个阶段被删除,而scala不同于java,缺省就是public,最终在class里变成了public

final object?

使用final修饰object的场景极少见,需要显式打开-Yoverride-objects编译选项才行:

 ➜  scala -Yoverride-objects
Welcome to Scala version 2.11.6 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51).
Type in expressions to have them evaluated.
Type :help for more information.

scala> class A { object B }
defined class A

scala> class C extends A  { override object B {}  }
defined class C

如果A内部对object B使用final修饰了,子类C就不能覆盖这个object,不过它的意义是什么?这里object B是一个module,要解释object为何被当作module来设计,需要整理一下,等有时间再说。

PS,我刚发现2.11版本之后repl下,当你定义一个object时,提示已经不同了,在2.11版本之前,repl下会显示 "defined module XXX", 而 2.11 里已经变成了 "defined object XXX",可能隐含着设计者对module(早期scala中module应该是借鉴ML语言的module)这个术语可能存在理解不一致的担心,所以不再使用这个名词(只是我的猜测)。

socketRead0阻塞2个半小时?

前几个月我们遇到过某些http请求会在本机会阻塞2个半小时左右最后成功返回的情况。经过排查发现问题并不在对方服务器,而是这2个半小时基本都是在自己的网络环境阻塞,通过dns端日志发现请求到达dns服务器也是2个半小时后。

在geek talk群里有人指出,glibc的低版本会在查完域名后把拿到的ip反过来查域名,dns服务器都不支持反解就一层层的传给其他dns,可能会变慢。redhat在6u fix了这个问题。但跟我们的情况不符。

当时的堆栈显示阻塞在socket的读取上(我不确定这里的连接是复用上次HttpURLConnection创建过的连接,还是首次跟对方建立连接),我google到有些人也遇到过相似的问题,hang在同样的代码上,疑似是虚拟机(kvm/vmware)或jdk网络层面的bug (JDK 1.8.0_65), 因为无法稳定的重现,没有进一步验证,记录一下这个诡异的问题

at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:335)
- locked <0x0000000c868c460> (a java.io.BufferedInputStream)
at sun.net.www.MeteredStream.read(MeteredStream.java:134)
- locked <0x0000000c868c420> (a sun.net.www.http.KeepAliveStream)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3336)
at java.io.FilterInputStream.read(FilterInputStream.java:133)

《Scala函数式编程》中文版勘误2

感谢 shuai.xie 提出的这段漏掉的内容,这里补充一下。

这里b的类型声明并不是必须的。因为我们已经告诉Scala返回类型为B => C,Scala会从上下文获知b的类型,方法的实现部分只需要写为 b => f(a,b)就可以了。如果Scala能够推断出函数字面量的类型,就可以省略掉它的类型声明。

相关阅读:《Scala函数式编程》中文版勘误

记录几个实践中的问题

1) nginx禁止对写操作timeout时retry

以前遇到的一个case,业务那边说一笔请求从nginx端发送给后端tomcat了2次(落在两个不同的tomcat节点上)。后来发现是nginx发给后端节点timeout,然后做了重试,发给了另一个节点。默认情况下nginx对后端error 和 timeout 都会做retry,可以明确的禁止在timeout的情况下禁止retry。当然如果集群读写分离的话,对于只读集群retry是无所谓的,但对于写确实存在问题。

2) kafka重启时因为数据日志文件名被人重命名过而导致启动失败

启动kafka broker的时候,会重新load之前的每个topic的数据,正常情况下会提示每个topic恢复完成。

INFO Recovering unflushed segment 588022 in log xxx-topic-0. (kafka.log.Log)
INFO Completed load of log xxx-topic-0 with log end offset 590676 (kafka.log.Log)

但当有些topic下的数据恢复失败的时候,会导致broker关闭,异常如下

ERROR There was an error in one of the threads during logs loading: java.lang.NumberFormatException: For input string: "test" (kafka.log.LogManager)
FATAL [Kafka Server 3], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)

java.lang.NumberFormatException: For input string: "test"
      at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
      at java.lang.Long.parseLong(Long.java:589)
      at java.lang.Long.parseLong(Long.java:631)
      at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:251)
      at scala.collection.immutable.StringOps.toLong(StringOps.scala:30)
      at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:152)
      at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:141)
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
      at kafka.log.Log.loadSegments(Log.scala:141)
      at kafka.log.Log.<init>(Log.scala:67)
      at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142)
      at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

这是因为某个目录下,存在一个 test.log 的文件

$ ls mytopic-0/
00000000000000000485.index  00000000000000000485.log  00000000000000000568.index  00000000000000000568.log  test.log

看上去这个 test.log 当时是把 00…log 给拷贝了一个,然后用编辑器去查看内容。而事后忘了清理掉,导致重启时把这个文件当成一个畸形文件了。因为kafka broker要求所有数据文件名称都是Long类型的。

3) 又一个actor阻塞的例子

在我自己的mac上测试的时候,一切正常,部署到dev环境就严重超时。jstack观察发现又是误用阻塞操作导致所有actor的线程都被阻塞所致,当时 EventProcessor 这个 Router 背后的实例数设置的是40,而这台dev环境的linux只有2核,根据当时akka的配置里的并发因子算出并发线程数是32,所以32个线程基本都被 eventProcessor 的40个actor全给占用了,因为它是不断发消息轮询的(我的mac是8核,运行时的线程数要大于40不会发生全部被阻塞的情况)。解决方式,一方面调大并发因子,把线城数提升上去,另一方面控制 eventProcessor 的实例数,不让它的阻塞操作影响到其他actor。(其实根上是没设计好,没有隔离阻塞操作,只不过这正好是个小应用,不需要过多考虑。)

OneApm的问题

1) 我们没有使用它的大数据版本,普通版本在压测情况下服务器端如果吞吐不够,agent端会缓存大量的数据导致full gc,这里它占据了1G的内存

2) 对HttpURLConnection拦截时可能发生空指针异常,导致业务线程终止

java.lang.NullPointerException
 at com.blueware.monitor.bridge.reflect.ClassReflection$2.run(ClassReflection.java:30) ~[?:?]
 at com.blueware.monitor.bridge.reflect.ClassReflection$2.run(ClassReflection.java:28) ~[?:?]
 at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_65]
 at com.blueware.monitor.bridge.reflect.ClassReflection.loadClass(ClassReflection.java:28) ~[?:?]
 at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441) ~[?:1.8.0_65]
 at sun.net.www.protocol.http.HttpURLConnection.getHeaderField(HttpURLConnection.java:2979) ~[?:1.8.0_65]
 at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:489) ~[?:1.8.0_65]
 at com...

tomcat8.5.8遇到的两个问题

压力测试场景,前端nginx反向代理到4个tomcat实例,在其中的一个实例上产生了大量的countDownConnection Incorrect connection count警告

 WARNING [http-nio-8080-exec-48] org.apache.tomcat.util.net.AbstractEndpoint.countDownConnection Incorrect connection count, multiple socket.close called on the same socket.

另外一个异常是4个tomcat实例上都看到的NPE异常:

Exception in thread "http-nio-8080-AsyncTimeout" java.lang.NullPointerException
  at org.apache.coyote.AbstractProcessor.doTimeoutAsync(AbstractProcessor.java:528)
  at org.apache.coyote.AbstractProcessor.timeoutAsync(AbstractProcessor.java:518)
  at org.apache.coyote.AbstractProtocol$AsyncTimeout.run(AbstractProtocol.java:1130)
  at java.lang.Thread.run(Thread.java:745)

使用了servlet3.0,connector配置如下:

 <Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol"
           connectionTimeout="20000" redirectPort="8443" maxParameterCount="2000" maxKeepAliveRequests="-1"
       maxThreads="200" maxPostSize="20971520" acceptCount="1024" useBodyEncodingForURI="true"
        URIEncoding="UTF-8"/>

已将bug提交到了bugzilla,在这里记录一下,后续跟踪。