Processes
Overview
Teaching: 30 min
Exercises: 15 minQuestions
How do I run tasks/processes in Nextflow?
How do I get data, files and values, into a processes?
Objectives
Understand how Nextflow uses processes to execute tasks.
Create a Nextflow process.
Define inputs to a process.
Processes
We now know how to create and use Channels to send data around a workflow. We will now see how to run tasks within a workflow using processes.
A process
is the way Nextflow executes commands you would run on the command line or custom scripts.
A process can be thought of as a particular step in a workflow, e.g. an alignment step in RNA-seq analysis. Processes are independent of each other (don’t require any another process to execute) and can not communicate/write to each other. Data is passed between processes via input and output Channels.
For example, below is the command line you would run to create a index for the yeast transcriptome to be used with the salmon aligner:
$ salmon index -t data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz -i data/yeast/salmon_index --kmerLen 31
Now we will show how to convert this into a simple Nextflow process.
Process definition
The process definition starts with keyword process
, followed by process name, in this case INDEX
, and finally the process body
delimited by curly brackets {}
. The process body must contain a string which represents the command or, more generally, a script that is executed by it.
process INDEX {
script:
"salmon index -t ${projectDir}/data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz -i ${projectDir}/data/yeast/salmon_index --kmerLen 31"
}
This process would run once.
Implicit variables
We use the Nextflow implicit variable
${projectDir}
to specify the directory where the main script is located. This is important as Nextflow scripts are executed in a separate working directory. A full list of implicit variables can be found here
To add the process to a workflow add a workflow
block, and call the process like a function. We will learn more about the workflow
block in the workflow episode.
Note: As we are using DSL2 we need to include nextflow.enable.dsl=2
in the script.
//process_index.nf
nextflow.enable.dsl=2
process INDEX {
script:
"salmon index -t ${projectDir}/data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz -i ${projectDir}/data/yeast/salmon_index --kmerLen 31"
}
workflow {
//process is called like a function in the workflow block
INDEX()
}
We can now run the process:
$ nextflow run process_index.nf
N E X T F L O W ~ version 21.04.0
Launching `process.nf` [lethal_hamilton] - revision: eff6186015
executor > local (1)
[10/583af2] process > INDEX [100%] 1 of 1 ✔
A Simple Process
Create a Nextflow script
simple_process.nf
that has one processSALMON_VERSION
that runs the command.salmon --version
Solution
nextflow.enable.dsl=2 process SALMON_VERSION { script: """ salmon --version """ } workflow { SALMON_VERSION() }
Note We need to add the Nextflow run option
-process.echo
to print the output to the terminal.$ nextflow run simple_process.nf -process.echo
N E X T F L O W ~ version 21.04.0 Launching `process.nf` [prickly_gilbert] - revision: 471a79c65c executor > local (1) [56/5e6001] process > SALMON_VERSION [100%] 1 of 1 ✔ salmon 1.5.2
Definition blocks
The previous example was a simple process
with no defined inputs and outputs that ran only once. To control inputs, outputs and how a command is executed a process may contain five definition blocks:
- directives - 0, 1, or more: allow the definition of optional settings that affect the execution of the current process e.g. the number of cpus a task uses and the amount of memory allocated.
- inputs - 0, 1, or more: Define the input dependencies, usually channels, which determines the number of times a process is executed.
- outputs - 0, 1, or more: Defines the output channels used by the process to send results/data produced by the process.
- when clause - optional: Allows you to define a condition that must be verified in order to execute the process.
- script block - required: A statement within quotes that defines the commands that are executed by the process to carry out its task.
The syntax is defined as follows:
process < NAME > {
[ directives ]
input:
< process inputs >
output:
< process outputs >
when:
< condition >
[script|shell|exec]:
< user script to be executed >
}
Script
At minimum a process block must contain a script
block.
The script
block is a String “statement” that defines the command that is executed by the process to carry out its task. These are normally the commands you would run on a terminal.
A process contains only one script
block, and it must be the last statement when the process contains input
and output
declarations.
The script
block can be a simple one line string in quotes e.g.
nextflow.enable.dsl=2
process PROCESSBAM {
script:
"samtools sort -o ref1.sorted.bam ${projectDir}/data/yeast/bams/ref1.bam"
}
workflow {
PROCESSBAM()
}
Or, for commands that span multiple lines you can encase the command in triple quotes """
.
For example:
//process_multi_line.nf
nextflow.enable.dsl=2
process PROCESSBAM {
script:
"""
samtools sort -o ref1.sorted.bam ${projectDir}/data/yeast/bams/ref1.bam
samtools index ref1.sorted.bam
samtools flagstat ref1.sorted.bam
"""
}
workflow {
PROCESSBAM()
}
By default the process command is interpreted as a Bash script. However any other scripting language can be used just simply starting the script with the corresponding Shebang declaration. For example:
//process_python.nf
nextflow.enable.dsl=2
process PYSTUFF {
script:
"""
#!/usr/bin/env python
import gzip
reads = 0
bases = 0
with gzip.open('${projectDir}/data/yeast/reads/ref1_1.fq.gz', 'rb') as read:
for id in read:
seq = next(read)
reads += 1
bases += len(seq.strip())
next(read)
next(read)
print("reads", reads)
print("bases", bases)
"""
}
workflow {
PYSTUFF()
}
//process_rscript.nf
nextflow.enable.dsl=2
process RSTUFF {
script:
"""
#!/usr/bin/env Rscript
library("ShortRead")
countFastq(dirPath="data/yeast/reads/ref1_1.fq.gz")
"""
}
workflow {
RSTUFF()
}
This allows the use of a different programming languages which may better fit a particular job. However, for large chunks of code it is suggested to save them into separate files and invoke them from the process script.
// process_python_with_scriptfile.nf
process PYSTUFF {
script:
"""
myscript.py
"""
}
workflow {
PYSTUFF()
}
Associated scripts
Scripts such as the one in the example above,
myscript.py
, can be stored in abin
folder at the same directory level as the Nextflow workflow script that invokes them, and given execute permission. Nextflow will automatically add this folder to thePATH
environment variable. To invoke the script in a Nextflow process, simply use its filename on its own rather than invoking the interpreter e.g.myscript.py
instead ofpython myscript.py
.
Script parameters
The command in the script
block can be defined dynamically using Nextflow variables e.g. ${projectDir}
.
To reference a variable in the script block you can use the $
in front of the Nextflow variable name, and additionally you can add {}
around the variable name e.g. ${projectDir}
.
Variable substitutions
Similar to bash scripting Nextflow uses the “$” character to introduce variable substitutions. The variable name to be expanded may be enclosed in braces
{variable_name}
, which are optional but serve to protect the variable to be expanded from characters immediately following it which could be interpreted as part of the name. It is a good rule of thumb to always use the{}
syntax.
In the example below the variable kmer
is set to the value 31 at the top of the Nextflow script.
The variable is referenced using the $kmer
syntax within the multi-line string statement in the script
block.
A Nextflow variable can be used multiple times in the script block.
//process_script.nf
nextflow.enable.dsl=2
kmer = 31
process INDEX {
script:
"""
salmon index \
-t $projectDir/data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz \
-i index \
--kmer $kmer
echo "kmer size is $kmer"
"""
}
workflow {
INDEX()
}
In most cases we do not want to hard code parameter values. We saw in the parameter episode the use of a special Nextflow variable params
that can be used to assign values from the command line. You would do this by adding a key name to the params variable and specifying a value, like params.keyname = value
In the example below we define the variable params.kmer
with a default value of 31 in the Nextflow script.
//process_script_params.nf
nextflow.enable.dsl=2
params.kmer = 31
process INDEX {
script:
"""
salmon index \
-t $projectDir/data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz \
-i index \
--kmer $params.kmer
echo "kmer size is $params.kmer"
"""
}
workflow {
INDEX()
}
Remember, we can change the default value of kmer
to 11 by running the Nextflow script using the command below. Note: parameters to the workflow have two hyphens --
.
nextflow run process_script_params.nf --kmer 11
Script parameters
For the Nextflow script below.
//process_script_params.nf nextflow.enable.dsl=2 params.kmer = 31 process INDEX { script: """ salmon index \ -t $projectDir/data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz \ -i index \ --kmer $params.kmer echo "kmer size is $params.kmer" """ } workflow { INDEX() }
Run the pipeline using a kmer value of
27
using the--kmer
command line option.$ nextflow run process_script_params.nf --kmer <some value> -process.echo
Note: The Nextflow option
-process.echo
will print the process’ stdout to the terminal.Solution
nextflow run process_script_params.nf --kmer 27 -process.echo
N E X T F L O W ~ version 21.04.0 Launching `juggle_processes.nf` [nostalgic_jones] - revision: 9feb8de4fe executor > local (1) [92/cdc9de] process > INDEX [100%] 1 of 1 ✔ Threads = 2 Vertex length = 27 [...Truncated...] kmer size is 27
Bash variables
Nextflow uses the same Bash syntax for variable substitutions, $variable
, in strings.
However, Bash variables need to be escaped using \
character in front of \$variable
name.
In the example below we will set the bash variable KMERSIZE
to the value of $params.kmer
, and then use KMERSIZE
in our script block.
//process_escape_bash.nf
nextflow.enable.dsl=2
process INDEX {
script:
"""
#set bash variable KMERSIZE
KMERSIZE=$params.kmer
salmon index -t $projectDir/data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz -i index --kmer \$KMERSIZE
echo "kmer size is $params.kmer"
"""
}
params.kmer = 31
workflow {
INDEX()
}
Shell
Another alternative is to use a shell
block definition instead of script
.
When using the shell
statement Bash variables are referenced in the normal way $my_bash_variable
;
However, the shell
statement uses a different syntax for Nextflow variable substitutions: !{nextflow_variable}
, which is needed to use both Nextflow and Bash variables in the same script.
For example in the script below that uses the shell
statement
we reference the Nextflow variables as !{projectDir}
and !{params.kmer}
, and the Bash variable as ${KMERSIZE}
.
//process_shell.nf
nextflow.enable.dsl=2
params.kmer = 31
process INDEX {
shell:
'''
#set bash variable KMERSIZE
KMERSIZE=!{params.kmer}
salmon index -t !{projectDir}/data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz -i index --kmer ${KMERSIZE}
echo "kmer size is !{params.kmer}"
'''
}
workflow {
INDEX()
}
Conditional script execution
Sometimes you want to change how a process is run depending on some condition. In Nextflow scripts we can use conditional statements such as the if
statement or any other expression evaluating to boolean value true
or false
.
If statement
The if
statement uses the same syntax common to other programming languages such Java, C, JavaScript, etc.
if( < boolean expression > ) {
// true branch
}
else if ( < boolean expression > ) {
// true branch
}
else {
// false branch
}
For example, the Nextflow script below will use the if
statement to change which index is created depending on the Nextflow variable params.aligner
.
//process_conditional.nf
nextflow.enable.dsl=2
params.aligner = 'kallisto'
params.transcriptome = "$projectDir/data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz"
params.kmer = 31
process INDEX {
script:
if( params.aligner == 'kallisto' ) {
"""
echo indexed using kallisto
kallisto index -i index -k $params.kmer $params.transcriptome
"""
}
else if( params.aligner == 'salmon' ) {
"""
echo indexed using salmon
salmon index -t $params.transcriptome -i index --kmer $params.kmer
"""
}
else {
"""
echo Unknown aligner $params.aligner
"""
}
}
workflow {
INDEX()
}
nextflow run process_conditional.nf -process.echo --aligner kallisto
N E X T F L O W ~ version 21.04.0
Launching `juggle_processes.nf` [cheeky_shirley] - revision: 588f20ae5a
executor > local (1)
[84/c44f25] process > INDEX [100%] 1 of 1 ✔
indexed using kallisto
Inputs
Processes are isolated from each other but can communicate by sending values and files via Nextflow channels from input
and into output
blocks.
The input
block defines which channels the process is expecting to receive input from.
The number of elements in input channels determines the process dependencies and the number of times a process executes.
You can only define one input block at a time and it must contain one or more input declarations.
The input block follows the syntax shown below:
input:
<input qualifier> <input name>
The input qualifier declares the type of data to be received.
Input qualifiers
val
: Lets you access the received input value by its name as a variable in the process script.env
: Lets you use the input value to set an environment variable named as the specified input name.path
: Lets you handle the received value as a file, staging the file properly in the execution context.stdin
: Lets you forward the received value to the process stdin special file.tuple
: Lets you handle a group of input values having one of the above qualifiers.each
: Lets you execute the process for each entry in the input collection. A complete list of inputs can be found here.
Input values
The val
qualifier allows you to receive value data as input. It can be accessed in the process script by using the specified input name, as shown in the following example:
//process_input_value.nf
nextflow.enable.dsl=2
process PRINTCHR {
input:
val chr
script:
"""
echo processing chromosome $chr
"""
}
chr_ch = Channel.of( 1..22, 'X', 'Y' )
workflow {
PRINTCHR(chr_ch)
}
$ nextflow run process_input_value.nf -process.echo
N E X T F L O W ~ version 21.04.0
Launching `juggle_processes.nf` [wise_kalman] - revision: 7f90e1bfc5
executor > local (24)
[b1/88df3f] process > PRINTCHR (24) [100%] 24 of 24 ✔
processing chromosome 3
processing chromosome 1
processing chromosome 2
..truncated...
In the above example the process is executed 24 times; each time a value is received from the queue channel chr_ch
it is used to run the process.
Channel order
The channel guarantees that items are delivered in the same order as they have been sent, but since the process is executed in a parallel manner, there is no guarantee that they are processed in the same order as they are received.
Input files
When you need to handle files as input you need the path
qualifier. Using the path
qualifier means that Nextflow will stage it in the process execution directory, and it can be accessed in the script by using the name specified in the input declaration.
The input file name can be defined dynamically by defining the input name as a Nextflow variable and referenced in the script using the $variable_name
syntax.
For example in the script below we assign the variable name read
to the input files using the path
qualifier. The file is referenced using the variable substitution syntax ${read}
in the script block:
//process_input_file.nf
nextflow.enable.dsl=2
process NUMLINES {
input:
path read
script:
"""
printf '${read} '
gunzip -c ${read} | wc -l
"""
}
reads_ch = Channel.fromPath( 'data/yeast/reads/ref*.fq.gz' )
workflow {
NUMLINES(reads_ch)
}
$ nextflow run process_input_file.nf -process.echo
[cd/77af6d] process > NUMLINES (1) [100%] 6 of 6 ✔
ref1_1.fq.gz 58708
ref3_2.fq.gz 52592
ref2_2.fq.gz 81720
ref2_1.fq.gz 81720
ref3_1.fq.gz 52592
ref1_2.fq.gz 58708
The input name can also be defined as user specified filename inside quotes.
For example in the script below the name of the file is specified as 'sample.fq.gz'
in the input definition and can be referenced by that name in the script block.
//process_input_file_02.nf
nextflow.enable.dsl=2
process NUMLINES {
input:
path 'sample.fq.gz'
script:
"""
printf 'sample.fq.gz '
gunzip -c sample.fq.gz | wc -l
"""
}
reads_ch = Channel.fromPath( 'data/yeast/reads/ref*.fq.gz' )
workflow {
NUMLINES(reads_ch)
}
$ nextflow run process_input_file_02.nf -process.echo
[d2/eb0e9d] process > NUMLINES (1) [100%] 6 of 6 ✔
sample.fq.gz 58708
sample.fq.gz 52592
sample.fq.gz 81720
sample.fq.gz 81720
sample.fq.gz 52592
sample.fq.gz 58708
File Objects as inputs
When a process declares an input file the corresponding channel elements must be file objects i.e. created with the path helper function from the file specific channel factories e.g.
Channel.fromPath
orChannel.fromFilePairs
.
Add input channel
Add an input channel to the script below that takes the reads channel as input. FastQC is a quality control tool for high throughput sequence data.
//process_exercise_input.nf nextflow.enable.dsl=2 process FASTQC { //add input channel script: """ mkdir fastqc_out fastqc -o fastqc_out ${reads} ls -1 fastqc_out """ } reads_ch = Channel.fromPath( 'data/yeast/reads/ref1*_{1,2}.fq.gz' ) workflow { FASTQC(reads_ch) }
Then run your script using
nextflow run process_exercise_input.nf -process.echo
Solution
//process_exercise_input_answer.nf nextflow.enable.dsl=2 process FASTQC { input: path reads script: """ mkdir fastqc_out fastqc -o fastqc_out ${reads} ls -1 fastqc_out """ } reads_ch = Channel.fromPath( 'data/yeast/reads/ref1*_{1,2}.fq.gz' ) workflow { FASTQC(reads_ch) }
N E X T F L O W ~ version 21.04.0 Launching `process_exercise_input_answer.nf` [jovial_wescoff] - revision: e3db00a4dc executor > local (2) [d9/559a27] process > FASTQC (2) [100%] 2 of 2 ✔ Analysis complete for ref1_1.fq.gz ref1_1_fastqc.html ref1_1_fastqc.zip Analysis complete for ref1_2.fq.gz ref1_2_fastqc.html ref1_2_fastqc.zip
Combining input channels
A key feature of processes is the ability to handle inputs from multiple channels. However it’s important to understand how the number of items within the multiple channels affect the execution of a process.
Consider the following example:
//process_combine.nf
nextflow.enable.dsl=2
process COMBINE {
input:
val x
val y
script:
"""
echo $x and $y
"""
}
num_ch = Channel.of(1, 2, 3)
letters_ch = Channel.of('a', 'b', 'c')
workflow {
COMBINE(num_ch, letters_ch)
}
$ nextflow run process_combine.nf -process.echo
Both channels contain three elements, therefore the process is executed three times, each time with a different pair:
2 and b
1 and a
3 and c
What is happening is that the process waits until it receives an input value from all the queue channels declared as input.
When this condition is verified, it uses up the input values coming from the respective queue channels, runs the task. This logic repeats until one or more queue channels have no more content. The process then stops.
What happens when not all channels have the same number of elements?
For example:
//process_combine_02.nf
nextflow.enable.dsl=2
process COMBINE {
input:
val x
val y
script:
"""
echo $x and $y
"""
}
ch_num = Channel.of(1, 2)
ch_letters = Channel.of('a', 'b', 'c', 'd')
workflow {
COMBINE(ch_num, ch_letters)
}
$ nextflow run process_combine_02.nf -process.echo
In the above example the process is executed only two times, because when a queue channel has no more data to be processed it stops the process execution.
2 and b
1 and a
Value channels and process termination
Note however that value channels, Channel.value
, do not affect the process termination.
To better understand this behaviour compare the previous example with the following one:
//process_combine_03.nf
nextflow.enable.dsl=2
process COMBINE {
input:
val x
val y
script:
"""
echo $x and $y
"""
}
ch_num = Channel.value(1)
ch_letters = Channel.of('a', 'b', 'c')
workflow {
COMBINE(ch_num, ch_letters)
}
$ nextflow run process_combine_03.nf -process.echo
In this example the process is run three times.
1 and b
1 and a
1 and c
Combining input channels
Write a nextflow script
process_exercise_combine.nf
that combines two input channelstranscriptome_ch = channel.fromPath('data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz') kmer_ch = channel.of(21)
And include the command below in the script directive
script: """ salmon index -t $transcriptome -i index -k $kmer """
Solution
// process_exercise_combine_answer.nf nextflow.enable.dsl=2 process COMBINE { input: path transcriptome val kmer script: """ salmon index -t $transcriptome -i index -k $kmer """ } transcriptome_ch = channel.fromPath('data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz', checkIfExists: true) kmer_ch = channel.of(21) workflow { COMBINE(transcriptome_ch, kmer_ch) }
Input repeaters
We saw previously that by default the number of times a process runs is defined by the queue channel with the fewest items. However, the each
qualifier allows you to repeat the execution of a process for each item in a list or a queue channel, every time new data is received.
For example if we can fix the previous example by using the input qualifer each
for the letters queue channel:
//process_repeat.nf
nextflow.enable.dsl=2
process COMBINE {
input:
val x
each y
script:
"""
echo $x and $y
"""
}
ch_num = Channel.of(1, 2)
ch_letters = Channel.of('a', 'b', 'c', 'd')
workflow {
COMBINE(ch_num, ch_letters)
}
$ nextflow run process_repeat.nf -process.echo
The process will run eight times.
2 and d
1 and a
1 and c
2 and b
2 and c
1 and d
1 and b
2 and a
Input repeaters
Extend the script
process_exercise_repeat.nf
by adding more values to thekmer
queue channel e.g. (21, 27, 31) and running the process for each value.//process_exercise_repeat.nf nextflow.enable.dsl=2 process COMBINE { input: path transcriptome val kmer script: """ salmon index -t $transcriptome -i index -k $kmer """ } transcriptome_ch = channel.fromPath('data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz', checkIfExists: true) kmer_ch = channel.of(21) workflow { COMBINE(transcriptome_ch, kmer_ch) }
How many times does this process run?
Solution
//process_exercise_repeat_answer.nf nextflow.enable.dsl=2 process COMBINE { input: path transcriptome each kmer script: """ salmon index -t $transcriptome -i index -k $kmer """ } transcriptome_ch = channel.fromPath('data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz', checkIfExists: true) kmer_ch = channel.of(21, 27, 31) workflow { COMBINE(transcriptome_ch, kmer_ch) }
nextflow run process_exercise_repeat.nf -process.echo
This process runs three times.
Key Points
A Nextflow process is an independent step in a workflow
Processes contain up to five definition blocks including: directives, inputs, outputs, when clause and finally a script block.
The script block contains the commands you would like to run.
A process should have a script but the other four blocks are optional
Inputs are defined in the input block with a type qualifier and a name.