当前位置: 首页 > news >正文

聊聊reactor-logback的AsyncAppender

本文主要研究一下reactor-logback的AsyncAppender

AsyncAppender

reactor-logback/src/main/java/reactor/logback/AsyncAppender.java

public class AsyncAppender extends ContextAwareBaseimplements Appender<ILoggingEvent>, AppenderAttachable<ILoggingEvent>,CoreSubscriber<ILoggingEvent> {private final AppenderAttachableImpl<ILoggingEvent>    aai      =new AppenderAttachableImpl<ILoggingEvent>();private final FilterAttachableImpl<ILoggingEvent>      fai      =new FilterAttachableImpl<ILoggingEvent>();private final AtomicReference<Appender<ILoggingEvent>> delegate =new AtomicReference<Appender<ILoggingEvent>>();private String                            name;private WorkQueueProcessor<ILoggingEvent> processor;private int     backlog           = 1024 * 1024;private boolean includeCallerData = false;private boolean started           = false;//......
}	

AsyncAppender继承了ContextAwareBase,同时实现了Appender、AppenderAttachable、CoreSubscriber接口

CoreSubscriber

reactor/core/CoreSubscriber.java

public interface CoreSubscriber<T> extends Subscriber<T> {/*** Request a {@link Context} from dependent components which can include downstream* operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.** @return a resolved context or {@link Context#empty()}*/default Context currentContext(){return Context.empty();}/*** Implementors should initialize any state used by {@link #onNext(Object)} before* calling {@link Subscription#request(long)}. Should further {@code onNext} related* state modification occur, thread-safety will be required.* <p>*    Note that an invalid request {@code <= 0} will not produce an onError and*    will simply be ignored or reported through a debug-enabled*    {@link reactor.util.Logger}.** {@inheritDoc}*/@Overridevoid onSubscribe(Subscription s);
}

CoreSubscriber继承了Subscriber接口,Subscriber接口定义了onSubscribe(Subscription s)、onNext、onError、onComplete方法

onSubscribe

	public void onSubscribe(Subscription s) {try {doStart();}catch (Throwable t) {addError(t.getMessage(), t);}finally {started = true;s.request(Long.MAX_VALUE);}}protected void doStart() {}	

onSubscribe方法执行doStart,标记started为true,同时触发s.request(Long.MAX_VALUE)

onNext

	public void onNext(ILoggingEvent iLoggingEvent) {aai.appendLoopOnAppenders(iLoggingEvent);}

onNext调用AppenderAttachableImpl的appendLoopOnAppenders方法

onError

	public void onError(Throwable t) {addError(t.getMessage(), t);}

onError主要是添加错误信息到logback的status

onComplete

	public void onComplete() {try {Appender<ILoggingEvent> appender = delegate.getAndSet(null);if (appender != null){doStop();appender.stop();aai.detachAndStopAllAppenders();}}catch (Throwable t) {addError(t.getMessage(), t);}finally {started = false;}}protected void doStop() {}	

onComplete则执行doStop、appender.stop()、aai.detachAndStopAllAppenders(),最后标记started为false

Appender.doAppend

	public void doAppend(ILoggingEvent evt) throws LogbackException {if (getFilterChainDecision(evt) == FilterReply.DENY) {return;}evt.prepareForDeferredProcessing();if (includeCallerData) {evt.getCallerData();}try {queueLoggingEvent(evt);}catch (Throwable t) {addError(t.getMessage(), t);}}protected void queueLoggingEvent(ILoggingEvent evt) {if (null != delegate.get()) {processor.onNext(evt);}}	

doAppend方法先判断是否需要DENY,是则直接返回,之后主要执行queueLoggingEvent,它在delegate不为null时执行processor.onNext(evt)

LifeCycle.start

	public void start() {startDelegateAppender();processor = WorkQueueProcessor.<ILoggingEvent>builder().name("logger").bufferSize(backlog).autoCancel(false).build();processor.subscribe(this);}private void startDelegateAppender() {Appender<ILoggingEvent> delegateAppender = delegate.get();if (null != delegateAppender && !delegateAppender.isStarted()) {delegateAppender.start();}}public void addAppender(Appender<ILoggingEvent> newAppender) {if (delegate.compareAndSet(null, newAppender)) {aai.addAppender(newAppender);}else {throw new IllegalArgumentException(delegate.get() + " already attached.");}}		

start方法执行startDelegateAppender,然后创建WorkQueueProcessor(默认bufferSize为1024 * 1024),并subscribe当前实例;addAppender方法会设置delegate,并往AppenderAttachableImpl添加appender

stop

	public void stop() {processor.onComplete();}

stop方法执行processor.onComplete()

小结

reactor-logback基于WorkQueueProcessor提供了另外一种AsyncAppender,它不是基于BlockingQueue而是基于RingBuffer来实现的。其onSubscribe方法执行doStart,标记started为true,同时触发s.request(Long.MAX_VALUE);onNext调用AppenderAttachableImpl的appendLoopOnAppenders方法;onComplete则执行doStop、appender.stop()、aai.detachAndStopAllAppenders(),最后标记started为false;doAppend方法先判断是否需要DENY,是则直接返回,之后主要执行queueLoggingEvent,它在delegate不为null时执行processor.onNext(evt)。

http://www.lryc.cn/news/262895.html

相关文章:

  • Apache SeaTunne简介
  • 【开题报告】基于uniapp的IT资讯阅读小程序的设计与实现
  • Java小案例-SpringBoot火车票订票购票票务系统
  • 关于获取高级电工职业技能等级证书一些避坑经历
  • springboot(ssm在线课程管理系统 网课管理系统Java系统
  • 4.1 媒资管理模块 - Nacos与Gateway搭建
  • 1641:【例 1】矩阵 A×B
  • iOS问题记录 - iOS 17通过NSUserDefaults设置UserAgent无效
  • linux的一些典型面试题解读
  • tortoisesvn各版本下载链接
  • [自动化运维工具]ansible简单介绍和常用模块
  • 记一次渗透测试信息收集-越权
  • Flink系列之:Table API Connectors之JSON Format
  • 2018年第七届数学建模国际赛小美赛B题世界杯足球赛的赛制安排解题全过程文档及程序
  • 【为数据之道学习笔记】5-7五类数据主题联接的应用场景
  • 得帆信息创始人-张桐,受邀出席 BV百度风投AIGC主题论坛
  • 云原生之深入解析减少Docker镜像大小的优化技巧
  • 记一次java for循环改造多线程的操作
  • Java面试整理-Java复制
  • wsl kafka的简单应用
  • 2023年国赛高教杯数学建模D题圈养湖羊的空间利用率解题全过程文档及程序
  • Flink系列之:Table API Connectors之Raw Format
  • 社交网络分析3:社交网络隐私攻击、保护的基本概念和方法 + 去匿名化技术 + 推理攻击技术 + k-匿名 + 基于聚类的隐私保护算法
  • 2023大湾区汽车创新大会在深圳坪山开幕
  • Graylog 中日志级别及其对应的数字
  • 智能手表上的音频(五):录音
  • 2023.12.17 关于 Redis 的特性和应用场景
  • 智能优化算法应用:基于社会群体算法3D无线传感器网络(WSN)覆盖优化 - 附代码
  • Kotlin 笔记 -- Kotlin 语言特性的理解(二)
  • 数据结构【1】:数组专题