From e7fcd7b3925dddc52289df5b52a827350372259d Mon Sep 17 00:00:00 2001 From: Antoni Baum Date: Tue, 10 Dec 2024 11:55:47 -0800 Subject: [PATCH] Better exception handling --- redis/asyncio/cluster.py | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 0db833860c..ac13b06bf1 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1515,7 +1515,7 @@ async def _execute( allow_redirections: bool = True, ) -> List[Any]: todo = [ - cmd for cmd in stack if not cmd.result or isinstance(cmd.result, Exception) + cmd for cmd in stack if not cmd.unwrap_result() or cmd.get_all_exceptions() ] nodes = {} @@ -1548,18 +1548,22 @@ async def _execute( if allow_redirections: # send each errored command individually for cmd in todo: - if isinstance(cmd.result, (TryAgainError, MovedError, AskError)): - try: - result = await client.execute_command( - *cmd.args, **cmd.kwargs - ) - except Exception as e: - result = e + for name, exc in cmd.get_all_exceptions(): + if isinstance(exc, (TryAgainError, MovedError, AskError)): + try: + result = await client.execute_command( + *cmd.args, **cmd.kwargs + ) + except Exception as e: + result = e - if isinstance(result, dict): - cmd.result = result - else: - cmd.set_node_result(cmd.target_nodes[0].name, result) + if isinstance(result, dict): + cmd.result = result + else: + cmd.set_node_result(name, result) + + # We have already retried the command on all nodes. + break if raise_on_error: for cmd in todo: @@ -1583,11 +1587,16 @@ async def _execute( # to replace it. # Note: when the error is raised we'll reset the default node in the # caller function. + has_exc = False for cmd in default_node[1]: # Check if it has a command that failed with a relevant # exception - if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY: - client.replace_default_node() + for name, exc in cmd.get_all_exceptions(): + if type(exc) in self.__class__.ERRORS_ALLOW_RETRY: + client.replace_default_node() + has_exc = True + break + if has_exc: break return [cmd.unwrap_result() for cmd in stack] @@ -1649,5 +1658,8 @@ def get_first_exception(self) -> Optional[Tuple[str, Exception]]: ((n, r) for n, r in self.result.items() if isinstance(r, Exception)), None ) + def get_all_exceptions(self) -> List[Tuple[str, Exception]]: + return [(n, r) for n, r in self.result.items() if isinstance(r, Exception)] + def __repr__(self) -> str: return f"[{self.position}] {self.args} ({self.kwargs})"