1. RYU结构,源码
1.1 RYU文件目录
下面介绍ryu/ryu目录下的主要目录内容。
base
base中有一个非常重要的文件:app_manager.py,其作用是RYU应用的管理中心。用于加载RYU应用程序,接受从APP发送过来的信息,同时也完成消息的路由。
其主要的函数有app注册、注销、查找、并定义了RYUAPP基类,定义了RYUAPP的基本属性。包含name, threads, events, event_handlers和observers等成员,以及对应的许多基本函数。如:start(), stop()等。
这个文件中还定义了AppManager基类,用于管理APP。定义了加载APP等函数。不过如果仅仅是开发APP的话,这个类可以不必关心。
controller——实现controller和交换机之间的互联和事件处理
controller文件夹中许多非常重要的文件,如events.py, ofp_handler.py, controller.py等。其中controller.py中定义了OpenFlowController基类。用于定义OpenFlow的控制器,用于处理交换机和控制器的连接等事件,同时还可以产生事件和路由事件。其事件系统的定义,可以查看events.py和ofp_events.py。
在ofp_handler.py中定义了基本的handler(应该怎么称呼呢?句柄?处理函数?),完成了基本的如:握手,错误信息处理和keep alive 等功能。更多的如packet_in_handler应该在app中定义。
在dpset.py文件中,定义了交换机端的一些消息,如端口状态信息等,用于描述和操作交换机。如添加端口,删除端口等操作。
lib——网络基本协议的实现和使用
lib中定义了我们需要使用到的基本的数据结构,如dpid, mac和ip等数据结构。在lib/packet目录下,还定义了许多网络协议,如ICMP, DHCP, MPLS和IGMP等协议内容。而每一个数据包的类中都有parser和serialize两个函数。用于解析和序列化数据包。
lib目录下,还有ovs, netconf目录,对应的目录下有一些定义好的数据类型,不再赘述。
ofproto
在这个目录下,基本分为两类文件,一类是协议的数据结构定义,另一类是协议解析,也即数据包处理函数文件。如ofproto_v1_0.py是1.0版本的OpenFlow协议数据结构的定义,而ofproto_v1_0_parser.py则定义了1.0版本的协议编码和解码。
topology——交换机和链路的查询模块
包含了switches.py等文件,基本定义了一套交换机的数据结构。event.py定义了交换上的事件。dumper.py定义了获取网络拓扑的内容。最后api.py向上提供了一套调用topology目录中定义函数的接口。
contrib——第三方库
这个文件夹主要存放的是开源社区贡献者的代码。
cmd——入口函数
定义了RYU的命令系统,为controller的执行创建环境,接收和处理相关命令
services
完成了BGP和vrrp的实现。
tests
tests目录下存放了单元测试以及整合测试的代码。
1.2 RYU 架构
RYU SDN 架构:
组件功能:
1.3 应用程序编程模型
Ryu 事件处理、进程与线程:
1) Applications:该类继承自ryu.base.app_manager.RyuApp,用户逻辑被描述为一个APP。
2) Event : 继承自ryu.controller.event.EventBase , 应用程序之间的通信由transmitting and receiving events 完成。
3) Event Queue:每一个application 都有一个队列用于接收事件。
4) Threads:Ryu 使用第三方库eventlets 运行多线程。因为线程是非抢占式的,因此,当执行耗时的处理程序时要非常小心。
5) Event loops: 创建一个application 时,会自动生成一个线程,该线程运行一个事件循环。当队列事件不为空时,这个事件循环会加载该事件并且调用相应的事件处理函数(注册之后)。
6) Additional threads:可以使用hub.spawn()添加其它线程,用来处理特殊的应用
7) Eventlets:这是一个第三方库,里面的库函数被封装到hub 模块中被开发人员加载使用。【提供线程和事件队列的实现】
8) Event handlers:使用ryu.controller.handler.set_ev_cls 修饰一个事件处理函数。当该类型的事件触发后,事件处理函数就会被应用程序的事件循环调用。
1.4 OpenFlow的解析和封装
Ofp_handler
负责底层数据通信的模块是ofp\_handler模块。ofp\_handler启动之后,start函数实例化了一个controller.OpenFlowController实例。OpenFlowController实例化之后,立即调用\__call\__()函数,call函数启动了server\_loop去创建server socket,其handler为domain\_connection\_factory函数。每当收到一个switch连接,domain\_connection\_factory就会实例化一个datapath对象。这个对象用于描述交换机的所有行为。其中定义了接收循环和发送循环。
Datapath
datapath.serve函数是socket通信收发逻辑的入口。该函数启动了一个绿色线程去处理发送循环,然后本线程负责接收循环的处理。self.\_send\_loop是发送主循环。其主要逻辑为:不断获取发送队列是否有数据,若有,则发送;底层调用的是socket.send\_all()函数。
def serve(self):send_thr = hub.spawn(self._send_loop)# send hello message immediatelyhello = self.ofproto_parser.OFPHello(self)self.send_msg(hello)try:self._recv_loop()finally:hub.kill(send_thr)hub.joinall([send_thr])
接收函数\_reck\_loop中实现了数据的接收和解析。
@_deactivatedef _recv_loop(self):buf = bytearray() #初始化一个字节数组required_len = ofproto_common.OFP_HEADER_SIZE # ofproto_common模块定义了OpenFlow常用的公共属性 # 如报头长度=8count = 0while self.is_active:ret = self.socket.recv(required_len)if len(ret) == 0:self.is_active = Falsebreakbuf += retwhile len(buf) >= required_len:# ofproto_parser是在Datapath实例的父类ProtocolDesc的属性。# 用于寻找对应协议版本的解析文件,如ofproto_v1_0_parser.py# header函数是解析报头的函数。定义在ofproto_parser.py。(version, msg_type, msg_len, xid) = ofproto_parser.header(buf)required_len = msg_lenif len(buf) < required_len:break# ofproto_parser.msg的定义并没有在对应的ofproto_parser中# msg函数的位置和header函数位置一样,都在ofproto_parser.py中。# msg返回的是解析完成的消息。# msg函数返回了msg_parser函数的返回值# ofproto_parser.py中的_MSG_PARSERS记录了不同版本对应的msg_parser。其注册手法是通过@ofproto_parser.register_msg_parser(ofproto.OFP_VERSION)装饰器。# 在对应版本的ofproto_parser,如ofproto_v1_0_parser.py中,都有定义一个同名的_MSG_PARSERS字典,这个字典用于记录报文类型和解析函数的关系。此处命名不恰当,引入混淆。# parser函数通过@register_parser来将函数注册/记录到_MSG_PARSERS字典中。msg = ofproto_parser.msg(self,version, msg_type, msg_len, xid, buf)# LOG.debug('queue msg %s cls %s', msg, msg.__class__)if msg:# Ryu定义的Event system很简单,在报文名前加上前缀“Event”,即是事件的名称。# 同时此类系带msg信息。# 使用send_event_to_obserevrs()函数将事件分发给监听事件的handler,完成事件的分发。ev = ofp_event.ofp_msg_to_ev(msg)self.ofp_brick.send_event_to_observers(ev, self.state)dispatchers = lambda x: x.callers[ev.__class__].dispatchers# handler的注册是通过使用controller.handler.py文件下定义的set_ev_handler作为装饰器去注册。 # self.ofp_brick在初始化时,由注册在服务列表中查找名为"ofp_event"的模块赋值。# ofp_handler模块的名字为"ofp_event",所以对应的模块是ofp_handlerhandlers = [handler for handler inself.ofp_brick.get_handlers(ev) ifself.state in dispatchers(handler)]for handler in handlers:handler(ev)buf = buf[required_len:]required_len = ofproto_common.OFP_HEADER_SIZE# We need to schedule other greenlets. Otherwise, ryu# can't accept new switches or handle the existing# switches. The limit is arbitrary. We need the better# approach in the future.count += 1if count > 2048:count = 0hub.sleep(0)
OpenFlow协议实现
OpenFlow协议解析部分代码大部分在ofproto目录下,少部分在controller目录下。首先介绍ofproto目录下的源码内容,再介绍controller目录下的ofp_event文件。
__init__
首先,__init__.py并不为空。该文件定义了两个功能类似的函数get_ofp_module()和get_ofp_modules(),前者用于取得协议版本对应的协议定义文件和协议解析模块,后者则取出整个字典。对应的字典在ofproto_protocol模块中定义。
ofproto\_protocol
在ofproto\_protocol定义了\_versions字典,具体如下:
_versions = {ofproto_v1_0.OFP_VERSION: (ofproto_v1_0, ofproto_v1_0_parser),ofproto_v1_2.OFP_VERSION: (ofproto_v1_2, ofproto_v1_2_parser),ofproto_v1_3.OFP_VERSION: (ofproto_v1_3, ofproto_v1_3_parser),ofproto_v1_4.OFP_VERSION: (ofproto_v1_4, ofproto_v1_4_parser),}
除此之外,该文件还定义了Datapath的父类ProtocolDesc,此类基本上只完成了与协议版本相关的内容。该类最重要的两个成员是self.ofproto和self.ofproto\_parser,其值指明所本次通信所使用的OpenFlow协议的版本以及对应的解析模块。
ofproto\_common
ofproto\_common文件比较简单,主要定义了OpenFlow需要使用的公共属性,如监听端口,报头长度,报头封装格式等内容。
ofproto\_parser
ofproto\_parser文件定义了所有版本都需要的解析相关的公共属性。如定义了最重要的基类MsgBase(StringifyMixin)。
StringifyMixin类的定义在lib.stringify文件,有兴趣的读者可自行查看。MsgBase基类定义了最基础的属性信息,具体如下所示:
@create_list_of_base_attributesdef __init__(self, datapath):super(MsgBase, self).__init__()self.datapath = datapathself.version = Noneself.msg_type = Noneself.msg_len = Noneself.xid = Noneself.buf = None
此外,该类还定义了基础的parser函数和serialize函数。基础的parser函数基本什么都没有做,仅返回一个赋值后的消息体。
@classmethoddef parser(cls, datapath, version, msg_type, msg_len, xid, buf):msg_ = cls(datapath)msg_.set_headers(version, msg_type, msg_len, xid)msg_.set_buf(buf)return msg_
serialize函数分为3部分,self.\_serialize\_pre(), self.\_serialize\_body()和self.\_serialize\_header()。本质上完成了header的序列化。关于body的序列化,将在对应的派生类中得到重写。
ofproto_v1_0
以1.0版本为例介绍ofproto\_v1\_x.py文件的作用。由于Ryu支持多版本的OpenFlow,所以在ofproto目录下,定义了从1.0到1.5版本的所有代码实现。所以其文件命名为ofproto\_v1_x.py,x从[1,2,3,4,5]中获得,分别对应相应的协议版本。
此类文件最重要的一个目的是定义了所有需要的静态内容,包括某字段的所有选项以及消息封装的格式以及长度。与OpenFlow消息内容相关的有协议的类型,动作的类型,port的类型等。此外对应每一个报文,都需要定义其封装的格式,以及封装的长度。Ryu采用了Python的Struct库去完成数据的解封装工作,关于Struct的介绍将在后续内容介绍。具体定义内容举例如下:
# enum ofp_portOFPP_MAX = 0xff00OFPP_IN_PORT = 0xfff8 # Send the packet out the input port. This# virtual port must be explicitly used# in order to send back out of the input# port.OFPP_TABLE = 0xfff9 # Perform actions in flow table.# NB: This can only be the destination# port for packet-out messages.OFPP_NORMAL = 0xfffa # Process with normal L2/L3 switching.OFPP_FLOOD = 0xfffb # All physical ports except input port and# those disabled by STP.OFPP_ALL = 0xfffc # All physical ports except input port.OFPP_CONTROLLER = 0xfffd # Send to controller.OFPP_LOCAL = 0xfffe # Local openflow "port".OFPP_NONE = 0xffff # Not associated with a physical port.# enum ofp_typeOFPT_HELLO = 0 # Symmetric messageOFPT_ERROR = 1 # Symmetric messageOFPT_ECHO_REQUEST = 2 # Symmetric messageOFPT_ECHO_REPLY = 3 # Symmetric messageOFPT_VENDOR = 4 # Symmetric messageOFPT_FEATURES_REQUEST = 5 # Controller/switch messageOFPT_FEATURES_REPLY = 6 # Controller/switch messageOFPT_GET_CONFIG_REQUEST = 7 # Controller/switch messageOFPT_GET_CONFIG_REPLY = 8 # Controller/switch messageOFPT_SET_CONFIG = 9 # Controller/switch messageOFPT_PACKET_IN = 10 # Async messageOFPT_FLOW_REMOVED = 11 # Async messageOFPT_PORT_STATUS = 12 # Async messageOFPT_PACKET_OUT = 13 # Controller/switch messageOFPT_FLOW_MOD = 14 # Controller/switch messageOFPT_PORT_MOD = 15 # Controller/switch messageOFPT_STATS_REQUEST = 16 # Controller/switch messageOFPT_STATS_REPLY = 17 # Controller/switch messageOFPT_BARRIER_REQUEST = 18 # Controller/switch messageOFPT_BARRIER_REPLY = 19 # Controller/switch messageOFPT_QUEUE_GET_CONFIG_REQUEST = 20 # Controller/switch messageOFPT_QUEUE_GET_CONFIG_REPLY = 21 # Controller/switch messageOFP_HEADER_PACK_STR = '!BBHI'OFP_HEADER_SIZE = 8OFP_MSG_SIZE_MAX = 65535assert calcsize(OFP_HEADER_PACK_STR) == OFP_HEADER_SIZE
OFP\_HEADER\_PACK\_STR = '!BBHI'的意思是将header按照8|8|16|32的长度封装成对应的数值。在Python中分别对应的是1个字节的integer|一个字节的integer|2个字节的integer|4个字节的integer。
calcsize函数用于计算对应的format的长度。
ofproto_v1_0_parser
本模块用于定义报文的解析等动态内容。模块中定义了与OpenFlow协议对应的Common\_struct及message type对应的类。每一个message对应的类都是有MsgBase派生的,其继承了父类的parser函数和serialize函数。若报文无消息体,如Hello报文,则无需重写parser和serialize函数。
本模块定义了几个重要的全局函数:\_set\_msg\_type,\_register\_parser,msg\_parser和\_set\_msg\_reply。其作用介绍如下:
- _set_msg_type: 完成类与ofproto模块中定义的报文名字的映射,原因在于ofproto模块定义的名字并不是类名,而解析时需要使用ofproto中的名字。
- _register_parser:完成对应的类与类中的parser函数的映射,当解析函数从ofproto模块的名字映射到类之后,若需要解析,则需从类对应到对应的解析函数。parser函数是一个类函数,所以在使用时必须传入对应的类的对象作为参数。
- msg_parser:从_MSG_PARSERS中获取对msg_type的parser,并返回解析之后的内容。
- _set_msg_reply:完成该类与对应的回应报文的映射。
def _set_msg_type(msg_type):'''Annotate corresponding OFP message type'''def _set_cls_msg_type(cls):cls.cls_msg_type = msg_typereturn clsreturn _set_cls_msg_typedef _register_parser(cls): '''class decorator to register msg parser''' assert cls.cls_msg_type is not None assert cls.cls_msg_type not in _MSG_PARSERS _MSG_PARSERS[cls.cls_msg_type] = cls.parser return cls @ofproto_parser.register_msg_parser(ofproto.OFP_VERSION) def msg_parser(datapath, version, msg_type, msg_len, xid, buf): parser = _MSG_PARSERS.get(msg_type) return parser(datapath, version, msg_type, msg_len, xid, buf) def _set_msg_reply(msg_reply): '''Annotate OFP reply message class''' def _set_cls_msg_reply(cls): cls.cls_msg_reply = msg_reply return cls return _set_cls_msg_reply
报文如果有消息体,则需要重写parser函数或者serialize函数,具体根据报文内容而不一样。此处,分别以Packet\_in和Flow\_mod作为parser的案例和serialize的案例,示例如下:
@_register_parser@_set_msg_type(ofproto.OFPT_PACKET_IN)class OFPPacketIn(MsgBase):def __init__(self, datapath, buffer_id=None, total_len=None, in_port=None,reason=None, data=None):super(OFPPacketIn, self).__init__(datapath)self.buffer_id = buffer_id self.total_len = total_len self.in_port = in_port self.reason = reason self.data = data @classmethod def parser(cls, datapath, version, msg_type, msg_len, xid, buf): # 解析头部,获取msg msg = super(OFPPacketIn, cls).parser(datapath, version, msg_type, msg_len, xid, buf) # 解析body,获取packet_in相关字段。 (msg.buffer_id, msg.total_len, msg.in_port, msg.reason) = struct.unpack_from( ofproto.OFP_PACKET_IN_PACK_STR, msg.buf, ofproto.OFP_HEADER_SIZE) # 将ofproto.OFP_PACKET_IN_SIZE长度之外的buf内容,赋值给data msg.data = msg.buf[ofproto.OFP_PACKET_IN_SIZE:] if msg.total_len < len(msg.data): # discard padding for 8-byte alignment of OFP packet msg.data = msg.data[:msg.total_len] return msg @_set_msg_type(ofproto.OFPT_FLOW_MOD) class OFPFlowMod(MsgBase): def __init__(self, datapath, match, cookie, command, idle_timeout=0, hard_timeout=0, priority=ofproto.OFP_DEFAULT_PRIORITY, buffer_id=0xffffffff, out_port=ofproto.OFPP_NONE, flags=0, actions=None): if actions is None: actions = [] super(OFPFlowMod, self).__init__(datapath) self.match = match self.cookie = cookie self.command = command self.idle_timeout = idle_timeout self.hard_timeout = hard_timeout self.priority = priority self.buffer_id = buffer_id self.out_port = out_port self.flags = flags self.actions = actions def _serialize_body(self): offset = ofproto.OFP_HEADER_SIZE self.match.serialize(self.buf, offset) # 封装的起点是offset offset += ofproto.OFP_MATCH_SIZE # 按照ofproto.OFP_FLOW_MOD_PACK_STR0的格式,将对应的字段封装到self.buf中 msg_pack_into(ofproto.OFP_FLOW_MOD_PACK_STR0, self.buf, offset, self.cookie, self.command, self.idle_timeout, self.hard_timeout, self.priority, self.buffer_id, self.out_port, self.flags) offset = ofproto.OFP_FLOW_MOD_SIZE if self.actions is not None: for a in self.actions: # 序列化action a.serialize(self.buf, offset) offset += a.len
此模块代码量大,包括OpenFlow协议对应版本内容的完全描述。分类上可分为解析和序列化封装两个重点内容。读者在阅读源码时可根据需求阅读片段即可。
Inet & ether
这两个模块非常简单,ether定义了常用的以太网的协议类型及其对应的代码;inet定义了IP协议族中不同协议的端口号,如TCP=6。
oxm_field
在1.3等高版本OpenFlow中,使用到了oxm\_field的概念。oxm全称为OpenFlow Extensible Match。当OpenFlow逐渐发展成熟,flow的match域越来越多。然而很多通信场景下使用到的匹配字段很少,甚至只有一个。OXM是一种TLV格式,使用OXM可以在下发流表时仅携带使用到的match域内容,而放弃剩余的大量的match域。当使用的match域较少时,统计概率上会减少报文传输的字节数。
nx_match
该文件定义了nicira extensible match的相关内容。
ofp_event
这个模块的位置并不再ofproto,而位于controller目录下。controller模块下的event定义了基础的事件基类。ofp\_event模块完成了OpenFlow报文到event的生成过程。模块中定义了EventOFPMsgBase(event.EventBase)类和\_ofp\_msg\_name\_to\_ev\_name(msg\_name)等函数的定义。相关函数都非常的简单,可从函数名了解到其功能。示例代码如下:
def _ofp_msg_name_to_ev_name(msg_name):return 'Event' + msg_name
Struct lib
Python的struct库是一个简单的,高效的数据封装\解封装的库。该库主要包含5个函数,介绍如下:
- struct.pack(fmt, v1, v2, ...): 将V1,V2等值按照对应的fmt(format)进行封装。
- struct.pack_into(fmt, buffer, offset, v1, v2, ...):将V1,V2等值按照对应的fmt(format)封装到buffer中,从初始位置offset开始。
- struct.unpack(fmt, string): 将string按照fmt的格式解封
- struct.unpack_from(fmt, buffer[offset=0,]): 按照fmt的格式,从offset开始将buffer解封。
- struct.calcsize(fmt): 计算对应的fmt的长度。
更详细的封装语法,请查看struct对应的链接。此处仅对常用语法进行介绍:
- !:大端存储
- c: char
- B: 一个字节长度,unsigned char.
- H:两个字节,16位
- I: 4个字节,int型
- Q: 64bits
- x: padding
- 3x:3个字节的padding
- 5s: 5字节的字符串
1.5 Ryu的处理流程
-
入口函数执行流程
-
事件处理流程
-
补充说明
1.6 ryu运行
从main函数入手,讲述RYU的ryuapp基类细节、app_manager类如何load apps,注册并运行application,Event的产生以及分发,还有最重要的应用ofp_handler。
main()
RYU的main函数在ryu/cmd/manager.py文件中,部分内容如下:
def main(args=None, prog=None):
try:CONF(args=args, prog=prog,project='ryu', version='ryu-manager %s' % version,default_config_files=['/usr/local/etc/ryu/ryu.conf'])except cfg.ConfigFilesNotFoundError:CONF(args=args, prog=prog,project='ryu', version='ryu-manager %s' % version)log.init_log() #初始化打印logif CONF.pid_file:import oswith open(CONF.pid_file, 'w') as pid_file:pid_file.write(str(os.getpid()))app_lists = CONF.app_lists + CONF.app# keep old behaivor, run ofp if no application is specified.if not app_lists:app_lists = ['ryu.controller.ofp_handler']app_mgr = AppManager.get_instance() #在AppManager类中获取实例app_mgr.load_apps(app_lists) #加载Appcontexts = app_mgr.create_contexts() #创建运行环境,"dpset"/"wsgi"services = []services.extend(app_mgr.instantiate_apps(**contexts))
#启动App线程,App实例化
#ryu.controller.dpset.DPSet / rest_firewall.RestFirewallAPI / ryu.controller.ofp_handler.OFPHandlerwebapp = wsgi.start_service(app_mgr) #webapp启动if webapp:thr = hub.spawn(webapp)services.append(thr)try:hub.joinall(services) #调用t.wait(),执行等待,wait()方法使当前线程暂停执行并释放对象锁标志
#循环join,直到有异常或者外部中断推迟
finally:app_mgr.close()
首先从CONF文件中读取出app list。如果ryu-manager 命令任何参数,则默认应用为ofp_handler应用。紧接着实例化一个AppManager对象,调用load_apps函数将应用加载。调用create_contexts函数创建对应的contexts, 然后调用instantiate_apps函数将app_list和context中的app均实例化。启动wsgi架构,提供web应用。最后将所有的应用作为任务,作为coroutine的task去执行,joinall使得程序必须等待所有的task都执行完成才可以退出程序。最后调用close函数,关闭程序,释放资源。以下的部分将以主函数中出现的调用顺序为依据,展开讲解。
OFPHandler
上文说到,如果没有捕获Application输入,那么默认启动的应用是OFPHandler应用。该应用主要用于处理OpenFlow消息。在start函数初始化运行了一个OpenFlowController实例。OpenFlowController类将在后续介绍。
def start(self):super(OFPHandler, self).start()return hub.spawn(OpenFlowController())
OFPHandler应用完成了基本的消息处理,如hello_handler:用于处理hello报文,协议版本的协商。其处理并不复杂,但是值得注意的一点是装饰器:Decorator的使用。
Decorator
Python修饰器的函数式编程 Python Decorator可以看作是一种声明,一种修饰。以下举例参考自Coolshell。举例如下:
@decorator
def foo():pass
实际上等同于foo = decorator(foo), 而且它还被执行了。举个例子:
def keyword(fn): print "you %s me!" % fn.__name__.upper()@keyword
def evol():pass
运行之后,就会输出 you EVOL me
多个decorator:
@decorator_a
@decorator_b
def foo():pass
这相当于:
foo = decorator_a(decorator_b(foo))
而带参数的decorator:
@decorator(arg1, arg2)
def foo():pass
相当于
foo = decorator(arg1,arg2)(foo)
decorator(arg1,arg2)将生成一个decorator。
class式的 Decorator
class myDecorator(object):def __init__(self, fn):print "inside myDecorator.__init__()"self.fn = fndef __call__(self):self.fn()print "inside myDecorator.__call__()"@myDecorator def aFunction():print "inside aFunction()"print "Finished decorating aFunction()"aFunction()
#结果:
>>>
inside myDecorator.__init__()
Finished decorating aFunction()
inside aFunction()
inside myDecorator.__call__()
>>>
@decorator使用时,__init__被调用,当function被调用是,执行__call__函数,而不执行function,所以在__call__函数中需要写出self.fn = fn,更多内容可以直接访问Python Decorator Library。
OpenFlowController
前一部分提到OFPHandle的start函数会将OpenFlowController启动。本小节介绍OpenFlowController类。该类的定义在ryu/cmd/controller.py文件中。OpenFlowController.__call__()函数启动了server_loop()函数,该函数实例化了hub.py中的StreamServer类,并将handler函数初始化为datapath_connection_factory函数,并调用serve_forever(),不断进行socket的监听。StreamServer定义如下:
class StreamServer(object):def __init__(self, listen_info, handle=None, backlog=None,spawn='default', **ssl_args):assert backlog is Noneassert spawn == 'default'if ':' in listen_info[0]:self.server = eventlet.listen(listen_info,family=socket.AF_INET6)else:self.server = eventlet.listen(listen_info)if ssl_args:def wrap_and_handle(sock, addr):ssl_args.setdefault('server_side', True)handle(ssl.wrap_socket(sock, **ssl_args), addr)self.handle = wrap_and_handleelse:self.handle = handledef serve_forever(self):while True:sock, addr = self.server.accept()spawn(self.handle, sock, addr)
Datapath
Datapath类在RYU中极为重要,每当一个datapath实体与控制器建立连接时,就会实例化一个Datapath的对象。 该类中不仅定义了许多的成员变量用于描述一个datapath,还管理控制器与该datapath通信的数据收发。其中_recv_loop函数完成数据的接收与解析,事件的产生与分发。
@_deactivatedef _recv_loop(self):buf = bytearray()required_len = ofproto_common.OFP_HEADER_SIZEcount = 0while self.is_active:ret = self.socket.recv(required_len)if len(ret) == 0:self.is_active = Falsebreakbuf += retwhile len(buf) >= required_len:(version, msg_type, msg_len, xid) = ofproto_parser.header(buf)required_len = msg_lenif len(buf) < required_len:breakmsg = ofproto_parser.msg(self,version, msg_type, msg_len, xid, buf) # 解析报文# LOG.debug('queue msg %s cls %s', msg, msg.__class__)if msg:ev = ofp_event.ofp_msg_to_ev(msg) # 产生事件self.ofp_brick.send_event_to_observers(ev, self.state) # 事件分发 dispatchers = lambda x: x.callers[ev.__class__].dispatchershandlers = [handler for handler inself.ofp_brick.get_handlers(ev) ifself.state in dispatchers(handler)]for handler in handlers:handler(ev)buf = buf[required_len:]required_len = ofproto_common.OFP_HEADER_SIZE# We need to schedule other greenlets. Otherwise, ryu# can't accept new switches or handle the existing# switches. The limit is arbitrary. We need the better# approach in the future.count += 1if count > 2048:count = 0hub.sleep(0)
@_deactivate修饰符作用在于在Datapath断开连接之后,将其状态is_active置为False。self.ofp_brick.send_event_to_observers(ev, self.state) 语句完成了事件的分发。self.brick的初始化语句可以在self.__init__函数中找到:
self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
由上可知,self.ofp_brick实际上是由service_brick(中文可以成为:服务链表?)中的“ofp_event”服务赋值的。在每一个app中,使用@set_ev_cls(ev_cls,dispatchers)时,就会将实例化ofp_event模块,执行文件中最后一句:
handler.register_service('ryu.controller.ofp_handler')
register_service函数实体如下:
def register_service(service):"""Register the ryu application specified by 'service' asa provider of events defined in the calling module.If an application being loaded consumes events (in the sense ofset_ev_cls) provided by the 'service' application, the latterapplication will be automatically loaded.This mechanism is used to e.g. automatically start ofp_handler ifthere are applications consuming OFP events."""frm = inspect.stack()[1]m = inspect.getmodule(frm[0])m._SERVICE_NAME = service
其中inspect.stack()[1]返回了调用此函数的caller, inspect.getmodule(frm[0])返回了该caller的模块,当前例子下,module=ofp_event。
我们可以通过ryu-manager --verbose来查看到输出信息,从而印证这一点。
:~/ryu/ryu/app$ ryu-manager --verbose
loading app ryu.controller.ofp_handler
instantiating app ryu.controller.ofp_handler of OFPHandler
BRICK ofp_eventCONSUMES EventOFPErrorMsgCONSUMES EventOFPEchoRequestCONSUMES EventOFPPortDescStatsReplyCONSUMES EventOFPHelloCONSUMES EventOFPSwitchFeatures
所以当运行ofp_handler应用时,就会注册ofp_event service,为后续的应用提供服务。分发事件之后,还要处理自身订阅的事件,所以首先找到符合当前state的caller,然后调用handler。_caller类可以在handler.py文件中找到,包含dispatchers和ev_source两个成员变量。前者用于描述caller需要的state,后者是event产生者的模块名称。
对应的发送循环由_send_loop完成。self.send_p是一个深度为16的发送queue。
@_deactivatedef _send_loop(self):try:while self.is_active:buf = self.send_q.get()self.socket.sendall(buf)finally:q = self.send_q# first, clear self.send_q to prevent new references.self.send_q = None# there might be threads currently blocking in send_q.put().# unblock them by draining the queue.try:while q.get(block=False):passexcept hub.QueueEmpty:pass
serve函数完成了发送循环的启动和接受循环的启动。启动一个coroutine去执行self._send_loop(), 然后马上主动发送hello报文到datapath(可以理解为交换网桥:Bridge),最后执行self._recv_loop()。
def serve(self):send_thr = hub.spawn(self._send_loop)# send hello message immediatelyhello = self.ofproto_parser.OFPHello(self)self.send_msg(hello)try:self._recv_loop()finally:hub.kill(send_thr)hub.joinall([send_thr])
而serve函数又在datapath_connection_factory函数中被调用。当然向外提供完整功能的API就是这个。所以在OpenFlowController类中可以看到在初始化server实例的时候,handler赋值为datapath_connection_factory。其中使用到的contextlib module具体内容不作介绍,读者可自行学习。
def datapath_connection_factory(socket, address):LOG.debug('connected socket:%s address:%s', socket, address)with contextlib.closing(Datapath(socket, address)) as datapath:try:datapath.serve()except:# Something went wrong.# Especially malicious switch can send malformed packet,# the parser raise exception.# Can we do anything more graceful?if datapath.id is None:dpid_str = "%s" % datapath.idelse:dpid_str = dpid_to_str(datapath.id)LOG.error("Error in the datapath %s from %s", dpid_str, address)raise
到此为止,OFPHandler应用的功能实现介绍完毕。RYU启动时,需要启动OFPHandler,才能完成数据的收发和解析。更多的上层应用逻辑都是在此基础之上进行的。若要开发APP则需要继承RyuApp类,并完成observer监听事件,以及注册handler去完成事件处理。
RyuApp
RyuApp类是RYU封装好的APP基类,用户只需要继承该类,就可以方便地开发应用。而注册对应的observer和handler都使用@derocator的形式,使得开发非常的简单高效,这也是Python的优点之一吧。RyuApp类的定义在ryu/base/app_manager.py文件中。该文件实现了两个类RyuApp和AppManager。前者用于定义APP基类,为应用开发提供基本的模板,后者用于Application的管理,加载应用,运行应用,消息路由等功能。
app_manager.py文件中import了instpect和itertools module,从而使得开发更方便简洁。inspect模块提供了一些有用的方法,用于类型检测,获取内容,检测是否可迭代等功能。itertools则是一个关于迭代器的模块,可以提供丰富的迭代器类型,在数据处理上尤其有用。
_CONTEXT
这是一个极其难理解的概念。博主的理解是,_CONTEXT内存储着name:class的key value pairs。为什么需要存储这个内容?实际上这个_CONTEXT携带的信息是所有本APP需要依赖的APP。需要在启动本应用之前去启动,以满足依赖的,比如一个simple_switch.py的应用,如果没有OFPHandler应用作为数据收发和解析的基础的话,是无法运行的。具体文档如下:
_CONTEXTS = {}"""A dictionary to specify contexts which this Ryu application wants to use.Its key is a name of context and its value is an ordinary classwhich implements the context. The class is instantiated by app_managerand the instance is shared among RyuApp subclasses which has _CONTEXTSmember with the same key. A RyuApp subclass can obtain a reference tothe instance via its __init__'s kwargs as the following.Example::_CONTEXTS = {'network': network.Network}def __init__(self, *args, *kwargs):self.network = kwargs['network']"""
_EVENTS
用于记录本应用会产生的event。但是当且仅当定义该event的语句在其他模块时才会被使用到。
self.__init__
__init__函数中初始化了许多重要的成员变量,如self.event_handler用于记录向外提供的事件处理句柄,而self.observer则刚好相反,用于通知app_manager本应用监听何种类型的事件。self.event是事件队列。
def __init__(self, *_args, **_kwargs):super(RyuApp, self).__init__()self.name = self.__class__.__name__self.event_handlers = {} # ev_cls -> handlers:listself.observers = {} # ev_cls -> observer-name -> states:setself.threads = []self.events = hub.Queue(128)if hasattr(self.__class__, 'LOGGER_NAME'):self.logger = logging.getLogger(self.__class__.LOGGER_NAME)else:self.logger = logging.getLogger(self.name)self.CONF = cfg.CONF# prevent accidental creation of instances of this class outside RyuAppclass _EventThreadStop(event.EventBase):passself._event_stop = _EventThreadStop()self.is_active = True
self.start
start函数将启动coroutine去处理_event_loop,并将其加入threads字典中。
self._event_loop
_event_loop函数用于启动事件处理循环,通过调用self.get_handlers(ev, state)函数来找到事件对应的handler,然后处理事件。
def get_handlers(self, ev, state=None):"""Returns a list of handlers for the specific event.:param ev: The event to handle.:param state: The current state. ("dispatcher")If None is given, returns all handlers for the event.Otherwise, returns only handlers that are interestedin the specified state.The default is None."""ev_cls = ev.__class__handlers = self.event_handlers.get(ev_cls, [])if state is None:return handlersdef _event_loop(self):while self.is_active or not self.events.empty():ev, state = self.events.get()if ev == self._event_stop:continuehandlers = self.get_handlers(ev, state)for handler in handlers:handler(ev)
event dispatch
应用中可以通过@set_ev_cls修饰符去监听某些事件。当产生event时,通过event去get observer,得到对应的观察者,然后再使用self.send_event函数去发送事件。在这里,实际上就是直接往self.event队列中put event。
def _send_event(self, ev, state):self.events.put((ev, state))def send_event(self, name, ev, state=None):"""Send the specified event to the RyuApp instance specified by name."""if name in SERVICE_BRICKS:if isinstance(ev, EventRequestBase):ev.src = self.nameLOG.debug("EVENT %s->%s %s" %(self.name, name, ev.__class__.__name__))SERVICE_BRICKS[name]._send_event(ev, state)else:LOG.debug("EVENT LOST %s->%s %s" %(self.name, name, ev.__class__.__name__))def send_event_to_observers(self, ev, state=None):"""Send the specified event to all observers of this RyuApp."""for observer in self.get_observers(ev, state):self.send_event(observer, ev, state)
其他函数如注册handler函数:register_handler,注册监听函数:register_observer等都是非常简单直白的代码,不再赘述。
AppManager
AppManager类是RYU应用的调度中心。用于管理应用的添加删除,消息路由等等功能。
首先从启动函数开始介绍,我们可以看到run_apps函数中的代码和前文提到的main函数语句基本一样。首先获取一个对象,然后加载对应的apps,然后获取contexts,context中其实包含的是本应用所需要的依赖应用。所以在调用instantiate_apps函数时,将app_lists内的application和contexts中的services都实例化,然后启动协程去运行这些服务。
@staticmethod
def run_apps(app_lists):"""Run a set of Ryu applicationsA convenient method to load and instantiate apps.This blocks until all relevant apps stop."""app_mgr = AppManager.get_instance()app_mgr.load_apps(app_lists)contexts = app_mgr.create_contexts()services = app_mgr.instantiate_apps(**contexts)webapp = wsgi.start_service(app_mgr)if webapp:services.append(hub.spawn(webapp))try:hub.joinall(services)finally:app_mgr.close()
load_apps
首先从创建一个apps_lists的生成器(个人理解应该是生成器而非迭代器)。在while循环中,每次pop一个应用进行处理,然后将其本身和其context中的内容添加到services中,再去调用get_dependent_services函数获取其依赖应用,最后将所有的依赖services添加到app_lists中,循环至最终app_lists内元素全都pop出去,完成application的加载。
def load_apps(self, app_lists):app_lists = [app for appin itertools.chain.from_iterable(app.split(',')for app in app_lists)]while len(app_lists) > 0:app_cls_name = app_lists.pop(0)context_modules = map(lambda x: x.__module__,self.contexts_cls.values())if app_cls_name in context_modules:continueLOG.info('loading app %s', app_cls_name)cls = self.load_app(app_cls_name)if cls is None:continueself.applications_cls[app_cls_name] = clsservices = []for key, context_cls in cls.context_iteritems():v = self.contexts_cls.setdefault(key, context_cls)assert v == context_clscontext_modules.append(context_cls.__module__)if issubclass(context_cls, RyuApp):services.extend(get_dependent_services(context_cls))# we can't load an app that will be initiataed for# contexts.for i in get_dependent_services(cls):if i not in context_modules:services.append(i)if services:app_lists.extend([s for s in set(services)if s not in app_lists])
create_contexts
context实例化函数将context中name:service class键值对的内容实例化成对应的对象,以便加入到services 列表中,从而得到加载。首先从列表中取出对应数据,然后判断是否时RyuApp的子类,是则实例化,否则直接赋值service class。load_app函数在读取的时候还会再次判断是否是RyuApp子类。
def create_contexts(self):for key, cls in self.contexts_cls.items():if issubclass(cls, RyuApp):# hack for dpsetcontext = self._instantiate(None, cls)else:context = cls()LOG.info('creating context %s', key)assert key not in self.contextsself.contexts[key] = contextreturn self.contexts
instantiate_apps
此函数调用了self._instantiate函数,在_instantiate函数中又调用了register_app()函数,此函数将app添加到SERVICE_BRICKS字典之中,然后继续调用了ryu.controller.handler 中的 register_instance函数,最终完成了应用的注册。此后继续调用self._update_bricks函数完成了服务链表的更新,最后启动了所有的应用。
def instantiate_apps(self, *args, **kwargs):for app_name, cls in self.applications_cls.items():self._instantiate(app_name, cls, *args, **kwargs)self._update_bricks()self.report_bricks()threads = []for app in self.applications.values():t = app.start()if t is not None:threads.append(t)return threadsdef _instantiate(self, app_name, cls, *args, **kwargs):# for now, only single instance of a given module# Do we need to support multiple instances?# Yes, maybe for slicing.#LOG.info('instantiating app %s of %s', app_name, cls.__name__)if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None:ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS)if app_name is not None:assert app_name not in self.applicationsapp = cls(*args, **kwargs)register_app(app)assert app.name not in self.applicationsself.applications[app.name] = appreturn app
_update_bricks
此函数完成了更新service_bricks的功能。首先从获取到service实例,然后再获取到service中的方法,若方法有callers属性,即使用了@set_ev_cls的装饰符,拥有了calls属性。(caller类中的ev_source和dispatcher成员变量描述了产生该event的source module, dispatcher描述了event需要在什么状态下才可以被分发。如:HANDSHAKE_DISPATCHER,CONFIG_DISPATCHER等。)最后调用register_observer函数注册了observer。
def _update_bricks(self):for i in SERVICE_BRICKS.values():for _k, m in inspect.getmembers(i, inspect.ismethod):if not hasattr(m, 'callers'):continuefor ev_cls, c in m.callers.iteritems():if not c.ev_source:continuebrick = _lookup_service_brick_by_mod_name(c.ev_source)if brick:brick.register_observer(ev_cls, i.name,c.dispatchers)# allow RyuApp and Event class are in different modulefor brick in SERVICE_BRICKS.itervalues():if ev_cls in brick._EVENTS:brick.register_observer(ev_cls, i.name,c.dispatchers)
ryu.controller.handler.register_instance
以上的部分介绍了App的注册,observer的注册,handler的查找和使用,但是,始终没有提到handler在何处注册。实际上,handler的注册在register_instance部分完成了。为什么他的位置在handler文件,而不在app_manager文件呢?个人认为可能是为了给其他非Ryu APP的模块使用吧。
def register_instance(i):for _k, m in inspect.getmembers(i, inspect.ismethod):# LOG.debug('instance %s k %s m %s', i, _k, m)if _has_caller(m):for ev_cls, c in m.callers.iteritems():i.register_handler(ev_cls, m)
2. RYU实践
2.1 二层交换机
http://ryu.readthedocs.org/en/latest/writing_ryu_app.html
第一步:
ryu.base import app_manager:该文件中定义了RyuApp基类,开发APP需要继承该基类;
保存为L2Switch.py 运行: ryu-manager L2Switch.py
from ryu.base import app_managerclass L2Switch(app_manager.RyuApp):def __init__(self, *args, **kwargs):super(L2Switch, self).__init__(*args, **kwargs)
第二步:
ofp_event完成了事件的定义,从而我们可以在函数中注册handler,监听事件,并作出回应。
packet_in_handler方法用于处理packet_in事件。
@set_ev_cls修饰符用于告知RYU,被修饰的函数应该被调用。第一个参数表示事件发生时应该调用的函数,第二个参数告诉交换机只有在交换机握手完成之后,才可以被调用。
数据操作:
- ev.msg:每一个事件类ev中都有msg成员,用于携带触发事件的数据包。
- msg.datapath:已经格式化的msg其实就是一个packet_in报文,msg.datapath直接可以获得packet_in报文的datapath结构。datapath用于描述一个交换网桥。也是和控制器通信的实体单元。datapath.send_msg()函数用于发送数据到指定datapath。通过datapath.id可获得dpid数据,在后续的教程中会有使用。
- datapath.ofproto对象是一个OpenFlow协议数据结构的对象,成员包含OpenFlow协议的数据结构,如动作类型OFPP_FLOOD。
- datapath.ofp_parser则是一个按照OpenFlow解析的数据结构。
- actions是一个列表,用于存放action list,可在其中添加动作。
- 通过ofp_parser类,可以构造构造packet_out数据结构。括弧中填写对应字段的赋值即可。
如果datapath.send_msg()函数发送的是一个OpenFlow的数据结构,RYU将把这个数据发送到对应的datapath。
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER
from ryu.controller.handler import set_ev_clsclass L2Switch(app_manager.RyuApp):def __init__(self, *args, **kwargs):super(L2Switch, self).__init__(*args, **kwargs)@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)def packet_in_handler(self, ev):msg = ev.msgdatapath = msg.datapathofp = datapath.ofprotoofp_parser = datapath.ofproto_parseractions = [ofp_parser.OFPActionOutput(ofp.OFPP_FLOOD)]out = ofp_parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id, in_port=msg.in_port,actions=actions)datapath.send_msg(out)
第三步:
import struct
import loggingfrom ryu.base import app_manager
from ryu.controller import mac_to_port
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_0
from ryu.lib.mac import haddr_to_bin
from ryu.lib.packet import packet
from ryu.lib.packet import ethernetclass L2Switch(app_manager.RyuApp):OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]#define the version of OpenFlowdef __init__(self, *args, **kwargs):super(L2Switch, self).__init__(*args, **kwargs)self.mac_to_port = {}def add_flow(self, datapath, in_port, dst, actions):ofproto = datapath.ofprotomatch = datapath.ofproto_parser.OFPMatch(in_port = in_port, dl_dst = haddr_to_bin(dst))mod = datapath.ofproto_parser.OFPFlowMod(datapath = datapath, match = match, cookie = 0,command = ofproto.OFPFC_ADD, idle_timeout = 10,hard_timeout = 30,priority = ofproto.OFP_DEFAULT_PRIORITY,flags =ofproto.OFPFF_SEND_FLOW_REM, actions = actions)datapath.send_msg(mod)@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)def packet_in_handler(self, ev):msg = ev.msgdatapath = msg.datapathofproto = datapath.ofprotopkt = packet.Packet(msg.data)eth = pkt.get_protocol(ethernet.ethernet)dst = eth.dstsrc = eth.srcdpid = datapath.id #get the dpidself.mac_to_port.setdefault(dpid, {})self.logger.info("packet in %s %s %s %s", dpid, src, dst , msg.in_port)#To learn a mac address to avoid FLOOD next time.self.mac_to_port[dpid][src] = msg.in_portout_port = ofproto.OFPP_FLOOD#Look up the out_port if dst in self.mac_to_port[dpid]:out_port = self.mac_to_port[dpid][dst]ofp_parser = datapath.ofproto_parseractions = [ofp_parser.OFPActionOutput(out_port)]if out_port != ofproto.OFPP_FLOOD:self.add_flow(datapath, msg.in_port, dst, actions)#We always send the packet_out to handle the first packet.packet_out = ofp_parser.OFPPacketOut(datapath = datapath, buffer_id = msg.buffer_id,in_port = msg.in_port, actions = actions)datapath.send_msg(packet_out)#To show the message of ports' status.@set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)def _port_status_handler(self, ev):msg = ev.msgreason = msg.reasonport_no = msg.desc.port_noofproto = msg.datapath.ofprotoif reason == ofproto.OFPPR_ADD:self.logger.info("port added %s", port_no)elif reason == ofproto.OFPPR_DELETE:self.logger.info("port deleted %s", port_no)elif reason == ofproto.OFPPR_MODIFY:self.logger.info("port modified %s", port_no)else:self.logger.info("Illeagal port state %s %s", port_no, reason)
2.2 simple-switch.py 的APP测试
在mininet上模拟一台交换机(s1)三台主机(h1,h2,h3),然后远端连接RYU控制器,使用127.0.0.1,和6633端口建立连接
第一,在RYU控制器开启simple-switch.py的APP,输入命令:ryu-manager simple-switch.py:
第二,在另外一个终端上建立mininet模拟拓扑,输入命令:mn --topo single,3 --mac --switch ovsk --controller remote
然后在RYU的那个终端就会显示连接的建立,同时,也会同步一些交换机和控制器建立连接的信息,如图:
此时,在交换机的转发流表是空的,因此此时主机之间是不可以通信的,在使用h1去ping h2的时候,就会自动建立流表
注意是先进行广播,然后建立反方向的流表,然后建立正方向的流表。流表如图:
资料出处:
http://ryu.readthedocs.org/en/latest/api_ref.html
http://www.sdnlab.com/6395.html
http://www.sdnlab.com/12838.html