Skip to content

Commit

Permalink
Merge pull request #2 from labteral/develop
Browse files Browse the repository at this point in the history
v2
  • Loading branch information
brunneis authored Apr 12, 2021
2 parents 3dc347d + cdd0d26 commit ca09aca
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 34 deletions.
6 changes: 5 additions & 1 deletion tests/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ def method_with_two_params(param1, param2=None):
# Define three calls
call1 = Call('method_without_params')
call2 = Call('method_with_one_param', args=['value1'])
call3 = Call('method_with_two_params', kwargs={'param1': 'value1', 'param2': 'value2'})
call3 = Call('method_with_two_params',
kwargs={
'param1': 'value1',
'param2': 'value2'
})

# Add the three calls to the txlog atomically and check their indexes
txlog.begin()
Expand Down
6 changes: 3 additions & 3 deletions txlog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from .txlog import TxLog, Call
import logging

__version__ = '0.0.5a'
__version__ = '2.214.0'

logging.getLogger().setLevel(logging.INFO)
logging.basicConfig(format='%(asctime)-15s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logging.info(f'TxLog v{__version__}')
logging.basicConfig(format='%(asctime)-15s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
81 changes: 51 additions & 30 deletions txlog/txlog.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from .utils import get_timestamp_ms
from easyrocks import RocksDB, WriteBatch, utils
import time
import builtins
from typing import Generator


def get_timestamp_ms():
return int(round(time.time() * 1000))
UINT_BYTES = 5
MAX_UINT = 2**(UINT_BYTES * 8) - 1


class Call:
Expand Down Expand Up @@ -82,7 +81,16 @@ def set_index(self, index):


class TxLog:
def __init__(self, path='./txlog_data', max_committed_items=0, committed_ttl_seconds=None):

CALL_PREFIX = b'\x00'
OFFSET_PREFIX = b'\x01'
INDEX_PREFIX = b'\x02'
META_PREFIX = b'\x03'

def __init__(self,
path='./txlog-data',
max_committed_items=0,
committed_ttl_seconds=None):
self._batch_index = None
self._write_batch = None
self._committed_ttl_seconds = committed_ttl_seconds
Expand All @@ -101,22 +109,21 @@ def commit(self):
def rollback(self):
self._write_batch = None

@staticmethod
def _get_call_key(index: int):
return f'txlog_{utils.get_padded_int(index)}'

def commit_call(self, call: Call):
new_write_batch = self._write_batch is None
self.begin()
is_new_write_batch = self._write_batch is None
if is_new_write_batch:
self.begin()

call._committed = True
call._commitment_timestamp = get_timestamp_ms()
self._update_call(call.index, call)
self._increment_offset()
if new_write_batch:

if is_new_write_batch:
self.commit()

def get(self, index: int) -> Call:
call_key = TxLog._get_call_key(index)
call_key = self._get_call_key(index)
return self._db.get(call_key)

def exec_uncommitted_calls(self, container_object=None):
Expand All @@ -127,7 +134,11 @@ def exec_uncommitted_calls(self, container_object=None):
def add(self, call: Call):
if not isinstance(call, Call):
raise TypeError

index = self._get_next_index()
if index > MAX_UINT:
raise ValueError(index)

call.set_index(index)
self._put_call(index, call)
return index
Expand All @@ -141,26 +152,27 @@ def print_uncommitted_calls(self):
print(call._method_name, call._args, call._kwargs)

def get_calls(self) -> Generator[Call, None, None]:
for _, call in self._db.scan(prefix='txlog_'):
for _, call in self._db.scan(prefix=self.CALL_PREFIX):
yield call

def get_first_uncommitted_call(self) -> Call:
next_offset = self._get_next_offset()
if next_offset > self._get_index():
return
call_key = TxLog._get_call_key(next_offset)
call_key = self._get_call_key(next_offset)
return self._db.get(call_key)

def get_uncommitted_calls(self) -> Generator[Call, None, None]:
first_uncommitted_call = self.get_first_uncommitted_call()
if first_uncommitted_call is not None:
for index in range(first_uncommitted_call.index, self._get_next_index()):
for index in range(first_uncommitted_call.index,
self._get_next_index()):
yield self.get(index)

def truncate(self):
if self._max_committed_items is not None:
committed_calls_no = self.count_committed_calls()
for key, call in self._db.scan(prefix='txlog_'):
for key, call in self._db.scan(prefix=self.CALL_PREFIX):
if not call.committed:
break

Expand All @@ -171,47 +183,50 @@ def truncate(self):
committed_calls_no -= 1

if self._committed_ttl_seconds is not None:
for key, call in self._db.scan(prefix='txlog_'):
for key, call in self._db.scan(prefix=self.CALL_PREFIX):
if not call.committed:
break

min_timestamp = get_timestamp_ms() - self._committed_ttl_seconds * 1000
min_timestamp = get_timestamp_ms() \
- self._committed_ttl_seconds * 1000
if call._creation_timestamp <= min_timestamp:
self._db.delete(key)

def count_committed_calls(self) -> int:
counter = 0
for _, call in self._db.scan(prefix='txlog_'):
for _, call in self._db.scan(prefix=self.CALL_PREFIX):
if call.committed:
counter += 1
return counter

def count_calls(self) -> int:
counter = 0
for _, _ in self._db.scan(prefix='txlog_'):
for _, _ in self._db.scan(prefix=self.CALL_PREFIX):
counter += 1
return counter

def _get_call_key(self, index: int):
return self.CALL_PREFIX + utils.int_to_padded_bytes(index, UINT_BYTES)

def _update_call(self, index: int, call: Call):
if not isinstance(call, Call):
raise TypeError
call_key = TxLog._get_call_key(index)
call_key = self._get_call_key(index)
self._db.put(call_key, call, write_batch=self._write_batch)

def _put_call(self, index: int, call: Call):
if not isinstance(call, Call):
raise TypeError

is_batch_new = False
if self._write_batch is None:
is_batch_new = True
is_new_write_batch = self._write_batch is None
if is_new_write_batch:
self.begin()

call_key = TxLog._get_call_key(index)
call_key = self._get_call_key(index)
self._db.put(call_key, call, write_batch=self._write_batch)
self._increment_index()

if is_batch_new:
if is_new_write_batch:
self.commit()

def _increment_offset(self):
Expand All @@ -236,16 +251,22 @@ def _get_next_index(self) -> int:
return self._batch_index + 1
return self._get_int_attribute('index') + 1

def _get_meta_key(self, attribute: str):
return self.META_PREFIX + bytes(attribute, 'utf-8')

def _increment_int_attribute(self, attribute: str):
if attribute == 'index' and self._write_batch is not None:
self._batch_index += 1
value = self._batch_index
else:
value = self._get_int_attribute(attribute) + 1
self._db.put(f'meta_{attribute}', value, write_batch=self._write_batch)

key = self._get_meta_key(attribute)
self._db.put(key, value, write_batch=self._write_batch)

def _get_int_attribute(self, attribute: str) -> int:
value = self._db.get(f'meta_{attribute}')
key = self._get_meta_key(attribute)
value = self._db.get(key)
if value is None:
return -1
return value
return value
8 changes: 8 additions & 0 deletions txlog/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time


def get_timestamp_ms():
return int(round(time.time() * 1000))

0 comments on commit ca09aca

Please sign in to comment.