聊聊reactor-logback的AsyncAppender
序
本文主要研究一下reactor-logback的AsyncAppender
AsyncAppender
reactor-logback/src/main/java/reactor/logback/AsyncAppender.java
public class AsyncAppender extends ContextAwareBaseimplements Appender<ILoggingEvent>, AppenderAttachable<ILoggingEvent>,CoreSubscriber<ILoggingEvent> {private final AppenderAttachableImpl<ILoggingEvent> aai =new AppenderAttachableImpl<ILoggingEvent>();private final FilterAttachableImpl<ILoggingEvent> fai =new FilterAttachableImpl<ILoggingEvent>();private final AtomicReference<Appender<ILoggingEvent>> delegate =new AtomicReference<Appender<ILoggingEvent>>();private String name;private WorkQueueProcessor<ILoggingEvent> processor;private int backlog = 1024 * 1024;private boolean includeCallerData = false;private boolean started = false;//......
}
AsyncAppender继承了ContextAwareBase,同时实现了Appender、AppenderAttachable、CoreSubscriber接口
CoreSubscriber
reactor/core/CoreSubscriber.java
public interface CoreSubscriber<T> extends Subscriber<T> {/*** Request a {@link Context} from dependent components which can include downstream* operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.** @return a resolved context or {@link Context#empty()}*/default Context currentContext(){return Context.empty();}/*** Implementors should initialize any state used by {@link #onNext(Object)} before* calling {@link Subscription#request(long)}. Should further {@code onNext} related* state modification occur, thread-safety will be required.* <p>* Note that an invalid request {@code <= 0} will not produce an onError and* will simply be ignored or reported through a debug-enabled* {@link reactor.util.Logger}.** {@inheritDoc}*/@Overridevoid onSubscribe(Subscription s);
}
CoreSubscriber继承了Subscriber接口,Subscriber接口定义了onSubscribe(Subscription s)、onNext、onError、onComplete方法
onSubscribe
public void onSubscribe(Subscription s) {try {doStart();}catch (Throwable t) {addError(t.getMessage(), t);}finally {started = true;s.request(Long.MAX_VALUE);}}protected void doStart() {}
onSubscribe方法执行doStart,标记started为true,同时触发s.request(Long.MAX_VALUE)
onNext
public void onNext(ILoggingEvent iLoggingEvent) {aai.appendLoopOnAppenders(iLoggingEvent);}
onNext调用AppenderAttachableImpl的appendLoopOnAppenders方法
onError
public void onError(Throwable t) {addError(t.getMessage(), t);}
onError主要是添加错误信息到logback的status
onComplete
public void onComplete() {try {Appender<ILoggingEvent> appender = delegate.getAndSet(null);if (appender != null){doStop();appender.stop();aai.detachAndStopAllAppenders();}}catch (Throwable t) {addError(t.getMessage(), t);}finally {started = false;}}protected void doStop() {}
onComplete则执行doStop、appender.stop()、aai.detachAndStopAllAppenders(),最后标记started为false
Appender.doAppend
public void doAppend(ILoggingEvent evt) throws LogbackException {if (getFilterChainDecision(evt) == FilterReply.DENY) {return;}evt.prepareForDeferredProcessing();if (includeCallerData) {evt.getCallerData();}try {queueLoggingEvent(evt);}catch (Throwable t) {addError(t.getMessage(), t);}}protected void queueLoggingEvent(ILoggingEvent evt) {if (null != delegate.get()) {processor.onNext(evt);}}
doAppend方法先判断是否需要DENY,是则直接返回,之后主要执行queueLoggingEvent,它在delegate不为null时执行processor.onNext(evt)
LifeCycle.start
public void start() {startDelegateAppender();processor = WorkQueueProcessor.<ILoggingEvent>builder().name("logger").bufferSize(backlog).autoCancel(false).build();processor.subscribe(this);}private void startDelegateAppender() {Appender<ILoggingEvent> delegateAppender = delegate.get();if (null != delegateAppender && !delegateAppender.isStarted()) {delegateAppender.start();}}public void addAppender(Appender<ILoggingEvent> newAppender) {if (delegate.compareAndSet(null, newAppender)) {aai.addAppender(newAppender);}else {throw new IllegalArgumentException(delegate.get() + " already attached.");}}
start方法执行startDelegateAppender,然后创建WorkQueueProcessor(
默认bufferSize为1024 * 1024
),并subscribe当前实例;addAppender方法会设置delegate,并往AppenderAttachableImpl添加appender
stop
public void stop() {processor.onComplete();}
stop方法执行processor.onComplete()
小结
reactor-logback基于WorkQueueProcessor提供了另外一种AsyncAppender,它不是基于BlockingQueue而是基于RingBuffer来实现的。其onSubscribe方法执行doStart,标记started为true,同时触发s.request(Long.MAX_VALUE);onNext调用AppenderAttachableImpl的appendLoopOnAppenders方法;onComplete则执行doStop、appender.stop()、aai.detachAndStopAllAppenders(),最后标记started为false;doAppend方法先判断是否需要DENY,是则直接返回,之后主要执行queueLoggingEvent,它在delegate不为null时执行processor.onNext(evt)。