Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revised Grouping Logic #76

Merged
merged 3 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion echodataflow/config/datastore.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ args: # Input arguments
anon: true
group: # Source data transect information
file: ./x0007_fileset.txt # Transect file URL. Accepts a .zip or .txt file. The file name must follow the r"x(?P<transect_num>\d+)" regex, or set group_name to process without grouping. To process all the files at the source, remove the transect section from this yaml.
grouping_regex: x(?P<transect_num>\d+) # Regex to parse the group name from the filename. Skip this to put everything into the default group, or to use the entire file name as the group name.
storage_options: # Transect file storage options
block_name: echodataflow-aws-credentials # Block name. For more information on Blocks refer blocks.md
type: AWS # Block type
group_name: default_group # Set when not using a file to pass transect information
group_name: default_group # Set when not using a file to pass transect information, or to group all files under one group. Skip this if you want the file name as the group name.
json_export: true # Export raw json metadata of files to be processed
raw_json_path: s3://echodataflow-workground/combined_files/raw_json # Path to store the raw json metadata. Can also work to skip the process of parsing the files at source directory and fetch files present in this json instead.
output: # Output arguments
Expand Down
1 change: 1 addition & 0 deletions echodataflow/models/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class Transect(BaseModel):
file: Optional[str] = None
storage_options: Optional[StorageOptions] = None
storage_options_dict: Optional[Dict[str, Any]] = {}
grouping_regex: Optional[str] = None


class Args(BaseModel):
Expand Down
99 changes: 63 additions & 36 deletions echodataflow/utils/config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
from echodataflow.models.pipeline import Recipe, Stage
from echodataflow.utils.file_utils import extract_fs, isFile

TRANSECT_FILE_REGEX = r"x(?P<transect_num>\d+)"
nest_asyncio.apply()


Expand Down Expand Up @@ -135,7 +134,8 @@ def extract_transect_files(
file_format: Literal["txt", "zip"],
file_path: str,
storage_options: Dict[str, Any] = {},
default_transect: str = 0
default_transect: str = None,
group_regex: str = None
) -> Dict[str, Dict[str, Any]]:
"""
Extracts raw file names and transect numbers from transect file(s).
Expand All @@ -155,15 +155,38 @@ def extract_transect_files(
file_system = extract_fs(file_path, storage_options=storage_options)

if file_format == "zip":
return _extract_from_zip(file_system, file_path)
return _extract_from_zip(file_system, file_path, group_regex, default_transect)
elif file_format == "txt":
return _extract_from_text(file_system, file_path, default_transect)
return _extract_from_text(file_system, file_path, group_regex, default_transect)
else:
raise ValueError(
f"Invalid file format: {file_format}. Only 'txt' or 'zip' are valid")

def _get_group_name(filename: str, group_regex: str, default_transect: str) -> str:
"""
Determines the group name from the filename using the specified regex pattern or default value.

Parameters:
filename (str): The filename from which to extract the group name.
group_regex (str): The regex pattern for extracting the group name.
default_transect (str): The default value for the group name if regex pattern is not provided.

Returns:
str: The extracted group name.

Example:
group_name = _get_group_name(filename="example_file_01", group_regex=r"example_(?P<transect_num>\d+)", default_transect="default")
"""
if group_regex:
m = re.match(group_regex, filename)
transect_num = str(m.groupdict()["transect_num"])
elif default_transect:
transect_num = default_transect
else:
transect_num = filename.split('.')[0]
return transect_num

def _extract_from_zip(file_system, file_path: str) -> Dict[str, Dict[str, Any]]:
def _extract_from_zip(file_system, file_path: str, group_regex: str, default_transect: str) -> Dict[str, Dict[str, Any]]:
"""
Extracts raw files from transect file zip format.

Expand All @@ -187,22 +210,21 @@ def _extract_from_zip(file_system, file_path: str) -> Dict[str, Dict[str, Any]]:
with ZipFile(f) as zf:
zip_infos = zf.infolist()
for zi in zip_infos:
m = re.match(TRANSECT_FILE_REGEX, zi.filename)
transect_num = str(m.groupdict()["transect_num"])

transect_num = _get_group_name(filename=zi.filename, group_regex=group_regex, default_transect=default_transect)

if zi.is_dir():
raise ValueError(
"Directory found in zip file. This is not allowed!")
with zf.open(zi.filename) as txtfile:
file_list = txtfile.readlines()
for rawfile in file_list:
transect_dict.setdefault(
rawfile.decode("utf-8").split("\n")[0].split("\r")[0],
{"filename": zi.filename, "num": transect_num},
)
with zf.open(zi.filename) as txtfile:
for line_bytes in txtfile:
line = line_bytes.decode("utf-8").strip("\r\n")
transect_dict.setdefault(line, {"filename": zi.filename, "num": transect_num})
print(transect_dict)
return transect_dict


def _extract_from_text(file_system, file_path: str, default_transect: str = "DefaultGroup") -> Dict[str, Dict[str, Any]]:
def _extract_from_text(file_system, file_path: str, group_regex: str, default_transect: str) -> Dict[str, Dict[str, Any]]:
"""
Extracts raw files from transect file text format.

Expand All @@ -221,21 +243,15 @@ def _extract_from_text(file_system, file_path: str, default_transect: str = "Def
extracted_data = _extract_from_text(file_system, file_path)
"""
filename = os.path.basename(file_path)
m = re.match(TRANSECT_FILE_REGEX, filename)
if m is None:
transect_num = default_transect
else:
transect_num = str(m.groupdict()["transect_num"])

transect_num = _get_group_name(filename=filename, group_regex=group_regex, default_transect=default_transect)

transect_dict = {}

with file_system.open(file_path) as txtfile:
file_list = txtfile.readlines()
for rawfile in file_list:
transect_dict.setdefault(
rawfile.decode("utf-8").split("\n")[0].split("\r")[0],
{"filename": filename, "num": transect_num},
)
for line_bytes in txtfile:
line = line_bytes.decode("utf-8").strip("\r\n")
transect_dict.setdefault(line, {"filename": filename, "num": transect_num})

return transect_dict

Expand Down Expand Up @@ -364,26 +380,37 @@ def parse_raw_paths(all_raw_files: List[str], config: Dataset) -> List[Dict[Any,
sonar_model = config.sonar_model
fname_pattern = config.raw_regex
transect_dict = {}

if config.args.group is not None and config.args.group.file is not None:
default_transect = str(config.args.group_name) if config.args.group_name else None

if config.args.group and config.args.group.file:

# When transect info is available, extract it
file_input = config.args.group.file
storage_options = config.args.group.storage_options_dict
default_transect = str(config.args.group_name if config.args.group_name else "DefaultGroup")
storage_options = config.args.group.storage_options_dict
group_regex = config.args.group.grouping_regex

if isinstance(file_input, str):
filename = os.path.basename(file_input)
_, ext = os.path.splitext(filename)
transect_dict = extract_transect_files(
ext.strip("."), file_input, storage_options, default_transect)
transect_dict = extract_transect_files(file_format=ext.strip("."),
file_path=file_input,
storage_options=storage_options,
group_regex=group_regex,
default_transect=default_transect)
else:
transect_dict = {}
for f in file_input:
filename = os.path.basename(f)
_, ext = os.path.splitext(filename)
result = extract_transect_files(
ext.strip("."), f, storage_options, default_transect)
_, ext = os.path.splitext(filename)
result = extract_transect_files(file_format=ext.strip("."),
file_path=f,
storage_options=storage_options,
group_regex=group_regex,
default_transect=default_transect)
transect_dict.update(result)

else:
default_transect = "DefaultGroup"

raw_file_dicts = []
for raw_file in all_raw_files:
# get transect info from the transect_dict above
Expand Down
Loading