Implement log flushing (#808)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Nikita Karetnikov and pre-commit-ci[bot] authored May 1, 2024
1 parent 6b486f3 commit 42ce8c0
Showing 6 changed files with 165 additions and 89 deletions.
41 changes: 36 additions & 5 deletions conda-store-server/conda_store_server/action/base.py
@@ -12,8 +12,8 @@

def action(f: typing.Callable):
@functools.wraps(f)
def wrapper(*args, **kwargs):
action_context = ActionContext()
def wrapper(*args, stdout=None, stderr=None, **kwargs):
action_context = ActionContext(stdout=stdout, stderr=stderr)
with contextlib.ExitStack() as stack:
# redirect stdout -> action_context.stdout
stack.enter_context(contextlib.redirect_stdout(action_context.stdout))
@@ -35,18 +35,48 @@ def wrapper(*args, **kwargs):


class ActionContext:
def __init__(self):
def __init__(self, stdout=None, stderr=None):
if stdout is not None and stderr is None:
stderr = stdout

self.id = str(uuid.uuid4())
self.stdout = io.StringIO()
self.stderr = io.StringIO()
self.stdout = stdout if stdout is not None else io.StringIO()
self.stderr = stderr if stderr is not None else io.StringIO()
self.log = logging.getLogger(f"conda_store_server.action.{self.id}")
self.log.propagate = False
self.log.addHandler(logging.StreamHandler(stream=self.stdout))
self.log.setLevel(logging.INFO)
self.result = None
self.artifacts = {}

def run_command(self, command, redirect_stderr=True, **kwargs):
"""Runs command and immediately writes to logs"""
self.log.info(f"Running command: {' '.join(command)}")

# Unlike subprocess.run, Popen doesn't support the check argument, so
# ignore it. The code below always checks the return code
kwargs.pop("check", None)

# https://stackoverflow.com/questions/4417546/constantly-print-subprocess-output-while-process-is-running
with subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT if redirect_stderr else subprocess.PIPE,
bufsize=1,
universal_newlines=True,
**kwargs,
) as p:
for line in p.stdout:
self.stdout.write(line)
if not redirect_stderr:
for line in p.stderr:
self.stderr.write(line)

if p.returncode != 0:
raise subprocess.CalledProcessError(p.returncode, p.args)

def run(self, *args, redirect_stderr=True, **kwargs):
"""Runs command waiting for it to succeed before writing to logs"""
result = subprocess.run(
*args,
**kwargs,
@@ -57,4 +87,5 @@ def run(self, *args, redirect_stderr=True, **kwargs):
self.stdout.write(result.stdout)
if not redirect_stderr:
self.stderr.write(result.stderr)

return result
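
Aside (not part of this commit): the line-by-line streaming that the new run_command adds above can be sketched in isolation. The LineWriter class and the example command below are illustrative assumptions rather than conda-store API; the Popen loop simply mirrors the pattern used in the diff.

    import subprocess

    class LineWriter:
        """Illustrative sink: forwards each line as soon as it arrives."""

        def write(self, line):
            print(line, end="", flush=True)

    def stream_command(command, writer):
        # bufsize=1 with universal_newlines=True gives line-buffered text output,
        # so each line can be forwarded while the process is still running.
        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=1,
            universal_newlines=True,
        ) as p:
            for line in p.stdout:
                writer.write(line)
        if p.returncode != 0:
            raise subprocess.CalledProcessError(p.returncode, p.args)

    stream_command(["python", "-c", "print('hello'); print('world')"], LineWriter())
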
@@ -40,7 +40,7 @@ def write_file(filename, s):
"constructor",
"--help",
]
logged_command(context, command, timeout=10)
logged_command(context, command)
except FileNotFoundError:
warnings.warn(
"Installer generation requires constructor: https://github.com/conda/constructor"
@@ -29,4 +29,4 @@ def action_install_lockfile(
str(lockfile_filename),
]

context.run(command, check=True)
context.run_command(command, check=True)
12 changes: 3 additions & 9 deletions conda-store-server/conda_store_server/action/utils.py
@@ -1,10 +1,4 @@
import subprocess


def logged_command(context, command, **kwargs):
context.log.info(f"Running command: {' '.join(command)}")
context.log.info(
subprocess.check_output(
command, stderr=subprocess.STDOUT, encoding="utf-8", **kwargs
)
)
# This is here only for backward compatibility, new code should use the
# run_command method instead of calling this function
context.run_command(command, **kwargs)
171 changes: 104 additions & 67 deletions conda-store-server/conda_store_server/build.py
@@ -10,29 +10,56 @@

import yaml

from filelock import FileLock
from sqlalchemy.orm import Session

from conda_store_server import action, api, conda_utils, orm, schema, utils
from conda_store_server.utils import BuildPathError


def append_to_logs(db: Session, conda_store, build, logs: typing.Union[str, bytes]):
try:
current_logs = conda_store.storage.get(build.log_key)
except Exception:
current_logs = b""
class LoggedStream:
"""Allows writing to storage via logging.StreamHandler"""

if isinstance(logs, str):
logs = logs.encode("utf-8")
def __init__(self, db, conda_store, build, prefix=None):
self.db = db
self.conda_store = conda_store
self.build = build
self.prefix = prefix

conda_store.storage.set(
db,
build.id,
build.log_key,
current_logs + logs,
content_type="text/plain",
artifact_type=schema.BuildArtifactType.LOGS,
)
def write(self, b, /):
for line in b.split("\n"):
# Skips empty lines
if not line:
continue
if self.prefix is not None:
line = self.prefix + line
append_to_logs(self.db, self.conda_store, self.build, line + "\n")

def flush(self):
pass


def append_to_logs(db: Session, conda_store, build, logs: typing.Union[str, bytes]):
# For instance, with local storage, this involves reading from and writing
# to a file. Locking here prevents a race condition when multiple tasks
# attempt to write to a shared resource, which is the log
with FileLock(f"{build.build_path(conda_store)}.log.lock"):
try:
current_logs = conda_store.storage.get(build.log_key)
except Exception:
current_logs = b""

if isinstance(logs, str):
logs = logs.encode("utf-8")

conda_store.storage.set(
db,
build.id,
build.log_key,
current_logs + logs,
content_type="text/plain",
artifact_type=schema.BuildArtifactType.LOGS,
)


def set_build_started(db: Session, build: orm.Build):
@@ -179,6 +206,12 @@ def build_conda_environment(db: Session, conda_store, build):
),
platforms=settings.conda_solve_platforms,
conda_flags=conda_store.conda_flags,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_solve_lockfile: ",
),
)

conda_store.storage.set(
@@ -190,40 +223,28 @@
artifact_type=schema.BuildArtifactType.LOCKFILE,
)

append_to_logs(
db,
conda_store,
build,
"::group::action_solve_lockfile\n"
+ context.stdout.getvalue()
+ "\n::endgroup::\n",
)
conda_lock_spec = context.result

context = action.action_fetch_and_extract_conda_packages(
conda_lock_spec=conda_lock_spec,
pkgs_dir=conda_utils.conda_root_package_dir(),
)
append_to_logs(
db,
conda_store,
build,
"::group::action_fetch_and_extract_conda_packages\n"
+ context.stdout.getvalue()
+ "\n::endgroup::\n",
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_fetch_and_extract_conda_packages: ",
),
)

context = action.action_install_lockfile(
conda_lock_spec=conda_lock_spec,
conda_prefix=conda_prefix,
)
append_to_logs(
db,
conda_store,
build,
"::group::action_install_lockfile\n"
+ context.stdout.getvalue()
+ "\n::endgroup::\n",
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_install_lockfile: ",
),
)

utils.symlink(conda_prefix, environment_prefix)
@@ -233,15 +254,35 @@ def build_conda_environment(db: Session, conda_store, build):
permissions=settings.default_permissions,
uid=settings.default_uid,
gid=settings.default_gid,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_set_conda_prefix_permissions: ",
),
)

action.action_add_conda_prefix_packages(
db=db,
conda_prefix=conda_prefix,
build_id=build.id,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_add_conda_prefix_packages: ",
),
)

context = action.action_get_conda_prefix_stats(conda_prefix)
context = action.action_get_conda_prefix_stats(
conda_prefix,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_get_conda_prefix_stats: ",
),
)
build.size = context.result["disk_usage"]

set_build_completed(db, conda_store, build)
@@ -299,15 +340,14 @@ def build_conda_env_export(db: Session, conda_store, build: orm.Build):
)

context = action.action_generate_conda_export(
conda_command=settings.conda_command, conda_prefix=conda_prefix
)
append_to_logs(
db,
conda_store,
build,
"::group::action_generate_conda_export\n"
+ context.stdout.getvalue()
+ "\n::endgroup::\n",
conda_command=settings.conda_command,
conda_prefix=conda_prefix,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_generate_conda_export: ",
),
)

conda_prefix_export = yaml.dump(context.result).encode("utf-8")
@@ -330,16 +370,15 @@ def build_conda_pack(db: Session, conda_store, build: orm.Build):
):
with tempfile.TemporaryDirectory() as tmpdir:
output_filename = pathlib.Path(tmpdir) / "environment.tar.gz"
context = action.action_generate_conda_pack(
conda_prefix=conda_prefix, output_filename=output_filename
)
append_to_logs(
db,
conda_store,
build,
"::group::action_generate_conda_pack\n"
+ context.stdout.getvalue()
+ "\n::endgroup::\n",
action.action_generate_conda_pack(
conda_prefix=conda_prefix,
output_filename=output_filename,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_generate_conda_pack: ",
),
)
conda_store.storage.fset(
db,
@@ -425,16 +464,14 @@ def build_constructor_installer(db: Session, conda_store, build: orm.Build):
),
installer_dir=pathlib.Path(tmpdir),
version=build.build_key,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_generate_constructor_installer: ",
),
)
output_filename = context.result
append_to_logs(
db,
conda_store,
build,
"::group::action_generate_constructor_installer\n"
+ context.stdout.getvalue()
+ "\n::endgroup::\n",
)
if output_filename is None:
return
conda_store.storage.fset(
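
Aside (not part of this commit): the LoggedStream idea introduced above — a file-like object whose write() forwards each non-empty line to a persistence callback, attached to a logger through logging.StreamHandler — can be demonstrated standalone. The CallbackStream name and the print callback below are illustrative assumptions only.

    import logging

    class CallbackStream:
        """Illustrative file-like sink: passes each non-empty line to a callback,
        similar in spirit to how LoggedStream forwards lines to append_to_logs."""

        def __init__(self, callback, prefix=""):
            self.callback = callback
            self.prefix = prefix

        def write(self, b):
            for line in b.split("\n"):
                if line:  # skip empty lines
                    self.callback(self.prefix + line + "\n")

        def flush(self):
            pass

    log = logging.getLogger("flush-demo")
    log.propagate = False
    log.addHandler(
        logging.StreamHandler(
            stream=CallbackStream(lambda line: print(line, end=""), prefix="demo: ")
        )
    )
    log.setLevel(logging.INFO)
    log.info("forwarded to the callback as soon as it is logged")
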
26 changes: 20 additions & 6 deletions conda-store-server/conda_store_server/storage.py
@@ -21,10 +21,17 @@ def fset(
filename: str,
artifact_type: schema.BuildArtifactType,
):
db.add(
orm.BuildArtifact(build_id=build_id, key=key, artifact_type=artifact_type)
ba = orm.BuildArtifact
exists = (
db.query(ba)
.filter(ba.build_id == build_id)
.filter(ba.key == key)
.filter(ba.artifact_type == artifact_type)
.first()
)
db.commit()
if not exists:
db.add(ba(build_id=build_id, key=key, artifact_type=artifact_type))
db.commit()

def set(
self,
@@ -34,10 +41,17 @@ def set(
value: bytes,
artifact_type: schema.BuildArtifactType,
):
db.add(
orm.BuildArtifact(build_id=build_id, key=key, artifact_type=artifact_type)
ba = orm.BuildArtifact
exists = (
db.query(ba)
.filter(ba.build_id == build_id)
.filter(ba.key == key)
.filter(ba.artifact_type == artifact_type)
.first()
)
db.commit()
if not exists:
db.add(ba(build_id=build_id, key=key, artifact_type=artifact_type))
db.commit()

def get(self, key: str):
raise NotImplementedError()
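
Aside (not part of this commit): storage.fset and storage.set above now check for an existing BuildArtifact row before inserting, presumably because logs are now written many times per build rather than once. The "insert only if missing" pattern is plain SQLAlchemy and can be sketched with a throwaway model; the Artifact class and in-memory SQLite engine below are illustrative assumptions.

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class Artifact(Base):
        __tablename__ = "artifact"
        id = Column(Integer, primary_key=True)
        build_id = Column(Integer)
        key = Column(String)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    def record_artifact(db, build_id, key):
        # Insert only if an identical row is not already present, so repeated
        # log flushes do not accumulate duplicate artifact records.
        exists = (
            db.query(Artifact)
            .filter(Artifact.build_id == build_id)
            .filter(Artifact.key == key)
            .first()
        )
        if not exists:
            db.add(Artifact(build_id=build_id, key=key))
            db.commit()

    with Session(engine) as db:
        record_artifact(db, 1, "logs/build-1.log")
        record_artifact(db, 1, "logs/build-1.log")  # second call is a no-op
        assert db.query(Artifact).count() == 1
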
