Skip to content

Commit

Permalink
Add clone test
Browse files Browse the repository at this point in the history
  • Loading branch information
HardNorth committed Oct 3, 2023
1 parent 290319f commit 9577a0f
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 18 deletions.
34 changes: 17 additions & 17 deletions reportportal_client/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1323,12 +1323,12 @@ class ThreadedRPClient(_RPClient):
bodies generation and serialization, connection retries and log batching.
"""

_task_timeout: float
_shutdown_timeout: float
_loop: Optional[asyncio.AbstractEventLoop]
task_timeout: float
shutdown_timeout: float
_task_list: BackgroundTaskList[Task[_T]]
_task_mutex: threading.RLock
_loop: Optional[asyncio.AbstractEventLoop]
_thread: Optional[threading.Thread]
__task_list: BackgroundTaskList[Task[_T]]

def __init_task_list(self, task_list: Optional[BackgroundTaskList[Task[_T]]] = None,
task_mutex: Optional[threading.RLock] = None):
Expand All @@ -1340,7 +1340,7 @@ def __init_task_list(self, task_list: Optional[BackgroundTaskList[Task[_T]]] = N
RuntimeWarning,
3
)
self.__task_list = task_list or BackgroundTaskList()
self._task_list = task_list or BackgroundTaskList()
self._task_mutex = task_mutex or threading.RLock()

def __heartbeat(self):
Expand All @@ -1354,7 +1354,7 @@ def __init_loop(self, loop: Optional[asyncio.AbstractEventLoop] = None):
self._loop = loop
else:
self._loop = asyncio.new_event_loop()
self._loop.set_task_factory(ThreadedTaskFactory(self._task_timeout))
self._loop.set_task_factory(ThreadedTaskFactory(self.task_timeout))
self.__heartbeat()
self._thread = threading.Thread(target=self._loop.run_forever, name='RP-Async-Client',
daemon=True)
Expand Down Expand Up @@ -1409,8 +1409,8 @@ def __init__(
if this argument is None.
"""
super().__init__(endpoint, project, **kwargs)
self._task_timeout = task_timeout
self._shutdown_timeout = shutdown_timeout
self.task_timeout = task_timeout
self.shutdown_timeout = shutdown_timeout
self.__init_task_list(task_list, task_mutex)
self.__init_loop(loop)

Expand All @@ -1424,17 +1424,17 @@ def create_task(self, coro: Coroutine[Any, Any, _T]) -> Optional[Task[_T]]:
return
result = self._loop.create_task(coro)
with self._task_mutex:
self.__task_list.append(result)
self._task_list.append(result)
return result

def finish_tasks(self):
"""Ensure all pending Tasks are finished, block current Thread if necessary."""
shutdown_start_time = datetime.time()
with self._task_mutex:
tasks = self.__task_list.flush()
tasks = self._task_list.flush()
for task in tasks:
task.blocking_result()
if datetime.time() - shutdown_start_time >= self._shutdown_timeout:
if datetime.time() - shutdown_start_time >= self.shutdown_timeout:
break
logs = self._log_batcher.flush()
if logs:
Expand All @@ -1450,17 +1450,17 @@ def clone(self) -> 'ThreadedRPClient':
cloned_client = self.client.clone()
# noinspection PyTypeChecker
cloned = ThreadedRPClient(
endpoint=None,
project=None,
endpoint=self.endpoint,
project=self.project,
launch_uuid=self.launch_uuid,
client=cloned_client,
log_batch_size=self.log_batch_size,
log_batch_payload_limit=self.log_batch_payload_limit,
log_batcher=self._log_batcher,
task_timeout=self._task_timeout,
shutdown_timeout=self._shutdown_timeout,
task_timeout=self.task_timeout,
shutdown_timeout=self.shutdown_timeout,
task_mutex=self._task_mutex,
task_list=self.__task_list,
task_list=self._task_list,
loop=self._loop
)
current_item = self.current_item()
Expand All @@ -1487,7 +1487,7 @@ def __setstate__(self, state: Dict[str, Any]) -> None:
:param dict state: object state dictionary
"""
self.__dict__.update(state)
self.__init_task_list(self.__task_list, threading.RLock())
self.__init_task_list(self._task_list, threading.RLock())
self.__init_loop()


Expand Down
47 changes: 46 additions & 1 deletion tests/aio/test_threaded_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,56 @@
# limitations under the License
import pickle

from reportportal_client.aio import ThreadedRPClient
from reportportal_client.aio import ThreadedRPClient, ThreadedTask


def test_threaded_rp_client_pickling():
client = ThreadedRPClient('http://localhost:8080', 'default_personal', api_key='test_key')
pickled_client = pickle.dumps(client)
unpickled_client = pickle.loads(pickled_client)
assert unpickled_client is not None


async def __empty_string():
return ""


def test_clone():
args = ['http://endpoint', 'project']
kwargs = {'api_key': 'api_key1', 'launch_uuid': 'launch_uuid', 'log_batch_size': 30,
'log_batch_payload_limit': 30 * 1024 * 1024, 'task_timeout': 63, 'shutdown_timeout': 123}
async_client = ThreadedRPClient(*args, **kwargs)
async_client._add_current_item(ThreadedTask(__empty_string(), 64, loop=async_client._loop,
name='test-321'))
async_client._add_current_item(ThreadedTask(__empty_string(), 64, loop=async_client._loop,
name='test-322'))
client = async_client.client
step_reporter = async_client.step_reporter
cloned = async_client.clone()
assert (
cloned is not None
and async_client is not cloned
and cloned.client is not None
and cloned.client is not client
and cloned.step_reporter is not None
and cloned.step_reporter is not step_reporter
and cloned._task_list is async_client._task_list
and cloned._task_mutex is async_client._task_mutex
and cloned._loop is async_client._loop
)
assert (
cloned.endpoint == args[0]
and cloned.project == args[1]
and cloned.client.endpoint == args[0]
and cloned.client.project == args[1]
)
assert (
cloned.client.api_key == kwargs['api_key']
and cloned.launch_uuid == kwargs['launch_uuid']
and cloned.log_batch_size == kwargs['log_batch_size']
and cloned.log_batch_payload_limit == kwargs['log_batch_payload_limit']
and cloned.task_timeout == kwargs['task_timeout']
and cloned.shutdown_timeout == kwargs['shutdown_timeout']
)
assert cloned._item_stack.qsize() == 1 \
and async_client.current_item() == cloned.current_item()

0 comments on commit 9577a0f

Please sign in to comment.