-
Notifications
You must be signed in to change notification settings - Fork 0
/
master.py
executable file
·90 lines (74 loc) · 2.53 KB
/
master.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python
"""Master Map Service.
Usage:
master.py [-b] [-t <tc>] <IP> <PORT>
Options:
-h --help Show this screen.
-b --background Run service in background.
-t --taskcount=<tc> Number of tasks to be performed [default: 10].
"""
from docopt import docopt
from rpc import RPCManager
from session import MasterSessionManager
from RMContainerAllocator import RMContainerAllocator
from CommitterEventHandler import CommitterEventHandler
from job import Job
from pool import Pool
import sys
import math
from multiprocessing import Queue
from collections import deque
import time
from random import choice
from collections import defaultdict
# Default task list handed to Job by run(); the script entry point rebinds it
# from --taskcount before run() is called.
work = range(10)
def run(IP, PORT):
    """Run the master's event loop for a single job; never returns.

    Builds the session manager, RPC manager, container allocator, committer
    event handler, task pool and one Job over the module-level ``work``
    list, queues the JOB_INIT / JOB_START events, and then loops forever
    delivering events and polling each component.

    Args:
        IP: Address passed through to MasterSessionManager.
        PORT: Port passed through to MasterSessionManager.
    """
    # Simulated "event queue"
    eventQueue = deque()
    # Process queue used for async rpc system.
    processQ = Queue()

    sessionManager = MasterSessionManager(IP, PORT, processQ)
    rpcManager = RPCManager(sessionManager, processQ)
    containerAllocator = RMContainerAllocator(eventQueue, sessionManager)
    committerEventHandler = CommitterEventHandler(eventQueue)

    # Ensures the completion report below is printed only once.
    printed = False
    # Server list as seen on the previous loop iteration.
    serverList = []

    pool = Pool()
    job = Job(work, pool, rpcManager, eventQueue)

    # Simulate delayed job init and start.
    eventQueue.append(("JOB_INIT", job))
    eventQueue.append(("JOB_START", job))

    while True:
        # Simulate "event delivery": offer the queued events to every
        # consumer, then drop them.
        containerAllocator.pushNewEvents(eventQueue)
        committerEventHandler.pushNewEvents(eventQueue)
        pool.pushNewEvents(eventQueue)
        eventQueue.clear()

        # Simulate async mechanisms
        sessionManager.poll()
        rpcManager.poll()
        containerAllocator.heartbeat()
        committerEventHandler.heartbeat()

        # For server failure: take one snapshot of the live servers per
        # iteration and queue an event for every previously known server
        # that is missing from it. Those events are handed out at the top
        # of the next iteration.
        currentServers = sessionManager.serverList()
        for locator in serverList:
            if locator not in currentServers:
                eventQueue.append(("JOB_UPDATED_NODES", locator))
        if serverList != currentServers:
            print("serverList change")
            serverList = currentServers

        # Run tasks
        pool.poll()

        if job.getStatus() == "SUCCEEDED" and not printed:
            print("Job Complete")
            print(job)
            printed = True
if __name__ == '__main__':
    # Parse the command line against the usage text in the module docstring.
    options = docopt(__doc__)
    print(options)

    # Rebind the module-level task list that run() passes to Job.
    task_count = int(options['--taskcount'])
    work = range(task_count)

    # NOTE(review): --background is accepted but nothing is started for it;
    # the service only runs in the foreground case.
    if not options['--background']:
        run(options['<IP>'], int(options['<PORT>']))