elowy01
9/19/2018 - 4:25 PM

nextflow cheat sheet/

nextflow cheat sheet

#Example 1:

#!/usr/bin/env nextflow

params.str = 'Hello world!'

process AFcalc {

    // Capture the process stdout into the 'result' channel.
    // Without this output declaration, 'result' below is undefined
    // and the subscribe call fails.
    output:
    stdout into result

    """
    echo  '${params.str}'
    """
}

//this is necessary to print the output
result.subscribe {
    println it.trim()
}
#If we run this script by doing:
nextflow run decorate.nf
//
# Propagating parameters
#!/usr/bin/env nextflow

params.query = "test.py"

query = file(params.query)

process do_cat {
    input:
    file query

    output:
    // DSL1 needs the output sent 'into' a named channel so the
    // next process can consume it; without this the file is not
    // propagated to do_another_thing
    file 'out_cat' into cat_ch

    """
    cat $query > out_cat
    """
}


process do_another_thing {
    input:
    // receive the file produced by do_cat via the cat_ch channel
    file out_cat from cat_ch

    """
    cat $out_cat
    """
}

result.subscribe {
    println it.trim()
}

#This script will print out:
Hello world!

#If now we run it by doing:
./bin/nextflow run decorate.nf --str 'Hola mundo'

We are assigning a value for the --str param from the command line
//
#In the second example, we execute a python script that just prints out hello:
#!/usr/bin/env nextflow

params.script = "test.py"

script = file(params.script)

process AFcalc {
    input:
    file script

    // capture stdout into 'result' so the subscribe below has
    // a channel to read from (it was previously undefined)
    output:
    stdout into result

    """
    python $script
    """
}

result.subscribe {
    println it.trim()
}
//
### Configuration files:
Write a file named 'nextflow.config' and put it in the dir where you will run nextflow

#comments in the config file are written as '//' or '/* .. */' to comment a block on multiple lines
//
#Ex1:
#Passing a param to the script directive,
#write the following to 'nextflow.config'
params.script='test.py'

#Then in the file with the analyses we write the following:
#!/usr/bin/env nextflow


script = file(params.script)

process AFcalc {
    input:
    file script


    """
    python $script
    """
}

result.subscribe {
    println it.trim()
}
//
/ We named the channel 'num'
num = Channel.from( 'test1', 'test2', 'test3' )

process basicExample {
  input:
  val x from num

  "echo process job $x"

}

result.subscribe {
    println it.trim()
}
/This will print:
[e5/20785b] Submitted process > basicExample (1)
[15/cd4308] Submitted process > basicExample (3)
[71/b0abbe] Submitted process > basicExample (2)
process job test1
process job test3
process job test2
/
#if we want to print the contents of a channel we just need to do:
num = Channel.from( 'test1', 'test2', 'test3' )
num.println()
/
When the val has the same name as the channel from where the data is received, the from part can be omitted. 

num = Channel.from( 1, 2, 3 )

process basicExample {
  input:
  val num

  "echo process job $num"

}
/
#example of channel factory
zeroToNine = Channel.from( 0..9 )

process test {

    input:
    val x from zeroToNine

    """
    echo $x
    """
}
/
#the same as above but creating the channel factory from a list:
myList = [1,2,3,4]

zeroToNine = Channel.from( myList )

process test {

        input:
         val x from zeroToNine

        """
        echo $x
        """
}
/
#Now, let's create a factory channel from a string composed of several
#comma-separated elements:
chr_str="chr1,chr2,chr3"

chrList = Channel.from( chr_str.split(',') )

process test {

        input:
         val x from chrList

        """
        echo $x
        """
}
//
/ Using a channel to read all files and operate on each particular file
/ In this case, we count the lines for each particular file

files = Channel.fromPath( './*.py' )

process countLines {
  input:
  file query_file from files

  "wc -l ${query_file}"

}

result.subscribe {
    println it.trim()
}
/ This will print:
[a1/2d4a9a] Submitted process > countLines (2)
[99/124b71] Submitted process > countLines (1)
1 test.py
1 test1.py
//
# Pass the parameters from one process to the other

#!/usr/bin/env nextflow

params.query = "test.py"

query = file(params.query)

process do_cat {
    input:
    file query

    output:
    // wire the produced file 'into' a channel so the downstream
    // process can actually receive it (required in DSL1)
    file 'out_cat' into cat_ch

    """
    cat $query > out_cat
    """
}


process do_another_thing {
    input:
    // consume the file emitted by do_cat
    file out_cat from cat_ch

    """
    cat $out_cat
    """
}

result.subscribe {
    println it.trim()
}
//
#Create an output channel and write the output to a file named
#'result.txt' that will be put to a folder inside the 'work' folder
#created by nextflow
process randomNum {

   output:
   file 'result.txt' into numbers

   '''
   echo $RANDOM > result.txt
   '''

}

numbers.subscribe { println "Received: " + it.text }
//
# creating an output file from parameters used with the script

params.outdir='17_09_2018/out'

out_annot = file(params.outdir + "/annotation.txt")

process randomNum {

        """
        echo "hello" > ${out_annot}
        """

}

result.subscribe {
    println it.trim()
}
//
#Create an output filename from params and pass the output file to a different process

params.outdir='17_09_2018/out'

out_annot = file(params.outdir + "/annotation.txt")

process randomNum {

        """
        echo "hello" > ${out_annot}
        """
}

process printFile {
        input:
        file out_annot

        """
        cat ${out_annot}
        """
}

result.subscribe {
    println it.trim()
}
//
# Write some string and variable to the log

out_Annotate=file(params.outdir+"/annot_tab2."+params.region+".txt")

log.info "Hello: ${out_Annotate}"

process Annotate {


        """
        python ${params.scripts_dir}/annotate.py --AFcalc ${params.scripts_dir} --phased_vcf ${params.phased_vcf} --sample_panel ${params.sample_panel} --tabix ${params.tabix} --region ${params.region} --pops ${params.pops} --exome ${params.exome} --outdir ${params.outdir} --ann_vcf ${params.ann_vcf}
        """
}
//
# printing a string (Groovy syntax)
 println "Hello"
//
# replacing characters in a string
println "aaaaa".replace("a", "b")
//
# replace in action:
cutoff_values=[0.95,0.96]

process test1 {

        input:
        each cutoff from cutoff_values

        output:
        file(output_cutoff)

        script:
        output_cutoff="${cutoff}".replace('.', '_')

        """
        touch "${output_cutoff}"
        """
}
# It will create 2 files named 0_95 and 0_96

#2nd example:
cutoff_values=[0.95,0.96]


process test1 {

        input:
        each cutoff from cutoff_values

        output:
        file(output_cutoff)

        script:
        output_cutoff="${cutoff}".replace('.', '_')+".vcf"

        """
        touch "${output_cutoff}"
        """
}
# It will create 2 files named 0_95.vcf and 0_96.vcf
//
# running nextflow
#
#cleaning a project
./bin/nextflow clean -f
//
# Executor

#In order to run all processes with lsf, send to a specific queue and requesting a certain cpu and memory, add the following
#to nextflow.config

process {
  executor='lsf'
  queue='production-rh7'
  cpus=1
  memory=1.GB
}
/
# Running specific processes using a certain executor:
process runFastqSimpleQA {
    /*
    An example process
    */

    memory '2 GB'
    executor 'lsf'
    cpus 2
    queue 'standard1'

    """
    echo "Hello"
    """
}
/
# Factory from file
# 'runs.txt' is split into 2-line chunks and each of the 
# chunks is processed by foo. In this factory, each of the lines in the file 
# is considered a file and this is why the code in the 'script' part is considered
# a string

Channel
    .fromPath('runs.txt')
    .splitText(by: 2)
    .set{ chunks_ch }

process foo {
  echo true
  input:
  file x from chunks_ch

  script:
  """
  rev $x | rev
  """
}
/
#Factory from a file, in this case a list of run ids. Each line
#is considered a string
params.index = 'runs.txt'

Channel
    .fromPath(params.index)
    .splitCsv(header:true)
    .map{ row-> row.runId }
    .set { runs_ch }

process foo {
    input:
    val x from runs_ch

    script:
    """
    echo $x
    """
}

result.subscribe {
    println it.trim()
}
//
#propagate a file and a value and pass them to another process
#that will modify the file name
process createFile {

  output:
  file 'test.txt' into output_f
  val 'runid' into output_v

  script:
  """
  touch test.txt
  """

}

process mvFile {

  input:
  file output_f
  val output_v

  """
  mv ${output_f} ${output_v}
  """
}
//
# this factory reads a .csv file having different columns and will create
# a variable for each column that can be used by the process
params.index = 'runs.txt'

Channel
    .fromPath(params.index)
    .splitCsv(header:true)
    .map{ row-> tuple(row.sampleId, file(row.read1), file(row.read2)) }
    .set { samples_ch }

process foo {
    input:
    set sampleId, file(read1), file(read2) from samples_ch

    script:
    """
    echo your_command --sample $sampleId --reads $read1 $read2
    """
}

result.subscribe {
    println it.trim()
}
//
# This workflow is interesting because it propagates 
# a file and a value and the value is used by mvFile process
# to name a new file and put this new file into 'results' folder

process createFile {

  output:
  file 'test.txt' into output_f
  val 'runid' into output_v

  script:
  """
  touch test.txt
  """

}

process mvFile {
  publishDir 'results', saveAs:{ filename -> "$filename" }

  input:
  file output_f
  val output_v

  output:
  file "${output_v}.test1.txt"

  """
  mv ${output_f} ${output_v}.test1.txt
  """
}
//
#Saving all files produced in a process to a certain dir:
# This is the difference with respect to publishDir in combination with saveAs, 
# which allow to save some of the files
process foo {

    publishDir 'out/'

    output:
    file 'chunk_*' into letters

    '''
    printf 'Hola' | split -b 1 - chunk_
    '''
}
//
#Saving all files produced in a process to a certain folder without creating the symbolic link
publishDir "result", mode: 'copy', overwrite: true
//
# Use nextflow with Docker

#First, put the following in your nextflow.config:

process.container = 'variant_filtering:latest' // this is the name of the image
docker.enabled = true
docker.runOptions = '--volume $HOME:$HOME --workdir $PWD' // Mount the files within the container

#Then, you run your nextflow workflow as usual:
nextflow -C test.config run test.nf --vcf input10.reheaded.vcf.gz
#Where input10.reheaded.vcf.gz will be in your local system
//
# Conditional process (downloaded from https://github.com/nextflow-io/patterns/blob/master/conditional-process.nf)

#!/usr/bin/env nextflow

/*
 * Copyright (c) 2018, Centre for Genomic Regulation (CRG).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

 /*
  * author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  */

params.flag = false

process foo {
  output:
  file 'x.txt' into foo_ch
  when:
  !params.flag

  script:
  '''
  echo foo > x.txt
  '''
}

process bar {
  output:
  file 'x.txt' into bar_ch
  when:
  params.flag

  script:
  '''
  echo bar > x.txt
  '''
}

process omega {
  echo true
  input:
  file x from foo_ch.mix(bar_ch)

  script:
  """
  cat $x
  """
}
# This workflow will execute bar if params.flag=true and foo if params.flag=false
So if you run:
nextflow run conditional_process.nf
You get:
Launching `conditional_process.nf` [shrivelled_lalande] - revision: fab3a727cf
[warm up] executor > local
[6a/87f036] Submitted process > foo
[fb/647063] Submitted process > omega (1)
But if you run:
nextflow run conditional_process.nf --flag
You get:
N E X T F L O W  ~  version 18.10.1
Launching `conditional_process.nf` [awesome_sax] - revision: fab3a727cf
[warm up] executor > local
[6c/edf567] Submitted process > bar
[0e/4b6f7d] Submitted process > omega (1)
bar
//
# Nextflow environment variables:
NXF_WORK # to set the work dir
NXF_HOME # to set Nextflow home
NXF_TEMP # to set Nextflow tmp dir
//
#Maximum number of processes to parallelize
process doNotParallelizeIt {

   maxForks 1

   '''
   <your script here>
   '''

}
By default this value is equal to the number of CPU cores available minus 1.
//
#Use the errorStrategy directive to instruct Nextflow what to do in case of an error
 memory '2 GB'
 executor 'lsf'
 errorStrategy 'ignore'
//
#setting the working directory from the command line
nextflow -C test.config run test.nf -w /path/to/workingdir
//
#mix operator to mix two channels:
chr_str="chr1,chr2,chr3"
letter_str="a,b,c"

letterList = Channel.from( letter_str.split(','))
chrList = Channel.from( chr_str.split(',') )

process test {
        echo true

        input:
        val x from chrList.mix(letterList)

        """
        echo $x
        """
}

# This command will print out:
chr3
chr1
a
b
c
chr2
//
# Combine 2 channels (cartesian product)
chr_str="chr1,chr2,chr3"
letter_str="a,b,c"

letterList = Channel.from( letter_str.split(','))
chrList = Channel.from( chr_str.split(',') )

process test {
        echo true

        input:
        set x,y from chrList.combine(letterList)

        """
        echo $x $y
        """
}
# Will print out:
chr1 b
chr2 c
chr2 b
chr3 c
chr3 a
chr1 a
chr3 b
chr1 c
chr2 a
//
# How do I use the same channel multiple times:

vegetable_datasets = Channel.fromPath(params.input)

vegetable_datasets.into { datasets_clustalw; datasets_tcoffee }

process clustalw2_align {
    input:
    file vegetable_fasta from datasets_clustalw

    output:
    file "${vegetable_fasta.baseName}.aln" into clustalw_alns

    script:
    """
    clustalw2 -INFILE=${vegetable_fasta}
    """
}

process tcoffee_align {
    input:
    file vegetable_fasta from datasets_tcoffee

    output:
    file "${vegetable_fasta.baseName}.aln" into tcoffee_alns

    script:
    """
    t_coffee ${vegetable_fasta}
    """
}
//
#tag
#The tag directive allows you to associate each process executions with a custom label, so that it will be easier to
identify them in the log file or in the trace execution report. For example:
process foo {
    tag { code }
    input:
    val code from 'alpha', 'gamma', 'omega'
    """
    echo $code
    """
}
And the log will have:
[6e/28919b] Submitted process > foo (alpha)
[d2/1c6175] Submitted process > foo (gamma)
[1c/3ef220] Submitted process > foo (omega)
/
#tag mixing variable and text:
tag "FASTQC on $sample_id"
//
#duplicating output to two channels within a process:
process get_variant_annotations {
        /*
        Process to get the variant annotations for training files
        and for VCF file to annotate (for a single chromosome in this case)
        */

        memory '2 GB'
        executor 'local'
        queue "${params.queue}"
        cpus 1

        input:
        file tp_vcf
        file fp_vcf

        output:
        file 'TP_annotations.tsv.gz' into tp_annotations_train, tp_annotations_rfe
        file 'FP_annotations.tsv.gz' into fp_annotations_train, fp_annotations_rfe

        """
        bcftools query -H -f '${params.annotations}' ${tp_vcf} | bgzip -c > TP_annotations.tsv.gz
        bcftools query -H -f '${params.annotations}' ${fp_vcf} | bgzip -c > FP_annotations.tsv.gz
        """
}
//
# Initializing an empty channel:
chrs_splitmultiallelic_withchr=Channel.empty()

if (params.region) {
    chrList = Channel.from( params.region.split(',') )

    chrList.into { chrs_splitmultiallelic_withchr ; chrs_intersecionCallSets; chrs_trainModel; chrs_rfe}
}
# The chrs_splitmultiallelic_withchr channel needs to be initialized out of the if scope
//
##groovy syntax:
/
#print out 'hello' to STDOUT
process test {

        script:

        println("hello")
        """
        echo "hello"
        """
}
//
# defining and initializing a variable and printing to STDOUT the contents
process test {

        def replace="hello"

        println(replace)

        script:
        """
        echo "hello"
        """
}
/
#conditional if statement:
# exec is used to execute Nextflow code
process test {

        exec:
        def replace="hello"

        if (replace=="hello") {
           println(replace)
        }
}
/
#scope of variables:
process test {

        exec:
        def replace="hello"

        if (replace=="hello") {
           def bye="bye"
        }
        println(bye)
}
#Trying to print bye out of scope will throw an error:ERROR ~ Error executing process > 'test'

Caused by:
  Unknown variable 'bye' -- Make sure you didn't misspell it or define somewhere in the script before use it
#In order to fix this one needs to do:
process test {

        exec:
        def replace="hello"
        def bye // or 'def bye=null' is also valid ('#' is not a Groovy comment)

        if (replace=="hello") {
           bye="bye"
        }
        println(bye)
}
//
#conditional execution based on a param:
process test {

        script:

        if (params.flag) {
           println("hello")
        } else {
           println("bye")
        }

        """
        echo "hello"
        """
}
# if it is run by nextflow run test.nf --flag true then it will print out "hello"
//
#creating a val inside a process that will be output to a channel
process test1 {

        output:
        val outest into outtest

        exec:
                outest="hello"

}

outtest.println()
# This will print:
hello
//
process test1 {

        output:
        val outest into outtest

        outest="hello"

        script:

                """
                echo "hello"
                """

}

outtest.println()
# This will crash with:
ERROR ~ Error executing process > 'test1'

Caused by:
  Missing value declared as output parameter: outest
#because outest is not initialized within the script/exec block.
#This will be fixed by doing:
process test1 {

        output:
        val outest into outtest

        script:
               outest="hello"

               """
               echo "hello"
               """
}

outtest.println()
//
////// Check input parameters //////
if (!params.genome) {
  exit 1, "Please specify a genome file"
}
//
#getting the basenames from files:
// channel must be named 'files' — the process reads 'from files'
files = Channel.fromPath( './*.gz' )

process printFiles {
  input:
  file query_file from files

  // capture stdout so 'result' is defined for the subscribe below
  output:
  stdout into result

  "echo ${query_file.baseName}"

}

result.subscribe {
    println it.trim()
}
//
#with -q we cancel all the messages printed by nextflow at the beginning
nextflow -q run.nf
//
#concat operator 
c1 = Channel.from( 1,2,3 )
c2 = Channel.from( 'a','b' )
c3 = Channel.from( 'z' )

c1.concat(c2).concat(c3).println()
# print out in order, so it will print:
1
2
3
a
b
z
//
# I've got the following error after running Nextflow:
Failed to write core dump. Core dumps have been disabled

This seems to be fixed by entering the following command:
ulimit -c unlimited
//
Restarting an older run that has failed:
> nextflow log
TIMESTAMP          	DURATION	RUN NAME        	STATUS	REVISION ID	SESSION ID                          	COMMAND                   
2017-11-24 18:41:34	672ms   	ecstatic_noether	OK    	bab98280bf 	7a8fefda-c812-4842-9248-2fd1b8d1d1e1	nextflow run <your pipeline>
2017-11-29 10:55:15	2.5s    	grave_lavoisier 	OK    	6a1acf3211 	56c9a1a1-ad16-4671-b98a-96adbd5051f2	nextflow run <your pipeline>          
2017-11-29 10:55:29	-       	golden_roentgen 	-     	6a1acf3211 	6b12ae11-74d8-4395-9685-4bb91e05e324	nextflow run <your pipeline>          
2017-11-29 09:57:37	6.2s    	silly_austin    	OK    	6a1acf3211 	a896b4da-4530-48e5-a519-39016adff6fb	nextflow run <your pipeline>  

Check the ones that have STATUS=ERR and then get the RUN NAME, then you can resume that particular run by doing:
nextflow run <your pipeline> -resume grave_lavoisier
/
#Checking working folders for processes in a session:
> nextflow log
TIMESTAMP          	DURATION	RUN NAME        	STATUS	REVISION ID	SESSION ID                          	COMMAND                   
2017-11-24 18:41:34	672ms   	ecstatic_noether	OK    	bab98280bf 	7a8fefda-c812-4842-9248-2fd1b8d1d1e1	nextflow run <your pipeline>
2017-11-29 10:55:15	2.5s    	grave_lavoisier 	OK    	6a1acf3211 	56c9a1a1-ad16-4671-b98a-96adbd5051f2	nextflow run <your pipeline>          
2017-11-29 10:55:29	-       	golden_roentgen 	-     	6a1acf3211 	6b12ae11-74d8-4395-9685-4bb91e05e324	nextflow run <your pipeline>          
2017-11-29 09:57:37	6.2s    	silly_austin    	OK    	6a1acf3211 	a896b4da-4530-48e5-a519-39016adff6fb	nextflow run <your pipeline>
> nextflow log silly_austin
//
/ #Combining 2 channels:

#!/usr/bin/env nextflow

echo true
samples = Channel.from '10000', '20000', '30000'
chrs = Channel.from 'chr1', 'chr2'
all = samples.spread(chrs)

all.println()

# This will produce:
[10000, chr1]
[10000, chr2]
[20000, chr1]
[20000, chr2]
[30000, chr1]
[30000, chr2]

//
# Executor in a config file is used to set optional config settings for a given executor. For example, for 'lsf' put this in a config file
that will be run in the command line using -C:
executor {
    name = 'lsf'
    queueSize= 500
}

'queueSize' is used to set the limit of jobs submitted to cluster
//
# checking the progress of nextflow processes:
1) Enter bjobs -w # examine the EXEC_HOST of the process you want to check
2) ssh exec_host
3) Enter 'ps ax |grep ernesto'
4) Locate the workdir where nextflow is running the process

# Constructing a file path with a param:
include { ALLELIC_PRIMITIVES; RUN_VT_UNIQ } from "${params.NF_MODULES}/processes/normalization.nf"
//
# Check files attributes
https://www.nextflow.io/docs/latest/script.html#check-file-attributes
//
# Optional publishdir
params.publish = false
process test_something {
    // publishDir can be switched on/off with the 'enabled:' option
    publishDir "test/", enabled: params.publish
    // 'echo true' is a process directive: it belongs with the other
    // directives, not inside the input: block
    echo true

    input:
    val x from chan

    output:
    file("*") into chen

    script:
    if (params.publish) {
        println "publishing"
    }
    """
    echo $x > ${x}.txt
    """
}