hongjiang,您好: 请教下,akka这部分,当因为数据量越来越大,需要类似集群那样来扩展时,你们在akka这块具体怎么弄的呢?因为听你演讲说没有用akka的集群,所以对这个感到奇怪,特地请教,谢谢! 回复 ↓
hi,您好。请教一个kafka的问题,在获取kafka最大offset的时候可以这么写 :consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets(0) 可是consumer.getOffsetsBefore(request)返回的OffsetResponse可能会报OffsetOutOfRange、UnknownTopicOrPartition等error,网上很多捕获这个error就offset=0, 我觉得这样处理是不对的。 如果不处理,后面partitionErrorAndOffsets(topicAndPartition).offsets(0) 应该会报数组越界错误。 不知道博主有木有遇到过这种问题,请教博主见解! 谢谢!! 回复 ↓
你用的kafka版本是? 我们使用 0.8.2.1 版本,基于 SimpleConsumer 自己控制offset的存储。 当consumer 请求 fetchRequest的时候 offset 如果不符合服务器端区间范围,可以自己进行一个调整,比如小于最小值则采用最小值,大于最大值则采用最大值(即之前的消息都忽略掉)。 回复 ↓
嗯,我们也是0.8.2.1。自己控制offset。 我的意思其实是 consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets(0) 这个代码的最后.offsets(0) 如果前面一段throws exception,这里会报错数组越界。 这个是怎么处理的呢? 回复 ↓
hongjiang,您好:
请教下,akka这部分,当因为数据量越来越大,需要类似集群那样来扩展时,你们在akka这块具体怎么弄的呢?因为听你演讲说没有用akka的集群,所以对这个感到奇怪,特地请教,谢谢!
hi,您好。请教一个kafka的问题,在获取kafka最大offset的时候可以这么写 :consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets(0)
可是consumer.getOffsetsBefore(request)返回的OffsetResponse可能会报OffsetOutOfRange、UnknownTopicOrPartition等error,网上很多捕获这个error就offset=0, 我觉得这样处理是不对的。
如果不处理,后面partitionErrorAndOffsets(topicAndPartition).offsets(0)
应该会报数组越界错误。
不知道博主有木有遇到过这种问题,请教博主见解!
谢谢!!
你用的kafka版本是?
我们使用 0.8.2.1 版本,基于 SimpleConsumer 自己控制offset的存储。
当consumer 请求 fetchRequest的时候 offset 如果不符合服务器端区间范围,可以自己进行一个调整,比如小于最小值则采用最小值,大于最大值则采用最大值(即之前的消息都忽略掉)。
嗯,我们也是0.8.2.1。自己控制offset。 我的意思其实是
consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets(0)
这个代码的最后.offsets(0) 如果前面一段throws exception,这里会报错数组越界。
这个是怎么处理的呢?
这估计是你构造的OffsetRequest范围不合理导致的吧,我没有遇到过这种情况。
可以先对 OffsetResponse 做一个检查,判断是否hasError