Processes Part 2
Overview
Teaching: 30 min
Exercises: 10 minQuestions
How do I get data, files, and values, out of processes?
How do I handle grouped input and output?
How can I control when a process is executed?
How do I control resources, such as number of CPUs and memory, available to processes?
How do I save output/results from a process?
Objectives
Define outputs to a process.
Understand how to handle grouped input and output using the tuple qualifier.
Understand how to use conditionals to control process execution.
Use process directives to control execution of a process.
Use the
publishDir
directive to save result files to a directory.
Outputs
We have seen how to input data into a process; now we will see how to output files and values from a process.
The output
declaration block allows us to define the channels used by the process to send out the files and values produced.
An output block is not required, but if it is present it can contain one or more output declarations.
The output block follows the syntax shown below:
output:
<output qualifier> <output name>
<output qualifier> <output name>
...
Output values
Like the input, the type of output data is defined using type qualifiers.
The val
qualifier allows us to output a value defined in the script.
Because Nextflow processes can only communicate through channels, if we want to share a value input into one process as input to another process we would need to define that value in the output declaration block as shown in the following example:
//process_output_value.nf
nextflow.enable.dsl=2
process METHOD {
input:
val x
output:
val x
script:
"""
echo $x > method.txt
"""
}
// Both 'Channel' and 'channel' keywords work to generate channels.
// However, it is a good practice to be consistent through the whole pipeline development
methods_ch = channel.of('salmon', 'kallisto')
workflow {
METHOD(methods_ch)
// use the view operator to display contents of the channel
METHOD.out.view({ "Received: $it" })
}
[d8/fd3f9d] process > METHOD (2) [100%] 2 of 2 ✔
Received: salmon
Received: kallisto
Output files
If we want to capture a file instead of a value as output we can use the
path
qualifier that can capture one or more files produced by the process, over the specified channel.
//process_output_file.nf
nextflow.enable.dsl=2
methods_ch = channel.of('salmon', 'kallisto')
process METHOD {
input:
val x
output:
path 'method.txt'
"""
echo $x > method.txt
"""
}
workflow {
METHOD(methods_ch)
// use the view operator to display contents of the channel
METHOD.out.view({ "Received: $it" })
}
executor > local (2)
[13/85ecb6] process > METHOD (1) [100%] 2 of 2 ✔
Received: /Users/ggrimes2/Downloads/nf-training/work/ea/f8998abfdcbfc6038757a60806c00a/method.txt
Received: /Users/ggrimes2/Downloads/nf-training/work/13/85ecb69a9bc85370e749254646984d/method.txt
In the above example the process METHOD
creates a file named method.txt
in the work directory containing the method name.
Since a file parameter using the same name, method.txt
, is declared in the output block, when the task is completed that file is sent over the output channel.
A downstream operator
, such as .view
or a process
declaring the same channel as input will be able to receive it.
Multiple output files
When an output file name contains a *
or ?
metacharacter it is interpreted as a pattern match.
This allows us to capture multiple files into a list and output them as a one item channel.
For example, here we will capture the files fastqc.html
and directory fastqc.zip
produced as results from FastQC in the output channel.
//process_output_multiple.nf
nextflow.enable.dsl=2
process FASTQC {
input:
path read
output:
path "fqc_res/*"
script:
"""
mkdir fqc_res
fastqc $read -o fqc_res
"""
}
read_ch = channel.fromPath("data/yeast/reads/ref1*.fq.gz")
workflow {
FASTQC(read_ch)
FASTQC.out.view()
}
$ nextflow run process_output_multiple.nf
[3e/86de98] process > FASTQC (2) [100%] 2 of 2 ✔
[work/64/9de164199568800a4609c3af78cf71/fqc_res/ref1_2_fastqc.html, work/64/9de164199568800a4609c3af78cf71/fqc_res/ref1_2_fastqc.zip]
Analysis complete for ref1_2.fq.gz
[work/3e/86de9868ecf321702f2df0b8ccbfd3/fqc_res/ref1_1_fastqc.html, work/3e/86de9868ecf321702f2df0b8ccbfd3/fqc_res/ref1_1_fastqc.zip]
Analysis complete for ref1_1.fq.gz
Note: There are some caveats on glob pattern behaviour:
- Input files are not included in the list of possible matches.
- There is no information about the order of the matching files in the output channel.
- Glob pattern matches against both files and directories path.
- When a two stars pattern
**
is used to recurse through subdirectories, only file paths are matched i.e. directories are not included in the result list.
Output channels
Modify the nextflow script
process_exercise_output.nf
to include an output block that captures the different index foldersindex_$kmer
. Use theview
operator on the output channel.//process_exercise_output.nf nextflow.enable.dsl=2 process INDEX { input: path transcriptome each kmer //add output block here to capture index folders "index_$kmer" script: """ salmon index -t $transcriptome -i index_$kmer -k $kmer """ } transcriptome_ch = channel.fromPath('data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz') kmer_ch = channel.of(21, 27, 31) workflow { INDEX(transcriptome_ch, kmer_ch) INDEX.out.view() }
Solution
//process_exercise_output_answer.nf nextflow.enable.dsl=2 process INDEX { input: path transcriptome each kmer output: path "index_${kmer}" script: """ salmon index -t $transcriptome -i index_$kmer -k $kmer """ } transcriptome_ch = channel.fromPath('data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz') kmer_ch = channel.of(21, 27, 31) workflow { INDEX(transcriptome_ch, kmer_ch) INDEX.out.view() }
Grouped inputs and outputs
So far we have seen how to declare multiple input and output channels, but each channel was handling only one value at time. However Nextflow can handle groups of values using the tuple
qualifiers.
[value1, file1, file2]
[file1, file2, [value1, value2]]
In case the channel is created with .filesFromPairs
factory method, the first item of the tuple will be a grouping value/key and the second item will be a list of files.
[group_key,[file1,file2,...]]
The corresponding input declaration must be declared with a tuple
qualifier, followed by definition of each item in the tuple.
//process_tuple_input.nf
nextflow.enable.dsl=2
process TUPLEINPUT{
input:
tuple val(sample_id), path(reads)
script:
"""
echo $sample_id
echo $reads
"""
}
reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')
workflow {
TUPLEINPUT(reads_ch)
}
outputs
ref1
ref1_1.fq.gz ref1_2.fq.gz
In the same manner an output channel containing tuple of values can be declared using the tuple
qualifier following by the definition of each tuple element in the tuple.
In the code snippet below the output channel would contain a tuple with the grouping key value as the Nextflow variable sample_id
and a list containing the files matching the following pattern "*FP*.fq.gz"
.
output:
tuple val(sample_id), path("*FP*.fq.gz")
An example can be seen in this script below.
//process_tuple_io_fastp.nf
nextflow.enable.dsl=2
process FASTP {
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*FP*.fq.gz")
script:
"""
fastp \
-i ${reads[0]} \
-I ${reads[1]} \
-o ${sample_id}_FP_R1.fq.gz \
-O ${sample_id}_FP_R2.fq.gz
"""
}
reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')
workflow {
FASTP(reads_ch)
FASTP.out.view()
}
nextflow run process_tuple_io_fastp.nf
The output is now a tuple containing the sample id and the two processed fastq files.
[ref1, [work/9b/fcca75db83718a15f7a95caabfbc15/ref1_FP_R1.fq.gz, work/9b/fcca75db83718a15f7a95caabfbc15/ref1_FP_R2.fq.gz]]
Composite inputs and outputs
Fill in the blank ___ input and output qualifers for
process_exercise_tuple.nf
. Note: the output for the FASTQC process is a directory namedfastqc_out
.//process_exercise_tuple.nf nextflow.enable.dsl=2 process FASTQC { input: tuple ___(sample_id), ___(reads) output: tuple ___(sample_id), ___("fastqc_out") script: """ mkdir fastqc_out fastqc $reads -o fastqc_out -t 1 """ } reads_ch = Channel.fromFilePairs('data/yeast/reads/ref*_{1,2}.fq.gz') workflow{ FASTQC(reads_ch) FASTQC.out.view() }
Solution
//process_exercise_tuple_answer.nf nextflow.enable.dsl=2 process FASTQC { input: tuple val(sample_id), path(reads) output: tuple val(sample_id), path("fastqc_out") script: """ mkdir fastqc_out fastqc $reads -o fastqc_out -t 1 """ } reads_ch = Channel.fromFilePairs('data/yeast/reads/ref*_{1,2}.fq.gz') workflow{ FASTQC(reads_ch) FASTQC.out.view() }
N E X T F L O W ~ version 21.04.0 Launching `process_exercise_tuple.nf` [spontaneous_coulomb] - revision: 06ff22f1a9 executor > local (3) [75/f4a44d] process > FASTQC (3) [100%] 3 of 3 ✔ [ref3, work/99/a7d9176e332fdc0988973dbb89df63/fastqc_out] [ref2, /work/53/e3cbd39afa9f0f84a3d9cd060d991a/fastqc_out] [ref1, work/75/f4a44d0bc761fa4774c2f23a465766/fastqc_out]
Conditional execution of a process
The when
declaration allows you to define a condition that must be verified in order to execute the process. This can be any expression that evaluates a boolean value; true
or false
.
It is useful to enable/disable the process execution depending on the state of various inputs and parameters.
In the example below the process CONDITIONAL
will only execute when the value of the chr
variable is less than or equal to 5:
//process_when.nf
nextflow.enable.dsl=2
process CONDITIONAL {
input:
val chr
when:
chr <= 5
script:
"""
echo $chr
"""
}
chr_ch = channel.of(1..22)
workflow {
CONDITIONAL(chr_ch)
}
4
5
2
3
1
Directives
Directive declarations allow the definition of optional settings, like the number of cpus
and amount of memory
, that affect the execution of the current process without affecting the task itself.
They must be entered at the top of the process body, before any other declaration blocks (i.e. input
, output
, etc).
Note: You do not use =
when assigning a value to a directive.
Directives are commonly used to define the amount of computing resources to be used or extra information for configuration or logging purpose.
For example:
//process_directive.nf
nextflow.enable.dsl=2
process PRINTCHR {
tag "tagging with chr$chr"
cpus 1
echo true
input:
val chr
script:
"""
echo processing chromosome: $chr
echo number of cpus $task.cpus
"""
}
chr_ch = channel.of(1..22, 'X', 'Y')
workflow {
PRINTCHR(chr_ch)
}
processing chromosome: 1
number of cpus 1
processing chromosome: 2
number of cpus 1
processing chromosome: 6
number of cpus 1
[..truncated..]
The above process uses the three directives, tag
, cpus
and echo
.
The tag
directive to allow you to give a custom tag to each process execution. This tag makes it easier to identify a particular task (executed instance of a process) in a log file or in the execution report.
The second directive cpus
allows you to define the number of CPUs required for each task.
The third directive echo true
prints the stdout to the terminal.
We use the Nextflow task.cpus
variable to capture the number of cpus assigned to a task. This is frequently used to specify the number of threads in a multi-threaded command in the script block.
Another commonly used directive is memory specification: memory
.
A complete list of directives is available at this link.
Adding directives
Modify the Nextflow script
process_exercise_directives.nf
- Add a
tag
directive logging the sample_id in the execution output.- Add a
cpus
directive to specify the number of cpus as 2.- Change the fastqc
-t
option value to$task.cpus
in the script directive.//process_exercise_directives.nf nextflow.enable.dsl=2 process FASTQC { //add tag directive //add cpu directive input: tuple val(sample_id), path(reads) output: tuple val(sample_id), path("fastqc_out") script: """ mkdir fastqc_out fastqc $reads -o fastqc_out -t 1 """ } read_pairs_ch = Channel.fromFilePairs('data/yeast/reads/ref*_{1,2}.fq.gz') workflow { FASTQC(read_pairs_ch) FASTQC.out.view() }
solution
//process_exercise_directives_answers.nf nextflow.enable.dsl=2 process FASTQC { tag "$sample_id" cpus 2 input: tuple val(sample_id), path(reads) output: tuple val(sample_id), path("fastqc_out") script: """ mkdir fastqc_out fastqc $reads -o fastqc_out -t $task.cpus """ } read_pairs_ch = Channel.fromFilePairs('data/yeast/reads/ref*_{1,2}.fq.gz') workflow { FASTQC(read_pairs_ch) FASTQC.out.view() }
N E X T F L O W ~ version 21.04.0 Launching `process_exercise_directives.nf` [sad_rosalind] - revision: 2ccbfa4937 executor > local (3) [90/de1125] process > FASTQC (ref1) [100%] 3 of 3 ✔ [ref2, work/ea/9e6a341b88caf8879e8d18b77049c8/fastqc_out] [ref3, work/94/d059b816a9ec3d868f2924c26813e7/fastqc_out] [ref1, work/90/de11251d362f494d6650789d9f8c1d/fastqc_out]
Organising outputs
PublishDir directive
Nextflow manages intermediate results from the pipeline’s expected outputs independently.
Files created by a process
are stored in a task specific working directory which is considered as temporary. Normally this is under the work
directory, which can be deleted upon completion.
The files you want the workflow to return as results need to be defined in the process
output
block and then the output directory specified using the directive
publishDir
. More information here.
Note: A common mistake is to specify an output directory in the publishDir
directive while forgetting to specify the files you want to include in the output
block.
publishDir <directory>, parameter: value, parameter2: value ...
For example if we want to capture the results of the QUANT
process in a results/quant
output directory we
need to define the files in the output
and specify the location of the results directory in the publishDir
directive:
//process_publishDir.nf
nextflow.enable.dsl=2
process QUANT {
publishDir "results/quant"
input:
tuple val(sample_id), path(reads)
each index
output:
tuple val(sample_id), path("${sample_id}_salmon_output")
script:
"""
salmon quant -i $index \
-l A \
-1 ${reads[0]} \
-2 ${reads[1]} \
-o ${sample_id}_salmon_output
"""
}
reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')
index_ch = Channel.fromPath('data/yeast/salmon_index')
workflow {
QUANT(reads_ch, index_ch)
QUANT.out.view()
}
$ nextflow run process_publishDir.nf
N E X T F L O W ~ version 21.04.0
Launching `process_publishDir.nf` [friendly_pauling] - revision: 9b5c315893
executor > local (1)
[48/f97234] process > QUANT (1) [100%] 1 of 1 ✔
[ref1, work/48/f97234d7185cbfbd86e2f11c1afab5/ref1_salmon_output]
We can use the UNIX command tree
to examine the contents of the results directory.
tree results
results/
└── quant
└── ref1_salmon_output -> work/48/f97234d7185cbfbd86e2f11c1afab5/ref1_salmon_output
In the above example, the publishDir "results/quant"
, creates a symbolic link ->
to the output files specified by the process salmon_quant
to the directory path results/quant
.
publishDir
The publishDir output is relative to the path the pipeline run has been launched. Hence, it is a good practice to use implicit variables like
projectDir
to specify publishDir value.
publishDir parameters
The publishDir
directive can take optional parameters, for example the mode
parameter can take the value "copy"
to specify that you wish to copy the file to output directory rather than just a symbolic link to the files in the working directory. Since the working directory is generally deleted on completion of a pipeline, it is safest to use mode: "copy"
for results files. The default mode (symlink) is helpful for checking intermediate files which are not needed in the long term.
publishDir "results/quant", mode: "copy"
Full list of the publishDir directive parameters is here.
Manage semantic sub-directories
You can use more than one publishDir
to keep different outputs in separate directories. To specify which files to put in which output directory use the parameter pattern
with the a glob pattern that selects which files to publish from the overall set of output files.
In the example below we will create an output folder structure in the directory results, which contains a separate sub-directory for bam files, pattern:"*.bam"
, and a salmon output directory, ${sample_id}_salmon_output"
. Remember, we need to specify the files we want to be copied as outputs.
//process_publishDir_semantic.nf
nextflow.enable.dsl=2
process QUANT {
publishDir "results/bams", pattern: "*.bam", mode: "copy"
publishDir "results/quant", pattern: "${sample_id}_salmon_output", mode: "copy"
input:
tuple val(sample_id), path(reads)
path index
output:
tuple val(sample_id), path("${sample_id}.bam")
path "${sample_id}_salmon_output"
script:
"""
salmon quant -i $index \
-l A \
-1 ${reads[0]} \
-2 ${reads[1]} \
-o ${sample_id}_salmon_output \
--writeMappings | samtools sort | samtools view -bS -o ${sample_id}.bam
"""
}
reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')
index_ch = Channel.fromPath('data/yeast/salmon_index')
workflow {
QUANT(reads_ch, index_ch)
}
$ nextflow run process_publishDir_semantic.nf
N E X T F L O W ~ version 21.04.0
Launching `process_publishDir_semantic.nf` [golden_poisson] - revision: 421a604840
executor > local (1)
[be/950786] process > QUANT (1) [100%] 1 of 1 ✔
We can now use the tree
command to examine the results directory.
$ tree results
results/
├── bams
│ └── ref1.bam
└── quant
└── ref1_salmon_output
├── aux_info
│ ├── ambig_info.tsv
│ ├── expected_bias.gz
│ ├── fld.gz
│ ├── meta_info.json
│ ├── observed_bias.gz
│ └── observed_bias_3p.gz
├── cmd_info.json
├── libParams
│ └── flenDist.txt
├── lib_format_counts.json
├── logs
│ └── salmon_quant.log
└── quant.sf
6 directories, 12 files
Publishing results
Add a
publishDir
directive to the nextflow scriptprocess_exercise_publishDir.nf
that copies the index directory to the results folder .//process_exercise_publishDir.nf nextflow.enable.dsl=2 process INDEX { //add publishDir directive here input: path transcriptome output: path "index" script: """ salmon index -t $transcriptome -i index """ } params.transcriptome = "data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz" transcriptome_ch = channel.fromPath(params.transcriptome, checkIfExists: true) workflow { INDEX(transcriptome_ch) }
Solution
//process_exercise_publishDir_answer.nf nextflow.enable.dsl=2 process INDEX { publishDir "results", mode: "copy" input: path transcriptome output: path "index" script: """ salmon index -t $transcriptome -i index """ } params.transcriptome = "data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz" transcriptome_ch = channel.fromPath(params.transcriptome, checkIfExists: true) workflow{ INDEX(transcriptome_ch) }
$ nextflow run process_exercise_publishDir.nf
N E X T F L O W ~ version 21.04.0 Launching `process_exercise_publishDir.nf` [infallible_becquerel] - revision: 4d865241a8 executor > local (1) [fe/79c042] process > INDEX (1) [100%] 1 of 1 ✔
Nextflow Patterns
If you want to find out common structures of Nextflow processes, the Nextflow Patterns page collects some recurrent implementation patterns used in Nextflow applications.
Key Points
Outputs to a process are defined using the output blocks.
You can group input and output data from a process using the tuple qualifer.
The execution of a process can be controlled using the
when
declaration and conditional statements.Files produced within a process and defined as
output
can be saved to a directory using thepublishDir
directive.