Skip to content

Commit

Permalink
Merge branch 'CW-1728' into 'dev'
Browse files Browse the repository at this point in the history
Use watchPath to enable streaming basecalling

Closes CW-1728

See merge request epi2melabs/workflows/wf-basecalling!14
  • Loading branch information
sarahjeeeze committed May 31, 2023
2 parents fa3ec6f + 7d32444 commit adc677a
Show file tree
Hide file tree
Showing 10 changed files with 435 additions and 68 deletions.
34 changes: 23 additions & 11 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,34 @@ docker-run:
# which options to apply as part of the rules block below
# NOTE There is a slightly cleaner way to define this matrix to include
# the variables, but it is broken when using long strings! See CW-756
tags:
- grid
- shell
parallel:
matrix:
- MATRIX_NAME: [
"dorado",
]
"dorado", "watch_path", "no_reference"
]
rules:
- when: never
# NOTE As we're overriding the rules block for the included docker-run
# we must redefine this CI_COMMIT_BRANCH rule to prevent docker-run
# being incorrectly scheduled for "detached merge request pipelines" etc.
#- if: ($CI_COMMIT_BRANCH == null || $CI_COMMIT_BRANCH == "dev-template")
# when: never
#- if: $MATRIX_NAME == "dorado"
# variables:
# NF_BEFORE_SCRIPT: "wget -qO demo_data.tar.gz https://ont-exd-int-s3-euwst1-epi2me-labs.s3.amazonaws.com/wf-basecalling/demo_data.tar.gz && tar -xzvf demo_data.tar.gz && cat demo_data/VERSION && rm demo_data.tar.gz"
# NF_WORKFLOW_OPTS: "--input demo_data/input --ref demo_data/GCA_000001405.15_GRCh38_no_alt_analysis_set.fasta --basecaller_cfg '/home/epi2melabs/[email protected]'"
# we must redefine this CI_COMMIT_BRANCH rule to prevent docker-run
# being incorrectly scheduled for "detached merge request pipelines" etc.
- if: ($CI_COMMIT_BRANCH == null || $CI_COMMIT_BRANCH == "dev-template")
when: never
- if: $MATRIX_NAME == "dorado"
variables:
NF_BEFORE_SCRIPT: "wget -qO demo_data.tar.gz https://ont-exd-int-s3-euwst1-epi2me-labs.s3.amazonaws.com/wf-basecalling/demo_data.tar.gz && tar -xzvf demo_data.tar.gz && cat demo_data/VERSION && rm demo_data.tar.gz"
NF_WORKFLOW_OPTS: "--input demo_data/input --ref demo_data/GCA_000001405.15_GRCh38_no_alt_analysis_set.fasta --basecaller_cfg [email protected] --basecaller_chunk_size 1"
NF_IGNORE_PROCESSES: "stopCondition"
- if: $MATRIX_NAME == "watch_path"
variables:
NF_BEFORE_SCRIPT: "wget -qO demo_data.tar.gz https://ont-exd-int-s3-euwst1-epi2me-labs.s3.amazonaws.com/wf-basecalling/demo_data.tar.gz && tar -xzvf demo_data.tar.gz && cat demo_data/VERSION && rm demo_data.tar.gz"
NF_WORKFLOW_OPTS: "--input demo_data/input --ref demo_data/GCA_000001405.15_GRCh38_no_alt_analysis_set.fasta --basecaller_cfg [email protected] --watch_path --read_limit 3000 --basecaller_chunk_size 1"
- if: $MATRIX_NAME == "no_reference"
variables:
NF_BEFORE_SCRIPT: "wget -qO demo_data.tar.gz https://ont-exd-int-s3-euwst1-epi2me-labs.s3.amazonaws.com/wf-basecalling/demo_data.tar.gz && tar -xzvf demo_data.tar.gz && cat demo_data/VERSION && rm demo_data.tar.gz"
NF_WORKFLOW_OPTS: "--input demo_data/input --basecaller_cfg [email protected] --basecaller_chunk_size 1"
NF_IGNORE_PROCESSES: "cram_cache,stopCondition"

aws-run:
rules:
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [v0.6.0]
### Added
- Ability to watch the input path and process files as they become available in real time.

## [v0.5.2]
### Added
- Configuration for running demo data in AWS
Expand Down
34 changes: 31 additions & 3 deletions basecalling.nf
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ workflow wf_dorado {
basecaller_model_path
remora_model_name
remora_model_path
watch_path
dorado_ext
main:

if (input_ref) {
Expand Down Expand Up @@ -123,10 +125,33 @@ workflow wf_dorado {
}

Integer chunk_idx = 0
pod5_chunks = Channel
String stop_filename = "STOP.${workflow.sessionId}.${params.dorado_ext}" // use the sessionId so resume works
existing_pod5_chunks = Channel
.fromPath(input_path + "**.${params.dorado_ext}", checkIfExists: true)
.buffer(size:params.basecaller_chunk_size, remainder:true)
.map { tuple(chunk_idx++, it) }
if (watch_path) {
// watch input path for more pod5s
if (params.read_limit) {
log.warn "Watching ${input_path} for new ${dorado_ext} files, until ${params.read_limit} reads have been observed."
log.warn "To stop this workflow early, create: ${input_path}/${stop_filename}"
}
else {
log.warn "Watching ${input_path} for new ${dorado_ext} files indefinitely."
log.warn "To stop this workflow create: ${input_path}/${stop_filename}"
}
watch_pod5_chunks = Channel
.watchPath("$input_path/**.${params.dorado_ext}")
.until{ it.name == stop_filename }
pod5_chunks = existing_pod5_chunks
.concat(watch_pod5_chunks)
.buffer(size:params.basecaller_chunk_size, remainder:true)
.map { tuple(chunk_idx++, it) }
} else {
pod5_chunks = existing_pod5_chunks
.buffer(size:params.basecaller_chunk_size, remainder:true)
.map { tuple(chunk_idx++, it) }
}


called_bams = dorado(
pod5_chunks,
tuple(basecaller_model_name, basecaller_model, basecaller_model_override),
Expand Down Expand Up @@ -156,7 +181,10 @@ workflow wf_dorado {
pass = merge_pass_calls(ref, crams.pass.collect(), "pass")
fail = merge_fail_calls(ref, crams.fail.collect(), "fail")
}

emit:
chunked_pass_crams = crams.pass
pass = pass
fail = fail

}
54 changes: 54 additions & 0 deletions bin/add_jsons.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env python
"""Combine two JSONS, sum values by matching json keys."""

import argparse
import json
import os


def add_dicts(d1, d2):
    """Recursively combine two dicts, summing numeric values of matching keys.

    Keys present in only one input are kept unchanged. For keys present in
    both: numbers are summed, nested dicts are merged recursively, and
    strings keep the value from ``d1`` (names should not be concatenated).

    :param d1: dict whose values take precedence for non-summable types.
    :param d2: dict providing defaults for keys missing from ``d1``.
    :return: new combined dict; neither input is modified.
    """
    def sum_a(v1, v2):
        # Key only present in d1: keep its value.
        if v2 is None:
            return v1
        # Sum any numeric pair. The original implementation only handled
        # ints, silently turning matching float values into None.
        if isinstance(v1, (int, float)) and isinstance(v2, (int, float)):
            return v1 + v2
        # Matching strings: keep the first value rather than concatenating.
        if isinstance(v1, str) and isinstance(v2, str):
            return v1
        try:
            # Nested dicts are merged recursively.
            return add_dicts(v1, v2)
        except (AttributeError, TypeError):
            # Anything else un-combinable: keep the first value.
            return v1
    result = d2.copy()
    result.update({k: sum_a(v, d2.get(k)) for k, v in d1.items()})
    return result


def main(args):
    """Run the entry point.

    Merge the accumulated state JSON with a newly produced JSON and
    write the combined result to the output path.
    """
    # An empty state file means no previous state: start from an empty dict.
    if os.stat(args.state).st_size > 0:
        with open(args.state) as fh:
            state = json.load(fh)
    else:
        state = {}
    with open(args.new_file) as fh:
        incoming = json.load(fh)
    with open(args.output, "w") as fh:
        json.dump(add_dicts(state, incoming), fh)


def argparser():
    """Create argument parser."""
    parser = argparse.ArgumentParser(
        "add_jsons",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        add_help=False)
    # Positional arguments: the incoming JSON, the accumulated state,
    # and the path the combined result is written to.
    for positional in ("new_file", "state", "output"):
        parser.add_argument(positional)
    return parser


if __name__ == "__main__":
    # CLI entry point: parse arguments and combine the two JSON files.
    args = argparser().parse_args()
    main(args)
71 changes: 71 additions & 0 deletions bin/fastcat_histogram.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env python
"""Histogram-json."""

import argparse
import json
import sys

import numpy as np
import pandas as pd


def histogram_counts(data, dmin=0, bin_width=100):
    """Return histogram bin edges and per-bin counts for ``data``.

    Bins are uniform, ``bin_width`` wide, starting at ``dmin`` and
    extending just past ``max(data)``. Both return values are plain lists.
    """
    edges = np.arange(dmin, max(data) + bin_width, bin_width)
    counts, _ = np.histogram(data, bins=edges)
    # np.histogram treats every bin as half-open except the last, which
    # includes both edges. With batch_size=1 a maximum value therefore
    # lands in the final (closed) interval, while with more sequences the
    # same value may fall into a following half-open interval instead, so
    # counts can differ slightly between batch sizes. See:
    # https://numpy.org/doc/stable/reference/generated/numpy.histogram.html
    return edges.tolist(), counts.tolist()


def get_stats(seq_summary):
    """Build the per-sample stats JSON: read count plus binned histograms.

    ``seq_summary`` must provide ``read_length`` and ``mean_quality``
    columns (e.g. a pandas DataFrame from a fastcat per-read summary).
    """
    n_reads = len(seq_summary)
    stats_json = {"total_reads": n_reads}
    if n_reads == 0:
        sys.stderr.write("WARNING: summary file was empty.\n")
        stats_json["len"] = {}
        stats_json["qual"] = {}
        return stats_json
    # Histogram of read lengths in 50 bp bins.
    len_bins, len_counts = histogram_counts(
        seq_summary['read_length'], dmin=0, bin_width=50)
    stats_json["len"] = dict(zip(len_bins, len_counts))
    # Histogram of mean per-read quality in 0.2 q-score bins.
    qual_bins, qual_counts = histogram_counts(
        seq_summary['mean_quality'], dmin=0, bin_width=0.2)
    stats_json["qual"] = dict(zip(qual_bins, qual_counts))
    return stats_json


def main(args):
    """Run the entry point.

    Read a fastcat per-read summary TSV and write the per-sample
    histogram JSON keyed by sample id.
    """
    # Only the two summary columns are needed; enforce their dtypes.
    summary = pd.read_csv(
        args.input, sep="\t",
        usecols=['read_length', 'mean_quality'],
        dtype={'read_length': int, 'mean_quality': float})
    with open(args.output, 'w') as fh:
        json.dump({args.sample_id: get_stats(summary)}, fh)


def argparser():
    """Argument parser for entrypoint."""
    parser = argparse.ArgumentParser()
    # Positional inputs/outputs, plus the sample the stats belong to.
    parser.add_argument("input", help="Read summary file.")
    parser.add_argument("output", help="Output summary JSON.")
    parser.add_argument("--sample_id", help="Sample name.")
    return parser


if __name__ == "__main__":
    # CLI entry point: parse arguments and write the histogram JSON.
    parser = argparser()
    main(parser.parse_args())
89 changes: 50 additions & 39 deletions bin/report.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,61 @@
#!/usr/bin/env python
"""Create workflow report."""

import argparse
import json

from aplanat.components import fastcat
from aplanat.components import simple as scomponents
from aplanat.report import WFReport
import pandas
import ezcharts as ezc
from ezcharts.components.ezchart import EZChart
from ezcharts.components.reports import labs
from ezcharts.layout.snippets import Grid
from ezcharts.layout.snippets import Tabs
import pandas as pd
import report_utils
from ezcharts.util import get_named_logger # noqa: ABS101


THEME = 'epi2melabs'

def main():

def main(args):
    """Run the entry point.

    Build the basecalling report: one tab of histograms per sample plus a
    summary tab of total read counts, then write it to ``args.report``.
    """
    logger = get_named_logger("Report")
    # Skeleton report; params and versions files are rendered into
    # standard LabsReport sections.
    report = labs.LabsReport(
        "Basecalling report", "wf-basecalling-report",
        args.params, args.versions)

    if args.stats:
        with report.add_section("Read summary", "Read summary"):
            # NOTE(review): only the first stats file is read here — the
            # upstream steps appear to combine stats into a single JSON
            # mapping sample_id -> {total_reads, len, qual}; confirm.
            with open(args.stats[0]) as f:
                datas = json.load(f)
            tabs = Tabs()
            total_reads = {}
            for sample_id, data in sorted(datas.items()):
                # One tab per sample: quality and length histograms
                # side by side.
                with tabs.add_tab(sample_id):
                    with Grid(columns=2):
                        EZChart(
                            report_utils.read_quality_plot(data), THEME)
                        EZChart(report_utils.read_length_plot(data), THEME)
                    total_reads[sample_id] = data['total_reads']
            # Extra tab with a bar chart of read counts across samples.
            with tabs.add_tab('total'):
                with Grid(columns=1):  # total read counts per sample
                    df_stats = pd.DataFrame.from_dict(total_reads.items())
                    df_stats.columns = ['Sample_name', 'Number of reads']
                    plt = ezc.barplot(
                        data=df_stats, x='Sample_name', y='Number of reads')
                    plt.title = {"text": "Number of reads per sample."}
                    plt.tooltip = {'trigger': 'axis'}
                    EZChart(plt, THEME)

    report.write(args.report)
    logger.info(f"Report written to {args.report}.")


def argparser():
"""Argument parser for entrypoint."""
parser = argparse.ArgumentParser()
parser.add_argument("report", help="Report output file")
parser.add_argument("summaries", nargs='+', help="Read summary file.")
parser.add_argument(
"--metadata", default='metadata.json',
help="sample metadata")
"--stats", nargs='*', help="Fastcat per-read stats file(s).")
parser.add_argument(
"--versions", required=True,
help="directory containing CSVs containing name,version.")
Expand All @@ -30,36 +68,9 @@ def main():
parser.add_argument(
"--commit", default='unknown',
help="git commit of the executed workflow")
args = parser.parse_args()

report = WFReport(
"Workflow Template Sequencing report", "wf-template",
revision=args.revision, commit=args.commit)

with open(args.metadata) as metadata:
sample_details = [
{
'sample': d['sample_id'],
'type': d['type'],
'barcode': d['barcode']
} for d in json.load(metadata)
]

report.add_section(
section=fastcat.full_report(args.summaries))

section = report.add_section()
section.markdown('## Samples')
section.table(pandas.DataFrame(sample_details))

report.add_section(
section=scomponents.version_table(args.versions))
report.add_section(
section=scomponents.params_table(args.params))

# write report
report.write(args.report)
return parser


if __name__ == "__main__":
main()
parser = argparser()
main(parser.parse_args())
44 changes: 44 additions & 0 deletions bin/report_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python
"""Create tables for the report."""
from ezcharts.plots.distribution import histplot
import pandas as pd

# PLOTS

# The SeqSummary component from ezcharts.components.fastcat cannot be used
# here: it bins raw per-read data, whereas the real-time analysis output is
# already binned. Instead, the pre-computed counts are passed to histplot
# via its `weights` argument so they become the y-axis values.


def read_quality_plot(seq_summary, min_qual=4, max_qual=30, title='Read quality'):
    """Create a read quality summary plot from pre-binned counts.

    ``seq_summary['qual']`` maps quality-bin edges (strings from JSON)
    to read counts.
    """
    qual_df = pd.DataFrame.from_dict(seq_summary['qual'].items())
    qual_df.columns = ['mean_quality', 'counts']
    # JSON keys are strings; the x axis needs numeric bin edges.
    qual_df['mean_quality'] = qual_df['mean_quality'].astype('float')
    # Data are already binned, so use one histplot bin per input bin and
    # supply the counts as weights to recover the y values.
    plot = histplot(
        data=qual_df['mean_quality'],
        bins=len(qual_df),
        weights=list(qual_df['counts']))
    plot.title = dict(text=title)
    plot.xAxis.name = 'Quality score'
    plot.xAxis.min = min_qual
    plot.xAxis.max = max_qual
    plot.yAxis.name = 'Number of reads'
    return plot


def read_length_plot(seq_summary, title='Read length'):
    """Create a read length plot from pre-binned counts.

    ``seq_summary['len']`` maps length-bin edges in bases (strings from
    JSON) to read counts.
    """
    length_df = pd.DataFrame.from_dict(seq_summary['len'].items())
    length_df.columns = ['read_length', 'counts']
    # JSON keys are strings; convert bin edges to numbers, then bp -> kb.
    length_df['read_length'] = (
        length_df['read_length'].astype('uint64') / 1000)
    # Pre-binned data: one histplot bin per input bin, counts as weights.
    plot = histplot(
        data=length_df['read_length'],
        bins=len(length_df),
        weights=list(length_df['counts']))
    plot.title = dict(text=title)
    plot.xAxis.name = 'Read length / kb'
    plot.yAxis.name = 'Number of reads'
    return plot
Loading

0 comments on commit adc677a

Please sign in to comment.