Dask-jobqueue

From Grid5000
Jump to navigation Jump to search
Note.png Note

This page is actively maintained by the Grid'5000 team. If you encounter problems, please report them (see the Support page). Additionally, as it is a wiki page, you are free to make minor corrections yourself if needed. If you would like to suggest a more fundamental change, please contact the Grid'5000 team.

Dask-jobqueue

Dask-jobqueue is a Python library that simplifies the deployment of Dask on commonly used job queuing systems found in high performance supercomputers, academic research institutions, and other clusters. Dask itself is a Python library for parallel computing, allowing scalability of Python code from multi-core local machines to large distributed clusters in the cloud.

Since Dask-jobqueue provides interfaces for both OAR and Slurm-based clusters, it can be used to facilitate the switch between OAR and Slurm-based resource managers.

The source code, issues and pull requests can be found here.

The OARCluster implementation in Dask-jobqueue has been improved to address a limitation. Previously, it did not consider the memory parameter, leading to inconsistencies in memory allocation. To overcome this, an extension was proposed and accepted into the Dask-jobqueue library.

This extension enables OAR to consider the memory parameter, ensuring that the allocated resources fulfill the specified memory requirements of Dask workers. By making OAR aware of the memory request, the extension facilitates a more intuitive and efficient allocation of resources.

Additionally, documentation work has been undertaken to assist users in seamlessly transitioning between OAR and Slurm-based clusters. You can refer to this table to see the mapping of commands and concepts between OAR and Slurm-based resource managers.

Dask-jobqueue installation

Note.png Note

Dask-jobqueue has to be installed on both the frontend and the nodes. By installing it locally in your homedir, Dask-jobqueue is available on both the frontend and the nodes (as your homedir is stored on a NFS server).

Using pip

pip can be used to install dask-jobqueue and its dependencies in your homedir:

Terminal.png fontend.site:
pip install --user dask-jobqueue==0.8.2 distributed==2023.8.0 --upgrade

Note that, it is recommended to install python dependencies via a virtual environment. To do so, before running the pip command, you need to set up a virtual environment:

Terminal.png fontend.site:
python3 -m venv env
Terminal.png fontend.site:
source env/bin/activate
Terminal.png fontend.site:
pip install dask-jobqueue==0.8.2 distributed==2023.8.0

Using conda

conda can be used to install dask-jobqueue from the conda-forge:

Terminal.png fontend.site:
module load conda
Terminal.png fontend.site:
conda install dask-jobqueue=0.8.2 distributed=2023.8.0 -c conda-forge


Note.png Note

The example usages below have been tested with specific versions of the packages: dask-jobqueue==0.8.2 and distributed==2023.8.0. Without these version pins, we may encounter the error as described here: https://intranet.grid5000.fr/bugzilla/show_bug.cgi?id=15889. We recommend using these versions until a verified fix is released in newer versions.

Basic usage

Executing a bash script with dask-jobqueue

Here is a Python script example of dask-jobqueue which executes a batch script (hello-world.sh) on a specific resource (2 cores, 24GB, at least 1 GPU, specific cluster - chifflet -, for 1 hour)

from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
import os

cluster = Cluster(
  queue='default',
  # Should be specified if you belongs to more than one GGA
  #project='<your grant access group>',
  # cores per job, required parameter
  cores=2,
  # memory per job, required parameter
  memory='12GB',
  # The memory per core property name of your OAR cluster (usually memcore or mem_core).
  memory_per_core_property_name='memcore',
  # walltime for each worker job
  walltime='1:0:0',

  job_extra_directives=[
    # Besteffort job
    #'-t besteffort',
    # reserve node from specific cluster
    '-p chifflet',
  ],

  # reserve node with GPU
  #resource_spec='gpu=1'
)

cluster.scale(1)
client = Client(cluster)

# call your favorite batch script
client.submit(os.system, './hello-world.sh').result()

client.close()
cluster.close()

A sample hello-world.sh could be the following:

echo "hello from $(hostname)"

The Python script has to be launched on frontend:

Terminal.png fontend.site:
python3 this-script.py


Warning.png Warning

The OARCluster extension introduces a new parameter: oar_mem_core_property_name. Indeed, as OAR does not provide a generic property name for specifying the amount of memory associated with a resource, administrators are free to setup their own naming convention (usually memcore or mem_core).


Setting this property is required, otherwise dask jobqueue cannot ensure that the nodes possess the specified amount of memory. If the property does not exist on your cluster, you should set it to not_applicable to avoid getting a warning. On Grid'5000, the property is named memcore, this is why you need to have the parameter memory_per_core_property_name='memcore'

Advanced usage

Start multiple computations in parallel using 'scale' parameter

Dask provides the ability (through Dask-distributed) to scale jobs for parallel computing, which is very convenient for interacting with scientific libraries like NumPy, Pandas, Scikit-Learn. The Client class enables users to submit computations (through submit or map calls) to a defined Dask Cluster. More details about the Client class can be found here.

To specify the number of Dask workers or OAR jobs, you can use the scale command. The number of workers or jobs can be specified directly, or indirectly by specifying the cores or memory request:

# specify total workers
cluster.scale(2)
# specify total OAR jobs
cluster.scale(jobs=2)
# specify total cores 
cluster.scale(cores=4)
# specify total memory
cluster.scale(memory="48GB")

Dask Cluster also has the ability to "autoscale", with the adapt interface:

cluster.adapt(minimum=2, maximum=20)

Note that a single Job may include one or more Workers.

A more complicated example below can help understanding the Worker and Job notions of Dask-jobqueue:

from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
from pprint import pprint
import logging
import os
import socket
import time

cluster = Cluster(
    queue='default',
    # Should be specified if you belongs to more than one GGA
    #project='<your grant access group>',
    cores=4,
    memory='16GiB',
    # The memory per core property name of your OAR cluster (usually memcore or mem_core).
    memory_per_core_property_name='memcore',
    # walltime for each worker job
    walltime='1:0:0',
    processes=2,
)
cluster.scale(jobs=2)
print('OAR submission file: \n', cluster.job_script())

client = Client(cluster)

def slow_increment(x):
    time.sleep(5)
    return {'result': x + 1,
            'host': socket.gethostname(),
            'pid': os.getpid(),
            'time': time.asctime(time.localtime(time.time())),
            'jobId': os.environ.get('OAR_JOB_ID')}

# cluster.scale() doesn't wait for the workers to spin up.
# sleep here and wait for workers to be up.
# more details here: https://github.com/dask/dask-jobqueue/issues/206
while len(client.scheduler_info()["workers"]) < 4:
    time.sleep(1)

futures = client.map(slow_increment, range(6))
results = client.gather(futures)
print('results: ')
pprint(results)

client.close()
cluster.close()

The previous code generates the following output:

results:
[{'host': 'chifflet-7.lille.grid5000.fr',
  'jobId': '1952956',
  'pid': 16502,
  'result': 1,
  'time': 'Thu Jun 29 09:56:06 2023'},
 {'host': 'chifflet-7.lille.grid5000.fr',
  'jobId': '1952955',
  'pid': 16501,
  'result': 2,
  'time': 'Thu Jun 29 09:56:06 2023'},
 {'host': 'chifflet-7.lille.grid5000.fr',
  'jobId': '1952956',
  'pid': 16508,
  'result': 3,
  'time': 'Thu Jun 29 09:56:06 2023'},
 {'host': 'chifflet-7.lille.grid5000.fr',
  'jobId': '1952955',
  'pid': 16507,
  'result': 4,
  'time': 'Thu Jun 29 09:56:06 2023'},
 {'host': 'chifflet-7.lille.grid5000.fr',
  'jobId': '1952956',
  'pid': 16502,
  'result': 5,
  'time': 'Thu Jun 29 09:56:06 2023'},
 {'host': 'chifflet-7.lille.grid5000.fr',
  'jobId': '1952955',
  'pid': 16501,
  'result': 6,
  'time': 'Thu Jun 29 09:56:06 2023'}]

As shown in the output, we got 4 workers (corresponding to the 4 different PIDs: 16501, 16502, 16507, and 16508). Besides, we can see that as requested, the workers were executed in 2 different OAR jobs (1952955 and 1952956) ; 2 workers on each job.

Understand the interaction between Dask-jobqueue and OAR

Dask-jobqueue creates at first a Dask Scheduler when the Cluster object is instantiated (see line 5 of the basic example). The Dask Scheduler interacts with the scheduler (i.e., OAR or Slurm) to submit jobs according to the wanted specification. The interaction between Dask-jobqueue and OAR (hidden for the end-user) is made through an inline command for the job submission.


Note.png Note

If needed, you can display the generated OAR file by adding the following line to the python script:

print(cluster.job_script())
It might be useful for debugging if you do not understand why Dask is not correctly reserving the nodes you specified.

In the basic example, Dask Scheduler generated this file:

#!/usr/bin/env bash

#OAR -n dask-worker
#OAR -q default
#OAR -l /nodes=1/core=2,walltime=1:0:0
#OAR -p 'chifflet AND memcore>=11445'

/home/ychi/source-dask-jobq/env/bin/python3 -m distributed.cli.dask_worker tcp://172.16.47.106:36611 --nthreads 1 --nworkers 2 --memory-limit 11.18GiB --name dummy-name --nanny --death-timeout 60

The job request is sent to the OAR and executing on the wanted resources. OAR is starting a Dask Worker on the distant nodes. The Dask Scheduler is interacting with these Workers to indicate the computations they have to perform. In the basic example, this is done by executing the bash script specified by the submit method (see line 33).

The schema below describes the workflow of the different entities (Dask, OAR and resources):

Workflow dask jobq.png

Note that :

  • when the Cluster object goes away, either because the Python program is killed or the dask object has been deleted, a signal is sent to the Workers to shut them down. If for some reason, the signal does not get through, the Workers kill themselves after waiting for 60 seconds reconnecting with the Dask Scheduler (timeout can be adjusted with the death-timeout parameter presented in the next section).
  • to schedule job(s) on the previously reserved nodes for the computation, you need to tell the Dask Scheduler the number of job(s) using the scale command. At this step, the Dask Scheduler starts to interact with OAR. In the basic example, only one job will be launched, with only one worker inside. For advanced usage, please refer to this example.
  • dask-jobqueue does not handle multi-node jobs. In order to do this, you probably want to use Dask-mpi.

Use a configuration file to specify resources

Instead of specifying resources in the Python script, it is possible to use a configuration file. This can be useful when the same script is executed on different G5K sites.

Bellow an example of Dask config file stored in ~/.config/dask/jobqueue.yaml:

jobqueue:
  oar:
    name: dask-worker
    # Dask worker options
    cores: 2 # Total number of cores per job
    memory: '24GB' # Total amount of memory per job
    #processes: 1 # Number of Python processes per job, ~= sqrt(cores) 
    #interface: null # Network interface to use: eth0 or ib0
    death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
    #local-directory: null # Location of fast local storage like /scratch or $TMPDIR
    #extra: [] # Extra arguments to pass to Dask worker
    worker-extra-args: [] # Additional arguments to pass to `dask-worker`

    # OAR resource manager options
    #shebang: "#!/usr/bin/env bash"
    queue: 'default'
    #project: null
    walltime: '1:00:00'
    #env-extra: []
    job-script-prologue: []
    #resource-spec: null
    #job-extra: null
    #job-extra-directives: []
    #job-directives-skip: []
    log-directory: null

    # Scheduler options
    scheduler-options: {}

When a resource configuration file exists, the Dask cluster object can be then instantiated without parameters:

cluster = OARCluster()

Cluster parameters

The following table is giving the mapping between Dask parameters and OAR/Slurm parameters.

dask-jobqueue parameter OAR command example Slurm command example Description
queue #OAR -q #SBATCH -p Destination queue for each worker job
project #OAR --project #SBATCH -A Accounting group associated with each worker job
cores #OAR -l core=2 #SBATCH --cpu-per-task=2 Total cores per job
memory #OAR -p memcore>=16000 #SBATCH --mem=24GB if job_mem is None Workers' --memory-limit parameter
walltime #OAR -l walltime=hh:mm:ss #SBATCH -t hh:mm:ss Walltime for each worker job
name #OAR -n #SBATCH -J Name of worker, always set to the default value dask-worker
resource_spec #OAR -l host=1/core=2, gpu=1 Not supported Request resources and specify job placement
job_mem Not supported #SBATCH --mem=24GB Amount of memory to request (If None, defaults to memory * Worker processes)
job_cpu Not supported #SBATCH --cpus-per-task=2 Number of CPU to book (If None, defaults to Worker threads * processes)
job_extra_directives #OAR -O, -E #SBATCH -o, -e Log directory
job_extra_directives #OAR -p parasilo #SBATCH -C sirocoo Property request
job_extra_directives #OAR -t besteffort #SBATCH -t besteffort Besteffort job
job_extra_directives #OAR -r now #SBATCH --begin=now Advance reservation
job_extra_directives #OAR --checkpoint 150 #SBATCH --checkpoint 150 Checkpoint
job_extra_directives #OAR -a jobid #SBATCH --dependency state:jobid Jobs dependency