From 407418fdd3170bb4e1b196b5cc01d1c8f560ea91 Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Fri, 2 Dec 2022 13:47:46 +0000 Subject: [PATCH 01/18] Support `copyto_via_buffers` & `copyto_via_file_objects` for oscar --- mars/lib/aio/__init__.py | 1 - mars/oscar/__init__.py | 10 +- mars/oscar/api.py | 32 ++- mars/oscar/backends/communication/base.py | 4 +- mars/oscar/backends/communication/ucx.py | 62 ++-- mars/oscar/backends/communication/utils.py | 21 +- mars/oscar/backends/context.py | 4 +- .../backends/mars/tests/test_transfer.py | 228 +++++++++++++++ mars/oscar/backends/message.pyi | 34 ++- mars/oscar/backends/message.pyx | 87 +++++- mars/oscar/backends/pool.py | 42 ++- mars/oscar/backends/router.py | 107 +++++-- mars/oscar/backends/transfer.py | 267 ++++++++++++++++++ mars/oscar/context.pyx | 92 +++++- mars/oscar/core.pxd | 10 + mars/oscar/core.pyx | 79 +++++- mars/oscar/utils.pyx | 1 + mars/serialization/aio.py | 9 +- mars/utils.py | 21 ++ 19 files changed, 1029 insertions(+), 82 deletions(-) create mode 100644 mars/oscar/backends/mars/tests/test_transfer.py create mode 100644 mars/oscar/backends/transfer.py diff --git a/mars/lib/aio/__init__.py b/mars/lib/aio/__init__.py index 8177989f72..fc36bf80fe 100644 --- a/mars/lib/aio/__init__.py +++ b/mars/lib/aio/__init__.py @@ -13,7 +13,6 @@ # limitations under the License. 
import asyncio -import contextlib import sys from .file import AioFileObject, AioFilesystem diff --git a/mars/oscar/__init__.py b/mars/oscar/__init__.py index 3672735499..829ec1f4e2 100644 --- a/mars/oscar/__init__.py +++ b/mars/oscar/__init__.py @@ -32,11 +32,19 @@ setup_cluster, wait_actor_pool_recovered, get_pool_config, + copyto_via_buffers, + copyto_via_file_objects, + buffer_ref, + file_object_ref, ) from .backends import allocate_strategy from .backends.pool import MainActorPoolType from .batch import extensible -from .core import ActorRef +from .core import ( + ActorRef, + BufferRef, + FileObjectRef, +) # noqa: F401 # pylint: disable=unused-import from .debug import set_debug_options, get_debug_options, DebugOptions from .errors import ( ActorNotExist, diff --git a/mars/oscar/api.py b/mars/oscar/api.py index baf2252c97..6d6288741e 100644 --- a/mars/oscar/api.py +++ b/mars/oscar/api.py @@ -13,13 +13,14 @@ # limitations under the License. from urllib.parse import urlparse -from typing import Any, Dict, Type, Tuple +from typing import Any, Dict, Type, Tuple, List from numbers import Number from collections import defaultdict +from ..lib.aio import AioFileObject from .backend import get_backend from .context import get_context -from .core import _Actor, _StatelessActor, ActorRef +from .core import _Actor, _StatelessActor, ActorRef, BufferRef, FileObjectRef async def create_actor(actor_cls, *args, uid=None, address=None, **kwargs) -> ActorRef: @@ -42,11 +43,21 @@ async def actor_ref(*args, **kwargs) -> ActorRef: return await ctx.actor_ref(*args, **kwargs) -async def kill_actor(actor_ref): +async def kill_actor(actor_ref: ActorRef): ctx = get_context() return await ctx.kill_actor(actor_ref) +def buffer_ref(address: str, buffer: Any) -> BufferRef: + ctx = get_context() + return ctx.buffer_ref(address, buffer) + + +def file_object_ref(address: str, fileobj: AioFileObject) -> FileObjectRef: + ctx = get_context() + return ctx.file_object_ref(address, fileobj) + + 
async def create_actor_pool(address: str, n_process: int = None, **kwargs): if address is None: raise ValueError("address has to be provided") @@ -70,6 +81,21 @@ async def get_pool_config(address: str): return await ctx.get_pool_config(address) +async def copyto_via_buffers(local_buffers: list, remote_buffer_refs: List[BufferRef]): + ctx = get_context() + return await ctx.copyto_via_buffers(local_buffers, remote_buffer_refs) + + +async def copyto_via_file_objects( + local_file_objects: List[AioFileObject], + remote_file_object_refs: List[FileObjectRef], +): + ctx = get_context() + return await ctx.copyto_via_file_objects( + local_file_objects, remote_file_object_refs + ) + + def setup_cluster(address_to_resources: Dict[str, Dict[str, Number]]): scheme_to_address_resources = defaultdict(dict) for address, resources in address_to_resources.items(): diff --git a/mars/oscar/backends/communication/base.py b/mars/oscar/backends/communication/base.py index 9dc891508a..ccdb424345 100644 --- a/mars/oscar/backends/communication/base.py +++ b/mars/oscar/backends/communication/base.py @@ -264,11 +264,11 @@ def parse_config(cls, config: dict) -> dict: return dict() @implements(Channel.send) - async def send(self, message): + async def send(self, message: Any): return await self.channel.send(message) @implements(Channel.recv) - async def recv(self): + async def recv(self) -> Any: return await self.channel.recv() async def close(self): diff --git a/mars/oscar/backends/communication/ucx.py b/mars/oscar/backends/communication/ucx.py index 5026464fda..af35a8c638 100644 --- a/mars/oscar/backends/communication/ucx.py +++ b/mars/oscar/backends/communication/ucx.py @@ -23,7 +23,7 @@ import cloudpickle import numpy as np -from ....utils import lazy_import, implements, classproperty +from ....utils import lazy_import, implements, classproperty, is_cuda_buffer from ....lib.nvutils import get_index_and_uuid, get_cuda_context from ....serialization import deserialize from 
....serialization.aio import AioSerializer, get_header_length, BUFFER_SIZES_NAME @@ -246,23 +246,7 @@ async def send(self, message: Any): compress = self.compression or 0 serializer = AioSerializer(message, compress=compress) buffers = await serializer.run() - try: - # It is necessary to first synchronize the default stream before start - # sending We synchronize the default stream because UCX is not - # stream-ordered and syncing the default stream will wait for other - # non-blocking CUDA streams. Note this is only sufficient if the memory - # being sent is not currently in use on non-blocking CUDA streams. - if any(hasattr(buf, "__cuda_array_interface__") for buf in buffers): - # has GPU buffer - synchronize_stream(0) - - async with self._send_lock: - for buffer in buffers: - if buffer.nbytes if hasattr(buffer, "nbytes") else len(buffer) > 0: - await self.ucp_endpoint.send(buffer) - except ucp.exceptions.UCXBaseException: # pragma: no cover - self.abort() - raise ChannelClosed("While writing, the connection was closed") + return await self.send_buffers(buffers) @implements(Channel.recv) async def recv(self): @@ -302,6 +286,41 @@ async def recv(self): raise EOFError("Server closed already") return deserialize(header, buffers) + async def send_buffers(self, buffers: list): + try: + # It is necessary to first synchronize the default stream before start + # sending We synchronize the default stream because UCX is not + # stream-ordered and syncing the default stream will wait for other + # non-blocking CUDA streams. Note this is only sufficient if the memory + # being sent is not currently in use on non-blocking CUDA streams. 
+ if any(is_cuda_buffer(buf) for buf in buffers): + # has GPU buffer + synchronize_stream(0) + + async with self._send_lock: + for buffer in buffers: + if buffer.nbytes if hasattr(buffer, "nbytes") else len(buffer) > 0: + await self.ucp_endpoint.send(buffer) + except ucp.exceptions.UCXBaseException: # pragma: no cover + self.abort() + raise ChannelClosed("While writing, the connection was closed") + + async def recv_buffers(self, buffers: list): + async with self._recv_lock: + try: + for buffer in buffers: + await self.ucp_endpoint.recv(buffer) + except BaseException as e: + if not self._closed: + # In addition to UCX exceptions, may be CancelledError or another + # "low-level" exception. The only safe thing to do is to abort. + self.abort() + raise ChannelClosed( + f"Connection closed by writer.\nInner exception: {e!r}" + ) from e + else: + raise EOFError("Server closed already") + def abort(self): self._closed = True if self.ucp_endpoint is not None: @@ -452,6 +471,7 @@ class UCXClient(Client): __slots__ = () scheme = UCXServer.scheme + channel: UCXChannel @classmethod def parse_config(cls, config: dict) -> dict: @@ -479,3 +499,9 @@ async def connect( ucp_endpoint, local_address=local_address, dest_address=dest_address ) return UCXClient(local_address, dest_address, channel) + + async def send_buffers(self, buffers: list): + return await self.channel.send_buffers(buffers) + + async def recv_buffers(self, buffers: list): + return await self.channel.recv_buffers(buffers) diff --git a/mars/oscar/backends/communication/utils.py b/mars/oscar/backends/communication/utils.py index 3add2e4d41..e605d247b1 100644 --- a/mars/oscar/backends/communication/utils.py +++ b/mars/oscar/backends/communication/utils.py @@ -18,7 +18,7 @@ import numpy as np from ....serialization.aio import BUFFER_SIZES_NAME -from ....utils import lazy_import +from ....utils import lazy_import, convert_to_cupy_ndarray, is_cuda_buffer cupy = lazy_import("cupy") cudf = lazy_import("cudf") @@ -27,23 
+27,10 @@ CUDA_CHUNK_SIZE = 16 * 1024**2 -def _convert_to_cupy_ndarray( - cuda_buffer: Union["cupy.ndarray", "rmm.DeviceBuffer"] -) -> "cupy.ndarray": - if isinstance(cuda_buffer, cupy.ndarray): - return cuda_buffer - - size = cuda_buffer.nbytes - data = cuda_buffer.__cuda_array_interface__["data"][0] - memory = cupy.cuda.UnownedMemory(data, size, cuda_buffer) - ptr = cupy.cuda.MemoryPointer(memory, 0) - return cupy.ndarray(shape=size, dtype="u1", memptr=ptr) - - def write_buffers(writer: StreamWriter, buffers: List): def _write_cuda_buffer(cuda_buffer: Union["cupy.ndarray", "rmm.DeviceBuffer"]): # convert cuda buffer to cupy ndarray - cuda_buffer = _convert_to_cupy_ndarray(cuda_buffer) + cuda_buffer = convert_to_cupy_ndarray(cuda_buffer) chunk_size = CUDA_CHUNK_SIZE offset = 0 @@ -58,7 +45,7 @@ def _write_cuda_buffer(cuda_buffer: Union["cupy.ndarray", "rmm.DeviceBuffer"]): offset += size for buffer in buffers: - if hasattr(buffer, "__cuda_array_interface__"): + if is_cuda_buffer(buffer): # GPU buffer _write_cuda_buffer(buffer) else: @@ -77,7 +64,7 @@ async def read_buffers(header: Dict, reader: StreamReader): buffers.append(content) else: buffer = rmm.DeviceBuffer(size=buf_size) - arr = _convert_to_cupy_ndarray(buffer) + arr = convert_to_cupy_ndarray(buffer) offset = 0 chunk_size = CUDA_CHUNK_SIZE while offset < buf_size: diff --git a/mars/oscar/backends/context.py b/mars/oscar/backends/context.py index 5f07106876..40b5a8b751 100644 --- a/mars/oscar/backends/context.py +++ b/mars/oscar/backends/context.py @@ -41,6 +41,7 @@ ControlMessageType, ) from .router import Router +from .transfer import TransferClient @dataslots @@ -49,12 +50,13 @@ class ProfilingContext: task_id: str -class MarsActorContext(BaseActorContext): +class MarsActorContext(TransferClient, BaseActorContext): __slots__ = ("_caller",) support_allocate_strategy = True def __init__(self, address: str = None): + TransferClient.__init__(self) BaseActorContext.__init__(self, address) self._caller = 
ActorCaller() diff --git a/mars/oscar/backends/mars/tests/test_transfer.py b/mars/oscar/backends/mars/tests/test_transfer.py new file mode 100644 index 0000000000..5d73695b2b --- /dev/null +++ b/mars/oscar/backends/mars/tests/test_transfer.py @@ -0,0 +1,228 @@ +# Copyright 2022 XProbe Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import tempfile +from typing import List + +import numpy as np +import pytest + +from .....lib.aio import AioFileObject +from .....tests.core import require_cupy +from .....utils import lazy_import, convert_to_cupy_ndarray +from .... 
import ( + Actor, + BufferRef, + buffer_ref, + FileObjectRef, + file_object_ref, + ActorRefType, + actor_ref, + copyto_via_buffers, + copyto_via_file_objects, +) +from ....context import get_context +from ...allocate_strategy import ProcessIndex +from ...pool import create_actor_pool +from ..pool import MainActorPool + + +rmm = lazy_import("rmm") +cupy = lazy_import("cupy") +ucp = lazy_import("ucp") + + +class BufferTransferActor(Actor): + def __init__(self): + self._buffers = [] + + def create_buffers(self, sizes: List[int], cpu: bool = True) -> List[BufferRef]: + if cpu: + buffers = [np.empty(size, dtype="u1").data for size in sizes] + else: + buffers = [ + convert_to_cupy_ndarray(rmm.DeviceBuffer(size=size)) for size in sizes + ] + self._buffers.extend(buffers) + return [buffer_ref(self.address, buf) for buf in buffers] + + def create_arrays_from_buffer_refs( + self, buf_refs: List[BufferRef], cpu: bool = True + ): + if cpu: + return [ + np.frombuffer(BufferRef.get_buffer(ref), dtype="u1") for ref in buf_refs + ] + else: + return [ + convert_to_cupy_ndarray(BufferRef.get_buffer(ref)) for ref in buf_refs + ] + + async def copy_data( + self, ref: ActorRefType["BufferTransferActor"], sizes, cpu: bool = True + ): + xp = np if cpu else cupy + arrays = [ + np.random.randint(2, dtype=bool, size=size).astype("u1") for size in sizes + ] + buffers = [a.data for a in arrays] + if not cpu: + arrays = [cupy.asarray(a) for a in arrays] + buffers = arrays + + ref = await actor_ref(ref) + buf_refs = await ref.create_buffers(sizes, cpu=cpu) + await copyto_via_buffers(buffers, buf_refs) + new_arrays = await ref.create_arrays_from_buffer_refs(buf_refs, cpu=cpu) + assert len(arrays) == len(new_arrays) + for a1, a2 in zip(arrays, new_arrays): + xp.testing.assert_array_equal(a1, a2) + + +async def _copy_test(scheme: str, cpu: bool): + start_method = ( + os.environ.get("POOL_START_METHOD", "forkserver") + if sys.platform != "win32" + else None + ) + pool = await create_actor_pool( + 
"127.0.0.1", + pool_cls=MainActorPool, + n_process=2, + subprocess_start_method=start_method, + external_address_schemes=[None, scheme, scheme], + ) + + async with pool: + ctx = get_context() + + # actor on main pool + actor_ref1 = await ctx.create_actor( + BufferTransferActor, + uid="test-1", + address=pool.external_address, + allocate_strategy=ProcessIndex(1), + ) + actor_ref2 = await ctx.create_actor( + BufferTransferActor, + uid="test-2", + address=pool.external_address, + allocate_strategy=ProcessIndex(2), + ) + sizes = [10 * 1024**2, 3 * 1024**2] + await actor_ref1.copy_data(actor_ref2, sizes, cpu=cpu) + + +schemes = [None] +if ucp is not None: + schemes.append("ucx") + + +@pytest.mark.asyncio +@pytest.mark.parametrize("scheme", schemes) +async def test_copy(scheme): + await _copy_test(scheme, True) + + +@require_cupy +@pytest.mark.parametrize("scheme", schemes) +async def tests_gpu_copy(scheme): + await _copy_test(scheme, False) + + +class FileobjTransferActor(Actor): + def __init__(self): + self._fileobjs = [] + + async def create_file_objects(self, names: List[str]) -> List[FileObjectRef]: + refs = [] + for name in names: + fobj = open(name, "w+b") + afobj = AioFileObject(fobj) + self._fileobjs.append(afobj) + refs.append(file_object_ref(self.address, afobj)) + return refs + + async def close(self): + for fobj in self._fileobjs: + assert await fobj.tell() > 0 + await fobj.close() + + async def copy_data( + self, + ref: ActorRefType["FileobjTransferActor"], + names1: List[str], + names2: List[str], + sizes: List[int], + ): + fobjs = [] + for name, size in zip(names1, sizes): + fobj = open(name, "w+b") + fobj.write(np.random.bytes(size)) + fobj.seek(0) + fobjs.append(AioFileObject(fobj)) + + ref = await actor_ref(ref) + file_obj_refs = await ref.create_file_objects(names2) + await copyto_via_file_objects(fobjs, file_obj_refs) + _ = [await f.close() for f in fobjs] + await ref.close() + + for n1, n2 in zip(names1, names2): + with open(n1, "rb") as f1, 
open(n2, "rb") as f2: + b1 = f1.read() + b2 = f2.read() + assert b1 == b2 + + +@pytest.mark.asyncio +async def test_copy_via_fileobjects(): + start_method = ( + os.environ.get("POOL_START_METHOD", "forkserver") + if sys.platform != "win32" + else None + ) + pool = await create_actor_pool( + "127.0.0.1", + pool_cls=MainActorPool, + n_process=2, + subprocess_start_method=start_method, + ) + + with tempfile.TemporaryDirectory() as d: + async with pool: + ctx = get_context() + + # actor on main pool + actor_ref1 = await ctx.create_actor( + FileobjTransferActor, + uid="test-1", + address=pool.external_address, + allocate_strategy=ProcessIndex(1), + ) + actor_ref2 = await ctx.create_actor( + FileobjTransferActor, + uid="test-2", + address=pool.external_address, + allocate_strategy=ProcessIndex(2), + ) + sizes = [10 * 1024**2, 3 * 1024**2] + names = [] + for _ in range(2 * len(sizes)): + _, p = tempfile.mkstemp(dir=d) + names.append(p) + + await actor_ref1.copy_data(actor_ref2, names[::2], names[1::2], sizes=sizes) diff --git a/mars/oscar/backends/message.pyi b/mars/oscar/backends/message.pyi index 9b035f666e..e8a252549c 100644 --- a/mars/oscar/backends/message.pyi +++ b/mars/oscar/backends/message.pyi @@ -14,9 +14,9 @@ from enum import Enum from types import TracebackType -from typing import Any, Type +from typing import Any, Type, List -from ..core import ActorRef +from ..core import ActorRef, BufferRef, FileObjectRef DEFAULT_PROTOCOL: int = 0 @@ -31,6 +31,8 @@ class MessageType(Enum): send = 7 tell = 8 cancel = 9 + copyto_buffers = 10 + copyto_fileobjects = 11 class ControlMessageType(Enum): stop = 0 @@ -39,6 +41,8 @@ class ControlMessageType(Enum): get_config = 3 wait_pool_recovered = 4 add_sub_pool_actor = 5 + # the new channel created is for data transfer only + switch_to_transfer = 6 class _MessageBase: message_type: MessageType @@ -211,4 +215,30 @@ class DeserializeMessageFailed(RuntimeError): def __init__(self, message_id: bytes): ... def __str__(self): ... 
+class CopytoBuffersMessage(_MessageBase):
+    message_type = MessageType.copyto_buffers
+
+    buffer_refs: List[BufferRef]
+
+    def __init__(
+        self,
+        message_id: bytes = None,
+        buffer_refs: List[BufferRef] = None,
+        protocol: int = DEFAULT_PROTOCOL,
+        message_trace: list = None,
+    ): ...
+
+class CopytoFileObjectsMessage(_MessageBase):
+    message_type = MessageType.copyto_fileobjects
+
+    fileobj_refs: List[FileObjectRef]
+
+    def __init__(
+        self,
+        message_id: bytes = None,
+        fileobj_refs: List[FileObjectRef] = None,
+        protocol: int = DEFAULT_PROTOCOL,
+        message_trace: list = None,
+    ): ...
+
 def new_message_id() -> bytes: ...
diff --git a/mars/oscar/backends/message.pyx b/mars/oscar/backends/message.pyx
index 3b5bde7a3b..834a0bb983 100644
--- a/mars/oscar/backends/message.pyx
+++ b/mars/oscar/backends/message.pyx
@@ -20,7 +20,7 @@ from ...lib.tblib import pickling_support
 from ...serialization.core cimport Serializer
 from ..._utils cimport new_random_id
 from ...utils import wrap_exception
-from ..core cimport ActorRef
+from ..core cimport ActorRef, BufferRef, FileObjectRef
 
 # make sure traceback can be pickled
 pickling_support.install()
@@ -40,6 +40,8 @@ class MessageType(Enum):
     send = 7
     tell = 8
     cancel = 9
+    copyto_buffers = 10
+    copyto_fileobjects = 11
 
 
 class ControlMessageType(Enum):
@@ -49,6 +51,8 @@ class ControlMessageType(Enum):
     get_config = 3
     wait_pool_recovered = 4
     add_sub_pool_actor = 5
+    # the new channel created is for data transfer only
+    switch_to_transfer = 6
 
 
 cdef class _MessageSerialItem:
@@ -484,6 +488,85 @@ cdef class CancelMessage(_MessageBase):
         self.cancel_message_id = serialized[-1]
 
 
+cdef class CopytoBuffersMessage(_MessageBase):
+    message_type = MessageType.copyto_buffers
+
+    cdef:
+        public list buffer_refs
+
+    def __init__(
+        self,
+        bytes message_id = None,
+        list buffer_refs = None,
+        int protocol = _DEFAULT_PROTOCOL,
+        list message_trace = None,
+    ):
+        _MessageBase.__init__(
+            self,
+            message_id,
+            protocol=protocol,
+            
message_trace=message_trace + ) + self.buffer_refs = buffer_refs + + cdef _MessageSerialItem serial(self): + cdef _MessageSerialItem item = _MessageBase.serial(self) + for buffer_ref in self.buffer_refs: + item.serialized += ( + buffer_ref.address, buffer_ref.uid + ) + item.serialized += (len(self.buffer_refs),) + return item + + cdef deserial_members(self, tuple serialized, list subs): + _MessageBase.deserial_members(self, serialized, subs) + size = serialized[-1] + refs = [] + for address, uid in zip(serialized[-size * 2 - 1: -1: 2], serialized[-size * 2::2]): + refs.append(BufferRef(address, uid)) + assert len(refs) == size + self.buffer_refs = refs + +cdef class CopytoFileObjectsMessage(_MessageBase): + message_type = MessageType.copyto_fileobjects + + cdef: + public list fileobj_refs + + def __init__( + self, + bytes message_id = None, + list fileobj_refs = None, + int protocol = _DEFAULT_PROTOCOL, + list message_trace = None, + ): + _MessageBase.__init__( + self, + message_id, + protocol=protocol, + message_trace=message_trace + ) + self.fileobj_refs = fileobj_refs + + cdef _MessageSerialItem serial(self): + cdef _MessageSerialItem item = _MessageBase.serial(self) + for fileobj_ref in self.fileobj_refs: + item.serialized += ( + fileobj_ref.address, fileobj_ref.uid + ) + item.serialized += (len(self.fileobj_refs),) + return item + + cdef deserial_members(self, tuple serialized, list subs): + _MessageBase.deserial_members(self, serialized, subs) + size = serialized[-1] + refs = [] + for address, uid in zip(serialized[-size * 2 - 1: -1: 2], serialized[-size * 2::2]): + refs.append(FileObjectRef(address, uid)) + assert len(refs) == size + self.fileobj_refs = refs + + cdef dict _message_type_to_message_cls = { MessageType.control.value: ControlMessage, MessageType.result.value: ResultMessage, @@ -495,6 +578,8 @@ cdef dict _message_type_to_message_cls = { MessageType.send.value: SendMessage, MessageType.tell.value: TellMessage, MessageType.cancel.value: 
CancelMessage, + MessageType.copyto_buffers.value: CopytoBuffersMessage, + MessageType.copyto_fileobjects.value: CopytoFileObjectsMessage, } diff --git a/mars/oscar/backends/pool.py b/mars/oscar/backends/pool.py index 02a6a9825b..ce95679628 100644 --- a/mars/oscar/backends/pool.py +++ b/mars/oscar/backends/pool.py @@ -62,6 +62,7 @@ ControlMessageType, ) from .router import Router +from .transfer import TransferServer logger = logging.getLogger(__name__) ray = lazy_import("ray") @@ -493,18 +494,39 @@ async def stop(self): def stopped(self) -> bool: return self._stopped.is_set() + @classmethod + async def _recv_message(cls, channel: Channel): + try: + return await channel.recv() + except EOFError: + # no data to read, check channel + try: + await channel.close() + except (ConnectionError, EOFError): + # close failed, ignore + pass + async def on_new_channel(self, channel: Channel): + message = await self._recv_message(channel) + if ( + message.message_type == MessageType.control + and message.control_message_type == ControlMessageType.switch_to_transfer + ): + # switch this channel to data transfer channel + # the channel will be handed over to TransferServer + # and this loop will exit + return await TransferServer.handle_transfer_channel(channel, self._stopped) + else: + asyncio.create_task(self.process_message(message, channel)) + # delete to release the reference of message + del message + await asyncio.sleep(0) + + # continue to keep processing messages while not self._stopped.is_set(): - try: - message = await channel.recv() - except EOFError: - # no data to read, check channel - try: - await channel.close() - except (ConnectionError, EOFError): - # close failed, ignore - pass - return + message = await self._recv_message(channel) + if message is None: + break asyncio.create_task(self.process_message(message, channel)) # delete to release the reference of message del message diff --git a/mars/oscar/backends/router.py b/mars/oscar/backends/router.py index 
a3c3628da8..832170d8a9 100644 --- a/mars/oscar/backends/router.py +++ b/mars/oscar/backends/router.py @@ -13,7 +13,7 @@ # limitations under the License. import threading -from typing import Dict, List, Tuple, Type, Any, Optional +from typing import Dict, List, Tuple, Type, Any, Optional, Union from .communication import get_client_type, Client @@ -64,7 +64,7 @@ def __init__( self._cache_local = threading.local() @property - def _cache(self) -> Dict[Tuple[str, Any], Client]: + def _cache(self) -> Dict[Tuple[str, Any, Optional[Type[Client]]], Client]: try: return self._cache_local.cache except AttributeError: @@ -100,35 +100,106 @@ def external_address(self): return self._curr_external_addresses[0] def get_internal_address(self, external_address: str) -> str: - if external_address in self._curr_external_addresses: + try: # local address, use dummy address - return self._local_mapping.get(external_address) - # try to lookup inner address from address mapping - return self._mapping.get(external_address) + return self._local_mapping[external_address] + except KeyError: + # try to lookup inner address from address mapping + return self._mapping.get(external_address) async def get_client( - self, external_address: str, from_who: Any = None, cached: bool = True, **kw - ) -> Client: - if cached and (external_address, from_who) in self._cache: - cached_client = self._cache[external_address, from_who] + self, + external_address: str, + from_who: Any = None, + cached: bool = True, + return_from_cache=False, + **kw, + ) -> Union[Client, Tuple[Client, bool]]: + if cached and (external_address, from_who, None) in self._cache: + cached_client = self._cache[external_address, from_who, None] if cached_client.closed: # closed before, ignore it - del self._cache[external_address, from_who] + del self._cache[external_address, from_who, None] else: - return cached_client + if return_from_cache: + return cached_client, True + else: + return cached_client address = 
self.get_internal_address(external_address) if address is None: # no inner address, just use external address address = external_address client_type: Type[Client] = get_client_type(address) - local_address = ( - self._curr_external_addresses[0] if self._curr_external_addresses else None - ) + client = await self._create_client(client_type, address, **kw) + if cached: + self._cache[external_address, from_who, None] = client + if return_from_cache: + return client, False + else: + return client + + async def _create_client( + self, client_type: Type[Client], address: str, **kw + ) -> Client: config = client_type.parse_config(self._comm_config) if config: kw["config"] = config - client = await client_type.connect(address, local_address=local_address, **kw) + local_address = ( + self._curr_external_addresses[0] if self._curr_external_addresses else None + ) + return await client_type.connect(address, local_address=local_address, **kw) + + def _get_client_type_to_addresses( + self, external_address: str + ) -> Dict[Type[Client], str]: + client_type_to_addresses = dict() + client_type_to_addresses[get_client_type(external_address)] = external_address + if external_address in self._curr_external_addresses: + # local address, use dummy address + addr = self._local_mapping.get(external_address) + client_type = get_client_type(addr) + client_type_to_addresses[client_type] = addr + if external_address in self._mapping: + # try to lookup inner address from address mapping + addr = self._mapping.get(external_address) + client_type = get_client_type(addr) + client_type_to_addresses[client_type] = addr + return client_type_to_addresses + + def get_all_client_types(self, external_address: str) -> List[Type[Client]]: + return list(self._get_client_type_to_addresses(external_address)) + + async def get_client_via_type( + self, + external_address: str, + client_type: Type[Client], + from_who: Any = None, + cached: bool = True, + return_from_cache=False, + **kw, + ) -> Union[Client, 
Tuple[Client, bool]]: + if cached and (external_address, from_who, client_type) in self._cache: + cached_client = self._cache[external_address, from_who, client_type] + if cached_client.closed: + # closed before, ignore it + del self._cache[external_address, from_who, client_type] + else: + if return_from_cache: + return cached_client, True + else: + return cached_client + + client_type_to_addresses = self._get_client_type_to_addresses(external_address) + if client_type not in client_type_to_addresses: + raise ValueError( + f"Client type({client_type}) is not supported for {external_address}" + ) + address = client_type_to_addresses[client_type] + client = await self._create_client(client_type, address, **kw) if cached: - self._cache[external_address, from_who] = client - return client + self._cache[external_address, from_who, client_type] = client + if return_from_cache: + return client, False + else: + return client diff --git a/mars/oscar/backends/transfer.py b/mars/oscar/backends/transfer.py new file mode 100644 index 0000000000..2c26fcc676 --- /dev/null +++ b/mars/oscar/backends/transfer.py @@ -0,0 +1,267 @@ +# Copyright 2022 XProbe Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# Buffer / file-object transfer between actor pools.
#
# ``TransferClient`` pushes local buffers or file contents to a peer pool;
# ``TransferServer`` receives them on a dedicated transfer channel and writes
# them into the objects registered behind ``BufferRef`` / ``FileObjectRef``.

import asyncio
import contextlib
import sys
from typing import List, Union

from ...lib.aio import AioFileObject
from ..core import BufferRef, FileObjectRef
from .communication import Client, Channel
from .message import (
    CopytoBuffersMessage,
    CopytoFileObjectsMessage,
    new_message_id,
    MessageType,
    ResultMessage,
    ErrorMessage,
    ControlMessage,
    ControlMessageType,
)
from .router import Router


# Chunk size used when data has to be streamed in batches (4 MiB).
DEFAULT_TRANSFER_BLOCK_SIZE = 4 * 1024**2


def _get_buffer_size(buf) -> int:
    """Return the size of ``buf`` in bytes: ``nbytes`` if present, else ``len``."""
    try:
        return buf.nbytes
    except AttributeError:  # pragma: no cover
        return len(buf)


class TransferClient:
    """Client side of the transfer protocol.

    All transfers through one instance are serialized by an internal lock so
    that request, acknowledgement and payload frames never interleave on the
    shared connection.
    """

    def __init__(self):
        # serializes concurrent transfers over the shared client connection
        self._lock = asyncio.Lock()

    @staticmethod
    def _handle_ack(message: Union[ResultMessage, ErrorMessage]):
        """Validate an acknowledgement; re-raise a remote error locally."""
        if message.message_type == MessageType.result:
            assert message.result
        else:
            assert message.message_type == MessageType.error
            raise message.error.with_traceback(message.traceback)

    async def _prepare_client(self, client: Client, is_cached: bool):
        # A freshly created connection must first ask the server to switch
        # this channel into transfer-dedicated mode; cached connections have
        # already done so.
        if not is_cached:
            await client.send(self._gen_switch_to_transfer_control_message())

    async def copyto_via_buffers(
        self, local_buffers: list, remote_buffer_refs: List[BufferRef]
    ):
        """Copy ``local_buffers`` into the remote buffers behind
        ``remote_buffer_refs``.

        All refs must point at a single destination address, and the two
        lists must pair up one-to-one.  When the connection type supports
        native buffer transfer (``send_buffers``) it is used; otherwise the
        data is streamed in fixed-size chunks with per-chunk acknowledgement.
        """
        assert (
            len({ref.address for ref in remote_buffer_refs}) == 1
        ), "remote buffers for `copy_via_buffers` can support only 1 destination"
        assert len(local_buffers) == len(remote_buffer_refs), (
            f"Buffers from local and remote must have same size, "
            f"local: {len(local_buffers)}, remote: {len(remote_buffer_refs)}"
        )

        router = Router.get_instance()
        assert router is not None, "`copy_via_buffers` can only be used inside pools"
        address = remote_buffer_refs[0].address
        client_types = router.get_all_client_types(address)
        message = CopytoBuffersMessage(
            message_id=new_message_id(), buffer_refs=remote_buffer_refs
        )
        # prefer a client type with native buffer-copy support
        client_type = next(
            (ct for ct in client_types if hasattr(ct, "send_buffers")), None
        )
        if client_type is None:
            # no buffer-copy support: stream the data in batches
            client, is_cached = await router.get_client(
                address, from_who=self, return_from_cache=True
            )
            await self._prepare_client(client, is_cached)
            async with self._lock:
                await client.send(message)
                self._handle_ack(await client.recv())
                await self._send_buffers_in_batches(local_buffers, client)
        else:
            client, is_cached = await router.get_client_via_type(
                address, client_type, from_who=self, return_from_cache=True
            )
            await self._prepare_client(client, is_cached)
            async with self._lock:
                await client.send(message)
                self._handle_ack(await client.recv())
                await client.send_buffers(local_buffers)

    @staticmethod
    def _gen_switch_to_transfer_control_message():
        """Build the control message that flips a channel into transfer mode."""
        return ControlMessage(
            message_id=new_message_id(),
            control_message_type=ControlMessageType.switch_to_transfer,
        )

    @classmethod
    async def _send_buffers_in_batches(cls, local_buffers: list, client: Client):
        """Send each buffer chunk by chunk, awaiting an ack after every chunk."""
        block = DEFAULT_TRANSFER_BLOCK_SIZE
        for buffer in local_buffers:
            i = 0
            while True:
                curr_buf = buffer[i * block : (i + 1) * block]
                if _get_buffer_size(curr_buf) == 0:
                    # past the end of the buffer: this one is done
                    break
                await client.send(curr_buf)
                # ack
                cls._handle_ack(await client.recv())
                i += 1

    async def copyto_via_file_objects(
        self,
        local_file_objects: List[AioFileObject],
        remote_file_object_refs: List[FileObjectRef],
    ):
        """Copy the contents of local file objects into the remote file
        objects behind ``remote_file_object_refs``.

        Each file is streamed in chunks with a per-chunk acknowledgement; a
        ``None`` frame marks the end of one file object.
        """
        assert (
            len({ref.address for ref in remote_file_object_refs}) == 1
        ), "remote file objects for `copyto_via_file_objects` can support only 1 destination"

        router = Router.get_instance()
        assert (
            router is not None
        ), "`copyto_via_file_objects` can only be used inside pools"
        address = remote_file_object_refs[0].address
        client, is_cached = await router.get_client(
            address, from_who=self, return_from_cache=True
        )
        await self._prepare_client(client, is_cached)

        message = CopytoFileObjectsMessage(
            message_id=new_message_id(), fileobj_refs=remote_file_object_refs
        )
        async with self._lock:
            await client.send(message)
            for fileobj in local_file_objects:
                while True:
                    buf = await fileobj.read(DEFAULT_TRANSFER_BLOCK_SIZE)
                    if _get_buffer_size(buf) > 0:
                        await client.send(buf)
                        # ack
                        self._handle_ack(await client.recv())
                    else:
                        # EOF: tell the receiver this file object is finished
                        await client.send(None)
                        break


@contextlib.asynccontextmanager
async def _catch_error(channel: Channel, message_id: bytes):
    """Run the enclosed block and report the outcome to the peer.

    On success a ``ResultMessage`` acknowledgement is sent; on failure the
    exception is forwarded as an ``ErrorMessage`` and re-raised locally.
    """
    try:
        yield
        await channel.send(ResultMessage(message_id=message_id, result=True))
    except:  # noqa: E722  # nosec  # pylint: disable=bare-except  # pragma: no cover
        et, err, tb = sys.exc_info()
        await channel.send(
            ErrorMessage(
                message_id=message_id,
                error_type=et,
                error=err,
                traceback=tb,
            )
        )
        raise


class TransferServer:
    """Server side of the transfer protocol: writes received data into the
    locally registered buffers / file objects."""

    @classmethod
    async def handle_transfer_channel(cls, channel: Channel, stopped: asyncio.Event):
        """Serve transfer requests on ``channel`` until stopped or closed."""
        while not stopped.is_set():
            try:
                message = await channel.recv()
            except EOFError:  # pragma: no cover
                # no data to read, check channel
                try:
                    await channel.close()
                except (ConnectionError, EOFError):
                    # close failed, ignore
                    pass
                return
            assert message.message_type in (
                MessageType.copyto_buffers,
                MessageType.copyto_fileobjects,
            )
            await cls._process_message(message, channel)

    @classmethod
    async def _process_message(
        cls,
        message: Union[CopytoBuffersMessage, CopytoFileObjectsMessage],
        channel: Channel,
    ):
        """Dispatch one transfer request to the matching receive routine."""
        if isinstance(message, CopytoBuffersMessage):
            async with _catch_error(channel, message.message_id):
                buffers = [
                    BufferRef.get_buffer(buffer_ref)
                    for buffer_ref in message.buffer_refs
                ]
                if hasattr(channel, "recv_buffers"):
                    # native buffer transfer supported by this channel type
                    await channel.recv_buffers(buffers)
                else:
                    await cls._recv_buffers_in_batches(message, buffers, channel)
        else:
            assert isinstance(message, CopytoFileObjectsMessage)
            async with _catch_error(channel, message.message_id):
                file_objects = [
                    FileObjectRef.get_file_object(ref) for ref in message.fileobj_refs
                ]
                await cls._recv_file_objects(message, file_objects, channel)

    @classmethod
    async def _recv_buffers_in_batches(
        cls, message: CopytoBuffersMessage, buffers: list, channel: Channel
    ):
        """Fill each target buffer from chunked frames, acking every chunk."""
        for buffer in buffers:
            size = _get_buffer_size(buffer)
            acc = 0
            while True:
                async with _catch_error(channel, message.message_id):
                    recv_buffer = await channel.recv()
                    cur_size = _get_buffer_size(recv_buffer)
                    buffer[acc : acc + cur_size] = recv_buffer
                    acc += cur_size
                    if acc >= size:
                        break

    @classmethod
    async def _recv_file_objects(
        cls,
        message: CopytoFileObjectsMessage,
        file_objects: List[AioFileObject],
        channel: Channel,
    ):
        """Append chunked frames to each file object until a ``None`` marker."""
        for fileobj in file_objects:
            while True:
                recv_buffer = await channel.recv()
                if recv_buffer is None:
                    # end-of-file marker for this file object
                    break
                async with _catch_error(channel, message.message_id):
                    await fileobj.write(recv_buffer)
from urllib.parse import urlparse +from typing import Any, List -from .core cimport ActorRef +from ..lib.aio import AioFileObject +from .._utils cimport new_random_id +from .core cimport ActorRef, BufferRef, FileObjectRef from .utils cimport new_actor_id from .utils import create_actor_ref @@ -178,6 +181,69 @@ cdef class BaseActorContext: """ raise NotImplementedError + def buffer_ref(self, str address, object buf) -> BufferRef: + """ + Create a reference to a buffer + + Parameters + ---------- + address + address of the actor pool + buf + buffer object + + Returns + ------- + BufferRef + """ + return BufferRef.create(buf, address, new_random_id(32)) + + def file_object_ref(self, str address, object file_object) -> FileObjectRef: + """ + Create a reference to a file object + + Parameters + ---------- + address + address of the actor pool + file_object + file object + + Returns + ------- + FileObjectRef + """ + return FileObjectRef.create(file_object, address, new_random_id(32)) + + async def copyto_via_buffers(self, local_buffers: list, remote_buffer_refs: List[BufferRef]): + """ + Copy local buffers to remote buffers. + + Parameters + ---------- + local_buffers + Local buffers. + remote_buffer_refs + Remote buffer refs + """ + raise NotImplementedError + + async def copyto_via_file_objects( + self, + local_file_objects: List[AioFileObject], + remote_file_object_refs: List[FileObjectRef] + ): + """ + Copy data from local file objects into remote file objects. 
+ + Parameters + ---------- + local_file_objects + Local file objects + remote_file_object_refs + Remote file object refs + """ + cdef class ClientActorContext(BaseActorContext): """ @@ -256,6 +322,30 @@ cdef class ClientActorContext(BaseActorContext): context = self._get_backend_context(address) return context.get_pool_config(address) + def buffer_ref(self, str address, buf: Any) -> BufferRef: + context = self._get_backend_context(address) + return context.buffer_ref(address, buf) + + def file_object_ref(self, str address, file_object: AioFileObject) -> FileObjectRef: + context = self._get_backend_context(address) + return context.file_object_ref(address, file_object) + + def copyto_via_buffers(self, local_buffers: list, remote_buffer_refs: List[BufferRef]): + if remote_buffer_refs: + address = remote_buffer_refs[0].address + context = self._get_backend_context(address) + return context.copyto_via_buffers(local_buffers, remote_buffer_refs) + + def copyto_via_file_objects( + self, + local_file_objects: List[AioFileObject], + remote_file_object_refs: List[FileObjectRef] + ): + if remote_file_object_refs: + address = remote_file_object_refs[0].address + context = self._get_backend_context(address) + return context.copyto_via_file_objects(local_file_objects, remote_file_object_refs) + def register_backend_context(scheme, cls): assert issubclass(cls, BaseActorContext) diff --git a/mars/oscar/core.pxd b/mars/oscar/core.pxd index fcfeb726b1..730b734f01 100644 --- a/mars/oscar/core.pxd +++ b/mars/oscar/core.pxd @@ -25,6 +25,16 @@ cdef class LocalActorRef(ActorRef): cdef _weakref_local_actor(self) +cdef class BufferRef: + cdef public str address + cdef public bytes uid + + +cdef class FileObjectRef: + cdef public str address + cdef public bytes uid + + cdef class _BaseActor: cdef object __weakref__ cdef str _address diff --git a/mars/oscar/core.pyx b/mars/oscar/core.pyx index 7f06d8aaf6..73237eed3d 100644 --- a/mars/oscar/core.pyx +++ b/mars/oscar/core.pyx @@ -17,10 +17,11 
@@ import inspect import logging import sys import weakref -from typing import AsyncGenerator +from typing import AsyncGenerator, Any cimport cython +from ..lib.aio import AioFileObject from .context cimport get_context from .errors import Return, ActorNotExist from .utils cimport is_async_generator @@ -545,3 +546,79 @@ cdef class _FakeLock: cdef class _StatelessActor(_BaseActor): def _create_lock(self): return _FakeLock() + + +cdef class BufferRef: + """ + Reference of a buffer + """ + _ref_to_buffers = weakref.WeakValueDictionary() + + def __init__(self, str address, bytes uid): + self.uid = uid + self.address = address + + @classmethod + def create(cls, buffer: Any, address: str, uid: bytes) -> "BufferRef": + ref = BufferRef(address, uid) + cls._ref_to_buffers[ref] = buffer + return ref + + @classmethod + def get_buffer(cls, ref: "BufferRef"): + return cls._ref_to_buffers[ref] + + def __getstate__(self): + return self.uid, self.address + + def __setstate__(self, state): + self.uid, self.address = state + + def __hash__(self): + return hash((self.address, self.uid)) + + def __eq__(self, other): + if type(other) != BufferRef: + return False + return self.address == other.address and self.uid == other.uid + + def __repr__(self): + return f'BufferRef(uid={self.uid}, address={self.address}' + + +cdef class FileObjectRef: + """ + Reference of a buffer + """ + _ref_to_fileobjs = weakref.WeakValueDictionary() + + def __init__(self, str address, bytes uid): + self.uid = uid + self.address = address + + @classmethod + def create(cls, fileobj: AioFileObject, address: str, uid: bytes) -> "FileObjectRef": + ref = FileObjectRef(address, uid) + cls._ref_to_fileobjs[ref] = fileobj + return ref + + @classmethod + def get_file_object(cls, ref: "FileObjectRef") -> AioFileObject: + return cls._ref_to_fileobjs[ref] + + def __getstate__(self): + return self.uid, self.address + + def __setstate__(self, state): + self.uid, self.address = state + + def __hash__(self): + return 
hash((self.address, self.uid)) + + def __eq__(self, other): + if type(other) != FileObjectRef: + return False + return self.address == other.address and self.uid == other.uid + + def __repr__(self): + return f'FileObjectRef(uid={self.uid}, address={self.address}' diff --git a/mars/oscar/utils.pyx b/mars/oscar/utils.pyx index a8d7b2d2be..9112e4afba 100644 --- a/mars/oscar/utils.pyx +++ b/mars/oscar/utils.pyx @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import weakref from typing import AsyncGenerator from .._utils cimport to_str, new_random_id diff --git a/mars/serialization/aio.py b/mars/serialization/aio.py index b03b97e213..f1c03b85f7 100644 --- a/mars/serialization/aio.py +++ b/mars/serialization/aio.py @@ -15,12 +15,12 @@ import asyncio import struct from io import BytesIO -from typing import Any, Union, BinaryIO +from typing import Any import cloudpickle import numpy as np -from ..utils import lazy_import +from ..utils import lazy_import, is_cuda_buffer from .core import serialize_with_spawn, deserialize rmm = lazy_import("rmm") @@ -40,10 +40,7 @@ async def _get_buffers(self): self._obj, spawn_threshold=DEFAULT_SPAWN_THRESHOLD ) - def _is_cuda_buffer(buf: Union["rmm.DeviceBuffer", BinaryIO]): - return hasattr(buf, "__cuda_array_interface__") - - is_cuda_buffers = [_is_cuda_buffer(buf) for buf in buffers] + is_cuda_buffers = [is_cuda_buffer(buf) for buf in buffers] headers[0]["is_cuda_buffers"] = np.array(is_cuda_buffers) # add buffer lengths into headers diff --git a/mars/utils.py b/mars/utils.py index 3ada0a361e..492155d487 100644 --- a/mars/utils.py +++ b/mars/utils.py @@ -1893,3 +1893,24 @@ def clean_mars_tmp_dir(): # on windows platform, raise Permission Error _windows: bool = sys.platform.startswith("win") shutil.rmtree(mars_tmp_dir, ignore_errors=_windows) + + +_cupy = lazy_import("cupy") +_rmm = lazy_import("rmm") + + +def is_cuda_buffer(cuda_buffer: Union["_cupy.ndarray", 
"_rmm.DeviceBuffer"]) -> bool: + return hasattr(cuda_buffer, "__cuda_array_interface__") + + +def convert_to_cupy_ndarray( + cuda_buffer: Union["_cupy.ndarray", "_rmm.DeviceBuffer"] +) -> "_cupy.ndarray": + if isinstance(cuda_buffer, _cupy.ndarray): + return cuda_buffer + + size = cuda_buffer.nbytes + data = cuda_buffer.__cuda_array_interface__["data"][0] + memory = _cupy.cuda.UnownedMemory(data, size, cuda_buffer) + ptr = _cupy.cuda.MemoryPointer(memory, 0) + return _cupy.ndarray(shape=size, dtype="u1", memptr=ptr) From 499de464d49e3562c051c0dc628152da99c06797 Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Mon, 5 Dec 2022 20:25:35 +0800 Subject: [PATCH 02/18] Fix --- mars/oscar/backends/transfer.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/mars/oscar/backends/transfer.py b/mars/oscar/backends/transfer.py index 2c26fcc676..937b1d5b44 100644 --- a/mars/oscar/backends/transfer.py +++ b/mars/oscar/backends/transfer.py @@ -160,7 +160,8 @@ async def copyto_via_file_objects( async with self._lock: await client.send(message) for fileobj in local_file_objects: - while True: + finished = False + while not finished: buf = await fileobj.read(DEFAULT_TRANSFER_BLOCK_SIZE) size = _get_buffer_size(buf) if size > 0: @@ -170,7 +171,10 @@ async def copyto_via_file_objects( self._handle_ack(message) else: await client.send(None) - break + # ack + message = await client.recv() + self._handle_ack(message) + finished = True @contextlib.asynccontextmanager @@ -259,9 +263,16 @@ async def _recv_file_objects( channel: Channel, ): for fileobj in file_objects: - while True: + finished = False + while not finished: recv_buffer = await channel.recv() - if recv_buffer is None: - break - async with _catch_error(channel, message.message_id): - await fileobj.write(recv_buffer) + if recv_buffer is not None: + # not finished, receive part data + async with _catch_error(channel, message.message_id): + await fileobj.write(recv_buffer) + else: + # 
done, send ack + await channel.send( + ResultMessage(message_id=message.message_id, result=True) + ) + finished = True From 945e8dd4049dd7559438f4ccfd9dc90ae47d5ffe Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Mon, 5 Dec 2022 20:37:25 +0800 Subject: [PATCH 03/18] Fix --- .../backends/mars/tests/test_transfer.py | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/mars/oscar/backends/mars/tests/test_transfer.py b/mars/oscar/backends/mars/tests/test_transfer.py index 5d73695b2b..196a068027 100644 --- a/mars/oscar/backends/mars/tests/test_transfer.py +++ b/mars/oscar/backends/mars/tests/test_transfer.py @@ -13,6 +13,7 @@ # limitations under the License. import os +import shutil import sys import tempfile from typing import List @@ -202,27 +203,31 @@ async def test_copy_via_fileobjects(): subprocess_start_method=start_method, ) - with tempfile.TemporaryDirectory() as d: - async with pool: - ctx = get_context() - - # actor on main pool - actor_ref1 = await ctx.create_actor( - FileobjTransferActor, - uid="test-1", - address=pool.external_address, - allocate_strategy=ProcessIndex(1), - ) - actor_ref2 = await ctx.create_actor( - FileobjTransferActor, - uid="test-2", - address=pool.external_address, - allocate_strategy=ProcessIndex(2), - ) - sizes = [10 * 1024**2, 3 * 1024**2] - names = [] - for _ in range(2 * len(sizes)): - _, p = tempfile.mkstemp(dir=d) - names.append(p) - - await actor_ref1.copy_data(actor_ref2, names[::2], names[1::2], sizes=sizes) + d = tempfile.mkdtemp() + async with pool: + ctx = get_context() + + # actor on main pool + actor_ref1 = await ctx.create_actor( + FileobjTransferActor, + uid="test-1", + address=pool.external_address, + allocate_strategy=ProcessIndex(1), + ) + actor_ref2 = await ctx.create_actor( + FileobjTransferActor, + uid="test-2", + address=pool.external_address, + allocate_strategy=ProcessIndex(2), + ) + sizes = [10 * 1024**2, 3 * 1024**2] + names = [] + for _ in range(2 * len(sizes)): + _, p 
= tempfile.mkstemp(dir=d) + names.append(p) + + await actor_ref1.copy_data(actor_ref2, names[::2], names[1::2], sizes=sizes) + try: + shutil.rmtree(d) + except PermissionError: + pass From 36239fc6f293ab3ad0c4de3de91c0d4d4ff54a29 Mon Sep 17 00:00:00 2001 From: ChengjieLi Date: Tue, 6 Dec 2022 11:30:39 +0800 Subject: [PATCH 04/18] Use plugin to fix cov issue --- .github/workflows/core-gpu-ci.yml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/core-gpu-ci.yml b/.github/workflows/core-gpu-ci.yml index 48db698c9c..2a6b1fc8ac 100644 --- a/.github/workflows/core-gpu-ci.yml +++ b/.github/workflows/core-gpu-ci.yml @@ -46,12 +46,12 @@ jobs: - name: Run GPU CI shell: bash run: | - coverage run -m pytest ./ -m cuda --cov-config=setup.cfg --cov-report= --cov=mars - coverage xml + pytest ./ -m cuda --cov-config=setup.cfg --cov-report=xml --cov=mars - - name: Report coverage data - shell: bash - run: | - bash <(curl -s https://codecov.io/bash) - rm -rf *.coverage* - rm -rf coverage.xml + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + fail_ci_if_error: true + flags: unittests + name: codecov-gpu + verbose: true From d1e20769406048d651aacf3eaf10ed2cc32ef770 Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Tue, 6 Dec 2022 10:30:45 +0000 Subject: [PATCH 05/18] Try to increase coverage --- .../backends/communication/tests/test_comm.py | 31 +++++++- mars/oscar/backends/communication/ucx.py | 4 +- mars/oscar/backends/pool.py | 2 +- mars/oscar/backends/tests/__init__.py | 0 mars/oscar/backends/tests/test_router.py | 71 +++++++++++++++++++ mars/oscar/backends/transfer.py | 2 +- 6 files changed, 104 insertions(+), 6 deletions(-) create mode 100644 mars/oscar/backends/tests/__init__.py create mode 100644 mars/oscar/backends/tests/test_router.py diff --git a/mars/oscar/backends/communication/tests/test_comm.py b/mars/oscar/backends/communication/tests/test_comm.py index 6906892d52..db6f3f91b4 100644 --- 
a/mars/oscar/backends/communication/tests/test_comm.py +++ b/mars/oscar/backends/communication/tests/test_comm.py @@ -22,7 +22,7 @@ import pytest from .....lib.aio import AioEvent -from .....tests.core import require_cudf, require_cupy +from .....tests.core import require_cudf, require_cupy, require_ucx from .....utils import get_next_port, lazy_import from .. import ( Channel, @@ -37,6 +37,7 @@ DummyClient, Server, UCXServer, + UCXChannel, ) from ..ucx import UCXInitializer @@ -222,6 +223,34 @@ async def test_multiprocess_cuda_comm(server_type): await client.close() +@require_ucx +@pytest.mark.asyncio +async def test_ucx_channel(): + size = 2**5 + + async def handle_channel(channel: UCXChannel): + buffer = np.empty(size, dtype="u1") + await channel.recv_buffers([buffer]) + await channel.send_buffers([buffer]) + + # create server + addr = f"127.0.0.1:{get_next_port()}" + server = await UCXServer.create({"address": addr, "handle_channel": handle_channel}) + await server.start() + assert isinstance(server.info, dict) + + # create client + client = await UCXServer.client_type.connect(addr) + buf = np.zeros(size, dtype="u1") + buf += 1 + await client.send_buffers([buf]) + new_buf = np.empty_like(buf) + await client.recv_buffers([new_buf]) + np.testing.assert_array_equal(buf, new_buf) + + await server.stop() + + def test_get_client_type(): assert issubclass(get_client_type("127.0.0.1"), SocketClient) assert issubclass(get_client_type("unixsocket:///1"), UnixSocketClient) diff --git a/mars/oscar/backends/communication/ucx.py b/mars/oscar/backends/communication/ucx.py index af35a8c638..918c5b272c 100644 --- a/mars/oscar/backends/communication/ucx.py +++ b/mars/oscar/backends/communication/ucx.py @@ -172,9 +172,7 @@ def init(ucx_config: dict): new_environ.update(envs) os.environ = new_environ try: - ucp.init( - options=options, env_takes_precedence=True, blocking_progress_mode=False - ) + ucp.init(options=options, env_takes_precedence=True) finally: os.environ = 
original_environ diff --git a/mars/oscar/backends/pool.py b/mars/oscar/backends/pool.py index ce95679628..9704af31bb 100644 --- a/mars/oscar/backends/pool.py +++ b/mars/oscar/backends/pool.py @@ -502,7 +502,7 @@ async def _recv_message(cls, channel: Channel): # no data to read, check channel try: await channel.close() - except (ConnectionError, EOFError): + except (ConnectionError, EOFError): # pragma: no cover # close failed, ignore pass diff --git a/mars/oscar/backends/tests/__init__.py b/mars/oscar/backends/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/mars/oscar/backends/tests/test_router.py b/mars/oscar/backends/tests/test_router.py new file mode 100644 index 0000000000..c23f705eae --- /dev/null +++ b/mars/oscar/backends/tests/test_router.py @@ -0,0 +1,71 @@ +# Copyright 2022 XProbe Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import pytest

from ....tests.core import mock
from ..communication import SocketClient, UnixSocketClient, DummyClient
from ..router import Router


@pytest.mark.asyncio
@mock.patch.object(Router, "_create_client")
async def test_router(fake_create_client):
    """Exercise Router client caching: cache hits, invalidation of closed
    clients, client-type enumeration and type-specific client creation."""

    class FakeClient:
        # stand-in for a real connection; ``closed`` toggles cache validity
        def __init__(self, *args, **kwargs):
            self.args = args
            self.kwargs = kwargs
            self.closed = False

    fake_create_client.side_effect = FakeClient

    router = Router(
        external_addresses=["test"],
        local_address="dummy://1",
        mapping={
            "test": "unixsocket://local_test1",
            "test2": "unixsocket://local_test2",
            "test3": "unixsocket://local_test3",
        },
    )
    # first fetch creates the client, second one hits the cache
    client, is_cache = await router.get_client("test2", return_from_cache=True)
    assert not is_cache
    client2, is_cache = await router.get_client("test2", return_from_cache=True)
    assert is_cache
    assert client2 is client
    # a closed client must be evicted and replaced
    client.closed = True
    client3, is_cache = await router.get_client("test2", return_from_cache=True)
    assert not is_cache
    assert client3 is not client

    all_client_types = router.get_all_client_types("test")
    assert set(all_client_types) == {UnixSocketClient, SocketClient, DummyClient}

    # type-specific lookup follows the same cache semantics
    client = await router.get_client_via_type("test", DummyClient)
    client2, is_cache = await router.get_client_via_type(
        "test", DummyClient, return_from_cache=True
    )
    assert client is client2
    assert is_cache
    # close client
    client.closed = True
    client3, is_cache = await router.get_client_via_type(
        "test", DummyClient, return_from_cache=True
    )
    assert not is_cache
    assert client is not client3
    client4 = await router.get_client_via_type("test", DummyClient)
    assert client3 is client4
    assert client3.args[-1].startswith("dummy://")
_handle_ack(message: Union[ResultMessage, ErrorMessage]): if message.message_type == MessageType.result: assert message.result - else: + else: # pragma: no cover assert message.message_type == MessageType.error raise message.error.with_traceback(message.traceback) From 613eb243cd05bc35001adef1ab29ef1a842937ad Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Wed, 7 Dec 2022 03:27:06 +0000 Subject: [PATCH 06/18] Fix --- mars/oscar/backends/tests/__init__.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/mars/oscar/backends/tests/__init__.py b/mars/oscar/backends/tests/__init__.py index e69de29bb2..313d6ba7ab 100644 --- a/mars/oscar/backends/tests/__init__.py +++ b/mars/oscar/backends/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022 XProbe Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
From 762339caf6bcbff63441da5d7d5c4d6270312412 Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Wed, 7 Dec 2022 08:55:33 +0000 Subject: [PATCH 07/18] Parallelize send&recv in batches --- mars/oscar/backends/message.pyi | 4 + mars/oscar/backends/message.pyx | 10 ++ mars/oscar/backends/transfer.py | 301 ++++++++++++++++++++++++-------- 3 files changed, 245 insertions(+), 70 deletions(-) diff --git a/mars/oscar/backends/message.pyi b/mars/oscar/backends/message.pyi index e8a252549c..d8192d5f7a 100644 --- a/mars/oscar/backends/message.pyi +++ b/mars/oscar/backends/message.pyi @@ -219,11 +219,13 @@ class CopytoBuffersMessage(_MessageBase): message_type = MessageType.copyto_buffers buffer_refs: List[BufferRef] + content: object def __int__( self, message_id: bytes = None, buffer_refs: List[BufferRef] = None, + content: object = None, protocol: int = DEFAULT_PROTOCOL, message_trace: list = None, ): ... @@ -232,11 +234,13 @@ class CopytoFileObjectsMessage(_MessageBase): message_type = MessageType.copyto_file_objects fileobj_refs: List[FileObjectRef] + content: object def __int__( self, message_id: bytes = None, fileobj_refs: List[FileObjectRef] = None, + content: object = None, protocol: int = DEFAULT_PROTOCOL, message_trace: list = None, ): ... 
diff --git a/mars/oscar/backends/message.pyx b/mars/oscar/backends/message.pyx index 834a0bb983..61823c7c87 100644 --- a/mars/oscar/backends/message.pyx +++ b/mars/oscar/backends/message.pyx @@ -493,11 +493,13 @@ cdef class CopytoBuffersMessage(_MessageBase): cdef: public list buffer_refs + public object content def __init__( self, bytes message_id = None, list buffer_refs = None, + object content = None, int protocol = _DEFAULT_PROTOCOL, list message_trace = None, ): @@ -508,6 +510,7 @@ cdef class CopytoBuffersMessage(_MessageBase): message_trace=message_trace ) self.buffer_refs = buffer_refs + self.content = content cdef _MessageSerialItem serial(self): cdef _MessageSerialItem item = _MessageBase.serial(self) @@ -516,6 +519,7 @@ cdef class CopytoBuffersMessage(_MessageBase): buffer_ref.address, buffer_ref.uid ) item.serialized += (len(self.buffer_refs),) + item.subs = [self.content] return item cdef deserial_members(self, tuple serialized, list subs): @@ -526,17 +530,20 @@ cdef class CopytoBuffersMessage(_MessageBase): refs.append(BufferRef(address, uid)) assert len(refs) == size self.buffer_refs = refs + self.content = subs[0] cdef class CopytoFileObjectsMessage(_MessageBase): message_type = MessageType.copyto_fileobjects cdef: public list fileobj_refs + public object content def __init__( self, bytes message_id = None, list fileobj_refs = None, + object content = None, int protocol = _DEFAULT_PROTOCOL, list message_trace = None, ): @@ -547,6 +554,7 @@ cdef class CopytoFileObjectsMessage(_MessageBase): message_trace=message_trace ) self.fileobj_refs = fileobj_refs + self.content = content cdef _MessageSerialItem serial(self): cdef _MessageSerialItem item = _MessageBase.serial(self) @@ -555,6 +563,7 @@ cdef class CopytoFileObjectsMessage(_MessageBase): fileobj_ref.address, fileobj_ref.uid ) item.serialized += (len(self.fileobj_refs),) + item.subs = [self.content] return item cdef deserial_members(self, tuple serialized, list subs): @@ -565,6 +574,7 @@ cdef class 
CopytoFileObjectsMessage(_MessageBase): refs.append(FileObjectRef(address, uid)) assert len(refs) == size self.fileobj_refs = refs + self.content = subs[0] cdef dict _message_type_to_message_cls = { diff --git a/mars/oscar/backends/transfer.py b/mars/oscar/backends/transfer.py index 64056c3f7f..003b6ca1ad 100644 --- a/mars/oscar/backends/transfer.py +++ b/mars/oscar/backends/transfer.py @@ -14,8 +14,11 @@ import asyncio import contextlib +import itertools import sys -from typing import List, Union +import weakref +from abc import ABC, abstractmethod +from typing import List, Union, Any from ...lib.aio import AioFileObject from ..core import BufferRef, FileObjectRef @@ -43,18 +46,216 @@ def _get_buffer_size(buf) -> int: return len(buf) +def _handle_ack(message: Union[ResultMessage, ErrorMessage]): + if message.message_type == MessageType.result: + assert message.result + else: # pragma: no cover + assert message.message_type == MessageType.error + raise message.error.with_traceback(message.traceback) + + +class _ParallelSender(ABC): + _message_id_to_futures: weakref.WeakValueDictionary + + def __init__( + self, + client: Client, + local_objs: list, + remote_obj_refs: List[Union[BufferRef, FileObjectRef]], + ): + self.client = client + self.local_objs = local_objs + self.remote_obj_refs = remote_obj_refs + + self._message_id_to_futures = weakref.WeakValueDictionary() + self._n_send = [0] * len(local_objs) + + @staticmethod + @abstractmethod + def _new_message(ref: Union[BufferRef, FileObjectRef], buf: Any): + pass + + @abstractmethod + async def _read(self, index: int, buffer_or_fileobj: Any): + pass + + async def _send_one( + self, + index: int, + buffer_or_fileobj: Any, + remote_ref: Union[BufferRef, FileObjectRef], + ): + while True: + part_buf = await self._read(index, buffer_or_fileobj) + size = _get_buffer_size(part_buf) + if size == 0: + break + fut = asyncio.get_running_loop().create_future() + message = self._new_message(remote_ref, part_buf) + 
self._message_id_to_futures[message.message_id] = fut + await self.client.send(message) + self._n_send[index] += size + await fut + + async def _recv_ack_in_background(self): + while True: + ack_message = await self.client.recv() + if ack_message is None: + # receive finished + break + fut: asyncio.Future = self._message_id_to_futures[ack_message.message_id] + try: + _handle_ack(ack_message) + except BaseException as e: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except + fut.set_exception(e) + else: + fut.set_result(ack_message.result) + + async def start(self): + recv_task = asyncio.create_task(self._recv_ack_in_background()) + tasks = [] + for i, local_obj, remote_obj_ref in zip( + itertools.count(0), self.local_objs, self.remote_obj_refs + ): + tasks.append(self._send_one(i, local_obj, remote_obj_ref)) + try: + await asyncio.gather(*tasks) + finally: + # all send finished, send a None to receiver + await self.client.send(None) + await recv_task + + +class _ParallelReceiver(ABC): + def __init__( + self, + channel: Channel, + local_objs: list, + remote_obj_refs: List[Union[BufferRef, FileObjectRef]], + ): + self.channel = channel + self.local_objs = local_objs + self.remote_obj_refs = remote_obj_refs + + self._n_recv = [0] * len(local_objs) + self._ref_to_i = {ref: i for i, ref in enumerate(remote_obj_refs)} + + @abstractmethod + async def _write(self, index: int, buffer_or_fileobj: Any): + pass + + @staticmethod + @staticmethod + def _get_ref_from_message( + message: Union[CopytoBuffersMessage, CopytoFileObjectsMessage] + ): + pass + + async def _recv_part(self, buf: Any, index: int, message_id: bytes): + async with _catch_error(self.channel, message_id): + await self._write(index, buf) + self._n_recv[index] += _get_buffer_size(buf) + + async def start(self): + tasks = [] + while True: + message = await self.channel.recv() + if message is None: + # send finished + break + message_id = message.message_id + buf = message.content + ref = 
self._get_ref_from_message(message)[0] + i = self._ref_to_i[ref] + tasks.append(asyncio.create_task(self._recv_part(buf, i, message_id))) + try: + await asyncio.gather(*tasks) + finally: + # when all done, send a None to finish client + await self.channel.send(None) + + +class _BufferSender(_ParallelSender): + def __init__( + self, client: Client, local_buffers: list, remote_buffer_refs: List[BufferRef] + ): + super().__init__(client, local_buffers, remote_buffer_refs) + + @staticmethod + def _new_message(ref: Union[BufferRef, FileObjectRef], buf: Any): + return CopytoBuffersMessage( + message_id=new_message_id(), buffer_refs=[ref], content=buf + ) + + async def _read(self, index: int, buffer_or_fileobj: Any): + size = self._n_send[index] + return buffer_or_fileobj[size : size + DEFAULT_TRANSFER_BLOCK_SIZE] + + +class _BufferReceiver(_ParallelReceiver): + def __init__(self, channel: Channel, buffers: list, buffer_refs: List[BufferRef]): + super().__init__(channel, buffers, buffer_refs) + + async def _write(self, index: int, buffer_or_fileobj: Any): + full_buf = self.local_objs[index] + size = _get_buffer_size(buffer_or_fileobj) + n_recv = self._n_recv[index] + + def copy(): + full_buf[n_recv : n_recv + size] = buffer_or_fileobj + + await asyncio.to_thread(copy) + + @staticmethod + def _get_ref_from_message( + message: Union[CopytoBuffersMessage, CopytoFileObjectsMessage] + ): + return message.buffer_refs + + +class _FileObjectSender(_ParallelSender): + def __init__( + self, + client: Client, + local_file_objects: list, + remote_file_object_refs: List[FileObjectRef], + ): + super().__init__(client, local_file_objects, remote_file_object_refs) + + @staticmethod + def _new_message(ref: Union[BufferRef, FileObjectRef], buf: Any): + return CopytoFileObjectsMessage( + message_id=new_message_id(), fileobj_refs=[ref], content=buf + ) + + async def _read(self, index: int, buffer_or_fileobj: Any): + return await buffer_or_fileobj.read(DEFAULT_TRANSFER_BLOCK_SIZE) + + +class 
_FileObjectReceiver(_ParallelReceiver): + def __init__( + self, + channel: Channel, + file_objects: list, + file_object_refs: List[FileObjectRef], + ): + super().__init__(channel, file_objects, file_object_refs) + + async def _write(self, index: int, buffer_or_fileobj: Any): + fileobj = self.local_objs[index] + await fileobj.write(buffer_or_fileobj) + + @staticmethod + def _get_ref_from_message( + message: Union[CopytoBuffersMessage, CopytoFileObjectsMessage] + ): + return message.fileobj_refs + + class TransferClient: def __init__(self): self._lock = asyncio.Lock() - @staticmethod - def _handle_ack(message: Union[ResultMessage, ErrorMessage]): - if message.message_type == MessageType.result: - assert message.result - else: # pragma: no cover - assert message.message_type == MessageType.error - raise message.error.with_traceback(message.traceback) - async def copyto_via_buffers( self, local_buffers: list, remote_buffer_refs: List[BufferRef] ): @@ -92,8 +293,10 @@ async def copyto_via_buffers( async with self._lock: await client.send(message) - self._handle_ack(await client.recv()) - await self._send_buffers_in_batches(local_buffers, client) + _handle_ack(await client.recv()) + await self._send_buffers_in_batches( + local_buffers, remote_buffer_refs, client + ) else: client, is_cached = await router.get_client_via_type( address, client_type, from_who=self, return_from_cache=True @@ -104,7 +307,7 @@ async def copyto_via_buffers( async with self._lock: await client.send(message) - self._handle_ack(await client.recv()) + _handle_ack(await client.recv()) await client.send_buffers(local_buffers) @staticmethod @@ -115,23 +318,11 @@ def _gen_switch_to_transfer_control_message(): ) @classmethod - async def _send_buffers_in_batches(cls, local_buffers: list, client: Client): - for buffer in local_buffers: - i = 0 - while True: - curr_buf = buffer[ - i - * DEFAULT_TRANSFER_BLOCK_SIZE : (i + 1) - * DEFAULT_TRANSFER_BLOCK_SIZE - ] - size = _get_buffer_size(curr_buf) - if size == 
0: - break - await client.send(curr_buf) - # ack - message = await client.recv() - cls._handle_ack(message) - i += 1 + async def _send_buffers_in_batches( + cls, local_buffers: list, remote_buffer_refs: List[BufferRef], client: Client + ): + sender = _BufferSender(client, local_buffers, remote_buffer_refs) + await sender.start() async def copyto_via_file_objects( self, @@ -159,22 +350,11 @@ async def copyto_via_file_objects( ) async with self._lock: await client.send(message) - for fileobj in local_file_objects: - finished = False - while not finished: - buf = await fileobj.read(DEFAULT_TRANSFER_BLOCK_SIZE) - size = _get_buffer_size(buf) - if size > 0: - await client.send(buf) - # ack - message = await client.recv() - self._handle_ack(message) - else: - await client.send(None) - # ack - message = await client.recv() - self._handle_ack(message) - finished = True + _handle_ack(await client.recv()) + sender = _FileObjectSender( + client, local_file_objects, remote_file_object_refs + ) + await sender.start() @contextlib.asynccontextmanager @@ -243,17 +423,9 @@ async def _process_message( async def _recv_buffers_in_batches( cls, message: CopytoBuffersMessage, buffers: list, channel: Channel ): - for buffer in buffers: - size = _get_buffer_size(buffer) - acc = 0 - while True: - async with _catch_error(channel, message.message_id): - recv_buffer = await channel.recv() - cur_size = _get_buffer_size(recv_buffer) - buffer[acc : acc + cur_size] = recv_buffer - acc += cur_size - if acc >= size: - break + buffer_refs = message.buffer_refs + receiver = _BufferReceiver(channel, buffers, buffer_refs) + await receiver.start() @classmethod async def _recv_file_objects( @@ -262,17 +434,6 @@ async def _recv_file_objects( file_objects: List[AioFileObject], channel: Channel, ): - for fileobj in file_objects: - finished = False - while not finished: - recv_buffer = await channel.recv() - if recv_buffer is not None: - # not finished, receive part data - async with _catch_error(channel, 
message.message_id): - await fileobj.write(recv_buffer) - else: - # done, send ack - await channel.send( - ResultMessage(message_id=message.message_id, result=True) - ) - finished = True + file_object_refs = message.fileobj_refs + receiver = _FileObjectReceiver(channel, file_objects, file_object_refs) + await receiver.start() From f3a82bde3132e557f8b0f42a3bd3052a9eb01da8 Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Thu, 8 Dec 2022 04:37:40 +0000 Subject: [PATCH 08/18] Increase coverage --- mars/oscar/backends/communication/socket.py | 1 + mars/oscar/backends/communication/ucx.py | 3 ++- mars/oscar/backends/core.py | 2 +- mars/oscar/backends/router.py | 2 +- mars/oscar/backends/transfer.py | 14 +++++++------- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/mars/oscar/backends/communication/socket.py b/mars/oscar/backends/communication/socket.py index 753b0c147a..143c329dcc 100644 --- a/mars/oscar/backends/communication/socket.py +++ b/mars/oscar/backends/communication/socket.py @@ -159,6 +159,7 @@ async def stop(self): await asyncio.gather( *(channel.close() for channel in self._channels if not channel.closed) ) + self._channels = [] @property @implements(Server.stopped) diff --git a/mars/oscar/backends/communication/ucx.py b/mars/oscar/backends/communication/ucx.py index 918c5b272c..1fd6cddef2 100644 --- a/mars/oscar/backends/communication/ucx.py +++ b/mars/oscar/backends/communication/ucx.py @@ -308,7 +308,7 @@ async def recv_buffers(self, buffers: list): try: for buffer in buffers: await self.ucp_endpoint.recv(buffer) - except BaseException as e: + except BaseException as e: # pragma: no cover if not self._closed: # In addition to UCX exceptions, may be CancelledError or another # "low-level" exception. The only safe thing to do is to abort. 
@@ -455,6 +455,7 @@ async def stop(self): await asyncio.gather( *(channel.close() for channel in self._channels if not channel.closed) ) + self._channels = [] self._ucp_listener = None self._closed.set() diff --git a/mars/oscar/backends/core.py b/mars/oscar/backends/core.py index a575bb4212..e331433b4d 100644 --- a/mars/oscar/backends/core.py +++ b/mars/oscar/backends/core.py @@ -39,7 +39,7 @@ def __init__(self): self._clients: Dict[Client, asyncio.Task] = dict() async def get_client(self, router: Router, dest_address: str) -> Client: - client = await router.get_client(dest_address, from_who=self) + client = await router.get_client(dest_address, from_who=type(self)) if client not in self._clients: self._clients[client] = asyncio.create_task(self._listen(client)) self._client_to_message_futures[client] = dict() diff --git a/mars/oscar/backends/router.py b/mars/oscar/backends/router.py index 832170d8a9..7bccebf91f 100644 --- a/mars/oscar/backends/router.py +++ b/mars/oscar/backends/router.py @@ -191,7 +191,7 @@ async def get_client_via_type( return cached_client client_type_to_addresses = self._get_client_type_to_addresses(external_address) - if client_type not in client_type_to_addresses: + if client_type not in client_type_to_addresses: # pragma: no cover raise ValueError( f"Client type({client_type}) is not supported for {external_address}" ) diff --git a/mars/oscar/backends/transfer.py b/mars/oscar/backends/transfer.py index 003b6ca1ad..826ae2926c 100644 --- a/mars/oscar/backends/transfer.py +++ b/mars/oscar/backends/transfer.py @@ -73,11 +73,11 @@ def __init__( @staticmethod @abstractmethod def _new_message(ref: Union[BufferRef, FileObjectRef], buf: Any): - pass + """new message""" @abstractmethod async def _read(self, index: int, buffer_or_fileobj: Any): - pass + """read data""" async def _send_one( self, @@ -142,14 +142,14 @@ def __init__( @abstractmethod async def _write(self, index: int, buffer_or_fileobj: Any): - pass + """write data""" @staticmethod 
@staticmethod def _get_ref_from_message( message: Union[CopytoBuffersMessage, CopytoFileObjectsMessage] ): - pass + """get ref according to message""" async def _recv_part(self, buf: Any, index: int, message_id: bytes): async with _catch_error(self.channel, message_id): @@ -285,7 +285,7 @@ async def copyto_via_buffers( # do not support buffer copy # send data in batches client, is_cached = await router.get_client( - address, from_who=self, return_from_cache=True + address, from_who=type(self), return_from_cache=True ) if not is_cached: # tell server to switch to transfer dedicated channel @@ -299,7 +299,7 @@ async def copyto_via_buffers( ) else: client, is_cached = await router.get_client_via_type( - address, client_type, from_who=self, return_from_cache=True + address, client_type, from_who=type(self), return_from_cache=True ) if not is_cached: # tell server to switch to transfer dedicated channel @@ -339,7 +339,7 @@ async def copyto_via_file_objects( ), "`copyto_via_file_objects` can only be used inside pools" address = remote_file_object_refs[0].address client, is_cached = await router.get_client( - address, from_who=self, return_from_cache=True + address, from_who=type(self), return_from_cache=True ) if not is_cached: # tell server to switch to transfer dedicated channel From e73f7d106142231adf77be96a78c45a1c56db64f Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Thu, 8 Dec 2022 06:39:04 +0000 Subject: [PATCH 09/18] Fix --- mars/oscar/backends/communication/ucx.py | 9 ++- mars/oscar/backends/router.py | 93 +++++++++++++----------- 2 files changed, 56 insertions(+), 46 deletions(-) diff --git a/mars/oscar/backends/communication/ucx.py b/mars/oscar/backends/communication/ucx.py index 1fd6cddef2..ebb782dba2 100644 --- a/mars/oscar/backends/communication/ucx.py +++ b/mars/oscar/backends/communication/ucx.py @@ -403,7 +403,7 @@ async def serve_forever(client_ucp_endpoint: "ucp.Endpoint"): client_ucp_endpoint, local_address=server.address ) except ChannelClosed: 
# pragma: no cover - logger.debug("Connection closed before handshake completed") + logger.exception("Connection closed before handshake completed") return ucp_listener = ucp.create_listener(serve_forever, port=port) @@ -492,8 +492,11 @@ async def connect( try: ucp_endpoint = await ucp.create_endpoint(host, port) - except ucp.exceptions.UCXBaseException: # pragma: no cover - raise ChannelClosed("Connection closed before handshake completed") + except ucp.exceptions.UCXBaseException as e: # pragma: no cover + raise ChannelClosed( + f"Connection closed before handshake completed, " + f"local address: {local_address}, dest address: {dest_address}" + ) from e channel = UCXChannel( ucp_endpoint, local_address=local_address, dest_address=dest_address ) diff --git a/mars/oscar/backends/router.py b/mars/oscar/backends/router.py index 7bccebf91f..9eecaffb93 100644 --- a/mars/oscar/backends/router.py +++ b/mars/oscar/backends/router.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import asyncio import threading from typing import Dict, List, Tuple, Type, Any, Optional, Union @@ -29,6 +30,7 @@ class Router: "_mapping", "_comm_config", "_cache_local", + "_lock", ) _instance: "Router" = None @@ -62,6 +64,7 @@ def __init__( self._mapping = mapping self._comm_config = comm_config or dict() self._cache_local = threading.local() + self._lock = asyncio.Lock() @property def _cache(self) -> Dict[Tuple[str, Any, Optional[Type[Client]]], Client]: @@ -115,29 +118,30 @@ async def get_client( return_from_cache=False, **kw, ) -> Union[Client, Tuple[Client, bool]]: - if cached and (external_address, from_who, None) in self._cache: - cached_client = self._cache[external_address, from_who, None] - if cached_client.closed: - # closed before, ignore it - del self._cache[external_address, from_who, None] - else: - if return_from_cache: - return cached_client, True + async with self._lock: + if cached and (external_address, from_who, None) in self._cache: + cached_client = self._cache[external_address, from_who, None] + if cached_client.closed: + # closed before, ignore it + del self._cache[external_address, from_who, None] else: - return cached_client - - address = self.get_internal_address(external_address) - if address is None: - # no inner address, just use external address - address = external_address - client_type: Type[Client] = get_client_type(address) - client = await self._create_client(client_type, address, **kw) - if cached: - self._cache[external_address, from_who, None] = client - if return_from_cache: - return client, False - else: - return client + if return_from_cache: + return cached_client, True + else: + return cached_client + + address = self.get_internal_address(external_address) + if address is None: + # no inner address, just use external address + address = external_address + client_type: Type[Client] = get_client_type(address) + client = await self._create_client(client_type, address, **kw) + if cached: + self._cache[external_address, 
from_who, None] = client + if return_from_cache: + return client, False + else: + return client async def _create_client( self, client_type: Type[Client], address: str, **kw @@ -179,27 +183,30 @@ async def get_client_via_type( return_from_cache=False, **kw, ) -> Union[Client, Tuple[Client, bool]]: - if cached and (external_address, from_who, client_type) in self._cache: - cached_client = self._cache[external_address, from_who, client_type] - if cached_client.closed: - # closed before, ignore it - del self._cache[external_address, from_who, client_type] - else: - if return_from_cache: - return cached_client, True + async with self._lock: + if cached and (external_address, from_who, client_type) in self._cache: + cached_client = self._cache[external_address, from_who, client_type] + if cached_client.closed: + # closed before, ignore it + del self._cache[external_address, from_who, client_type] else: - return cached_client + if return_from_cache: + return cached_client, True + else: + return cached_client - client_type_to_addresses = self._get_client_type_to_addresses(external_address) - if client_type not in client_type_to_addresses: # pragma: no cover - raise ValueError( - f"Client type({client_type}) is not supported for {external_address}" + client_type_to_addresses = self._get_client_type_to_addresses( + external_address ) - address = client_type_to_addresses[client_type] - client = await self._create_client(client_type, address, **kw) - if cached: - self._cache[external_address, from_who, client_type] = client - if return_from_cache: - return client, False - else: - return client + if client_type not in client_type_to_addresses: # pragma: no cover + raise ValueError( + f"Client type({client_type}) is not supported for {external_address}" + ) + address = client_type_to_addresses[client_type] + client = await self._create_client(client_type, address, **kw) + if cached: + self._cache[external_address, from_who, client_type] = client + if return_from_cache: + return 
client, False + else: + return client From 22b43f0111b47681fe2357adc59fce42e83c8fd9 Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Thu, 8 Dec 2022 09:11:14 +0000 Subject: [PATCH 10/18] Fix --- mars/oscar/backends/core.py | 2 +- mars/oscar/backends/transfer.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mars/oscar/backends/core.py b/mars/oscar/backends/core.py index e331433b4d..a575bb4212 100644 --- a/mars/oscar/backends/core.py +++ b/mars/oscar/backends/core.py @@ -39,7 +39,7 @@ def __init__(self): self._clients: Dict[Client, asyncio.Task] = dict() async def get_client(self, router: Router, dest_address: str) -> Client: - client = await router.get_client(dest_address, from_who=type(self)) + client = await router.get_client(dest_address, from_who=self) if client not in self._clients: self._clients[client] = asyncio.create_task(self._listen(client)) self._client_to_message_futures[client] = dict() diff --git a/mars/oscar/backends/transfer.py b/mars/oscar/backends/transfer.py index 826ae2926c..aa50a5d44c 100644 --- a/mars/oscar/backends/transfer.py +++ b/mars/oscar/backends/transfer.py @@ -285,7 +285,7 @@ async def copyto_via_buffers( # do not support buffer copy # send data in batches client, is_cached = await router.get_client( - address, from_who=type(self), return_from_cache=True + address, from_who=self, return_from_cache=True ) if not is_cached: # tell server to switch to transfer dedicated channel @@ -299,7 +299,7 @@ async def copyto_via_buffers( ) else: client, is_cached = await router.get_client_via_type( - address, client_type, from_who=type(self), return_from_cache=True + address, client_type, from_who=self, return_from_cache=True ) if not is_cached: # tell server to switch to transfer dedicated channel @@ -339,7 +339,7 @@ async def copyto_via_file_objects( ), "`copyto_via_file_objects` can only be used inside pools" address = remote_file_object_refs[0].address client, is_cached = await router.get_client( - address, 
from_who=type(self), return_from_cache=True + address, from_who=self, return_from_cache=True ) if not is_cached: # tell server to switch to transfer dedicated channel From fa67b47e380ccec5c7300aa2758500a4de0dc586 Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Thu, 8 Dec 2022 09:36:25 +0000 Subject: [PATCH 11/18] Fix --- mars/oscar/backends/router.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/mars/oscar/backends/router.py b/mars/oscar/backends/router.py index 9eecaffb93..5b37420d19 100644 --- a/mars/oscar/backends/router.py +++ b/mars/oscar/backends/router.py @@ -30,7 +30,6 @@ class Router: "_mapping", "_comm_config", "_cache_local", - "_lock", ) _instance: "Router" = None @@ -64,7 +63,6 @@ def __init__( self._mapping = mapping self._comm_config = comm_config or dict() self._cache_local = threading.local() - self._lock = asyncio.Lock() @property def _cache(self) -> Dict[Tuple[str, Any, Optional[Type[Client]]], Client]: @@ -74,6 +72,14 @@ def _cache(self) -> Dict[Tuple[str, Any, Optional[Type[Client]]], Client]: cache = self._cache_local.cache = dict() return cache + @property + def _lock(self) -> asyncio.Lock: + try: + return self._cache_local.lock + except AttributeError: + lock = self._cache_local.lock = asyncio.Lock() + return lock + def set_mapping(self, mapping: Dict[str, str]): self._mapping = mapping self._cache_local = threading.local() From e57a4409d0b63587e9daebd7150d5ba1543e1646 Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Tue, 6 Dec 2022 10:55:30 +0000 Subject: [PATCH 12/18] Merge master --- mars/deploy/oscar/pool.py | 2 +- mars/deploy/oscar/service.py | 14 +++ mars/deploy/oscar/worker.py | 2 +- mars/oscar/backends/transfer.py | 1 + mars/oscar/context.pyx | 2 + mars/services/storage/tests/test_transfer.py | 101 +++++++++++++++++-- mars/services/storage/transfer.py | 77 +++++++++++--- mars/storage/core.py | 12 +++ mars/storage/plasma.py | 11 ++ mars/storage/shared_memory.py | 10 ++ mars/tests/test_cluster.py | 
50 ++++++++- 11 files changed, 253 insertions(+), 29 deletions(-) diff --git a/mars/deploy/oscar/pool.py b/mars/deploy/oscar/pool.py index 121c804ccb..adbf97ca23 100644 --- a/mars/deploy/oscar/pool.py +++ b/mars/deploy/oscar/pool.py @@ -209,7 +209,7 @@ async def create_worker_actor_pool( gpu_enable_internal_address = gpu_config.get("enable_internal_addr") extra_conf = oscar_config.get("extra_conf", dict()) - if cuda_devices is None: # pragma: no cover + if cuda_devices is None: env_devices = os.environ.get("CUDA_VISIBLE_DEVICES") if not env_devices: cuda_devices = list(range(cuda_count())) diff --git a/mars/deploy/oscar/service.py b/mars/deploy/oscar/service.py index 8a061117dd..809d10372c 100644 --- a/mars/deploy/oscar/service.py +++ b/mars/deploy/oscar/service.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import urllib.parse from typing import List, Dict, Union from ...oscar import ServerClosed @@ -22,6 +23,17 @@ logger = logging.getLogger(__name__) +def _normalize_lookup_address(addr: str, config: dict): + scheme = urllib.parse.urlparse(addr).scheme if "://" in addr else None + if scheme and config["cluster"].get("lookup_address"): + config["cluster"]["lookup_address"] = ",".join( + [ + f"{scheme}://{laddr}" if not laddr.startswith(scheme) else laddr + for laddr in config["cluster"]["lookup_address"].split(",") + ] + ) + + async def start_supervisor( address: str, lookup_address: str = None, @@ -39,6 +51,7 @@ async def start_supervisor( config["services"].append("web") if modules: config["modules"] = modules + _normalize_lookup_address(address, config) try: await start_services(NodeRole.SUPERVISOR, config, address=address) logger.debug("Mars supervisor started at %s", address) @@ -84,6 +97,7 @@ async def start_worker( config["storage"]["backends"].append("cuda") if modules: config["modules"] = modules + _normalize_lookup_address(address, config) await start_services( NodeRole.WORKER, config, address=address, mark_ready=mark_ready ) diff 
--git a/mars/deploy/oscar/worker.py b/mars/deploy/oscar/worker.py index 24cbfb0881..047c5a6683 100644 --- a/mars/deploy/oscar/worker.py +++ b/mars/deploy/oscar/worker.py @@ -68,7 +68,7 @@ def parse_args(self, parser, argv, environ=None): if args.cuda_devices == "auto": self.cuda_devices = list(range(cuda_count())) - elif args.cuda_devices.strip() == "": # pragma: no cover + elif args.cuda_devices.strip() == "-1": # allow using CPU only self.cuda_devices = [] else: # pragma: no cover diff --git a/mars/oscar/backends/transfer.py b/mars/oscar/backends/transfer.py index aa50a5d44c..190732d413 100644 --- a/mars/oscar/backends/transfer.py +++ b/mars/oscar/backends/transfer.py @@ -21,6 +21,7 @@ from typing import List, Union, Any from ...lib.aio import AioFileObject +from ...utils import is_cuda_buffer from ..core import BufferRef, FileObjectRef from .communication import Client, Channel from .message import ( diff --git a/mars/oscar/context.pyx b/mars/oscar/context.pyx index d8e124dd90..ce0b34cfbe 100644 --- a/mars/oscar/context.pyx +++ b/mars/oscar/context.pyx @@ -335,6 +335,8 @@ cdef class ClientActorContext(BaseActorContext): address = remote_buffer_refs[0].address context = self._get_backend_context(address) return context.copyto_via_buffers(local_buffers, remote_buffer_refs) + else: + raise ValueError('buffer size should be at least 1') def copyto_via_file_objects( self, diff --git a/mars/services/storage/tests/test_transfer.py b/mars/services/storage/tests/test_transfer.py index 20244bf2f3..33a0499700 100644 --- a/mars/services/storage/tests/test_transfer.py +++ b/mars/services/storage/tests/test_transfer.py @@ -91,10 +91,72 @@ async def create_actors(actor_pools): await mo.destroy_actor(manager_ref2) +@pytest.fixture +async def ucx_actor_pools(): + async def start_pool(): + start_method = ( + os.environ.get("POOL_START_METHOD", "forkserver") + if sys.platform != "win32" + else None + ) + + pool = await mo.create_actor_pool( + "127.0.0.1", + n_process=2, + 
labels=["main", "numa-0", "io"], + subprocess_start_method=start_method, + external_address_schemes=["ucx"] * 3, + ) + await pool.start() + return pool + + worker_pool_1 = await start_pool() + worker_pool_2 = await start_pool() + try: + yield worker_pool_1, worker_pool_2 + finally: + await worker_pool_1.stop() + await worker_pool_2.stop() + + +@pytest.fixture +async def create_ucx_actors(ucx_actor_pools): + worker_pool_1, worker_pool_2 = ucx_actor_pools + + manager_ref1 = await mo.create_actor( + StorageManagerActor, + {"shared_memory": {}}, + uid=StorageManagerActor.default_uid(), + address=worker_pool_1.external_address, + ) + + manager_ref2 = await mo.create_actor( + StorageManagerActor, + {"shared_memory": {}}, + uid=StorageManagerActor.default_uid(), + address=worker_pool_2.external_address, + ) + yield worker_pool_1.external_address, worker_pool_2.external_address + await mo.destroy_actor(manager_ref1) + await mo.destroy_actor(manager_ref2) + + @pytest.mark.asyncio -async def test_simple_transfer(create_actors): +@pytest.mark.parametrize("enable_oscar_copyto", [True, False]) +async def test_simple_transfer(create_actors, enable_oscar_copyto): worker_address_1, worker_address_2 = create_actors + await _transfer_test(worker_address_1, worker_address_2, enable_oscar_copyto) + +@pytest.mark.asyncio +async def test_ucx_transfer(create_ucx_actors): + worker_address_1, worker_address_2 = create_ucx_actors + await _transfer_test(worker_address_1, worker_address_2, True) + + +async def _transfer_test( + worker_address_1: str, worker_address_2: str, enable_oscar_copyto: bool +): session_id = "mock_session" data1 = np.random.rand(100, 100) data2 = pd.DataFrame(np.random.randint(0, 100, (500, 10))) @@ -121,6 +183,7 @@ async def test_simple_transfer(create_actors): worker_address_2, StorageLevel.MEMORY, block_size=1000, + enable_oscar_copyto=enable_oscar_copyto, ) await sender_actor.send_batch_data( @@ -129,6 +192,7 @@ async def test_simple_transfer(create_actors): 
worker_address_2, StorageLevel.MEMORY, block_size=1000, + enable_oscar_copyto=enable_oscar_copyto, ) get_data1 = await storage_handler2.get(session_id, "data_key1") @@ -142,7 +206,11 @@ async def test_simple_transfer(create_actors): address=worker_address_2, uid=SenderManagerActor.gen_uid("numa-0") ) await sender_actor.send_batch_data( - session_id, ["data_key3"], worker_address_1, StorageLevel.MEMORY + session_id, + ["data_key3"], + worker_address_1, + StorageLevel.MEMORY, + enable_oscar_copyto=enable_oscar_copyto, ) get_data3 = await storage_handler1.get(session_id, "data_key3") pd.testing.assert_frame_equal(data2, get_data3) @@ -234,7 +302,11 @@ async def test_cancel_transfer(create_actors, mock_sender, mock_receiver): send_task = asyncio.create_task( sender_actor.send_batch_data( - "mock", ["data_key1"], worker_address_2, StorageLevel.MEMORY + "mock", + ["data_key1"], + worker_address_2, + StorageLevel.MEMORY, + enable_oscar_copyto=False, ) ) @@ -252,7 +324,11 @@ async def test_cancel_transfer(create_actors, mock_sender, mock_receiver): send_task = asyncio.create_task( sender_actor.send_batch_data( - "mock", ["data_key1"], worker_address_2, StorageLevel.MEMORY + "mock", + ["data_key1"], + worker_address_2, + StorageLevel.MEMORY, + enable_oscar_copyto=False, ) ) await send_task @@ -263,12 +339,20 @@ async def test_cancel_transfer(create_actors, mock_sender, mock_receiver): if mock_sender is MockSenderManagerActor: send_task1 = asyncio.create_task( sender_actor.send_batch_data( - "mock", ["data_key2"], worker_address_2, StorageLevel.MEMORY + "mock", + ["data_key2"], + worker_address_2, + StorageLevel.MEMORY, + enable_oscar_copyto=False, ) ) send_task2 = asyncio.create_task( sender_actor.send_batch_data( - "mock", ["data_key2"], worker_address_2, StorageLevel.MEMORY + "mock", + ["data_key2"], + worker_address_2, + StorageLevel.MEMORY, + enable_oscar_copyto=False, ) ) await asyncio.sleep(0.5) @@ -281,7 +365,8 @@ async def test_cancel_transfer(create_actors, 
mock_sender, mock_receiver): @pytest.mark.asyncio -async def test_transfer_same_data(create_actors): +@pytest.mark.parametrize("enable_oscar_copyto", [False, True]) +async def test_transfer_same_data(create_actors, enable_oscar_copyto): worker_address_1, worker_address_2 = create_actors session_id = "mock_session" @@ -306,6 +391,7 @@ async def test_transfer_same_data(create_actors): worker_address_2, StorageLevel.MEMORY, block_size=1000, + enable_oscar_copyto=enable_oscar_copyto, ) ) task2 = asyncio.create_task( @@ -315,6 +401,7 @@ async def test_transfer_same_data(create_actors): worker_address_2, StorageLevel.MEMORY, block_size=1000, + enable_oscar_copyto=enable_oscar_copyto, ) ) await asyncio.gather(task1, task2) diff --git a/mars/services/storage/transfer.py b/mars/services/storage/transfer.py index 18f88b4cca..8c903655e4 100644 --- a/mars/services/storage/transfer.py +++ b/mars/services/storage/transfer.py @@ -15,10 +15,10 @@ import asyncio import logging from dataclasses import dataclass -from typing import Dict, List +from typing import Dict, List, Tuple, Union, Set from ... 
import oscar as mo -from ...lib.aio import alru_cache +from ...lib.aio import alru_cache, AioFileObject from ...storage import StorageLevel from ...utils import dataslots from .core import DataManagerActor, WrappedStorageFileObject @@ -62,6 +62,7 @@ async def get_receiver_ref(address: str, band_name: str): async def _send_data( self, + readers: List[AioFileObject], receiver_ref: mo.ActorRefType["ReceiverManagerActor"], session_id: str, data_keys: List[str], @@ -91,12 +92,6 @@ async def send(self, buffer, eof_mark, key): await self.flush() sender = BufferedSender() - open_reader_tasks = [] - for data_key in data_keys: - open_reader_tasks.append( - self._storage_handler.open_reader.delay(session_id, data_key) - ) - readers = await self._storage_handler.open_reader.batch(*open_reader_tasks) for data_key, reader in zip(data_keys, readers): while True: @@ -125,6 +120,7 @@ async def send_batch_data( band_name: str = "numa-0", block_size: int = None, error: str = "raise", + enable_oscar_copyto=True, ): logger.debug( "Begin to send data (%s, %s) to %s", session_id, data_keys, address @@ -174,7 +170,7 @@ async def send_batch_data( data_sizes = [info.store_size for info in infos] if level is None: level = infos[0].level - is_transferring_list = await receiver_ref.open_writers( + is_transferring_list, data_key_to_buffer_refs = await receiver_ref.open_writers( session_id, data_keys, data_sizes, level, sub_infos ) to_send_keys = [] @@ -186,7 +182,42 @@ async def send_batch_data( to_send_keys.append(data_key) if to_send_keys: - await self._send_data(receiver_ref, session_id, to_send_keys, block_size) + open_reader_tasks = [] + for data_key in data_keys: + open_reader_tasks.append( + self._storage_handler.open_reader.delay(session_id, data_key) + ) + readers = await self._storage_handler.open_reader.batch(*open_reader_tasks) + + local_buffers = [] + remote_buffer_refs = [] + copied_keys = set() + if enable_oscar_copyto: + rest_readers = [] + rest_keys = [] + for data_key, reader 
in zip(data_keys, readers): + try: + local_buffer = reader.get_buffer() + remote_buffer_ref = data_key_to_buffer_refs[data_key] + local_buffers.append(local_buffer) + remote_buffer_refs.append(remote_buffer_ref) + copied_keys.add(data_key) + except (KeyError, AttributeError): + rest_readers.append(reader) + rest_keys.append(data_key) + + if local_buffers: + # for data that supports buffer protocol on both sides + # hand over to oscar to transfer data + await mo.copyto_via_buffers(local_buffers, remote_buffer_refs) + await receiver_ref.close_writers(session_id, copied_keys) + else: + rest_keys = to_send_keys + rest_readers = readers + if rest_keys: + await self._send_data( + rest_readers, receiver_ref, session_id, rest_keys, block_size + ) if to_wait_keys: await receiver_ref.wait_transfer_done(session_id, to_wait_keys) unpin_tasks = [] @@ -249,11 +280,12 @@ async def create_writers( data_sizes: List[int], level: StorageLevel, sub_infos: List, - ): + ) -> Tuple[List[bool], Dict[str, mo.BufferRef]]: tasks = dict() key_to_sub_infos = dict() data_key_to_size = dict() being_processed = [] + data_key_to_buffer_refs = dict() for data_key, data_size, sub_info in zip(data_keys, data_sizes, sub_infos): data_key_to_size[data_key] = data_size if (session_id, data_key) not in self._writing_infos: @@ -275,7 +307,15 @@ async def create_writers( ) if key_to_sub_infos[data_key] is not None: writer._sub_key_infos = key_to_sub_infos[data_key] - return being_processed + try: + buffer = writer.get_buffer() + except AttributeError: + pass + else: + buffer_ref = mo.buffer_ref(self.address, buffer) + data_key_to_buffer_refs[data_key] = buffer_ref + + return being_processed, data_key_to_buffer_refs async def open_writers( self, @@ -301,18 +341,25 @@ async def do_write( self, data: list, session_id: str, data_keys: List[str], eof_marks: List[bool] ): # close may be a high-cost operation, use create_task - close_tasks = [] finished_keys = [] for data, data_key, is_eof in zip(data, data_keys, 
eof_marks): writer = self._writing_infos[(session_id, data_key)].writer if data: await writer.write(data) if is_eof: - close_tasks.append(writer.close()) finished_keys.append(data_key) + await self.close_writers(session_id, finished_keys) + + async def close_writers( + self, session_id: str, data_keys: Union[List[str], Set[str]] + ): + close_tasks = [] + for data_key in data_keys: + writer = self._writing_infos[(session_id, data_key)].writer + close_tasks.append(writer.close()) await asyncio.gather(*close_tasks) async with self._lock: - for data_key in finished_keys: + for data_key in data_keys: event = self._writing_infos[(session_id, data_key)].event event.set() self._decref_writing_key(session_id, data_key) diff --git a/mars/storage/core.py b/mars/storage/core.py index 9a7dce0eeb..5318c1d87f 100644 --- a/mars/storage/core.py +++ b/mars/storage/core.py @@ -62,6 +62,10 @@ def __init__(self, object_id: Any, mode: str, size: Optional[int] = None): self._mv = None self._buffer = None + @property + def size(self): + return self._size + @abstractmethod def _read_init(self): """ @@ -86,6 +90,14 @@ def buffer(self): def mode(self): return self._mode + def init(self): + if not self._initialized: + if self.mode == "w": + self._write_init() + else: + self._read_init() + self._initialized = True + def read(self, size=-1): if not self._initialized: self._read_init() diff --git a/mars/storage/plasma.py b/mars/storage/plasma.py index a3ede1da6e..a52e60fa0d 100644 --- a/mars/storage/plasma.py +++ b/mars/storage/plasma.py @@ -55,6 +55,7 @@ def buffer(self): def _write_init(self): self._buffer = buf = self._plasma_client.create(self._object_id, self._size) + self._mv = memoryview(self._buffer) file = self._file = pa.FixedSizeBufferWriter(buf) file.set_memcopy_threads(6) @@ -80,6 +81,10 @@ def _write_close(self): def _read_close(self): pass + def get_buffer(self): + self.init() + return self._mv + class PlasmaStorageFileObject(StorageFileObject): def __init__(self, *args, 
**kwargs): @@ -90,6 +95,12 @@ async def close(self): self._buffer = self._file.buffer await super().close() + def get_buffer(self): + buf = self._file.get_buffer() + if buf is None: # pragma: no cover + raise AttributeError(f"{type(self)} does not have attribute get_buffer") + return buf + @dataslots @dataclass diff --git a/mars/storage/shared_memory.py b/mars/storage/shared_memory.py index 6d3194f849..038c0bdae6 100644 --- a/mars/storage/shared_memory.py +++ b/mars/storage/shared_memory.py @@ -86,6 +86,10 @@ def _write_close(self): def _read_close(self): pass + def get_buffer(self): + self.init() + return self.shm.buf + class ShmStorageFileObject(StorageFileObject): def __init__(self, *args, **kwargs): @@ -97,6 +101,12 @@ async def close(self): self._shm = self._file.shm await super().close() + def get_buffer(self): + buf = self._file.get_buffer() + if buf is None: + raise AttributeError(f"{type(self)} does not have attribute get_buffer") + return buf + @register_storage_backend class SharedMemoryStorage(StorageBackend): diff --git a/mars/tests/test_cluster.py b/mars/tests/test_cluster.py index cfad3c8e1d..72428dc153 100644 --- a/mars/tests/test_cluster.py +++ b/mars/tests/test_cluster.py @@ -23,16 +23,32 @@ from .. import new_session from .. import tensor as mt +from .. 
import remote as mr from ..services.cluster import NodeRole, WebClusterAPI -from ..tests.core import flaky +from ..tests.core import flaky, lazy_import from ..utils import get_next_port +ucp = lazy_import("ucp") + CONFIG_CONTENT = """\ "@inherits": "@mars/config.yml" scheduling: mem_hard_limit: null""" +UCX_CONFIG_FILE = f""" +{CONFIG_CONTENT} +oscar: + numa: + external_addr_scheme: ucx +""" + +configs = { + "default": CONFIG_CONTENT, +} +if ucp is not None: + configs["ucx"] = UCX_CONFIG_FILE + def _terminate(pid: int): proc = psutil.Process(pid) @@ -49,7 +65,8 @@ def _terminate(pid: int): @flaky(max_runs=3) @pytest.mark.asyncio -async def test_cluster(): +@pytest.mark.parametrize("name", list(configs)) +async def test_cluster(name): port = get_next_port() web_port = get_next_port() supervisor_addr = f"127.0.0.1:{port}" @@ -58,7 +75,7 @@ async def test_cluster(): # gen config file fd, path = tempfile.mkstemp() with os.fdopen(fd, mode="w") as f: - f.write(CONFIG_CONTENT) + f.write(configs[name]) r = subprocess.Popen( [ @@ -77,7 +94,17 @@ async def test_cluster(): stderr=subprocess.PIPE, ) w = subprocess.Popen( - [sys.executable, "-m", "mars.worker", "-s", supervisor_addr, "-f", path] + [ + sys.executable, + "-m", + "mars.worker", + "-s", + supervisor_addr, + "--cuda-devices", + "-1", + "-f", + path, + ] ) for p in [r, w]: @@ -121,10 +148,22 @@ def _check(timeout: float = 1.0): if len(jsn) > 0: break + def f(): + from mars.oscar.backends.router import Router + + mapping = Router.get_instance()._mapping + if name == "ucx": + assert all(a.startswith("ucx") for a in mapping) + else: + assert all(not a.startswith("ucx") for a in mapping) + sess = new_session(web_addr, default=True) a = mt.arange(10) assert a.sum().to_numpy(show_progress=False) == 45 + # no error should be raised + mr.spawn(f).execute() + sess2 = new_session(web_addr, session_id=sess.session_id) sess2.close() finally: @@ -133,4 +172,5 @@ def _check(timeout: float = 1.0): # test stderr out = 
r.communicate()[1].decode() - assert f"Supervisor started at {supervisor_addr}, web address: {web_addr}" in out + saddr = supervisor_addr if name == "default" else f"{name}://{supervisor_addr}" + assert f"Supervisor started at {saddr}, web address: {web_addr}" in out From c6f9efa46aa1e23b0f21b6b708bd95096f4adf34 Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Wed, 7 Dec 2022 09:16:09 +0000 Subject: [PATCH 13/18] Fix --- mars/oscar/backends/transfer.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/mars/oscar/backends/transfer.py b/mars/oscar/backends/transfer.py index 190732d413..f4f12f9d42 100644 --- a/mars/oscar/backends/transfer.py +++ b/mars/oscar/backends/transfer.py @@ -202,6 +202,13 @@ async def _write(self, index: int, buffer_or_fileobj: Any): size = _get_buffer_size(buffer_or_fileobj) n_recv = self._n_recv[index] + is_cuda = is_cuda_buffer(full_buf) + buffer_format = getattr(full_buf, "format", None) if not is_cuda else None + if not is_cuda: + buffer_or_fileobj = memoryview(buffer_or_fileobj) + if buffer_format: + buffer_or_fileobj = buffer_or_fileobj.cast(buffer_format) + def copy(): full_buf[n_recv : n_recv + size] = buffer_or_fileobj From 71c8bd83f4072041321a97cf29a90acf3493913c Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Thu, 8 Dec 2022 03:11:30 +0000 Subject: [PATCH 14/18] Fix --- mars/services/storage/tests/test_transfer.py | 2 ++ mars/tests/test_cluster.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/mars/services/storage/tests/test_transfer.py b/mars/services/storage/tests/test_transfer.py index 33a0499700..0e903b8930 100644 --- a/mars/services/storage/tests/test_transfer.py +++ b/mars/services/storage/tests/test_transfer.py @@ -23,6 +23,7 @@ from .... 
import oscar as mo from ....oscar.backends.allocate_strategy import IdleLabel from ....storage import StorageLevel +from ....tests.core import require_ucx from ..core import DataManagerActor, StorageManagerActor, StorageQuotaActor from ..errors import DataNotExist from ..handler import StorageHandlerActor @@ -148,6 +149,7 @@ async def test_simple_transfer(create_actors, enable_oscar_copyto): await _transfer_test(worker_address_1, worker_address_2, enable_oscar_copyto) +@require_ucx @pytest.mark.asyncio async def test_ucx_transfer(create_ucx_actors): worker_address_1, worker_address_2 = create_ucx_actors diff --git a/mars/tests/test_cluster.py b/mars/tests/test_cluster.py index 72428dc153..32e9a643c5 100644 --- a/mars/tests/test_cluster.py +++ b/mars/tests/test_cluster.py @@ -149,7 +149,7 @@ def _check(timeout: float = 1.0): break def f(): - from mars.oscar.backends.router import Router + from ..oscar.backends.router import Router mapping = Router.get_instance()._mapping if name == "ucx": From 533c23455582b7908909a0ac7ae8239b973a2813 Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Thu, 8 Dec 2022 07:07:49 +0000 Subject: [PATCH 15/18] Add tests --- mars/services/storage/tests/test_transfer.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/mars/services/storage/tests/test_transfer.py b/mars/services/storage/tests/test_transfer.py index 0e903b8930..aaf7c65bb1 100644 --- a/mars/services/storage/tests/test_transfer.py +++ b/mars/services/storage/tests/test_transfer.py @@ -59,10 +59,8 @@ async def start_pool(): await worker_pool_2.stop() -@pytest.fixture -async def create_actors(actor_pools): - worker_pool_1, worker_pool_2 = actor_pools - +storage_options = [{"shared_memory": {}}] +if not _is_windows: if sys.platform == "darwin": plasma_dir = "/tmp" else: @@ -70,9 +68,13 @@ async def create_actors(actor_pools): plasma_setup_params = dict( store_memory=5 * 1024 * 1024, plasma_directory=plasma_dir, check_dir_size=False ) - 
storage_configs = ( - {"plasma": plasma_setup_params} if not _is_windows else {"shared_memory": {}} - ) + storage_options.append({"plasma": plasma_setup_params}) + + +@pytest.fixture(params=storage_options) +async def create_actors(actor_pools, request): + worker_pool_1, worker_pool_2 = actor_pools + storage_configs = request.param manager_ref1 = await mo.create_actor( StorageManagerActor, From 41a495af261a28d60732011afc20fd840facc3fe Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Thu, 8 Dec 2022 16:17:42 +0800 Subject: [PATCH 16/18] Fix win --- mars/storage/shared_memory.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mars/storage/shared_memory.py b/mars/storage/shared_memory.py index 038c0bdae6..adf2f7f276 100644 --- a/mars/storage/shared_memory.py +++ b/mars/storage/shared_memory.py @@ -79,6 +79,7 @@ def _read_init(self): self._buffer = self._mv = shm.buf if self._size is None: (self._size,) = _qword_pack.unpack(shm.buf[-8:]) + self._buffer = self._buffer[: self._size + 8] def _write_close(self): pass @@ -88,7 +89,7 @@ def _read_close(self): def get_buffer(self): self.init() - return self.shm.buf + return self._buffer class ShmStorageFileObject(StorageFileObject): From ef4b525beeef6638f711740166de2a6fa23da6e3 Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Thu, 8 Dec 2022 08:31:32 +0000 Subject: [PATCH 17/18] Fix block size for transfer --- mars/oscar/__init__.py | 1 + mars/oscar/backends/transfer.py | 15 +++++++++++++++ mars/services/storage/transfer.py | 9 +++++---- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/mars/oscar/__init__.py b/mars/oscar/__init__.py index 829ec1f4e2..0541e419bd 100644 --- a/mars/oscar/__init__.py +++ b/mars/oscar/__init__.py @@ -39,6 +39,7 @@ ) from .backends import allocate_strategy from .backends.pool import MainActorPoolType +from .backends.transfer import temp_transfer_block_size from .batch import extensible from .core import ( ActorRef, diff --git a/mars/oscar/backends/transfer.py 
b/mars/oscar/backends/transfer.py index f4f12f9d42..2559abf33f 100644 --- a/mars/oscar/backends/transfer.py +++ b/mars/oscar/backends/transfer.py @@ -40,6 +40,21 @@ DEFAULT_TRANSFER_BLOCK_SIZE = 4 * 1024**2 +@contextlib.contextmanager +def temp_transfer_block_size(size: int): + global DEFAULT_TRANSFER_BLOCK_SIZE + + if size == DEFAULT_TRANSFER_BLOCK_SIZE: + yield + else: + default_size = DEFAULT_TRANSFER_BLOCK_SIZE + DEFAULT_TRANSFER_BLOCK_SIZE = size + try: + yield + finally: + DEFAULT_TRANSFER_BLOCK_SIZE = default_size + + def _get_buffer_size(buf) -> int: try: return buf.nbytes diff --git a/mars/services/storage/transfer.py b/mars/services/storage/transfer.py index 8c903655e4..9ae20e484e 100644 --- a/mars/services/storage/transfer.py +++ b/mars/services/storage/transfer.py @@ -207,10 +207,11 @@ async def send_batch_data( rest_keys.append(data_key) if local_buffers: - # for data that supports buffer protocol on both sides - # hand over to oscar to transfer data - await mo.copyto_via_buffers(local_buffers, remote_buffer_refs) - await receiver_ref.close_writers(session_id, copied_keys) + with mo.temp_transfer_block_size(block_size): + # for data that supports buffer protocol on both sides + # hand over to oscar to transfer data + await mo.copyto_via_buffers(local_buffers, remote_buffer_refs) + await receiver_ref.close_writers(session_id, copied_keys) else: rest_keys = to_send_keys rest_readers = readers From 3eb30a619acfe958783f0a2aead54de5ccfada79 Mon Sep 17 00:00:00 2001 From: qianduoduo0904 Date: Thu, 8 Dec 2022 10:56:14 +0000 Subject: [PATCH 18/18] Add more api to ucx channel --- .../backends/communication/tests/test_comm.py | 3 +- mars/oscar/backends/communication/ucx.py | 92 ++++++++++++------- mars/utils.py | 7 ++ 3 files changed, 66 insertions(+), 36 deletions(-) diff --git a/mars/oscar/backends/communication/tests/test_comm.py b/mars/oscar/backends/communication/tests/test_comm.py index db6f3f91b4..54f1ba629e 100644 --- 
a/mars/oscar/backends/communication/tests/test_comm.py +++ b/mars/oscar/backends/communication/tests/test_comm.py @@ -229,6 +229,7 @@ async def test_ucx_channel(): size = 2**5 async def handle_channel(channel: UCXChannel): + assert await channel.recv() == 1 buffer = np.empty(size, dtype="u1") await channel.recv_buffers([buffer]) await channel.send_buffers([buffer]) @@ -243,7 +244,7 @@ async def handle_channel(channel: UCXChannel): client = await UCXServer.client_type.connect(addr) buf = np.zeros(size, dtype="u1") buf += 1 - await client.send_buffers([buf]) + await client.send_objects_and_buffers([1], [buf]) new_buf = np.empty_like(buf) await client.recv_buffers([new_buf]) np.testing.assert_array_equal(buf, new_buf) diff --git a/mars/oscar/backends/communication/ucx.py b/mars/oscar/backends/communication/ucx.py index ebb782dba2..4f36a748ae 100644 --- a/mars/oscar/backends/communication/ucx.py +++ b/mars/oscar/backends/communication/ucx.py @@ -23,7 +23,13 @@ import cloudpickle import numpy as np -from ....utils import lazy_import, implements, classproperty, is_cuda_buffer +from ....utils import ( + lazy_import, + implements, + classproperty, + is_cuda_buffer, + get_buffer_size, +) from ....lib.nvutils import get_index_and_uuid, get_cuda_context from ....serialization import deserialize from ....serialization.aio import AioSerializer, get_header_length, BUFFER_SIZES_NAME @@ -236,14 +242,17 @@ def _close_channel(channel_ref: weakref.ReferenceType): def type(self) -> ChannelType: return ChannelType.remote + async def _get_buffers(self, message: Any) -> list: + compress = self.compression or 0 + serializer = AioSerializer(message, compress=compress) + return await serializer.run() + @implements(Channel.send) async def send(self, message: Any): if self.closed: raise ChannelClosed("UCX Endpoint is closed, unable to send message") - compress = self.compression or 0 - serializer = AioSerializer(message, compress=compress) - buffers = await serializer.run() + buffers = await 
self._get_buffers(message) return await self.send_buffers(buffers) @implements(Channel.recv) @@ -285,39 +294,55 @@ async def recv(self): return deserialize(header, buffers) async def send_buffers(self, buffers: list): + self._sync_cuda(buffers) + async with self._send_lock: + await self._send_buffers(buffers) + + @classmethod + def _sync_cuda(cls, buffers: list): + # It is necessary to first synchronize the default stream before start + # sending We synchronize the default stream because UCX is not + # stream-ordered and syncing the default stream will wait for other + # non-blocking CUDA streams. Note this is only sufficient if the memory + # being sent is not currently in use on non-blocking CUDA streams. + if any(is_cuda_buffer(buf) for buf in buffers): + # has GPU buffer + synchronize_stream(0) + + async def _send_buffers(self, buffers: list): try: - # It is necessary to first synchronize the default stream before start - # sending We synchronize the default stream because UCX is not - # stream-ordered and syncing the default stream will wait for other - # non-blocking CUDA streams. Note this is only sufficient if the memory - # being sent is not currently in use on non-blocking CUDA streams. 
- if any(is_cuda_buffer(buf) for buf in buffers): - # has GPU buffer - synchronize_stream(0) - - async with self._send_lock: - for buffer in buffers: - if buffer.nbytes if hasattr(buffer, "nbytes") else len(buffer) > 0: - await self.ucp_endpoint.send(buffer) + for buffer in buffers: + if get_buffer_size(buffer) > 0: + await self.ucp_endpoint.send(buffer) except ucp.exceptions.UCXBaseException: # pragma: no cover self.abort() raise ChannelClosed("While writing, the connection was closed") + async def send_objects_and_buffers(self, objects: list, buffers: list): + all_buffers = [] + for obj in objects: + all_buffers.extend(await self._get_buffers(obj)) + all_buffers.extend(buffers) + return await self.send_buffers(all_buffers) + async def recv_buffers(self, buffers: list): async with self._recv_lock: - try: - for buffer in buffers: - await self.ucp_endpoint.recv(buffer) - except BaseException as e: # pragma: no cover - if not self._closed: - # In addition to UCX exceptions, may be CancelledError or another - # "low-level" exception. The only safe thing to do is to abort. - self.abort() - raise ChannelClosed( - f"Connection closed by writer.\nInner exception: {e!r}" - ) from e - else: - raise EOFError("Server closed already") + await self._recv_buffers(buffers) + + async def _recv_buffers(self, buffers: list): + try: + for buffer in buffers: + await self.ucp_endpoint.recv(buffer) + except BaseException as e: # pragma: no cover + if not self._closed: + # In addition to UCX exceptions, may be CancelledError or another + # "low-level" exception. The only safe thing to do is to abort. 
+ self.abort() + raise ChannelClosed( + f"Connection closed by writer.\nInner exception: {e!r}" + ) from e + else: + raise EOFError("Server closed already") def abort(self): self._closed = True @@ -502,8 +527,5 @@ async def connect( ) return UCXClient(local_address, dest_address, channel) - async def send_buffers(self, buffers: list): - return await self.channel.send_buffers(buffers) - - async def recv_buffers(self, buffers: list): - return await self.channel.recv_buffers(buffers) + def __getattr__(self, attr: str): + return getattr(self.channel, attr) diff --git a/mars/utils.py b/mars/utils.py index 492155d487..3e44e64d5d 100644 --- a/mars/utils.py +++ b/mars/utils.py @@ -1914,3 +1914,10 @@ def convert_to_cupy_ndarray( memory = _cupy.cuda.UnownedMemory(data, size, cuda_buffer) ptr = _cupy.cuda.MemoryPointer(memory, 0) return _cupy.ndarray(shape=size, dtype="u1", memptr=ptr) + + +def get_buffer_size(buf: Any) -> int: + try: + return buf.nbytes + except AttributeError: # pragma: no cover + return len(buf)