[ES in Practice] Analyzing ES Client Thread Counts
Table of Contents
- Analyzing ES Client Thread Counts
- Main threads of the Transport Client
- ES worker thread pools
- Netty network communication threads
- Main threads of the Rest Client
Analyzing ES Client Thread Counts
Main threads of the Transport Client
Version 5.6.1
ES worker thread pools
Each Transport Client creates a set of ES thread pools to process its tasks. To stay compatible with the security features, the management platform uses PreBuiltXPackTransportClient.
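For reference, here is a minimal sketch of how such a client is typically constructed. The cluster name, host, and x-pack credentials are placeholder assumptions, not values from this platform:

```java
import java.net.InetAddress;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;

public class TransportClientDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder cluster name, credentials and address -- replace with real values.
        Settings settings = Settings.builder()
            .put("cluster.name", "my-cluster")
            .put("xpack.security.user", "elastic:changeme")
            .build();
        // Creating the client runs TransportClient#buildTemplate, which builds the
        // ThreadPool and the Netty transport discussed below.
        TransportClient client = new PreBuiltXPackTransportClient(settings)
            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
        // ... use the client ...
        client.close();
    }
}
```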
Figure: inheritance relationship from PreBuiltXPackTransportClient down to TransportClient
When a PreBuiltXPackTransportClient is instantiated, it ultimately calls the TransportClient constructor below:
```java
/**
 * Creates a new TransportClient with the given settings, defaults and plugins.
 * @param settings the client settings
 * @param defaultSettings default settings that are merged after the plugins have added it's additional settings.
 * @param plugins the client plugins
 */
protected TransportClient(Settings settings, Settings defaultSettings, Collection<Class<? extends Plugin>> plugins,
                          HostFailureListener hostFailureListener) {
    this(buildTemplate(settings, defaultSettings, plugins, hostFailureListener));
}
```
The buildTemplate method creates a ThreadPool instance:
final ThreadPool threadPool = new ThreadPool(settings);
Initializing the ThreadPool object creates the thread pools:
```java
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
    super(settings);

    assert Node.NODE_NAME_SETTING.exists(settings);

    final Map<String, ExecutorBuilder> builders = new HashMap<>();
    // Derive the number of usable processors from the server CPU count, capped at 32
    final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
    // Half the processor count, at most 5, at least 1
    final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
    // Half the processor count, at most 10, at least 1
    final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
    // 4x the processor count, at most 512, at least 128
    final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
    builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
    builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
    builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops
    builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
    builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000));
    builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
    // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
    // the assumption here is that the listeners should be very lightweight on the listeners side
    builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));
    builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
    builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
    builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
    builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
    for (final ExecutorBuilder<?> builder : customBuilders) {
        if (builders.containsKey(builder.name())) {
            throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
        }
        builders.put(builder.name(), builder);
    }
    this.builders = Collections.unmodifiableMap(builders);

    threadContext = new ThreadContext(settings);

    final Map<String, ExecutorHolder> executors = new HashMap<>();
    for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
        final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
        final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
        if (executors.containsKey(executorHolder.info.getName())) {
            throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
        }
        logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
        executors.put(entry.getKey(), executorHolder);
    }

    // DIRECT_EXECUTOR is an AbstractExecutorService instance
    executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
    this.executors = unmodifiableMap(executors);

    // ScheduledThreadPoolExecutor with exactly 1 thread
    this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
    this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
    this.scheduler.setRemoveOnCancelPolicy(true);

    TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
    // One CachedTimeThread instance
    this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
    this.cachedTimeThread.start();
}
```
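Since all of these pools hang off the client's ThreadPool, their configured types and size bounds can also be inspected at runtime. A minimal sketch, assuming a `client` built as above and the ThreadPool.Info accessors from the 5.6 sources:

```java
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.threadpool.ThreadPool;

final class ThreadPoolDump {
    // Print the configured type and size bounds of every pool the client created.
    static void dump(TransportClient client) {
        for (ThreadPool.Info poolInfo : client.threadPool().info()) {
            System.out.printf("%-20s type=%-8s min=%d max=%d%n",
                poolInfo.getName(), poolInfo.getThreadPoolType(), poolInfo.getMin(), poolInfo.getMax());
        }
    }
}
```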
Thread pool types
- fixed: a thread pool with a fixed number of threads
- scaling: a thread pool whose thread count varies between the core and max parameters
- direct: a direct executor that runs the Runnable on the calling thread
availableProcessors is computed as:
Math.min(32, Runtime.getRuntime().availableProcessors())
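To make the bounds concrete, here is a small stand-alone sketch that reproduces the sizing arithmetic from the constructor above. The helper names mirror the ES ones but are reimplemented purely for illustration (rounding details may differ slightly):

```java
// Illustration-only reimplementation of the thread pool sizing arithmetic.
public final class PoolSizing {
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    public static void main(String[] args) {
        int availableProcessors = Math.min(32, Runtime.getRuntime().availableProcessors());
        int halfProcMaxAt5 = boundedBy(availableProcessors / 2, 1, 5);
        int halfProcMaxAt10 = boundedBy(availableProcessors / 2, 1, 10);
        int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
        int searchThreadPoolSize = ((availableProcessors * 3) / 2) + 1;

        System.out.println("availableProcessors = " + availableProcessors);
        System.out.println("halfProcMaxAt5      = " + halfProcMaxAt5);
        System.out.println("halfProcMaxAt10     = " + halfProcMaxAt10);
        System.out.println("generic max         = " + genericThreadPoolMax);
        System.out.println("search size         = " + searchThreadPoolSize);
    }
}
```

On an 8-core host, for example, this yields a generic pool max of 128 and a search pool size of 13.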
| Thread pool | Type | core (size) | max | Thread name prefix in JVM thread dump |
|---|---|---|---|---|
| generic | scaling | 4 | 4 * availableProcessors [128,512] | elasticsearch[_client_][generic] |
| listener | fixed | availableProcessors/2 [1,10] | availableProcessors/2 [1,10] | elasticsearch[_client_][listener] |
| get | fixed | availableProcessors | availableProcessors | elasticsearch[_client_][get] |
| index | fixed | availableProcessors | availableProcessors | elasticsearch[_client_][index] |
| bulk | fixed | availableProcessors | availableProcessors | elasticsearch[_client_][bulk] |
| search | fixed | ((availableProcessors * 3) / 2) + 1 | ((availableProcessors * 3) / 2) + 1 | elasticsearch[_client_][search] |
| management | scaling | 1 | 5 | elasticsearch[_client_][management] |
| flush | scaling | 1 | availableProcessors/2 [1,5] | elasticsearch[_client_][flush] |
| refresh | scaling | 1 | availableProcessors/2 [1,10] | elasticsearch[_client_][refresh] |
| warmer | scaling | 1 | availableProcessors/2 [1,5] | elasticsearch[_client_][warmer] |
| snapshot | scaling | 1 | availableProcessors/2 [1,5] | elasticsearch[_client_][snapshot] |
| force_merge | fixed | 1 | 1 | elasticsearch[_client_][force_merge] |
| fetch_shard_started | scaling | 1 | 2 * availableProcessors | elasticsearch[_client_][fetch_shard_started] |
| fetch_shard_store | scaling | 1 | 2 * availableProcessors | elasticsearch[_client_][fetch_shard_store] |
| same | direct | - | - | elasticsearch[_client_][same] |
Other threads
- CachedTimeThread: exactly 1 instance; thread name prefix in the JVM thread dump is elasticsearch[_client_][[timer]]
- scheduler: a ScheduledThreadPoolExecutor with exactly 1 thread; thread name prefix in the JVM thread dump is elasticsearch[_client_][scheduler]
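A quick way to verify how many of these client threads actually exist is to group live thread names by prefix from inside the JVM that owns the client. The sketch below uses only the JDK and assumes the default elasticsearch[_client_] naming shown above:

```java
import java.util.Map;
import java.util.TreeMap;

// Call from inside the JVM that owns the client(s): groups live ES client threads by pool name.
public final class EsClientThreadCount {
    public static Map<String, Integer> byPool() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            String name = t.getName();
            if (name.startsWith("elasticsearch[_client_]")) {
                // Strip the trailing "[T#n]" counter so threads of the same pool collapse into one key.
                counts.merge(name.replaceAll("\\[T#\\d+\\]$", ""), 1, Integer::sum);
            }
        }
        return counts;
    }
}
```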
Netty network communication threads
In the JVM thread dump there are a large number of threads whose names start with elasticsearch[_client_][transport_client_boss]. Example:

```
"elasticsearch[_client_][transport_client_boss][T#1]" #387106 daemon prio=5 os_prio=0 tid=0x00007f79a422e000 nid=0x3bc55d runnable [0x00007f78c98d7000]
   java.lang.Thread.State: RUNNABLE
	at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
	at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
	at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	- locked <0x0000000622f76b90> (a sun.nio.ch.Util$3)
	- locked <0x0000000622f76b78> (a java.util.Collections$UnmodifiableSet)
	- locked <0x0000000613be11a0> (a sun.nio.ch.EPollSelectorImpl)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:752)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:408)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.lang.Thread.run(Thread.java:750)
```
The buildTemplate method also creates a NetworkModule instance:
```java
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
    bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
```
The NetworkModule instance creates the Netty client used for network communication:
```java
/**
 * Creates a network module that custom networking classes can be plugged into.
 * @param settings The settings for the node
 * @param transportClient True if only transport classes should be allowed to be registered, false otherwise.
 */
public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                     BigArrays bigArrays,
                     CircuitBreakerService circuitBreakerService,
                     NamedWriteableRegistry namedWriteableRegistry,
                     NamedXContentRegistry xContentRegistry,
                     NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {
    this.settings = settings;
    this.transportClient = transportClient;
    registerTransport(LOCAL_TRANSPORT, () -> new LocalTransport(settings, threadPool, namedWriteableRegistry, circuitBreakerService));
    for (NetworkPlugin plugin : plugins) {
        if (transportClient == false && HTTP_ENABLED.get(settings)) {
            Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
                circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher);
            for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
                registerHttpTransport(entry.getKey(), entry.getValue());
            }
        }
        // getTransports calls into Netty4Plugin, which creates a Netty4Transport
        Map<String, Supplier<Transport>> httpTransportFactory = plugin.getTransports(settings, threadPool, bigArrays,
            circuitBreakerService, namedWriteableRegistry, networkService);
        for (Map.Entry<String, Supplier<Transport>> entry : httpTransportFactory.entrySet()) {
            registerTransport(entry.getKey(), entry.getValue());
        }
        List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(threadPool.getThreadContext());
        for (TransportInterceptor interceptor : transportInterceptors) {
            registerTransportInterceptor(interceptor);
        }
    }
}
```
Netty4Plugin creates the Netty4Transport:
```java
@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                      CircuitBreakerService circuitBreakerService,
                                                      NamedWriteableRegistry namedWriteableRegistry,
                                                      NetworkService networkService) {
    return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, threadPool, networkService, bigArrays,
        namedWriteableRegistry, circuitBreakerService));
}
```
When Netty4Transport is instantiated, it sets workerCount, which defaults to 2 * availableProcessors:
```java
public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
                       NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
    super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
    Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
    this.workerCount = WORKER_COUNT.get(settings);
    .........
}
```
When the Netty client is started, createBootstrap is executed, creating a fixed number of threads that act as the Netty client's NIO worker threads:
```java
private Bootstrap createBootstrap() {
    final Bootstrap bootstrap = new Bootstrap();
    // by default the client is non-blocking
    if (TCP_BLOCKING_CLIENT.get(settings)) {
        bootstrap.group(new OioEventLoopGroup(1, daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)));
        bootstrap.channel(OioSocketChannel.class);
    } else {
        // thread name marker transport_client_boss, thread count = workerCount
        bootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)));
        bootstrap.channel(NioSocketChannel.class);
    }
    .........
}
```
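If a process hosts many Transport Clients, these worker threads add up quickly. The sketch below shows how the counts could be reduced per client; the setting names "processors" (read by EsExecutors) and "transport.netty.worker_count" (read by Netty4Transport) are taken from the 5.x sources, but treat them as an assumption and verify against your version:

```java
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;

public class SmallerClientDemo {
    public static void main(String[] args) {
        // "processors" feeds EsExecutors.boundedNumberOfProcessors (sizes the thread pools above);
        // "transport.netty.worker_count" feeds Netty4Transport.WORKER_COUNT (the transport_client_boss threads).
        Settings settings = Settings.builder()
            .put("cluster.name", "my-cluster")              // placeholder value
            .put("processors", 4)
            .put("transport.netty.worker_count", 4)
            .build();
        try (PreBuiltXPackTransportClient client = new PreBuiltXPackTransportClient(settings)) {
            // addTransportAddress(...) and use the client here
        }
    }
}
```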
Main threads of the Rest Client
Version 6.7.2
In the JVM thread dump there are a large number of threads whose names start with I/O dispatcher. Example:

```
"I/O dispatcher 85700" #387097 prio=5 os_prio=0 tid=0x00007f7970080000 nid=0x3bc554 runnable [0x00007f78ca7e6000]
   java.lang.Thread.State: RUNNABLE
	at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
	at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
	at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	- locked <0x0000000623151970> (a sun.nio.ch.Util$3)
	- locked <0x0000000623151958> (a java.util.Collections$UnmodifiableSet)
	- locked <0x0000000613bec330> (a sun.nio.ch.EPollSelectorImpl)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:255)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
	at java.lang.Thread.run(Thread.java:750)
```
The RestClientBuilder class builds the RestClient instance on top of a CloseableHttpAsyncClient instance:
```java
/**
 * Creates a new {@link RestClient} based on the provided configuration.
 */
public RestClient build() {
    if (failureListener == null) {
        failureListener = new RestClient.FailureListener();
    }
    CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
        @Override
        public CloseableHttpAsyncClient run() {
            return createHttpClient();
        }
    });
    RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes,
        pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
    httpClient.start();
    return restClient;
}
```
The number of I/O threads used by the Rest client is set in the IOReactorConfig class: its inner Builder class assigns this.ioThreadCount = IOReactorConfig.AVAIL_PROCS; in its Builder() constructor, so the default is likewise the number of CPU cores on the server.
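That default of one I/O dispatcher per CPU core can be overridden when the Rest client is built, using the RestClientBuilder HTTP client callback. A minimal sketch (host and thread count are placeholder values):

```java
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.RestClient;

public class RestClientIoThreadsDemo {
    public static void main(String[] args) throws Exception {
        // Limit the async HTTP client to 2 "I/O dispatcher" threads instead of one per CPU core.
        RestClient restClient = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http"))
            .setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder.setDefaultIOReactorConfig(
                    IOReactorConfig.custom().setIoThreadCount(2).build()))
            .build();
        // ... use the client ...
        restClient.close();
    }
}
```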
The core management objects are created in the HttpAsyncClientBuilder class:
```java
public CloseableHttpAsyncClient build() {
    .........
    // Create the DefaultConnectingIOReactor, which runs the AbstractMultiworkerIOReactor constructor
    ConnectingIOReactor ioreactor = IOReactorUtils.create(
        this.defaultIOReactorConfig != null ? this.defaultIOReactorConfig : IOReactorConfig.DEFAULT, this.threadFactory);
    // poolingmgr holds the ConnectingIOReactor that processes the IOEventDispatch
    PoolingNHttpClientConnectionManager poolingmgr = new PoolingNHttpClientConnectionManager(ioreactor,
        RegistryBuilder.create()
            .register("http", NoopIOSessionStrategy.INSTANCE)
            .register("https", reuseStrategy)
            .build());
    .........
    return new InternalHttpAsyncClient((NHttpClientConnectionManager) connManager, (ConnectionReuseStrategy) reuseStrategy,
        (ConnectionKeepAliveStrategy) keepAliveStrategy, threadFactory, (NHttpClientEventHandler) eventHandler, exec,
        (Lookup) cookieSpecRegistry, (Lookup) authSchemeRegistry, (CookieStore) defaultCookieStore,
        (CredentialsProvider) defaultCredentialsProvider, defaultRequestConfig);
}
```
```java
public AbstractMultiworkerIOReactor(final IOReactorConfig config, final ThreadFactory threadFactory) throws IOReactorException {
    super();
    this.config = config != null ? config : IOReactorConfig.DEFAULT;
    this.params = new BasicHttpParams();
    try {
        this.selector = Selector.open();
    } catch (final IOException ex) {
        throw new IOReactorException("Failure opening selector", ex);
    }
    this.selectTimeout = this.config.getSelectInterval();
    this.interestOpsQueueing = this.config.isInterestOpQueued();
    this.statusLock = new Object();
    if (threadFactory != null) {
        this.threadFactory = threadFactory;
    } else {
        this.threadFactory = new DefaultThreadFactory();
    }
    this.auditLog = new ArrayList<ExceptionEvent>();
    // this.workerCount is the ioThreadCount value, i.e. IOReactorConfig.AVAIL_PROCS by default
    this.workerCount = this.config.getIoThreadCount();
    this.dispatchers = new BaseIOReactor[workerCount];
    this.workers = new Worker[workerCount];
    this.threads = new Thread[workerCount];
    this.status = IOReactorStatus.INACTIVE;
}
```
The worker threads themselves are created and started in AbstractMultiworkerIOReactor's execute(final IOEventDispatch eventDispatch) method:
```java
public void execute(final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
    Args.notNull(eventDispatch, "Event dispatcher");
    synchronized (this.statusLock) {
        if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
            this.status = IOReactorStatus.SHUT_DOWN;
            this.statusLock.notifyAll();
            return;
        }
        Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,
            "Illegal state %s", this.status);
        this.status = IOReactorStatus.ACTIVE;
        // Start I/O dispatchers
        for (int i = 0; i < this.dispatchers.length; i++) {
            final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
            dispatcher.setExceptionHandler(exceptionHandler);
            this.dispatchers[i] = dispatcher;
        }
        for (int i = 0; i < this.workerCount; i++) {
            final BaseIOReactor dispatcher = this.dispatchers[i];
            this.workers[i] = new Worker(dispatcher, eventDispatch);
            this.threads[i] = this.threadFactory.newThread(this.workers[i]);
        }
    }
    try {
        for (int i = 0; i < this.workerCount; i++) {
            if (this.status != IOReactorStatus.ACTIVE) {
                return;
            }
            this.threads[i].start();
        }
        for (;;) {
            final int readyCount;
            try {
                readyCount = this.selector.select(this.selectTimeout);
            } catch (final InterruptedIOException ex) {
                throw ex;
            } catch (final IOException ex) {
                throw new IOReactorException("Unexpected selector failure", ex);
            }
            if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
                processEvents(readyCount);
            }
            // Verify I/O dispatchers
            for (int i = 0; i < this.workerCount; i++) {
                final Worker worker = this.workers[i];
                final Exception ex = worker.getException();
                if (ex != null) {
                    throw new IOReactorException("I/O dispatch worker terminated abnormally", ex);
                }
            }
            if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
                break;
            }
        }
    } catch (final ClosedSelectorException ex) {
        addExceptionEvent(ex);
    } catch (final IOReactorException ex) {
        if (ex.getCause() != null) {
            addExceptionEvent(ex.getCause());
        }
        throw ex;
    } finally {
        doShutdown();
        synchronized (this.statusLock) {
            this.status = IOReactorStatus.SHUT_DOWN;
            this.statusLock.notifyAll();
        }
    }
}
```