diff --git a/test/test_sra.sh b/test/test_sra.sh index acb5d8d0..0f454d80 100755 --- a/test/test_sra.sh +++ b/test/test_sra.sh @@ -15,7 +15,7 @@ echo "WD="$WD atlas init-public PRJEB20796 -w $WD -echo "Run Atlas" +echo "Dry run Atlas" atlas run qc -w $WD --dry-run $@ @@ -31,6 +31,8 @@ echo "WD="$WD atlas init-public ERR2213683 -w $WD +echo "Dry run Atlas" + atlas run qc -w $WD --dry-run $@ @@ -60,7 +62,7 @@ sed -i.bak '/ILLUMINA/d' $WD/RunInfo.csv echo "Continue public init" atlas init-public continue -w $WD -echo "Run Atlas" +echo "Dry run Atlas" atlas run qc -w $WD --dry-run $@ diff --git a/workflow/envs/kingfisher.yaml b/workflow/envs/kingfisher.yaml new file mode 100644 index 00000000..5ccef509 --- /dev/null +++ b/workflow/envs/kingfisher.yaml @@ -0,0 +1,6 @@ +channels: + - conda-forge + - bioconda + - defaults +dependencies: + - kingfisher=0.4 diff --git a/workflow/rules/sra.smk b/workflow/rules/sra.smk index 310be594..738af12d 100644 --- a/workflow/rules/sra.smk +++ b/workflow/rules/sra.smk @@ -3,97 +3,60 @@ wildcard_constraints: localrules: - prefetch, + kingfisher_get, + merge_runs_to_sample SRA_read_fractions = ["_1", "_2"] if PAIRED_END else [""] -SRA_SUBDIR_RUN = "SRA/Runs" +SRA_SUBDIR_RUN = Path("SRA/Runs") -rule prefetch: - output: - sra=temp(touch(SRA_SUBDIR_RUN + "/{sra_run}/{sra_run}_downloaded")), - # not givins sra file as output allows for continue from the same download - params: - outdir=SRA_SUBDIR_RUN, # prefetch creates file in subfolder with run name automatically - log: - "logs/SRAdownload/prefetch/{sra_run}.log", - benchmark: - "logs/benchmarks/SRAdownload/prefetch/{sra_run}.tsv" - threads: 1 - resources: - mem_mb=1000, - time_min=60 * int(config["runtime"]["simplejob"]), - internet_connection=1, - conda: - "%s/sra.yaml" % CONDAENV - shell: - " mkdir -p {params.outdir} 2> {log} " - " ; " - " prefetch " - " --output-directory {params.outdir} " - " -X 999999999 " - " --progress " - " --log-level info " - " {wildcards.sra_run} &>> {log} " - " ; " - " vdb-validate {params.outdir}/{wildcards.sra_run}/{wildcards.sra_run}.sra &>> {log} " +RunTable = None +def load_runtable(): + global RunTable + if RunTable is None: + from atlas.init import parse_sra + RunTable = parse_sra.load_and_validate_runinfo_table() + return RunTable +def get_run_ids_for_sample(wildcards): + RunTable = load_runtable() + from atlas.init import parse_sra + return parse_sra.get_run_ids_for_sample(RunTable, wildcards.sample) -rule extract_run: - input: - flag=rules.prefetch.output, + +rule kingfisher_get: output: - temp( - expand( - SRA_SUBDIR_RUN + "/{{sra_run}}/{{sra_run}}{fraction}.fastq.gz", - fraction=SRA_read_fractions, - ) - ), + #dir = temp(directory("Reads/tmp/runs/{sample}")), + flag = temp(touch("Reads/tmp/flags/{sample}.downloaded")), params: - outdir=os.path.abspath(SRA_SUBDIR_RUN + "/{sra_run}"), - sra_file=SRA_SUBDIR_RUN + "/{sra_run}/{sra_run}.sra", + run_ids = get_run_ids_for_sample, + download_methods="ena-ascp ena-ftp prefetch", + output_dir= lambda wc: SRA_SUBDIR_RUN / wc.sample, log: - "logs/SRAdownload/extract/{sra_run}.log", - benchmark: - "logs/benchmarks/SRAdownload/fasterqdump/{sra_run}.tsv" - threads: config["simplejob_threads"] + Path("log/download_reads/download/{sample}.log").resolve(), + threads: config['threads'], resources: - time_min=60 * int(config["runtime"]["simplejob"]), - mem_mb=1000, #default 100Mb + mem_mb=config['mem']*1000, + time_min=config["runtime"]["long"]*60, + ncbi_connection=1 conda: - "%s/sra.yaml" % CONDAENV + "../envs/kingfisher.yaml" shell: - " vdb-validate {params.sra_file} &>> {log} " - " ; " - " parallel-fastq-dump " - " --threads {threads} " - " --gzip --split-files " - " --outdir {params.outdir} " - " --tmpdir {resources.tmpdir} " - " --skip-technical --split-3 " - " -s {params.sra_file} &>> {log} " + " mkdir -p {params.output_dir} ; " + " cd {params.output_dir} " " ; " - " rm -f {params.sra_file} 2>> {log} " - - - + "kingfisher get --run-identifiers {params.run_ids} " + " --download-threads 2 --extraction-threads {threads} " + " --hide-download-progress " + " --output-format-possibilities 'fastq.gz' " + " --force --check-md5sums " + " --download-methods {params.download_methods} " + " -f fastq.gz &> {log}" - -RunTable = None -def get_run_fastq_for_sample(wildcards): - - from atlas.init.parse_sra import load_and_validate_runinfo_table, get_run_ids_for_sample - - # load RunTable if not already loaded - global RunTable - if RunTable is None: - - RunTable = load_and_validate_runinfo_table() - - run_ids = get_run_ids_for_sample(RunTable,wildcards.sample) +def get_run_fastq_for_sample(run_ids): ReadFiles = {} for fraction in SRA_read_fractions: @@ -103,7 +66,7 @@ def get_run_fastq_for_sample(wildcards): key = fraction ReadFiles[key] = expand( - SRA_SUBDIR_RUN + "/{sra_run}/{sra_run}{fraction}.fastq.gz", + str(SRA_SUBDIR_RUN / "{sample}/{sra_run}{fraction}.fastq.gz"), fraction=fraction, sra_run=run_ids, ) @@ -113,20 +76,39 @@ def get_run_fastq_for_sample(wildcards): rule merge_runs_to_sample: input: - unpack(get_run_fastq_for_sample), + flag= "Reads/tmp/flags/{sample}.downloaded" output: expand( "SRA/Samples/{{sample}}/{{sample}}{fraction}.fastq.gz", fraction=SRA_read_fractions, ), + params: + run_ids = get_run_ids_for_sample, threads: 1 run: + + # print(list( (SRA_SUBDIR_RUN / wildcards.sample).iterdir())) + from utils import io + for i, fraction in enumerate(SRA_read_fractions): - if fraction == "": - fraction = "se" - io.cat_files(input[fraction], output[i]) + + run_fastqs= expand( + SRA_SUBDIR_RUN / "{sample}/{sra_run}{fraction}.fastq.gz", + fraction=fraction, + sra_run=params.run_ids, + sample=wildcards.sample + ) + + assert all([Path(f).exists() for f in run_fastqs])," Not all fastq files exist. Expected: %s" % run_fastqs + + io.cat_files(run_fastqs, output[i]) + + + + + rule download_sra: