-
Notifications
You must be signed in to change notification settings - Fork 2
/
persistent_queue.py
executable file
·151 lines (108 loc) · 3.61 KB
/
persistent_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/python
from Queue import Queue
import dybase
import sys
# Largest and smallest values of Python's native (non-long) integer type.
# The queue's start/stop counters wrap from MAX_INT around to MIN_INT.
# NOTE(review): sys.maxint is 2**63 - 1 on most 64-bit builds, while the
# element index is created with createIntIndex() -- confirm the index key
# width can actually hold these bounds.
MAX_INT = sys.maxint
MIN_INT = -MAX_INT - 1
# Set DEBUG to True to print the persisted start/stop counters when the
# storage is opened.
#DEBUG = True
DEBUG = False
class Root(dybase.Persistent):
    """Persistent root object holding the queue's head and tail counters.

    ``start`` is the index key of the oldest stored element and ``stop`` is
    the key the next appended element will receive.
    """

    def __init__(self):
        # An empty queue has both counters at the same position.
        self.start = self.stop = 0
class SizeOfPersistentQueueExceeded(Exception):
    """Raised when the queue's tail counter catches up with its head."""
class incomplete_persistent_deque:
    """Minimal deque look-alike whose items live in a DyBASE storage file.

    Only the operations the ``Queue`` class relies on are implemented:
    ``__len__``, ``append`` and ``popleft``.  Items are kept in an integer
    index; ``root.start`` is the key of the oldest item and ``root.stop`` is
    the key the next item will be stored under.  Both counters wrap from
    ``MAX_INT`` around to ``MIN_INT``.  Every mutation is stored and
    committed immediately.
    """

    def __init__(self, filename):
        """Open (or create) the storage file *filename*."""
        self._init_db(filename)

    def _init_db(self, filename):
        """Open the storage and load its root object, creating it if absent.

        Raises:
            IOError: if the storage file cannot be opened.
        """
        self.db = dybase.Storage()
        if not self.db.open(filename):
            # Without a root object every later call would fail with an
            # unrelated AttributeError, so report the real problem here.
            raise IOError(
                "cannot open persistent queue storage %r" % (filename,))

        self.root = self.db.getRootObject()
        if self.root is None:
            # Fresh storage: build the root and its indexes, then persist.
            self.root = Root()
            # createLongIndex can be used on 64 bits systems but it is
            # strange to pass 2**32 elements in the queue
            self.root.elements = self.db.createIntIndex()
            self.root.pending_elements = self.db.createIntIndex()
            self.db.setRootObject(self.root)
            self.db.commit()
        elif DEBUG:
            print("self.root already exists")

        if DEBUG:
            print("self.root.start = %s" % (self.root.start,))
            # Two spaces on purpose: matches the historical debug output.
            print("self.root.stop =  %s" % (self.root.stop,))

    def __len__(self):
        """Return the number of stored items, allowing for wrap-around."""
        if self.root.stop >= self.root.start:
            return self.root.stop - self.root.start
        else:
            # stop has wrapped past MAX_INT while start has not: count the
            # keys from start up to MAX_INT plus those from MIN_INT to stop.
            return ((MAX_INT - self.root.start + 1)
                    + (self.root.stop - MIN_INT))

    def append(self, item):
        """Store *item* at the tail of the queue and commit.

        Raises:
            SizeOfPersistentQueueExceeded: if advancing the tail would make
                it collide with the head.  Nothing is modified in that case.
        """
        next_stop = self.root.stop + 1
        if next_stop > MAX_INT:
            next_stop = MIN_INT
        # Detect overflow before touching the index or the counters, so a
        # failed append leaves the in-memory state consistent.
        if next_stop == self.root.start:
            raise SizeOfPersistentQueueExceeded

        # add element to index
        self.root.elements.insert(self.root.stop, item)
        self.root.stop = next_stop
        # persist
        self.root.store()
        self.db.commit()

    def popleft(self):
        """Remove and return the oldest item, committing the change.

        Emptiness is not checked here; the Queue class takes charge of that.
        """
        # remove element from index
        item = self.root.elements.get(self.root.start)
        self.root.elements.remove(self.root.start)
        self.root.start += 1
        if self.root.start > MAX_INT:
            self.root.start = MIN_INT
        # If the queue is now empty, resync both counters to 0.  This is
        # purely cosmetic and can be removed.
        if self.root.start == self.root.stop:
            self.root.start = 0
            self.root.stop = 0
        # persist
        self.root.store()
        self.db.commit()
        return item
class PersistentQueue(Queue):
    """``Queue`` variant that keeps its items in a storage file."""

    def __init__(self, filename, maxsize=0):
        """Create a queue backed by the storage file *filename*.

        ``filename`` is recorded before delegating to ``Queue.__init__``
        because ``_init`` reads it.
        """
        self.filename = filename
        Queue.__init__(self, maxsize)

    def _init(self, maxsize):
        """Install the persistent container in place of the usual deque.

        The stock implementation does ``self.queue = deque()``.  The
        replacement is an incomplete deque, but it implements everything
        Queue needs: ``__len__``, ``append`` and ``popleft``.
        """
        backing_store = incomplete_persistent_deque(self.filename)
        self.queue = backing_store

    def connect(self):
        """Placeholder intended to handle failovers; does nothing."""

    def ack(self):
        """Placeholder for acknowledgement; does nothing.

        A per-item form, ``ack(self, item)``, was sketched in a comment but
        never implemented.
        """
class ElementTest:
    """Sample queue item wrapping a single value.

    Both ``str()`` and ``repr()`` return the wrapped value unchanged.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return self.value

    # repr() gives exactly the same result as str().
    __repr__ = __str__
def test1():
    """Print any items already stored in "myqueue.dbs", then add three more.

    The queue is drained first, so the items printed are those put there by
    a previous run.
    """
    q = PersistentQueue("myqueue.dbs")
    # get pending items
    while not q.empty():
        print(q.get())
    for word in ('one', 'two', 'three'):
        q.put(ElementTest(word))
def main():
    """Run the demo; run this script twice to see the persisted elements."""
    test1()


if __name__ == '__main__':
    main()