Skip to content

Commit

Permalink
all parsing tests passing with new data model
Browse files Browse the repository at this point in the history
  • Loading branch information
alisterburt committed Sep 20, 2023
1 parent 9be107a commit f2cd72c
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 297 deletions.
6 changes: 3 additions & 3 deletions starfile/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ def read(filename: PathLike, read_n_blocks: int = None, always_dict: bool = Fals
default behaviour in the case of only one data block being present in the STAR file is to
return only a dataframe, this can be changed by setting 'always_dict=True'
"""
star = StarParser(filename, read_n_blocks=read_n_blocks)
if len(star.dataframes) == 1 and always_dict is False:
star = StarParser(filename, n_blocks_to_read=read_n_blocks)
if len(star.data_blocks) == 1 and always_dict is False:
return star.first_dataframe
else:
return star.dataframes
return star.data_blocks


def write(data: Union[pd.DataFrame, Dict[str, pd.DataFrame], List[pd.DataFrame]],
Expand Down
300 changes: 114 additions & 186 deletions starfile/parser.py
Original file line number Diff line number Diff line change
@@ -1,217 +1,145 @@
from __future__ import annotations

from collections import OrderedDict
from collections import deque
from io import StringIO
from linecache import getline

import numpy as np
import pandas as pd
from pathlib import Path
from typing import TYPE_CHECKING, List, Union, Optional

from .utils import TextBuffer, TextCrawler
from typing import TYPE_CHECKING, Union, Optional, Dict, TypeAlias, Tuple

if TYPE_CHECKING:
from os import PathLike

DataBlock: TypeAlias = Union[
pd.DataFrame,
Dict[str, Union[str, int, float]]
]


class StarParser:
def __init__(self, filename: PathLike, read_n_blocks: Optional[int] = None):
filename: Path
n_lines_in_file: int
n_blocks_to_read: int
current_line_number: int
data_blocks: Dict[DataBlock]

def __init__(self, filename: PathLike, n_blocks_to_read: Optional[int] = None):
# set filename, with path checking
filename = Path(filename)
if not filename.exists():
raise FileNotFoundError(filename)
self.filename = filename

# initialise attributes for parsing
self.text_buffer = TextBuffer()
self.crawler = TextCrawler(self.filename)
self.read_n_blocks = read_n_blocks
self._dataframes = OrderedDict()
self._current_dataframe_index = 0
self._initialise_n_lines()
# setup for parsing
self.data_blocks = {}
self.n_lines_in_file = count_lines(self.filename)
self.n_blocks_to_read = n_blocks_to_read

# parse file
self.current_line_number = 0
self.parse_file()

def parse_file(self):
while self.crawler.current_line_number <= self.n_lines:
if len(self.dataframes) == self.read_n_blocks:
break

elif self.crawler.current_line.startswith('data_'):
self._parse_data_block()

if not self.crawler.current_line.startswith('data_'):
self.crawler.increment_line_number()

self.dataframes_to_numeric()
return

def _parse_data_block(self):
self.current_block_name = self._block_name_from_current_line()

while self.crawler.current_line_number <= self.n_lines:
self.crawler.increment_line_number()
line = self.crawler.current_line

if line.startswith('loop_'):
self._parse_loop_block()
return

elif line.startswith(
'data_') or self.crawler.current_line_number == self.n_lines:
self._parse_simple_block_from_buffer()
return

self.text_buffer.add_line(line)
return

def _parse_simple_block_from_buffer(self):
data = self._clean_simple_block_in_buffer()

df = self._cleaned_simple_block_to_dataframe(data)
df.name = self._current_data_block_name
self._add_dataframe(df)

self.text_buffer.clear()

def _parse_loop_block(self):
self.crawler.increment_line_number()
header = self._parse_loop_header()
df = self._parse_loop_data()
if df is None:
df = pd.DataFrame({h: None for h in header}, index=[0])
df.columns = header
df.name = self._current_data_block_name
self._add_dataframe(df)
return

@property
def filename(self):
return self._filename
def current_line(self) -> str:
return getline(str(self.filename), self.current_line_number).strip()

@filename.setter
def filename(self, filename: Union[str, Path]):
filename = Path(filename)
if filename.exists():
self._filename = filename
else:
raise FileNotFoundError

@property
def n_lines(self):
return self._n_lines

def _initialise_n_lines(self):
self._n_lines = self.crawler.count_lines()

@property
def dataframes(self):
return self._dataframes

def _add_dataframe(self, df: pd.DataFrame):
key = self._get_dataframe_key(df)
self.dataframes[key] = df
self._increment_dataframe_index()

@property
def current_block_name(self):
return self._current_data_block_name

@current_block_name.setter
def current_block_name(self, name: str):
self._current_data_block_name = name

@property
def current_dataframe_index(self):
return self._current_dataframe_index

def _increment_dataframe_index(self):
self._current_dataframe_index += 1

def _get_dataframe_key(self, df):
name = df.name

if name == '' or isinstance(name, int) or name in self.dataframes.keys():
return self._current_dataframe_index
else:
return df.name

def _clean_simple_block_in_buffer(self):
clean_datablock = {}

for line in self.text_buffer.buffer:
if line == '' or line.startswith('#'):
continue

heading_name = self.heading_from_line(line)
value = line.split()[1]
clean_datablock[heading_name] = value

return clean_datablock

@staticmethod
def _cleaned_simple_block_to_dataframe(data: dict):
return pd.DataFrame(data, columns=data.keys(), index=[0])

def _parse_loop_header(self) -> List[str]:
self.text_buffer.clear()

while self.crawler.current_line.startswith('_'):
heading = self.heading_from_line(self.crawler.current_line)
self.text_buffer.add_line(heading)
self.crawler.increment_line_number()

return self.text_buffer.buffer

def _parse_loop_data(self) -> Union[pd.DataFrame, None]:
self.text_buffer.clear()

while self.crawler.current_line_number <= self.n_lines:
current_line = self.crawler.current_line
if current_line.startswith('data_'):
def parse_file(self):
while self.current_line_number <= self.n_lines_in_file:
if len(self.data_blocks) == self.n_blocks_to_read:
break
elif self.current_line.startswith('data_'):
block_name, block = self._parse_data_block()
self.data_blocks[block_name] = block
else:
self.current_line_number += 1

def _parse_data_block(self) -> Tuple[str, DataBlock]:
# current line starts with 'data_foo'
block_name = self.current_line[5:] # 'data_foo' -> 'foo'
self.current_line_number += 1

# iterate over file,
while self.current_line_number <= self.n_lines_in_file:
self.current_line_number += 1
if self.current_line.startswith('loop_'):
return block_name, self._parse_loop_block()
elif self.current_line.startswith('_'): # line is simple block
return block_name, self._parse_simple_block()

def _parse_simple_block(self) -> Dict[str, Union[str, int, float]]:
block = {}
while self.current_line_number <= self.n_lines_in_file:
if self.current_line.startswith('data'):
break
self.text_buffer.add_line(current_line)
self.crawler.increment_line_number()

# check whether the buffer is empty
if self.text_buffer.is_empty:
return None

df = pd.read_csv(
StringIO(self.text_buffer.as_str()),
delim_whitespace=True,
header=None,
comment='#'
)
elif self.current_line.startswith('_'): # '_foo bar'
k, v = self.current_line.split()
block[k[1:]] = numericise(v)
self.current_line_number += 1
return block

def _parse_loop_block(self) -> pd.DataFrame:
# parse loop header
loop_column_names = deque()
self.current_line_number += 1

while self.current_line.startswith('_'):
column_name = self.current_line.split()[0][1:]
loop_column_names.append(column_name)
self.current_line_number += 1

# now parse the loop block data
loop_data = deque()
while self.current_line_number <= self.n_lines_in_file:
if self.current_line.startswith('data_'):
break
loop_data.append(self.current_line)
self.current_line_number += 1
loop_data = '\n'.join(loop_data)
if loop_data[-2:] != '\n':
loop_data += '\n'

# put string data into a dataframe
if loop_data == '\n':
n_cols = len(loop_column_names)
df = pd.DataFrame(np.zeros(shape=(0, n_cols)))
else:
df = pd.read_csv(
StringIO(loop_data),
delim_whitespace=True,
header=None,
comment='#'
)
df = df.apply(pd.to_numeric, errors='ignore')
df.columns = loop_column_names
return df

def dataframes_to_numeric(self):
"""
Converts strings in dataframes into numerical values where possible

applying pd.to_numeric causes loss of 'name' attribute of DataFrame,
need to extract name and reapply inline
"""
for key, df in self.dataframes.items():
name = getattr(df, 'name', None)
self.dataframes[key] = df.apply(pd.to_numeric, errors='ignore')
if name is not None:
self.dataframes[key].name = name
def count_lines(file: Path) -> int:
with open(file, 'rb') as f:
return sum(1 for _ in f)

@staticmethod
def _block_name_from_line(line: str):
return line[5:]

def _block_name_from_current_line(self):
return self._block_name_from_line(self.crawler.current_line)
def block_name_from_line(line: str) -> str:
"""'data_general' -> 'general'"""
return line[5:]

@staticmethod
def heading_from_line(line: str):
return line.split()[0][1:]

@property
def first_dataframe(self):
return self.dataframe_at_index(0)
def heading_from_line(line: str) -> str:
"""'_rlnSpectralIndex #1' -> 'rlnSpectralIndex'."""
return line.split()[0][1:]

def dataframe_at_index(self, idx: int):
return self.dataframes_as_list()[idx]

def dataframes_as_list(self):
return list(self.dataframes.values())
def numericise(value: str) -> Union[str, int, float]:
try:
# Try to convert the string value to an integer
value = int(value)
except ValueError:
try:
# If it's not an integer, try to convert it to a float
value = float(value)
except ValueError:
# If it's not a float either, leave it as a string
value = value
return value
15 changes: 10 additions & 5 deletions starfile/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@


class StarWriter:
def __init__(self, dataframes: Union[pd.DataFrame, Dict[pd.DataFrame], List[pd.DataFrame]],
filename: PathLike, overwrite: bool = False, float_format: str = '%.6f',
def __init__(self, dataframes: Union[
pd.DataFrame, Dict[pd.DataFrame], List[pd.DataFrame]],
filename: PathLike, overwrite: bool = False,
float_format: str = '%.6f',
sep: str = '\t', na_rep: str = '<NA>', force_loop: bool = False):
self.overwrite = overwrite
self.filename = filename
Expand All @@ -39,15 +41,17 @@ def dataframes(self):
return self._dataframes

@dataframes.setter
def dataframes(self, dataframes: Union[pd.DataFrame, Dict[pd.DataFrame], List[pd.DataFrame]]):
def dataframes(self, dataframes: Union[
pd.DataFrame, Dict[pd.DataFrame], List[pd.DataFrame]]):
if isinstance(dataframes, pd.DataFrame):
self._dataframes = self.coerce_dataframe(dataframes)
elif isinstance(dataframes, dict):
self._dataframes = self.coerce_dict(dataframes)
elif isinstance(dataframes, list):
self._dataframes = self.coerce_list(dataframes)
else:
raise ValueError(f'Expected a DataFrame, Dict or List object, got {type(dataframes)}')
raise ValueError(
f'Expected a DataFrame, Dict or List object, got {type(dataframes)}')

@staticmethod
def coerce_dataframe(df: pd.DataFrame):
Expand Down Expand Up @@ -112,7 +116,8 @@ def write_star_file(self, filename: str = None):

def write_loopheader(self, df: pd.DataFrame):
self.buffer.add_line('loop_')
lines = [f'_{column_name} #{idx}' for idx, column_name in enumerate(df.columns, 1)]
lines = [f'_{column_name} #{idx}' for idx, column_name in
enumerate(df.columns, 1)]

for line in lines:
self.buffer.add_line(line)
Expand Down
Loading

0 comments on commit f2cd72c

Please sign in to comment.