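[ES in Practice] Analysis of ES Client Thread Counts

Table of Contents

  • Analysis of ES Client Thread Counts
    • Main Threads of the Transport Client
      • ES Worker Thread Pools
      • Netty Network Communication Threads
    • Main Threads of the Rest Client

Analysis of ES Client Thread Counts

Main Threads of the Transport Client

Version 5.6.1

ES Worker Thread Pools

Every Transport Client creates its own set of ES thread pools to process tasks. To stay compatible with the security features, the management platform uses PreBuiltXPackTransportClient.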

Inheritance chain from PreBuiltXPackTransportClient up to TransportClient:

    PreBuiltXPackTransportClient extends PreBuiltTransportClient extends TransportClient

When a PreBuiltXPackTransportClient is instantiated, the call chain ends in the following TransportClient constructor:

    /**
     * Creates a new TransportClient with the given settings, defaults and plugins.
     * @param settings the client settings
     * @param defaultSettings default settings that are merged after the plugins have added it's additional settings.
     * @param plugins the client plugins
     */
    protected TransportClient(Settings settings, Settings defaultSettings, Collection<Class<? extends Plugin>> plugins,
                              HostFailureListener hostFailureListener) {
        this(buildTemplate(settings, defaultSettings, plugins, hostFailureListener));
    }

The buildTemplate method creates a ThreadPool instance:

    final ThreadPool threadPool = new ThreadPool(settings);

Initializing the ThreadPool object creates the thread pools:

    public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
        super(settings);

        assert Node.NODE_NAME_SETTING.exists(settings);

        final Map<String, ExecutorBuilder> builders = new HashMap<>();
        // Number of usable processors, derived from the server's CPU count and capped at 32
        final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
        // Half the processor count, at most 5 and at least 1
        final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
        // Half the processor count, at most 10 and at least 1
        final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
        // Four times the processor count, at most 512 and at least 128
        final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
        builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
        builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
        builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops
        builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
        builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000));
        builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
        // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
        // the assumption here is that the listeners should be very lightweight on the listeners side
        builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));
        builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
        builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
        builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
        builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
        builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
        builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
        builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
        for (final ExecutorBuilder<?> builder : customBuilders) {
            if (builders.containsKey(builder.name())) {
                throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
            }
            builders.put(builder.name(), builder);
        }
        this.builders = Collections.unmodifiableMap(builders);

        threadContext = new ThreadContext(settings);

        final Map<String, ExecutorHolder> executors = new HashMap<>();
        for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
            final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
            final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
            if (executors.containsKey(executorHolder.info.getName())) {
                throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
            }
            logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
            executors.put(entry.getKey(), executorHolder);
        }

        // DIRECT_EXECUTOR is an AbstractExecutorService instance
        executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
        this.executors = unmodifiableMap(executors);
        // ScheduledThreadPoolExecutor with a fixed single thread
        this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
        this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.scheduler.setRemoveOnCancelPolicy(true);

        TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
        // One CachedTimeThread thread object
        this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
        this.cachedTimeThread.start();
    }

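Thread pool types

  • fixed: a pool with a fixed number of threads
  • scaling: a pool whose thread count varies between the core and max parameters
  • direct: a direct executor that runs the Runnable immediately on the calling thread

Value of availableProcessors:

    Math.min(32, Runtime.getRuntime().availableProcessors())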

| Pool name | Pool type | core (size) | max | Thread-name prefix in the JVM thread dump |
|---|---|---|---|---|
| generic | scaling | 4 | 4 * availableProcessors, bounded to [128, 512] | elasticsearch[_client_][generic] |
| listener | fixed | availableProcessors / 2, bounded to [1, 10] | availableProcessors / 2, bounded to [1, 10] | elasticsearch[_client_][listener] |
| get | fixed | availableProcessors | availableProcessors | elasticsearch[_client_][get] |
| index | fixed | availableProcessors | availableProcessors | elasticsearch[_client_][index] |
| bulk | fixed | availableProcessors | availableProcessors | elasticsearch[_client_][bulk] |
| search | fixed | ((availableProcessors * 3) / 2) + 1 | ((availableProcessors * 3) / 2) + 1 | elasticsearch[_client_][search] |
| management | scaling | 1 | 5 | elasticsearch[_client_][management] |
| flush | scaling | 1 | availableProcessors / 2, bounded to [1, 5] | elasticsearch[_client_][flush] |
| refresh | scaling | 1 | availableProcessors / 2, bounded to [1, 10] | elasticsearch[_client_][refresh] |
| warmer | scaling | 1 | availableProcessors / 2, bounded to [1, 5] | elasticsearch[_client_][warmer] |
| snapshot | scaling | 1 | availableProcessors / 2, bounded to [1, 5] | elasticsearch[_client_][snapshot] |
| force_merge | fixed | 1 | 1 | elasticsearch[_client_][force_merge] |
| fetch_shard_started | scaling | 1 | 2 * availableProcessors | elasticsearch[_client_][fetch_shard_started] |
| fetch_shard_store | scaling | 1 | 2 * availableProcessors | elasticsearch[_client_][fetch_shard_store] |
| same | direct | - | - | - (runs on the calling thread, so it has no threads of its own) |
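
To make the table concrete, here is a minimal sketch that computes the maximum size of each pool for a given number of CPU cores, using only the formulas listed above. The names ClientThreadPoolSizes, maxThreads and upperBound are made up for illustration and are not part of Elasticsearch. The plain integer division used for "half the processors" is an assumption about rounding.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: per-pool maximum thread counts of one Transport Client, following the table. */
final class ClientThreadPoolSizes {

    /** Clamps value into the range [min, max]. */
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    /** Maximum threads per named pool on a machine with the given number of CPU cores. */
    static Map<String, Integer> maxThreads(int cpuCores) {
        int p = Math.min(32, cpuCores); // availableProcessors, capped at 32
        // Assumption: plain integer division for "half"; the real helper may round differently.
        int halfMax5 = boundedBy(p / 2, 1, 5);
        int halfMax10 = boundedBy(p / 2, 1, 10);

        Map<String, Integer> max = new LinkedHashMap<>();
        max.put("generic", boundedBy(4 * p, 128, 512));
        max.put("index", p);
        max.put("bulk", p);
        max.put("get", p);
        max.put("search", ((p * 3) / 2) + 1);
        max.put("management", 5);
        max.put("listener", halfMax10);
        max.put("flush", halfMax5);
        max.put("refresh", halfMax10);
        max.put("warmer", halfMax5);
        max.put("snapshot", halfMax5);
        max.put("fetch_shard_started", 2 * p);
        max.put("force_merge", 1);
        max.put("fetch_shard_store", 2 * p);
        return max;
    }

    /** Sum of all pool maxima plus the two single threads (scheduler and timer). */
    static int upperBound(int cpuCores) {
        int sum = 2;
        for (int n : maxThreads(cpuCores).values()) {
            sum += n;
        }
        return sum;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        maxThreads(cores).forEach((name, n) -> System.out.println(name + " -> " + n));
        System.out.println("upper bound per client: " + upperBound(cores));
    }
}
```

The result is an upper bound, not the live thread count: scaling pools only grow toward max under load, so an idle client normally runs far fewer threads.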

Other threads

  • CachedTimeThread: always exactly 1 thread; its name prefix in the JVM thread dump is elasticsearch[_client_][[timer]]
  • scheduler: a ScheduledThreadPoolExecutor with a fixed single thread; its name prefix in the JVM thread dump is elasticsearch[_client_][scheduler]

Netty Network Communication Threads

The JVM thread dump showed a large number of threads whose names start with elasticsearch[_client_][transport_client_boss], for example:

"elasticsearch[_client_][transport_client_boss][T#1]" #387106 daemon prio=5 os_prio=0 tid=0x00007f79a422e000 nid=0x3bc55d runnable [0x00007f78c98d7000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x0000000622f76b90> (a sun.nio.ch.Util$3)
        - locked <0x0000000622f76b78> (a java.util.Collections$UnmodifiableSet)
        - locked <0x0000000613be11a0> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:752)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:408)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at java.lang.Thread.run(Thread.java:750)
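
One way to quantify "a large number" is to group thread names by prefix and count each group. The following is a minimal sketch using only the JDK. The class ThreadNameHistogram and the suffix pattern are illustrative assumptions: the pattern strips a trailing [T#n] or a trailing number, which fits the thread names quoted in this article but may need adjusting for other naming schemes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

/** Sketch: count threads per name prefix, e.g. to spot thousands of transport_client_boss threads. */
final class ThreadNameHistogram {

    // Matches a trailing "[T#12]" (ES-style names) or a trailing " 85700" (numbered names).
    private static final Pattern SUFFIX = Pattern.compile("(\\[T#\\d+\\]|\\s+\\d+)$");

    /** Returns the thread name without its per-thread numeric suffix. */
    static String prefixOf(String threadName) {
        return SUFFIX.matcher(threadName).replaceFirst("");
    }

    /** Groups the given thread names by prefix and counts each group. */
    static Map<String, Integer> countByPrefix(Iterable<String> threadNames) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String name : threadNames) {
            counts.merge(prefixOf(name), 1, Integer::sum);
        }
        return counts;
    }

    /** Same grouping, applied to the threads currently alive in this JVM. */
    static Map<String, Integer> liveThreads() {
        List<String> names = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            names.add(t.getName());
        }
        return countByPrefix(names);
    }

    public static void main(String[] args) {
        liveThreads().forEach((prefix, n) -> System.out.println(n + "\t" + prefix));
    }
}
```

Running liveThreads() inside the affected application, or feeding countByPrefix the names taken from a saved thread dump, shows which prefix dominates.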

The buildTemplate method also creates a NetworkModule instance:

    NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
            bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);

The NetworkModule instance creates the Netty client used for network communication:

    /**
     * Creates a network module that custom networking classes can be plugged into.
     * @param settings The settings for the node
     * @param transportClient True if only transport classes should be allowed to be registered, false otherwise.
     */
    public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                         BigArrays bigArrays,
                         CircuitBreakerService circuitBreakerService,
                         NamedWriteableRegistry namedWriteableRegistry,
                         NamedXContentRegistry xContentRegistry,
                         NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {
        this.settings = settings;
        this.transportClient = transportClient;
        registerTransport(LOCAL_TRANSPORT, () -> new LocalTransport(settings, threadPool, namedWriteableRegistry, circuitBreakerService));
        for (NetworkPlugin plugin : plugins) {
            if (transportClient == false && HTTP_ENABLED.get(settings)) {
                Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
                    circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher);
                for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
                    registerHttpTransport(entry.getKey(), entry.getValue());
                }
            }
            // getTransports calls into Netty4Plugin, which creates the Netty4Transport
            Map<String, Supplier<Transport>> httpTransportFactory = plugin.getTransports(settings, threadPool, bigArrays,
                circuitBreakerService, namedWriteableRegistry, networkService);
            for (Map.Entry<String, Supplier<Transport>> entry : httpTransportFactory.entrySet()) {
                registerTransport(entry.getKey(), entry.getValue());
            }
            List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(threadPool.getThreadContext());
            for (TransportInterceptor interceptor : transportInterceptors) {
                registerTransportInterceptor(interceptor);
            }
        }
    }

Netty4Plugin creates the Netty4Transport:

    @Override
    public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                          CircuitBreakerService circuitBreakerService,
                                                          NamedWriteableRegistry namedWriteableRegistry,
                                                          NetworkService networkService) {
        return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, threadPool, networkService, bigArrays,
            namedWriteableRegistry, circuitBreakerService));
    }

When Netty4Transport is instantiated it sets workerCount, which defaults to 2 * availableProcessors:

    public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
                           NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
        super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
        Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
        this.workerCount = WORKER_COUNT.get(settings);
        .........
    }

When the Netty client starts, createBootstrap is executed. It creates a fixed-size group of threads that serve as the Netty client's NIO worker threads:

    private Bootstrap createBootstrap() {
        final Bootstrap bootstrap = new Bootstrap();
        // The client is non-blocking by default
        if (TCP_BLOCKING_CLIENT.get(settings)) {
            bootstrap.group(new OioEventLoopGroup(1, daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)));
            bootstrap.channel(OioSocketChannel.class);
        } else {
            // Threads carry the transport_client_boss name prefix; the thread count is workerCount
            bootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)));
            bootstrap.channel(NioSocketChannel.class);
        }
        .........
    }
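
The thread names seen in the dump follow from the thread factory passed to the event loop group. The following is a rough JDK-only model of such a factory together with the default worker count. PrefixedDaemonThreadFactory is a hypothetical class written for this article, and its naming scheme is inferred from the thread names quoted above rather than copied from the Elasticsearch source.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: a daemon thread factory that names threads like "elasticsearch[node][pool][T#n]". */
final class PrefixedDaemonThreadFactory implements ThreadFactory {

    private final String namePrefix;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    PrefixedDaemonThreadFactory(String nodeName, String poolName) {
        // Assumption: the prefix is built as elasticsearch[<node>][<pool>], as the dump suggests.
        this.namePrefix = "elasticsearch[" + nodeName + "][" + poolName + "]";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]");
        t.setDaemon(true);
        return t;
    }

    /** Default workerCount as described in the text: 2 * availableProcessors, with the cap of 32. */
    static int defaultWorkerCount(int cpuCores) {
        return 2 * Math.min(32, cpuCores);
    }

    public static void main(String[] args) {
        ThreadFactory factory = new PrefixedDaemonThreadFactory("_client_", "transport_client_boss");
        int workers = defaultWorkerCount(Runtime.getRuntime().availableProcessors());
        for (int i = 0; i < workers; i++) {
            System.out.println(factory.newThread(() -> { }).getName());
        }
    }
}
```

Because each Transport Client builds its own event loop group, the number of transport_client_boss threads can grow to roughly (number of client instances) * workerCount.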

Main Threads of the Rest Client

Version 6.7.2

The JVM thread dump showed a large number of threads whose names start with I/O dispatcher, for example:

"I/O dispatcher 85700" #387097 prio=5 os_prio=0 tid=0x00007f7970080000 nid=0x3bc554 runnable [0x00007f78ca7e6000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x0000000623151970> (a sun.nio.ch.Util$3)
        - locked <0x0000000623151958> (a java.util.Collections$UnmodifiableSet)
        - locked <0x0000000613bec330> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:255)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
        at java.lang.Thread.run(Thread.java:750)

The RestClientBuilder class builds the RestClient instance on top of a CloseableHttpAsyncClient instance.

    /**
     * Creates a new {@link RestClient} based on the provided configuration.
     */
    public RestClient build() {
        if (failureListener == null) {
            failureListener = new RestClient.FailureListener();
        }
        // Build the underlying CloseableHttpAsyncClient
        CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
            @Override
            public CloseableHttpAsyncClient run() {
                return createHttpClient();
            }
        });
        RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes,
                pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
        // Start the async HTTP client
        httpClient.start();
        return restClient;
    }

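As the simplified class diagram below shows, the number of I/O threads used by the Rest client is set in the IOReactorConfig class. The constructor Builder() of its nested Builder class assigns this.ioThreadCount = IOReactorConfig.AVAIL_PROCS;, so by default the thread count per client equals the number of CPU cores on the server.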

Simplified class diagram, classes shown:

  • CloseableHttpAsyncClient («abstract»)
  • IOReactorConfig (fields: AVAIL_PROCS, DEFAULT)
  • IOReactorConfig.Builder (field: int ioThreadCount; constructor: Builder())
  • HttpAsyncClientBuilder
  • RestClient
  • RestClientBuilder

The core management objects are created in the HttpAsyncClientBuilder class.

    public CloseableHttpAsyncClient build() {
        .........
        // Create the DefaultConnectingIOReactor, which runs the AbstractMultiworkerIOReactor constructor
        ConnectingIOReactor ioreactor = IOReactorUtils.create(
                this.defaultIOReactorConfig != null ? this.defaultIOReactorConfig : IOReactorConfig.DEFAULT, this.threadFactory);
        // poolingmgr holds the ConnectingIOReactor, which processes the IOEventDispatch
        PoolingNHttpClientConnectionManager poolingmgr = new PoolingNHttpClientConnectionManager(ioreactor,
                RegistryBuilder.create().register("http", NoopIOSessionStrategy.INSTANCE).register("https", reuseStrategy).build());
        .........
        return new InternalHttpAsyncClient((NHttpClientConnectionManager)connManager, (ConnectionReuseStrategy)reuseStrategy,
                (ConnectionKeepAliveStrategy)keepAliveStrategy, threadFactory, (NHttpClientEventHandler)eventHandler, exec,
                (Lookup)cookieSpecRegistry, (Lookup)authSchemeRegistry, (CookieStore)defaultCookieStore,
                (CredentialsProvider)defaultCredentialsProvider, defaultRequestConfig);
    }

Class relationships:

  • DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor («abstract») and implements ConnectingIOReactor («interface»)
  • Also shown: PoolingNHttpClientConnectionManager, HttpAsyncClientBuilder

    public AbstractMultiworkerIOReactor(
            final IOReactorConfig config,
            final ThreadFactory threadFactory) throws IOReactorException {
        super();
        this.config = config != null ? config : IOReactorConfig.DEFAULT;
        this.params = new BasicHttpParams();
        try {
            this.selector = Selector.open();
        } catch (final IOException ex) {
            throw new IOReactorException("Failure opening selector", ex);
        }
        this.selectTimeout = this.config.getSelectInterval();
        this.interestOpsQueueing = this.config.isInterestOpQueued();
        this.statusLock = new Object();
        if (threadFactory != null) {
            this.threadFactory = threadFactory;
        } else {
            this.threadFactory = new DefaultThreadFactory();
        }
        this.auditLog = new ArrayList<ExceptionEvent>();
        // workerCount takes the value of ioThreadCount, which defaults to IOReactorConfig.AVAIL_PROCS
        this.workerCount = this.config.getIoThreadCount();
        this.dispatchers = new BaseIOReactor[workerCount];
        this.workers = new Worker[workerCount];
        this.threads = new Thread[workerCount];
        this.status = IOReactorStatus.INACTIVE;
    }

The threads themselves are created and started in the execute(final IOEventDispatch eventDispatch) method of AbstractMultiworkerIOReactor.

    public void execute(final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
        Args.notNull(eventDispatch, "Event dispatcher");
        synchronized (this.statusLock) {
            if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
                this.status = IOReactorStatus.SHUT_DOWN;
                this.statusLock.notifyAll();
                return;
            }
            Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,
                    "Illegal state %s", this.status);
            this.status = IOReactorStatus.ACTIVE;
            // Start I/O dispatchers
            for (int i = 0; i < this.dispatchers.length; i++) {
                final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
                dispatcher.setExceptionHandler(exceptionHandler);
                this.dispatchers[i] = dispatcher;
            }
            for (int i = 0; i < this.workerCount; i++) {
                final BaseIOReactor dispatcher = this.dispatchers[i];
                this.workers[i] = new Worker(dispatcher, eventDispatch);
                this.threads[i] = this.threadFactory.newThread(this.workers[i]);
            }
        }
        try {
            for (int i = 0; i < this.workerCount; i++) {
                if (this.status != IOReactorStatus.ACTIVE) {
                    return;
                }
                this.threads[i].start();
            }
            for (;;) {
                final int readyCount;
                try {
                    readyCount = this.selector.select(this.selectTimeout);
                } catch (final InterruptedIOException ex) {
                    throw ex;
                } catch (final IOException ex) {
                    throw new IOReactorException("Unexpected selector failure", ex);
                }
                if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
                    processEvents(readyCount);
                }
                // Verify I/O dispatchers
                for (int i = 0; i < this.workerCount; i++) {
                    final Worker worker = this.workers[i];
                    final Exception ex = worker.getException();
                    if (ex != null) {
                        throw new IOReactorException("I/O dispatch worker terminated abnormally", ex);
                    }
                }
                if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
                    break;
                }
            }
        } catch (final ClosedSelectorException ex) {
            addExceptionEvent(ex);
        } catch (final IOReactorException ex) {
            if (ex.getCause() != null) {
                addExceptionEvent(ex.getCause());
            }
            throw ex;
        } finally {
            doShutdown();
            synchronized (this.statusLock) {
                this.status = IOReactorStatus.SHUT_DOWN;
                this.statusLock.notifyAll();
            }
        }
    }
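
The worker-creation loop in execute can be modeled with the JDK alone. In the sketch below, IoDispatcherSketch and createWorkers are hypothetical names, and the JVM-wide counter used for thread names is an assumption made for illustration: the default thread factory is not shown in this article, but a shared counter would be consistent with a name such as "I/O dispatcher 85700" appearing after many clients have been created.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch: one thread per I/O worker, modeled on the second loop in execute(). */
final class IoDispatcherSketch {

    // Assumption: a single counter shared by every reactor in the JVM.
    private static final AtomicLong COUNT = new AtomicLong();

    static final ThreadFactory FACTORY =
            r -> new Thread(r, "I/O dispatcher " + COUNT.incrementAndGet());

    /** Creates, but does not start, workerCount threads that will each run the given work. */
    static Thread[] createWorkers(int workerCount, Runnable work) {
        Thread[] threads = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++) {
            threads[i] = FACTORY.newThread(work);
        }
        return threads;
    }

    public static void main(String[] args) {
        int ioThreadCount = Runtime.getRuntime().availableProcessors(); // default per client
        // Two "clients": each gets its own set of dispatcher threads.
        for (int client = 0; client < 2; client++) {
            for (Thread t : createWorkers(ioThreadCount, () -> { })) {
                System.out.println(t.getName());
            }
        }
    }
}
```

The point of the model is that the thread count scales with the number of client instances: every RestClient that is built and started adds its own ioThreadCount dispatcher threads.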