Dask-jobqueue: Difference between revisions

From Grid5000
Revision as of 17:18, 1 August 2022

= Dask-jobqueue =

[https://jobqueue.dask.org/en/latest/index.html Dask-jobqueue] is a Python library which makes it easy to deploy [https://www.dask.org/ Dask] on common job queuing systems typically found in high-performance supercomputers, academic research institutions, and other clusters. Dask is a Python library for parallel computing which scales Python code from multi-core local machines to large distributed clusters in the cloud. Since Dask-jobqueue provides interfaces for both OAR- and Slurm-based clusters, it can be used to ease the transition between OAR- and Slurm-based resource managers. Source code, issues and pull requests can be found [https://github.com/dask/dask-jobqueue here].

== Installing ==

pip can be used to install dask-jobqueue and its dependencies:

 pip install dask-jobqueue --upgrade  # Install everything from the latest released version

Alternatively, conda can be used to install dask-jobqueue from the conda-forge channel:

 conda install dask-jobqueue -c conda-forge
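To confirm the installation succeeded, you can query the installed package version. This is a generic, hedged check using only the standard library (it does not import dask-jobqueue itself, so it works even in environments where the package is absent):

```python
from importlib import metadata

def jobqueue_version():
    # Return the installed dask-jobqueue version string, or None if absent.
    try:
        return metadata.version("dask-jobqueue")
    except metadata.PackageNotFoundError:
        return None

print(jobqueue_version())
```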

== Basic usage ==

Here is an example Python script which requests a well-defined resource (2 cores, 24 GB of memory, at least 1 GPU, on the chifflet cluster) and starts a batch script on it:

<syntaxhighlight lang="python">
from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
import os

cluster = Cluster(
    queue='default',
    # Should be specified if you belong to more than one GGA
    project='<your grant access group>',
    # cores per job, required parameter
    cores=2,
    # memory per job, required parameter
    memory='24GB',
    # walltime for each worker job
    walltime='1:0:0',
    job_extra=[
        '-t besteffort',
        # reserve node from specific cluster
        '-p chifflet',
        # reserve node with at least 1 GPU
        '-p "gpu_count >= 1"'
    ],
    # another way to reserve node with GPU
    resource_spec='gpu=1'
)

client = Client(cluster)

# call your favorite script
client.submit(os.system, "./hello-world.sh").result()

client.close()
cluster.close()
</syntaxhighlight>

The example script can be launched on the frontend as follows:

{{Term|location=flille|cmd=<code class="command">python3 this-script.py</code>}}


=== How does Dask-jobqueue interact with OAR in practice? ===

The [[#Basic usage|example above]] will generate the following job script:

<syntaxhighlight lang="bash">
#!/usr/bin/env bash

#OAR -n dask-worker
#OAR -q default
#OAR -l gpu=1,walltime=1:0:0
#OAR -t besteffort
#OAR -p chifflet
#OAR -p "gpu_count >= 1"

/usr/bin/python3 -m distributed.cli.dask_worker tcp://172.16.47.106:39655 --nthreads 1 --nprocs 2 --memory-limit 11.18GiB --name dummy-name --nanny --death-timeout 60 --protocol tcp://
</syntaxhighlight>

Each of these jobs is sent to the job queue independently. Once a job starts, a Dask worker process starts up and connects back to the scheduler running in the Python process that instantiated the cluster.
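The `--memory-limit 11.18GiB` figure in the generated script follows from the resource request: the 24 GB per job (interpreted as decimal gigabytes) is divided among the 2 worker processes, then reported in binary gibibytes. A quick check of that arithmetic, assuming decimal GB in and binary GiB out:

```python
# Reproduce the --memory-limit value from the generated job script.
memory_per_job = 24 * 10**9   # bytes, from memory='24GB' (decimal gigabytes)
processes = 2                 # worker processes per job (--nprocs 2)

per_process = memory_per_job / processes   # bytes per worker process
in_gib = per_process / 2**30               # convert to binary gibibytes

print(f"{in_gib:.2f}GiB")  # matches the --memory-limit in the job script
```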

== Advanced usage ==

=== Use a configuration file to specify resources ===

The resource request can also be specified in the user's ~/.config/dask/jobqueue.yaml configuration file, as follows:

<syntaxhighlight lang="yaml">
jobqueue:
  oar:
    name: dask-worker
    # Dask worker options
    cores: 2
    memory: '24GB'
    processes: 1
    #interface: null # Network interface to use: eth0 or ib0
    death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
    #local-directory: null # Location of fast local storage like /scratch or $TMPDIR
    #extra: []
    # OAR resource manager options
    #shebang: "#!/usr/bin/env bash"
    queue: 'default'
    project: null
    walltime: '1:00:00'
    #env-extra: []
    resource-spec: null
    job-extra: []
    log-directory: null
    # Scheduler options
    scheduler-options: {}
</syntaxhighlight>

The cluster can then be instantiated with a single line:

<syntaxhighlight lang="python">
cluster = OARCluster()
</syntaxhighlight>
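Keyword arguments passed to the constructor take precedence over the defaults read from jobqueue.yaml. A minimal sketch of that merge logic using plain dictionaries (the real lookup is performed by dask.config; the names below are illustrative, not the library's internals):

```python
# Illustrative sketch: constructor keyword arguments override YAML defaults.
yaml_defaults = {
    "queue": "default",
    "cores": 2,
    "memory": "24GB",
    "walltime": "1:00:00",
}

def effective_config(defaults, **overrides):
    # Keyword arguments win over file-based defaults,
    # as when calling OARCluster(walltime='2:00:00').
    merged = dict(defaults)
    merged.update({k: v for k, v in overrides.items() if v is not None})
    return merged

print(effective_config(yaml_defaults, walltime="2:00:00"))
```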

=== Cluster parameters ===

{| class="wikitable"
|-
! dask-jobqueue parameter !! OAR command example !! Slurm command example !! Description
|-
| queue || #OAR -q || #SBATCH -p || Destination queue for each worker job
|-
| project || #OAR --project || #SBATCH -A || Accounting group associated with each worker job
|-
| cores || #OAR -l core=2 || #SBATCH --cpus-per-task=2 || Total cores per job
|-
| memory || || #SBATCH --mem=24GB || Total memory per job
|-
| walltime || #OAR -l walltime=hh:mm:ss || #SBATCH -t hh:mm:ss || Walltime for each worker job
|-
| name || #OAR -n || #SBATCH -J || Name of worker, always set to the default value dask-worker
|-
| resource_spec || #OAR -l host=1/core=2, gpu=1 || Not supported || Request resources and specify job placement
|-
| job_extra || #OAR -O, -E || #SBATCH -o, -e || Log directory
|-
| job_extra || #OAR -p parasilo || #SBATCH -C sirocco || Property request
|-
| job_extra || #OAR -t besteffort || #SBATCH -t besteffort || Besteffort job
|-
| job_extra || #OAR -r now || #SBATCH --begin=now || Advance reservation
|-
| job_extra || #OAR --checkpoint 150 || #SBATCH --checkpoint 150 || Checkpoint
|-
| job_extra || #OAR -a jobid || #SBATCH --dependency state:jobid || Jobs dependency
|}

=== Start multiple computations at once using the 'scale' parameter ===

Dask-jobqueue creates a Dask Scheduler in the Python process where the cluster object is instantiated with the defined resources. There are several ways to interact with the cluster through the Client (e.g. the .submit or .map functions). Details about the Dask Client can be found [https://distributed.dask.org/en/stable/client.html here].
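The Dask Client deliberately mirrors Python's built-in concurrent.futures interface, so .submit and .map behave like their standard-library counterparts. A stdlib-only sketch of the same calling pattern (no Dask required, and therefore no cluster involved):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

# client.submit(fn, *args) and client.map(fn, iterable)
# follow the same pattern as the executor calls below.
with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(square, 3)
    print(future.result())                        # 9
    print(list(executor.map(square, [1, 2, 3])))  # [1, 4, 9]
```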

In Dask, a '''Worker''' is a Python object and node serving data and performing computations; '''Jobs''' are resources submitted to, and managed by, the job queueing system (e.g. OAR, Slurm, etc.).

In Dask-jobqueue, a single Job may include one or more Workers.

The number of Workers per Job can be set with the processes parameter shown in the configuration above, if your computation can be split across several processes.

To specify the number of Jobs, use the scale method. The number of Jobs can either be specified directly, as shown in the example below, or indirectly through a total cores or memory request:

<syntaxhighlight lang="python">
# 2 jobs with 1 worker each will be launched
cluster.scale(2)
# specify the total number of cores
cluster.scale(cores=4)
# specify the total memory
cluster.scale(memory="48GB")
</syntaxhighlight>
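When scaling by total cores or memory, the number of Jobs is derived from the per-job request. A sketch of that arithmetic under the configuration above (2 cores and 24 GB per job); this illustrates the idea rather than dask-jobqueue's internal code:

```python
import math

cores_per_job = 2
memory_per_job_gb = 24

def jobs_for(cores=None, memory_gb=None):
    # Round up so the total request is always satisfied.
    if cores is not None:
        return math.ceil(cores / cores_per_job)
    return math.ceil(memory_gb / memory_per_job_gb)

print(jobs_for(cores=4))       # 2 jobs, as with cluster.scale(cores=4)
print(jobs_for(memory_gb=48))  # 2 jobs, as with cluster.scale(memory="48GB")
```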

The cluster generates a traditional job script and submits it to the job queue as many times as specified. You can inspect the generated job script as follows:

<syntaxhighlight lang="python">
print(cluster.job_script())
</syntaxhighlight>


Note: All the experiments above were tested on Grid5000, an OAR-based cluster. Plafrim and Cleps were used as Slurm-based clusters to run the same experiments, in order to identify the concepts common to OAR and Slurm. Since heterogeneities are still observed between Plafrim and Cleps today, the "Slurm command example" column of the table above will be updated when Slurm is fully supported by Inria's national computing infrastructure.