forked from QratorLabs/ASPA
-
Notifications
You must be signed in to change notification settings - Fork 0
/
aspa_logic.py
106 lines (81 loc) · 3.46 KB
/
aspa_logic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
IPv4, IPv6 = 4, 6
AS_SET, AS_SEQUENCE, AS_CONFED_SEQUENCE, AS_CONFED_SET = range(1, 5)
Valid, Invalid, Unknown, Unverifiable = range(4)
class Segment:
def __init__(self, value, type):
self.value, self.type = value, type
class ASPA:
def __init__(self, aspa_records):
self.aspa_records = aspa_records
def verify_pair(self, as1, as2, afi):
aspa_records_afi = self.aspa_records.get(afi, None)
if aspa_records_afi is None:
return Unknown
aspa_records_as1 = aspa_records_afi.get(as1, None)
if aspa_records_as1 is None:
return Unknown
if as2 not in aspa_records_as1:
return Invalid
return Valid
def get_indexes(self, aspath, afi):
unknown_index = 0
unverifiable_flag = False
as1 = 0
index = 1
for segment in aspath:
if segment.type != AS_SEQUENCE:
as1 = 0
unverifiable_flag = True
elif segment.type == AS_SEQUENCE:
if not as1:
as1 = segment.value
elif as1 != segment.value:
pair_check = self.verify_pair(as1, segment.value, afi)
if pair_check == Invalid:
return index - 1, unknown_index - 1 if unknown_index else index - 1, unverifiable_flag
elif pair_check == Unknown and not unknown_index:
unknown_index = index
as1 = segment.value
index += 1
return index - 1, unknown_index - 1 if unknown_index else index - 1, unverifiable_flag
def check_upflow_path(self, aspath, neighbor_as, afi):
if len(aspath) == 0:
return Invalid
if aspath[-1].type == AS_SEQUENCE and aspath[-1].value != neighbor_as:
return Invalid
forward_invalid_index, forward_unknown_index, forward_unverifiable = self.get_indexes(aspath, afi)
aspath_len = len(aspath)
if forward_invalid_index < aspath_len:
return Invalid
if forward_unverifiable:
return Unverifiable
if forward_unknown_index < aspath_len:
return Unknown
return Valid
def check_downflow_path(self, aspath, neighbor_as, afi):
if len(aspath) == 0:
return Invalid
if aspath[-1].type == AS_SEQUENCE and aspath[-1].value != neighbor_as:
return Invalid
forward_invalid_index, forward_unknown_index, forward_unverifiable = self.get_indexes(aspath, afi)
backward_invalid_index, backward_unknown_index, backward_unverifiable = self.get_indexes(list(reversed(aspath)), afi)
aspath_len = len(aspath)
if forward_invalid_index + backward_invalid_index < aspath_len:
return Invalid
if forward_unverifiable or backward_unverifiable:
return Unverifiable
if forward_unknown_index + backward_unknown_index < aspath_len:
return Unknown
return Valid
def check_ix_path(self, aspath, neighbor_as, afi):
if len(aspath) == 0:
return Invalid
forward_invalid_index, forward_unknown_index, forward_unverifiable = self.get_indexes(aspath, afi)
aspath_len = len(aspath)
if forward_invalid_index < aspath_len:
return Invalid
if forward_unverifiable:
return Unverifiable
if forward_unknown_index < aspath_len:
return Unknown
return Valid