-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathbalance.py
More file actions
102 lines (86 loc) · 3.42 KB
/
balance.py
File metadata and controls
102 lines (86 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import rlp
from netaddr import IPNetwork, IPSet
from utils import address, normalize_address
from utils import bytes_to_int, ipaddr_to_netaddr
class Balance(rlp.Serializable):
    """Record of the IP resources and mapping data attached to one account.

    Attributes:
        own_ips: ``IPSet`` manipulated by ``add_own_ips`` / ``remove_own_ips``.
        delegated_ips: dict mapping a normalized address to an ``IPSet``.
        received_ips: dict mapping a normalized address to an ``IPSet``.
        map_server: dict mapping the value returned by ``ipaddr_to_netaddr``
            to the address supplied alongside it.
        locator: dict mapping the value returned by ``ipaddr_to_netaddr`` to
            a ``[priority, weight]`` list.

    ``set_map_server`` empties ``locator`` and ``set_locator`` empties
    ``map_server``, so after either setter only one of the two tables is
    populated (presumably the two are meant to be mutually exclusive --
    TODO confirm with the protocol definition).
    """

    # NOTE(review): the second element of each pair below is a plain Python
    # set literal (e.g. {address, IPSet}), not an rlp sedes object -- confirm
    # this is what rlp.Serializable is expected to receive.
    fields = [
        ('own_ips', IPSet),
        ('delegated_ips', {address, IPSet}),
        ('received_ips', {address, IPSet}),
        ('map_server', {IPNetwork, address}),
        ('locator', {IPNetwork, list})
    ]

    def __init__(self, own_ips=None, delegated_ips=None, received_ips=None,
                 map_server=None, locator=None):
        """Create a balance; every omitted argument starts out empty.

        Args:
            own_ips: an ``IPSet``, or anything accepted by the ``IPSet``
                constructor (it is converted). ``None`` gives an empty set.
            delegated_ips: dict of normalized address -> ``IPSet``.
            received_ips: dict of normalized address -> ``IPSet``.
            map_server: dict for the map-server table.
            locator: dict for the locator table.
        """
        # Fresh containers are built per instance. The previous mutable
        # defaults (IPSet() / {}) were created once at definition time and
        # shared by every Balance constructed without arguments, so adding
        # IPs to one balance silently modified all the others.
        if own_ips is None:
            own_ips = IPSet()
        elif not isinstance(own_ips, IPSet):
            own_ips = IPSet(own_ips)
        if delegated_ips is None:
            delegated_ips = {}
        if received_ips is None:
            received_ips = {}
        if map_server is None:
            map_server = {}
        if locator is None:
            locator = {}

        self.own_ips = own_ips
        self.delegated_ips = delegated_ips
        self.received_ips = received_ips
        self.map_server = map_server
        self.locator = locator
        super(Balance, self).__init__(own_ips, delegated_ips, received_ips,
                                      map_server, locator)

    @staticmethod
    def _add_ips(table, address, ips):
        """Add ``ips`` to the IPSet stored in ``table`` for ``address``."""
        key = normalize_address(address)
        if key in table:
            table[key].add(ips)
        else:
            table[key] = IPSet(ips)

    @staticmethod
    def _remove_ips(table, address, ips):
        """Subtract ``ips`` from ``table[address]``; drop the entry if empty.

        Raises:
            KeyError: if the normalized address has no entry in ``table``.
        """
        key = normalize_address(address)
        remaining = table[key] - ips
        # Truthiness rather than len(): netaddr's IPSet.__len__ raises
        # IndexError for sets holding more than sys.maxint addresses (large
        # IPv6 prefixes), whereas the emptiness check has no such limit.
        if remaining:
            table[key] = remaining
        else:
            del table[key]

    def add_own_ips(self, ips):
        """Add ``ips`` to ``own_ips`` via ``IPSet.add``."""
        self.own_ips.add(ips)

    def remove_own_ips(self, ips):
        """Remove ``ips`` from ``own_ips`` via ``IPSet.remove``."""
        self.own_ips.remove(ips)

    def add_delegated_ips(self, address, ips):
        """Add ``ips`` to the delegated set kept for ``address``.

        The address is normalized first; a new ``IPSet`` is created when the
        address has no entry yet.
        """
        self._add_ips(self.delegated_ips, address, ips)

    def remove_delegated_ips(self, address, ips):
        """Subtract ``ips`` from the delegated set kept for ``address``.

        ``ips`` is used as the right operand of ``IPSet`` subtraction. The
        entry is deleted once its set becomes empty.

        Raises:
            KeyError: if the normalized address has no delegated entry.
        """
        self._remove_ips(self.delegated_ips, address, ips)

    def add_received_ips(self, address, ips):
        """Add ``ips`` to the received set kept for ``address``.

        The address is normalized first; a new ``IPSet`` is created when the
        address has no entry yet.
        """
        self._add_ips(self.received_ips, address, ips)

    def remove_received_ips(self, address, ips):
        """Subtract ``ips`` from the received set kept for ``address``.

        ``ips`` is used as the right operand of ``IPSet`` subtraction. The
        entry is deleted once its set becomes empty.

        Raises:
            KeyError: if the normalized address has no received entry.
        """
        self._remove_ips(self.received_ips, address, ips)

    def in_own_ips(self, ips):
        """Return whether ``ips`` is contained in ``own_ips``."""
        return ips in self.own_ips

    def affected_delegated_ips(self, ips):
        """Return the delegations that overlap ``ips``.

        Args:
            ips: anything accepted by the ``IPSet`` constructor.

        Returns:
            dict mapping each hex-encoded address whose delegated set
            intersects ``ips`` to that (non-empty) intersection.
        """
        wanted = IPSet(ips)
        affected = {}
        for addr, delegated in self.delegated_ips.items():
            overlap = wanted & delegated
            # Truthiness avoids IPSet.__len__, which raises IndexError for
            # sets larger than sys.maxint addresses.
            if overlap:
                # NOTE(review): str.encode('HEX') is the Python 2 hex codec;
                # kept unchanged so the returned keys stay the same.
                affected[addr.encode('HEX')] = overlap
        return affected

    def in_received_ips(self, ips):
        """Return True if any received set contains ``ips``."""
        return any(ips in received for received in self.received_ips.values())

    def set_map_server(self, map_server):
        """Rebuild ``map_server`` from a flat sequence and clear ``locator``.

        Args:
            map_server: flat sequence read in groups of three items:
                AFI (decoded with ``bytes_to_int``), IP (converted with
                ``ipaddr_to_netaddr``) and the address stored for that IP.
        """
        self.map_server = {}
        for i in range(0, len(map_server), 3):
            afi = bytes_to_int(map_server[i])
            ip = ipaddr_to_netaddr(afi, map_server[i + 1])
            self.map_server[ip] = map_server[i + 2]
        # Installing map servers discards any previously stored locators.
        self.locator = {}

    def get_map_server(self):
        """Return the ``map_server`` dict."""
        return self.map_server

    def set_locator(self, locator):
        """Rebuild ``locator`` from a flat sequence and clear ``map_server``.

        Args:
            locator: flat sequence read in groups of four items:
                AFI, IP, priority and weight. AFI, priority and weight are
                decoded with ``bytes_to_int``; the IP is converted with
                ``ipaddr_to_netaddr``.
        """
        self.locator = {}
        for i in range(0, len(locator), 4):
            afi = bytes_to_int(locator[i])
            ip = ipaddr_to_netaddr(afi, locator[i + 1])
            priority = bytes_to_int(locator[i + 2])
            weight = bytes_to_int(locator[i + 3])
            # A new list per entry: the previous code reused one list object
            # for every key, so all locators ended up with the priority and
            # weight of the last group.
            self.locator[ip] = [priority, weight]
        # Installing locators discards any previously stored map servers.
        self.map_server = {}

    def get_locator(self):
        """Return the ``locator`` dict."""
        return self.locator