当前位置: 首页 > news >正文

从xxl-job源码中学习Netty的使用

1. 启动与Spring实例化

com.xxl.job.core.executor.impl.XxlJobSpringExecutor.java类 

继承SmartInitializingSingleton 类,在afterSingletonsInstantiated 实例化后方法中

调用initJobHandlerMethodRepository 把所有的xxljob任务管理起来;

  private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext == null) {return;}// init job handler from method//扫描所有的bean,并获取XxlJob的注解,registJobHandler加入到 一个 map  中// 见jobHandlerRepositoryString[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBeantry {annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookup<XxlJob>() {@Overridepublic XxlJob inspect(Method method) {return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}if (annotatedMethods==null || annotatedMethods.isEmpty()) {continue;}for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {Method executeMethod = methodXxlJobEntry.getKey();XxlJob xxlJob = methodXxlJobEntry.getValue();if (xxlJob == null) {continue;}String name = xxlJob.value();if (name.trim().length() == 0) {throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");}if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");}// execute method/*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +"The correct method format like \" public ReturnT<String> execute(String param) \" .");}if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +"The correct method format like \" public ReturnT<String> execute(String param) \" .");}*/executeMethod.setAccessible(true);// init and destoryMethod initMethod = null;Method destroyMethod = null;if (xxlJob.init().trim().length() > 0) {try {initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");}}if (xxlJob.destroy().trim().length() > 0) {try {destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");}}// registry jobhandler//扫描所有的jobhandlerregistJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}}}

2. 调用父类的start() 方法 启动netty服务端

com.xxl.job.core.executor.XxlJobExecutor#initEmbedServer

核心代码见com.xxl.job.core.server.EmbedServer#start

 // start server 服务端ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline()//心跳机制.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle// 使用 http 协议.addLast(new HttpServerCodec())// 拆包粘包.addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL// 业务处理EmbedHttpServerHandler.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));}}).childOption(ChannelOption.SO_KEEPALIVE, true);// bindChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);// start registrystartRegistry(appname, address);// wait util stopfuture.channel().closeFuture().sync();

3. 定时任务发起一个任务执行 调用netty server段的接口

此处以手动调用为一个例子; 定时任务&时间轮见下回分析

com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger

@startuml tomcat
JobInfoController -> JobTriggerPoolHelper: trigger()
JobTriggerPoolHelper -> XxlJobTrigger:trigger()
XxlJobTrigger ->  XxlJobTrigger: processTrigger()
XxlJobTrigger -> XxlJobTrigger : runExecutor()
XxlJobTrigger -> ExecutorBizImpl :run()
@enduml

最终调用

 @Overridepublic ReturnT<String> run(TriggerParam triggerParam) {return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);}

4. 其他netty 详细内容见下回分析;

http://www.lryc.cn/news/377349.html

相关文章:

  • 人工智能发展历程了解和Tensorflow基础开发环境构建
  • makefile追加warning日志
  • 不要直接使用unidefined 而使用void 0
  • 注解详解系列 - @Scope:Bean作用域管理
  • 数学建模基础:数学建模概述
  • 人工智能大模型之开源大语言模型汇总(国内外开源项目模型汇总)
  • 数据结构之B树
  • 双色球预测算法(Java),——森林机器学习、时间序列
  • 【计算机网络篇】数据链路层(11)在数据链路层扩展以太网
  • Ubuntu20.04 使用scrapy-splash爬取动态网页
  • Function:控制继电器上下电,上电后adb登录,copy配置文件
  • 香港电讯高可用网络助力企业变革金融计算
  • LDR6020一拖二快充线:多设备充电新选择
  • 电脑ffmpeg.dll丢失原因解析,找不到ffmpeg.dll的5种解决方法
  • 手机网站制作软件是哪些
  • 【Kubernetes项目部署】k8s集群+高可用、负载均衡+防火墙
  • IPC工业电脑的现状、发展未来与破局策略
  • 深入了解Redis的TYPE命令
  • iptables(3)规则管理
  • 关于addEventListener的使用和注意项
  • 分享一下,如何搭建个人网站的步骤
  • (7)摄像机和云台
  • MicroBlaze IP核中的外设接口和缓冲器接口介绍
  • Java数据结构与算法(完全背包)
  • git merge(3个模式) 与 git rebase 图文详解区别
  • Eclipse 工作空间:深入解析与高效使用
  • Aspose将doc,ppt转成pdf
  • Flutter第十四弹 抽屉菜单效果
  • Docker Nginx
  • OpenVINO™ 2024.2 发布--推出LLM专属API !服务持续增强,提升AI生成新境界