Skip to content
This repository has been archived by the owner on Feb 8, 2023. It is now read-only.

enh: Support mo.copyto_via_buffers in storage transfer #82

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
Draft
16 changes: 8 additions & 8 deletions .github/workflows/core-gpu-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ jobs:
- name: Run GPU CI
shell: bash
run: |
coverage run -m pytest ./ -m cuda --cov-config=setup.cfg --cov-report= --cov=mars
coverage xml
pytest ./ -m cuda --cov-config=setup.cfg --cov-report=xml --cov=mars

- name: Report coverage data
shell: bash
run: |
bash <(curl -s https://codecov.io/bash)
rm -rf *.coverage*
rm -rf coverage.xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
fail_ci_if_error: true
flags: unittests
name: codecov-gpu
verbose: true
2 changes: 1 addition & 1 deletion mars/deploy/oscar/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ async def create_worker_actor_pool(
gpu_enable_internal_address = gpu_config.get("enable_internal_addr")
extra_conf = oscar_config.get("extra_conf", dict())

if cuda_devices is None: # pragma: no cover
if cuda_devices is None:
env_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
if not env_devices:
cuda_devices = list(range(cuda_count()))
Expand Down
14 changes: 14 additions & 0 deletions mars/deploy/oscar/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import logging
import urllib.parse
from typing import List, Dict, Union

from ...oscar import ServerClosed
Expand All @@ -22,6 +23,17 @@
logger = logging.getLogger(__name__)


def _normalize_lookup_address(addr: str, config: dict):
scheme = urllib.parse.urlparse(addr).scheme if "://" in addr else None
if scheme and config["cluster"].get("lookup_address"):
config["cluster"]["lookup_address"] = ",".join(
[
f"{scheme}://{laddr}" if not laddr.startswith(scheme) else laddr
for laddr in config["cluster"]["lookup_address"].split(",")
]
)


async def start_supervisor(
address: str,
lookup_address: str = None,
Expand All @@ -39,6 +51,7 @@ async def start_supervisor(
config["services"].append("web")
if modules:
config["modules"] = modules
_normalize_lookup_address(address, config)
try:
await start_services(NodeRole.SUPERVISOR, config, address=address)
logger.debug("Mars supervisor started at %s", address)
Expand Down Expand Up @@ -84,6 +97,7 @@ async def start_worker(
config["storage"]["backends"].append("cuda")
if modules:
config["modules"] = modules
_normalize_lookup_address(address, config)
await start_services(
NodeRole.WORKER, config, address=address, mark_ready=mark_ready
)
Expand Down
2 changes: 1 addition & 1 deletion mars/deploy/oscar/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def parse_args(self, parser, argv, environ=None):

if args.cuda_devices == "auto":
self.cuda_devices = list(range(cuda_count()))
elif args.cuda_devices.strip() == "": # pragma: no cover
elif args.cuda_devices.strip() == "-1":
# allow using CPU only
self.cuda_devices = []
else: # pragma: no cover
Expand Down
1 change: 0 additions & 1 deletion mars/lib/aio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

import asyncio
import contextlib
import sys

from .file import AioFileObject, AioFilesystem
Expand Down
11 changes: 10 additions & 1 deletion mars/oscar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,20 @@
setup_cluster,
wait_actor_pool_recovered,
get_pool_config,
copyto_via_buffers,
copyto_via_file_objects,
buffer_ref,
file_object_ref,
)
from .backends import allocate_strategy
from .backends.pool import MainActorPoolType
from .backends.transfer import temp_transfer_block_size
from .batch import extensible
from .core import ActorRef
from .core import (
ActorRef,
BufferRef,
FileObjectRef,
) # noqa: F401 # pylint: disable=unused-import
from .debug import set_debug_options, get_debug_options, DebugOptions
from .errors import (
ActorNotExist,
Expand Down
32 changes: 29 additions & 3 deletions mars/oscar/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
# limitations under the License.

from urllib.parse import urlparse
from typing import Any, Dict, Type, Tuple
from typing import Any, Dict, Type, Tuple, List
from numbers import Number
from collections import defaultdict

from ..lib.aio import AioFileObject
from .backend import get_backend
from .context import get_context
from .core import _Actor, _StatelessActor, ActorRef
from .core import _Actor, _StatelessActor, ActorRef, BufferRef, FileObjectRef


async def create_actor(actor_cls, *args, uid=None, address=None, **kwargs) -> ActorRef:
Expand All @@ -42,11 +43,21 @@ async def actor_ref(*args, **kwargs) -> ActorRef:
return await ctx.actor_ref(*args, **kwargs)


async def kill_actor(actor_ref):
async def kill_actor(actor_ref: ActorRef):
ctx = get_context()
return await ctx.kill_actor(actor_ref)


def buffer_ref(address: str, buffer: Any) -> BufferRef:
ctx = get_context()
return ctx.buffer_ref(address, buffer)


def file_object_ref(address: str, fileobj: AioFileObject) -> FileObjectRef:
ctx = get_context()
return ctx.file_object_ref(address, fileobj)


async def create_actor_pool(address: str, n_process: int = None, **kwargs):
if address is None:
raise ValueError("address has to be provided")
Expand All @@ -70,6 +81,21 @@ async def get_pool_config(address: str):
return await ctx.get_pool_config(address)


async def copyto_via_buffers(local_buffers: list, remote_buffer_refs: List[BufferRef]):
ctx = get_context()
return await ctx.copyto_via_buffers(local_buffers, remote_buffer_refs)


async def copyto_via_file_objects(
local_file_objects: List[AioFileObject],
remote_file_object_refs: List[FileObjectRef],
):
ctx = get_context()
return await ctx.copyto_via_file_objects(
local_file_objects, remote_file_object_refs
)


def setup_cluster(address_to_resources: Dict[str, Dict[str, Number]]):
scheme_to_address_resources = defaultdict(dict)
for address, resources in address_to_resources.items():
Expand Down
4 changes: 2 additions & 2 deletions mars/oscar/backends/communication/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,11 @@ def parse_config(cls, config: dict) -> dict:
return dict()

@implements(Channel.send)
async def send(self, message):
async def send(self, message: Any):
return await self.channel.send(message)

@implements(Channel.recv)
async def recv(self):
async def recv(self) -> Any:
return await self.channel.recv()

async def close(self):
Expand Down
1 change: 1 addition & 0 deletions mars/oscar/backends/communication/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ async def stop(self):
await asyncio.gather(
*(channel.close() for channel in self._channels if not channel.closed)
)
self._channels = []

@property
@implements(Server.stopped)
Expand Down
32 changes: 31 additions & 1 deletion mars/oscar/backends/communication/tests/test_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import pytest

from .....lib.aio import AioEvent
from .....tests.core import require_cudf, require_cupy
from .....tests.core import require_cudf, require_cupy, require_ucx
from .....utils import get_next_port, lazy_import
from .. import (
Channel,
Expand All @@ -37,6 +37,7 @@
DummyClient,
Server,
UCXServer,
UCXChannel,
)
from ..ucx import UCXInitializer

Expand Down Expand Up @@ -222,6 +223,35 @@ async def test_multiprocess_cuda_comm(server_type):
await client.close()


@require_ucx
@pytest.mark.asyncio
async def test_ucx_channel():
size = 2**5

async def handle_channel(channel: UCXChannel):
assert await channel.recv() == 1
buffer = np.empty(size, dtype="u1")
await channel.recv_buffers([buffer])
await channel.send_buffers([buffer])

# create server
addr = f"127.0.0.1:{get_next_port()}"
server = await UCXServer.create({"address": addr, "handle_channel": handle_channel})
await server.start()
assert isinstance(server.info, dict)

# create client
client = await UCXServer.client_type.connect(addr)
buf = np.zeros(size, dtype="u1")
buf += 1
await client.send_objects_and_buffers([1], [buf])
new_buf = np.empty_like(buf)
await client.recv_buffers([new_buf])
np.testing.assert_array_equal(buf, new_buf)

await server.stop()


def test_get_client_type():
assert issubclass(get_client_type("127.0.0.1"), SocketClient)
assert issubclass(get_client_type("unixsocket:///1"), UnixSocketClient)
Expand Down
Loading