From c75730354cc03c5ae62df45068f082072b0b1b0d Mon Sep 17 00:00:00 2001 From: Roboto Bot-o Date: Thu, 1 Aug 2024 11:58:38 -0700 Subject: [PATCH] Update SDK to version 0.5.11 --- src/roboto/domain/datasets/dataset.py | 35 +++++++--- src/roboto/domain/events/__init__.py | 4 +- src/roboto/domain/events/event.py | 42 ++++++++++++ src/roboto/domain/events/operations.py | 23 +++++++ src/roboto/domain/files/file.py | 3 + src/roboto/domain/topics/record.py | 34 ++++++---- src/roboto/domain/topics/topic.py | 3 + src/roboto/upload_agent/__main__.py | 35 ++++++++-- src/roboto/upload_agent/agent.py | 88 ++++++++++++++++++++++---- src/roboto/upload_agent/files.py | 10 ++- src/roboto/version.py | 2 +- 11 files changed, 237 insertions(+), 42 deletions(-) diff --git a/src/roboto/domain/datasets/dataset.py b/src/roboto/domain/datasets/dataset.py index 6366aa0..64f8ae7 100644 --- a/src/roboto/domain/datasets/dataset.py +++ b/src/roboto/domain/datasets/dataset.py @@ -13,6 +13,7 @@ import pathspec +from ...association import Association from ...auth import Permissions from ...env import RobotoEnv from ...exceptions import ( @@ -418,6 +419,9 @@ def remove_tags( """Remove each tag in this sequence if it exists""" self.update(metadata_changeset=MetadataChangeset(remove_tags=tags)) + def to_association(self) -> Association: + return Association.dataset(self.dataset_id) + def to_dict(self) -> dict[str, typing.Any]: return self.__record.model_dump(mode="json") @@ -465,18 +469,18 @@ def rename_directory(self, old_path: str, new_path: str) -> DirectoryRecord: def upload_directory( self, directory_path: pathlib.Path, + include_patterns: typing.Optional[list[str]] = None, exclude_patterns: typing.Optional[list[str]] = None, delete_after_upload: bool = False, ) -> None: """ - Upload everything, recursively, in directory, ignoring files that match any of the ignore patterns. - - `exclude_patterns` is a list of gitignore-style patterns. - See https://git-scm.com/docs/gitignore#_pattern_format. + Uploads all files and directories recursively from the specified directory path. You can use + `include_patterns` and `exclude_patterns` to control what files and directories are uploaded, and can + use `delete_after_upload` to clean up your local filesystem after the uploads succeed. Example: - >>> from roboto.domain import datasets - >>> dataset = datasets.Dataset(...) + >>> from roboto import Dataset + >>> dataset = Dataset(...) >>> dataset.upload_directory( ... pathlib.Path("/path/to/directory"), ... exclude_patterns=[ @@ -486,12 +490,21 @@ def upload_directory( ... "**/*.log", ... ], ... ) + + Notes: + - Both `include_patterns` and `exclude_patterns` follow the 'gitignore' pattern format described + in https://git-scm.com/docs/gitignore#_pattern_format. + - If both `include_patterns` and `exclude_patterns` are provided, files matching + `exclude_patterns` will be excluded even if they match `include_patterns`. 
""" + include_spec: typing.Optional[pathspec.PathSpec] = excludespec_from_patterns( + include_patterns + ) exclude_spec: typing.Optional[pathspec.PathSpec] = excludespec_from_patterns( exclude_patterns ) all_files = self.__list_directory_files( - directory_path, exclude_spec=exclude_spec + directory_path, include_spec=include_spec, exclude_spec=exclude_spec ) file_destination_paths = { path: os.path.relpath(path, directory_path) for path in all_files @@ -609,13 +622,19 @@ def _flush_manifest_item_completions( def __list_directory_files( self, directory_path: pathlib.Path, + include_spec: typing.Optional[pathspec.PathSpec] = None, exclude_spec: typing.Optional[pathspec.PathSpec] = None, ) -> collections.abc.Iterable[pathlib.Path]: all_files = set() for root, _, files in os.walk(directory_path): for file in files: - if not exclude_spec or not exclude_spec.match_file(file): + should_include = include_spec is None or include_spec.match_file(file) + should_exclude = exclude_spec is not None and exclude_spec.match_file( + file + ) + + if should_include and not should_exclude: all_files.add(pathlib.Path(root, file)) return all_files diff --git a/src/roboto/domain/events/__init__.py b/src/roboto/domain/events/__init__.py index 8f5b1d5..02f0a76 100644 --- a/src/roboto/domain/events/__init__.py +++ b/src/roboto/domain/events/__init__.py @@ -8,12 +8,14 @@ from .operations import ( CreateEventRequest, QueryEventsForAssociationsRequest, + UpdateEventRequest, ) from .record import EventRecord __all__ = [ - "Event", "CreateEventRequest", + "Event", "EventRecord", "QueryEventsForAssociationsRequest", + "UpdateEventRequest", ] diff --git a/src/roboto/domain/events/event.py b/src/roboto/domain/events/event.py index fcecc86..c89614c 100644 --- a/src/roboto/domain/events/event.py +++ b/src/roboto/domain/events/event.py @@ -10,10 +10,16 @@ from ...association import Association from ...http import RobotoClient +from ...sentinels import NotSet, NotSetType from ...time import to_epoch_nanoseconds +from ...updates import ( + MetadataChangeset, + StrSequence, +) from .operations import ( CreateEventRequest, QueryEventsForAssociationsRequest, + UpdateEventRequest, ) from .record import EventRecord @@ -123,5 +129,41 @@ def record(self) -> EventRecord: def delete(self) -> None: self.__roboto_client.delete(f"v1/events/id/{self.event_id}") + def put_metadata(self, metadata: dict[str, typing.Any]) -> "Event": + return self.update(metadata_changeset=MetadataChangeset(put_fields=metadata)) + + def put_tags(self, tags: list[str]) -> "Event": + return self.update(metadata_changeset=MetadataChangeset(put_tags=tags)) + + def remove_metadata( + self, + metadata: StrSequence, + ) -> "Event": + return self.update(metadata_changeset=MetadataChangeset(remove_fields=metadata)) + + def remove_tags( + self, + tags: StrSequence, + ) -> "Event": + return self.update(metadata_changeset=MetadataChangeset(remove_tags=tags)) + + def set_description(self, description: typing.Optional[str]) -> "Event": + return self.update(description=description) + def to_dict(self) -> dict[str, typing.Any]: return self.__record.model_dump(mode="json") + + def update( + self, + description: typing.Optional[typing.Union[str, NotSetType]] = NotSet, + metadata_changeset: typing.Optional[MetadataChangeset] = None, + ) -> "Event": + request = UpdateEventRequest( + description=description, metadata_changeset=metadata_changeset + ) + + self.__record = self.__roboto_client.put( + f"/v1/events/id/{self.event_id}", data=request + ).to_record(EventRecord) + + return 
self diff --git a/src/roboto/domain/events/operations.py b/src/roboto/domain/events/operations.py index 73fcf2a..cd714bd 100644 --- a/src/roboto/domain/events/operations.py +++ b/src/roboto/domain/events/operations.py @@ -9,6 +9,8 @@ import pydantic from ...association import Association +from ...sentinels import NotSet, NotSetType +from ...updates import MetadataChangeset class CreateEventRequest(pydantic.BaseModel): @@ -61,3 +63,24 @@ class QueryEventsForAssociationsRequest(pydantic.BaseModel): page_token: typing.Optional[str] = None """Token to use to fetch the next page of results, use None for the first page.""" + + +class UpdateEventRequest(pydantic.BaseModel): + """ + Request payload for the Update Event operation. Allows any of the mutable fields of an event to be changed. + """ + + description: typing.Optional[typing.Union[str, NotSetType]] = NotSet + """ + An optional human-readable description of the event. + """ + + metadata_changeset: typing.Optional[MetadataChangeset] = None + """ + Metadata and tag changes to make for this event + """ + + # This is required to get NotSet/NotSetType to serialize appropriately. + model_config = pydantic.ConfigDict( + extra="forbid", json_schema_extra=NotSetType.openapi_schema_modifier + ) diff --git a/src/roboto/domain/files/file.py b/src/roboto/domain/files/file.py index f3cb85e..92dffc7 100644 --- a/src/roboto/domain/files/file.py +++ b/src/roboto/domain/files/file.py @@ -231,6 +231,9 @@ def put_metadata(self, metadata: dict[str, typing.Any]) -> "File": def put_tags(self, tags: list[str]) -> "File": return self.update(metadata_changeset=MetadataChangeset(put_tags=tags)) + def to_association(self) -> Association: + return Association.file(self.file_id) + def to_dict(self) -> dict[str, Any]: return self.__record.model_dump(mode="json") diff --git a/src/roboto/domain/topics/record.py b/src/roboto/domain/topics/record.py index 543c7d5..3b1b8f1 100644 --- a/src/roboto/domain/topics/record.py +++ b/src/roboto/domain/topics/record.py @@ -84,18 +84,33 @@ class MessagePathRecord(pydantic.BaseModel): Path to a typed attribute within individual datum records contained within a Topic. """ + canonical_data_type: CanonicalDataType + """Normalized data type, used primarily internally by the Roboto Platform.""" + + created: datetime.datetime + created_by: str + + data_type: str + """ + 'Native'/framework-specific data type of the attribute at this path. + E.g. "float32", "unint8[]", "geometry_msgs/Pose", "string". + """ + message_path: str """ Dot-delimited path to the attribute within the datum record. """ - data_type: str + metadata: collections.abc.Mapping[str, typing.Any] = pydantic.Field( + default_factory=dict, + ) """ - 'Native'/framework-specific data type of the attribute at this path. - E.g. "float32", "unint8[]", "geometry_msgs/Pose", "string". + Key-value pairs to associate with this metadata for discovery and search, e.g. + `{ 'min': '0.71', 'max': '1.77 }` """ - canonical_data_type: CanonicalDataType + modified: datetime.datetime + modified_by: str representations: collections.abc.MutableSequence[RepresentationRecord] = ( pydantic.Field(default_factory=list) @@ -104,17 +119,12 @@ class MessagePathRecord(pydantic.BaseModel): Zero to many Representations of this MessagePath. 
""" + topic_id: int + """Internal identifier for Topic with which this MessagePath is associated.""" + topic_message_path_id: int """Internal identifier for this MessagePath, joined to a particular Topic.""" - metadata: collections.abc.Mapping[str, typing.Any] = pydantic.Field( - default_factory=dict, - ) - """ - Key-value pairs to associate with this metadata for discovery and search, e.g. - `{ 'min': '0.71', 'max': '1.77 }` - """ - class TopicRecord(pydantic.BaseModel): """ diff --git a/src/roboto/domain/topics/topic.py b/src/roboto/domain/topics/topic.py index 63ad786..fa0ade9 100644 --- a/src/roboto/domain/topics/topic.py +++ b/src/roboto/domain/topics/topic.py @@ -238,6 +238,9 @@ def set_default_representation( self.__refresh() return representation_record + def to_association(self) -> Association: + return Association.topic(self.record.topic_id) + def update( self, end_time: typing.Union[typing.Optional[int], NotSetType] = NotSet, diff --git a/src/roboto/upload_agent/__main__.py b/src/roboto/upload_agent/__main__.py index 723ff49..964131c 100644 --- a/src/roboto/upload_agent/__main__.py +++ b/src/roboto/upload_agent/__main__.py @@ -107,7 +107,7 @@ def configure_subcommand(args: argparse.Namespace) -> None: configure() -def run(auto_create_upload_configs: bool) -> None: +def run(auto_create_upload_configs: bool, merge_uploads: bool) -> None: if not agent_config_file.is_file(): logger.error( f"No upload agent config file found at {agent_config_file}. Please run " @@ -140,7 +140,9 @@ def run(auto_create_upload_configs: bool) -> None: if auto_create_upload_configs: upload_agent.create_upload_configs() - uploaded_datasets = upload_agent.process_uploads() + uploaded_datasets = upload_agent.process_uploads( + merge_uploads=merge_uploads + ) except filelock.Timeout: logger.info( "Roboto upload agent appears to already be running, nothing to do. If you don't think this is correct, " @@ -152,7 +154,9 @@ def run(auto_create_upload_configs: bool) -> None: logger.info("Uploaded %d datasets", len(uploaded_datasets)) -def run_forever(scan_period_seconds: int, auto_create_upload_configs: bool) -> None: +def run_forever( + scan_period_seconds: int, auto_create_upload_configs: bool, merge_uploads: bool +) -> None: print( "Starting roboto-agent in run forever mode, press Ctrl+C to stop.", file=sys.stdout, @@ -161,7 +165,10 @@ def run_forever(scan_period_seconds: int, auto_create_upload_configs: bool) -> N try: while True: logger.info("Running upload agent") - run(auto_create_upload_configs=auto_create_upload_configs) + run( + auto_create_upload_configs=auto_create_upload_configs, + merge_uploads=merge_uploads, + ) logger.info( f"Run completed, sleeping for {scan_period_seconds} seconds before next attempt." ) @@ -175,9 +182,13 @@ def run_subcommand(args: argparse.Namespace) -> None: run_forever( scan_period_seconds=30, auto_create_upload_configs=args.auto_create_upload_configs, + merge_uploads=args.merge_uploads, ) else: - run(auto_create_upload_configs=args.auto_create_upload_configs) + run( + auto_create_upload_configs=args.auto_create_upload_configs, + merge_uploads=args.merge_uploads, + ) def main(): @@ -211,6 +222,20 @@ def main(): + "and sleeps between runs.", action="store_true", ) + run_parser.add_argument( + "-m", + "--merge-uploads", + action="store_true", + help=( + "If set, all uploads will be merged into a single dataset. 
If combined with " + "--auto-create-upload-configs, this will allow you to set many disparate output locations as your " + "search paths, and still unite everything under a single dataset. " + "Any tags/metadata/description set in .roboto_upload.json files will be applied sequentially as updates " + "to the created dataset. If there are collisions, like multiple different descriptions or multiple " + "metadata values for the same key, the last one encountered will be used, and the traversal order will " + "be non-deterministic." + ), + ) run_parser.add_argument( "-a", "--auto-create-upload-configs", diff --git a/src/roboto/upload_agent/agent.py b/src/roboto/upload_agent/agent.py index 152803d..42c564f 100644 --- a/src/roboto/upload_agent/agent.py +++ b/src/roboto/upload_agent/agent.py @@ -16,6 +16,7 @@ from ..http import RobotoClient from ..logging import default_logger from ..time import utcnow +from ..updates import MetadataChangeset from .files import ( UPLOAD_COMPLETE_FILENAME, UPLOAD_IN_PROGRESS_FILENAME, @@ -107,7 +108,14 @@ def create_upload_configs(self): ) logger.info("Wrote upload config file to %s", upload_config_file) - def process_uploads(self) -> collections.abc.Sequence[datasets.Dataset]: + def process_uploads( + self, merge_uploads: bool = False + ) -> collections.abc.Sequence[datasets.Dataset]: + """ + If merge is true, everything will be combined into a single dataset. Otherwise, each directory will be uploaded + to a separate dataset. + """ + upload_config_files = self.__get_upload_config_files() if len(upload_config_files) == 0: logger.info( @@ -120,16 +128,25 @@ def process_uploads(self) -> collections.abc.Sequence[datasets.Dataset]: len(upload_config_files), ) - uploaded_datasets: list[datasets.Dataset] = [] + update_dataset: typing.Optional[datasets.Dataset] = None + created_datasets: list[datasets.Dataset] = [] + + if merge_uploads: + update_dataset = datasets.Dataset.create( + roboto_client=self.__roboto_client, + caller_org_id=self.__agent_config.default_org_id, + ) + + created_datasets.append(update_dataset) for upload_config_file, path in upload_config_files: uploaded_dataset = self.__handle_upload_config_file( - upload_config_file, path + file=upload_config_file, path=path, update_dataset=update_dataset ) if uploaded_dataset is not None: - uploaded_datasets.append(uploaded_dataset) + created_datasets.append(uploaded_dataset) - return uploaded_datasets + return created_datasets def __get_upload_config_files( self, @@ -195,8 +212,18 @@ def __delete_uploaded_dir_if_safe(self, path: pathlib.Path): logger.info("Cleaned up empty upload directory %s", path) def __handle_upload_config_file( - self, file: UploadConfigFile, path: pathlib.Path + self, + file: UploadConfigFile, + path: pathlib.Path, + update_dataset: typing.Optional[datasets.Dataset] = None, ) -> typing.Optional[datasets.Dataset]: + """ + If you pass in an update_dataset, it will be used instead of creating a new one, and any + dataset properties in the upload config file will be applied as an update. + + This will return a dataset if one is created, which means it will NOT return a dataset passed into the + ``update_dataset`` param. 
+ """ dir_to_upload = path.parent upload_in_progress_file = dir_to_upload / UPLOAD_IN_PROGRESS_FILENAME @@ -221,19 +248,19 @@ def __handle_upload_config_file( return None - existing_dataset: typing.Optional[datasets.Dataset] = None + in_progress_dataset: typing.Optional[datasets.Dataset] = None if upload_in_progress_file.is_file(): try: parsed_in_progress_file = UploadInProgressFile.model_validate_json( upload_in_progress_file.read_text() ) - existing_dataset = datasets.Dataset.from_id( + in_progress_dataset = datasets.Dataset.from_id( parsed_in_progress_file.dataset_id ) logger.warning( "Found upload-in-progress file for dataset %s at path %s, resuming upload", - existing_dataset.dataset_id, + in_progress_dataset.dataset_id, upload_in_progress_file.resolve(), ) except pydantic.ValidationError: @@ -243,7 +270,35 @@ def __handle_upload_config_file( ) dataset: datasets.Dataset - if existing_dataset is None: + should_write_in_progress_file: bool + + # Order of figuring out what dataset we're working on: + # 1. In progress is always the first choice, because if it exists, it means we already made a decision that + # we want to stick with, and for some reason the upload failed. Let's not change that. + # 2. Explicit update + # 3. Create new + if in_progress_dataset is not None: + dataset = in_progress_dataset + should_write_in_progress_file = False + + elif update_dataset is not None: + logger.info( + "Applying update to existing dataset %s for directory: %s", + update_dataset.dataset_id, + dir_to_upload, + ) + dataset = update_dataset + dataset.update( + description=file.dataset.description, + metadata_changeset=MetadataChangeset( + put_fields=file.dataset.metadata, + put_tags=file.dataset.tags, + ), + ) + logger.info("Successfully updated dataset %s", dataset.dataset_id) + should_write_in_progress_file = True + + else: logger.info("Creating a dataset for directory: %s", dir_to_upload) dataset = datasets.Dataset.create( description=file.dataset.description, @@ -253,6 +308,9 @@ def __handle_upload_config_file( roboto_client=self.__roboto_client, ) logger.info("Created dataset %s for path %s", dataset.dataset_id, path) + should_write_in_progress_file = True + + if should_write_in_progress_file: logger.debug( "Writing in progress file to %s", upload_in_progress_file.resolve() ) @@ -263,8 +321,6 @@ def __handle_upload_config_file( started=dataset.record.created, ).model_dump_json(indent=2) ) - else: - dataset = existing_dataset # Default to using the delete-uploaded-files strategy from the agent config file, but override it if it # has been explicitly set in the upload file. @@ -281,11 +337,12 @@ def __handle_upload_config_file( exclude_patterns = file.upload.exclude_patterns or [] # Explicitly opt out of uploading the upload-in-progress file. - exclude_patterns.append(UPLOAD_IN_PROGRESS_FILENAME) + exclude_patterns.append(f"**/{UPLOAD_IN_PROGRESS_FILENAME}") dataset.upload_directory( directory_path=dir_to_upload, exclude_patterns=exclude_patterns, + include_patterns=file.upload.include_patterns, delete_after_upload=delete_uploaded_files, ) @@ -317,4 +374,9 @@ def __handle_upload_config_file( if self.__agent_config.delete_empty_directories: self.__delete_uploaded_dir_if_safe(dir_to_upload) + # If we were passed in an update_dataset, we don't want to return it, because the intention of the return + # is to be added to an array of created datasets. This one already exists / wasn't created by this call. 
+ if update_dataset is not None: + return None + return dataset diff --git a/src/roboto/upload_agent/files.py b/src/roboto/upload_agent/files.py index 417440d..5fc91d6 100644 --- a/src/roboto/upload_agent/files.py +++ b/src/roboto/upload_agent/files.py @@ -28,6 +28,7 @@ class UploadConfigFileDatasetSection(CreateDatasetRequest): class UploadConfigFileUploadSection(pydantic.BaseModel): delete_uploaded_files: typing.Optional[bool] = None exclude_patterns: typing.Optional[typing.List[str]] = None + include_patterns: typing.Optional[typing.List[str]] = None class UploadConfigFile(pydantic.BaseModel): @@ -44,13 +45,18 @@ class UploadConfigFile(pydantic.BaseModel): class UploadAgentConfig(pydantic.BaseModel): version: typing.Literal["v1"] = "v1" - delete_empty_directories: bool = pydantic.Field(default=False) + default_org_id: typing.Optional[str] = None + """ + The org ID to use when creating datasets via --merge-uploads, or other operations which may have an ambiguous org. + """ + + delete_empty_directories: bool = False """ If set to true, directories which are empty (or only contain a .roboto_upload_complete.json) after being uploaded will be automatically deleted. This is most useful if combined with delete_upload_files=True """ - delete_uploaded_files: bool = pydantic.Field(default=False) + delete_uploaded_files: bool = False """ If set to true, will delete files from disk after they've been successfully uploaded to Roboto. """ diff --git a/src/roboto/version.py b/src/roboto/version.py index 8be8492..ba0503b 100644 --- a/src/roboto/version.py +++ b/src/roboto/version.py @@ -1,4 +1,4 @@ -__version__ = "0.5.9" +__version__ = "0.5.11" __all__= ("__version__",) \ No newline at end of file
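
A minimal usage sketch of the main additions in this patch: the new `include_patterns` parameter on `Dataset.upload_directory`, and the new `Event` update helpers backed by `UpdateEventRequest`. This is illustrative only and assumes just the signatures introduced above; object construction is elided with `...` in the same style as the `upload_directory` docstring example, and the paths, tags, and metadata values are hypothetical.

>>> import pathlib
>>> from roboto import Dataset
>>> from roboto.domain import events
>>> dataset = Dataset(...)
>>> dataset.upload_directory(
...     pathlib.Path("/path/to/directory"),
...     include_patterns=["**/*.bag", "**/*.mcap"],
...     exclude_patterns=["**/*.log"],
... )
>>> event = events.Event(...)
>>> event.put_tags(["reviewed"])
>>> event.put_metadata({"severity": "high"})
>>> event.set_description("Hard braking near intersection")

The upload agent's run mode gains the corresponding `--merge-uploads` (`-m`) flag alongside the existing `--auto-create-upload-configs` (`-a`); assuming the agent's console script is named `roboto-agent` (as its log output suggests), a merged run would be invoked roughly as `roboto-agent run -a -m`.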