Execo Practical Session
Overview
The goal of this practical session is to show how to use execo ([1]) to quickly and easily prototype / develop reproducible experiments. The aim in this session is to show the issues of experiment development as faced by typical experimenters, when using grid5000, and how execo can help them being more productive and getting more reproducible results.
This practical session will start by showing how to use execo to interactively develop the different steps of an experiment, then it will show how to use execo to transform this prototype in a fully automatic, configurable, robust experiment engine, producing reproducible results, and able to run on a much larger parameters space.
Tool: execo
execo offers a Python API for local or remote, standalone or parallel, unix processes execution . It is especially well suited for quickly and easily scripting workflows of parallel/distributed operations on local or remote hosts: automate a scientific workflow, conduct computer science experiments, perform automated tests, etc. The core python package is execo
. The execo_g5k
package provides a set of tools and extensions for the grid5000 testbed. The execo_engine
package provides tools to ease the development of computer sciences experiments.
Tutorial requirements
This tutorial requires users to know basic python and be reasonably familiar with grid5000 usage (this is not an absolute beginner session). During the tutorial, users will need to reserve a few nodes on at least two different clusters.
Detailed session program
The use case (the experiment which we will use as a support to illustrate execo functionality) is to use an MPI benchmark to analyze performance scalability on various grid5000 clusters (depending on number of cores and problem size).
execo introduction
execo installation
On a grid5000 frontend, run:
$ easy_install --user execo
To check that everything is setup correctly, run a simple hello world:
$ ipython In [1]: import execo In [2]: execo.Process("echo 'hello, world'").run().stdout Out[2]: 'hello, world\n'
Prototype the experiment interactively
Let's start by creating a directory for the tutorial, on the frontend:
$ mkdir ~/execo_tutorial && cd ~/execo_tutorial
From now on, all commands prefixed by >>> are to be run in a python shell, preferably ipython, which is more user-friendly. All python sessions should import the execo modules:
$ ipython >>> from execo import * >>> from execo_g5k import * >>> from execo_engine import *
The experiment
We will run the LU benchmark (Lower-Upper symmetric Gauss-Seidel) from the NAS Parallel Benchmark, which solves a system of nonlinear PDE. We will run this benchmark using MPI for parallelization over several cores. This benchmark can run on several problem sizes (named A, B, C, etc.). This benchmark must be configured, at compilation time, knowing the number of cores used in parallel and the problem sizes.
The NAS Parallel Benchmark suite can be downloaded from [here] but downloading involves registering, so for the convenience of the tutorial, we have put the archive [here].
We'll start by prototyping the experiment interactively. Some of the following steps can easily be performed without execo, but we use execo because in the end we will automate everything. For each execo function or class used, you can always look at the documentation in the [Execo API documentation]
Reserve some grid5000 compute nodes
Let's reserve some nodes on a site, for example lyon:
>>> jobs = oarsub([(OarSubmission("cluster=1/nodes=2", walltime=3600, job_type="allow_classic_ssh"), "lyon")]) >>> jobs [(<jobid>, 'lyon')]
We can get informations on the job:
>>> get_oar_job_info(*jobs[0])
And get the list of nodes:
>>> nodes = get_oar_job_nodes(*jobs[0]) >>> nodes [Host('<node1>.lyon.grid5000.fr'), Host('<node2>.lyon.grid5000.fr')]
Configure, compile and install the benchmark program on one node
We will use one of the NPB bench, namely a LU decomposition, that performs a linear system solver. Downloading the benchmark, extract it:
$ wget --no-check-certificate https://api.grid5000.fr/sid/sites/lyon/public/mimbert/execo_tutorial/NPB3.3-MPI.tar.bz2 $ tar -xjf NPB3.3-MPI.tar.bz2
Prior to compiling it, we must configure the build according to the total number of cores and the problem sizes which will be used. So we first need to retrieve the numbe of core of the cluster thanks to the grid5000 API:
>>> n_core = get_host_attributes(nodes[0])['architecture']['smt_size'] >>> total_core = n_core * len(nodes) >>> total_core <num total cores>
For now we only compile problem size A
>>> conf = Process('echo "lu\tA\t%i" > NPB3.3-MPI/config/suite.def' % total_core, shell = True) >>> conf.run()
Now compile it on a node (not on a frontend, because it's forbidden ;-)... and because we need mpif77):
>>> compilation = SshProcess('cd execo_tutorial/NPB3.3-MPI && make clean && make suite', nodes[0]) >>> compilation.run()
We can see a summary of the compilation process:
>>> print compilation
It should be ok:
>>> compilation.ok
We can also have a detailed look at compilation outputs if needed:
>>> print compilation.stdout >>> print compilation.stderr
The program is ready to be run.
Run the benchmark program
We run the benchmark with the default openmpi of grid5000 compute nodes, using mpirun. For prototyping, let's use a default fallback openmpi connector configuration which will run on most clusters (see FAQ#MPI_options_to_use)
>>> bench = SshProcess('mpirun -H %s -n %i --mca pml ob1 --mca btl tcp,self ~/execo_tutorial/NPB3.3-MPI/bin/lu.A.%i' % (",".join([node.address for node in nodes]), total_core, total_core), nodes[0]) >>> bench.run() >>> bench.ok >>> print bench.stdout
Release your resources
>>> oardel(jobs)
Transform this prototype in an automated experiment engine
In this section, we show how each step previously interactively prototyped can be integrated in a fully automated experiment engine.
The skeleton: an empty (yet) experiment engine
Execo ships with a [generic Engine class] which is well-suited for automatizing an experimental workflow with varying parameters. We define and inherit from it our experiment engine with the following experimental workflow:
- define the parameters to explore: number of parallel nodes, which problem sizes, which clusters
- prepare the bench according to the parameters (bench compilation)
- run the bench on the various parameter combinations (and of course, save the results)
Create a new file ~/execo_tutorial/mpi_bench.py
with the following code in it, which is the skeleton of our experimental workflow:
from execo_engine import Engine class mpi_bench(Engine): def run(self): """Inherited method, put here the code for running the engine""" self.define_parameters() if self.prepare_bench(): self.run_xp() def define_parameters(self): """Create the iterator that contains the parameters to be explored """ pass def prepare_bench(self): """Copy required files on frontend(s) and compile bench suite, adapted for the clusters to be benchmarked """ pass def run_xp(self): """Iterate over the parameters and execute the bench """ pass
Now let's fill the blanks and implement these 3 steps:
Define the parameters space
For this tutorial, we fix the number of nodes to be used at 4 (to avoid using too much resources). We need to compute the number of cores available on these 4 nodes which depends on the cluster, using [function get-host-attributes]. We need to do that for each involved cluster, which gives us the max number of cores that can be used in our parameters space. For example:
>>> clusters = ['petitprince', 'edel', 'paravance', 'stremi'] >>> n_nodes = 4 >>> max_core = n_nodes * max([ ... get_host_attributes(cluster + '-1')['architecture']['smt_size'] ... for cluster in clusters]) 96
On all clusters, we will run the bench with a varying number of parallel cores, among powers of two and up to the number of cores of 4 nodes. For example:
>>> from itertools import takewhile, count >>> filter(lambda i: i >= n_nodes, ... list(takewhile(lambda i: i<max_core, ... (2**i for i in count(0, 1))))) [4, 8, 16, 32, 64]
We use the [function sweep] to generate all the parameters combinations, and the [ParamSweeper class] to store and iterate the parameters space, and checkpoint the progress. So we propose this implementation of method define_parameters:
import os from execo_g5k import get_host_attributes from itertools import takewhile, count from execo_engine import logger, ParamSweeper, sweep def define_parameters(self): """Create the iterator that contains the parameters to be explored """ self.n_nodes = 4 # Choose a list of clusters clusters = ['graphene', 'petitprince', 'edel', 'paravance', 'stremi'] # compute the maximum number of cores max_core = self.n_nodes * max([ get_host_attributes(cluster + '-1')['architecture']['smt_size'] for cluster in clusters]) self.parameters = { 'cluster' : clusters, 'n_core': filter(lambda i: i >= self.n_nodes, list(takewhile(lambda i: i<max_core, (2**i for i in count(0, 1))))), 'size' : ['A', 'B', 'C'] } logger.info(self.parameters) self.sweeper = ParamSweeper(os.path.join(self.result_dir, "sweeps"), sweep(self.parameters)) logger.info('Number of parameters combinations %s', len(self.sweeper.get_remaining()))
Prepare the bench
The bench preparation steps are:
- reserve a node
- extract the bench tgz (no need to copy it to the node, we use the NFS)
- configure bench compilation: number of cores and problem sizes
- compile the bench
- copy the compiled bench to all involved frontends
from execo_g5k import get_cluster_site, oarsub, OarSubmission, \ wait_oar_job_start, get_oar_job_nodes, oardel, get_host_site from execo import Put, Remote from execo_engine import logger def prepare_bench(self): """bench configuration and compilation, copy binaries to frontends return True if preparation is ok """ logger.info("preparation: configure and compile benchmark") # the involved sites. We will do the compilation on the first of these. sites = list(set(map(get_cluster_site, self.parameters['cluster']))) # generate the bench compilation configuration bench_list = '\n'.join([ 'lu\t%s\t%s' % (size, n_core) for n_core in self.parameters['n_core'] for size in self.parameters['size'] ]) # Reserving a node because compiling on the frontend is forbidden # and because we need mpif77 jobs = oarsub([(OarSubmission(resources = "nodes=1", job_type = 'allow_classic_ssh', walltime ='0:10:00'), sites[0])]) if jobs[0][0]: try: copy_bench = Put([sites[0]], ['NPB3.3-MPI.tar.bz2']).run() extract_bench = Remote('tar -xjf NPB3.3-MPI.tar.bz2', [sites[0]]).run() wait_oar_job_start(*jobs[0]) nodes = get_oar_job_nodes(*jobs[0]) conf_bench = Remote('echo "%s" > ~/NPB3.3-MPI/config/suite.def' % bench_list, nodes).run() compilation = Remote('cd NPB3.3-MPI && make clean && make suite', nodes).run() except: logger.error("unable to compile bench") return False finally: oardel(jobs) # Copying binaries to all other frontends frontends = sites[1:] rsync = Remote('rsync -avuP ~/NPB3.3-MPI/ {{frontends}}:NPB3.3-MPI', [get_host_site(nodes[0])] * len(frontends)) rsync.run() return compilation.ok and rsync.ok
The experimental workflow
The experimental workflow (the run_xp
of our engine) consists in iterating over the parameter combinations, and for each, running the MPI bench and collecting the results.
First of all, as we run the bench on several clusters, we need a function which gives us the appropriate mpirun
options depending on the cluster:
def get_mpi_opts(cluster): # MPI configuration depends on the cluster # see https://www.grid5000.fr/mediawiki/index.php/FAQ#MPI_options_to_use if cluster in ['parapluie', 'parapide', 'griffon', 'graphene', 'edel', 'adonis', 'genepi' ]: mpi_opts = '--mca btl openib,sm,self --mca pml ^cm' elif cluster in ['suno', 'chinqchint']: mpi_opts = '--mca pml ob1 --mca btl tcp,self' elif cluster in ['sol']: mpi_opts = '--mca pml cm' else: mpi_opts = '--mca pml ob1 --mca btl tcp,self' return mpi_opts
To understand the iteration over the parameters space, you can look at the [ParamSweeper class documentation]. We use methods:
get_remaining
: get the list of unexplored parameter combinations.get_next
: get the next available unexplored parameter combination (and mark it as in progress).skip
skip and mark a parameter combination as ignored.cancel
put an in progress parameter combination back in the queue of unexplored.done
mark a parameter combination as done (and remove it from the queue of in progress).
from execo_g5k import get_host_attributes, get_cluster_site, \ oarsub, OarSubmission, wait_oar_job_start, get_oar_job_nodes, oardel from execo import SshProcess from execo_engine import slugify, logger def run_xp(self): """Iterate over the parameters and execute the bench""" while len(self.sweeper.get_remaining()) > 0: comb = self.sweeper.get_next() if comb['n_core'] > get_host_attributes(comb['cluster']+'-1')['architecture']['smt_size'] * self.n_nodes: self.sweeper.skip(comb) continue logger.info('Processing new combination %s' % (comb,)) site = get_cluster_site(comb['cluster']) jobs = oarsub([(OarSubmission(resources = "{cluster='" + comb['cluster']+"'}/nodes=" + str(self.n_nodes), job_type = 'allow_classic_ssh', walltime ='0:10:00'), site)]) if jobs[0][0]: try: wait_oar_job_start(*jobs[0]) nodes = get_oar_job_nodes(*jobs[0]) bench_cmd = 'mpirun -H %s -n %i %s ~/NPB3.3-MPI/bin/lu.%s.%i' % ( ",".join([node.address for node in nodes]), comb['n_core'], get_mpi_opts(comb['cluster']), comb['size'], comb['n_core']) lu_bench = SshProcess(bench_cmd, nodes[0]) lu_bench.stdout_handlers.append(self.result_dir + '/' + slugify(comb) + '.out') lu_bench.run() if lu_bench.ok: logger.info("comb ok: %s" % (comb,)) self.sweeper.done(comb) continue finally: oardel(jobs) logger.info("comb NOT ok: %s" % (comb,)) self.sweeper.cancel(comb)
An interesting detail is the use of field [stdout_handlers
] of the lu_bench SshProcess
: appending a file name to the the list stdout_handlers
of an execo Process
redirects the stdout of the process to the file. Note that the SshProcess
is a remote process, but the file is a local file, thus, there is no need to copy the results, they are directly written in the result directory. Also, function [slugify] is a convenient function for generating from a parameter combination a string which can be used as a filename.
Launch the engine
Just add the following lines to make your engine runable:
if __name__ == "__main__": engine = mpi_bench() engine.start()
Your code should be complete now. For your convenience, the code code is available here and you can get it with:
$ wget --no-check-certificate https://api.grid5000.fr/sid/sites/lyon/public/mimbert/execo_tutorial/mpi_bench.py
You can then run it with a simple command such as
$ python ./mpi_bench.py
You can run the following to get the list of command line options:
$ python ./mpi_bench.py --help
For better debugging, if needed, you can also run it in ipython with
$ ipython --pdb -i ./mpi_bench.py
Thanks to the execo_engine.Paramsweeper, the progress of the benches is saved to disk and can later be resumed by running mpi_bench.py with option -c.
Using the results
Using [this python/matplotlib based script], you can generate graphs showing the results of the experiment. You can run it in grid5000 this way:
$ wget --no-check-certificate https://api.grid5000.fr/sid/sites/lyon/public/mimbert/execo_tutorial/draw_mpi_bench.py $ ./draw_mpi_bench.py -f result.png <experiment_dir>
It then generates for each cluster a file result_<N>.png. You can also use interactive visualisation by connecting to grid5000 with ssh -X
and not using option -f.
Here are some examples of graphs generated with this experiment scripts:
Conclusion
During this tutorial, we have learned how to use execo API to quickly create a automated and reproducible experiment on the Grid'5000 platform. Many other features, such as advanced deployment management, resources planning, scalable number of concurrent SSH connections (thanks to TakTuk), network topology, can be found in execo. If interested, you can join the execo mailing list (see [execo README]).