Skip to content

Commit

Permalink
Merge pull request #184 from DeliZhangX/private/deliz/CP-31034
Browse files Browse the repository at this point in the history
Private/deliz/cp 31034
  • Loading branch information
DeliZhangX authored Apr 1, 2020
2 parents e375384 + 87e9255 commit 32599fe
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 272 deletions.
8 changes: 8 additions & 0 deletions autocertkit/ack_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,14 @@ def parse_static_config(configParser, section):
raise utils.InvalidArgument(
option, config[option], "Should not be empty!")

ip_s = utils.IPv4Addr(config['ip_start'], config['netmask'], config['gw'])
ip_s.validate()
ip_e = utils.IPv4Addr(config['ip_end'], config['netmask'], config['gw'])
ip_e.validate()
if ip_s.get_subnet_host()[1] >= ip_e.get_subnet_host()[1]:
raise utils.InvalidArgument('ip_end', config['ip_end'],
"Should be greater than 'ip_start' %s!" % config['ip_start'])

return config


Expand Down
3 changes: 2 additions & 1 deletion autocertkit/network_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ def test_vlan_high_port(self, session):
rec = {}
rec['info'] = ping_result

if "0% packet loss" not in ping_result:
if " 0% packet loss" not in ping_result:
raise TestCaseError("Error: Ping transmittion failed. %s"
% ping_result)

Expand Down Expand Up @@ -975,6 +975,7 @@ def test_ping(self, session):
class MulticastTestClass(IperfTestClass):
""" Subclass that runs multicast test"""

REQUIRED_FOR = ">= %s" % XCP_MIN_VER_WITH_MULTICAST
caps = [MULTICAST_CAP]
required = False

Expand Down
15 changes: 9 additions & 6 deletions autocertkit/testbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,17 +326,20 @@ def generate_static_net_conf(self):
log.debug("Create static config for %s (%s)" %
(iface, vlan))
key_name = "%s_%s" % (iface, vlan)
assert key_name not in res.keys(
), "Duplicate static IP addressing specified for %s (%s)" % (iface, vlan)
assert key_name not in res.keys(), \
"Duplicate static IP addressing specified for %s (%s)" % (
iface, vlan)
res[key_name] = sm
log.debug("Added static conf for '%s'" % key_name)

mgmt = get_pool_management_device(self.session)
if 'static_management' in netconf and mgmt not in netconf:
log.debug("Create static config for mgmt device %s" % mgmt)
log.debug("The pool management device is %s" % mgmt)
if 'static_management' in netconf:
assert mgmt not in netconf, \
"'static_management' should only be specified when management " \
"device(%s) is not being tested for certification. " % (mgmt)
log.debug("Create static config for management device %s" % mgmt)
key_name = "%s_0" % (mgmt)
assert key_name not in res.keys(
), "'static_management' should only be specified when management device(%s) is not being tested for certification." % (iface)
res[key_name] = StaticIPManager(netconf['static_management'])
log.debug("Added static conf for '%s'" % key_name)

Expand Down
189 changes: 109 additions & 80 deletions autocertkit/tests/utils_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,86 +42,115 @@ def testGreaterThanFalse(self):
self._exp_false('> 5.6 SP2', '5.6 FP1')


class StaticIPUtilsTests(unittest_base.DevTestCase):
"""Verify that the class methods for manipulating
static IP addresses work correctly"""

def _test_on_subnet(self, ip_a, ip_b, mask, expect=True):
ipa = utils.IPv4Addr(ip_a, mask, '192.168.0.1')
ipb = utils.IPv4Addr(ip_b, mask, '192.168.0.1')

if ipa.on_subnet(ipb) and not expect:
raise Exception("'%s' and '%s' on subnet '%s' - '%s'" % (ip_a,
ip_b,
mask,
expect))

def _test_increment_ip(self, start, result, expect=True):
conf = {'ip_start': '192.168.0.1',
'ip_end': '192.168.0.10',
'netmask': '255.255.255.0',
'gw': '192.168.0.1'}
sim = utils.StaticIPManager(conf)

res = sim.increment_ip_string(start)

if res != result and expect:
raise Exception("Error: '%s' incremeneted, should equal '%s' not '%s'" %
(start, result, res))

def testOnSubnet(self):
self._test_on_subnet('192.168.0.10',
'192.168.0.40',
'255.255.255.0')

def testNotOnSubnet(self):
self._test_on_subnet('192.168.0.10',
'192.128.0.40',
'255.255.255.0',
expect=False)

def testIncrementIPs(self):
self._test_increment_ip('192.168.0.1', '192.168.0.2')
self._test_increment_ip('192.168.0.1', '192.168.0.10', expect=False)

def testEnumerateIPs(self):
conf = {'ip_start': '10.80.227.143',
'ip_end': '10.80.227.151',
'netmask': '255.255.0.0',
'gw': '10.80.227.1'}

full_list = ['10.80.227.143', '10.80.227.144', '10.80.227.145', '10.80.227.146',
'10.80.227.147', '10.80.227.148', '10.80.227.149', '10.80.227.150',
'10.80.227.151']

sim = utils.StaticIPManager(conf)
free_list = sim.ip_pool

if len(free_list) != len(full_list):
raise Exception("Error: we expect there to be %d IPs, enumerate produced %d." %
(len(full_list), len(free_list)))

for i in range(len(full_list)):
if free_list[i].addr != full_list[i]:
raise Exception("Error: Enumerate IP returns %s, we expect %s" % (free_list,
full_list))

def testLoanStaticIP(self):
conf = {'ip_start': '192.168.0.5',
'ip_end': '192.168.0.10',
'netmask': '255.255.255.0',
'gw': '192.168.0.1'}

sim = utils.StaticIPManager(conf)

borrowed_ip = sim.get_ip()
assert(sim.available_ips() == 5)
assert(len(sim.in_use) == 1)

sim.return_ip(borrowed_ip)

assert(sim.available_ips() == 6)
assert(len(sim.in_use) == 0)
class IPv4AddrTests(unittest.TestCase):

def test_check_ip_format(self):
utils.IPv4Addr.check_ip_format('192.168.0.1')
self.assertRaises(
Exception, lambda: utils.IPv4Addr.check_ip_format('192.168.0.256'))
self.assertRaises(
Exception, lambda: utils.IPv4Addr.check_ip_format('192.168.1'))
self.assertRaises(
Exception, lambda: utils.IPv4Addr.check_ip_format('192.168.0.0.1'))
self.assertRaises(
Exception, lambda: utils.IPv4Addr.check_ip_format('192.168.0.01'))

def test_check_netwrok_mask(self):
utils.IPv4Addr.check_netwrok_mask('255.255.255.0')
utils.IPv4Addr.check_netwrok_mask('255.255.0.0')
utils.IPv4Addr.check_netwrok_mask('255.0.0.0')
utils.IPv4Addr.check_netwrok_mask('255.255.240.0')
self.assertRaises(
Exception, lambda: utils.IPv4Addr.check_netwrok_mask('255.255.255.255'))
self.assertRaises(
Exception, lambda: utils.IPv4Addr.check_netwrok_mask('0.0.0.0'))

def test_check_special_ip(self):
utils.IPv4Addr.check_special_ip('192.168.0.1', '255.255.255.0')
self.assertRaises(Exception, lambda: utils.IPv4Addr.check_special_ip(
'192.168.0.0', '255.255.255.0'))
self.assertRaises(Exception, lambda: utils.IPv4Addr.check_special_ip(
'192.168.0.255', '255.255.255.0'))

def test_split(self):
subnet, host = utils.IPv4Addr.split('192.168.0.1', '255.255.255.0')
self.assertEqual(subnet, (192 << 24) + (168 << 16) + (0 << 8))
self.assertEqual(host, 1)

def test_aton(self):
n_ip = utils.IPv4Addr.aton('192.168.0.1')
self.assertEqual(n_ip, (192 << 24) + (168 << 16) + (0 << 8) + 1)
self.assertRaises(
Exception, lambda: utils.IPv4Addr.aton('192.168.0.256'))

def test_ntoa(self):
ip = utils.IPv4Addr.ntoa((192 << 24) + (168 << 16) + (0 << 8) + 1)
self.assertEqual(ip, '192.168.0.1')
self.assertRaises(Exception, lambda: utils.IPv4Addr.ntoa(0x100000000))

def test_validate_netmask(self):
utils.IPv4Addr.validate_netmask('255.255.255.0')

def test_validate_ip(self):
utils.IPv4Addr.validate_ip('192.168.255.1', '255.255.255.0')

def test_in_same_subnet(self):
utils.IPv4Addr.in_same_subnet(
'192.168.255.1', '192.168.255.254', '255.255.255.0')

def test_validate(self):
ip = utils.IPv4Addr('192.168.0.10', '255.255.255.0', '192.168.0.1')
ip.validate()
ip = utils.IPv4Addr('192.16.254.10', '255.240.0.0', '192.16.0.1')
ip.validate()

def test_get_subnet_host(self):
ip = utils.IPv4Addr('192.168.0.2', '255.255.255.0', '192.168.0.1')
subnet, host = ip.get_subnet_host()
self.assertEqual(subnet, (192 << 24) + (168 << 16) + (0 << 8))
self.assertEqual(host, 2)


class StaticIPManagerTests(unittest.TestCase):

def setUp(self):
self.conf = {'ip_start': '192.168.0.2',
'ip_end': '192.168.0.5',
'netmask': '255.255.255.0',
'gw': '192.168.0.1'}
self.sm = utils.StaticIPManager(self.conf)

def tearDown(self):
self.sm.release_all()

def test_get_ip(self):
ip = self.sm.get_ip()
self.assertEqual(ip.addr, '192.168.0.2')
self.assertEqual(ip.netmask, '255.255.255.0')
self.assertEqual(ip.gateway, '192.168.0.1')
ip = self.sm.get_ip()
self.assertEqual(ip.addr, '192.168.0.3')
ip = self.sm.get_ip()
self.assertEqual(ip.addr, '192.168.0.4')
ip = self.sm.get_ip()
self.assertEqual(ip.addr, '192.168.0.5')
self.assertRaises(Exception, lambda: self.sm.get_ip())

self.sm.release_all()

def test_return_ip(self):
free1 = self.sm.available_ips()
ip = self.sm.get_ip()
free2 = self.sm.available_ips()
self.assertEqual(free1 - 1, free2)

self.sm.return_ip(ip)
free3 = self.sm.available_ips()
self.assertEqual(free1, free3)

self.assertRaises(Exception, lambda: self.sm.return_ip(ip))

self.sm.release_all()


class ValueInRangeFunctions(unittest.TestCase):
Expand Down
Loading

0 comments on commit 32599fe

Please sign in to comment.