当前位置: 首页 > news >正文

RabbitMQ-客户端源码之AMQChannel

AMQChannel是一个抽象类,是ChannelN的父类。其中包含唯一的抽象方法:

/*** Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method* returns true, the command is considered handled and is not passed back to nextCommand's caller; if it returns false, nextCommand returns the command as* usual. This is used in subclasses to implement handling of Basic.Return and Basic.Deliver messages, as well as Channel.Close and Connection.Close.* @param command the command to handle asynchronously* @return true if we handled the command; otherwise the caller should consider it "unhandled"*/
public abstract boolean processAsync(Command command) throws IOException;

有关processAsync()这个方法的会在介绍ChannelN类的时候详细阐述([[八]RabbitMQ-客户端源码之ChannelN][RabbitMQ-_ChannelN])。


首先来说下AMQChannel的成员变量:

protected final Object _channelMutex = new Object();
/** The connection this channel is associated with. */
private final AMQConnection _connection;
/** This channel's channel number. */
private final int _channelNumber;
/** Command being assembled */
private AMQCommand _command = new AMQCommand();
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
private RpcContinuation _activeRpc = null;
/** Whether transmission of content-bearing methods should be blocked */
public volatile boolean _blockContent = false;
  • _channelMutex这个是内部用来当对象锁的,没有实际的意义,可忽略
  • _connection是指AMQConnection这个对象。
  • _channelNumber是指channel number, 这个应该不用多解释了吧。通道编号为0的代表全局连接中的所有帧,1-65535代表特定通道的帧.
  • _command是内部处理使用的对象,调用AMQCommand的方法来处理一些东西。
  • _activeRpc是指当前未处理完的rpc请求(the current outstanding rpc request)。
  • _blockContent 是在Channel.Flow里用到的,其余情况都是false
    在AMQChannel的构造函数中,只有两个参数:AMQConnection connection以及int channelNumber.

AMQChannel中有个handleFrame方法:

/*** Private API - When the Connection receives a Frame for this* channel, it passes it to this method.* @param frame the incoming frame* @throws IOException if an error is encountered*/
public void handleFrame(Frame frame) throws IOException {AMQCommand command = _command;if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line_command = new AMQCommand(); // prepare for the next onehandleCompleteInboundCommand(command);}
}/*** Private API - handle a command which has been assembled* @throws IOException if there's any problem** @param command the incoming command* @throws IOException*/
public void handleCompleteInboundCommand(AMQCommand command) throws IOException {// First, offer the command to the asynchronous-command// handling mechanism, which gets to act as a filter on the// incoming command stream.  If processAsync() returns true,// the command has been dealt with by the filter and so should// not be processed further.  It will return true for// asynchronous commands (deliveries/returns/other events),// and false for commands that should be passed on to some// waiting RPC continuation.if (!processAsync(command)) {// The filter decided not to handle/consume the command,// so it must be some reply to an earlier RPC.nextOutstandingRpc().handleCommand(command);markRpcFinished();}
}

这个在[[六]RabbitMQ-客户端源码之AMQCommand][RabbitMQ-_AMQCommand]有所介绍,主要是用来处理Frame帧的,当调用AMQCommand的handleFrame处理之后返回为true是,即处理完毕时继续调用handleCompleteInboundCommand方法。这其中也牵涉到AMQConnection的MainLoop内部类,具体可以看看:[[六]RabbitMQ-客户端源码之AMQCommand][RabbitMQ-_AMQCommand]。


AMQChannel中有很多方法带有rpc的字样,这来做一个整理。
首先是:

public void enqueueRpc(RpcContinuation k)
{synchronized (_channelMutex) {boolean waitClearedInterruptStatus = false;while (_activeRpc != null) {try {_channelMutex.wait();} catch (InterruptedException e) {waitClearedInterruptStatus = true;}}if (waitClearedInterruptStatus) {Thread.currentThread().interrupt();}_activeRpc = k;}
}

这个方法在AMQConnection.start()方法中有过使用:_channel0.enqueueRpc(conStartBroker)。这个方法就是将参数付给成员变量_activeRpc,至于这个RpcContinuation到底是个什么gui,我们下面再讲。
继续下一个方法:

public boolean isOutstandingRpc()
{synchronized (_channelMutex) {return (_activeRpc != null);}
}

这个方法是判断一下当前的_activeRpc是否为null,为null则为false,否则为true。看方法的名字应该猜出大半。
下面一个方法:

public RpcContinuation nextOutstandingRpc()
{synchronized (_channelMutex) {RpcContinuation result = _activeRpc;_activeRpc = null;_channelMutex.notifyAll();return result;}
}

方法将当前的_activeRpc返回,并置AQMChannel的_activeRpc为null。

接下来几个方法联系性很强:

/*** Protected API - sends a {@link Method} to the broker and waits for the* next in-bound Command from the broker: only for use from* non-connection-MainLoop threads!*/
public AMQCommand rpc(Method m)throws IOException, ShutdownSignalException
{return privateRpc(m);
}public AMQCommand rpc(Method m, int timeout)throws IOException, ShutdownSignalException, TimeoutException {return privateRpc(m, timeout);
}private AMQCommand privateRpc(Method m)throws IOException, ShutdownSignalException
{SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();rpc(m, k);// At this point, the request method has been sent, and we// should wait for the reply to arrive.//// Calling getReply() on the continuation puts us to sleep// until the connection's reader-thread throws the reply over// the fence.return k.getReply();
}private AMQCommand privateRpc(Method m, int timeout)throws IOException, ShutdownSignalException, TimeoutException {SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();rpc(m, k);return k.getReply(timeout);
}public void rpc(Method m, RpcContinuation k)throws IOException
{synchronized (_channelMutex) {ensureIsOpen();quiescingRpc(m, k);}
}public void quiescingRpc(Method m, RpcContinuation k)throws IOException
{synchronized (_channelMutex) {enqueueRpc(k);quiescingTransmit(m);}
}

主要是看最后一个方法——quiescingRpc.这个方法说白就两行代码:
enqueueRpc(k);是将由privateRpc等方法内部创建的SimpleBlockingRpcContinuation对象附给当前的AQMChannel对象的成员变量_activeRpc
关于quiescingTransmit(m)就要接下去看了:

public void quiescingTransmit(Method m) throws IOException {synchronized (_channelMutex) {quiescingTransmit(new AMQCommand(m));}
}
public void quiescingTransmit(AMQCommand c) throws IOException {synchronized (_channelMutex) {if (c.getMethod().hasContent()) {while (_blockContent) {try {_channelMutex.wait();} catch (InterruptedException e) {}// This is to catch a situation when the thread wakes up during// shutdown. Currently, no command that has content is allowed// to send anything in a closing state.ensureIsOpen();}}c.transmit(this);}
}

上面代码只需要看: c.transmit(this);这一句,其余的都是摆设。看到这里,就调用了AMQCommand的transmit方法,这个transmit方法就是讲AMQChannel中封装的内容发给broker,然后等待broker返回,进而通过之前附值的_activeRpc来处理回传的帧。

虽然之前在AMQConnection([[二]RabbitMQ-客户端源码之AMQConnection][RabbitMQ-_AMQConnection])中详细讲述了start()方法,但是这里还是要来拿这个来举例这个AMQChannel中的rpc怎么使用
在AMQConnection中有这么一段代码:

Method method = (challenge == null)? new AMQP.Connection.StartOk.Builder().clientProperties(_clientProperties).mechanism(sm.getName()).response(response).build(): new AMQP.Connection.SecureOk.Builder().response(response).build();try {Method serverResponse = _channel0.rpc(method, HANDSHAKE_TIMEOUT/2).getMethod();if (serverResponse instanceof AMQP.Connection.Tune) {connTune = (AMQP.Connection.Tune) serverResponse;} else {challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();response = sm.handleChallenge(challenge, this.username, this.password);}

客户端将Method封装成Connection.StartOk帧之后等待broker返回Connection.Tune帧。
此时调用了AMQChannel的rpc(Method m, int timeout)方法,其间接调用了AMQChannel的privateRpc(Method, int timeout)方法。代码详情上面已经罗列出来。

注意privateRpc(Method, int timeout)方法的最有一句返回:return k.getReply(timeout);这句代码的意思是SimpleBlockingRpcContinuation对象在等待broker的返回,确切的来说是MainLoop线程处理之后返回,即AMQChannel类中handleCompleteInboundCommand方法的nextOutstandingRpc().handleCommand(command)这行代码。

http://www.lryc.cn/news/1622.html

相关文章:

  • 注意力机制(SE,ECA,CBAM) Pytorch代码
  • Vue2笔记03 脚手架(项目结构),常用属性配置,ToDoList(本地存储,组件通信)
  • Java程序的执行顺序、简述对线程池的理解
  • 【前言】嵌入式系统简介
  • React设计原理—1框架原理
  • (C00034)基于Springboot+html前后端分离技术的宿舍管理系统-有文档
  • Flink面试题
  • Python学习笔记
  • 最适合入门的100个深度学习实战项目
  • AssertionError: 618 columns passed, passed data had 508 columns【已解决】
  • 166_技巧_Power BI 窗口函数处理连续发生业务问题
  • 电子科技大学人工智能期末复习笔记(五):机器学习
  • 使用DDD指导业务设计的总结思考
  • 面试官问:如何确保缓存和数据库的一致性?
  • 16.数据库Redis
  • 【Redis高级-集群分片】
  • CSDN - CSDN27题解
  • docker拉取mysql
  • 在Linux上安装Python3
  • 23 种设计模式的通俗解释,看完秒懂
  • 如何做好需求管理?经验方法、模型、工具
  • 怎么用期货做风险对冲(如何利用期货对冲风险)
  • C++标准模板库type_traits源码剖析
  • Python获取公众号(pc客户端)数据,使用Fiddler抓包工具
  • Maven进阶
  • AXI实战(一)-为AXI总线搭建简单的仿真测试环境
  • 数据库管理-第五十六期 监控(20230210)
  • 测试开发,测试架构师为什么能拿50 60k呢需要掌握哪些技能呢
  • Miniblink 入门
  • [python入门㊷] - python存储数据