Dask-jobqueue
Note: This page is actively maintained by the Grid'5000 team. If you encounter problems, please report them (see the Support page). Additionally, as it is a wiki page, you are free to make minor corrections yourself if needed. If you would like to suggest a more fundamental change, please contact the Grid'5000 team.
Dask-jobqueue
Dask-jobqueue is a Python library that simplifies the deployment of Dask on commonly used job queuing systems found in high performance supercomputers, academic research institutions, and other clusters. Dask itself is a Python library for parallel computing, allowing scalability of Python code from multi-core local machines to large distributed clusters in the cloud.
Since Dask-jobqueue provides interfaces for both OAR and Slurm-based clusters, it can be used to facilitate the switch between OAR and Slurm-based resource managers.
The source code, issues and pull requests can be found in the Dask-jobqueue GitHub repository.
The OARCluster implementation in Dask-jobqueue has been improved to address a limitation. Previously, it did not consider the memory parameter, leading to inconsistencies in memory allocation. To overcome this, an extension was proposed and accepted into the Dask-jobqueue library.
This extension enables OAR to consider the memory parameter, ensuring that the allocated resources fulfill the specified memory requirements of Dask workers. By making OAR aware of the memory request, the extension facilitates a more intuitive and efficient allocation of resources.
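Roughly speaking, the memory requested per job is divided by the number of cores and converted to MiB to build a per-core property filter: in the examples below, memory='24GB' with cores=2 yields #OAR -p memcore>=11445 (24 GB / 2 ≈ 11445 MiB), while memory='64GiB' with cores=4 yields memcore>=16384 (64 GiB / 4 = 16384 MiB).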
Additionally, documentation work has been undertaken to assist users in seamlessly transitioning between OAR and Slurm-based clusters. You can refer to this table to see the mapping of commands and concepts between OAR and Slurm-based resource managers.
Dask-jobqueue installation
Using pip
pip can be used to install dask-jobqueue and its dependencies in your homedir:
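# e.g., installation into your home directory (~/.local)
pip install --user dask-jobqueue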
Note that it is recommended to install Python dependencies in a virtual environment. To do so, set up and activate a virtual environment before running the pip command:
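For example (the environment path is illustrative):
python3 -m venv ~/dask-jobq-venv
source ~/dask-jobq-venv/bin/activate
pip install dask-jobqueue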
Using conda
conda can be used to install dask-jobqueue from the conda-forge channel:
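conda install dask-jobqueue -c conda-forge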
Basic usage
Executing a bash script with dask-jobqueue
Here is a Python script example of dask-jobqueue which executes a batch script (hello-world.sh) on specific resources (2 cores, 24GB of memory, on the chifflet cluster, for 1 hour; requesting at least 1 GPU is also shown, commented out):
from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
import os
cluster = Cluster(
    queue='default',
    # Should be specified if you belong to more than one GGA
    #project='<your grant access group>',
    # cores per job, required parameter
    cores=2,
    # memory per job, required parameter
    memory='24GB',
    memory_per_core_property_name='memcore',
    # walltime for each worker job
    walltime='1:0:0',
    job_extra_directives=[
        # Besteffort job
        #'-t besteffort',
        # reserve node from a specific cluster
        '-p chifflet',
    ],
    # reserve node with GPU
    #resource_spec='gpu=1'
)
cluster.scale(1)
client = Client(cluster)
# call your favorite batch script
client.submit(os.system, './hello-world.sh').result()
client.close()
cluster.close()
The Python script has to be launched on the frontend:
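For example, assuming the script above was saved as dask-example.py (the filename is illustrative) and dask-jobqueue is available in the current environment:
python3 dask-example.py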
Advanced usage
Start multiple computations in parallel using the 'scale' parameter
Dask provides the ability (through Dask-distributed) to scale jobs for parallel computing, which is very convenient for interacting with scientific libraries like NumPy, Pandas, and Scikit-Learn. The Client class enables users to submit computations (through submit or map calls) to a defined Dask Cluster. More details about the Client class can be found in the Dask distributed documentation.
To specify the number of Dask workers or OAR jobs, you can use the scale command. The number of workers or jobs can be specified directly, or indirectly by specifying the cores or memory request:
# specify total workers
cluster.scale(2)
# specify total OAR jobs
cluster.scale(jobs=2)
# specify total cores
cluster.scale(cores=4)
# specify total memory
cluster.scale(memory="48GB")
Dask Cluster also has the ability to "autoscale", with the adapt interface:
cluster.adapt(minimum=2, maximum=20)
Note that a single Job may include one or more Workers. The number of Workers can be set with the processes parameter, which can be useful if your computation can be split across several processes.
Example
A more complete example below can help in understanding the Worker and Job notions of Dask-jobqueue:
from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
from pprint import pprint
import logging
import os
import socket
import time
cluster = Cluster(
    queue='default',
    # Should be specified if you belong to more than one GGA
    #project='<your grant access group>',
    cores=4,
    memory='64GiB',
    memory_per_core_property_name='memcore',
    # walltime for each worker job
    walltime='1:0:0',
    processes=2,
)
cluster.scale(jobs=2)
print(cluster.job_script())
client = Client(cluster)
def slow_increment(x):
    time.sleep(5)
    return {'result': x + 1,
            'host': socket.gethostname(),
            'pid': os.getpid(),
            'time': time.asctime(time.localtime(time.time()))}
nb_workers = 0
while True:
    nb_workers = len(client.scheduler_info()["workers"])
    if nb_workers >= 2:
        print('Finally got {} workers '.format(nb_workers), 'with client: ', client)
        break
    time.sleep(1)
futures = client.map(slow_increment, range(6))
results = client.gather(futures)
print('results: ')
pprint(results)
client.close()
cluster.close()
The job script is generated as follows:
#!/usr/bin/env bash
#OAR -n dask-worker
#OAR -q default
#OAR -l /nodes=1/core=4,walltime=1:0:0
#OAR -p memcore>=16384
/home/ychi/dask-jobq-venv/env/bin/python -m distributed.cli.dask_worker tcp://172.16.47.106:45409 --nthreads 2 --nworkers 2 --memory-limit 32.00GiB --name dummy-name --nanny --death-timeout 60
Finally got 4 workers with client: <Client: 'tcp://172.16.47.106:45409' processes=4 threads=8, memory=128.00 GiB>
results:
[{'host': 'chifflet-8.lille.grid5000.fr',
'pid': 20552,
'result': 1,
'time': 'Tue Jun 27 16:43:52 2023'},
{'host': 'chifflet-8.lille.grid5000.fr',
'pid': 20546,
'result': 2,
'time': 'Tue Jun 27 16:43:52 2023'},
{'host': 'chifflet-8.lille.grid5000.fr',
'pid': 20553,
'result': 3,
'time': 'Tue Jun 27 16:43:52 2023'},
{'host': 'chifflet-8.lille.grid5000.fr',
'pid': 20547,
'result': 4,
'time': 'Tue Jun 27 16:43:52 2023'},
{'host': 'chifflet-8.lille.grid5000.fr',
'pid': 20552,
'result': 5,
'time': 'Tue Jun 27 16:43:52 2023'},
{'host': 'chifflet-8.lille.grid5000.fr',
'pid': 20546,
'result': 6,
'time': 'Tue Jun 27 16:43:52 2023'}]
Here, 2 OAR jobs are requested. Each of these OAR jobs contains 2 workers (--nworkers 2), so we get a total of 4 workers.
Understand the interaction between Dask-jobqueue and OAR
Dask-jobqueue first creates a Dask Scheduler when the Cluster object is instantiated (the cluster = Cluster(...) call in the examples above). The Dask Scheduler interacts with the resource manager (i.e., OAR or Slurm) to submit jobs matching the requested specification. The interaction between Dask-jobqueue and OAR (hidden from the end user) happens through a submission script. For the basic usage example above, the Dask Scheduler generates the following submission script and submits it to OAR:
#!/usr/bin/env bash
#OAR -n dask-worker
#OAR -q default
#OAR -l /nodes=1/core=2,walltime=1:0:0
#OAR -p 'chifflet AND memcore>=11445'
/home/ychi/source-dask-jobq/env/bin/python3 -m distributed.cli.dask_worker tcp://172.16.47.106:36611 --nthreads 1 --nworkers 2 --memory-limit 11.18GiB --name dummy-name --nanny --death-timeout 60
The job request is sent to OAR and executed on the requested resources: OAR starts the Dask Workers on the remote nodes, and the Dask Scheduler interacts with these Workers to tell them which computations to perform. In the basic usage example above, this consists in executing the bash script passed to the submit call.
The schema below describes the workflow of the different entities (Dask, OAR and resources):
Note that:
- When the Cluster object goes away, either because the Python program exits or because the Dask object has been deleted, a signal is sent to the Workers to shut them down. If for some reason the signal does not get through, the Workers kill themselves after trying for 60 seconds to reconnect to the Dask Scheduler (this timeout can be adjusted with the death-timeout parameter presented in the next section, or directly in Python as sketched below).
- To schedule job(s) on the reserved nodes for the computation, you need to tell the Dask Scheduler the number of job(s) using the scale command. At this step, the Dask Scheduler starts to interact with OAR. In the basic usage example above, only one job is launched, with only one worker inside. For advanced usage, please refer to the Advanced usage section.
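For instance, the timeout can also be set directly when instantiating the cluster; this is a minimal sketch, and the 120-second value is only illustrative:
from dask_jobqueue import OARCluster as Cluster

# Workers shut themselves down after waiting 120 s without reaching the Scheduler
cluster = Cluster(cores=2, memory='24GB', walltime='1:0:0', death_timeout=120)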
Use a configuration file to specify resources
Instead of specifying resources in the Python script, it is possible to use a configuration file. This can be useful when the same script is executed on different G5K sites.
Below is an example of a Dask config file stored in ~/.config/dask/jobqueue.yaml:
jobqueue:
  oar:
    name: dask-worker

    # Dask worker options
    cores: 2                 # Total number of cores per job
    memory: '24GB'           # Total amount of memory per job
    #processes: 1            # Number of Python processes per job, ~= sqrt(cores)
    #interface: null         # Network interface to use: eth0 or ib0
    death-timeout: 60        # Number of seconds to wait if a worker can not find a scheduler
    #local-directory: null   # Location of fast local storage like /scratch or $TMPDIR
    #extra: []               # Extra arguments to pass to Dask worker
    worker-extra-args: []    # Additional arguments to pass to `dask-worker`

    # OAR resource manager options
    #shebang: "#!/usr/bin/env bash"
    queue: 'default'
    #project: null
    walltime: '1:00:00'
    #env-extra: []
    job-script-prologue: []
    #resource-spec: null
    #job-extra: null
    #job-extra-directives: []
    #job-directives-skip: []
    log-directory: null

    # Scheduler options
    scheduler-options: {}
When a resource configuration file exists, the Dask cluster object can then be instantiated without parameters:
cluster = OARCluster()
Cluster parameters
The following table gives the mapping between Dask-jobqueue parameters and OAR/Slurm parameters.
dask-jobqueue parameter | OAR command example | Slurm command example | Description |
---|---|---|---|
queue | #OAR -q | #SBATCH -p | Destination queue for each worker job |
project | #OAR --project | #SBATCH -A | Accounting group associated with each worker job |
cores | #OAR -l core=2 | #SBATCH --cpus-per-task=2 | Total cores per job |
memory | #OAR -p memcore>=16000 | #SBATCH --mem=24GB if job_mem is None | Workers' --memory-limit parameter |
walltime | #OAR -l walltime=hh:mm:ss | #SBATCH -t hh:mm:ss | Walltime for each worker job |
name | #OAR -n | #SBATCH -J | Name of worker, always set to the default value dask-worker |
resource_spec | #OAR -l host=1/core=2, gpu=1 | Not supported | Request resources and specify job placement |
job_mem | Not supported | #SBATCH --mem=24GB | Amount of memory to request (If None, defaults to memory * Worker processes) |
job_cpu | Not supported | #SBATCH --cpus-per-task=2 | Number of CPU to book (If None, defaults to Worker threads * processes) |
job_extra_directives | #OAR -O, -E | #SBATCH -o, -e | Log directory |
job_extra_directives | #OAR -p parasilo | #SBATCH -C sirocco | Property request |
job_extra_directives | #OAR -t besteffort | #SBATCH -t besteffort | Besteffort job |
job_extra_directives | #OAR -r now | #SBATCH --begin=now | Advance reservation |
job_extra_directives | #OAR --checkpoint 150 | #SBATCH --checkpoint 150 | Checkpoint |
job_extra_directives | #OAR -a jobid | #SBATCH --dependency state:jobid | Jobs dependency |
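To illustrate how these parameters end up in the submission script, here is a minimal sketch (cluster name, resources and walltime are arbitrary) that requests a besteffort job on a specific cluster and only prints the generated script:
from dask_jobqueue import OARCluster

cluster = OARCluster(
    queue='default',
    cores=2,
    memory='24GB',
    walltime='1:0:0',
    job_extra_directives=[
        '-t besteffort',   # besteffort job (see table above)
        '-p chifflet',     # property request: reserve nodes from a specific cluster
    ],
)
# inspect the generated #OAR directives; no job is submitted until cluster.scale() is called
print(cluster.job_script())
cluster.close()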