Hugo Friberg
Hugo Friberg

Reputation: 1

Trouble implementing ECMP in a Ryu controller running OpenFlow 1.3 in mininet

I am trying to implement ECMP load balancing in a 4 pod, fat-tree network. I.E 4 core switches connected to 4 pods containing 4 switches and 8 hosts each.

When running pingAll() host 1 (a0:00:00:00:01 / 10.0.0.1) to host 3 (a0:00:00:00:00:03 / 10.0.0.3) switch nr.7 (which host 1 is connected to) should use ECMP to forward the ping to either switch 5 (port 1) or 6 (port 2), but neither of these switches recieve the ping from switch 5.

The links are: S5-S7, S5-S8, S6-S7, S6-S8.

The links are set up so that using ports 1 and 2 will send the packet "upward" in the network, and using ports 3 and 4 will send the packet "downward" in the network.

from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_3
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet, arp
from ryu.lib.packet import ether_types


class SimpleSwitch13(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self, *args, **kwargs):
        super(SimpleSwitch13, self).__init__(*args, **kwargs)
        self.mac_to_port = {
            '7': {'a0:00:00:00:00:01': 3, 'a0:00:00:00:00:02': 4}, '8': {'a0:00:00:00:00:03': 3, 'a0:00:00:00:00:04': 4},
            '11': {'a0:00:00:00:00:05': 3, 'a0:00:00:00:00:06': 4}, '12': {'a0:00:00:00:00:07': 3, 'a0:00:00:00:00:08': 4},
            '15': {'a0:00:00:00:00:09': 3, 'a0:00:00:00:00:10': 4}, '16': {'a0:00:00:00:00:11': 3, 'a0:00:00:00:00:12': 4},
            '19': {'a0:00:00:00:00:13': 3, 'a0:00:00:00:00:14': 4}, '20': {'a0:00:00:00:00:15': 3, 'a0:00:00:00:00:16': 4}
        }
        self.arp_table = {
            'a0:00:00:00:00:01': '10.0.0.1', 'a0:00:00:00:00:02': '10.0.0.2',
            'a0:00:00:00:00:03': '10.0.0.3', 'a0:00:00:00:00:04': '10.0.0.4',

            'a0:00:00:00:00:05': '10.0.0.5', 'a0:00:00:00:00:06': '10.0.0.6',
            'a0:00:00:00:00:07': '10.0.0.7', 'a0:00:00:00:00:08': '10.0.0.8',

            'a0:00:00:00:00:09': '10.0.0.9', 'a0:00:00:00:00:10': '10.0.0.10',
            'a0:00:00:00:00:11': '10.0.0.11', 'a0:00:00:00:00:12': '10.0.0.12',

            'a0:00:00:00:00:13': '10.0.0.13', 'a0:00:00:00:00:14': '10.0.0.14',
            'a0:00:00:00:00:15': '10.0.0.15', 'a0:00:00:00:00:16': '10.0.0.16',
        }
        self.pod_map = {
            'a0:00:00:00:00:01': 1, 'a0:00:00:00:00:02': 1, 'a0:00:00:00:00:03': 1, 'a0:00:00:00:00:04': 1,
            'a0:00:00:00:00:05': 2, 'a0:00:00:00:00:06': 2, 'a0:00:00:00:00:07': 2, 'a0:00:00:00:00:08': 2,
            'a0:00:00:00:00:09': 3, 'a0:00:00:00:00:10': 3, 'a0:00:00:00:00:11': 3, 'a0:00:00:00:00:12': 3,
            'a0:00:00:00:00:13': 4, 'a0:00:00:00:00:14': 4, 'a0:00:00:00:00:15': 4, 'a0:00:00:00:00:16': 4
        }

        self.upPorts = [1,2]
        self.downPorts = [3,4]
        self.coreS = [1,2,3,4]
        self.aggS = [5,6,9,10,13,14,17,18]
        self.edgeS = [7,8,11,12,15,16,19,20]


    def addEdgeSwitchSettings(self, dp, msg):
        ofproto = dp.ofproto
        parser = dp.ofproto_parser

        #Host is on the same switch
        for key, value in self.mac_to_port[str(dp.id)].items():
            actions = [parser.OFPActionOutput(value)]
            match = parser.OFPMatch(eth_dst=key)
            self.add_flow(dp, 3, match, actions)
            print(f'[{dp.id}] added flow rule: {key} on port {value}')

        #Packet comes from above but is not destined for this switch
        match1 = parser.OFPMatch(in_port=self.upPorts[0])
        match2 = parser.OFPMatch(in_port=self.upPorts[1])

        actions1 = [parser.OFPActionOutput(self.upPorts[1])]
        actions2 = [parser.OFPActionOutput(self.upPorts[0])]

        self.add_flow(dp, 2, match1, actions1)
        self.add_flow(dp, 2, match2, actions2)
        print(f'[{dp.id}] added flow rule: miss-routed packet on port {self.upPorts[0]} -> port {self.upPorts[1]}')
        print(f'[{dp.id}] added flow rule: miss-routed packet on port {self.upPorts[1]} -> port {self.upPorts[0]}')


    def addCoreSwitchSettings(self, dp, msg):
        ofproto = dp.ofproto
        parser = dp.ofproto_parser

        for key, value in self.pod_map.items():
            actions = [parser.OFPActionOutput(value)]
            match = parser.OFPMatch(eth_dst=key)
            self.add_flow(dp, 1, match, actions)
            print(f'[{dp.id}] added flow rule: {key} on port {value}')


    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
    def switch_features_handler(self, ev):
        datapath = ev.msg.datapath
        msg = ev.msg
        dp = msg.datapath
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        #Proactivly add flow rules for the edge switches. When both hosts are on the same switch
        self.mac_to_port.setdefault(str(dp.id), {})

        #Proactivly add flow rules for the core switches.
        if dp.id in self.coreS:
            self.addCoreSwitchSettings(dp, msg)

        #Add missed for all un-handled packets
        match = parser.OFPMatch()
        actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,
                                          ofproto.OFPCML_NO_BUFFER)]
        self.add_flow(datapath, 0, match, actions)

        #Create RR Group Entries
        #    1 = Upwards
        #    2 = Downwards
        lBucket = parser.OFPBucket(actions=[parser.OFPActionOutput(self.upPorts[0])])
        rBucket = parser.OFPBucket(actions=[parser.OFPActionOutput(self.upPorts[1])])

        RRUp_mod = parser.OFPGroupMod(dp, ofproto.OFPGC_ADD, ofproto.OFPGT_SELECT, 1,
            [lBucket, rBucket])
        dp.send_msg(RRUp_mod)

        lBucket2 = parser.OFPBucket(actions=[parser.OFPActionOutput(self.downPorts[0])])
        rBucket3 = parser.OFPBucket(actions=[parser.OFPActionOutput(self.downPorts[1])])

        RRDown_mod = parser.OFPGroupMod(dp, ofproto.OFPGC_ADD, ofproto.OFPGT_SELECT, 2,
            [lBucket2, rBucket3])
        dp.send_msg(RRDown_mod)

        if dp.id in self.edgeS:
            self.addEdgeSwitchSettings(dp, msg)


    #Add a flow rule
    def add_flow(self, datapath, priority, match, actions, buffer_id=None):
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
                                             actions)]
        if buffer_id:
            mod = parser.OFPFlowMod(datapath=datapath, buffer_id=buffer_id,
                                    priority=priority, match=match,
                                    instructions=inst)
        else:
            mod = parser.OFPFlowMod(datapath=datapath, priority=priority,
                                    match=match, instructions=inst)
        datapath.send_msg(mod)


    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
    def _packet_in_handler(self, ev):
        # If you hit this you might want to increase
        # the "miss_send_length" of your switch
        if ev.msg.msg_len < ev.msg.total_len:
            self.logger.debug("packet truncated: only %s of %s bytes",
                              ev.msg.msg_len, ev.msg.total_len)
        msg = ev.msg
        datapath = msg.datapath
        dp = msg.datapath
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser
        in_port = msg.match['in_port']
        dpid = dp.id

        pkt = packet.Packet(msg.data)
        eth = pkt.get_protocols(ethernet.ethernet)[0]

        self.mac_to_port.setdefault(str(dpid), {})

        dst = eth.dst
        src = eth.src

        if eth.ethertype == ether_types.ETH_TYPE_LLDP:
            # ignore lldp packet
            return

        #Ignore multicast packets
        if dst[:6] == '33:33:':
            return

        self.mac_to_port[str(dp.id)][src] = in_port

        if eth.ethertype == ether_types.ETH_TYPE_ARP:
            arp_pkt = pkt.get_protocols(arp.arp)[0]
            self.ARP(dp, msg, src, dst, arp_pkt)
            return

        if dpid in [5,6]:
           print(f'[{dp.id}] {src}->{dst}')

        if dpid in self.edgeS:
            self.handleEdges(dp, msg, src, dst)
            return

        if dpid in self.aggS:
            self.handleAggs(dp, msg, src, dst)
            return

        print(f'[{dp.id}] {src}->{dst}')


    #Handle Aggregation switches
    def handleAggs(self, dp, msg, src, dst):
        parser = dp.ofproto_parser
        ofproto = dp.ofproto
        in_port = msg.match['in_port']

        print(f'[{dp.id}] packet {src}->{dst}')

        if in_port in self.upPorts: #from another pod, apply ECMP
            actions = [parser.OFPActionGroup(group_id=2)]
            match = parser.OFPMatch(eth_dst=dst)
            self.add_flow(dp, 1, match, actions)
            self.send_flow(dp, msg, actions)
            print(f'[{dp.id}] added flow rule: ->{dst} ECMP')
            return

        if in_port in self.downPorts and not self.inSamePod(src, dst): #going to another pod, apply ECMP
            actions = [parser.OFPActionGroup(group_id=1)]
            match = parser.OFPMatch(eth_dst=dst)
            self.add_flow(dp, 1, match, actions)
            self.send_flow(dp, msg, actions)
            print(f'[{dp.id}] added flow rule: ->{dst} ECMP')
            return

       #Going to the 'other side' of the pod
        match = parser.OFPMatch(in_port=in_port, eth_dst=dst)
        if in_port == self.downPorts[0]:
            out_port = self.downPorts[1]
        else:
            out_port = self.downPorts[0]
        actions = [parser.OFPActionOutput(out_port)]

        self.add_flow(dp, 1, match, actions)
        self.send_flow(dp, msg, actions)
        print(f'[{dp.id}] added flow rule: {src}->{dst} on port {out_port}')


    #Handle Edge switches
    def handleEdges(self, dp, msg, src, dst):
        parser = dp.ofproto_parser
        ofproto = dp.ofproto

        print(f'[{dp.id}] handling ECMP for {src}->{dst}')

        actions = [parser.OFPActionGroup(group_id=1)]
        match = parser.OFPMatch(eth_dst=dst, eth_src=src)
        self.add_flow(dp, 5, match, actions)
        self.send_flow(dp, msg, actions)
        print(f'[{dp.id}] added flow rule: ->{dst} ECMP with actions {actions}')


    #Handle ARP
    def ARP(self, dp, msg, src, dst, arp_pkt):
        proto = dp.ofproto
        parser = dp.ofproto_parser
        src_ip = arp_pkt.src_ip
        dst_ip = arp_pkt.dst_ip
        in_port = msg.match['in_port']

        dst_mac = None
        for key, value in self.arp_table.items():
            if dst_ip == value:
                dst_mac = key

        eth_reply = ethernet.ethernet(dst=src, src=dst, ethertype=ether_types.ETH_TYPE_ARP)
        arp_reply = arp.arp(opcode=arp.ARP_REPLY, src_mac=dst_mac, src_ip=dst_ip, dst_mac=src, dst_ip=self.arp_table[src])

        pkt = packet.Packet()
        pkt.add_protocol(eth_reply)
        pkt.add_protocol(arp_reply)
        pkt.serialize()

        actions = [parser.OFPActionOutput(in_port)]
        out = parser.OFPPacketOut(datapath=dp, buffer_id=proto.OFP_NO_BUFFER, in_port=proto.OFPP_CONTROLLER, actions=actions,
            data=pkt.data)

        dp.send_msg(out)


    #Send packet decision to switch
    def send_flow(self, dp, msg, actions):
        proto = dp.ofproto
        parser = dp.ofproto_parser
        data = None
        in_port = msg.match['in_port']

        if msg.buffer_id == proto.OFP_NO_BUFFER:
            data = msg.data

        out = parser.OFPPacketOut(datapath=dp, buffer_id=msg.buffer_id,
            in_port=in_port, actions=actions, data=data)
        dp.send_msg(out)


    #Check if two hosts are in the same Pod
    def inSamePod(self, src, dst):
        return self.pod_map[src] == self.pod_map[dst]

Before implementing ECMP with buckets, I originally used Round Robbin for load balancing. This worked and so I'm pretty sure the problem lies in how I've implemented ECMP and not the rest of the controller logic.

Network topology

Upvotes: 0

Views: 20

Answers (0)

Related Questions