Submitit: Difference between revisions
Line 123: | Line 123: | ||
Checkpointing with Submitit on Slurm cluster is provided with the '''job requeue''' mechanism and the self defined '''checkpoint''' method: | Checkpointing with Submitit on Slurm cluster is provided with the '''job requeue''' mechanism and the self defined '''checkpoint''' method: | ||
* The '''job requeue''' mechanism will reschedule the checkpointed job after preemption or timeout. The states of the same job are changed from running to pending and running again to finish the job. | * The '''job requeue''' mechanism will reschedule the checkpointed job after preemption or timeout. The states of the same job are changed from ''running'' to ''pending'' and ''running'' again to finish the job. | ||
* The self defined '''checkpoint''' method will prepare the new submission with the current state of the computation. It should include a signature able to receive parameters from the callable function(an instance of a class with a '''__call__''' method). When the preemption signal is sent, the '''checkpoint''' method will be called asynchronously, with the same arguments as the callable function. | * The self defined '''checkpoint''' method will prepare the new submission with the current state of the computation. It should include a signature able to receive parameters from the callable function(an instance of a class with a '''__call__''' method). When the preemption signal is sent, the '''checkpoint''' method will be called asynchronously, with the same arguments as the callable function. | ||
Doing checkpointing with Submitit on OAR cluster, with the OAR plugin approach, requires a | Doing checkpointing with Submitit on OAR cluster, with the OAR plugin approach, requires a precise understanding of the inner working of the checkpointing and the job pickling of Submitit. | ||
The self defined | |||
The self defined ''checkpoint'' method is practically the same for OAR and Slurm. However, when an OAR job is checkpointed after preemption or timeout signal, it is systematically terminated. Another OAR job can be submitted with the "resubmit_job_id" property to finish the previously checkpointed and terminated job. However, the Submitit's ''job requeue'' mechanism provided in JobEnvironment is not relevant here. We will do the resubmission in the OarJob level instead. | |||
Revision as of 14:58, 13 March 2023
Submitit
Submitit is a lightweight tool for submitting Python functions for computation within a Slurm cluster. It basically wraps submission and provide access to results, logs and more. Slurm is an open source, fault-tolerant, and highly scalable cluster management and job scheduling system for large and small Linux clusters. Submitit allows to switch seamlessly between executing on Slurm or locally. Source code, issues and pull requests can be found here.
Currently, development is in progress for an OAR plugin, to facilitate the passage between OAR and Slurm based resource managers. Source code for an OAR plugin can be found here. Fort the first version, the supported and not supported parameters are listed in the table below. The not supported functionalities are the tasks notion of Slurm, the memory management of the job, the checkpointing, the job array and the asynchronous job supports.
Comparison with Dask-jobqueue
The key difference with Submitit is that Dask-jobqueue distributes the jobs to a pool of Dask workers, while Submitit jobs are directly jobs on the cluster. In that sense Submitit is a lower level interface than Dask-jobqueue and you get more direct control over your jobs, including individual stdout and stderr, and possibly checkpointing in case of preemption and timeout. On the other hand, you should avoid submitting multiple small tasks with Submitit, which would create many independent jobs and possibly overload the cluster, while you can do it without any problem through Dask-jobqueue.
Submitit installation
Using pip
pip can be used to install the stable release of submitit:
To use the last version including the OAR plugin, an installation from Source can be done:
fontend.site :
|
pip install --user git+https://gitlab.inria.fr/moyens-de-calcul/submitit.git@master#egg=submitit |
It is recommended to install python dependencies via a virtual environment. To do so, before running the pip
command:
Using conda
conda can be used to install submitit from the conda-forge:
To use the last version including the OAR plugin, you can create a conda environment file(e.g. "conda-env-submitit.yml") as:
name: submitit
dependencies:
- pip:
- git+https://gitlab.inria.fr/moyens-de-calcul/submitit.git@master#egg=submitit
and then install the last version of Submitit using this environment file
Basic usage
Performing an addition with Submitit
Here is a Python script example which allows to execute an addition job on Slurm, OAR or locally.
import submitit
def add(a, b):
return a + b
# logs are dumped in the folder
executor = submitit.AutoExecutor(folder="log_test")
job_addition = executor.submit(add, 5, 7) # will compute add(5, 7)
output = job_addition.result() # waits for completion and returns output
print('job_addition output: ', output)
assert output == 12
The example script can be launched on frontend as follow:
The addition job will be computed on the cluster. For each job, in the working folder that you defined (e.g., folder="log_test"), you will find a stdout log file jobId_log.out, a stderr log file jobId_log.err, a submission batch file jobId_submission.sh, a task file jobId_submitted.pkl and an output file jobId_result.pkl.
Advanced usage
Parameters
Parameters for cluster can be setted by update_parameters(**kwargs).
The AutoExecutor shown in the basic usage example above is the common submission interface, for OAR/Slurm clusters and local jobs.
To use the cluster specific parameters with the AutoExecutor, they must be appended by the cluster name, e.g., slurm_partition="cpu_devel", oar_queue="default". These cluster specific options will be ignored on other clusters.
executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(slurm_partition="cpu_devel", oar_queue="default")
E.g. if both oar_walltime and timeout_min are provided, then:
- oar_walltime is used on the OAR cluster
- timeout_min is used on other clusters
The cluster specific parameters can also be used with cluster specific Executors, without the cluster name prefixes, e.g., SlurmExecutor, OarExecutor.
executor = submitit.OarExecutor(folder="log_test")
executor.update_parameters(walltime="0:0:5", queue="default")
The following table recaps the parameters supported by AutoExecutor, OARExecutor and SlurmExecutor:
AutoExecutor | OARExecutor | SlurmExecutor | Description |
---|---|---|---|
timeout_min | walltime in hh:mm:ss | time | timeout in minutes |
name | n | job_name | 'submitit' by default |
nodes | nodes | nodes | number of nodes in int |
oar_queue | slurm_partition | string | |
gpus_per_node | gpu | gpus_per_node or --gres=gpu:xx | number of gpu in int |
stderr_to_stdout | not supported | stderr_to_stdout | boolean |
tasks_per_node | not supported | ntasks_per_node | int |
cpus_per_task | not supported | cpus_per_task | int |
mem_gb | not supported | mem | string |
Checkpointing with Submitit
Checkpointing with Submitit on Slurm cluster is provided with the job requeue mechanism and the self defined checkpoint method:
- The job requeue mechanism will reschedule the checkpointed job after preemption or timeout. The states of the same job are changed from running to pending and running again to finish the job.
- The self defined checkpoint method will prepare the new submission with the current state of the computation. It should include a signature able to receive parameters from the callable function(an instance of a class with a __call__ method). When the preemption signal is sent, the checkpoint method will be called asynchronously, with the same arguments as the callable function.
Doing checkpointing with Submitit on OAR cluster, with the OAR plugin approach, requires a precise understanding of the inner working of the checkpointing and the job pickling of Submitit.
The self defined checkpoint method is practically the same for OAR and Slurm. However, when an OAR job is checkpointed after preemption or timeout signal, it is systematically terminated. Another OAR job can be submitted with the "resubmit_job_id" property to finish the previously checkpointed and terminated job. However, the Submitit's job requeue mechanism provided in JobEnvironment is not relevant here. We will do the resubmission in the OarJob level instead.
An MNIST example showing checkpointing with Submitit on Slurm cluster is available here.
Scikit-learn (with numpy, scipy) is required to run the MNIST example. You can refer to this page to install scikit-learn.
To have the same behavior on OAR cluster with the Submitit OAR plugin, you can run the following adapted MNIST example:
# Copyright (c) Arthur Mensch <arthur.mensch@m4x.org>
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the BSD 3-clauses license.
# Original at https://scikit-learn.org/stable/auto_examples/linear_model/plot_sparse_logistic_regression_mnist.html
#
import functools
import pickle
import time
from pathlib import Path
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state
import submitit
class MnistTrainer(submitit.helpers.Checkpointable):
"""
This shows how to rewrite a monolith function so that it can handle preemption nicely,
and not restart from scratch everytime it's preempted.
"""
def __init__(self, clf):
# This is the state that will be saved by `checkpoint`
self.train_test = None
self.scaler = None
self.clf = clf
self.trained_clf = False
self.stage = "0"
def __call__(self, train_samples: int, model_path: Path = None):
# `train_samples` and `model_path` will also be saved
log = functools.partial(print, flush=True)
log(f"*** Starting from stage '{self.stage}' ***")
if self.train_test is None:
self.stage = "Data Loading"
t0 = time.time()
log(f"*** Entering stage '{self.stage}' ***")
# Load data from https://www.openml.org/d/554
X, y = fetch_openml("mnist_784", version=1, return_X_y=True)
X, y = X.to_numpy(), y.to_numpy()
random_state = check_random_state(0)
permutation = random_state.permutation(X.shape[0])
X = X[permutation]
y = y[permutation]
X = X.reshape((X.shape[0], -1))
# Checkpoint 1: save the train/test splits
X_train, X_test, y_train, y_test = train_test_split(
X, y, train_size=train_samples, test_size=10000
)
self.train_test = X_train, X_test, y_train, y_test
log(f"Loaded data, shuffle and split in {time.time() - t0:.1f}s")
X_train, X_test, y_train, y_test = self.train_test
if self.scaler is None:
self.stage = "Data Cleaning"
t0 = time.time()
log(f"*** Entering stage '{self.stage}' ***")
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# Scaling is actual pretty fast, make it a bit slower to allow preemption to happen here
time.sleep(10)
# Checkpoint 2: save the scaler and the preprocessed data
self.scaler = scaler
self.train_test = X_train, X_test, y_train, y_test
log(f"Scaled the data took {time.time() - t0:.0f}s")
if not self.trained_clf:
self.stage = "Model Training"
t0 = time.time()
log(f"*** Entering stage '{self.stage}' ***")
self.clf.C = 50 / train_samples
self.clf.fit(X_train, y_train)
# Checkpoint 3: mark the classifier as trained
self.trained_clf = True
log(f"Training took {time.time() - t0:.0f}s")
sparsity = np.mean(self.clf.coef_ == 0) * 100
score = self.clf.score(X_test, y_test)
log(f"Sparsity with L1 penalty: {sparsity / 100:.2%}")
log(f"Test score with L1 penalty: {score:.4f}")
if model_path:
self.save(model_path)
return score
def checkpoint(self, *args, **kwargs):
print(f"Checkpointing at stage '{self.stage}'")
return super().checkpoint(*args, **kwargs)
def save(self, model_path: Path):
with open(model_path, "wb") as o:
pickle.dump((self.scaler, self.clf), o, pickle.HIGHEST_PROTOCOL)
def main():
t0 = time.time()
# Cleanup log folder.
# This folder may grow rapidly especially if you have large checkpoints,
# or submit lot of jobs. You should think about an automated way of cleaning it.
folder = Path(__file__).parent / "mnist_logs"
if folder.exists():
for file in folder.iterdir():
file.unlink()
ex = submitit.AutoExecutor(folder)
if ex.cluster == "oar":
print("Executor will schedule jobs on Oar.")
else:
print(f"!!! Oar executable `oarsub` not found. Will execute jobs on '{ex.cluster}'")
model_path = folder / "model.pkl"
trainer = MnistTrainer(LogisticRegression(penalty="l1", solver="saga", tol=0.1, multi_class="auto"))
# Specify the job requirements.
# Reserving only as much resource as you need ensure the cluster resource are
# efficiently allocated.
ex.update_parameters(timeout_min=5, oar_core=4)
job = ex.submit(trainer, 5000, model_path=model_path)
print(f"Scheduled {job}.")
# Wait for the job to be running.
while job.state.upper() != "RUNNING":
time.sleep(1)
print("Run the following command to see what's happening")
print(f" less +F {job.paths.stdout}")
# Simulate preemption.
# Tries to stop the job after the first stage.
# If the job is preempted before the end of the first stage, try to increase it.
# If the job is not preempted, try to decrease it.
time.sleep(5)
print(f"preempting {job} after {time.time() - t0:.0f}s")
job._interrupt()
# Wait for the job to be terminated, before the resubmission.
while job.state.upper() != "TERMINATED":
time.sleep(1)
resubmitted_job = job._resubmit()
score = resubmitted_job.result()
print(f"Finished training. Final score: {score}.")
print(f"---------- Resubmitted Job output ---------------")
print(resubmitted_job.stdout())
print(f"-------------------------------------------------")
assert model_path.exists()
with open(model_path, "rb") as f:
(scaler, clf) = pickle.load(f)
sparsity = np.mean(clf.coef_ == 0) * 100
print(f"Sparsity with L1 penalty: {sparsity / 100:.2%}")
if __name__ == "__main__":
main()
This example script can be launched on frontend, in a virtual environment, as follow:
The output (of the original job and the submitted job) is as follow:
Executor will schedule jobs on Oar.
Scheduled OarJob<job_id=1937389, task_id=0, state="Waiting">.
Run the following command to see what's happening
less +F /home/ychi/sklearn_env/mnist_logs/1937389_0_log.out
submitit INFO (2023-03-13 10:55:43,370) - Starting with JobEnvironment(job_id=1937389, hostname=chetemi-8.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-03-13 10:55:43,370) - Loading pickle: /home/ychi/sklearn_env/mnist_logs/1937389_submitted.pkl
*** Starting from stage '0' ***
*** Entering stage 'Data Loading' ***
submitit WARNING (2023-03-13 10:55:51,300) - Bypassing signal SIGTERM
submitit INFO (2023-03-13 10:55:51,307) - Job has timed out. Ran 0 minutes out of requested 5 minutes.
submitit WARNING (2023-03-13 10:55:51,308) - Caught signal SIGUSR2 on chetemi-8.lille.grid5000.fr: this job is timed-out.
submitit INFO (2023-03-13 10:55:51,308) - Calling checkpoint method.
Checkpointing at stage 'Data Loading'
submitit INFO (2023-03-13 10:55:51,313) - Exiting gracefully after preemption/timeout.
preempting OarJob<job_id=1937389, task_id=0, state="Running"> after 12s
Signaling the job 1937389 with SIGTERM signal.
DONE.
The job 1937389 was notified to signal itself with SIGTERM on chetemi-8.lille.grid5000.fr.
Checkpointing the job 1937389 ...DONE.
The job 1937389 was notified to checkpoint itself on chetemi-8.lille.grid5000.fr.
Finished training. Final score: 0.8322.
---------- Resubmitted Job output ---------------
submitit INFO (2023-03-13 10:56:21,710) - Starting with JobEnvironment(job_id=1937390, hostname=chetemi-8.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-03-13 10:56:21,710) - Loading pickle: /home/ychi/sklearn_env/mnist_logs/1937390_submitted.pkl
*** Starting from stage 'Data Loading' ***
*** Entering stage 'Data Loading' ***
Loaded data, shuffle and split in 35.5s
*** Entering stage 'Data Cleaning' ***
Scaled the data took 10s
*** Entering stage 'Model Training' ***
Training took 2s
Sparsity with L1 penalty: 82.26%
Test score with L1 penalty: 0.8322
submitit INFO (2023-03-13 10:57:10,143) - Job completed successfully
submitit INFO (2023-03-13 10:57:10,145) - Exitting after successful completion
-------------------------------------------------
Sparsity with L1 penalty: 82.26%
As you can see in the job output, the original job (1937389) was checkpointed at the stage 'Data Loading'. In the resubmitted job (1937390), the stage '0' is skipped since it has been done before being rescheduled. The job is restarted directly from the stage 'Date Loading', to finish the training.