Dask-jobqueue: Difference between revisions
Line 164: | Line 164: | ||
client.close() | client.close() | ||
cluster.close() | cluster.close() | ||
</syntaxhighlight> | </syntaxhighlight> | ||
The | The previous code generates the following output: | ||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
results: | results: | ||
[{'host': 'chifflet-7.lille.grid5000.fr', | [{'host': 'chifflet-7.lille.grid5000.fr', | ||
Line 216: | Line 203: | ||
</syntaxhighlight> | </syntaxhighlight> | ||
As shown in the output, we got 4 workers (corresponding to the 4 different PIDs: 16501, 16502, 16507, and 16508). Besides, we can see that as requested, the workers were executed in 2 different OAR jobs (1952955 and 1952956) ; 2 workers on each job. | |||
{{Warning|text=By default Dask-jobqueue will allocate all required cores on the same node if possible. If you would like to have 4 cores on 4 different nodes, you should specify it by adding <code>resource_spec="/nodes=4/core=1"</code> as parameter of the Cluster object initialisation (before line 13 <code>cores=4</code> for instance).'''}} | {{Warning|text=By default Dask-jobqueue will allocate all required cores on the same node if possible. If you would like to have 4 cores on 4 different nodes, you should specify it by adding <code>resource_spec="/nodes=4/core=1"</code> as parameter of the Cluster object initialisation (before line 13 <code>cores=4</code> for instance).'''}} |
Revision as of 14:41, 29 June 2023
Note | |
---|---|
This page is actively maintained by the Grid'5000 team. If you encounter problems, please report them (see the Support page). Additionally, as it is a wiki page, you are free to make minor corrections yourself if needed. If you would like to suggest a more fundamental change, please contact the Grid'5000 team. |
Dask-jobqueue
Dask-jobqueue is a Python library that simplifies the deployment of Dask on commonly used job queuing systems found in high performance supercomputers, academic research institutions, and other clusters. Dask itself is a Python library for parallel computing, allowing scalability of Python code from multi-core local machines to large distributed clusters in the cloud.
Since Dask-jobqueue provides interfaces for both OAR and Slurm-based clusters, it can be used to facilitate the switch between OAR and Slurm-based resource managers.
The source code, issues and pull requests can be found here.
The OARCluster implementation in Dask-jobqueue has been improved to address a limitation. Previously, it did not consider the memory parameter, leading to inconsistencies in memory allocation. To overcome this, an extension was proposed and accepted into the Dask-jobqueue library.
This extension enables OAR to consider the memory parameter, ensuring that the allocated resources fulfill the specified memory requirements of Dask workers. By making OAR aware of the memory request, the extension facilitates a more intuitive and efficient allocation of resources.
Additionally, documentation work has been undertaken to assist users in seamlessly transitioning between OAR and Slurm-based clusters. You can refer to this table to see the mapping of commands and concepts between OAR and Slurm-based resource managers.
Dask-jobqueue installation
Using pip
pip can be used to install dask-jobqueue and its dependencies in your homedir:
Note that, it is recommended to install python dependencies via a virtual environment. To do so, before running the pip
command, you need to set up a virtual environment:
Using conda
conda can be used to install dask-jobqueue from the conda-forge:
Basic usage
Executing a bash script with dask-jobqueue
Here is a Python script example of dask-jobqueue which executes a batch script (hello-world.sh) on a specific resource (2 cores, 24GB, at least 1 GPU, specific cluster - chifflet -, for 1 hour)
from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
import os
cluster = Cluster(
queue='default',
# Should be specified if you belongs to more than one GGA
#project='<your grant access group>',
# cores per job, required parameter
cores=2,
# memory per job, required parameter
memory='24GB',
# The memory per core property name of your OAR cluster (usually memcore or mem_core).
memory_per_core_property_name='memcore',
# walltime for each worker job
walltime='1:0:0',
job_extra_directives=[
# Besteffort job
#'-t besteffort',
# reserve node from specific cluster
'-p chifflet',
],
# reserve node with GPU
#resource_spec='gpu=1'
)
cluster.scale(1)
client = Client(cluster)
# call your favorite batch script
client.submit(os.system, './hello-world.sh').result()
client.close()
cluster.close()
The Python script has to be launched on frontend:
Note that the oar_mem_core_property_name
parameter has been introduced in the extension of OARCluster. Since the OAR property names are not standardized by OAR, their names might differ from one cluster to another (usually memcore or mem_core). They can be listed by executing the oarnodes
command. If the property does not exist on your cluster, you can set it to not_applicable
to avoid getting a warning. If you leave memory_per_core_property_name set to its default value, you will be warned that allocated nodes may not have enough memory to match the memory parameter and Dask worker memory management may not work correctly.
Advanced usage
Start multiple computations in parallel using 'scale' parameter
Dask provides the ability (through Dask-distributed) to scale jobs for parallel computing, which is very convenient for interacting with scientific libraries like NumPy, Pandas, Scikit-Learn. The Client
class enables users to submit computations (through submit
or map
calls) to a defined Dask Cluster. More details about the Client
class can be found here.
To specify the number of Dask workers or OAR jobs, you can use the scale command. The number of workers or jobs can be specified directly, or indirectly by specifying the cores or memory request:
# specify total workers
cluster.scale(2)
# specify total OAR jobs
cluster.scale(jobs=2)
# specify total cores
cluster.scale(cores=4)
# specify total memory
cluster.scale(memory="48GB")
Dask Cluster also has the ability to "autoscale", with the adapt interface:
cluster.adapt(minimum=2, maximum=20)
Note that a single Job may include one or more Workers.
Example
A more complicated example below can help understanding the Worker and Job notions of Dask-jobqueue:
from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
from pprint import pprint
import logging
import os
import socket
import time
cluster = Cluster(
queue='default',
# Should be specified if you belongs to more than one GGA
#project='<your grant access group>',
cores=4,
memory='64GiB',
# The memory per core property name of your OAR cluster (usually memcore or mem_core).
memory_per_core_property_name='memcore',
# walltime for each worker job
walltime='1:0:0',
processes=2,
)
cluster.scale(jobs=2)
print('OAR submission file: \n', cluster.job_script())
client = Client(cluster)
def slow_increment(x):
time.sleep(5)
return {'result': x + 1,
'host': socket.gethostname(),
'pid': os.getpid(),
'time': time.asctime(time.localtime(time.time())),
'jobId': os.environ.get('OAR_JOB_ID')}
# cluster.scale() doesn't wait for the workers to spin up.
# sleep here and wait for workers to be up.
# more details here: https://github.com/dask/dask-jobqueue/issues/206
while len(client.scheduler_info()["workers"]) < 4:
time.sleep(1)
futures = client.map(slow_increment, range(6))
results = client.gather(futures)
print('results: ')
pprint(results)
client.close()
cluster.close()
The previous code generates the following output:
results:
[{'host': 'chifflet-7.lille.grid5000.fr',
'jobId': '1952956',
'pid': 16502,
'result': 1,
'time': 'Thu Jun 29 09:56:06 2023'},
{'host': 'chifflet-7.lille.grid5000.fr',
'jobId': '1952955',
'pid': 16501,
'result': 2,
'time': 'Thu Jun 29 09:56:06 2023'},
{'host': 'chifflet-7.lille.grid5000.fr',
'jobId': '1952956',
'pid': 16508,
'result': 3,
'time': 'Thu Jun 29 09:56:06 2023'},
{'host': 'chifflet-7.lille.grid5000.fr',
'jobId': '1952955',
'pid': 16507,
'result': 4,
'time': 'Thu Jun 29 09:56:06 2023'},
{'host': 'chifflet-7.lille.grid5000.fr',
'jobId': '1952956',
'pid': 16502,
'result': 5,
'time': 'Thu Jun 29 09:56:06 2023'},
{'host': 'chifflet-7.lille.grid5000.fr',
'jobId': '1952955',
'pid': 16501,
'result': 6,
'time': 'Thu Jun 29 09:56:06 2023'}]
As shown in the output, we got 4 workers (corresponding to the 4 different PIDs: 16501, 16502, 16507, and 16508). Besides, we can see that as requested, the workers were executed in 2 different OAR jobs (1952955 and 1952956) ; 2 workers on each job.
Understand the interaction between Dask-jobqueue and OAR
Dask-jobqueue creates at first a Dask Scheduler when the Cluster object is instantiated (see line 5 of the previous example). The Dask Scheduler interacts with the scheduler (i.e., OAR or Slurm) to submit jobs according to the wanted specification. The interaction between Dask-jobqueue and OAR (hidden for the end-user) is made through a submission file. In the previous example, Dask Scheduler creates the following configuration file and submits it to OAR:
#!/usr/bin/env bash
#OAR -n dask-worker
#OAR -q default
#OAR -l /nodes=1/core=2,walltime=1:0:0
#OAR -p 'chifflet AND memcore>=11445'
/home/ychi/source-dask-jobq/env/bin/python3 -m distributed.cli.dask_worker tcp://172.16.47.106:36611 --nthreads 1 --nworkers 2 --memory-limit 11.18GiB --name dummy-name --nanny --death-timeout 60
The job request is sent to the OAR and executing on the wanted resources. OAR is starting a Dask Worker on the distant nodes. The Dask Scheduler is interacting with these Workers to indicate the computations they have to perform. In the previous example, this is done by executing the bash script specified by the submit
method (see line 32).
The schema below describes the workflow of the different entities (Dask, OAR and resources):
Note that :
- when the Cluster object goes away, either because the Python program is killed or the dask object has been deleted, a signal is sent to the Workers to shut them down. If for some reason, the signal does not get through, the Workers kill themselves after waiting for 60 seconds reconnecting with the Dask Scheduler (timeout can be adjusted with the
death-timeout
parameter presented in the next section). - to schedule job(s) on the previously reserved nodes for the computation, you need to tell the Dask Scheduler the number of job(s) using the scale command. At this step, the Dask Scheduler starts to interact with OAR. In the example above, only one job will be launched, with only one worker inside. For advanced usage, please refer to the Advanced usage section.
Use a configuration file to specify resources
Instead of specifying resources in the Python script, it is possible to use a configuration file. This can be useful when the same script is executed on different G5K sites.
Bellow an example of Dask config file stored in ~/.config/dask/jobqueue.yaml:
jobqueue:
oar:
name: dask-worker
# Dask worker options
cores: 2 # Total number of cores per job
memory: '24GB' # Total amount of memory per job
#processes: 1 # Number of Python processes per job, ~= sqrt(cores)
#interface: null # Network interface to use: eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
#local-directory: null # Location of fast local storage like /scratch or $TMPDIR
#extra: [] # Extra arguments to pass to Dask worker
worker-extra-args: [] # Additional arguments to pass to `dask-worker`
# OAR resource manager options
#shebang: "#!/usr/bin/env bash"
queue: 'default'
#project: null
walltime: '1:00:00'
#env-extra: []
job-script-prologue: []
#resource-spec: null
#job-extra: null
#job-extra-directives: []
#job-directives-skip: []
log-directory: null
# Scheduler options
scheduler-options: {}
When a resource configuration file exists, the Dask cluster object can be then instantiated without parameters:
cluster = OARCluster()
Cluster parameters
The following table is giving the mapping between Dask parameters and OAR/Slurm parameters.
dask-jobqueue parameter | OAR command example | Slurm command example | Description |
---|---|---|---|
queue | #OAR -q | #SBATCH -p | Destination queue for each worker job |
project | #OAR --project | #SBATCH -A | Accounting group associated with each worker job |
cores | #OAR -l core=2 | #SBATCH --cpu-per-task=2 | Total cores per job |
memory | #OAR -p memcore>=16000 | #SBATCH --mem=24GB if job_mem is None | Workers' --memory-limit parameter |
walltime | #OAR -l walltime=hh:mm:ss | #SBATCH -t hh:mm:ss | Walltime for each worker job |
name | #OAR -n | #SBATCH -J | Name of worker, always set to the default value dask-worker |
resource_spec | #OAR -l host=1/core=2, gpu=1 | Not supported | Request resources and specify job placement |
job_mem | Not supported | #SBATCH --mem=24GB | Amount of memory to request (If None, defaults to memory * Worker processes) |
job_cpu | Not supported | #SBATCH --cpus-per-task=2 | Number of CPU to book (If None, defaults to Worker threads * processes) |
job_extra_directives | #OAR -O, -E | #SBATCH -o, -e | Log directory |
job_extra_directives | #OAR -p parasilo | #SBATCH -C sirocoo | Property request |
job_extra_directives | #OAR -t besteffort | #SBATCH -t besteffort | Besteffort job |
job_extra_directives | #OAR -r now | #SBATCH --begin=now | Advance reservation |
job_extra_directives | #OAR --checkpoint 150 | #SBATCH --checkpoint 150 | Checkpoint |
job_extra_directives | #OAR -a jobid | #SBATCH --dependency state:jobid | Jobs dependency |