Dask-jobqueue: Difference between revisions
No edit summary |
|||
Line 85: | Line 85: | ||
The Python script has to be launched on frontend: | The Python script has to be launched on frontend: | ||
{{Term|location=flille|cmd=<code class="command">python3 this-script.py</code>}} | {{Term|location=flille|cmd=<code class="command">python3 this-script.py</code>}} | ||
== Advanced usage == | |||
=== How is Dask-jobqueue interacting with OAR in practice? === | === How is Dask-jobqueue interacting with OAR in practice? === | ||
Line 119: | Line 123: | ||
[[File:Workflow dask jobq.png|900px]] | [[File:Workflow dask jobq.png|900px]] | ||
=== Use a configuration file to specify resources === | === Use a configuration file to specify resources === |
Revision as of 09:49, 23 June 2023
![]() |
Note |
---|---|
This page is actively maintained by the Grid'5000 team. If you encounter problems, please report them (see the Support page). Additionally, as it is a wiki page, you are free to make minor corrections yourself if needed. If you would like to suggest a more fundamental change, please contact the Grid'5000 team. |
Dask-jobqueue
Dask-jobqueue is a Python library that simplifies the deployment of Dask on commonly used job queuing systems found in high performance supercomputers, academic research institutions, and other clusters. Dask itself is a Python library for parallel computing, allowing scalability of Python code from multi-core local machines to large distributed clusters in the cloud.
Since Dask-jobqueue provides interfaces for both OAR and Slurm-based clusters, it can be used to facilitate the switch between OAR and Slurm-based resource managers.
The source code, issues and pull requests can be found here.
The OARCluster implementation in Dask-jobqueue has been improved to address a limitation. Previously, it did not consider the memory parameter, leading to inconsistencies in memory allocation. To overcome this, an extension was proposed and accepted into the Dask-jobqueue library.
This extension enables OAR to consider the memory parameter, ensuring that the allocated resources fulfill the specified memory requirements of Dask workers. By making OAR aware of the memory request, the extension facilitates a more intuitive and efficient allocation of resources.
Additionally, documentation work has been undertaken to assist users in seamlessly transitioning between OAR and Slurm-based clusters. You can refer to this table to see the mapping of commands and concepts between OAR and Slurm-based resource managers.
Dask-jobqueue installation
Using pip
pip can be used to install dask-jobqueue and its dependencies in your homedir:
Note that, it is recommended to install python dependencies via a virtual environment. To do so, before running the pip
command, you need to set up a virtual environment:
Using conda
conda can be used to install dask-jobqueue from the conda-forge:
Basic usage
Executing a bash script with dask-jobqueue
Here is a Python script example of dask-jobqueue which executes a batch script (hello-world.sh) on a specific resource (2 cores, 24GB, at least 1 GPU, specific cluster - chifflet -, for 1 hour)
from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
import os
cluster = Cluster(
queue='default',
# Should be specified if you belongs to more than one GGA
#project='<your grant access group>',
# cores per job, required parameter
cores=2,
# memory per job, required parameter
memory='24GB',
memory_per_core_property_name='memcore',
# walltime for each worker job
walltime='1:0:0',
job_extra_directives=[
# Besteffort job
#'-t besteffort',
# reserve node from specific cluster
'-p chifflet',
],
# reserve node with GPU
#resource_spec='gpu=1'
)
cluster.scale(1)
client = Client(cluster)
# call your favorite batch script
client.submit(os.system, './hello-world.sh').result()
client.close()
cluster.close()
The Python script has to be launched on frontend:
Advanced usage
How is Dask-jobqueue interacting with OAR in practice?
Dask-jobqueue creates at first a Dask Scheduler when the Cluster object is instantiated (see line 5 of the previous example). The Dask Scheduler interacts with OAR/Slurm to submit jobs and reserve nodes according to the specification you made. For instance, Dask Scheduler creates for the previous example the following configuration file and submits it to OAR :
#!/usr/bin/env bash
#OAR -n dask-worker
#OAR -q default
#OAR -l /nodes=1/core=2,walltime=1:0:0
#OAR -p 'chifflet AND memcore>=11445'
/home/ychi/source-dask-jobq/env/bin/python3 -m distributed.cli.dask_worker tcp://172.16.47.106:36611 --nthreads 1 --nworkers 2 --memory-limit 11.18GiB --name dummy-name --nanny --death-timeout 60
This generated OAR file can be access by adding the following line to the python script:
print(cluster.job_script())
Note, that this command can help you to debug Dask-jobqueue resource reservation if you do not understand why Dask is not correctly reserving the nodes you specified.
The job request is sent to the OAR and executing on the wanted resources. OAR is starting a Dask Worker on the distant nodes. The Dask Scheduler is interacting with these Workers to indicate the computations they have to perform. In the previous example, this is done by executing the bash script specified by the submit
method (see line 34).
Note that :
- a Worker in Dask is a Python object that serves data and performs computations while a Job here means requests submitted to, and managed by a resource manager (OAR for instance). To summarize, dask-jobqueue starts a single OAR job that may includes one or more Dask Workers that may execute one or more scripts/tasks.
- when the Cluster object goes away, either because the Python program is killed or the dask object has been deleted, a signal is sent to the Workers to shut them down. If for some reason, the signal does not get through, the Workers kill themselves after waiting for 60 seconds reconnecting with the Dask Scheduler (timeout can be adjusted with the
death-timeout
parameter presented in the next section). - to schedule job(s) on the previously reserved nodes for the computation, you need to tell the Dask Scheduler the number of job(s) using the scale command. At this step, the Dask Scheduler starts to interact with OAR. In the example above, only one job will be launched, with only one worker inside. For advanced usage, please refer to the Advanced usage section.
Below is a schema that illustrates the different layers about Dask, OAR and resources.
The schema below describes the workflow of the different entities (Dask, OAR and resources):
Use a configuration file to specify resources
Instead of specifying resources in the Python script, it is possible to use a configuration file. This can be useful when the same script is executed on different G5K sites.
Bellow an example of Dask config file stored in ~/.config/dask/jobqueue.yaml:
jobqueue:
oar:
name: dask-worker
# Dask worker options
cores: 2 # Total number of cores per job
memory: '24GB' # Total amount of memory per job
#processes: 1 # Number of Python processes per job, ~= sqrt(cores)
#interface: null # Network interface to use: eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
#local-directory: null # Location of fast local storage like /scratch or $TMPDIR
#extra: [] # Extra arguments to pass to Dask worker
worker-extra-args: [] # Additional arguments to pass to `dask-worker`
# OAR resource manager options
#shebang: "#!/usr/bin/env bash"
queue: 'default'
#project: null
walltime: '1:00:00'
#env-extra: []
job-script-prologue: []
#resource-spec: null
#job-extra: null
#job-extra-directives: []
#job-directives-skip: []
log-directory: null
# Scheduler options
scheduler-options: {}
When a resource configuration file exists, the cluster can be then instantiated with one single line as follow:
cluster = OARCluster()
Cluster parameters
The following table is giving the mapping between Dask parameters and OAR/Slurm parameters.
dask-jobqueue parameter | OAR command example | Slurm command example | Description |
---|---|---|---|
queue | #OAR -q | #SBATCH -p | Destination queue for each worker job |
project | #OAR --project | #SBATCH -A | Accounting group associated with each worker job |
cores | #OAR -l core=2 | #SBATCH --cpu-per-task=2 | Total cores per job |
memory | #OAR -p memcore>=16000 | #SBATCH --mem=24GB if job_mem is None | Workers' --memory-limit parameter |
walltime | #OAR -l walltime=hh:mm:ss | #SBATCH -t hh:mm:ss | Walltime for each worker job |
name | #OAR -n | #SBATCH -J | Name of worker, always set to the default value dask-worker |
resource_spec | #OAR -l host=1/core=2, gpu=1 | Not supported | Request resources and specify job placement |
job_mem | Not supported | #SBATCH --mem=24GB | Amount of memory to request (If None, defaults to memory * Worker processes) |
job_cpu | Not supported | #SBATCH --cpus-per-task=2 | Number of CPU to book (If None, defaults to Worker threads * processes) |
job_extra_directives | #OAR -O, -E | #SBATCH -o, -e | Log directory |
job_extra_directives | #OAR -p parasilo | #SBATCH -C sirocoo | Property request |
job_extra_directives | #OAR -t besteffort | #SBATCH -t besteffort | Besteffort job |
job_extra_directives | #OAR -r now | #SBATCH --begin=now | Advance reservation |
job_extra_directives | #OAR --checkpoint 150 | #SBATCH --checkpoint 150 | Checkpoint |
job_extra_directives | #OAR -a jobid | #SBATCH --dependency state:jobid | Jobs dependency |
Start multiple computations at once using 'scale' parameter
How to start multiple computations at once using 'scale' parameter
Dask-jobqueue allows for the seamless deployment of Dask on clusters that use various job queuing systems such OAR and Slurm. With Dask-jobqueue's Pythonic interface, users can easily manage submissions, executions, and deletions through different resource and job management systems. Dask also provides the ability to scale jobs for parallel computing, which coordinates with Python's existing scientific libraries like NumPy, Pandas, and Scikit-Learn.
Dask-distributed, which we imported in the example above is an extension of Dask which facilitates parallel computings. The Client
class enables users to connect to and submit computations to a defined Dask Cluster, through submit
or map
calls. More details about the Client
class can be found here.
As demonstrated in the example above, a single Job may include one or more Workers. The number of Workers can be set by the processes parameter (see configuration section), if your job can be cut into many processes.
To specify the number of Jobs, you can use the scale command. The number of jobs can be specified directly, as shown in the example above (2 cores, 24GB), or indirectly by specifying the cores or memory request:
# 2 jobs with 1 worker for each will be launched
cluster.scale(2)
# specify total cores
cluster.scale(cores=4)
# specify total memory
cluster.scale(memory="48GB")
Dask Cluster also has the ability to "autoscale", with the adapt interface:
cluster.adapt(minimum=2, maximum=20)
Example
A more complicated example below can help understanding the Worker and Job notions of Dask-jobqueue:
from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
from pprint import pprint
import logging
import os
import socket
import time
cluster = Cluster(
queue='default',
# Should be specified if you belongs to more than one GGA
#project='<your grant access group>',
cores=4,
memory='64GiB',
memory_per_core_property_name='memcore',
# walltime for each worker job
walltime='1:0:0',
processes=2,
)
cluster.scale(6)
print(cluster.job_script())
client = Client(cluster)
def slow_increment(x):
time.sleep(5)
return {'result': x + 1,
'host': socket.gethostname(),
'pid': os.getpid(),
'time': time.asctime(time.localtime(time.time()))}
nb_workers = 0
while True:
nb_workers = len(client.scheduler_info()["workers"])
if nb_workers >= 2:
print('Finally got {} workers '.format(nb_workers), 'with client: ', client)
break
time.sleep(1)
futures = client.map(slow_increment, range(6))
results = client.gather(futures)
print('results: ')
pprint(results)
client.close()
cluster.close()
The regular job script is generated as follow:
#!/usr/bin/env bash
#OAR -n dask-worker
#OAR -q default
#OAR -l /nodes=1/core=4,walltime=1:0:0
#OAR -p memcore>=16384
/home/ychi/source-dask-jobq/env/bin/python3 -m distributed.cli.dask_worker tcp://172.16.47.106:40605 --nthreads 2 --nprocs 2 --memory-limit 32.00GiB --name dummy-name --nanny --death-timeout 60 --protocol tcp://
Finally got 6 workers with client: <Client: 'tcp://172.16.47.106:40605' processes=6 threads=12, memory=192.00 GiB>
results:
[{'host': 'chetemi-13.lille.grid5000.fr',
'pid': 6780,
'result': 1,
'time': 'Fri Aug 5 17:08:03 2022'},
{'host': 'chetemi-13.lille.grid5000.fr',
'pid': 6767,
'result': 2,
'time': 'Fri Aug 5 17:08:03 2022'},
{'host': 'chetemi-13.lille.grid5000.fr',
'pid': 6768,
'result': 3,
'time': 'Fri Aug 5 17:08:03 2022'},
{'host': 'chetemi-13.lille.grid5000.fr',
'pid': 6777,
'result': 4,
'time': 'Fri Aug 5 17:08:03 2022'},
{'host': 'chetemi-13.lille.grid5000.fr',
'pid': 6769,
'result': 5,
'time': 'Fri Aug 5 17:08:03 2022'},
{'host': 'chetemi-13.lille.grid5000.fr',
'pid': 6776,
'result': 6,
'time': 'Fri Aug 5 17:08:03 2022'}]
Here 2 Workers (--nprocs 2) with 2 cores for each are asked for Dask. Since there are 2 Workers per job, Dask-jobqueue will ask 3 OAR Jobs, equivalent of 6 Dask Jobs/Workers (processes=6).
Note that by default Dask-jobqueue will allocate all required cores on the same node if possible. If you would like to have 4 cores on 4 different nodes, you should specify it by adding resource_spec="/nodes=4/core=1"
before the line 13 cores=4
.