Skip to content

Commit

Permalink
Types refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
HardNorth committed Dec 4, 2024
1 parent f49eb08 commit f223bbe
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 100 deletions.
53 changes: 30 additions & 23 deletions reportportal_client/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,16 @@ async def close(self) -> None:
await self._session.close()
self._session = None

async def __get_item_url(self, item_id_future: Union[str, Task[str]]) -> Optional[str]:
async def __get_item_url(self, item_id_future: Union[Optional[str], Task[Optional[str]]]) -> Optional[str]:
item_id = await await_if_necessary(item_id_future)
if item_id is NOT_FOUND:
if item_id is NOT_FOUND or item_id is None:
logger.warning('Attempt to make request for non-existent id.')
return
return root_uri_join(self.base_url_v2, 'item', item_id)

async def __get_launch_url(self, launch_uuid_future: Union[str, Task[str]]) -> Optional[str]:
async def __get_launch_url(self, launch_uuid_future: Union[Optional[str], Task[Optional[str]]]) -> Optional[str]:
launch_uuid = await await_if_necessary(launch_uuid_future)
if launch_uuid is NOT_FOUND:
if launch_uuid is NOT_FOUND or launch_uuid is None:
logger.warning('Attempt to make request for non-existent launch.')
return
return root_uri_join(self.base_url_v2, 'launch', launch_uuid, 'finish')
Expand Down Expand Up @@ -442,7 +442,7 @@ async def update_test_item(self,

async def __get_launch_uuid_url(self, launch_uuid_future: Union[str, Task[str]]) -> Optional[str]:
launch_uuid = await await_if_necessary(launch_uuid_future)
if launch_uuid is NOT_FOUND:
if launch_uuid is NOT_FOUND or launch_uuid is None:
logger.warning('Attempt to make request for non-existent Launch UUID.')
return
logger.debug('get_launch_info - ID: %s', launch_uuid)
Expand All @@ -466,9 +466,9 @@ async def get_launch_info(self, launch_uuid_future: Union[str, Task[str]]) -> Op
logger.warning('get_launch_info - Launch info: Failed to fetch launch ID from the API.')
return launch_info

async def __get_item_uuid_url(self, item_uuid_future: Union[str, Task[str]]) -> Optional[str]:
async def __get_item_uuid_url(self, item_uuid_future: Union[Optional[str], Task[Optional[str]]]) -> Optional[str]:
item_uuid = await await_if_necessary(item_uuid_future)
if item_uuid is NOT_FOUND:
if item_uuid is NOT_FOUND or item_uuid is None:
logger.warning('Attempt to make request for non-existent UUID.')
return
return root_uri_join(self.base_url_v1, 'item', 'uuid', item_uuid)
Expand Down Expand Up @@ -972,7 +972,7 @@ def client(self) -> Client:
return self.__client

@property
def launch_uuid(self) -> Optional[Task[str]]:
def launch_uuid(self) -> Task[Optional[str]]:
"""Return current Launch UUID.
:return: UUID string.
Expand Down Expand Up @@ -1102,13 +1102,13 @@ def current_item(self) -> Task[_T]:
"""
return self._item_stack.last()

async def __empty_str(self):
async def __empty_str(self) -> str:
return ""

async def __empty_dict(self):
async def __empty_dict(self) -> dict:
return {}

async def __int_value(self):
async def __int_value(self) -> int:
return -1

def start_launch(self,
Expand All @@ -1118,7 +1118,7 @@ def start_launch(self,
attributes: Optional[Union[list, dict]] = None,
rerun: bool = False,
rerun_of: Optional[str] = None,
**kwargs) -> Task[str]:
**kwargs) -> Task[Optional[str]]:
"""Start a new Launch with the given arguments.
:param name: Launch name.
Expand Down Expand Up @@ -1151,7 +1151,7 @@ def start_test_item(self,
retry: bool = False,
test_case_id: Optional[str] = None,
retry_of: Optional[str] = None,
**kwargs: Any) -> Task[str]:
**kwargs: Any) -> Task[Optional[str]]:
"""Start Test Case/Suite/Step/Nested Step Item.
:param name: Name of the Test Item.
Expand Down Expand Up @@ -1191,7 +1191,7 @@ def finish_test_item(self,
retry: bool = False,
test_case_id: Optional[str] = None,
retry_of: Optional[str] = None,
**kwargs: Any) -> Task[str]:
**kwargs: Any) -> Task[Optional[str]]:
"""Finish Test Suite/Case/Step/Nested Step Item.
:param item_id: ID of the Test Item.
Expand Down Expand Up @@ -1219,7 +1219,7 @@ def finish_launch(self,
end_time: str,
status: Optional[str] = None,
attributes: Optional[Union[list, dict]] = None,
**kwargs: Any) -> Task[str]:
**kwargs: Any) -> Task[Optional[str]]:
"""Finish a Launch.
:param end_time: Launch end time.
Expand All @@ -1242,7 +1242,7 @@ def finish_launch(self,
def update_test_item(self,
item_uuid: Task[str],
attributes: Optional[Union[list, dict]] = None,
description: Optional[str] = None) -> Task:
description: Optional[str] = None) -> Task[Optional[str]]:
"""Update existing Test Item at the ReportPortal.
:param item_uuid: Test Item UUID returned on the item start.
Expand All @@ -1255,7 +1255,7 @@ def update_test_item(self,
result_task = self.create_task(result_coro)
return result_task

def get_launch_info(self) -> Task[dict]:
def get_launch_info(self) -> Task[Optional[dict]]:
"""Get current Launch information.
:return: Launch information in dictionary.
Expand All @@ -1266,7 +1266,7 @@ def get_launch_info(self) -> Task[dict]:
result_task = self.create_task(result_coro)
return result_task

def get_item_id_by_uuid(self, item_uuid_future: Task[str]) -> Task[str]:
def get_item_id_by_uuid(self, item_uuid_future: Task[Optional[str]]) -> Task[Optional[str]]:
"""Get Test Item ID by the given Item UUID.
:param item_uuid_future: Str or Task UUID returned on the Item start.
Expand All @@ -1276,7 +1276,7 @@ def get_item_id_by_uuid(self, item_uuid_future: Task[str]) -> Task[str]:
result_task = self.create_task(result_coro)
return result_task

def get_launch_ui_id(self) -> Task[int]:
def get_launch_ui_id(self) -> Task[Optional[int]]:
"""Get Launch ID of the current Launch.
:return: Launch ID of the Launch. None if not found.
Expand All @@ -1287,7 +1287,7 @@ def get_launch_ui_id(self) -> Task[int]:
result_task = self.create_task(result_coro)
return result_task

def get_launch_ui_url(self) -> Task[str]:
def get_launch_ui_url(self) -> Task[Optional[str]]:
"""Get full quality URL of the current Launch.
:return: Launch URL string.
Expand All @@ -1298,7 +1298,7 @@ def get_launch_ui_url(self) -> Task[str]:
result_task = self.create_task(result_coro)
return result_task

def get_project_settings(self) -> Task[dict]:
def get_project_settings(self) -> Task[Optional[str]]:
"""Get settings of the current Project.
:return: Settings response in Dictionary.
Expand All @@ -1314,7 +1314,7 @@ async def _log(self, log_rq: AsyncRPRequestLog) -> Optional[Tuple[str, ...]]:
return await self._log_batch(await self._log_batcher.append_async(log_rq))

def log(self, time: str, message: str, level: Optional[Union[int, str]] = None,
attachment: Optional[dict] = None, item_id: Optional[Task[str]] = None) -> Task[Tuple[str, ...]]:
attachment: Optional[dict] = None, item_id: Optional[Task[str]] = None) -> Task[Optional[Tuple[str, ...]]]:
"""Send Log message to the ReportPortal and attach it to a Test Item or Launch.
This method stores Log messages in internal batch and sent it when batch is full, so not every method
Expand All @@ -1338,6 +1338,11 @@ def close(self) -> None:
self.create_task(self.__client.close()).blocking_result()


def heartbeat(self):
"""Heartbeat function to keep the loop running."""
self._loop.call_at(self._loop.time() + 0.1, heartbeat, self)


class ThreadedRPClient(_RPClient):
"""Synchronous-asynchronous ReportPortal Client which uses background Thread to execute async coroutines.
Expand Down Expand Up @@ -1369,7 +1374,7 @@ def __init_task_list(self, task_list: Optional[BackgroundTaskList[Task[_T]]] = N
def __heartbeat(self):
# We operate on our own loop with daemon thread, so we will exit in any way when main thread exit,
# so we can iterate forever
self._loop.call_at(self._loop.time() + 0.1, self.__heartbeat)
heartbeat(self)

def __init_loop(self, loop: Optional[asyncio.AbstractEventLoop] = None):
self._thread = None
Expand Down Expand Up @@ -1471,6 +1476,8 @@ def finish_tasks(self):
break
logs = self._log_batcher.flush()
if logs:
# We use own Task Factory in which we add the following method to the Task class
# noinspection PyUnresolvedReferences
self._loop.create_task(self._log_batch(logs)).blocking_result()

def clone(self) -> 'ThreadedRPClient':
Expand Down
49 changes: 33 additions & 16 deletions reportportal_client/steps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,30 +43,38 @@ def test_my_nested_step():
"""
from functools import wraps
from typing import Callable, ParamSpec, TypeVar, Optional, Dict, Union, Type

from reportportal_client import RP
# noinspection PyProtectedMember
from reportportal_client._internal.aio.tasks import Task
# noinspection PyProtectedMember
from reportportal_client._internal.local import current
from reportportal_client.helpers import get_function_params, timestamp

NESTED_STEP_ITEMS = ('step', 'scenario', 'before_class', 'before_groups',
'before_method', 'before_suite', 'before_test',
'after_test', 'after_suite', 'after_class',
'after_groups', 'after_method')
NESTED_STEP_ITEMS = ('step', 'scenario', 'before_class', 'before_groups', 'before_method', 'before_suite',
'before_test', 'after_test', 'after_suite', 'after_class', 'after_groups', 'after_method')

_Param = ParamSpec("_Param")
_Return = TypeVar("_Return")


# noinspection PyUnresolvedReferences
class StepReporter:
"""Nested Steps context handling class."""

def __init__(self, rp_client):
client: RP

def __init__(self, rp_client: RP):
"""Initialize required attributes.
:param rp_client: ReportPortal client which will be used to report
steps
"""
self.client = rp_client

def start_nested_step(self, name, start_time, parameters=None, **_):
def start_nested_step(self, name: str, start_time: str, parameters: Optional[Dict[str, Any]] = None,
**_: Dict[str, Any]) -> Union[Optional[str], Task[Optional[str]]]:
"""Start Nested Step on ReportPortal.
:param name: Nested Step name
Expand All @@ -79,7 +87,8 @@ def start_nested_step(self, name, start_time, parameters=None, **_):
return self.client.start_test_item(
name, start_time, 'step', has_stats=False, parameters=parameters, parent_item_id=parent_id)

def finish_nested_step(self, item_id, end_time, status=None, **_):
def finish_nested_step(self, item_id: str, end_time: str, status: str = None,
**_: Dict[str, Any]) -> Union[Optional[str], Task[Optional[str]]]:
"""Finish a Nested Step on ReportPortal.
:param item_id: Nested Step item ID
Expand All @@ -89,10 +98,16 @@ def finish_nested_step(self, item_id, end_time, status=None, **_):
return self.client.finish_test_item(item_id, end_time, status=status)


class Step:
class Step(Callable[[_Param], _Return]):
"""Step context handling class."""

def __init__(self, name, params, status, rp_client):
name: str
params: Dict
status: str
client: Optional[RP]
__item_id: Union[Optional[str], Task[Optional[str]]]

def __init__(self, name: str, params: Dict, status: str, rp_client: Optional[RP]):
"""Initialize required attributes.
:param name: Nested Step name
Expand All @@ -108,7 +123,7 @@ def __init__(self, name, params, status, rp_client):
self.client = rp_client
self.__item_id = None

def __enter__(self):
def __enter__(self) -> None:
"""Enter the runtime context related to this object."""
# Cannot call _local.current() early since it will be initialized
# before client put something in there
Expand All @@ -124,7 +139,7 @@ def __enter__(self):
param_str = 'Parameters: ' + '; '.join(param_list)
rp_client.log(timestamp(), param_str, level='INFO', item_id=self.__item_id)

def __exit__(self, exc_type, exc_val, exc_tb):
def __exit__(self, exc_type: Type[BaseException], exc_val, exc_tb) -> None:
"""Exit the runtime context related to this object."""
# Cannot call local.current() early since it will be initialized before client put something in there
rp_client = self.client or current()
Expand All @@ -138,25 +153,27 @@ def __exit__(self, exc_type, exc_val, exc_tb):
step_status = 'FAILED'
rp_client.step_reporter.finish_nested_step(self.__item_id, timestamp(), step_status)

def __call__(self, func):
def __call__(self, *args, **kwargs):
"""Wrap and call a function reference.
:param func: function reference
"""
func = args[0]

@wraps(func)
def wrapper(*args, **kwargs):
def wrapper(*my_args, **my_kwargs):
__tracebackhide__ = True
params = self.params
if params is None:
params = get_function_params(func, args, kwargs)
params = get_function_params(func, my_args, my_kwargs)
with Step(self.name, params, self.status, self.client):
return func(*args, **kwargs)
return func(*my_args, **my_kwargs)

return wrapper


def step(name_source, params=None, status='PASSED', rp_client=None):
def step(name_source: Union[Callable[[_Param], _Return], str], params: Optional[Dict] = None, status: str = 'PASSED',
rp_client: Optional[RP] = None) -> Callable[[_Param], _Return]:
"""Nested step report function.
Create a Nested Step inside a test method on ReportPortal.
Expand Down
61 changes: 0 additions & 61 deletions reportportal_client/steps/__init__.pyi

This file was deleted.

0 comments on commit f223bbe

Please sign in to comment.