#!/usr/bin/python3
"""Show and configure DCB (IEEE 802.1Qaz) QoS settings of a network interface.

The script exchanges DCB netlink messages over a NETLINK_ROUTE connection
(using the project's ``netlink`` helper module).  Depending on the options it
switches the DCBX mode, changes the priority trust state or edits the
dscp-to-priority mapping, and it always finishes by printing the resulting
configuration.
"""

import array
import struct
import sys
from collections import defaultdict
from optparse import OptionParser

from netlink import (
    parse_attributes, Message, Nested, U8Attr, StrAttr, NulStrAttr,
    Connection, NLM_F_REQUEST,
)

# NOTE(review): the numeric values below presumably mirror the Linux
# rtnetlink / dcbnl UAPI headers -- confirm against the target kernel.

# Netlink protocol family and message types.
NETLINK_ROUTE = 0
RTM_GETDCB = 78
RTM_SETDCB = 79

# DCB commands carried in the DCB netlink header.
DCB_CMD_IEEE_SET = 20
DCB_CMD_IEEE_GET = 21
DCB_CMD_GDCBX = 22
DCB_CMD_SDCBX = 23
DCB_CMD_IEEE_DEL = 27

# Top-level DCB attributes.
DCB_ATTR_IFNAME = 1
DCB_ATTR_IEEE = 13
DCB_ATTR_DCBX = 14

# Attributes nested inside DCB_ATTR_IEEE.
DCB_ATTR_IEEE_ETS = 1
DCB_ATTR_IEEE_PFC = 2
DCB_ATTR_IEEE_APP_TABLE = 3

# Attribute nested inside DCB_ATTR_IEEE_APP_TABLE.
DCB_ATTR_IEEE_APP = 1

# DCBX mode flag: set means "OS controlled", clear means "firmware controlled".
DCB_CAP_DCBX_HOST = 0x1

# Transmission selection algorithm identifiers.
IEEE_8021QAZ_TSA_STRICT = 0
IEEE_8021QAZ_TSA_ETS = 2
IEEE_8021QAZ_TSA_VENDOR = 255

# Application table selectors.
IEEE_8021QAZ_APP_SEL_DSCP = 5
IEEE_8021QAZ_APP_SEL_ETHERTYPE = 1

parser = OptionParser(usage="%prog -i [options]", version="%prog 1.2")

parser.add_option(
    "-d", "--dcbx", dest="dcbx",
    help="set dcbx mode to firmware controlled(fw) or "
         "OS controlled(os). Note, when in OS mode, mlnx_qos should not be used "
         "in parallel with other dcbx tools such as lldptool")
parser.add_option(
    "--trust", dest="trust",
    help="set priority trust state to pcp or dscp")
parser.add_option(
    "--dscp2prio", dest="dscp2prio", action="append",
    help="set/del a (dscp,prio) mapping, or 'flush' to delete all. "
         "Example 'set,30,2' maps dscp 30 to priority 2. "
         "'del,30,2' deletes the mapping. "
         "Deleting last dscp or using 'flush' will automatically change trust to pcp.")
parser.add_option("-i", "--interface", dest="intf", help="Interface name")

(options, args) = parser.parse_args()

if args:
    print("Bad arguments")
    parser.print_usage()
    sys.exit(1)

if options.intf is None:
    print("Interface name is required")
    parser.print_usage()
    sys.exit(1)


class DcbApp:
    """One application-table entry: a (selector, priority, protocol) triple."""

    def __init__(self, selector, priority, protocol):
        self.selector = selector
        self.priority = priority
        self.protocol = protocol


class DcbAppTable:
    """Application table as read from the interface.

    ``apps`` maps an integer index to a ``DcbApp``.
    """

    def __init__(self):
        self.apps = {}

    def _apps_with_selector(self, selector):
        """Return the entries using *selector*, ordered by table index."""
        return [self.apps[key] for key in sorted(self.apps)
                if self.apps[key].selector == selector]

    def countAppSelector(self, selector):
        """Return how many entries use *selector*."""
        return len(self._apps_with_selector(selector))

    def printAppSelector(self, selector):
        """Print the protocol ids of *selector* entries, grouped by priority.

        Each priority 0-7 that has entries gets one or more lines; a line
        holds at most 24 characters of ``NN,`` items.
        """
        per_prio = [[] for _ in range(8)]
        for app in self._apps_with_selector(selector):
            per_prio[app.priority].append('%02d,' % app.protocol)

        for prio, items in enumerate(per_prio):
            text = "".join(items)
            if not text:
                continue
            pad = "\tprio:%d dscp:" % prio
            rows = [pad + text[start:start + 24]
                    for start in range(0, len(text), 24)]
            print("\n".join(rows))

    def printDefaultPriority(self):
        """Print the priority of every entry with the ETHERTYPE selector."""
        for app in self.apps.values():
            if app.selector == IEEE_8021QAZ_APP_SEL_ETHERTYPE:
                print("\tprio:%d" % app.priority)

    def delAppEntry(self, ctrl, selector):
        """Delete, through *ctrl*, every entry of this table using *selector*."""
        for app in self._apps_with_selector(selector):
            ctrl.del_ieee_app(app.selector, app.priority, app.protocol)

    def setDefaultAppEntry(self, ctrl, selector, max_protocol):
        """Install the default mapping for protocol ids 0..max_protocol-1.

        Protocol id ``i`` is mapped to priority ``i >> 3`` (the original
        code labels this the firmware default).
        """
        for protocol in range(max_protocol):
            ctrl.set_ieee_app(selector, protocol >> 3, protocol)


def from_bytes(a, string):
    """Append the raw bytes *string* to array *a* (Python 2 and 3)."""
    if sys.version_info[0] < 3:
        a.fromstring(string)
    else:
        a.frombytes(string)


def to_bytes(a):
    """Return the contents of array *a* as raw bytes (Python 2 and 3)."""
    if sys.version_info[0] < 3:
        return a.tostring()
    return a.tobytes()


class DcbnlHdr:
    """DCB netlink header: two unsigned bytes followed by two pad bytes."""

    def __init__(self, len, type):
        self.len = len
        self.type = type

    def _dump(self):
        """Return the packed 4-byte header."""
        return struct.pack("BBxx", self.len, self.type)


class DcbNlMessage(Message):
    """Netlink message whose payload is a ``DcbnlHdr`` followed by attributes."""

    def __init__(self, type, cmd, attrs=None, flags=0):
        # A fresh list per call: a shared mutable default would leak
        # attributes between unrelated messages if it were ever mutated.
        attrs = [] if attrs is None else attrs
        self.type = type
        self.cmd = cmd
        self.attrs = attrs
        Message.__init__(self, type, flags=flags,
                         payload=[DcbnlHdr(len=0, type=self.cmd)] + attrs)

    @staticmethod
    def recv(conn):
        """Receive from *conn* and parse the first message.

        The first four payload bytes are decoded as the DCB header; the rest
        is handed to ``parse_attributes`` and stored in ``attrs``.
        """
        packet = conn.recv()[0].payload
        dcb_family, cmd = struct.unpack("BBxx", packet[:4])
        reply = DcbNlMessage(dcb_family, cmd)
        reply.attrs = parse_attributes(packet[4:])
        return reply


class DcbController:
    """Issues DCB netlink requests for one interface."""

    def __init__(self, intf):
        self.conn = Connection(NETLINK_ROUTE)
        self.intf = intf.encode()

    def _transact(self, msg_type, cmd, extra_attrs=()):
        """Send one request for this interface and return the parsed reply.

        The interface-name attribute always comes first, followed by
        *extra_attrs*.
        """
        attrs = [NulStrAttr(DCB_ATTR_IFNAME, self.intf)] + list(extra_attrs)
        request = DcbNlMessage(type=msg_type, cmd=cmd,
                               flags=NLM_F_REQUEST, attrs=attrs)
        request.send(self.conn)
        return DcbNlMessage.recv(self.conn)

    def _get_ieee(self):
        """Query DCB_CMD_IEEE_GET and return the nested IEEE attributes."""
        reply = self._transact(RTM_GETDCB, DCB_CMD_IEEE_GET)
        return reply.attrs[DCB_ATTR_IEEE].nested()

    def _get_ieee_pfc(self):
        """Return the raw PFC attribute as an array of unsigned bytes."""
        raw = array.array('B')
        from_bytes(raw, self._get_ieee()[DCB_ATTR_IEEE_PFC].str())
        return raw

    def _modify_ieee_app(self, cmd, selector, priority, protocol):
        """Send one app-table entry with *cmd* (IEEE_SET or IEEE_DEL)."""
        dcb_app = struct.pack("BBH", selector, priority, protocol)
        ieee_app = StrAttr(DCB_ATTR_IEEE_APP, dcb_app)
        ieee_app_table = Nested(DCB_ATTR_IEEE_APP_TABLE, [ieee_app])
        ieee = Nested(DCB_ATTR_IEEE, [ieee_app_table])
        reply = self._transact(RTM_SETDCB, cmd, [ieee])
        self.check_err(reply, DCB_ATTR_IEEE)

    def check_err(self, m, attr_type):
        """Raise ``OSError`` when attribute *attr_type* of reply *m* is non-zero."""
        if m.attrs[attr_type].u8():
            raise OSError("Netlink error: Bad value. see dmesg.")

    def get_dcbx(self):
        """Return the current DCBX mode byte."""
        reply = self._transact(RTM_GETDCB, DCB_CMD_GDCBX)
        return reply.attrs[DCB_ATTR_DCBX].u8()

    def set_dcbx(self, mode):
        """Set the DCBX mode byte; raise ``OSError`` if the reply reports an error."""
        reply = self._transact(RTM_SETDCB, DCB_CMD_SDCBX,
                               [U8Attr(DCB_ATTR_DCBX, mode)])
        self.check_err(reply, DCB_ATTR_DCBX)

    def get_ieee_pfc_en(self):
        """Return byte 1 of the PFC attribute (used as the per-priority enable mask)."""
        return self._get_ieee_pfc()[1]

    def get_ieee_pfc_delay(self):
        """Return the 16-bit value stored low-byte-first at bytes 4-5 of the PFC attribute."""
        pfc = self._get_ieee_pfc()
        return pfc[4] + (pfc[5] << 8)

    def get_ieee_ets(self):
        """Return ``(prio_tc, tc_tsa, tc_tx_bw)`` from the ETS attribute.

        After three leading bytes the attribute is read as seven consecutive
        8-byte tables; each returned value is an ``array('B')`` slice.
        Raises ``ValueError`` when the attribute does not split into exactly
        seven tables and ``struct.error`` when it is shorter than 3 bytes.
        """
        raw = self._get_ieee()[DCB_ATTR_IEEE_ETS].str()
        # Leading bytes are not used; unpacking still rejects a truncated
        # attribute.  NOTE(review): presumably willing / ets_cap / cbs.
        struct.unpack_from("BBB", raw, 0)

        tables = array.array('B')
        from_bytes(tables, raw[3:])
        chunks = [tables[start:start + 8] for start in range(0, len(tables), 8)]
        # Seven-way unpacking doubles as a length check on the attribute.
        (tc_tx_bw, _tc_rx_bw, tc_tsa, prio_tc,
         _tc_reco_bw, _tc_reco_tsa, _reco_prio_tc) = chunks
        return prio_tc, tc_tsa, tc_tx_bw

    def get_ieee_app_table(self):
        """Return the interface's application table as a ``DcbAppTable``."""
        entries = self._get_ieee()[DCB_ATTR_IEEE_APP_TABLE].get_app_table()
        table = DcbAppTable()
        # Indexed access on purpose: the container type returned by
        # get_app_table() is defined outside this file.
        for index in range(len(entries)):
            selector, priority, protocol = struct.unpack('BBH', entries[index].data)
            table.apps[index] = DcbApp(selector, priority, protocol)
        return table

    def set_ieee_app(self, selector, priority, protocol):
        """Add one application-table entry; raise ``OSError`` on a reported error."""
        self._modify_ieee_app(DCB_CMD_IEEE_SET, selector, priority, protocol)

    def del_ieee_app(self, selector, priority, protocol):
        """Delete one application-table entry; raise ``OSError`` on a reported error."""
        self._modify_ieee_app(DCB_CMD_IEEE_DEL, selector, priority, protocol)


def pretty_print(prio_tc, tsa, tcbw, pfc_en, trust, pfc_delay):
    """Print the DCBX mode, trust state, PFC and per-TC configuration.

    Uses the module-level ``ctrl`` to read the DCBX mode and the application
    table.  ``prio_tc``, ``tsa`` and ``tcbw`` are indexable sequences,
    ``pfc_en`` is a bit mask (bit N = priority N), ``trust`` is the trust
    state string and ``pfc_delay`` is printed as the cable length.
    """
    try:
        app_table = ctrl.get_ieee_app_table()
    except Exception:
        # The caller already reported that the app table is unsupported;
        # keep printing the remaining configuration instead of crashing.
        app_table = DcbAppTable()

    if ctrl.get_dcbx() & DCB_CAP_DCBX_HOST:
        print("DCBX mode: OS controlled")
    else:
        print("DCBX mode: Firmware controlled")

    print("Priority trust state: " + trust)
    if trust == "dscp":
        print("dscp2prio mapping:")
        app_table.printAppSelector(IEEE_8021QAZ_APP_SEL_DSCP)

    print("default priority:")
    app_table.printDefaultPriority()

    print("Cable len: %d" % pfc_delay)
    print("PFC configuration:")
    print("\tpriority 0 1 2 3 4 5 6 7")
    print("\tenabled " + "".join("%1d " % ((pfc_en >> up) & 0x01)
                                  for up in range(8)))

    # Invert the priority -> TC table so priorities can be listed per TC.
    tc2up = defaultdict(list)
    for up, tc in enumerate(prio_tc):
        tc2up[int(tc)].append(up)

    for tc in sorted(tc2up):
        msg = "tc: %d , tsa: " % tc
        try:
            algo = tsa[tc]
            if algo == IEEE_8021QAZ_TSA_ETS:
                msg += "ets, bw: %s%%" % (tcbw[tc],)
            elif algo == IEEE_8021QAZ_TSA_STRICT:
                msg += "strict"
            elif algo == IEEE_8021QAZ_TSA_VENDOR:
                msg += "vendor"
            else:
                msg += "unknown"
        except IndexError:
            # TC number beyond the reported tables: print the header alone.
            pass
        print(msg)
        for up in tc2up[tc]:
            print("\t priority: %s" % up)


class Trust:
    """Priority trust state: "pcp", "dscp", or "none" when unknown."""

    def __init__(self):
        self.trust = "none"

    def getTrust(self, ctrl):
        """Refresh ``trust``: "dscp" if any DSCP app entry exists, else "pcp"."""
        table = ctrl.get_ieee_app_table()
        has_dscp = table.countAppSelector(IEEE_8021QAZ_APP_SEL_DSCP) != 0
        self.trust = "dscp" if has_dscp else "pcp"

    def setTrust(self, ctrl, optionTrust):
        """Switch to *optionTrust* unless it already is the current state.

        "pcp" removes every DSCP entry; "dscp" installs the default mapping
        for the 64 DSCP values.
        """
        if self.trust == optionTrust:
            return

        table = ctrl.get_ieee_app_table()
        if optionTrust == "pcp":
            table.delAppEntry(ctrl, IEEE_8021QAZ_APP_SEL_DSCP)
        elif optionTrust == "dscp":
            table.setDefaultAppEntry(ctrl, IEEE_8021QAZ_APP_SEL_DSCP, 64)


# ********* option validation (before anything is changed) *********
if options.dcbx is not None and options.dcbx not in ("os", "fw", "get"):
    print("Invalid dcbx mode command. Refer to the help.")
    sys.exit(1)

if options.trust and options.trust not in ("dscp", "pcp"):
    print("Invalid trust state command. Refer to the help.")
    sys.exit(1)

if options.dscp2prio and options.trust:
    print("trust and dscp2prio commands cannot be used at the same time.")
    sys.exit(1)

dscp2prio = []
try:
    for opt in options.dscp2prio or []:
        if opt == "flush":
            dscp2prio.append((opt, None, None))
            continue

        # Wrong field count and non-numeric fields both raise ValueError.
        action, dscp, prio = opt.split(",")
        dscp = int(dscp)
        prio = int(prio)
        if action not in ("set", "del") or not 0 <= dscp <= 63 or not 0 <= prio <= 7:
            raise ValueError(opt)
        dscp2prio.append((action, dscp, prio))
except ValueError:
    print("Invalid dscp2prio command. Refer to the help.")
    sys.exit(1)

ctrl = DcbController(options.intf)

# ********* dcbx mode command **************************************
if options.dcbx == "os":
    ctrl.set_dcbx(ctrl.get_dcbx() | DCB_CAP_DCBX_HOST)
elif options.dcbx == "fw":
    ctrl.set_dcbx(0)

# ********* trust command ******************************
trustObj = Trust()
try:
    trustObj.getTrust(ctrl)

    if options.trust:
        trustObj.setTrust(ctrl, options.trust)
        trustObj.getTrust(ctrl)
except Exception:
    print("Priority trust state is not supported on your system")
    if options.trust:
        sys.exit(1)
    trustObj.trust = "none"

# ********* ETS / PFC state ******************************
try:
    prio_tc, tsa, tc_bw = ctrl.get_ieee_ets()
    pfc_en = ctrl.get_ieee_pfc_en()
    pfc_delay = ctrl.get_ieee_pfc_delay()
except Exception:
    print("ETS features are not supported on your system")
    sys.exit(1)

# ********* dscp2prio command ******************************
if dscp2prio:
    try:
        for action, dscp, prio in dscp2prio:
            if action == "set":
                ctrl.set_ieee_app(IEEE_8021QAZ_APP_SEL_DSCP, prio, dscp)
            elif action == "del":
                ctrl.del_ieee_app(IEEE_8021QAZ_APP_SEL_DSCP, prio, dscp)
            elif action == "flush":
                ctrl.get_ieee_app_table().delAppEntry(ctrl, IEEE_8021QAZ_APP_SEL_DSCP)

        # The mapping changed, so the trust state read earlier may be stale
        # (e.g. removing the last DSCP entry means "pcp").
        trustObj.getTrust(ctrl)
    except Exception:
        print("dscp2prio command failed")
        sys.exit(1)

pretty_print(prio_tc, tsa, tc_bw, pfc_en, trustObj.trust, pfc_delay)