Reputation: 11
I need to collect queue statistics so that I can calculate the queue length and keep the queues from growing. The queue length has to be calculated periodically, which is why I need to write an app for it.
The example from the ofproto_v1_3_parser.py file (https://github.com/faucetsdn/ryu/blob/master/ryu/ofproto/ofproto_v1_3_parser.py) for obtaining the queue statistics is below:
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER,DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_3
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import ether_types
from operator import attrgetter
from time import sleep
from ryu.app import simple_switch_13
from ryu.lib import hub
from ryu.ofproto.ofproto_v1_3_parser import OFPQueueStatsRequest
from ryu.ofproto.ofproto_v1_3_parser import OFPQueueStats
class simplequeues(simple_switch_13.SimpleSwitch13):
    """SimpleSwitch13 subclass that periodically requests queue statistics.

    Every datapath that reaches MAIN_DISPATCHER is remembered in
    ``self.datapaths``; a background task started with ``hub.spawn`` sends
    an OFPQueueStatsRequest to each of them every 30 seconds, and the
    replies are logged at debug level.
    """

    def __init__(self, *args, **kwargs):
        super(simplequeues, self).__init__(*args, **kwargs)
        self.mac_to_port = {}
        # datapath id -> datapath object, for every registered switch.
        self.datapaths = {}
        # Polling task; runs _monitorqueues() until the app stops.
        self.monitor_thread = hub.spawn(self._monitorqueues)

    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        """Register a datapath on MAIN_DISPATCHER, drop it on DEAD_DISPATCHER."""
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if datapath.id not in self.datapaths:
                self.logger.debug('register datapath: %016x', datapath.id)
                self.datapaths[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                self.logger.debug('unregister datapath: %016x', datapath.id)
                del self.datapaths[datapath.id]

    def _monitorqueues(self):
        """Send a queue-stats request to every known datapath, then sleep 30 s."""
        while True:
            for dp in self.datapaths.values():
                self.send_queue_stats_request(dp)
            hub.sleep(30)

    def send_queue_stats_request(self, datapath):
        """Ask ``datapath`` for the statistics of all queues on all ports.

        :param datapath: the switch connection to send the request on.
        """
        self.logger.debug('send stats request: %016x', datapath.id)
        ofp = datapath.ofproto
        ofp_parser = datapath.ofproto_parser
        # The parser's positional order is (datapath, flags, port_no,
        # queue_id).  The flags argument (0) must be passed explicitly:
        # without it OFPP_ANY is taken as flags and OFPQ_ALL as port_no,
        # and the switch answers the malformed request with an error.
        req = ofp_parser.OFPQueueStatsRequest(datapath, 0, ofp.OFPP_ANY,
                                              ofp.OFPQ_ALL)
        print('*'*20)
        datapath.send_msg(req)
        print('send*'*20)

    @set_ev_cls(ofp_event.EventOFPQueueStatsReply, MAIN_DISPATCHER)
    def queue_stats_reply_handler(self, ev):
        """Log every queue-statistics entry carried by the reply in ``ev.msg``."""
        queues = [
            'port_no=%d queue_id=%d '
            'tx_bytes=%d tx_packets=%d tx_errors=%d '
            'duration_sec=%d duration_nsec=%d' %
            (stat.port_no, stat.queue_id,
             stat.tx_bytes, stat.tx_packets, stat.tx_errors,
             stat.duration_sec, stat.duration_nsec)
            for stat in ev.msg.body
        ]
        self.logger.debug('QueueStats: %s', queues)
I also found another approach in a book:
@set_ev_cls(ofp_event.EventOFPStatsReply, MAIN_DISPATCHER)
def stats_reply_handler(self, ev):
    """Forward queue-type stats replies to ``queue_stats_reply_handler``.

    The reply in ``ev.msg`` is passed on only when its ``type`` equals the
    datapath's ``OFPST_QUEUE`` constant; any other reply is ignored.

    NOTE(review): ``EventOFPStatsReply`` / ``OFPST_QUEUE`` look like names
    from a pre-1.3 OpenFlow parser -- confirm they exist for the protocol
    version the switch actually negotiates.
    """
    banner = "!!" * 30
    print(banner, "start reply")
    reply = ev.msg
    proto = reply.datapath.ofproto
    stats = reply.body
    print(banner, "start reply condition")
    if reply.type != proto.OFPST_QUEUE:
        return
    print(banner, "start request")
    self.queue_stats_reply_handler(stats)
def queue_stats_reply_handler(self, body):
    """Log one summary string per queue-statistics entry in ``body``.

    :param body: iterable of stats entries; each entry must expose
        ``port_no``, ``queue_id``, ``tx_bytes``, ``tx_packets`` and
        ``tx_errors``.  An empty iterable logs an empty list.
    """
    print("start 2 reply")
    queues = [
        'port_no=%d queue_id=%d '
        'tx_bytes=%d tx_packets=%d tx_errors=%d ' %
        (stat.port_no, stat.queue_id,
         stat.tx_bytes, stat.tx_packets, stat.tx_errors)
        for stat in body
    ]
    # Logged once, after all entries have been formatted.
    self.logger.debug('QueueStats: %s', queues)
However, when I run the app with ryu-manager, I see the following output in the terminal:
EventOFPErrorMsg received.
version=0x4, msg_type=0x1, msg_len=0x28, xid=0x29c9db81
`-- msg_type: OFPT_ERROR(1)
OFPErrorExperimenterMsg(type=0xffff, exp_type=0xa50, experimenter=0x4f4e4600, data=b'\x04\x12\x00\x18\x29\xc9\xdb\x81\x00\x05\x0f\xff\x00\x00\x00\x00\x00\x00\x0f\xff\x00\x00\x0f\xff')
`-- data: version=0x4, msg_type=0x12, msg_len=0x18, xid=0x29c9db81
`-- msg_type: OFPT_MULTIPART_REQUEST(18)
Thank you in advance for your help
Upvotes: 0
Views: 247
Reputation: 11
Fortunately, I found the answer. The problem was that my network topology contained an access point in addition to the switches. After removing the access points, I got the result I wanted. I am posting the relevant code below in case someone runs into a similar problem later. I have assumed that the IDs of the switches start with 00:
from operator import attrgetter
from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub
from ryu.lib import dpid as dpid_lib
import json
import subprocess
class SimpleMonitor13(simple_switch_13.SimpleSwitch13):
    """SimpleSwitch13 subclass that polls selected datapaths for queue stats.

    Only datapaths whose dpid string (as produced by
    ``dpid_lib.dpid_to_str``) begins with ``'00'`` are polled and reported;
    all others are skipped.
    """

    def __init__(self, *args, **kwargs):
        super(SimpleMonitor13, self).__init__(*args, **kwargs)
        # datapath id -> datapath object for every registered switch.
        self.datapaths = {}
        # Background polling task running _monitor().
        self.monitor_thread = hub.spawn(self._monitor)

    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        """Track datapaths: add on MAIN_DISPATCHER, remove on DEAD_DISPATCHER."""
        dp = ev.datapath
        if ev.state == MAIN_DISPATCHER and dp.id not in self.datapaths:
            self.logger.debug('register datapath: %016x', dp.id)
            self.datapaths[dp.id] = dp
        elif ev.state == DEAD_DISPATCHER and dp.id in self.datapaths:
            self.logger.debug('unregister datapath: %016x', dp.id)
            self.datapaths.pop(dp.id)

    def _monitor(self):
        """Every 10 seconds, request queue stats from each '00'-prefixed datapath."""
        while True:
            for switch in self.datapaths.values():
                if dpid_lib.dpid_to_str(switch.id).startswith('00'):
                    self.send_queue_stats_request(switch)
            hub.sleep(10)

    def send_queue_stats_request(self, datapath):
        """Send an all-ports / all-queues stats request to ``datapath``.

        Nothing is sent when the datapath's dpid string does not start
        with ``'00'``.
        """
        proto = datapath.ofproto
        parser = datapath.ofproto_parser
        if not dpid_lib.dpid_to_str(datapath.id).startswith('00'):
            return
        request = parser.OFPQueueStatsRequest(datapath, 0, proto.OFPP_ANY,
                                              proto.OFPQ_ALL)
        print("*"*40,"start request queues stats",'*'*40)
        datapath.send_msg(request)

    @set_ev_cls(ofp_event.EventOFPQueueStatsReply, MAIN_DISPATCHER)
    def queue_stats_reply_handler(self, ev):
        """Log a table row and a summary string for each queue-stats entry."""
        msg = ev.msg
        summaries = []
        if dpid_lib.dpid_to_str(msg.datapath.id).startswith('00'):
            report = self.logger.info
            report('swtch-id port_no in-queue_id tx_bytes tx_packets tx_errors duration_sec duration_nsec')
            report('-------- -------- -------- -------- -------- -------- -------- --------')
            for entry in msg.body:
                # Same seven values feed both the table row and the summary.
                fields = (entry.port_no, entry.queue_id,
                          entry.tx_bytes, entry.tx_packets, entry.tx_errors,
                          entry.duration_sec, entry.duration_nsec)
                report('%016x %8x %8x %8d %8d %8d %8d %8d',
                       msg.datapath.id, *fields)
                summaries.append('port_no=%d queue_id=%d '
                                 'tx_bytes=%d tx_packets=%d tx_errors=%d '
                                 'duration_sec=%d duration_nsec=%d' % fields)
        self.logger.debug('QueueStats: %s', summaries)
Upvotes: 0