
Analysis of a Performance Problem Caused by Too Many Shards in an Elasticsearch Cluster

1. Symptoms

After arriving at work in the morning, we found the ES logging cluster in an abnormal state: the cluster kept initiating master re-elections, it could no longer serve data queries, and ingestion of the related log data was significantly delayed.

2. Root cause

Relevant logs

The ES cluster logs showed the following.

Starting at 00:00:51, the nodes in the cluster began timing out when communicating with the then-current master node:

Time | Level | Message
00:00:51.140 | WARN | Received response for a request that has timed out, sent [12806ms] ago, timed out [2802ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [864657514]
00:01:24.912 | WARN | Received response for a request that has timed out, sent [12205ms] ago, timed out [2201ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [143113108]
00:01:24.912 | WARN | Received response for a request that has timed out, sent [12206ms] ago, timed out [2201ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [835936906]
00:01:27.731 | WARN | Received response for a request that has timed out, sent [20608ms] ago, timed out [10604ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [137999525]
00:01:44.686 | WARN | Received response for a request that has timed out, sent [18809ms] ago, timed out [8804ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [143114372]
00:01:44.686 | WARN | Received response for a request that has timed out, sent [18643ms] ago, timed out [8639ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [835938242]
00:01:56.523 | WARN | Received response for a request that has timed out, sent [20426ms] ago, timed out [10423ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [137250155]
00:01:56.523 | WARN | Received response for a request that has timed out, sent [31430ms] ago, timed out [21426ms] ago, action [internal:coordination/fault_detection/leader_check], node [{hot}{tUvNI22CRAanSsJdircGlA}{crDi96kOQl6J944HZqNB0w}{131}{131:9300}{dim}{xpack.installed=true, box_type=hot}], id [137249119]

These leader_check timeouts triggered each node to start a new round of master elections.


A new master was elected, but the role kept flapping among the three master-eligible nodes and the cluster never reached a stable state:

Time | Level | Message
00:52:37.264 | DEBUG | executing cluster state update for [elected-as-master ([2] nodes joined)[{hot}{g7zfvt_3QI6cW6ugxIkSRw}{bELGusphTpy6RBeArNo8MA}{129}{129:9300}{dim}{xpack.installed=true, box_type=hot} elect leader, {hot}{GDyoKXPmQyC42JBjNP0tzA}{llkC7-LgQbi4BdcPiX_oOA}{130}{130:9300}{dim}{xpack.installed=true, box_type=hot} elect leader, _BECOME_MASTER_TASK_, _FINISH_ELECTION_]]
00:52:37.264 | TRACE | will process [elected-as-master ([2] nodes joined)[_FINISH_ELECTION_]]
00:52:37.264 | TRACE | will process [elected-as-master ([2] nodes joined)[_BECOME_MASTER_TASK_]]
00:52:37.264 | TRACE | will process [elected-as-master ([2] nodes joined)[{hot}{g7zfvt_3QI6cW6ugxIkSRw}{bELGusphTpy6RBeArNo8MA}{129}{129:9300}{dim}{xpack.installed=true, box_type=hot} elect leader]]
00:52:37.264 | TRACE | will process [elected-as-master ([2] nodes joined)[{hot}{GDyoKXPmQyC42JBjNP0tzA}{llkC7-LgQbi4BdcPiX_oOA}{130}{130:9300}{dim}{xpack.installed=true, box_type=hot} elect leader]]
00:52:37.584 | DEBUG | took [200ms] to compute cluster state update for [elected-as-master ([2] nodes joined)[{hot}{g7zfvt_3QI6cW6ugxIkSRw}{bELGusphTpy6RBeArNo8MA}{129}{129:9300}{dim}{xpack.installed=true, box_type=hot} elect leader, {hot}{GDyoKXPmQyC42JBjNP0tzA}{llkC7-LgQbi4BdcPiX_oOA}{130}{130:9300}{dim}{xpack.installed=true, box_type=hot} elect leader, _BECOME_MASTER_TASK_, _FINISH_ELECTION_]]
00:52:37.828 | TRACE | cluster state updated, source [elected-as-master ([2] nodes joined)[{hot}{g7zfvt_3QI6cW6ugxIkSRw}{bELGusphTpy6RBeArNo8MA}{129}{129:9300}{dim}{xpack.installed=true, box_type=hot} elect leader, {hot}{GDyoKXPmQyC42JBjNP0tzA}{llkC7-LgQbi4BdcPiX_oOA}{130}{130:9300}{dim}{xpack.installed=true, box_type=hot} elect leader, _BECOME_MASTER_TASK_, _FINISH_ELECTION_]]

Problem analysis

Combining the logs above, the cluster state, and the changes we had made recently, we found the cause. Earlier, to fix uneven SSD I/O in the cluster (some disks were hitting their I/O ceiling), we had balanced I/O across nodes and SSDs by spreading each index's shards evenly over every SSD on every node, which increased the number of shards allocated on each node. This did avoid hot disks and balanced disk I/O effectively, but it also made the shard count grow rapidly (the cluster had previously been kept at around 20,000 shards in total; when the incident occurred it was close to 60,000). That in turn triggered the ES bug referenced below (fixed in ES 7.6 and later), so operations that normally finish in a short time (freeze index, delete index, create index) could not complete for a long time, the master node became severely overloaded, and large numbers of timeouts and other errors followed:

  • https://github.com/elastic/elasticsearch/pull/47817

  • https://github.com/elastic/elasticsearch/issues/46941

  • https://github.com/elastic/elasticsearch/pull/48579

These three links describe the same underlying problem: to decide whether a shard on a node needs to be moved, ES checks every shard in the cluster for the RELOCATING or INITIALIZING state in order to account for the size of shards currently being moved. In the unfixed versions this check is repeated for every single shard in the cluster, and all of the work is computed on the fly by the master node. As the cluster's shard count grows, the master's computation load rises sharply, the master slows down, and that leads to the following problems:

(1) The master node was so overloaded that it could not answer other nodes' requests before they timed out, which triggered a new master election; the newly elected master still could not keep up with the cluster's work, timed out again, and triggered yet another election, over and over, until the cluster ended up in an abnormal state.

(2) Because the master processed tasks so slowly, jobs piled up on a large scale (freezing indices, creating indices, deleting indices, shard relocation, and so on).

The problem was first discovered and reported to the community by Huawei engineers. The relevant stack trace is:

"elasticsearch[iZ2ze1ymtwjqspsn3jco0tZ][masterService#updateTask][T#1]" #39 daemon prio=5 os_prio=0 cpu=150732651.74ms elapsed=258053.43s tid=0x00007f7c98012000 nid=0x3006 runnable  [0x00007f7ca28f8000]java.lang.Thread.State: RUNNABLEat java.util.Collections$UnmodifiableCollection$1.hasNext(java.base@13/Collections.java:1046)at org.elasticsearch.cluster.routing.RoutingNode.shardsWithState(RoutingNode.java:148)at org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.sizeOfRelocatingShards(DiskThresholdDecider.java:111)at org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.getDiskUsage(DiskThresholdDecider.java:345)at org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.canRemain(DiskThresholdDecider.java:290)at org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders.canRemain(AllocationDeciders.java:108)at org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator$Balancer.decideMove(BalancedShardsAllocator.java:668)at org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator$Balancer.moveShards(BalancedShardsAllocator.java:628)at org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.allocate(BalancedShardsAllocator.java:123)at org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(AllocationService.java:405)at org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(AllocationService.java:370)at org.elasticsearch.cluster.metadata.MetaDataIndexStateService$1$1.execute(MetaDataIndexStateService.java:168)at org.elasticsearch.cluster.ClusterStateUpdateTask.execute(ClusterStateUpdateTask.java:47)at org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:702)at org.elasticsearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:324)at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:219)at org.elasticsearch.cluster.service.MasterService.access$000(MasterService.java:73)at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:151)at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150)at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188)at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:703)at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252)at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215)at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13/ThreadPoolExecutor.java:1128)at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13/ThreadPoolExecutor.java:628)at java.lang.Thread.run(java.base@13/Thread.java:830)

    /**
     * Determine the shards with a specific state
     * @param states set of states which should be listed
     * @return List of shards
     */
    public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
        List<ShardRouting> shards = new ArrayList<>();
        for (ShardRouting shardEntry : this) {
            for (ShardRoutingState state : states) {
                if (shardEntry.state() == state) {
                    shards.add(shardEntry);
                }
            }
        }
        return shards;
    }

shardsWithState iterates over all shards on the node, collects the ones in the requested state, and returns them. Since ES 7.2, because of the feature introduced in pr#39499, shards of closed indices are counted as well, so the amount of traversal work grows sharply as the cluster's shard count increases and processing slows down.
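To make that cost concrete, below is a minimal, self-contained Java sketch (plain JDK collections only, not Elasticsearch classes; the class name, shard counts, and states are invented for illustration) of the pre-fix pattern: for every shard, the move decision re-scans all shards on its node to find the relocating ones, so a single reroute pass costs on the order of totalShards * shardsPerNode state checks.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative only: mimics the pre-7.6 pattern in which every per-shard move
// decision re-runs a shardsWithState-style scan over the node's shards.
public class RerouteScanDemo {

    enum State { STARTED, INITIALIZING, RELOCATING }

    public static void main(String[] args) {
        int shardsPerNode = 6_000; // roughly our case: ~60,000 shards spread over 10 hot nodes
        List<State> nodeShards = new ArrayList<>();
        for (int i = 0; i < shardsPerNode; i++) {
            // assume a small fraction of shards is relocating at any moment
            nodeShards.add(ThreadLocalRandom.current().nextInt(100) == 0
                    ? State.RELOCATING : State.STARTED);
        }

        long checks = 0;
        long relocatingSeen = 0;
        long start = System.nanoTime();
        // One simulated "reroute": decide for every shard whether it can remain,
        // which in the unfixed code re-scans the whole node to size relocating shards.
        for (int shard = 0; shard < shardsPerNode; shard++) {
            for (State s : nodeShards) { // the linear shardsWithState-style scan
                checks++;
                if (s == State.RELOCATING) {
                    relocatingSeen++;
                }
            }
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("state checks: %,d, relocating hits: %,d, elapsed: %d ms%n",
                checks, relocatingSeen, elapsedMs);
    }
}

With about 6,000 shards per node this is already roughly 36 million state checks per node for a single reroute, which matches the trend in the figures below; the 7.6 fix replaces the inner scan with a constant-time lookup on a pre-maintained set.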

Below are the benchmark figures published by the ES developers:

Shards | Nodes | Shards per node | Reroute time without relocations | Reroute time with relocations
60000  | 10    | 6000            | ~250ms                           | ~15000ms
60000  | 60    | 1000            | ~250ms                           | ~4000ms
10000  | 10    | 1000            | ~60ms                            | ~250ms

This shows that even in the normal case the processing time grows quickly as the cluster's shard count increases, which is why an optimization was needed.

Code fix

To fix the problem, newer ES versions change the structure of RoutingNode: two LinkedHashSet fields, initializingShards and relocatingShards, are added to hold the shards in the INITIALIZING and RELOCATING states respectively. The constructor gains the logic that classifies the shards and stores the INITIALIZING and RELOCATING ones in these two sets:

+   private final LinkedHashSet<ShardRouting> initializingShards;
+   private final LinkedHashSet<ShardRouting> relocatingShards;

    RoutingNode(String nodeId, DiscoveryNode node, LinkedHashMap<ShardId, ShardRouting> shards) {
        this.nodeId = nodeId;
        this.node = node;
        this.shards = shards;
+       this.relocatingShards = new LinkedHashSet<>();
+       this.initializingShards = new LinkedHashSet<>();
+       for (ShardRouting shardRouting : shards.values()) {
+           if (shardRouting.initializing()) {
+               initializingShards.add(shardRouting);
+           } else if (shardRouting.relocating()) {
+               relocatingShards.add(shardRouting);
+           }
+        }
+       assert invariant();
}

Because RoutingNode now contains initializingShards and relocatingShards, its add, update, remove, numberOfShardsWithState, and shardsWithState methods have to be changed accordingly:

void add(ShardRouting shard) {
+       assert invariant();
        if (shards.containsKey(shard.shardId())) {
            throw new IllegalStateException("Trying to add a shard " + shard.shardId() + " to a node [" + nodeId
                + "] where it already exists. current [" + shards.get(shard.shardId()) + "]. new [" + shard + "]");
        }
        shards.put(shard.shardId(), shard);
+       if (shard.initializing()) {
+           initializingShards.add(shard);
+       } else if (shard.relocating()) {
+           relocatingShards.add(shard);
+       }
+       assert invariant();
    }

void update(ShardRouting oldShard, ShardRouting newShard) {
+       assert invariant();
        if (shards.containsKey(oldShard.shardId()) == false) {
            // Shard was already removed by routing nodes iterator
            // TODO: change caller logic in RoutingNodes so that this check can go away
            return;
        }
        ShardRouting previousValue = shards.put(newShard.shardId(), newShard);
        assert previousValue == oldShard : "expected shard " + previousValue + " but was " + oldShard;
+       if (oldShard.initializing()) {
+           boolean exist = initializingShards.remove(oldShard);
+           assert exist : "expected shard " + oldShard + " to exist in initializingShards";
+       } else if (oldShard.relocating()) {
+           boolean exist = relocatingShards.remove(oldShard);
+           assert exist : "expected shard " + oldShard + " to exist in relocatingShards";
+       }
+       if (newShard.initializing()) {
+           initializingShards.add(newShard);
+       } else if (newShard.relocating()) {
+           relocatingShards.add(newShard);
+       }
+       assert invariant();
    }

void remove(ShardRouting shard) {
+       assert invariant();
        ShardRouting previousValue = shards.remove(shard.shardId());
        assert previousValue == shard : "expected shard " + previousValue + " but was " + shard;
+       if (shard.initializing()) {
+           boolean exist = initializingShards.remove(shard);
+           assert exist : "expected shard " + shard + " to exist in initializingShards";
+       } else if (shard.relocating()) {
+           boolean exist = relocatingShards.remove(shard);
+           assert exist : "expected shard " + shard + " to exist in relocatingShards";
+       }
+       assert invariant();
+    }

public int numberOfShardsWithState(ShardRoutingState... states) {
+       if (states.length == 1) {
+           if (states[0] == ShardRoutingState.INITIALIZING) {
+               return initializingShards.size();
+           } else if (states[0] == ShardRoutingState.RELOCATING) {
+               return relocatingShards.size();
+           }
+       }
        int count = 0;
        for (ShardRouting shardEntry : this) {
            for (ShardRoutingState state : states) {
                if (shardEntry.state() == state) {
                    count++;
                }
            }
        }
        return count;
    }

    public List<ShardRouting> shardsWithState(String index, ShardRoutingState... states) {
        List<ShardRouting> shards = new ArrayList<>();
+       if (states.length == 1) {
+           if (states[0] == ShardRoutingState.INITIALIZING) {
+               for (ShardRouting shardEntry : initializingShards) {
+                if (shardEntry.getIndexName().equals(index) == false) {
+                    continue;
+                }
+                shards.add(shardEntry);
+            }
+            return shards;
+        } else if (states[0] == ShardRoutingState.RELOCATING) {
+            for (ShardRouting shardEntry : relocatingShards) {
+                if (shardEntry.getIndexName().equals(index) == false) {
+                    continue;
+                }
+                shards.add(shardEntry);
+            }
+            return shards;
+          }
+       }
        for (ShardRouting shardEntry : this) {
            if (!shardEntry.getIndexName().equals(index)) {
                continue;
            }
            for (ShardRoutingState state : states) {
                if (shardEntry.state() == state) {
                    shards.add(shardEntry);
                }
            }
        }
        return shards;
    }

    public int numberOfOwningShards() {
-        int count = 0;
-        for (ShardRouting shardEntry : this) {
-            if (shardEntry.state() != ShardRoutingState.RELOCATING) {
-                count++;
-            }
-        }
-
-        return count;
+        return shards.size() - relocatingShards.size();
    }

+    private boolean invariant() {
+    
+        // initializingShards must consistent with that in shards
+        Collection<ShardRouting> shardRoutingsInitializing =
+            shards.values().stream().filter(ShardRouting::initializing).collect(Collectors.toList());
+        assert initializingShards.size() == shardRoutingsInitializing.size();
+        assert initializingShards.containsAll(shardRoutingsInitializing);

+        // relocatingShards must consistent with that in shards
+        Collection<ShardRouting> shardRoutingsRelocating =
+            shards.values().stream().filter(ShardRouting::relocating).collect(Collectors.toList());
+        assert relocatingShards.size() == shardRoutingsRelocating.size();
+        assert relocatingShards.containsAll(shardRoutingsRelocating);

+        return true;
+    }
  • assert invariant() is added at the beginning and end of the add, update, and remove methods above. It guarantees that the INITIALIZING and RELOCATING shards stored in initializingShards and relocatingShards are always up to date, but as the number of shards grows by orders of magnitude the invariant() check itself becomes more expensive, so add, update, and remove operations take longer as well.

  • The fix keeps the INITIALIZING and RELOCATING shards in the two LinkedHashSet structures and updates them whenever a shard changes, so the full shard list no longer has to be re-scanned every time the information is needed, which greatly reduces the overhead. On ES 7.2-7.5 the problem is very likely to be triggered once a cluster exceeds roughly 50,000 shards; it is fixed in ES 7.6 (see the simplified sketch below).
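As a simplified illustration of this design choice (plain JDK collections with invented names, not the real RoutingNode code), the sketch below keeps a LinkedHashSet of relocating shards in sync with the main shard map, so counting them is an O(1) size() call rather than a scan over all shards:

import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: the same idea as the RoutingNode change above, i.e. maintain a
// per-state set incrementally instead of recomputing it from the full shard map.
public class ShardTableDemo {

    enum State { STARTED, INITIALIZING, RELOCATING }

    private final Map<String, State> shards = new HashMap<>();          // shardId -> state
    private final Set<String> relocatingShards = new LinkedHashSet<>(); // kept in sync

    void put(String shardId, State state) {
        shards.put(shardId, state);
        if (state == State.RELOCATING) {
            relocatingShards.add(shardId);
        } else {
            relocatingShards.remove(shardId);
        }
    }

    void remove(String shardId) {
        shards.remove(shardId);
        relocatingShards.remove(shardId);
    }

    int numberOfRelocatingShards() {
        return relocatingShards.size(); // O(1), no scan over all shards
    }

    public static void main(String[] args) {
        ShardTableDemo node = new ShardTableDemo();
        node.put("index-a[0]", State.STARTED);
        node.put("index-a[1]", State.RELOCATING);
        node.put("index-b[0]", State.RELOCATING);
        node.put("index-a[1]", State.STARTED); // the state change is reflected in the set
        System.out.println(node.numberOfRelocatingShards()); // prints 1
    }
}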

3. Remediation

To restore service quickly at the time, we restarted the cluster, but cluster-level tasks were still processed very slowly and the recovery took a long time overall. The follow-up measures we took were:

  • Temporarily set the cluster setting "cluster.routing.allocation.disk.include_relocations": "false" (not recommended: the setting was deprecated in ES 7.5, and when disk usage approaches the high watermark it can produce incorrect disk-usage estimates and cause repeated shard relocation); see the sketch after this list

  • Reduced the cluster's shard count by shortening the online query window to the most recent 20 days; the total is now kept at around 50,000 shards
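For reference, the temporary mitigation in the first bullet above can be applied through the cluster settings API. Below is a minimal sketch using the 7.x high-level REST client; the host, port, and class name are placeholders, and the transient setting is lost after a full cluster restart, which suits a temporary workaround:

import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

public class DisableIncludeRelocations {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint: point this at one of your coordinating/master nodes.
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

            // Equivalent to: PUT _cluster/settings with a "transient" block.
            // The setting is deprecated in ES 7.5+, so only apply it on affected 7.2-7.5 clusters.
            ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
            request.transientSettings(Settings.builder()
                    .put("cluster.routing.allocation.disk.include_relocations", false)
                    .build());

            ClusterUpdateSettingsResponse response =
                    client.cluster().putSettings(request, RequestOptions.DEFAULT);
            System.out.println("acknowledged: " + response.isAcknowledged());
        }
    }
}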

These measures only alleviate the problem; they do not fix it at the root. To actually resolve it you can:

  • Upgrade ES to a version in which the bug is fixed

  • Keep the cluster's total shard count within a reasonable range (see the monitoring sketch below)
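Keeping the shard count in range is easier if it is watched continuously. The sketch below (again the 7.x high-level REST client; the host, class name, and the 50,000 threshold are our own assumptions) reads the active shard count from the cluster health API and warns when it crosses the threshold:

import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class ShardCountCheck {
    // Threshold based on our experience above; adjust it to your own cluster.
    private static final int MAX_TOTAL_SHARDS = 50_000;

    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

            ClusterHealthResponse health =
                    client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT);

            int activeShards = health.getActiveShards(); // primaries + replicas
            int dataNodes = health.getNumberOfDataNodes();
            System.out.printf("active shards: %d (about %d per data node)%n",
                    activeShards, dataNodes == 0 ? 0 : activeShards / dataNodes);

            if (activeShards > MAX_TOTAL_SHARDS) {
                System.out.println("WARNING: shard count above threshold; shrink, delete or freeze indices");
            }
        }
    }
}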

