Skip to content

Commit

Permalink
Better exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Yard1 committed Dec 10, 2024
1 parent da5378c commit e7fcd7b
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ async def _execute(
allow_redirections: bool = True,
) -> List[Any]:
todo = [
cmd for cmd in stack if not cmd.result or isinstance(cmd.result, Exception)
cmd for cmd in stack if not cmd.unwrap_result() or cmd.get_all_exceptions()
]

nodes = {}
Expand Down Expand Up @@ -1548,18 +1548,22 @@ async def _execute(
if allow_redirections:
# send each errored command individually
for cmd in todo:
if isinstance(cmd.result, (TryAgainError, MovedError, AskError)):
try:
result = await client.execute_command(
*cmd.args, **cmd.kwargs
)
except Exception as e:
result = e
for name, exc in cmd.get_all_exceptions():
if isinstance(exc, (TryAgainError, MovedError, AskError)):
try:
result = await client.execute_command(
*cmd.args, **cmd.kwargs
)
except Exception as e:
result = e

if isinstance(result, dict):
cmd.result = result
else:
cmd.set_node_result(cmd.target_nodes[0].name, result)
if isinstance(result, dict):
cmd.result = result
else:
cmd.set_node_result(name, result)

# We have already retried the command on all nodes.
break

if raise_on_error:
for cmd in todo:
Expand All @@ -1583,11 +1587,16 @@ async def _execute(
# to replace it.
# Note: when the error is raised we'll reset the default node in the
# caller function.
has_exc = False
for cmd in default_node[1]:
# Check if it has a command that failed with a relevant
# exception
if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY:
client.replace_default_node()
for name, exc in cmd.get_all_exceptions():
if type(exc) in self.__class__.ERRORS_ALLOW_RETRY:
client.replace_default_node()
has_exc = True
break
if has_exc:
break

return [cmd.unwrap_result() for cmd in stack]
Expand Down Expand Up @@ -1649,5 +1658,8 @@ def get_first_exception(self) -> Optional[Tuple[str, Exception]]:
((n, r) for n, r in self.result.items() if isinstance(r, Exception)), None
)

def get_all_exceptions(self) -> List[Tuple[str, Exception]]:
return [(n, r) for n, r in self.result.items() if isinstance(r, Exception)]

def __repr__(self) -> str:
return f"[{self.position}] {self.args} ({self.kwargs})"

0 comments on commit e7fcd7b

Please sign in to comment.