Skip to content

Commit

Permalink
Add clone test
Browse files Browse the repository at this point in the history
  • Loading branch information
HardNorth committed Oct 3, 2023
1 parent 13487dd commit 290319f
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 22 deletions.
44 changes: 22 additions & 22 deletions reportportal_client/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1499,14 +1499,14 @@ class BatchedRPClient(_RPClient):
bodies generation and serialization, connection retries and log batching.
"""

_task_timeout: float
_shutdown_timeout: float
task_timeout: float
shutdown_timeout: float
trigger_num: int
trigger_interval: float
_loop: asyncio.AbstractEventLoop
_task_mutex: threading.RLock
__task_list: TriggerTaskBatcher[Task[_T]]
_task_list: TriggerTaskBatcher[Task[_T]]
__last_run_time: float
__trigger_num: int
__trigger_interval: float

def __init_task_list(self, task_list: Optional[TriggerTaskBatcher[Task[_T]]] = None,
task_mutex: Optional[threading.RLock] = None):
Expand All @@ -1518,7 +1518,7 @@ def __init_task_list(self, task_list: Optional[TriggerTaskBatcher[Task[_T]]] = N
RuntimeWarning,
3
)
self.__task_list = task_list or TriggerTaskBatcher(self.__trigger_num, self.__trigger_interval)
self._task_list = task_list or TriggerTaskBatcher(self.trigger_num, self.trigger_interval)
self._task_mutex = task_mutex or threading.RLock()

def __init_loop(self, loop: Optional[asyncio.AbstractEventLoop] = None):
Expand Down Expand Up @@ -1581,10 +1581,10 @@ def __init__(
:param trigger_interval: Time limit which triggers Task batch execution.
"""
super().__init__(endpoint, project, **kwargs)
self._task_timeout = task_timeout
self._shutdown_timeout = shutdown_timeout
self.__trigger_num = trigger_num
self.__trigger_interval = trigger_interval
self.task_timeout = task_timeout
self.shutdown_timeout = shutdown_timeout
self.trigger_num = trigger_num
self.trigger_interval = trigger_interval
self.__init_task_list(task_list, task_mutex)
self.__last_run_time = datetime.time()
self.__init_loop(loop)
Expand All @@ -1599,17 +1599,17 @@ def create_task(self, coro: Coroutine[Any, Any, _T]) -> Optional[Task[_T]]:
return
result = self._loop.create_task(coro)
with self._task_mutex:
tasks = self.__task_list.append(result)
tasks = self._task_list.append(result)
if tasks:
self._loop.run_until_complete(asyncio.wait(tasks, timeout=self._task_timeout))
self._loop.run_until_complete(asyncio.wait(tasks, timeout=self.task_timeout))
return result

def finish_tasks(self) -> None:
"""Ensure all pending Tasks are finished, block current Thread if necessary."""
with self._task_mutex:
tasks = self.__task_list.flush()
tasks = self._task_list.flush()
if tasks:
self._loop.run_until_complete(asyncio.wait(tasks, timeout=self._shutdown_timeout))
self._loop.run_until_complete(asyncio.wait(tasks, timeout=self.shutdown_timeout))
logs = self._log_batcher.flush()
if logs:
log_task = self._loop.create_task(self._log_batch(logs))
Expand All @@ -1625,20 +1625,20 @@ def clone(self) -> 'BatchedRPClient':
cloned_client = self.client.clone()
# noinspection PyTypeChecker
cloned = BatchedRPClient(
endpoint=None,
project=None,
endpoint=self.endpoint,
project=self.project,
launch_uuid=self.launch_uuid,
client=cloned_client,
log_batch_size=self.log_batch_size,
log_batch_payload_limit=self.log_batch_payload_limit,
log_batcher=self._log_batcher,
task_timeout=self._task_timeout,
shutdown_timeout=self._shutdown_timeout,
task_list=self.__task_list,
task_timeout=self.task_timeout,
shutdown_timeout=self.shutdown_timeout,
task_list=self._task_list,
task_mutex=self._task_mutex,
loop=self._loop,
trigger_num=self.__trigger_num,
trigger_interval=self.__trigger_interval
trigger_num=self.trigger_num,
trigger_interval=self.trigger_interval
)
current_item = self.current_item()
if current_item:
Expand All @@ -1663,5 +1663,5 @@ def __setstate__(self, state: Dict[str, Any]) -> None:
:param dict state: object state dictionary
"""
self.__dict__.update(state)
self.__init_task_list(self.__task_list, threading.RLock())
self.__init_task_list(self._task_list, threading.RLock())
self.__init_loop()
47 changes: 47 additions & 0 deletions tests/aio/test_batched_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# limitations under the License
import pickle

from reportportal_client.aio import BatchedTask
from reportportal_client.aio import BatchedRPClient


Expand All @@ -20,3 +21,49 @@ def test_batched_rp_client_pickling():
pickled_client = pickle.dumps(client)
unpickled_client = pickle.loads(pickled_client)
assert unpickled_client is not None


async def __empty_string():
return ""


def test_clone():
args = ['http://endpoint', 'project']
kwargs = {'api_key': 'api_key1', 'launch_uuid': 'launch_uuid', 'log_batch_size': 30,
'log_batch_payload_limit': 30 * 1024 * 1024, 'task_timeout': 63, 'shutdown_timeout': 123,
'trigger_num': 25, 'trigger_interval': 3}
async_client = BatchedRPClient(*args, **kwargs)
async_client._add_current_item(BatchedTask(__empty_string(), loop=async_client._loop, name='test-321'))
async_client._add_current_item(BatchedTask(__empty_string(), loop=async_client._loop, name='test-322'))
client = async_client.client
step_reporter = async_client.step_reporter
cloned = async_client.clone()
assert (
cloned is not None
and async_client is not cloned
and cloned.client is not None
and cloned.client is not client
and cloned.step_reporter is not None
and cloned.step_reporter is not step_reporter
and cloned._task_list is async_client._task_list
and cloned._task_mutex is async_client._task_mutex
and cloned._loop is async_client._loop
)
assert (
cloned.endpoint == args[0]
and cloned.project == args[1]
and cloned.client.endpoint == args[0]
and cloned.client.project == args[1]
)
assert (
cloned.client.api_key == kwargs['api_key']
and cloned.launch_uuid == kwargs['launch_uuid']
and cloned.log_batch_size == kwargs['log_batch_size']
and cloned.log_batch_payload_limit == kwargs['log_batch_payload_limit']
and cloned.task_timeout == kwargs['task_timeout']
and cloned.shutdown_timeout == kwargs['shutdown_timeout']
and cloned.trigger_num == kwargs['trigger_num']
and cloned.trigger_interval == kwargs['trigger_interval']
)
assert cloned._item_stack.qsize() == 1 \
and async_client.current_item() == cloned.current_item()

0 comments on commit 290319f

Please sign in to comment.