Submitit: Difference between revisions

From Grid5000
Jump to navigation Jump to search
 
(73 intermediate revisions by 4 users not shown)
Line 1: Line 1:
{{Portal|User}}
{{TutorialHeader}}
= Submitit =
= Submitit =
[https://github.com/facebookincubator/submitit Submitit] is a lightweight tool designed for submitting Python functions for computation within a Slurm cluster. It acts as a wrapper for submission and provides access to results, logs and more. Slurm, on the other hand, is an open-source, fault-tolerant, and highly scalable cluster management and job scheduling system suitable for both large and small Linux clusters. Submitit allows for seamless execution switching between Slurm or local environments.
[https://github.com/facebookincubator/submitit Submitit] is a lightweight tool designed for submitting Python functions for computation within a Slurm cluster. It acts as a wrapper for submission and provides access to results, logs and more. Slurm, on the other hand, is an open-source, fault-tolerant, and highly scalable cluster management and job scheduling system suitable for both large and small Linux clusters. Submitit allows for seamless execution switching between Slurm or local environments.
The source code, issues and pull requests can be found [https://github.com/facebookincubator/submitit/ here].  
The source code, issues and pull requests can be found [https://github.com/facebookincubator/submitit/ here].  


To facilitate the switch between OAR and Slurm-based resource managers for users of Inria's national computing infrastructure, ongoing development efforts are focused on adding support for the OAR cluster in Submitit, through a plugin approach. The source code for the OAR plugin can be accessed [https://gitlab.inria.fr/moyens-de-calcul/submitit/-/tree/dev here]. A summary of the progress made, including implemented features and pending tasks can be found in [https://www.grid5000.fr/w/User:Ychi/submitit#Current_status_of_the_OAR_plugin this section].
To facilitate the switch between OAR and Slurm-based resource managers for users of Inria's national computing infrastructure, ongoing development efforts are focused on adding support for the OAR cluster in Submitit, through a plugin approach. The source code for the OAR plugin can be accessed [https://gitlab.inria.fr/grid5000/submitit_oar here]. A summary of the progress made, including implemented features and pending tasks can be found in [https://www.grid5000.fr/w/User:Ychi/submitit#Current_status_of_the_OAR_executor this section].
== Submitit installation ==
 
== Submitit and submitit_oar plugin installation ==


{{Note|text=Submitit should be installed in a folder accessible by both the frontend and the nodes. For example, by installing it in your homedir, we are guaranteed that Submitit is available by both the frontend and the nodes.
{{Note|text=Submitit and submitit_oar should be installed in a folder accessible by both the frontend and the nodes. For example, by installing it in your homedir, we are guaranteed that Submitit is available by both the frontend and the nodes.
}}
}}


{{Note|text=Python 3.6+ is required to install Submitit}}
=== Using pip ===
pip can be used to install the stable release of submitit and submitit_oar:
{{Term|location=fontend.site|cmd=<code class="command">pip install --user submitit_oar==1.1.1</code>}}


=== Using pip ===
pip can be used to install the stable release of submitit:
{{Term|location=fontend.site|cmd=<code class="command">pip install --user submitit</code>}}
To use the last version including the OAR plugin, an installation from Source can be done:
{{Term|location=fontend.site|cmd=<code class="command">pip install --user git+https://gitlab.inria.fr/moyens-de-calcul/submitit.git@master#egg=submitit</code>}}
It is recommended to install python dependencies via a virtual environment. To do so, before running the <code class="command">pip</code> command:
It is recommended to install python dependencies via a virtual environment. To do so, before running the <code class="command">pip</code> command:
{{Term|location=fontend.site|cmd=<code class="command">cd my_submitit_jobqueue_project</code>}}
{{Term|location=fontend.site|cmd=<code class="command">python3 -m venv env</code>}}
{{Term|location=fontend.site|cmd=<code class="command">python3 -m venv env</code>}}
{{Term|location=fontend.site|cmd=<code class="command">source env/bin/activate</code>}}
{{Term|location=fontend.site|cmd=<code class="command">source env/bin/activate</code>}}
{{Term|location=fontend.site|cmd=<code class="command">pip install git+https://gitlab.inria.fr/moyens-de-calcul/submitit.git@master#egg=submitit</code>}}
{{Term|location=fontend.site|cmd=<code class="command">pip install submitit_oar==1.1.1</code>}}


=== Using conda ===
conda can be used to install submitit from the conda-forge:
{{Term|location=fontend.site|cmd=<code class="command">conda install -c conda-forge submitit</code>}}
To use the last version including the OAR plugin, you can create a conda environment file(e.g. "conda-env-submitit.yml") as:
<syntaxhighlight lang="XML">
name: submitit
dependencies:
  - pip:
    - git+https://gitlab.inria.fr/moyens-de-calcul/submitit.git@master#egg=submitit
</syntaxhighlight>
and then install the last version of Submitit using this environment file
{{Term|location=fontend.site|cmd=<code class="command">conda env create --file conda-env-submitit.yml</code>}}
{{Term|location=fontend.site|cmd=<code class="command">source activate submitit</code>}}


== Basic usage ==
== Basic usage ==


=== Performing an addition with Submitit ===
The example below shows the execution of an integer addition performed with the Submitit library. The '''AutoExecutor''' serves as the interface for submitting functions that are either executed on a cluster or on a local execution. It automatically detects the presence of a scheduler (such as OAR or Slurm). If no scheduler is detected, the function is executed locally on the machine.
 
Here is a Python script example which allows to execute an addition job on Slurm, OAR or locally.


<syntaxhighlight lang="python" line>
<syntaxhighlight lang="python" line>
Line 49: Line 34:


# logs are dumped in the folder
# logs are dumped in the folder
executor = submitit.AutoExecutor(folder="log_test")
executor = submitit.AutoExecutor(folder="log_test", cluster="oar")


job_addition = executor.submit(add, 5, 7)  # will compute add(5, 7)
job_addition = executor.submit(add, 5, 7)  # will compute add(5, 7)
output = job_addition.result()  # waits for completion and returns output
output = job_addition.result()  # waits for completion and returns output
# if ever the job failed, result() will raise an error with the corresponding trace
print('job_addition output: ', output)
print('job_addition output: ', output)
assert output == 12
assert output == 12
Line 58: Line 44:


The example script needs to be launched on frontend as follow:
The example script needs to be launched on frontend as follow:
{{Term|location=flille|cmd=<code class="command">python3 this-script.py</code>}}
{{Term|location=fontend.site|cmd=<code class="command">python3 this-script.py</code>}}
 
The execution of the submitit script will create the working folder (i.e., ''folder="log_test"''), in which you will find the scheduler output files (i.e., '''''jobId_log.out''''' and '''''jobId_log.err'''''), the scheduler submission file '''''jobId_submission.sh''''', the submitit pickles (i.e., a task file '''''jobId_submitted.pkl''''' and the result '''''jobId_result.pkl''''').
 
{{Note|text=The <code>folder</code> parameter is not mandatory. The value of this parameter represents the path of the working folder where the above files will be stored. If you don't specify it, the files will be saved in the current directory where you execute your script.
 
However, it is generally recommended to specify an explicit folder to avoid any conflicts with other files present in the current directory.


The addition job will be computed on the cluster. For each job, in the working folder that you defined (e.g., folder="log_test"), you will find a stdout log file '''''jobId_log.out''''', a stderr log file '''''jobId_log.err''''', a submission batch file '''''jobId_submission.sh''''', a task file '''''jobId_submitted.pkl''''' and an output file '''''jobId_result.pkl'''''.
Note that this folder may grow rapidly, especially if you have large checkpoints (see [[User:Ychi/submitit#Checkpointing_with_Submitit|checkpoint section]] for details) or if you submit lot of jobs. You should think about cleaning up the folder, and even better an automated way of cleaning it.}}


== Advanced usage ==
== Advanced usage ==


=== Parameters ===
=== Using parameters for specifying resources ===
 
Parameters for cluster can be setted by '''update_parameters'''(**kwargs).


The AutoExecutor shown in the [[#Basic usage|basic usage example above]] is the common submission interface, for OAR/Slurm clusters and local jobs.  
For specifying resources (e.g., cores, memory, partition,...), you need to call the method '''update_parameters''' :


To use the cluster specific parameters with the AutoExecutor, they must be appended by the cluster name, e.g., slurm_partition="cpu_devel", oar_queue="default". These cluster specific options will be ignored on other clusters.
<syntaxhighlight lang="python">
<syntaxhighlight lang="python">
executor = submitit.AutoExecutor(folder="log_test")
executor = submitit.AutoExecutor(folder="log_test", cluster="oar")
executor.update_parameters(slurm_partition="cpu_devel", oar_queue="default")
executor.update_parameters(slurm_partition="cpu_devel", oar_queue="default")
</syntaxhighlight>
</syntaxhighlight>


{{Note|text=Cluster specific parameters win over common parameters.  
{{Note|text=Options for a given scheduler (i.e., prefixed by ''oar_'' or ''slurm_'') are ignored if the scheduler is not the good one. For instance, if ''slurm_partition'' is provided but the scheduler used is OAR, ''slurm_partition' is just ignored.
}}
E.g. if both ''oar_walltime'' and ''timeout_min'' are provided, then:
* ''oar_walltime'' is used on the OAR cluster
* ''timeout_min'' is used on other clusters
 


The cluster specific parameters can also be used with cluster specific Executors, without the cluster name prefixes, e.g., SlurmExecutor, OarExecutor.
Besides, parameters for specific scheduler are overwriting common parameters. For instance, if both ''oar_walltime'' and ''timeout_min'' are provided, then ''oar_walltime'' is used on OAR clusters (as ''timeout_min'' is a common parameter) while ''timeout_min'' is used on others (as ''oar_walltime'' is ignored)}}
<syntaxhighlight lang="python">
executor = submitit.OarExecutor(folder="log_test")
executor.update_parameters(walltime="0:0:5", queue="default")
</syntaxhighlight>


The following table recaps the parameters supported by AutoExecutor, OARExecutor and SlurmExecutor:
The following table recaps the parameters supported (and unsupported) by AutoExecutor, OarExecutor and SlurmExecutor:
{| class="wikitable"
{| class="wikitable"
|-
|-
! AutoExecutor !! OARExecutor !! SlurmExecutor !! Description
! AutoExecutor !! OarExecutor !! SlurmExecutor !! Description
|-
|-
| timeout_min || walltime in hh:mm:ss || time || timeout in minutes
| timeout_min || walltime in hh:mm:ss || time || The duration of the job (minutes)
|-
|-
| name || n || job_name || 'submitit' by default
| name || n || job_name || The name of the job ('submitit' by default)
|-
|-
| nodes || nodes || nodes || number of nodes in int
| nodes || nodes || nodes || The number of nodes (int)
|-
|-
|  || oar_queue || slurm_partition || string
|  || oar_queue || slurm_partition || string
|-
|-
| gpus_per_node || gpu || gpus_per_node or --gres=gpu:xx || number of gpu in int
| gpus_per_node || gpu || gpus_per_node or --gres=gpu:xx || The number of GPUs available on each node (int)
|-
| stderr_to_stdout ||  {{No}} || stderr_to_stdout || Redirect the error output on the standard one (boolean)
|-
| tasks_per_node ||  {{No}} || ntasks_per_node || The maximun number of task run on each node (int)
|-
|-
| stderr_to_stdout || not supported || stderr_to_stdout || boolean
| cpus_per_task || {{No}} || cpus_per_task || The number of CPUs dedicated for a task (int)
|-
|-
| tasks_per_node || not supported || ntasks_per_node || int
| mem_gb  || {{No}} || mem || The amount of memory (string)
|-
|-
| cpus_per_task || not supported || cpus_per_task || int
| || {{No}} || slurm_array_parallelism || Number of jobs executed in parallel simultaneously
|-
|-
| mem_gb || not supported || mem || string
| || additional_parameters || {{No}} || dict: other OAR parameters not available in submitit
|}
|}


=== Checkpointing with Submitit ===
The scheduler specific parameters can also be used with scheduler specific Executors, without the scheduler name prefixes. For instance, let's consider OarExecutor, you do not need to add ''oar_'' before the parameters ''walltime'' or ''queue'':
 
Checkpointing with Submitit on Slurm cluster is provided with the '''job requeue''' mechanism and the self defined '''checkpoint''' method:
* The '''job requeue''' mechanism will reschedule the checkpointed job after preemption or timeout. The states of the same job are changed from ''running'' to ''pending'' and ''running'' again to finish the job.
* The self defined '''checkpoint''' method will prepare the new submission with the current state of the computation. It should include a signature able to receive parameters from the callable function (an instance of a class with a '''__call__''' method). When the preemption signal is sent, the '''checkpoint''' method will be called asynchronously, with the same arguments as the callable function.
 
Doing checkpointing with Submitit on OAR cluster, with the OAR plugin approach, requires a precise understanding of the inner working of the checkpointing and the job pickling of Submitit.
 
The self defined ''checkpoint'' method is practically the same for OAR and Slurm. However, when an OAR job is checkpointed after preemption or timeout signal, it is systematically terminated. Another OAR job can be submitted with the "resubmit_job_id" property to finish the previously checkpointed and terminated job. However, the Submitit's ''job requeue'' mechanism provided in JobEnvironment is not relevant here. We have to resubmit another job with the current state of the computation in the original job. The resubmission can be done either manually with ''oarsub --resubmit=origin_job_id'', or automatically by OAR if our job is idempotent.
 
 
An MNIST example showing checkpointing with Submitit on Slurm cluster is available [https://github.com/facebookincubator/submitit/blob/main/docs/mnist.py here].
 
Scikit-learn (with numpy, scipy) is required to run the MNIST example. You can refer to [https://scikit-learn.org/stable/install.html this page] to install scikit-learn.
 
To have the same behavior on OAR cluster with the Submitit OAR plugin, you can run the following adapted MNIST example:
 
<syntaxhighlight lang="python">
<syntaxhighlight lang="python">
# Copyright (c) Arthur Mensch <arthur.mensch@m4x.org>
executor = submitit.OarExecutor(folder="log_test")
# Copyright (c) Facebook, Inc. and its affiliates.
executor.update_parameters(walltime="0:0:5", queue="default")
#
</syntaxhighlight>
# This source code is licensed under the BSD 3-clauses license.
# Original at https://scikit-learn.org/stable/auto_examples/linear_model/plot_sparse_logistic_regression_mnist.html
#


import functools
==== Example with  OarExecutor ====
import pickle
import time
from pathlib import Path


import numpy as np
In the example bellow, we want a job with a specific walltime that requires 1 GPU on the production queue, and is executed in besteffort mode.
from sklearn.datasets import fetch_openml
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state


<syntaxhighlight lang="python" line>
import submitit
import submitit


def add(a, b):
    return a + b


class MnistTrainer(submitit.helpers.Checkpointable):
executor = submitit.AutoExecutor(folder="log_test", cluster="oar")
    """
executor.update_parameters(oar_queue="production", oar_walltime="0:2:0", nodes=1, gpus_per_node=1, oar_additional_parameters={"t": "besteffort"})
    This shows how to rewrite a monolith function so that it can handle preemption nicely,
job_addition = executor.submit(add, 5, 7)
    and not restart from scratch everytime it's preempted.
output = job_addition.result()
    """
print('job_addition output: ', output)
assert output == 12
</syntaxhighlight>


    def __init__(self, clf):
If we check the submission file submitted to OAR, we can see all our requirements:
        # This is the state that will be saved by `checkpoint`
<syntaxhighlight lang="bash">
        self.train_test = None
#!/bin/bash
        self.scaler = None
        self.clf = clf
        self.trained_clf = False
        self.stage = "0"


    def __call__(self, train_samples: int, model_path: Path = None):
# Parameters
        # wait here 60s at first
#OAR -E /home/ychi/submitit_env/log_test/%jobid%_0_log.err
        # since only >60s + exit code 99 + idempotent jobs can be resubmitted automatically by OAR
#OAR -O /home/ychi/submitit_env/log_test/%jobid%_0_log.out
        time.sleep(60)
#OAR -l /nodes=1/gpu=1,walltime=0:2:0
#OAR -n submitit
#OAR -q production
#OAR -t besteffort


        # `train_samples` and `model_path` will also be saved
# command
        log = functools.partial(print, flush=True)
export SUBMITIT_EXECUTOR=oar
        log(f"*** Starting from stage '{self.stage}' ***")
/home/ychi/submitit_env/env/bin/python -u -m submitit.core._submit /home/ychi/submitit_env/log_test


        if self.train_test is None:
</syntaxhighlight>
            self.stage = "Data Loading"
            t0 = time.time()
            log(f"*** Entering stage '{self.stage}' ***")
            # Load data from https://www.openml.org/d/554
            X, y = fetch_openml("mnist_784", version=1, return_X_y=True)
            X, y = X.to_numpy(), y.to_numpy()


            random_state = check_random_state(0)
=== Checkpointing with Submitit ===
            permutation = random_state.permutation(X.shape[0])
            X = X[permutation]
            y = y[permutation]
            X = X.reshape((X.shape[0], -1))


            # Checkpoint 1: save the train/test splits
The purpose of the checkpointing functionality is to allow the automatic resumption of a computation when the job is preempted by another. To so do, the current state of the job needs to be saved before the scheduler (e.g., OAR, Slurm) stops the job.
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, train_size=train_samples, test_size=10000
            )
            self.train_test = X_train, X_test, y_train, y_test
            log(f"Loaded data, shuffle and split in {time.time() - t0:.1f}s")


        X_train, X_test, y_train, y_test = self.train_test
As explained more in detailed in the [https://github.com/facebookincubator/submitit/blob/main/docs/checkpointing.md official submitit documentation], you need to:
        if self.scaler is None:
* define a class that implements the <code>submitit.helpers.Checkpointable</code> interface
            self.stage = "Data Cleaning"
* implement a '''__call__''' method that contains the computations code
            t0 = time.time()
* implement a '''checkpoint''' method to save the current state of the computation
            log(f"*** Entering stage '{self.stage}' ***")
            scaler = StandardScaler()
            X_train = scaler.fit_transform(X_train)
            X_test = scaler.transform(X_test)
            # Scaling is actual pretty fast, make it a bit slower to allow preemption to happen here
            time.sleep(10)
            # Checkpoint 2: save the scaler and the preprocessed data
            self.scaler = scaler
            self.train_test = X_train, X_test, y_train, y_test
            log(f"Scaled the data took {time.time() - t0:.0f}s")


        if not self.trained_clf:
<syntaxhighlight lang="python">
            self.stage = "Model Training"
import submitit
            t0 = time.time()
            log(f"*** Entering stage '{self.stage}' ***")
            self.clf.C = 50 / train_samples
            self.clf.fit(X_train, y_train)
            # Checkpoint 3: mark the classifier as trained
            self.trained_clf = True
            log(f"Training took {time.time() - t0:.0f}s")


        sparsity = np.mean(self.clf.coef_ == 0) * 100
class MyCheckpointableAlgorithm(submitit.helpers.Checkpointable):
        score = self.clf.score(X_test, y_test)
        log(f"Sparsity with L1 penalty: {sparsity / 100:.2%}")
        log(f"Test score with L1 penalty: {score:.4f}")


        if model_path:
    def __call__(self, ....):
            self.save(model_path)
         ....
         return score


     def checkpoint(self, *args, **kwargs):
     def checkpoint(self, *args, **kwargs):
         print(f"Checkpointing at stage '{self.stage}'")
         ....
        return super().checkpoint(*args, **kwargs)


    def save(self, model_path: Path):
if __name__ == "__main__":
        with open(model_path, "wb") as o:
    executor = submitit.AutoExecutor(folder)
            pickle.dump((self.scaler, self.clf), o, pickle.HIGHEST_PROTOCOL)
    myAlgo = MyCheckpointableAlgorithm()


    job = executor.submit(myAlgo, ...)
    print('output: ', job.result())
</syntaxhighlight>
   
The submitit documentation provides a [https://github.com/facebookincubator/submitit/blob/main/docs/mnist.py complete example] of implementing a checkpointing mechanism for a multinomial logistic regression based on the Scikit-learn library.


def main():
==== Slurm and OAR mechanisms for resuming preempted jobs ====
    t0 = time.time()
    # Cleanup log folder.
    # This folder may grow rapidly especially if you have large checkpoints,
    # or submit lot of jobs. You should think about an automated way of cleaning it.
    folder = Path(__file__).parent / "mnist_logs"
    if folder.exists():
        for file in folder.iterdir():
            file.unlink()


    ex = submitit.AutoExecutor(folder)
OAR and Slurm do not provides the same strategy for implementing the resumption of a preempted job:
    if ex.cluster == "oar":
* For the Slurm scheduler, there is the possibility to requeue the current job directly from the node by executing the command <code>scontrol requeue jobId</code> (see requeue description in the [https://slurm.schedmd.com/scontrol.html scontrol man page] for more information).
        print("Executor will schedule jobs on Oar.")
* For the OAR scheduler, there are two possibilities: either requeuing the job by executing <code>oarsub --resubmit=origin_job_id</code> (note that the <code>oarsub</code> command is not available on the nodes), or activating an automatic requeuing mechanism by submitting the original job with the ''idempotent'' type.
    else:
        print(f"!!! Oar executable `oarsub` not found. Will execute jobs on '{ex.cluster}'")
    model_path = folder / "model.pkl"
    trainer = MnistTrainer(LogisticRegression(penalty="l1", solver="saga", tol=0.1, multi_class="auto"))


    # Specify the job requirements.
{{Note|text=For the OAR scheduler, the automatic requeuing mechanism using ''idempotent'' jobs only works if the job is preempted after running at least 60 seconds. If the preemption occurs before 60 seconds, the OAR scheduler considers the job not viable and stops it without requeuing.}}
    # Reserving only as much resource as you need ensure the cluster resource are
    # efficiently allocated.
    ex.update_parameters(oar_core=4, timeout_min=5)
    job = ex.submit(trainer, 5000, model_path=model_path)


    print(f"Scheduled {job}.")
==== How to implement the checkpointing mechanism ====


    # Wait for the job to be running.
As explained in the previous section, to implement a checkpointing mechanism with Submitit, you need to :
    while job.state.upper() != "RUNNING":
* In the '''__call__''' method:
        time.sleep(1)
** If the OAR scheduler is used, you should wait for 60 seconds to ensure that when the computation starts, your job will be rescheduled if it is preempted.  
** Check if an existing state is stored on the persistent storage and retrieve it.
* In the '''checkpointing''' method:
** Save the state of your computation in the '''checkpointing''' method (e.g., files on a persistent storage).
** Conclude the method by returning <code>super().checkpoint(*args, **kwargs)</code>, which will create a '''DelayedSubmission''' object corresponding the requeued job.


    print("Run the following command to see what's happening")
===== Template for Slurm =====
    print(f"  less +F {job.paths.stdout}")


    # Simulate preemption.
<syntaxhighlight lang="python" line>
    # Tries to stop the job after the first stage.
def __call__(self, ....):
    # If the job is preempted before the end of the first stage, try to increase it.
     ... # Retrieve a potential existing state
    # If the job is not preempted, try to decrease it.
     ... # Start computation
    time.sleep(85)
     print(f"preempting {job} after {time.time() - t0:.0f}s")
     job._interrupt()


    score = job.result()
def checkpoint(self, *args: Any, **kwargs: Any) -> submitit.helpers.DelayedSubmission:
     print(f"Finished training. Final score: {score}.")
     ... # Save current computation state
     print(f"---------------- Job output ---------------------")
     return super().checkpoint(*args, **kwargs)
    print(job.stdout())
</syntaxhighlight>
    print(f"-------------------------------------------------")


    assert model_path.exists()
===== Template for OAR =====
    with open(model_path, "rb") as f:
        (scaler, clf) = pickle.load(f)
    sparsity = np.mean(clf.coef_ == 0) * 100
    print(f"Sparsity with L1 penalty: {sparsity / 100:.2%}")
 
 
if __name__ == "__main__":
    main()


<syntaxhighlight lang="python" line>
def __call__(self, ....):
    # wait 60s at first to ensure that the checkpointing mechanism is working
    time.sleep(60)
    ... # Retrieve a potential existing state
    ... # Start computation


def checkpoint(self, *args: Any, **kwargs: Any) -> submitit.helpers.DelayedSubmission:
    ... # Save current computation state
    return super().checkpoint(*args, **kwargs)
</syntaxhighlight>
</syntaxhighlight>


==== How to simulate a preemption ====


This example script needs to be launched on frontend, in a virtual environment, as follow:
If you want to test that the checkpointing mechanism is working properly with your algorithm, you can simulate a preemption by calling the ''_interrupt()'' method. The call to ''_interrupt()'' should be made after the job starts (i.e., in the running state) and after waiting for a couple of seconds to ensure the computation has started. Here is an example:
{{Term|location=fontend.site|cmd=<code class="command">cd sklearn_env</code>}}
{{Term|location=fontend.site|cmd=<code class="command">source sklearn_env/bin/activate</code>}}
{{Term|location=fontend.site|cmd=<code class="command">python mnist.py</code>}}


<syntaxhighlight lang="python">
...
job = ex.submit(myAlgo, ...)


The output (of the original job and the submitted job) is as follow:
# Wait for the job to be running
<syntaxhighlight lang="bash">
while job.state() != "RUNNING":
Executor will schedule jobs on Oar.
    time.sleep(1)
Scheduled OarJob<job_id=1938801, task_id=0, state="Waiting">.
Run the following command to see what's happening
  less +F /home/ychi/sklearn_env/mnist_logs/1938801_0_log.out
preempting OarJob<job_id=1938801, task_id=0, state="Running"> after 92s
Checkpointing the job 1938801 ...DONE.
The job 1938801 was notified to checkpoint itself on chetemi-13.lille.grid5000.fr.
Finished training. Final score: 0.8222.
---------------- Job output ---------------------
submitit INFO (2023-03-23 12:33:42,410) - Starting with JobEnvironment(job_id=1938801_0, hostname=chetemi-13.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-03-23 12:33:42,410) - Loading pickle: /home/ychi/sklearn_env/mnist_logs/1938801_submitted.pkl
*** Starting from stage '0' ***
*** Entering stage 'Data Loading' ***
submitit INFO (2023-03-23 12:35:10,917) - Job has timed out. Ran 1 minutes out of requested 5 minutes.
submitit WARNING (2023-03-23 12:35:10,917) - Caught signal SIGUSR2 on chetemi-13.lille.grid5000.fr: this job is timed-out.
submitit INFO (2023-03-23 12:35:10,917) - Calling checkpoint method.
Checkpointing at stage 'Data Loading'
submitit INFO (2023-03-23 12:35:10,922) - Exiting job 1938801_1 with 99 code, (2 remaining timeouts)


submitit INFO (2023-03-23 12:35:27,490) - Starting with JobEnvironment(job_id=1938801_0, hostname=chetemi-13.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
# Wait for 60s to let the algorithm run for a while
submitit INFO (2023-03-23 12:35:27,491) - Loading pickle: /home/ychi/sklearn_env/mnist_logs/1938801_submitted.pkl
time.sleep(60)
*** Starting from stage 'Data Loading' ***
*** Entering stage 'Data Loading' ***
Loaded data, shuffle and split in 35.5s
*** Entering stage 'Data Cleaning' ***
Scaled the data took 10s
*** Entering stage 'Model Training' ***
Training took 2s
Sparsity with L1 penalty: 78.88%
Test score with L1 penalty: 0.8222
submitit INFO (2023-03-23 12:37:16,022) - Job completed successfully
submitit INFO (2023-03-23 12:37:16,024) - Exitting after successful completion


-------------------------------------------------
# Simulate preemption
Sparsity with L1 penalty: 78.88%
job._interrupt()


# Get the result
result = job.result()
print(f"Result: {result}")
</syntaxhighlight>
</syntaxhighlight>


As you can see in the example above, the original job (job_id=1938801) was checkpointed at the stage 'Data Loading'. In the automatically resubmitted job (job_id=1938802), the stage '0' is skipped since it has been done in the original job. The resubmitted job restarted directly from the stage 'Date Loading', finished the training and give us back a final score.
To validate the good execution of the checkpointing, you should:
 
* Check the scheduler to see that the job was stopped and then restarted.
Under the hood, a DelayedSubmission class is used to contain the callable function for checkpointing. In the OAR plugin, this class calls trickily the original job's pickled callable function, and pickle the output of the callable function into the original job's result. It makes it as similar as possible to the Slurm's same job requeue mecanisme.
* Check the output of your algorithm to see that the computation was saved and restarted not from the beginning.
 
However, don't forget that OAR resubmits always another job after the checkpointing. Your original submission bash file will automatically be called for the resubmission by OAR. Submitit will then write the stdout and stderr files in resubmitted_job_id_log.out and resubmitted_job_id_log.err files.


=== Job array with Submitit ===
=== Job array with Submitit ===
The job array submission is supported through the <code>executor.map_array</code> method.


It is preferred in Submitit to use the ''Job Array'' functionality for launching ''n times'' the same or similar jobs at once, because:
If you need to start ''n times'' the same computation or need to execute ''n times'' the same algorithm with different parameter, submitit implement a dedicated functionality for such a usage case: ''Job Array''. This functionnality leverages the Slurm or OAR job array feature. You should use job arrays as it is faster than submitted all jobs independently (as all jobs are submitted in a unique call to the scheduler).
* it can submit all jobs in only 1 call to OAR (avoids flooding it).
* it is faster than submitting all jobs independently.


Here is a Python script example which allows to submit 4 additions at once:
To use job arrays with submitit, you need to call the <code>executor.map_array</code> method. The example bellow shows how to execute 4 different additions (i.e., same algorithm applied with four distinct sets of parameter pairs: 1+10; 2+20; 3+30; 4+40):


<syntaxhighlight lang="python" line>
<syntaxhighlight lang="python" line>
Line 371: Line 254:
a = [1, 2, 3, 4]
a = [1, 2, 3, 4]
b = [10, 20, 30, 40]
b = [10, 20, 30, 40]
executor = submitit.AutoExecutor(folder="log_test")
executor = submitit.AutoExecutor(folder="log_test", cluster="oar")
jobs = executor.map_array(add, a, b)
jobs = executor.map_array(add, a, b)
# Iterate over the list of jobs and check their results, stdout and stderr
# Iterate over the 4 jobs and print their results
for job in jobs:
for job in jobs:
     print("Job: ", job.job_id, " result: ", job.result())
     print("Job: ", job.job_id, " result: ", job.result())
    print("Job: ", job.job_id, " stdout:\n", job.stdout())
    if job.stderr():
        print("Job: ", job.job_id, " stderr: ", job.stderr())
</syntaxhighlight>
</syntaxhighlight>
The example script needs to be launched on frontend as follow:
{{Term|location=fontend.site|cmd=<code class="command">python3 this-script.py</code>}}


The output is as follow:
The output is as follow:
<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
Job:  1948179  result:  11
Job:  1948179  result:  11
Job:  1948179  stdout:
submitit INFO (2023-05-26 11:10:07,374) - Starting with JobEnvironment(job_id=1948178_1, hostname=chetemi-13.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-05-26 11:10:07,374) - Loading pickle: /home/ychi/submitit/log_test/1948179_submitted.pkl
submitit INFO (2023-05-26 11:10:07,375) - Job completed successfully
submitit INFO (2023-05-26 11:10:07,375) - Exiting after successful completion
Job:  1948180  result:  22
Job:  1948180  result:  22
Job:  1948180  stdout:
submitit INFO (2023-05-26 11:10:06,354) - Starting with JobEnvironment(job_id=1948178_2, hostname=chifflet-6.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-05-26 11:10:06,354) - Loading pickle: /home/ychi/submitit/log_test/1948180_submitted.pkl
submitit INFO (2023-05-26 11:10:06,355) - Job completed successfully
submitit INFO (2023-05-26 11:10:06,356) - Exiting after successful completion
Job:  1948178  result:  33
Job:  1948178  result:  33
Job:  1948178 stdout:
Job:  1948181 result44
  submitit INFO (2023-05-26 11:10:07,136) - Starting with JobEnvironment(job_id=1948178_0, hostname=chetemi-8.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
</syntaxhighlight>
submitit INFO (2023-05-26 11:10:07,136) - Loading pickle: /home/ychi/submitit/log_test/1948178_submitted.pkl
submitit INFO (2023-05-26 11:10:07,137) - Job completed successfully
submitit INFO (2023-05-26 11:10:07,138) - Exiting after successful completion


Job:  1948181  result:  44
{{Note|text=Unlike the SlurmExecutor with <code>slurm_array_parallelism</code> parameter, the OarExecutor does not provide a parameter to limit the number of jobs executed simultaneously.}}
Job:  1948181  stdout:
submitit INFO (2023-05-26 11:10:06,472) - Starting with JobEnvironment(job_id=1948178_3, hostname=chifflet-2.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-05-26 11:10:06,472) - Loading pickle: /home/ychi/submitit/log_test/1948181_submitted.pkl
submitit INFO (2023-05-26 11:10:06,473) - Job completed successfully
submitit INFO (2023-05-26 11:10:06,474) - Exiting after successful completion


</syntaxhighlight>
{{Note|text=As explained in the [https://github.com/facebookincubator/submitit/blob/main/docs/examples.md#job-array submitit documentation], one pickle file is created for each job of an array. If you have big object in your functions (like a full pytorch model) you should serialize it once and only pass its path to the submitted function.}}


Under the hood, each job of the array is an independent OAR job with its own stdout log file '''''jobId_log.out''''', stderr log file '''''jobId_log.err''''', task file '''''jobId_submitted.pkl''''' and output file '''''jobId_result.pkl'''''. In comparison to standard jobs, job arrays share one submission file '''''firstJobId_submission.sh'''''.
You can refer to [https://github.com/facebookincubator/submitit/blob/main/docs/examples.md#job-arrays-through-a-context-manager this page] to see how Submitit allows the job arrays submission through a context manager.


Note that unlike the SlurmExecutor (and the <code>slurm_array_parallelism</code> option), the OARExecutor does not allow limiting the number of jobs executed in parallel with this option.
=== Other advanced submitit features ===


Note also that one pickle file is created for each job of an array. If you have big object in your functions (like a full pytorch model) you should serialize it once and only pass its path to the submitted function.¹
Submitit provides advanced features like:


¹: https://github.com/facebookincubator/submitit/blob/main/docs/examples.md#job-arrays
* Concurrent job executions (example available [https://github.com/facebookincubator/submitit/blob/main/docs/examples.md#concurrent-jobs here])
* Multi-threading executions with <code>ThreadPoolExecutor</code> (example available [https://github.com/facebookincubator/submitit/blob/main/docs/examples.md#concurrent-jobs here])
* Integration with Asyncio for asynchronous job execution (example available [https://github.com/facebookincubator/submitit/blob/main/docs/examples.md#asyncio here])


== Current status of the OAR executor ==


=== Current status of the OAR plugin ===
The current OarExecutor implementation does not cover all Submitit features. For now, it includes:
* Submission of functions
* Error handling and recording of stack traces with <code>job.result()</code>
* Checkpointing of stateful callables and automatic resubmission from current state when preempted
* Job arrays
* Concurrent jobs
* Asyncio coroutines


Despite the differences between OAR and Slurm-based clusters,
The following features are not currently supported:
the first version, the supported and not supported parameters are listed in [[#Parameters|the table below]]. The not supported functionalities are the tasks notion of Slurm, the memory management of the job, the job array and the asynchronous job supports.
* Multi-tasks jobs
* Some parameters (as listed in the table of [[Submitit#Parameters]])

Latest revision as of 09:31, 22 March 2024

Note.png Note

This page is actively maintained by the Grid'5000 team. If you encounter problems, please report them (see the Support page). Additionally, as it is a wiki page, you are free to make minor corrections yourself if needed. If you would like to suggest a more fundamental change, please contact the Grid'5000 team.

Submitit

Submitit is a lightweight tool designed for submitting Python functions for computation within a Slurm cluster. It acts as a wrapper for submission and provides access to results, logs and more. Slurm, on the other hand, is an open-source, fault-tolerant, and highly scalable cluster management and job scheduling system suitable for both large and small Linux clusters. Submitit allows for seamless execution switching between Slurm or local environments. The source code, issues and pull requests can be found here.

To facilitate the switch between OAR and Slurm-based resource managers for users of Inria's national computing infrastructure, ongoing development efforts are focused on adding support for the OAR cluster in Submitit, through a plugin approach. The source code for the OAR plugin can be accessed here. A summary of the progress made, including implemented features and pending tasks can be found in this section.

Submitit and submitit_oar plugin installation

Note.png Note

Submitit and submitit_oar should be installed in a folder accessible by both the frontend and the nodes. For example, by installing it in your homedir, we are guaranteed that Submitit is available by both the frontend and the nodes.

Using pip

pip can be used to install the stable release of submitit and submitit_oar:

Terminal.png fontend.site:
pip install --user submitit_oar==1.1.1

It is recommended to install python dependencies via a virtual environment. To do so, before running the pip command:

Terminal.png fontend.site:
python3 -m venv env
Terminal.png fontend.site:
source env/bin/activate
Terminal.png fontend.site:
pip install submitit_oar==1.1.1


Basic usage

The example below shows the execution of an integer addition performed with the Submitit library. The AutoExecutor serves as the interface for submitting functions that are either executed on a cluster or on a local execution. It automatically detects the presence of a scheduler (such as OAR or Slurm). If no scheduler is detected, the function is executed locally on the machine.

import submitit

def add(a, b):
    return a + b

# logs are dumped in the folder
executor = submitit.AutoExecutor(folder="log_test", cluster="oar")

job_addition = executor.submit(add, 5, 7)  # will compute add(5, 7)
output = job_addition.result()  # waits for completion and returns output
# if ever the job failed, result() will raise an error with the corresponding trace
print('job_addition output: ', output)
assert output == 12

The example script needs to be launched on frontend as follow:

Terminal.png fontend.site:
python3 this-script.py

The execution of the submitit script will create the working folder (i.e., folder="log_test"), in which you will find the scheduler output files (i.e., jobId_log.out and jobId_log.err), the scheduler submission file jobId_submission.sh, the submitit pickles (i.e., a task file jobId_submitted.pkl and the result jobId_result.pkl).

Note.png Note

The folder parameter is not mandatory. The value of this parameter represents the path of the working folder where the above files will be stored. If you don't specify it, the files will be saved in the current directory where you execute your script.

However, it is generally recommended to specify an explicit folder to avoid any conflicts with other files present in the current directory.

Note that this folder may grow rapidly, especially if you have large checkpoints (see checkpoint section for details) or if you submit lot of jobs. You should think about cleaning up the folder, and even better an automated way of cleaning it.

Advanced usage

Using parameters for specifying resources

For specifying resources (e.g., cores, memory, partition,...), you need to call the method update_parameters :

executor = submitit.AutoExecutor(folder="log_test", cluster="oar")
executor.update_parameters(slurm_partition="cpu_devel", oar_queue="default")
Note.png Note

Options for a given scheduler (i.e., prefixed by oar_ or slurm_) are ignored if the scheduler is not the good one. For instance, if slurm_partition is provided but the scheduler used is OAR, slurm_partition' is just ignored. Besides, parameters for specific scheduler are overwriting common parameters. For instance, if both oar_walltime and timeout_min are provided, then oar_walltime is used on OAR clusters (as timeout_min is a common parameter) while timeout_min is used on others (as oar_walltime is ignored)

The following table recaps the parameters supported (and unsupported) by AutoExecutor, OarExecutor and SlurmExecutor:

AutoExecutor OarExecutor SlurmExecutor Description
timeout_min walltime in hh:mm:ss time The duration of the job (minutes)
name n job_name The name of the job ('submitit' by default)
nodes nodes nodes The number of nodes (int)
oar_queue slurm_partition string
gpus_per_node gpu gpus_per_node or --gres=gpu:xx The number of GPUs available on each node (int)
stderr_to_stdout Fail.png stderr_to_stdout Redirect the error output on the standard one (boolean)
tasks_per_node Fail.png ntasks_per_node The maximun number of task run on each node (int)
cpus_per_task Fail.png cpus_per_task The number of CPUs dedicated for a task (int)
mem_gb Fail.png mem The amount of memory (string)
Fail.png slurm_array_parallelism Number of jobs executed in parallel simultaneously
additional_parameters Fail.png dict: other OAR parameters not available in submitit

The scheduler specific parameters can also be used with scheduler specific Executors, without the scheduler name prefixes. For instance, let's consider OarExecutor, you do not need to add oar_ before the parameters walltime or queue:

executor = submitit.OarExecutor(folder="log_test")
executor.update_parameters(walltime="0:0:5", queue="default")

Example with OarExecutor

In the example bellow, we want a job with a specific walltime that requires 1 GPU on the production queue, and is executed in besteffort mode.

import submitit

def add(a, b):
    return a + b

executor = submitit.AutoExecutor(folder="log_test", cluster="oar")
executor.update_parameters(oar_queue="production", oar_walltime="0:2:0", nodes=1, gpus_per_node=1, oar_additional_parameters={"t": "besteffort"})
job_addition = executor.submit(add, 5, 7)
output = job_addition.result()
print('job_addition output: ', output)
assert output == 12

If we check the submission file submitted to OAR, we can see all our requirements:

#!/bin/bash

# Parameters
#OAR -E /home/ychi/submitit_env/log_test/%jobid%_0_log.err
#OAR -O /home/ychi/submitit_env/log_test/%jobid%_0_log.out
#OAR -l /nodes=1/gpu=1,walltime=0:2:0
#OAR -n submitit
#OAR -q production
#OAR -t besteffort

# command
export SUBMITIT_EXECUTOR=oar
/home/ychi/submitit_env/env/bin/python -u -m submitit.core._submit /home/ychi/submitit_env/log_test

Checkpointing with Submitit

The purpose of the checkpointing functionality is to allow the automatic resumption of a computation when the job is preempted by another. To so do, the current state of the job needs to be saved before the scheduler (e.g., OAR, Slurm) stops the job.

As explained more in detailed in the official submitit documentation, you need to:

  • define a class that implements the submitit.helpers.Checkpointable interface
  • implement a __call__ method that contains the computations code
  • implement a checkpoint method to save the current state of the computation
import submitit

class MyCheckpointableAlgorithm(submitit.helpers.Checkpointable):

    def __call__(self, ....):
        ....

    def checkpoint(self, *args, **kwargs):
        ....

if __name__ == "__main__":
    executor = submitit.AutoExecutor(folder)
    myAlgo = MyCheckpointableAlgorithm()

    job = executor.submit(myAlgo, ...)
    print('output: ', job.result())

The submitit documentation provides a complete example of implementing a checkpointing mechanism for a multinomial logistic regression based on the Scikit-learn library.

Slurm and OAR mechanisms for resuming preempted jobs

OAR and Slurm do not provides the same strategy for implementing the resumption of a preempted job:

  • For the Slurm scheduler, there is the possibility to requeue the current job directly from the node by executing the command scontrol requeue jobId (see requeue description in the scontrol man page for more information).
  • For the OAR scheduler, there are two possibilities: either requeuing the job by executing oarsub --resubmit=origin_job_id (note that the oarsub command is not available on the nodes), or activating an automatic requeuing mechanism by submitting the original job with the idempotent type.
Note.png Note

For the OAR scheduler, the automatic requeuing mechanism using idempotent jobs only works if the job is preempted after running at least 60 seconds. If the preemption occurs before 60 seconds, the OAR scheduler considers the job not viable and stops it without requeuing.

How to implement the checkpointing mechanism

As explained in the previous section, to implement a checkpointing mechanism with Submitit, you need to :

  • In the __call__ method:
    • If the OAR scheduler is used, you should wait for 60 seconds to ensure that when the computation starts, your job will be rescheduled if it is preempted.
    • Check if an existing state is stored on the persistent storage and retrieve it.
  • In the checkpointing method:
    • Save the state of your computation in the checkpointing method (e.g., files on a persistent storage).
    • Conclude the method by returning super().checkpoint(*args, **kwargs), which will create a DelayedSubmission object corresponding the requeued job.
Template for Slurm
def __call__(self, ....):
    ... # Retrieve a potential existing state
    ... # Start computation

def checkpoint(self, *args: Any, **kwargs: Any) -> submitit.helpers.DelayedSubmission:
    ... # Save current computation state
    return super().checkpoint(*args, **kwargs)
Template for OAR
def __call__(self, ....):
    # wait 60s at first to ensure that the checkpointing mechanism is working
    time.sleep(60)
    ... # Retrieve a potential existing state
    ... # Start computation

def checkpoint(self, *args: Any, **kwargs: Any) -> submitit.helpers.DelayedSubmission:
    ... # Save current computation state
    return super().checkpoint(*args, **kwargs)

How to simulate a preemption

If you want to test that the checkpointing mechanism is working properly with your algorithm, you can simulate a preemption by calling the _interrupt() method. The call to _interrupt() should be made after the job starts (i.e., in the running state) and after waiting for a couple of seconds to ensure the computation has started. Here is an example:

...
job = ex.submit(myAlgo, ...)

# Wait for the job to be running
while job.state() != "RUNNING":
    time.sleep(1)

# Wait for 60s to let the algorithm run for a while
time.sleep(60)

# Simulate preemption
job._interrupt()

# Get the result
result = job.result()
print(f"Result: {result}")

To validate the good execution of the checkpointing, you should:

  • Check the scheduler to see that the job was stopped and then restarted.
  • Check the output of your algorithm to see that the computation was saved and restarted not from the beginning.

Job array with Submitit

If you need to start n times the same computation or need to execute n times the same algorithm with different parameter, submitit implement a dedicated functionality for such a usage case: Job Array. This functionnality leverages the Slurm or OAR job array feature. You should use job arrays as it is faster than submitted all jobs independently (as all jobs are submitted in a unique call to the scheduler).

To use job arrays with submitit, you need to call the executor.map_array method. The example bellow shows how to execute 4 different additions (i.e., same algorithm applied with four distinct sets of parameter pairs: 1+10; 2+20; 3+30; 4+40):

import submitit

def add(a, b):
    return a + b

a = [1, 2, 3, 4]
b = [10, 20, 30, 40]
executor = submitit.AutoExecutor(folder="log_test", cluster="oar")
jobs = executor.map_array(add, a, b)
# Iterate over the 4 jobs and print their results
for job in jobs:
    print("Job: ", job.job_id, " result: ", job.result())

The output is as follow:

Job:  1948179  result:  11
Job:  1948180  result:  22
Job:  1948178  result:  33
Job:  1948181  result:  44
Note.png Note

Unlike the SlurmExecutor with slurm_array_parallelism parameter, the OarExecutor does not provide a parameter to limit the number of jobs executed simultaneously.

Note.png Note

As explained in the submitit documentation, one pickle file is created for each job of an array. If you have big object in your functions (like a full pytorch model) you should serialize it once and only pass its path to the submitted function.

You can refer to this page to see how Submitit allows the job arrays submission through a context manager.

Other advanced submitit features

Submitit provides advanced features like:

  • Concurrent job executions (example available here)
  • Multi-threading executions with ThreadPoolExecutor (example available here)
  • Integration with Asyncio for asynchronous job execution (example available here)

Current status of the OAR executor

The current OarExecutor implementation does not cover all Submitit features. For now, it includes:

  • Submission of functions
  • Error handling and recording of stack traces with job.result()
  • Checkpointing of stateful callables and automatic resubmission from current state when preempted
  • Job arrays
  • Concurrent jobs
  • Asyncio coroutines

The following features are not currently supported: