Merge pull request #76 from OSOceanAcoustics/dev
Revised Grouping Logic
Sohambutala authored May 14, 2024
2 parents 5477759 + 06528ac commit 4e686be
Showing 3 changed files with 66 additions and 37 deletions.
3 changes: 2 additions & 1 deletion echodataflow/config/datastore.yaml
@@ -11,10 +11,11 @@ args: # Input arguments
anon: true
group: # Source data transect information
file: ./x0007_fileset.txt # Transect file URL. Accepts a .zip or .txt file. The file name must follow the r"x(?P<transect_num>\d+)" regex, or set group_name to process without grouping. To process all the files at the source, remove the transect section from this yaml.
grouping_regex: x(?P<transect_num>\d+) # Regex used to parse the group name from the file name. Skip this to put everything into the default group or to use the entire file name as the group name.
storage_options: # Transect file storage options
block_name: echodataflow-aws-credentials # Block name. For more information on Blocks, refer to blocks.md
type: AWS # Block type
group_name: default_group # Set when not using a file to pass transect information
group_name: default_group # Set when not using a file to pass transect information, or to group all files under a single group. Skip if you want the file name as the group name.
json_export: true # Export raw json metadata of files to be processed
raw_json_path: s3://echodataflow-workground/combined_files/raw_json # Path to store the raw json metadata. Can also be used to skip parsing the files at the source directory and instead fetch the files listed in this json.
output: # Output arguments
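For reference, a minimal sketch of how a grouping_regex like the one above is applied to a file name; the file name x0007_D20190615.raw is a hypothetical example, not taken from the repository:

import re

# Default pattern from the sample config; the named group becomes the group (transect) name.
pattern = r"x(?P<transect_num>\d+)"
match = re.match(pattern, "x0007_D20190615.raw")
if match:
    print(match.group("transect_num"))  # prints "0007"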
1 change: 1 addition & 0 deletions echodataflow/models/datastore.py
@@ -79,6 +79,7 @@ class Transect(BaseModel):
file: Optional[str] = None
storage_options: Optional[StorageOptions] = None
storage_options_dict: Optional[Dict[str, Any]] = {}
grouping_regex: Optional[str] = None


class Args(BaseModel):
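Because the new field defaults to None, configurations that omit grouping_regex keep validating. A minimal sketch of the model in use, assuming Transect can be constructed directly; the values mirror the sample datastore.yaml above:

from echodataflow.models.datastore import Transect

# grouping_regex is optional and stays None when the config omits it.
transect = Transect(
    file="./x0007_fileset.txt",
    grouping_regex=r"x(?P<transect_num>\d+)",
)
print(transect.grouping_regex)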
99 changes: 63 additions & 36 deletions echodataflow/utils/config_utils.py
@@ -50,7 +50,6 @@
from echodataflow.models.pipeline import Recipe, Stage
from echodataflow.utils.file_utils import extract_fs, isFile

TRANSECT_FILE_REGEX = r"x(?P<transect_num>\d+)"
nest_asyncio.apply()


@@ -135,7 +134,8 @@ def extract_transect_files(
file_format: Literal["txt", "zip"],
file_path: str,
storage_options: Dict[str, Any] = {},
default_transect: str = 0
default_transect: str = None,
group_regex: str = None
) -> Dict[str, Dict[str, Any]]:
"""
Extracts raw file names and transect numbers from transect file(s).
@@ -155,15 +155,38 @@
file_system = extract_fs(file_path, storage_options=storage_options)

if file_format == "zip":
return _extract_from_zip(file_system, file_path)
return _extract_from_zip(file_system, file_path, group_regex, default_transect)
elif file_format == "txt":
return _extract_from_text(file_system, file_path, default_transect)
return _extract_from_text(file_system, file_path, group_regex, default_transect)
else:
raise ValueError(
f"Invalid file format: {file_format}. Only 'txt' or 'zip' are valid")

def _get_group_name(filename: str, group_regex: str, default_transect: str) -> str:
"""
Determines the group name from the filename using the specified regex pattern or default value.
Parameters:
filename (str): The filename from which to extract the group name.
group_regex (str): The regex pattern for extracting the group name.
default_transect (str): The default group name used when no regex pattern is provided (or the pattern does not match).
Returns:
str: The extracted group name.
Example:
group_name = _get_group_name(filename="example_01_file", group_regex=r"example_(?P<transect_num>\d+)", default_transect="default")
"""
if group_regex and (m := re.match(group_regex, filename)):
    transect_num = str(m.groupdict()["transect_num"])
elif default_transect:
    # No regex given (or it did not match); fall back to the configured default group.
    transect_num = default_transect
else:
    # Otherwise use the file stem as the group name.
    transect_num = filename.split('.')[0]
return transect_num
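# Illustrative behavior only (added for this write-up; the file name below is hypothetical):
#   _get_group_name("x0007_D20190615.raw", r"x(?P<transect_num>\d+)", None)  -> "0007"
#   _get_group_name("x0007_D20190615.raw", None, "default_group")            -> "default_group"
#   _get_group_name("x0007_D20190615.raw", None, None)                       -> "x0007_D20190615"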

def _extract_from_zip(file_system, file_path: str) -> Dict[str, Dict[str, Any]]:
def _extract_from_zip(file_system, file_path: str, group_regex: str, default_transect: str) -> Dict[str, Dict[str, Any]]:
"""
Extracts raw files from transect file zip format.
@@ -187,22 +210,21 @@ def _extract_from_zip(file_system, file_path: str) -> Dict[str, Dict[str, Any]]:
with ZipFile(f) as zf:
zip_infos = zf.infolist()
for zi in zip_infos:
m = re.match(TRANSECT_FILE_REGEX, zi.filename)
transect_num = str(m.groupdict()["transect_num"])

transect_num = _get_group_name(filename=zi.filename, group_regex=group_regex, default_transect=default_transect)

if zi.is_dir():
raise ValueError(
"Directory found in zip file. This is not allowed!")
with zf.open(zi.filename) as txtfile:
file_list = txtfile.readlines()
for rawfile in file_list:
transect_dict.setdefault(
rawfile.decode("utf-8").split("\n")[0].split("\r")[0],
{"filename": zi.filename, "num": transect_num},
)
with zf.open(zi.filename) as txtfile:
for line_bytes in txtfile:
line = line_bytes.decode("utf-8").strip("\r\n")
transect_dict.setdefault(line, {"filename": zi.filename, "num": transect_num})
print(transect_dict)
return transect_dict


def _extract_from_text(file_system, file_path: str, default_transect: str = "DefaultGroup") -> Dict[str, Dict[str, Any]]:
def _extract_from_text(file_system, file_path: str, group_regex: str, default_transect: str) -> Dict[str, Dict[str, Any]]:
"""
Extracts raw files from transect file text format.
@@ -221,21 +243,15 @@ def _extract_from_text(file_system, file_path: str, default_transect: str = "Def
extracted_data = _extract_from_text(file_system, file_path)
"""
filename = os.path.basename(file_path)
m = re.match(TRANSECT_FILE_REGEX, filename)
if m is None:
transect_num = default_transect
else:
transect_num = str(m.groupdict()["transect_num"])

transect_num = _get_group_name(filename=filename, group_regex=group_regex, default_transect=default_transect)

transect_dict = {}

with file_system.open(file_path) as txtfile:
file_list = txtfile.readlines()
for rawfile in file_list:
transect_dict.setdefault(
rawfile.decode("utf-8").split("\n")[0].split("\r")[0],
{"filename": filename, "num": transect_num},
)
for line_bytes in txtfile:
line = line_bytes.decode("utf-8").strip("\r\n")
transect_dict.setdefault(line, {"filename": filename, "num": transect_num})

return transect_dict
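As a rough illustration of the returned structure (the transect file and raw file names below are hypothetical, not taken from the repository): if ./x0007_fileset.txt lists two raw files and the grouping regex is x(?P<transect_num>\d+), _extract_from_text would produce a mapping like:

{
    "x0007_D20190615-T120000.raw": {"filename": "x0007_fileset.txt", "num": "0007"},
    "x0007_D20190615-T130000.raw": {"filename": "x0007_fileset.txt", "num": "0007"},
}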

@@ -364,26 +380,37 @@ def parse_raw_paths(all_raw_files: List[str], config: Dataset) -> List[Dict[Any,
sonar_model = config.sonar_model
fname_pattern = config.raw_regex
transect_dict = {}

if config.args.group is not None and config.args.group.file is not None:
default_transect = str(config.args.group_name) if config.args.group_name else None

if config.args.group and config.args.group.file:

# When transect info is available, extract it
file_input = config.args.group.file
storage_options = config.args.group.storage_options_dict
default_transect = str(config.args.group_name if config.args.group_name else "DefaultGroup")
storage_options = config.args.group.storage_options_dict
group_regex = config.args.group.grouping_regex

if isinstance(file_input, str):
filename = os.path.basename(file_input)
_, ext = os.path.splitext(filename)
transect_dict = extract_transect_files(
ext.strip("."), file_input, storage_options, default_transect)
transect_dict = extract_transect_files(file_format=ext.strip("."),
file_path=file_input,
storage_options=storage_options,
group_regex=group_regex,
default_transect=default_transect)
else:
transect_dict = {}
for f in file_input:
filename = os.path.basename(f)
_, ext = os.path.splitext(filename)
result = extract_transect_files(
ext.strip("."), f, storage_options, default_transect)
_, ext = os.path.splitext(filename)
result = extract_transect_files(file_format=ext.strip("."),
file_path=f,
storage_options=storage_options,
group_regex=group_regex,
default_transect=default_transect)
transect_dict.update(result)

else:
default_transect = "DefaultGroup"

raw_file_dicts = []
for raw_file in all_raw_files:
# get transect info from the transect_dict above
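To tie the pieces together, a minimal sketch of calling the revised extract_transect_files directly; it is not part of this commit, the values simply mirror the sample datastore.yaml, and the transect file path is assumed to exist locally:

from echodataflow.utils.config_utils import extract_transect_files

# group.file, group.grouping_regex, and group_name from the config map onto these arguments.
transect_dict = extract_transect_files(
    file_format="txt",
    file_path="./x0007_fileset.txt",
    storage_options={},
    group_regex=r"x(?P<transect_num>\d+)",
    default_transect="default_group",
)
# The regex takes precedence, so every raw file listed in x0007_fileset.txt is grouped
# under "0007"; with group_regex omitted, the group would be "default_group" instead.
print(transect_dict)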
