Dask-jobqueue

Dask-jobqueue is a Python library which makes it easy to deploy Dask on common job queuing systems typically found in high performance supercomputers, academic research institutions, and other clusters. Dask is a Python library for parallel computing which scales Python code from multi-core local machines to large distributed clusters in the cloud. Since Dask-jobqueue provides interfaces for both OAR-based and Slurm-based clusters, it can be used to ease the transition between OAR and Slurm resource managers. Source code, issues and pull requests can be found here.
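For instance, switching a deployment from an OAR-based to a Slurm-based cluster essentially amounts to changing the cluster class. A minimal sketch (the parameter values are illustrative):

from dask_jobqueue import OARCluster, SLURMCluster

# The same resource description works with both resource managers;
# only the cluster class changes.
common = dict(cores=2, memory='24GB', walltime='1:0:0')

cluster = OARCluster(queue='default', **common)      # on an OAR-based site
# cluster = SLURMCluster(queue='default', **common)  # on a Slurm-based site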

Basic usage

Dask-jobqueue installation

pip can be used to install dask-jobqueue and its dependencies:

flille:
pip install dask-jobqueue --upgrade # Install everything from the latest released version

Alternatively, conda can be used to install dask-jobqueue from conda-forge:

flille:
conda install dask-jobqueue -c conda-forge
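The installation can then be checked from Python, for instance by printing the installed version:

import dask_jobqueue
print(dask_jobqueue.__version__)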

Executing a bash script with dask-jobqueue

Here is an example Python script which requests the execution of a batch script on well-defined resources (2 cores, 24GB of memory, at least 1 GPU, on the specific cluster chifflet, for 1 hour):

from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
import os

cluster = Cluster(
  queue='default',
  # Should be specified if you belong to more than one GGA
  project='<your grant access group>',
  # cores per job, required parameter
  cores=2,
  # memory per job, required parameter
  memory='24GB',
  # walltime for each worker job
  walltime='1:0:0',

  job_extra=[
    '-t besteffort',
    # reserve node from specific cluster
    '-p chifflet',
    # reserve node with at least 1 GPU
    '-p "gpu_count >= 1"'
  ],

  # another way to reserve node with GPU
  #resource_spec='gpu=1'
)

cluster.scale(1)
client = Client(cluster)

# call your favorite batch script
client.submit(os.system, "./hello-world.sh").result()

client.close()
cluster.close()

The example script can be launched on the frontend as follows:

flille:
python3 this-script.py


How does Dask-jobqueue interact with OAR in practice?

In the example above, Dask-jobqueue first creates a Dask Scheduler in the Python process when the Cluster object is instantiated. The Cluster object describes the resources you would like to reserve for each worker job. To schedule job(s) on those resources for the computation, you need to tell the Dask Scheduler the number of job(s) using the scale command. At this step, the Dask Scheduler starts interacting with OAR. In the example above, only one job is launched, with only one worker inside. For advanced usage, please refer to the Advanced usage section.
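For instance, with the cluster object defined above, a single job holding a single worker can be requested in either of the following ways (scale also accepts a jobs keyword):

cluster.scale(1)       # ask for 1 worker; dask-jobqueue submits the necessary job(s)
cluster.scale(jobs=1)  # ask for 1 job directly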

Below is a schema that illustrates the different layers of Dask, OAR and the resources.

[Figure: Schema dask oar.png]

Note that a Worker in Dask is a Python object that serves data and performs computations, while a Job here means the resources submitted to, and managed by, the job queuing system (OAR for instance). A single Job may include one or more Workers.
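As an illustration of this distinction, the processes parameter splits the resources of one Job among several Workers. A minimal sketch (the parameter values are examples):

from dask_jobqueue import OARCluster

# one OAR Job of 4 cores, hosting 2 Dask Workers with 2 threads each;
# the 24GB of memory is also split evenly between the 2 Workers
cluster = OARCluster(cores=4, processes=2, memory='24GB')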

You can see the job script generated for the example above with:

print(cluster.job_script())
#!/usr/bin/env bash

#OAR -n dask-worker
#OAR -q default
#OAR -l walltime=1:0:0
#OAR -t besteffort
#OAR -p chifflet
#OAR -p "gpu_count >= 1"

/usr/bin/python3 -m distributed.cli.dask_worker tcp://172.16.47.106:39655 --nthreads 1 --nprocs 2 --memory-limit 11.18GiB --name dummy-name --nanny --death-timeout 60 --protocol tcp://

The job is sent to the OAR queue and, when it starts, a Worker starts up, performs the computations defined by your batch script, and connects back to the Scheduler running in your Python process. When the Cluster object goes away, either because you close your Python program or you delete the object, a signal is sent to the Workers to shut down. If for some reason this signal does not get through, the Workers kill themselves after 60 seconds of waiting for a non-existent Scheduler (configurable with the death-timeout parameter presented in the next section).
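To make sure the shutdown signal is always sent, the Cluster and Client objects can also be used as context managers. A minimal sketch:

from dask_jobqueue import OARCluster
from dask.distributed import Client

with OARCluster(cores=2, memory='24GB', walltime='1:0:0') as cluster:
    cluster.scale(1)
    with Client(cluster) as client:
        # the workers and the OAR job are shut down when the blocks exit
        print(client.submit(lambda x: x + 1, 41).result())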

Advanced usage

Use a configuration file to specify resources

The resource request can also be specified in the user's ~/.config/dask/jobqueue.yaml file, as follows:

jobqueue:
  oar:
    name: dask-worker
    # Dask worker options
    cores: 2 # Total number of cores per job
    memory: '24GB' # Total amount of memory per job
    #processes: 1 # Number of Python processes per job, ~= sqrt(cores) 
    #interface: null # Network interface to use: eth0 or ib0
    death-timeout: 60 # Number of seconds to wait if a worker cannot find a scheduler
    #local-directory: null # Location of fast local storage like /scratch or $TMPDIR
    #extra: [] # Extra arguments to pass to Dask worker

    # OAR resource manager options
    #shebang: "#!/usr/bin/env bash"
    queue: 'default'
    #project: null
    walltime: '1:00:00'
    #env-extra: []
    #resource-spec: null
    job-extra: []
    log-directory: null

    # Scheduler options
    scheduler-options: {}

The cluster can then be instantiated with a single line, as follows:

cluster = OARCluster()
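Keyword arguments passed to the constructor take precedence over the values of the configuration file, so site-wide defaults can be kept in the file and overridden per script. For example:

# values from ~/.config/dask/jobqueue.yaml are used as defaults;
# explicit keyword arguments override them
cluster = OARCluster(walltime='2:00:00')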

Cluster parameters

dask-jobqueue parameter | OAR command example | Slurm command example | Description
queue | #OAR -q | #SBATCH -p | Destination queue for each worker job
project | #OAR --project | #SBATCH -A | Accounting group associated with each worker job
cores | #OAR -l core=2 | #SBATCH --cpus-per-task=2 | Total cores per job
memory | (none) | #SBATCH --mem=24GB | Total memory per job
walltime | #OAR -l walltime=hh:mm:ss | #SBATCH -t hh:mm:ss | Walltime for each worker job
name | #OAR -n | #SBATCH -J | Name of worker, always set to the default value dask-worker
resource_spec | #OAR -l host=1/core=2,gpu=1 | Not supported | Request resources and specify job placement
job_extra | #OAR -O, -E | #SBATCH -o, -e | Log directory
job_extra | #OAR -p parasilo | #SBATCH -C sirocco | Property request
job_extra | #OAR -t besteffort | #SBATCH -t besteffort | Besteffort job
job_extra | #OAR -r now | #SBATCH --begin=now | Advance reservation
job_extra | #OAR --checkpoint 150 | #SBATCH --checkpoint 150 | Checkpoint
job_extra | #OAR -a jobid | #SBATCH --dependency=state:jobid | Job dependency


Note: All the experiments above were tested on Grid5000, an OAR-based cluster. Plafrim and Cleps were used as Slurm-based clusters to run the same experiments, in order to identify the concepts common to OAR and Slurm. Since heterogeneities are still observed between Plafrim and Cleps today, the "Slurm command example" column of the table above will be updated when Slurm is fully supported by Inria's national computing infrastructure.
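As an illustration of the table, the same worker job with log files can be requested from both resource managers as follows (a sketch; the log paths are examples):

from dask_jobqueue import OARCluster, SLURMCluster

oar_cluster = OARCluster(
  cores=2, memory='24GB', walltime='1:0:0',
  # -O/-E set the stdout/stderr log files (%jobid% is the OAR job id)
  job_extra=['-O logs/%jobid%.stdout', '-E logs/%jobid%.stderr'],
)

slurm_cluster = SLURMCluster(
  cores=2, memory='24GB', walltime='1:0:0',
  # -o/-e set the stdout/stderr log files (%j is the Slurm job id)
  job_extra=['-o logs/%j.stdout', '-e logs/%j.stderr'],
)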

Start multiple computations at once using the 'scale' method

Dask-jobqueue allows Dask to be deployed seamlessly on clusters that use a variety of job queuing systems such as OAR and Slurm. With Dask-jobqueue's Pythonic interface, users can easily manage the submission, execution and deletion of jobs across different resource and job management systems. Dask also gives users the ability to scale jobs for parallel computing in coordination with Python's existing scientific libraries such as NumPy, Pandas and Scikit-Learn.

Dask.distributed, which we imported in the example above, is an extension of Dask that facilitates parallel computing. Its Client class allows connecting to a Dask Cluster and submitting computations to it through 'submit' or 'map' calls. Details of the class can be found here.

As shown by the example above, a single Job may include one or more Workers. The number of Workers per Job can be set with the processes parameter (see the configuration section), if your job can be split into several processes.

To specify the number of Jobs, you use the scale command. The number of Jobs can either be specified directly, as shown below, or indirectly through a total cores or memory request (each job providing 2 cores and 24GB in the example above):

# 2 jobs with 1 worker for each will be launched
cluster.scale(2)
# specify total cores 
cluster.scale(cores=4)
# specify total memory
cluster.scale(memory="48GB")

Dask Cluster also has the ability to "autoscale", with the adapt interface:

cluster.adapt(minimum=2, maximum=20)

The more complete example below can help in understanding the Worker and Job notions of Dask-jobqueue:

from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
import os
import time
import logging
import socket
from pprint import pprint

cluster = Cluster(
  queue='default',
  project='mc-staff',
  cores=4,
  memory='24GB',
  # walltime for each worker job
  walltime='1:0:0',
  processes=2,

  # Logs
  job_extra=['-O /home/ychi/logs/%jobid%.stdout',
             '-E /home/ychi/logs/%jobid%.stderr'],
)
cluster.scale(6)
print(cluster.job_script())

client = Client(cluster)

def slow_increment(x):
    time.sleep(5)
    return {'result': x + 1,
            'host': socket.gethostname(),
            'pid': os.getpid(),
            'time': time.asctime(time.localtime(time.time()))}

nb_workers = 0
while True:
    nb_workers = len(client.scheduler_info()["workers"])
    if nb_workers >= 2:
        print('Finally got {} workers '.format(nb_workers), 'with client: ', client)
        break
    time.sleep(1)


futures = client.map(slow_increment, range(6))
results = client.gather(futures)
print('results: ')
pprint(results)

client.close()
cluster.close()

The job script is generated as follows:

#!/usr/bin/env bash

#OAR -n dask-worker
#OAR -q default
#OAR --project mc-staff
#OAR -l /nodes=1/core=4,walltime=1:0:0
#OAR -O /home/ychi/logs/%jobid%.stdout
#OAR -E /home/ychi/logs/%jobid%.stderr

/usr/bin/python3 -m distributed.cli.dask_worker tcp://172.16.47.106:40605 --nthreads 2 --nprocs 2 --memory-limit 11.18GiB --name dummy-name --nanny --death-timeout 60 --protocol tcp://

Finally got 6 workers  with client:  <Client: 'tcp://172.16.47.106:40605' processes=6 threads=12, memory=67.08 GiB>
results:
[{'host': 'chetemi-13.lille.grid5000.fr',
  'pid': 6780,
  'result': 1,
  'time': 'Fri Aug  5 17:08:03 2022'},
 {'host': 'chetemi-13.lille.grid5000.fr',
  'pid': 6767,
  'result': 2,
  'time': 'Fri Aug  5 17:08:03 2022'},
 {'host': 'chetemi-13.lille.grid5000.fr',
  'pid': 6768,
  'result': 3,
  'time': 'Fri Aug  5 17:08:03 2022'},
 {'host': 'chetemi-13.lille.grid5000.fr',
  'pid': 6777,
  'result': 4,
  'time': 'Fri Aug  5 17:08:03 2022'},
 {'host': 'chetemi-13.lille.grid5000.fr',
  'pid': 6769,
  'result': 5,
  'time': 'Fri Aug  5 17:08:03 2022'},
 {'host': 'chetemi-13.lille.grid5000.fr',
  'pid': 6776,
  'result': 6,
  'time': 'Fri Aug  5 17:08:03 2022'}]

Here, 2 Workers per Job (--nprocs 2), with 2 cores each, are requested. Since cluster.scale(6) asks for 6 Workers and each Job hosts 2 Workers, Dask-jobqueue submits 3 OAR Jobs, for a total of 6 Dask Workers (processes=6 in the Client output above).
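The number of submitted Jobs can thus be derived from the number of Workers requested by scale and the number of Workers per Job. A sketch of the relation (not an actual dask-jobqueue call):

import math

workers_requested = 6  # cluster.scale(6)
workers_per_job = 2    # processes=2
jobs_submitted = math.ceil(workers_requested / workers_per_job)
print(jobs_submitted)  # 3 OAR jobs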