Submitit
Note | |
---|---|
This page is actively maintained by the Grid'5000 team. If you encounter problems, please report them (see the Support page). Additionally, as it is a wiki page, you are free to make minor corrections yourself if needed. If you would like to suggest a more fundamental change, please contact the Grid'5000 team. |
Submitit
Submitit is a lightweight tool designed for submitting Python functions for computation within a Slurm cluster. It acts as a wrapper for submission and provides access to results, logs and more. Slurm, on the other hand, is an open-source, fault-tolerant, and highly scalable cluster management and job scheduling system suitable for both large and small Linux clusters. Submitit allows for seamless execution switching between Slurm or local environments. The source code, issues and pull requests can be found here.
To facilitate the switch between OAR and Slurm-based resource managers for users of Inria's national computing infrastructure, ongoing development efforts are focused on adding support for the OAR cluster in Submitit, through a plugin approach. The source code for the OAR plugin can be accessed here. A summary of the progress made, including implemented features and pending tasks can be found in this section.
Submitit installation
Using pip
pip can be used to install the stable release of submitit:
To use the last version including the OAR plugin, an installation from Source can be done:
fontend.site :
|
pip install --user git+https://gitlab.inria.fr/moyens-de-calcul/submitit.git@master#egg=submitit |
It is recommended to install python dependencies via a virtual environment. To do so, before running the pip
command:
Using conda
conda can be used to install submitit from the conda-forge:
To use the last version including the OAR plugin, you can create a conda environment file(e.g. "conda-env-submitit.yml") as:
name: submitit
dependencies:
- pip:
- git+https://gitlab.inria.fr/moyens-de-calcul/submitit.git@master#egg=submitit
and then install the last version of Submitit using this environment file
Basic usage
The example bellow shows the execution of an integer addition performed with the Submitit library. The AutoExecutor serves as the interface for submitting functions that are either executed on a cluster or on a local execution. It automatically detects the presence of a scheduler (such as OAR or Slurm). If no scheduler is detected, the function is executed locally on the machine.
import submitit
def add(a, b):
return a + b
# logs are dumped in the folder
executor = submitit.AutoExecutor(folder="log_test")
job_addition = executor.submit(add, 5, 7) # will compute add(5, 7)
output = job_addition.result() # waits for completion and returns output
# if ever the job failed, result() will raise an error with the corresponding trace
print('job_addition output: ', output)
assert output == 12
The example script needs to be launched on frontend as follow:
The execution of the submitit script will create the working folder (i.e., folder="log_test"), in which you will find the scheduler output files (i.e., jobId_log.out and jobId_log.err), the scheduler submission file jobId_submission.sh, the submitit pickles (i.e., a task file jobId_submitted.pkl and the result jobId_result.pkl).
Note | |
---|---|
The However, it is generally recommended to specify an explicit folder to avoid any conflicts with other files present in the current directory. Note that this folder may grow rapidly, especially if you have large checkpoints (see checkpoint section for details) or if you submit lot of jobs. You should think about cleaning up the folder, and even better an automated way of cleaning it. |
Advanced usage
Using parameters for specifying resources
For specifying resources (e.g., cores, memory, partition,...), you need to call the method update_parameters :
executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(slurm_partition="cpu_devel", oar_queue="default")
The following table recaps the parameters supported (and unsupported) by AutoExecutor, OARExecutor and SlurmExecutor:
The scheduler specific parameters can also be used with scheduler specific Executors, without the scheduler name prefixes. For instance, let's consider OarExecutor, you do not need to add oar_ before the parameters walltime or queue:
executor = submitit.OarExecutor(folder="log_test")
executor.update_parameters(walltime="0:0:5", queue="default")
Example with OarExecutor
In the example bellow, we want a job with a specific walltime that requires 1 GPU on the production queue, and is executed in besteffort mode.
import submitit
def add(a, b):
return a + b
executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(oar_queue="production", oar_walltime="0:2:0", nodes=1, gpus_per_node=1, oar_additional_parameters={"t": "besteffort"})
job_addition = executor.submit(add, 5, 7)
output = job_addition.result()
print('job_addition output: ', output)
assert output == 12
If we check the submission file submited to OAR, we can see all our requirements:
#!/bin/bash
# Parameters
#OAR -E /home/ychi/submitit_env/log_test/%jobid%_0_log.err
#OAR -O /home/ychi/submitit_env/log_test/%jobid%_0_log.out
#OAR -l /nodes=1/gpu=1,walltime=0:2:0
#OAR -n submitit
#OAR -q production
#OAR -t besteffort
# command
export SUBMITIT_EXECUTOR=oar
/home/ychi/submitit_env/env/bin/python -u -m submitit.core._submit /home/ychi/submitit_env/log_test
Checkpointing with Submitit
The purpose of the checkpointing functionality is to allow the automatic resumption of a computation when the job is preempted by another. To so do, the current state of the job needs to be saved before the scheduler (e.g., OAR, Slurm) stops the job.
As explained more in detailed in the official submitit documentation, you need to:
- define a class that implements the
submitit.helpers.Checkpointable
interface - implement a __call__ method that contains the computations code
- implement a checkpoint method to save the current state of the computation
import submitit
class MyCheckpointableAlgorithm(submitit.helpers.Checkpointable):
def __call__(self, ....):
....
def checkpoint(self, *args, **kwargs):
....
if __name__ == "__main__":
executor = submitit.AutoExecutor(folder)
myAlgo = MyCheckpointableAlgorithm()
job = executor.submit(myAlgo, ...)
print('output: ', job.result())
The submitit documentation provides a full example of implementing a checkpointing mechanism for a multinomial logistic regression based on the Scikit-learn library.
Slurm and OAR mechanisms for resuming preempted jobs
OAR and Slurm does not provides the same strategy for implementing the resumption of a preempted job:
- for Slurm scheduler, there is the possibility to requeue the current job directly from the node by executing the command
scontrol requeue jobId
(see requeue description in the scontrol man page for more information) - for OAR scheduler, there are two possibilities: either requeuing the job by executing
oarsub --resubmit=origin_job_id
(butoarsub
command is not available on the nodes) or activate an automatic requeuing mechanism by submitted the original job with the idempotent type.
How to implement the checkpointing mechanism
As explained in the previous section, to implement a checkpointing mechanism with submitit, you need to :
- in the __call__ method:
- if the OAR scheduler is used, you should wait for 60 seconds to ensure that when the computation starts, your job will be rescheduled if it is pre-empted.
- check if an existing state is stored on the persistent storage and retrieve it
- in the checkpointing method:
- save the state of your computation in the checkpointing method (e.g., files on a persistent storage)
- if the Slurm scheduler is used, ask Slurm to reschedule the computation (with a call to the helper DelayedSubmission)
Template for Slurm
def __call__(self, ....):
... retrieve a potential existing state ...
... start computation ...
def checkpoint(self, *args: Any, **kwargs: Any) -> submitit.helpers.DelayedSubmission:
... save current computation state ...
return submitit.helpers.DelayedSubmission(self, *args, **kwargs) # submits to requeuing
Template for OAR
def __call__(self, ....):
# wait 60s at first to ensure that the checkpointing mechanism is working
time.sleep(60)
... retrieve a potential existing state ...
... start computation ...
def checkpoint(self, *args: Any, **kwargs: Any) -> submitit.helpers.DelayedSubmission:
... save current computation state ...
return None
How to implement the checkpointing mechanism using the OAR executor
TODO
TO BE REMOVED
Checkpointing with Submitit on Slurm cluster is provided with the job requeue mechanism and the self defined checkpoint method:
- The or timeout. The states of the same job are changed from running to pending and running again to finish the job.
- The self defined checkpoint method will prepare the new submission with the current state of the computation. It should include a signature able to receive parameters from the callable function (an instance of a class with a __call__ method). When the preemption signal is sent, the checkpoint method will be called asynchronously, with the same arguments as the callable function.
Doing checkpointing with Submitit on OAR cluster, with the OAR plugin approach, requires a precise understanding of the inner working of the checkpointing and the job pickling of Submitit.
The self defined checkpoint method is practically the same for OAR and Slurm. However, when an OAR job is checkpointed after preemption or timeout signal, it is systematically terminated. Another OAR job can be submitted with the "resubmit_job_id" property to finish the previously checkpointed and terminated job. However, the Submitit's job requeue mechanism provided in JobEnvironment is not relevant here. We have to resubmit another job with the current state of the computation in the original job. The resubmission can be done either manually with oarsub --resubmit=origin_job_id, or automatically by OAR if our job is idempotent.
Note | |
---|---|
Only jobs that meet the following criteria will be automatically resubmitted by OAR:
In order to achieve similar behavior on an OAR cluster as on a Slurm cluster, some minor modifications have been made to the MNIST example mentioned in the link above. For instance, a 60-second wait is included in the callable function to ensure that a checkpointed job will be automatically resubmitted by OAR. Additionally, thejob.state is set to uppercase since the "Running" state is not entirely capitalized on OAR.
|
Here is the "adapted" MNIST example:
# Copyright (c) Arthur Mensch <arthur.mensch@m4x.org>
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the BSD 3-clauses license.
# Original at https://scikit-learn.org/stable/auto_examples/linear_model/plot_sparse_logistic_regression_mnist.html
#
import functools
import pickle
import time
from pathlib import Path
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state
import submitit
class MnistTrainer(submitit.helpers.Checkpointable):
"""
This shows how to rewrite a monolith function so that it can handle preemption nicely,
and not restart from scratch everytime it's preempted.
"""
def __init__(self, clf):
# This is the state that will be saved by `checkpoint`
self.train_test = None
self.scaler = None
self.clf = clf
self.trained_clf = False
self.stage = "0"
def __call__(self, train_samples: int, model_path: Path = None):
# wait here 60s at first
# since only >60s + exit code 99 + idempotent jobs can be resubmitted automatically by OAR
time.sleep(60)
# `train_samples` and `model_path` will also be saved
log = functools.partial(print, flush=True)
log(f"*** Starting from stage '{self.stage}' ***")
if self.train_test is None:
self.stage = "Data Loading"
t0 = time.time()
log(f"*** Entering stage '{self.stage}' ***")
# Load data from https://www.openml.org/d/554
X, y = fetch_openml("mnist_784", version=1, return_X_y=True)
X, y = X.to_numpy(), y.to_numpy()
random_state = check_random_state(0)
permutation = random_state.permutation(X.shape[0])
X = X[permutation]
y = y[permutation]
X = X.reshape((X.shape[0], -1))
# Checkpoint 1: save the train/test splits
X_train, X_test, y_train, y_test = train_test_split(
X, y, train_size=train_samples, test_size=10000
)
self.train_test = X_train, X_test, y_train, y_test
log(f"Loaded data, shuffle and split in {time.time() - t0:.1f}s")
X_train, X_test, y_train, y_test = self.train_test
if self.scaler is None:
self.stage = "Data Cleaning"
t0 = time.time()
log(f"*** Entering stage '{self.stage}' ***")
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# Scaling is actual pretty fast, make it a bit slower to allow preemption to happen here
time.sleep(10)
# Checkpoint 2: save the scaler and the preprocessed data
self.scaler = scaler
self.train_test = X_train, X_test, y_train, y_test
log(f"Scaled the data took {time.time() - t0:.0f}s")
if not self.trained_clf:
self.stage = "Model Training"
t0 = time.time()
log(f"*** Entering stage '{self.stage}' ***")
self.clf.C = 50 / train_samples
self.clf.fit(X_train, y_train)
# Checkpoint 3: mark the classifier as trained
self.trained_clf = True
log(f"Training took {time.time() - t0:.0f}s")
sparsity = np.mean(self.clf.coef_ == 0) * 100
score = self.clf.score(X_test, y_test)
log(f"Sparsity with L1 penalty: {sparsity / 100:.2%}")
log(f"Test score with L1 penalty: {score:.4f}")
if model_path:
self.save(model_path)
return score
def checkpoint(self, *args, **kwargs):
print(f"Checkpointing at stage '{self.stage}'")
return super().checkpoint(*args, **kwargs)
def save(self, model_path: Path):
with open(model_path, "wb") as o:
pickle.dump((self.scaler, self.clf), o, pickle.HIGHEST_PROTOCOL)
def main():
t0 = time.time()
# Cleanup log folder.
# This folder may grow rapidly especially if you have large checkpoints,
# or submit lot of jobs. You should think about an automated way of cleaning it.
folder = Path(__file__).parent / "mnist_logs"
if folder.exists():
for file in folder.iterdir():
file.unlink()
ex = submitit.AutoExecutor(folder)
if ex.cluster == "oar":
print("Executor will schedule jobs on Oar.")
else:
print(f"!!! Oar executable `oarsub` not found. Will execute jobs on '{ex.cluster}'")
model_path = folder / "model.pkl"
trainer = MnistTrainer(LogisticRegression(penalty="l1", solver="saga", tol=0.1, multi_class="auto"))
# Specify the job requirements.
# Reserving only as much resource as you need ensure the cluster resource are
# efficiently allocated.
ex.update_parameters(oar_core=4, timeout_min=5)
job = ex.submit(trainer, 5000, model_path=model_path)
print(f"Scheduled {job}.")
# Wait for the job to be running.
while job.state.upper() != "RUNNING":
time.sleep(1)
print("Run the following command to see what's happening")
print(f" less +F {job.paths.stdout}")
# Simulate preemption.
# Tries to stop the job after the first stage.
# If the job is preempted before the end of the first stage, try to increase it.
# If the job is not preempted, try to decrease it.
time.sleep(85)
print(f"preempting {job} after {time.time() - t0:.0f}s")
job._interrupt()
score = job.result()
print(f"Finished training. Final score: {score}.")
print(f"---------------- Job output ---------------------")
print(job.stdout())
print(f"-------------------------------------------------")
assert model_path.exists()
with open(model_path, "rb") as f:
(scaler, clf) = pickle.load(f)
sparsity = np.mean(clf.coef_ == 0) * 100
print(f"Sparsity with L1 penalty: {sparsity / 100:.2%}")
if __name__ == "__main__":
main()
This example script needs to be launched on frontend, in a virtual environment, as follow:
The output (of the original job and the submitted job) is as follow:
Executor will schedule jobs on Oar.
Scheduled OarJob<job_id=1938801, task_id=0, state="Waiting">.
Run the following command to see what's happening
less +F /home/ychi/sklearn_env/mnist_logs/1938801_0_log.out
preempting OarJob<job_id=1938801, task_id=0, state="Running"> after 92s
Checkpointing the job 1938801 ...DONE.
The job 1938801 was notified to checkpoint itself on chetemi-13.lille.grid5000.fr.
Finished training. Final score: 0.8222.
---------------- Job output ---------------------
submitit INFO (2023-03-23 12:33:42,410) - Starting with JobEnvironment(job_id=1938801_0, hostname=chetemi-13.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-03-23 12:33:42,410) - Loading pickle: /home/ychi/sklearn_env/mnist_logs/1938801_submitted.pkl
*** Starting from stage '0' ***
*** Entering stage 'Data Loading' ***
submitit INFO (2023-03-23 12:35:10,917) - Job has timed out. Ran 1 minutes out of requested 5 minutes.
submitit WARNING (2023-03-23 12:35:10,917) - Caught signal SIGUSR2 on chetemi-13.lille.grid5000.fr: this job is timed-out.
submitit INFO (2023-03-23 12:35:10,917) - Calling checkpoint method.
Checkpointing at stage 'Data Loading'
submitit INFO (2023-03-23 12:35:10,922) - Exiting job 1938801_1 with 99 code, (2 remaining timeouts)
submitit INFO (2023-03-23 12:35:27,490) - Starting with JobEnvironment(job_id=1938801_0, hostname=chetemi-13.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-03-23 12:35:27,491) - Loading pickle: /home/ychi/sklearn_env/mnist_logs/1938801_submitted.pkl
*** Starting from stage 'Data Loading' ***
*** Entering stage 'Data Loading' ***
Loaded data, shuffle and split in 35.5s
*** Entering stage 'Data Cleaning' ***
Scaled the data took 10s
*** Entering stage 'Model Training' ***
Training took 2s
Sparsity with L1 penalty: 78.88%
Test score with L1 penalty: 0.8222
submitit INFO (2023-03-23 12:37:16,022) - Job completed successfully
submitit INFO (2023-03-23 12:37:16,024) - Exitting after successful completion
-------------------------------------------------
Sparsity with L1 penalty: 78.88%
As you can see in the example above, the original job (job_id=1938801) was checkpointed at the stage 'Data Loading'. In the automatically resubmitted job (job_id=1938802), the stage '0' is skipped since it has been done in the original job. The resubmitted job restarted directly from the stage 'Date Loading', finished the training and give us back a final score.
Under the hood, a DelayedSubmission class is used to contain the callable function for checkpointing. In the OAR plugin, this class calls trickily the original job's pickled callable function, and pickle the output of the callable function into the original job's result. It makes it as similar as possible to the Slurm's same job requeue mecanisme.
However, don't forget that OAR resubmits always another job after the checkpointing. Your original submission bash file will automatically be called for the resubmission by OAR. Submitit will then write the stdout and stderr files in resubmitted_job_id_log.out and resubmitted_job_id_log.err files.
Job array with Submitit
If you need to start n times the same computation or need to execute n times the same algorithm with different parameter, submitit implement a dedicated functionality for such a usage case: Job Array. This functionnality leverages the Slurm or OAR job array feature. You should use job arrays as it is faster than submitted all jobs independently (as all jobs are submitted in a unique call to the scheduler).
To use job arrays with submitit, you need to call the executor.map_array
method. The example bellow shows how to execute 4 different additions (i.e., same algorithm applied with four distinct sets of parameter pairs: 1+10; 2+20; 3+30; 4+40):
import submitit
def add(a, b):
return a + b
a = [1, 2, 3, 4]
b = [10, 20, 30, 40]
executor = submitit.AutoExecutor(folder="log_test")
jobs = executor.map_array(add, a, b)
# Iterate over the 4 jobs and print their results
for job in jobs:
print("Job: ", job.job_id, " result: ", job.result())
The output is as follow:
Job: 1948179 result: 11
Job: 1948180 result: 22
Job: 1948178 result: 33
Job: 1948181 result: 44
Note | |
---|---|
Unlike the SlurmExecutor with |
Note | |
---|---|
As explained in the submitit documentation, one pickle file is created for each job of an array. If you have big object in your functions (like a full pytorch model) you should serialize it once and only pass its path to the submitted function. |
TODO : Job arrays through a context manager
Other advanced submitit features
Submitit provides advanced features like:
- Concurrent job executions (example available here)
- Multi-threading executions with
ThreadPoolExecutor
(example available here) - Integration with Asyncio for asynchronous job execution (example available here)
Current status of the OAR executor
The current OARExecutor implementation does cover all Submitit features. For now, it includes:
- Submission of functions
- Error handling and recording of stack traces with
job.result()
- Checkpointing of stateful callables and automatic resubmission from current state when preempted
- Job arrays
- Concurrent jobs
- Asyncio coroutines
The following features are not currently supported:
- Multi-tasks jobs
- Some parameters (as listed in the table of this section)