Skip to content
This repository has been archived by the owner on Feb 8, 2023. It is now read-only.

Commit

Permalink
Fix block size for transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
qianduoduo0904 committed Dec 8, 2022
1 parent 41a495a commit ef4b525
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
1 change: 1 addition & 0 deletions mars/oscar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)
from .backends import allocate_strategy
from .backends.pool import MainActorPoolType
from .backends.transfer import temp_transfer_block_size
from .batch import extensible
from .core import (
ActorRef,
Expand Down
15 changes: 15 additions & 0 deletions mars/oscar/backends/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@
DEFAULT_TRANSFER_BLOCK_SIZE = 4 * 1024**2


@contextlib.contextmanager
def temp_transfer_block_size(size: int):
global DEFAULT_TRANSFER_BLOCK_SIZE

if size == DEFAULT_TRANSFER_BLOCK_SIZE:
yield
else:
default_size = DEFAULT_TRANSFER_BLOCK_SIZE
DEFAULT_TRANSFER_BLOCK_SIZE = size
try:
yield
finally:
DEFAULT_TRANSFER_BLOCK_SIZE = default_size


def _get_buffer_size(buf) -> int:
try:
return buf.nbytes
Expand Down
9 changes: 5 additions & 4 deletions mars/services/storage/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,11 @@ async def send_batch_data(
rest_keys.append(data_key)

if local_buffers:
# for data that supports buffer protocol on both sides
# hand over to oscar to transfer data
await mo.copyto_via_buffers(local_buffers, remote_buffer_refs)
await receiver_ref.close_writers(session_id, copied_keys)
with mo.temp_transfer_block_size(block_size):
# for data that supports buffer protocol on both sides
# hand over to oscar to transfer data
await mo.copyto_via_buffers(local_buffers, remote_buffer_refs)
await receiver_ref.close_writers(session_id, copied_keys)
else:
rest_keys = to_send_keys
rest_readers = readers
Expand Down

0 comments on commit ef4b525

Please sign in to comment.