Submitit

Note.png Note

This page is actively maintained by the Grid'5000 team. If you encounter problems, please report them (see the Support page). Additionally, as it is a wiki page, you are free to make minor corrections yourself if needed. If you would like to suggest a more fundamental change, please contact the Grid'5000 team.

Submitit

Submitit is a lightweight tool designed for submitting Python functions for computation within a Slurm cluster. It acts as a wrapper for submission and provides access to results, logs and more. Slurm, on the other hand, is an open-source, fault-tolerant, and highly scalable cluster management and job scheduling system suitable for both large and small Linux clusters. Submitit allows for seamless switching between execution on Slurm and local execution. The source code, issues and pull requests can be found here.

To facilitate the switch between OAR and Slurm-based resource managers for users of Inria's national computing infrastructure, ongoing development efforts are focused on adding support for the OAR cluster in Submitit, through a plugin approach. The source code for the OAR plugin can be accessed here. A summary of the progress made, including implemented features and pending tasks can be found in this section.

Submitit installation

Note.png Note

Submitit should be installed in a folder accessible by both the frontend and the nodes. For example, by installing it in your home directory, you are guaranteed that Submitit is available to both the frontend and the nodes.

Using pip

pip can be used to install the stable release of submitit:

Terminal.png frontend.site:
pip install --user submitit

To use the latest version, which includes the OAR plugin, you can install it from source.

It is recommended to install Python dependencies via a virtual environment. To do so, create and activate one before running the pip command:

Terminal.png frontend.site:
python3 -m venv env
Terminal.png frontend.site:
source env/bin/activate
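
Once the virtual environment is activated, the development version including the OAR plugin can be installed directly from the Git repository (the same repository URL as in the conda environment file below), for example:

Terminal.png frontend.site:
pip install git+https://gitlab.inria.fr/moyens-de-calcul/submitit.git@master#egg=submitit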

Using conda

conda can be used to install submitit from the conda-forge:

Terminal.png frontend.site:
conda install -c conda-forge submitit

To use the latest version including the OAR plugin, you can create a conda environment file (e.g. "conda-env-submitit.yml") as:

name: submitit
dependencies:
  - pip:
    - git+https://gitlab.inria.fr/moyens-de-calcul/submitit.git@master#egg=submitit

and then install the latest version of Submitit using this environment file:

Terminal.png frontend.site:
conda env create --file conda-env-submitit.yml
Terminal.png frontend.site:
source activate submitit

Basic usage

The example below shows the execution of an integer addition performed with the Submitit library. The AutoExecutor serves as the interface for submitting functions that are executed either on a cluster or locally. It automatically detects the presence of a scheduler (such as OAR or Slurm). If no scheduler is detected, the function is executed locally on the machine.

import submitit

def add(a, b):
    return a + b

# logs are dumped in the folder
executor = submitit.AutoExecutor(folder="log_test")

job_addition = executor.submit(add, 5, 7)  # will compute add(5, 7)
output = job_addition.result()  # waits for completion and returns output
# if ever the job failed, result() will raise an error with the corresponding trace
print('job_addition output: ', output)
assert output == 12

The example script needs to be launched on the frontend as follows:

Terminal.png frontend.site:
python3 this-script.py

The execution of the Submitit script will create the working folder (i.e., folder="log_test"), in which you will find the scheduler output files (i.e., jobId_log.out and jobId_log.err), the scheduler submission file jobId_submission.sh, and the Submitit pickles (i.e., a task file jobId_submitted.pkl and a result file jobId_result.pkl).

Note.png Note

The folder parameter is not mandatory. The value of this parameter represents the path of the working folder where the above files will be stored. If you don't specify it, the files will be saved in the current directory where you execute your script.

However, it is generally recommended to specify an explicit folder to avoid any conflicts with other files present in the current directory.

Note that this folder may grow rapidly, especially if you have large checkpoints (see the checkpoint section for details) or if you submit a lot of jobs. You should think about cleaning up the folder, or better, about an automated way of cleaning it.
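
For instance, a minimal cleanup sketch (assuming the working folder is named log_test, as in the examples on this page, and that its content can safely be deleted):

from pathlib import Path

# remove old Submitit artifacts (logs, pickles, submission files) from the working folder
folder = Path("log_test")
if folder.exists():
    for file in folder.iterdir():
        file.unlink()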

Advanced usage

Using parameters for specifying resources

To specify resources (e.g., cores, memory, partition, ...), you need to call the update_parameters method:

executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(slurm_partition="cpu_devel", oar_queue="default")
Note.png Note

Options for a given scheduler (i.e., prefixed by oar_ or slurm_) are ignored if that scheduler is not the one in use. For instance, if slurm_partition is provided but the scheduler used is OAR, slurm_partition is simply ignored. Besides, scheduler-specific parameters override common parameters. For instance, if both oar_walltime and timeout_min are provided, then oar_walltime is used on OAR clusters (and timeout_min, the common parameter, is ignored there), while timeout_min is used elsewhere (where oar_walltime is ignored).
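
For example, with the sketch below (reusing the queue and partition names from the example above), OAR clusters get a 30-minute walltime from oar_walltime, while Slurm clusters and local runs fall back to timeout_min:

executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(timeout_min=30, oar_walltime="0:30:0",
                           oar_queue="default", slurm_partition="cpu_devel")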

The following table recaps the parameters supported (and unsupported) by AutoExecutor, OARExecutor and SlurmExecutor:

AutoExecutor                | OARExecutor           | SlurmExecutor                  | Description
----------------------------|-----------------------|--------------------------------|---------------------------------------------------------
timeout_min                 | walltime (hh:mm:ss)   | time                           | The duration of the job (minutes)
name                        | n                     | job_name                       | The name of the job ('submitit' by default)
nodes                       | nodes                 | nodes                          | The number of nodes (int)
oar_queue / slurm_partition | queue                 | partition                      | The OAR queue or Slurm partition to submit to (string)
gpus_per_node               | gpu                   | gpus_per_node or --gres=gpu:xx | The number of GPUs available on each node (int)
stderr_to_stdout            | not supported         | stderr_to_stdout               | Redirect the error output to the standard one (boolean)
tasks_per_node              | not supported         | ntasks_per_node                | The maximum number of tasks run on each node (int)
cpus_per_task               | not supported         | cpus_per_task                  | The number of CPUs dedicated to a task (int)
mem_gb                      | not supported         | mem                            | The amount of memory (string)
                            | not supported         | slurm_array_parallelism        | The number of jobs executed in parallel simultaneously
                            | additional_parameters | not supported                  | dict: other OAR parameters not available in Submitit

The scheduler-specific parameters can also be used with the scheduler-specific executors, without the scheduler name prefixes. For instance, with OarExecutor, you do not need to add oar_ before the walltime or queue parameters:

executor = submitit.OarExecutor(folder="log_test")
executor.update_parameters(walltime="0:0:5", queue="default")
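
Similarly, with SlurmExecutor the slurm_ prefix is dropped and the scheduler-specific names are used directly (a sketch, assuming a Slurm cluster with a cpu_devel partition):

executor = submitit.SlurmExecutor(folder="log_test")
executor.update_parameters(time=5, partition="cpu_devel")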


Example with OarExecutor

In the example below, we want a job with a specific walltime that requires 1 GPU, runs on the production queue, and is executed in besteffort mode.

import submitit

def add(a, b):
    return a + b

executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(oar_queue="production", oar_walltime="0:2:0", nodes=1, gpus_per_node=1, oar_additional_parameters={"t": "besteffort"})
job_addition = executor.submit(add, 5, 7)
output = job_addition.result()
print('job_addition output: ', output)
assert output == 12

If we check the submission file submitted to OAR, we can see all our requirements:

#!/bin/bash

# Parameters
#OAR -E /home/ychi/submitit_env/log_test/%jobid%_0_log.err
#OAR -O /home/ychi/submitit_env/log_test/%jobid%_0_log.out
#OAR -l /nodes=1/gpu=1,walltime=0:2:0
#OAR -n submitit
#OAR -q production
#OAR -t besteffort

# command
export SUBMITIT_EXECUTOR=oar
/home/ychi/submitit_env/env/bin/python -u -m submitit.core._submit /home/ychi/submitit_env/log_test

Checkpointing with Submitit

Checkpointing with Submitit on a Slurm cluster relies on the job requeue mechanism and a self-defined checkpoint method:

  • The job requeue mechanism reschedules the checkpointed job after preemption or timeout. The same job goes from running to pending and back to running until it finishes.
  • The self-defined checkpoint method prepares the new submission with the current state of the computation. It should have a signature able to receive the parameters of the callable function (an instance of a class with a __call__ method). When the preemption signal is sent, the checkpoint method is called asynchronously, with the same arguments as the callable function.


Doing checkpointing with Submitit on an OAR cluster, with the OAR plugin approach, requires a precise understanding of the inner workings of Submitit's checkpointing and job pickling.

The self-defined checkpoint method is practically the same for OAR and Slurm. However, when an OAR job is checkpointed after a preemption or timeout signal, it is systematically terminated. Another OAR job can then be submitted with the "resubmit_job_id" property to finish the previously checkpointed and terminated job. Submitit's job requeue mechanism provided in JobEnvironment is therefore not relevant here: we have to resubmit another job with the current state of the computation from the original job. The resubmission can be done either manually with oarsub --resubmit=origin_job_id, or automatically by OAR if the job is idempotent.
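
The MNIST example below obtains this behaviour by inheriting from submitit.helpers.Checkpointable. Written by hand, a minimal stateful callable with its checkpoint method could look like the following sketch (the class and the work it performs are illustrative only):

import submitit

class StatefulTask:
    def __init__(self):
        self.progress = 0  # state carried over to the resubmitted job

    def __call__(self, n_steps):
        # resume from the last completed step instead of starting from scratch
        while self.progress < n_steps:
            # ... one unit of work would go here ...
            self.progress += 1
        return self.progress

    def checkpoint(self, n_steps):
        # called asynchronously on preemption/timeout, with the same arguments as __call__;
        # returning a DelayedSubmission of `self` resubmits the callable with its current state
        return submitit.helpers.DelayedSubmission(self, n_steps)

executor = submitit.AutoExecutor(folder="log_test")
job = executor.submit(StatefulTask(), 1000)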


An MNIST example showing checkpointing with Submitit on Slurm cluster is available here.

Scikit-learn (with numpy, scipy) is required to run the MNIST example. You can refer to this page to install scikit-learn.

To have the same behavior on OAR cluster with the Submitit OAR plugin, you can run the following adapted MNIST example:

# Copyright (c) Arthur Mensch <arthur.mensch@m4x.org>
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the BSD 3-clauses license.
# Original at https://scikit-learn.org/stable/auto_examples/linear_model/plot_sparse_logistic_regression_mnist.html
#

import functools
import pickle
import time
from pathlib import Path

import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state

import submitit


class MnistTrainer(submitit.helpers.Checkpointable):
    """
    This shows how to rewrite a monolith function so that it can handle preemption nicely,
    and not restart from scratch every time it's preempted.
    """

    def __init__(self, clf):
        # This is the state that will be saved by `checkpoint`
        self.train_test = None
        self.scaler = None
        self.clf = clf
        self.trained_clf = False
        self.stage = "0"

    def __call__(self, train_samples: int, model_path: Path = None):
        # wait here 60s at first 
        # since only >60s + exit code 99 + idempotent jobs can be resubmitted automatically by OAR
        time.sleep(60)

        # `train_samples` and `model_path` will also be saved
        log = functools.partial(print, flush=True)
        log(f"*** Starting from stage '{self.stage}' ***")

        if self.train_test is None:
            self.stage = "Data Loading"
            t0 = time.time()
            log(f"*** Entering stage '{self.stage}' ***")
            # Load data from https://www.openml.org/d/554
            X, y = fetch_openml("mnist_784", version=1, return_X_y=True)
            X, y = X.to_numpy(), y.to_numpy()

            random_state = check_random_state(0)
            permutation = random_state.permutation(X.shape[0])
            X = X[permutation]
            y = y[permutation]
            X = X.reshape((X.shape[0], -1))

            # Checkpoint 1: save the train/test splits
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, train_size=train_samples, test_size=10000
            )
            self.train_test = X_train, X_test, y_train, y_test
            log(f"Loaded data, shuffle and split in {time.time() - t0:.1f}s")

        X_train, X_test, y_train, y_test = self.train_test
        if self.scaler is None:
            self.stage = "Data Cleaning"
            t0 = time.time()
            log(f"*** Entering stage '{self.stage}' ***")
            scaler = StandardScaler()
            X_train = scaler.fit_transform(X_train)
            X_test = scaler.transform(X_test)
            # Scaling is actually pretty fast, make it a bit slower to allow preemption to happen here
            time.sleep(10)
            # Checkpoint 2: save the scaler and the preprocessed data
            self.scaler = scaler
            self.train_test = X_train, X_test, y_train, y_test
            log(f"Scaled the data took {time.time() - t0:.0f}s")

        if not self.trained_clf:
            self.stage = "Model Training"
            t0 = time.time()
            log(f"*** Entering stage '{self.stage}' ***")
            self.clf.C = 50 / train_samples
            self.clf.fit(X_train, y_train)
            # Checkpoint 3: mark the classifier as trained
            self.trained_clf = True
            log(f"Training took {time.time() - t0:.0f}s")

        sparsity = np.mean(self.clf.coef_ == 0) * 100
        score = self.clf.score(X_test, y_test)
        log(f"Sparsity with L1 penalty: {sparsity / 100:.2%}")
        log(f"Test score with L1 penalty: {score:.4f}")

        if model_path:
            self.save(model_path)
        return score

    def checkpoint(self, *args, **kwargs):
        print(f"Checkpointing at stage '{self.stage}'")
        return super().checkpoint(*args, **kwargs)

    def save(self, model_path: Path):
        with open(model_path, "wb") as o:
            pickle.dump((self.scaler, self.clf), o, pickle.HIGHEST_PROTOCOL)


def main():
    t0 = time.time()
    # Cleanup log folder.
    # This folder may grow rapidly especially if you have large checkpoints,
    # or submit lot of jobs. You should think about an automated way of cleaning it.
    folder = Path(__file__).parent / "mnist_logs"
    if folder.exists():
        for file in folder.iterdir():
            file.unlink()

    ex = submitit.AutoExecutor(folder)
    if ex.cluster == "oar":
        print("Executor will schedule jobs on Oar.")
    else:
        print(f"!!! Oar executable `oarsub` not found. Will execute jobs on '{ex.cluster}'")
    model_path = folder / "model.pkl"
    trainer = MnistTrainer(LogisticRegression(penalty="l1", solver="saga", tol=0.1, multi_class="auto"))

    # Specify the job requirements.
    # Reserving only as many resources as you need ensures that the cluster resources are
    # efficiently allocated.
    ex.update_parameters(oar_core=4, timeout_min=5)
    job = ex.submit(trainer, 5000, model_path=model_path)

    print(f"Scheduled {job}.")

    # Wait for the job to be running.
    while job.state.upper() != "RUNNING":
        time.sleep(1)

    print("Run the following command to see what's happening")
    print(f"  less +F {job.paths.stdout}")

    # Simulate preemption.
    # Tries to stop the job after the first stage.
    # If the job is preempted before the end of the first stage, try to increase it.
    # If the job is not preempted, try to decrease it.
    time.sleep(85)
    print(f"preempting {job} after {time.time() - t0:.0f}s")
    job._interrupt()

    score = job.result()
    print(f"Finished training. Final score: {score}.")
    print(f"---------------- Job output ---------------------")
    print(job.stdout())
    print(f"-------------------------------------------------")

    assert model_path.exists()
    with open(model_path, "rb") as f:
        (scaler, clf) = pickle.load(f)
    sparsity = np.mean(clf.coef_ == 0) * 100
    print(f"Sparsity with L1 penalty: {sparsity / 100:.2%}")


if __name__ == "__main__":
    main()


This example script needs to be launched on the frontend, in a virtual environment, as follows:

Terminal.png frontend.site:
cd sklearn_env
Terminal.png frontend.site:
source sklearn_env/bin/activate
Terminal.png frontend.site:
python mnist.py


The output (of the original job and the automatically resubmitted job) is as follows:

Executor will schedule jobs on Oar.
Scheduled OarJob<job_id=1938801, task_id=0, state="Waiting">.
Run the following command to see what's happening
  less +F /home/ychi/sklearn_env/mnist_logs/1938801_0_log.out
preempting OarJob<job_id=1938801, task_id=0, state="Running"> after 92s
Checkpointing the job 1938801 ...DONE.
The job 1938801 was notified to checkpoint itself on chetemi-13.lille.grid5000.fr.
Finished training. Final score: 0.8222.
---------------- Job output ---------------------
submitit INFO (2023-03-23 12:33:42,410) - Starting with JobEnvironment(job_id=1938801_0, hostname=chetemi-13.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-03-23 12:33:42,410) - Loading pickle: /home/ychi/sklearn_env/mnist_logs/1938801_submitted.pkl
*** Starting from stage '0' ***
*** Entering stage 'Data Loading' ***
submitit INFO (2023-03-23 12:35:10,917) - Job has timed out. Ran 1 minutes out of requested 5 minutes.
submitit WARNING (2023-03-23 12:35:10,917) - Caught signal SIGUSR2 on chetemi-13.lille.grid5000.fr: this job is timed-out.
submitit INFO (2023-03-23 12:35:10,917) - Calling checkpoint method.
Checkpointing at stage 'Data Loading'
submitit INFO (2023-03-23 12:35:10,922) - Exiting job 1938801_1 with 99 code, (2 remaining timeouts)

submitit INFO (2023-03-23 12:35:27,490) - Starting with JobEnvironment(job_id=1938801_0, hostname=chetemi-13.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-03-23 12:35:27,491) - Loading pickle: /home/ychi/sklearn_env/mnist_logs/1938801_submitted.pkl
*** Starting from stage 'Data Loading' ***
*** Entering stage 'Data Loading' ***
Loaded data, shuffle and split in 35.5s
*** Entering stage 'Data Cleaning' ***
Scaled the data took 10s
*** Entering stage 'Model Training' ***
Training took 2s
Sparsity with L1 penalty: 78.88%
Test score with L1 penalty: 0.8222
submitit INFO (2023-03-23 12:37:16,022) - Job completed successfully
submitit INFO (2023-03-23 12:37:16,024) - Exitting after successful completion

-------------------------------------------------
Sparsity with L1 penalty: 78.88%

As you can see in the example above, the original job (job_id=1938801) was checkpointed at the stage 'Data Loading'. In the automatically resubmitted job (job_id=1938802), the stage '0' is skipped since it was already done in the original job. The resubmitted job restarted directly from the stage 'Data Loading', finished the training and gave us back a final score.

Under the hood, a DelayedSubmission class is used to wrap the callable function for checkpointing. In the OAR plugin, this class reuses the original job's pickled callable function and pickles the output of the call into the original job's result file, making the behaviour as similar as possible to Slurm's requeue of the same job.

However, don't forget that OAR always resubmits another job after the checkpointing. Your original submission bash file will automatically be re-executed by OAR for the resubmission, and Submitit will then write the stdout and stderr of the resubmitted job to the resubmitted_job_id_log.out and resubmitted_job_id_log.err files.

Job array with Submitit

The job array submission is supported through the executor.map_array method.

Using the job array functionality is the preferred way in Submitit to launch the same (or similar) job n times at once, because:

  • it can submit all jobs in only 1 call to OAR (avoids flooding it).
  • it is faster than submitting all jobs independently.

Here is a Python script example which submits 4 additions at once:

import submitit

def add(a, b):
    return a + b

a = [1, 2, 3, 4]
b = [10, 20, 30, 40]
executor = submitit.AutoExecutor(folder="log_test")
jobs = executor.map_array(add, a, b)
# Iterate over the list of jobs and check their results, stdout and stderr
for job in jobs:
    print("Job: ", job.job_id, " result: ", job.result())
    print("Job: ", job.job_id, " stdout:\n", job.stdout())
    if job.stderr():
        print("Job: ", job.job_id, " stderr: ", job.stderr())

The example script needs to be launched on the frontend as follows:

Terminal.png frontend.site:
python3 this-script.py

The output is as follows:

Job:  1948179  result:  11
Job:  1948179  stdout:
 submitit INFO (2023-05-26 11:10:07,374) - Starting with JobEnvironment(job_id=1948178_1, hostname=chetemi-13.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-05-26 11:10:07,374) - Loading pickle: /home/ychi/submitit/log_test/1948179_submitted.pkl
submitit INFO (2023-05-26 11:10:07,375) - Job completed successfully
submitit INFO (2023-05-26 11:10:07,375) - Exiting after successful completion

Job:  1948180  result:  22
Job:  1948180  stdout:
 submitit INFO (2023-05-26 11:10:06,354) - Starting with JobEnvironment(job_id=1948178_2, hostname=chifflet-6.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-05-26 11:10:06,354) - Loading pickle: /home/ychi/submitit/log_test/1948180_submitted.pkl
submitit INFO (2023-05-26 11:10:06,355) - Job completed successfully
submitit INFO (2023-05-26 11:10:06,356) - Exiting after successful completion

Job:  1948178  result:  33
Job:  1948178  stdout:
 submitit INFO (2023-05-26 11:10:07,136) - Starting with JobEnvironment(job_id=1948178_0, hostname=chetemi-8.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-05-26 11:10:07,136) - Loading pickle: /home/ychi/submitit/log_test/1948178_submitted.pkl
submitit INFO (2023-05-26 11:10:07,137) - Job completed successfully
submitit INFO (2023-05-26 11:10:07,138) - Exiting after successful completion

Job:  1948181  result:  44
Job:  1948181  stdout:
 submitit INFO (2023-05-26 11:10:06,472) - Starting with JobEnvironment(job_id=1948178_3, hostname=chifflet-2.lille.grid5000.fr, local_rank=0(1), node=0(1), global_rank=0(1))
submitit INFO (2023-05-26 11:10:06,472) - Loading pickle: /home/ychi/submitit/log_test/1948181_submitted.pkl
submitit INFO (2023-05-26 11:10:06,473) - Job completed successfully
submitit INFO (2023-05-26 11:10:06,474) - Exiting after successful completion

Under the hood, each job of the array is an independent OAR job with its own stdout log file jobId_log.out, stderr log file jobId_log.err, task file jobId_submitted.pkl and output file jobId_result.pkl. In comparison to standard jobs, job arrays share one submission file firstJobId_submission.sh.

Note that, unlike the SlurmExecutor (and its slurm_array_parallelism option), the OARExecutor does not allow limiting the number of jobs of an array executed in parallel.
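
For reference, on a Slurm-based cluster the concurrency of an array could be capped as in the sketch below (reusing the add function from the example above; at most 2 of the 4 jobs would run simultaneously):

executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(slurm_array_parallelism=2)  # ignored on OAR clusters
jobs = executor.map_array(add, [1, 2, 3, 4], [10, 20, 30, 40])
print([job.result() for job in jobs])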

Note also that one pickle file is created for each job of an array. If you have a big object in your functions (like a full PyTorch model), you should serialize it once and only pass its path to the submitted function.¹

¹: https://github.com/facebookincubator/submitit/blob/main/docs/examples.md#job-arrays
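
A minimal sketch of this "serialize once, pass the path" pattern (the function, object and file name below are purely illustrative):

import pickle
import submitit

def evaluate(model_path, sample):
    # each job loads the (potentially large) object from the shared file
    with open(model_path, "rb") as f:
        model = pickle.load(f)
    return model["scale"] * sample  # placeholder computation

# serialize the big object once on the frontend (the home directory is shared with the nodes)
model_path = "model.pkl"
with open(model_path, "wb") as f:
    pickle.dump({"scale": 10}, f)

executor = submitit.AutoExecutor(folder="log_test")
jobs = executor.map_array(evaluate, [model_path] * 3, [1, 2, 3])
print([job.result() for job in jobs])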

Concurrent jobs

Submitit uses the same submission and job patterns as the standard library package concurrent.futures. It is thus easy to run your code either on an OAR or Slurm-based cluster, or locally with multithreading.


  • Concurrent job using submitit.AutoExecutor

Here is a Python script example which uses submitit.AutoExecutor to submit 10 jobs in parallel and check their completion with the done method:

import submitit
import time

executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(timeout_min=1)
jobs = [executor.submit(time.sleep, k) for k in range(1, 11)]

# wait and check how many have finished
time.sleep(5)
num_finished = sum(job.done() for job in jobs)
print(num_finished)  # probably around 2 have finished, given the overhead

# then you may want to wait until all jobs are completed:
outputs = [job.result() for job in jobs]

The example script needs to be launched on the frontend as follows:

Terminal.png frontend.site:
python3 this-script.py

The expected output value of around 2 is due to the fact that the jobs are running concurrently on OAR. The number may vary depending on factors such as the system load and the available resources.


  • Multithreading using concurrent.futures.ThreadPoolExecutor

Similar to the previous example, the ThreadPoolExecutor from the concurrent.futures module can be used to achieve multithreading, as follows:

import time
from concurrent import futures
with futures.ThreadPoolExecutor(max_workers=10) as executor:  # This is the only real difference
    jobs = [executor.submit(time.sleep, k) for k in range(1, 11)]
    time.sleep(5)
    print(sum(job.done() for job in jobs))  # around 4 or 5 should be over
    [job.result() for job in jobs]
    assert sum(job.done() for job in jobs) == 10  # all done

The main difference is that the jobs are executed concurrently using multiple threads. The max_workers parameter specifies the maximum number of worker threads to use.

Since the jobs are executed concurrently in separate threads, the expected output value of around 4 or 5 is based on the assumption that some jobs have completed during the delay. However, the exact number can vary depending on factors such as the system load and the available CPU resources.

As you can see in the above examples, submitit.AutoExecutor leverages the concurrent.futures module to provide local execution with multithreading when no scheduler such as OAR or Slurm is detected on the machine.

In summary, submitit.AutoExecutor seamlessly integrates with concurrent.futures to provide parallel execution using multithreading when running jobs locally, ensuring efficient utilization of resources and maximizing the performance of your code.

Asyncio integration with Submitit

Submitit provides integration with asyncio, allowing you to leverage the power of coroutines for asynchronous job execution.

Here is a Python script example which demonstrates how to use asyncio with Submitit:

import asyncio
import random
import submitit
import time

async def main():
    def slow_multiplication(x, y):
        time.sleep(x * y)
        return x * y

    executor = submitit.AutoExecutor(folder="log_test")
    executor.update_parameters(timeout_min=1)

    # await a single result
    job = executor.submit(slow_multiplication, 10, 2)
    print("result of job ", job.job_id, "is: ", await job.awaitable().result())

    # print results as they become available
    jobs = [executor.submit(slow_multiplication, k, random.randint(1, 4)) for k in range(1, 5)]
    awaitables = [j.awaitable().result() for j in jobs]
    results = await asyncio.gather(*awaitables)
    for job, result in zip(jobs, results):
        print("result of job ", job.job_id, "is:", result)

# Run the main function
asyncio.run(main())

The example script needs to be launched on the frontend as follows:

Terminal.png frontend.site:
python3 this-script.py

The output is as follows:

result of job  1951575 is:  20
result of job  1951576 is: 1
result of job  1951577 is: 6
result of job  1951578 is: 12
result of job  1951579 is: 4

To await a result, await job.awaitable().result() is used here. This statement suspends the execution until the result of the job becomes available. Note that it must be executed within an async function.

If you prefer not to use coroutines and want a simpler interface, Submitit also provides the submitit.helpers.as_completed function.

Here is an example that demonstrates its usage:

import random
import submitit
import time

def slow_multiplication(x, y):
    time.sleep(x * y)
    return x * y

executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(timeout_min=1)

# Submit jobs
jobs = [executor.submit(slow_multiplication, k, random.randint(1, 4)) for k in range(1, 5)]

# Wait for results and print them as they become available
for job in submitit.helpers.as_completed(jobs):
    result = job.result()
    print(f"Result of job {job.job_id}: {result}")

The example script needs to be launched on the frontend as follows:

Terminal.png frontend.site:
python3 this-script.py

The output is as follows:

Result of job 1951601: 3
Result of job 1951602: 8
Result of job 1951603: 12
Result of job 1951604: 16

In this example, submitit.helpers.as_completed is used to iterate over the jobs and their results, as the jobs complete.

By using asyncio or the submitit.helpers.as_completed function, you can achieve concurrent and asynchronous job execution, making the most efficient use of your resources and improving the overall performance of your code.

Current status of the OAR plugin

The ongoing work to add support for the OAR cluster in Submitit, using a plugin approach, aims to bridge the differences between OAR and Slurm-based clusters.

Implemented features include:

  • Submission of functions
  • Compatibility for both OAR and Slurm-based clusters, as well as local execution
  • Error handling and recording of stack traces with job.result()
  • Support for parameters specified in the table of this section for OAR, Slurm and Submitit
  • Checkpointing of stateful callables and automatic resubmission from current state when preempted
  • Job arrays
  • Concurrent jobs
  • Asyncio coroutines

Pending tasks include implementing support for Slurm multi-task jobs and addressing the other unsupported parameters listed in the table of this section.