Submitit: Difference between revisions
Pvirouleau (talk | contribs) |
|||
(45 intermediate revisions by 4 users not shown) | |||
Line 6: | Line 6: | ||
The source code, issues and pull requests can be found [https://github.com/facebookincubator/submitit/ here]. | The source code, issues and pull requests can be found [https://github.com/facebookincubator/submitit/ here]. | ||
To facilitate the switch between OAR and Slurm-based resource managers for users of Inria's national computing infrastructure, ongoing development efforts are focused on adding support for the OAR cluster in Submitit, through a plugin approach. The source code for the OAR plugin can be accessed [https://gitlab.inria.fr/ | To facilitate the switch between OAR and Slurm-based resource managers for users of Inria's national computing infrastructure, ongoing development efforts are focused on adding support for the OAR cluster in Submitit, through a plugin approach. The source code for the OAR plugin can be accessed [https://gitlab.inria.fr/grid5000/submitit_oar here]. A summary of the progress made, including implemented features and pending tasks can be found in [https://www.grid5000.fr/w/User:Ychi/submitit#Current_status_of_the_OAR_executor this section]. | ||
== Submitit installation == | == Submitit and submitit_oar plugin installation == | ||
{{Note|text=Submitit should be installed in a folder accessible by both the frontend and the nodes. For example, by installing it in your homedir, we are guaranteed that Submitit is available by both the frontend and the nodes. | {{Note|text=Submitit and submitit_oar should be installed in a folder accessible by both the frontend and the nodes. For example, by installing it in your homedir, we are guaranteed that Submitit is available by both the frontend and the nodes. | ||
}} | }} | ||
=== Using pip === | === Using pip === | ||
pip can be used to install the stable release of submitit: | pip can be used to install the stable release of submitit and submitit_oar: | ||
{{Term|location=fontend.site|cmd=<code class="command">pip install --user | {{Term|location=fontend.site|cmd=<code class="command">pip install --user submitit_oar==1.1.1</code>}} | ||
It is recommended to install python dependencies via a virtual environment. To do so, before running the <code class="command">pip</code> command: | It is recommended to install python dependencies via a virtual environment. To do so, before running the <code class="command">pip</code> command: | ||
{{Term|location=fontend.site|cmd=<code class="command">python3 -m venv env</code>}} | {{Term|location=fontend.site|cmd=<code class="command">python3 -m venv env</code>}} | ||
{{Term|location=fontend.site|cmd=<code class="command">source env/bin/activate</code>}} | {{Term|location=fontend.site|cmd=<code class="command">source env/bin/activate</code>}} | ||
{{Term|location=fontend.site|cmd=<code class="command">pip install | {{Term|location=fontend.site|cmd=<code class="command">pip install submitit_oar==1.1.1</code>}} | ||
== Basic usage == | == Basic usage == | ||
The example | The example below shows the execution of an integer addition performed with the Submitit library. The '''AutoExecutor''' serves as the interface for submitting functions that are either executed on a cluster or on a local execution. It automatically detects the presence of a scheduler (such as OAR or Slurm). If no scheduler is detected, the function is executed locally on the machine. | ||
<syntaxhighlight lang="python" line> | <syntaxhighlight lang="python" line> | ||
Line 48: | Line 34: | ||
# logs are dumped in the folder | # logs are dumped in the folder | ||
executor = submitit.AutoExecutor(folder="log_test") | executor = submitit.AutoExecutor(folder="log_test", cluster="oar") | ||
job_addition = executor.submit(add, 5, 7) # will compute add(5, 7) | job_addition = executor.submit(add, 5, 7) # will compute add(5, 7) | ||
Line 75: | Line 61: | ||
<syntaxhighlight lang="python"> | <syntaxhighlight lang="python"> | ||
executor = submitit.AutoExecutor(folder="log_test") | executor = submitit.AutoExecutor(folder="log_test", cluster="oar") | ||
executor.update_parameters(slurm_partition="cpu_devel", oar_queue="default") | executor.update_parameters(slurm_partition="cpu_devel", oar_queue="default") | ||
</syntaxhighlight> | </syntaxhighlight> | ||
Line 83: | Line 69: | ||
Besides, parameters for specific scheduler are overwriting common parameters. For instance, if both ''oar_walltime'' and ''timeout_min'' are provided, then ''oar_walltime'' is used on OAR clusters (as ''timeout_min'' is a common parameter) while ''timeout_min'' is used on others (as ''oar_walltime'' is ignored)}} | Besides, parameters for specific scheduler are overwriting common parameters. For instance, if both ''oar_walltime'' and ''timeout_min'' are provided, then ''oar_walltime'' is used on OAR clusters (as ''timeout_min'' is a common parameter) while ''timeout_min'' is used on others (as ''oar_walltime'' is ignored)}} | ||
The following table recaps the parameters supported (and unsupported) by AutoExecutor, | The following table recaps the parameters supported (and unsupported) by AutoExecutor, OarExecutor and SlurmExecutor: | ||
{| class="wikitable" | {| class="wikitable" | ||
|- | |- | ||
! AutoExecutor !! | ! AutoExecutor !! OarExecutor !! SlurmExecutor !! Description | ||
|- | |- | ||
| timeout_min || walltime in hh:mm:ss || time || The duration of the job (minutes) | | timeout_min || walltime in hh:mm:ss || time || The duration of the job (minutes) | ||
Line 116: | Line 102: | ||
executor.update_parameters(walltime="0:0:5", queue="default") | executor.update_parameters(walltime="0:0:5", queue="default") | ||
</syntaxhighlight> | </syntaxhighlight> | ||
==== Example with OarExecutor ==== | ==== Example with OarExecutor ==== | ||
Line 128: | Line 113: | ||
return a + b | return a + b | ||
executor = submitit.AutoExecutor(folder="log_test") | executor = submitit.AutoExecutor(folder="log_test", cluster="oar") | ||
executor.update_parameters(oar_queue="production", oar_walltime="0:2:0", nodes=1, gpus_per_node=1, oar_additional_parameters={"t": "besteffort"}) | executor.update_parameters(oar_queue="production", oar_walltime="0:2:0", nodes=1, gpus_per_node=1, oar_additional_parameters={"t": "besteffort"}) | ||
job_addition = executor.submit(add, 5, 7) | job_addition = executor.submit(add, 5, 7) | ||
Line 136: | Line 121: | ||
</syntaxhighlight> | </syntaxhighlight> | ||
If we check the submission file | If we check the submission file submitted to OAR, we can see all our requirements: | ||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
#!/bin/bash | #!/bin/bash | ||
Line 156: | Line 141: | ||
=== Checkpointing with Submitit === | === Checkpointing with Submitit === | ||
The purpose of the checkpointing functionality is to allow the automatic resumption of a computation when the job is preempted by another. To so do, the current state of the job needs to be saved before the scheduler (e.g., OAR, Slurm) stops the job. | |||
As explained more in detailed in the [https://github.com/facebookincubator/submitit/blob/main/docs/checkpointing.md official submitit documentation], you need to: | |||
* define a class that implements the <code>submitit.helpers.Checkpointable</code> interface | |||
* implement a '''__call__''' method that contains the computations code | |||
* implement a '''checkpoint''' method to save the current state of the computation | |||
<syntaxhighlight lang="python"> | <syntaxhighlight lang="python"> | ||
import submitit | import submitit | ||
class MyCheckpointableAlgorithm(submitit.helpers.Checkpointable): | |||
def __call__(self, ....): | |||
.... | |||
def __call__(self, | |||
def checkpoint(self, *args, **kwargs): | def checkpoint(self, *args, **kwargs): | ||
.... | |||
if __name__ == "__main__": | |||
executor = submitit.AutoExecutor(folder) | |||
myAlgo = MyCheckpointableAlgorithm() | |||
job = executor.submit(myAlgo, ...) | |||
print('output: ', job.result()) | |||
</syntaxhighlight> | |||
The submitit documentation provides a [https://github.com/facebookincubator/submitit/blob/main/docs/mnist.py complete example] of implementing a checkpointing mechanism for a multinomial logistic regression based on the Scikit-learn library. | |||
==== Slurm and OAR mechanisms for resuming preempted jobs ==== | |||
OAR and Slurm do not provides the same strategy for implementing the resumption of a preempted job: | |||
* For the Slurm scheduler, there is the possibility to requeue the current job directly from the node by executing the command <code>scontrol requeue jobId</code> (see requeue description in the [https://slurm.schedmd.com/scontrol.html scontrol man page] for more information). | |||
* For the OAR scheduler, there are two possibilities: either requeuing the job by executing <code>oarsub --resubmit=origin_job_id</code> (note that the <code>oarsub</code> command is not available on the nodes), or activating an automatic requeuing mechanism by submitting the original job with the ''idempotent'' type. | |||
{{Note|text=For the OAR scheduler, the automatic requeuing mechanism using ''idempotent'' jobs only works if the job is preempted after running at least 60 seconds. If the preemption occurs before 60 seconds, the OAR scheduler considers the job not viable and stops it without requeuing.}} | |||
==== How to implement the checkpointing mechanism ==== | |||
As explained in the previous section, to implement a checkpointing mechanism with Submitit, you need to : | |||
* In the '''__call__''' method: | |||
** If the OAR scheduler is used, you should wait for 60 seconds to ensure that when the computation starts, your job will be rescheduled if it is preempted. | |||
** Check if an existing state is stored on the persistent storage and retrieve it. | |||
* In the '''checkpointing''' method: | |||
** Save the state of your computation in the '''checkpointing''' method (e.g., files on a persistent storage). | |||
** Conclude the method by returning <code>super().checkpoint(*args, **kwargs)</code>, which will create a '''DelayedSubmission''' object corresponding the requeued job. | |||
===== Template for Slurm ===== | |||
<syntaxhighlight lang="python" line> | |||
def __call__(self, ....): | |||
... # Retrieve a potential existing state | |||
... # Start computation | |||
def checkpoint(self, *args: Any, **kwargs: Any) -> submitit.helpers.DelayedSubmission: | |||
... # Save current computation state | |||
return super().checkpoint(*args, **kwargs) | |||
</syntaxhighlight> | |||
===== Template for OAR ===== | |||
<syntaxhighlight lang="python" line> | |||
def __call__(self, ....): | |||
# wait 60s at first to ensure that the checkpointing mechanism is working | |||
time.sleep(60) | |||
... # Retrieve a potential existing state | |||
... # Start computation | |||
def checkpoint(self, *args: Any, **kwargs: Any) -> submitit.helpers.DelayedSubmission: | |||
... # Save current computation state | |||
return super().checkpoint(*args, **kwargs) | |||
</syntaxhighlight> | </syntaxhighlight> | ||
==== How to simulate a preemption ==== | |||
If you want to test that the checkpointing mechanism is working properly with your algorithm, you can simulate a preemption by calling the ''_interrupt()'' method. The call to ''_interrupt()'' should be made after the job starts (i.e., in the running state) and after waiting for a couple of seconds to ensure the computation has started. Here is an example: | |||
<syntaxhighlight lang="python"> | |||
... | |||
job = ex.submit(myAlgo, ...) | |||
# Wait for the job to be running | |||
while job.state() != "RUNNING": | |||
time.sleep(1) | |||
# Wait for 60s to let the algorithm run for a while | |||
time.sleep(60) | |||
# Simulate preemption | |||
job._interrupt() | |||
# Get the result | |||
result = job.result() | |||
print(f"Result: {result}") | |||
</syntaxhighlight> | </syntaxhighlight> | ||
To validate the good execution of the checkpointing, you should: | |||
* Check the scheduler to see that the job was stopped and then restarted. | |||
* Check the output of your algorithm to see that the computation was saved and restarted not from the beginning. | |||
=== Job array with Submitit === | === Job array with Submitit === | ||
Line 409: | Line 254: | ||
a = [1, 2, 3, 4] | a = [1, 2, 3, 4] | ||
b = [10, 20, 30, 40] | b = [10, 20, 30, 40] | ||
executor = submitit.AutoExecutor(folder="log_test") | executor = submitit.AutoExecutor(folder="log_test", cluster="oar") | ||
jobs = executor.map_array(add, a, b) | jobs = executor.map_array(add, a, b) | ||
# Iterate over the 4 jobs and print their results | # Iterate over the 4 jobs and print their results | ||
Line 424: | Line 269: | ||
</syntaxhighlight> | </syntaxhighlight> | ||
Note | {{Note|text=Unlike the SlurmExecutor with <code>slurm_array_parallelism</code> parameter, the OarExecutor does not provide a parameter to limit the number of jobs executed simultaneously.}} | ||
{{Note|text=As explained in the [https://github.com/facebookincubator/submitit/blob/main/docs/examples.md#job-array submitit documentation], one pickle file is created for each job of an array. If you have big object in your functions (like a full pytorch model) you should serialize it once and only pass its path to the submitted function.}} | |||
You can refer to [https://github.com/facebookincubator/submitit/blob/main/docs/examples.md#job-arrays-through-a-context-manager this page] to see how Submitit allows the job arrays submission through a context manager. | |||
=== Other advanced submitit features === | |||
Submitit provides advanced features like: | |||
* Concurrent job executions (example available [https://github.com/facebookincubator/submitit/blob/main/docs/examples.md#concurrent-jobs here]) | |||
* Multi-threading executions with <code>ThreadPoolExecutor</code> (example available [https://github.com/facebookincubator/submitit/blob/main/docs/examples.md#concurrent-jobs here]) | |||
* Integration with Asyncio for asynchronous job execution (example available [https://github.com/facebookincubator/submitit/blob/main/docs/examples.md#asyncio here]) | |||
== Current status of the OAR executor == | |||
== Current status of the OAR | |||
The current OarExecutor implementation does not cover all Submitit features. For now, it includes: | |||
* Submission of functions | * Submission of functions | ||
* Error handling and recording of stack traces with <code>job.result()</code> | * Error handling and recording of stack traces with <code>job.result()</code> | ||
* Checkpointing of stateful callables and automatic resubmission from current state when preempted | * Checkpointing of stateful callables and automatic resubmission from current state when preempted | ||
* Job arrays | * Job arrays | ||
* Concurrent jobs | * Concurrent jobs | ||
* Asyncio coroutines | * Asyncio coroutines | ||
The following features are not currently supported: | |||
* Multi-tasks jobs | |||
* Some parameters (as listed in the table of [[Submitit#Parameters]]) |
Latest revision as of 09:31, 22 March 2024
Note | |
---|---|
This page is actively maintained by the Grid'5000 team. If you encounter problems, please report them (see the Support page). Additionally, as it is a wiki page, you are free to make minor corrections yourself if needed. If you would like to suggest a more fundamental change, please contact the Grid'5000 team. |
Submitit
Submitit is a lightweight tool designed for submitting Python functions for computation within a Slurm cluster. It acts as a wrapper for submission and provides access to results, logs and more. Slurm, on the other hand, is an open-source, fault-tolerant, and highly scalable cluster management and job scheduling system suitable for both large and small Linux clusters. Submitit allows for seamless execution switching between Slurm or local environments. The source code, issues and pull requests can be found here.
To facilitate the switch between OAR and Slurm-based resource managers for users of Inria's national computing infrastructure, ongoing development efforts are focused on adding support for the OAR cluster in Submitit, through a plugin approach. The source code for the OAR plugin can be accessed here. A summary of the progress made, including implemented features and pending tasks can be found in this section.
Submitit and submitit_oar plugin installation
Using pip
pip can be used to install the stable release of submitit and submitit_oar:
It is recommended to install python dependencies via a virtual environment. To do so, before running the pip
command:
Basic usage
The example below shows the execution of an integer addition performed with the Submitit library. The AutoExecutor serves as the interface for submitting functions that are either executed on a cluster or on a local execution. It automatically detects the presence of a scheduler (such as OAR or Slurm). If no scheduler is detected, the function is executed locally on the machine.
import submitit
def add(a, b):
return a + b
# logs are dumped in the folder
executor = submitit.AutoExecutor(folder="log_test", cluster="oar")
job_addition = executor.submit(add, 5, 7) # will compute add(5, 7)
output = job_addition.result() # waits for completion and returns output
# if ever the job failed, result() will raise an error with the corresponding trace
print('job_addition output: ', output)
assert output == 12
The example script needs to be launched on frontend as follow:
The execution of the submitit script will create the working folder (i.e., folder="log_test"), in which you will find the scheduler output files (i.e., jobId_log.out and jobId_log.err), the scheduler submission file jobId_submission.sh, the submitit pickles (i.e., a task file jobId_submitted.pkl and the result jobId_result.pkl).
Note | |
---|---|
The However, it is generally recommended to specify an explicit folder to avoid any conflicts with other files present in the current directory. Note that this folder may grow rapidly, especially if you have large checkpoints (see checkpoint section for details) or if you submit lot of jobs. You should think about cleaning up the folder, and even better an automated way of cleaning it. |
Advanced usage
Using parameters for specifying resources
For specifying resources (e.g., cores, memory, partition,...), you need to call the method update_parameters :
executor = submitit.AutoExecutor(folder="log_test", cluster="oar")
executor.update_parameters(slurm_partition="cpu_devel", oar_queue="default")
The following table recaps the parameters supported (and unsupported) by AutoExecutor, OarExecutor and SlurmExecutor:
The scheduler specific parameters can also be used with scheduler specific Executors, without the scheduler name prefixes. For instance, let's consider OarExecutor, you do not need to add oar_ before the parameters walltime or queue:
executor = submitit.OarExecutor(folder="log_test")
executor.update_parameters(walltime="0:0:5", queue="default")
Example with OarExecutor
In the example bellow, we want a job with a specific walltime that requires 1 GPU on the production queue, and is executed in besteffort mode.
import submitit
def add(a, b):
return a + b
executor = submitit.AutoExecutor(folder="log_test", cluster="oar")
executor.update_parameters(oar_queue="production", oar_walltime="0:2:0", nodes=1, gpus_per_node=1, oar_additional_parameters={"t": "besteffort"})
job_addition = executor.submit(add, 5, 7)
output = job_addition.result()
print('job_addition output: ', output)
assert output == 12
If we check the submission file submitted to OAR, we can see all our requirements:
#!/bin/bash
# Parameters
#OAR -E /home/ychi/submitit_env/log_test/%jobid%_0_log.err
#OAR -O /home/ychi/submitit_env/log_test/%jobid%_0_log.out
#OAR -l /nodes=1/gpu=1,walltime=0:2:0
#OAR -n submitit
#OAR -q production
#OAR -t besteffort
# command
export SUBMITIT_EXECUTOR=oar
/home/ychi/submitit_env/env/bin/python -u -m submitit.core._submit /home/ychi/submitit_env/log_test
Checkpointing with Submitit
The purpose of the checkpointing functionality is to allow the automatic resumption of a computation when the job is preempted by another. To so do, the current state of the job needs to be saved before the scheduler (e.g., OAR, Slurm) stops the job.
As explained more in detailed in the official submitit documentation, you need to:
- define a class that implements the
submitit.helpers.Checkpointable
interface - implement a __call__ method that contains the computations code
- implement a checkpoint method to save the current state of the computation
import submitit
class MyCheckpointableAlgorithm(submitit.helpers.Checkpointable):
def __call__(self, ....):
....
def checkpoint(self, *args, **kwargs):
....
if __name__ == "__main__":
executor = submitit.AutoExecutor(folder)
myAlgo = MyCheckpointableAlgorithm()
job = executor.submit(myAlgo, ...)
print('output: ', job.result())
The submitit documentation provides a complete example of implementing a checkpointing mechanism for a multinomial logistic regression based on the Scikit-learn library.
Slurm and OAR mechanisms for resuming preempted jobs
OAR and Slurm do not provides the same strategy for implementing the resumption of a preempted job:
- For the Slurm scheduler, there is the possibility to requeue the current job directly from the node by executing the command
scontrol requeue jobId
(see requeue description in the scontrol man page for more information). - For the OAR scheduler, there are two possibilities: either requeuing the job by executing
oarsub --resubmit=origin_job_id
(note that theoarsub
command is not available on the nodes), or activating an automatic requeuing mechanism by submitting the original job with the idempotent type.
How to implement the checkpointing mechanism
As explained in the previous section, to implement a checkpointing mechanism with Submitit, you need to :
- In the __call__ method:
- If the OAR scheduler is used, you should wait for 60 seconds to ensure that when the computation starts, your job will be rescheduled if it is preempted.
- Check if an existing state is stored on the persistent storage and retrieve it.
- In the checkpointing method:
- Save the state of your computation in the checkpointing method (e.g., files on a persistent storage).
- Conclude the method by returning
super().checkpoint(*args, **kwargs)
, which will create a DelayedSubmission object corresponding the requeued job.
Template for Slurm
def __call__(self, ....):
... # Retrieve a potential existing state
... # Start computation
def checkpoint(self, *args: Any, **kwargs: Any) -> submitit.helpers.DelayedSubmission:
... # Save current computation state
return super().checkpoint(*args, **kwargs)
Template for OAR
def __call__(self, ....):
# wait 60s at first to ensure that the checkpointing mechanism is working
time.sleep(60)
... # Retrieve a potential existing state
... # Start computation
def checkpoint(self, *args: Any, **kwargs: Any) -> submitit.helpers.DelayedSubmission:
... # Save current computation state
return super().checkpoint(*args, **kwargs)
How to simulate a preemption
If you want to test that the checkpointing mechanism is working properly with your algorithm, you can simulate a preemption by calling the _interrupt() method. The call to _interrupt() should be made after the job starts (i.e., in the running state) and after waiting for a couple of seconds to ensure the computation has started. Here is an example:
...
job = ex.submit(myAlgo, ...)
# Wait for the job to be running
while job.state() != "RUNNING":
time.sleep(1)
# Wait for 60s to let the algorithm run for a while
time.sleep(60)
# Simulate preemption
job._interrupt()
# Get the result
result = job.result()
print(f"Result: {result}")
To validate the good execution of the checkpointing, you should:
- Check the scheduler to see that the job was stopped and then restarted.
- Check the output of your algorithm to see that the computation was saved and restarted not from the beginning.
Job array with Submitit
If you need to start n times the same computation or need to execute n times the same algorithm with different parameter, submitit implement a dedicated functionality for such a usage case: Job Array. This functionnality leverages the Slurm or OAR job array feature. You should use job arrays as it is faster than submitted all jobs independently (as all jobs are submitted in a unique call to the scheduler).
To use job arrays with submitit, you need to call the executor.map_array
method. The example bellow shows how to execute 4 different additions (i.e., same algorithm applied with four distinct sets of parameter pairs: 1+10; 2+20; 3+30; 4+40):
import submitit
def add(a, b):
return a + b
a = [1, 2, 3, 4]
b = [10, 20, 30, 40]
executor = submitit.AutoExecutor(folder="log_test", cluster="oar")
jobs = executor.map_array(add, a, b)
# Iterate over the 4 jobs and print their results
for job in jobs:
print("Job: ", job.job_id, " result: ", job.result())
The output is as follow:
Job: 1948179 result: 11
Job: 1948180 result: 22
Job: 1948178 result: 33
Job: 1948181 result: 44
Note | |
---|---|
Unlike the SlurmExecutor with |
Note | |
---|---|
As explained in the submitit documentation, one pickle file is created for each job of an array. If you have big object in your functions (like a full pytorch model) you should serialize it once and only pass its path to the submitted function. |
You can refer to this page to see how Submitit allows the job arrays submission through a context manager.
Other advanced submitit features
Submitit provides advanced features like:
- Concurrent job executions (example available here)
- Multi-threading executions with
ThreadPoolExecutor
(example available here) - Integration with Asyncio for asynchronous job execution (example available here)
Current status of the OAR executor
The current OarExecutor implementation does not cover all Submitit features. For now, it includes:
- Submission of functions
- Error handling and recording of stack traces with
job.result()
- Checkpointing of stateful callables and automatic resubmission from current state when preempted
- Job arrays
- Concurrent jobs
- Asyncio coroutines
The following features are not currently supported:
- Multi-tasks jobs
- Some parameters (as listed in the table of Submitit#Parameters)