Commit 2781f42

Turn process method into a generator to yield results obtained from the callback methods. (#1)

git-steven authored Mar 19, 2024
1 parent 864648a · commit 2781f42
Showing 10 changed files with 283 additions and 235 deletions.
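Because `process()` is now a generator function, merely calling it no longer does any work; callers must iterate it to drive the pool and receive results (this also retires the README's old TODO item, "Better integrate results from callbacks"). A before/after sketch of a call site, drawn from the README examples below:

```python
# Before: process() ran the whole pool eagerly and per-chunk results were discarded.
pooler.process()

# After: process() returns a generator; iterating it runs the pool
# and yields each callback's result as it completes.
for processed_batch in pooler.process():
    print(processed_batch)
```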
2 changes: 2 additions & 0 deletions .gitignore
@@ -2,6 +2,8 @@
.DS_Store
*.pem

.idea

dist/**/*

.virtualenvs
91 changes: 48 additions & 43 deletions README.md
@@ -12,98 +12,103 @@ https://github.com/tangledpath/csv-batcher
## Documentation
https://tangledpath.github.io/csv-batcher/csv_batcher.html

## Further exercises
* Possibly implement pooling with celery (for use in django apps, etc.), which can bring about [horizontal scaling](https://en.wikipedia.org/wiki/Scalability#Horizontal_or_scale_out).

## Usage
Arguments sent to the callback function are controlled by creating the pooler with `callback_with` and one of the `CallbackWith` enum values.
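A quick reference for the three modes; the comments paraphrase the examples that follow:

```python
from csv_batcher.csv_pooler import CallbackWith

CallbackWith.DATAFRAME_ROW  # callback gets each row as a pandas Series (via apply)
CallbackWith.DATAFRAME      # callback gets each CSV chunk as a pandas DataFrame
CallbackWith.CSV_FILENAME   # callback gets the filename of a CSV chunk
```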

### As dataframe row
```python
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Callback function passed to pooler; accepts a dataframe row
# as a pandas Series (via apply):
def process_dataframe_row(row):
    return row.iloc[0]

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_dataframe_row,
    callback_with=CallbackWith.DATAFRAME_ROW,
    pool_size=16
)
for processed_batch in pooler.process():
    print(processed_batch)
```
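In this mode each yielded `processed_batch` is the `pandas.Series` that `df.apply` produces for one chunk, as `_process_csv` in the `csv_pooler.py` diff below makes explicit.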

### As dataframe
```python
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Used from process_dataframe's apply:
def process_dataframe_row(row):
    return row.iloc[0]

# Callback function passed to pooler; accepts a dataframe:
def process_dataframe(df):
    foo = df.apply(process_dataframe_row, axis=1)
    # Or do something more complicated....
    return len(df)

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_dataframe,
    callback_with=CallbackWith.DATAFRAME,
    pool_size=16
)
for processed_batch in pooler.process():
    print(processed_batch)
```
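With `CallbackWith.DATAFRAME`, each yielded value is whatever the callback returns for a chunk; here that is `len(df)`, the chunk's row count.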

### As CSV filename
```python
import pandas as pd
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Used from process_csv_filename's apply:
def process_dataframe_row(row):
    return row.iloc[0]

def process_csv_filename(csv_chunk_filename):
    # print("processing ", csv_chunk_filename)
    df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
    foo = df.apply(process_dataframe_row, axis=1)
    return len(df)

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_csv_filename,
    callback_with=CallbackWith.CSV_FILENAME,
    chunk_lines=10000,
    pool_size=16
)
for processed_batch in pooler.process():
    print(processed_batch)
```
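This example also passes `chunk_lines=10000`, which appears to set how many CSV lines go into each chunk file before it is handed to the pool.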
## Development
### Linting
```bash
ruff check .        # Find linting errors
ruff check . --fix  # Auto-fix linting errors (where possible)
```

### Documentation
```bash
# Shows in browser
poetry run pdoc csv_batcher

# Generates to ./docs
poetry run pdoc csv_batcher -o ./docs
```

### Testing
```bash
clear; pytest
```

### Publishing
```bash
poetry publish --build -u __token__ -p $PYPI_TOKEN
```
19 changes: 10 additions & 9 deletions csv_batcher/csv_pooler.py
@@ -66,26 +66,27 @@ def process(self):
```python
            csv_file_cnt = len(csv_splitter.csv_files())
            logging.info(f"Pooling against {csv_file_cnt} files")
            with Pool(self.pool_size) as p:
                for result, count in p.imap(self._process_csv, csv_splitter.csv_files()):
                    yield result
                    processed_count += count
        finally:
            csv_splitter.cleanup()

        logging.info(f"Processed {processed_count} rows from {csv_file_cnt} CSV Files")

    def _process_csv(self, csv_chunk_filename):
        if self.callback_with == CallbackWith.CSV_FILENAME:
            result = self.process_fn(csv_chunk_filename)
            with open(csv_chunk_filename) as f:
                # Get total lines and subtract for header:
                count = sum(1 for line in f) - 1
        elif self.callback_with == CallbackWith.DATAFRAME:
            df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
            count = df.shape[0]
            result = self.process_fn(df)
        elif self.callback_with == CallbackWith.DATAFRAME_ROW:
            df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
            count = df.shape[0]
            result = df.apply(self.process_fn, axis=1)

        return result, count
```
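Since `process()` now yields results as they arrive from `Pool.imap`, callers can aggregate incrementally. A minimal consumption sketch; `count_rows` is a hypothetical callback and `5mSalesRecords.csv` is the example file used throughout:

```python
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Hypothetical callback for illustration: counts the data rows in one chunk file.
def count_rows(csv_chunk_filename):
    with open(csv_chunk_filename) as f:
        return sum(1 for line in f) - 1  # subtract the header line

pooler = CSVPooler(
    "5mSalesRecords.csv",
    count_rows,
    callback_with=CallbackWith.CSV_FILENAME,
)

# Iterating drives the pool; each yielded value is one callback's result.
total = sum(pooler.process())
print(f"Counted {total} data rows")
```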
18 changes: 12 additions & 6 deletions csv_batcher/test_csv_pooler.py
@@ -1,4 +1,3 @@
```python
from csv_batcher.utils.time import time_and_log
from csv_batcher.csv_pooler import CSVPooler, CallbackWith
import pandas as pd
```
@@ -17,22 +16,29 @@ def __process_as_dataframe(df):
```python
def test_big_file_as_csv():
    with time_and_log("test_big_file_as_csv"):
        pooler = CSVPooler("5mSalesRecords.csv", __process_csv_filename)
        for processed_batch in pooler.process():
            assert isinstance(processed_batch, pd.Series)

def test_big_file_as_dataframe():
    with time_and_log("test_big_file_as_dataframe"):
        pooler = CSVPooler("5mSalesRecords.csv", __process_as_dataframe, callback_with=CallbackWith.DATAFRAME)
        for processed_batch in pooler.process():
            assert isinstance(processed_batch, pd.Series)

def test_big_file_as_dataframe_rows():
    with time_and_log("test_big_file_as_dataframe_rows"):
        pooler = CSVPooler("5mSalesRecords.csv", __process_dataframe_row, callback_with=CallbackWith.DATAFRAME_ROW)
        for processed_batch in pooler.process():
            assert isinstance(processed_batch, pd.Series)

def test_no_pooler():
    with time_and_log("test_no_pooler"):
        __process_csv_filename("5mSalesRecords.csv")


if __name__ == '__main__':
    test_big_file_as_csv()
    test_big_file_as_dataframe()
    test_big_file_as_dataframe_rows()
    # test_migrator_idempotency()
```
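Note the shape of the updated tests: with `process()` now a generator, the old bare `pooler.process()` calls would no longer execute anything, so each test iterates the generator and asserts on every yielded batch.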
