Execo Practical Session

Overview

The goal of this practical session is to show how to use execo (https://gitlab.inria.fr/mimbert/execo) to quickly and easily prototype and develop reproducible experiments. The aim of this session is to show the issues of experiment development faced by typical experimenters when using Grid'5000, and how execo can help them be more productive and get more reproducible results.

This practical session will start by showing how to use execo to interactively develop the different steps of an experiment; it will then show how to use execo to transform this prototype into a fully automatic, configurable, robust experiment engine, producing reproducible results and able to run on a much larger parameter space.

Tool: execo

execo offers a Python API for local or remote, standalone or parallel, Unix process execution. It is especially well suited for quickly and easily scripting workflows of parallel/distributed operations on local or remote hosts: automate a scientific workflow, conduct computer science experiments, perform automated tests, etc. The core Python package is execo. The execo_g5k package provides a set of tools and extensions for the Grid'5000 testbed. The execo_engine package provides tools to ease the development of computer science experiments.
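
To give a feel for these packages before diving in, here is a minimal illustrative sketch (the host name is a placeholder, and it assumes execo is installed and SSH access to the target host works) combining a local and a remote process:

>>> from execo import Process, SshProcess
>>> Process("uname -a").run().stdout                     # local process
>>> SshProcess("uname -a", "<some-host>").run().stdout   # same command, run remotely over SSH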

Tutorial requirements

This tutorial requires users to know basic Python and to be reasonably familiar with Grid'5000 usage (this is not an absolute beginner session). During the tutorial, users will need to reserve a few nodes on at least two different clusters.

Detailed session program

The use case (the experiment which we will use to illustrate execo functionality) is to use an MPI benchmark to analyze performance scalability on various Grid'5000 clusters (depending on the number of cores and the problem size).

execo introduction

See the accompanying slides (Execo-tutorial-g5k-school-january-2016.pdf).

execo installation

On a Grid'5000 frontend, run:

$ pip3 install --user requests
$ pip3 install --user execo

(Note that in the commands above, one currently needs to manually install requests to be able to use execo on Grid'5000.)

To check that everything is set up correctly, run a simple hello world:

$ ipython3
In [1]: import execo
In [2]: execo.Process("echo 'hello, world'").run().stdout
Out[2]: 'hello, world\n'
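
Throughout the tutorial we will check whether processes succeeded. As a quick illustrative sketch (in the same ipython3 session), a failing command can be inspected through attributes such as ok and exit_code:

In [3]: p = execo.Process("false").run()
In [4]: p.ok
Out[4]: False
In [5]: p.exit_code
Out[5]: 1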

Prototype the experiment interactively

A good setup for this tutorial is to have three terminals open, each with an SSH session to a Grid'5000 frontend of your choice. One terminal will be used for shell commands (prefixed by $), another for a Python or IPython interpreter (prefixed by >>>). The third terminal will be used to run the editor of your choice, to edit files. GNU Screen should be used to allow the working session to survive disconnections.
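
For example, a possible way to do this with GNU Screen on the frontend (the session name is arbitrary):

$ screen -S execo_tutorial    # start a named session; Ctrl-a c opens additional windows
$ screen -rd execo_tutorial   # reattach to the session after a disconnection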

Let's start by creating a directory for the tutorial, on the frontend:

$ mkdir ~/execo_tutorial && cd ~/execo_tutorial

From now on, all commands prefixed by >>> are to be run in a Python shell, preferably IPython, which is more user-friendly. All Python sessions should import the execo modules:

$ ipython3
>>> from execo import *
>>> from execo_g5k import *
>>> from execo_engine import *

The experiment

We will run the LU benchmark (Lower-Upper symmetric Gauss-Seidel) from the NAS Parallel Benchmarks, which solves a system of nonlinear PDEs. We will run this benchmark using MPI for parallelization over several cores. This benchmark can run on several problem sizes (named A, B, C, etc.). It must be configured at compilation time with the number of cores used in parallel and the problem sizes.
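
Concretely, this configuration is written to the file config/suite.def in the benchmark sources (as we will do below): each tab-separated line gives a benchmark name, a problem size class and a number of processes. A hypothetical example building two configurations:

lu	A	16
lu	B	16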

The NAS Parallel Benchmark suite can be downloaded from http://www.nas.nasa.gov/publications/sw_instructions.html, but downloading involves registering, so for the convenience of the tutorial, we have put the archive at https://api.grid5000.fr/sid/sites/lyon/public/mimbert/execo_tutorial/NPB3.3-MPI.tar.bz2.

We'll start by prototyping the experiment interactively. Some of the following steps can easily be performed without execo, but we use execo because in the end we will automate everything. For each execo function or class used, you can always look it up in the Execo API documentation (https://mimbert.gitlabpages.inria.fr/execo/apidoc.html).

Reserve some Grid'5000 compute nodes

Let's reserve some nodes on a site, for example lyon:

>>> jobs = oarsub([(OarSubmission("cluster=1/nodes=2", walltime=3600), "lyon")])
>>> jobs
[(<jobid>, 'lyon')]
>>> job_id, site = jobs[0]

We can get information on the job:

>>> get_oar_job_info(job_id, site)

And get the list of nodes:

>>> nodes = get_oar_job_nodes(job_id, site)
>>> nodes
[Host('<node1>.lyon.grid5000.fr'),
 Host('<node2>.lyon.grid5000.fr')]
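
If the job is not running yet when you ask for its nodes, you can explicitly wait for it to start first (wait_oar_job_start is also used later in the automated engine):

>>> wait_oar_job_start(job_id, site)
>>> nodes = get_oar_job_nodes(job_id, site)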

Configure, compile and install the benchmark program on one node

We will use one of the NPB benchmarks, namely LU, which solves a linear system using LU decomposition. Download the benchmark and extract it:

$ wget --no-check-certificate https://api.grid5000.fr/sid/sites/lyon/public/mimbert/execo_tutorial/NPB3.3-MPI.tar.bz2
$ tar -xjf NPB3.3-MPI.tar.bz2

Prior to compiling it, we must configure the build according to the total number of cores and the problem sizes which will be used. So we first need to retrieve the number of cores per node of the cluster, thanks to the Grid'5000 API:

>>> n_core = get_host_attributes(nodes[0])['architecture']['nb_cores']
>>> total_core = n_core * len(nodes)
>>> total_core
<num total cores>

The number of parallel processes can only be a power of two for this benchmark, so let's take the largest power of two strictly below the total number of cores:

>>> import math
>>> p_size = 2**(math.ceil(math.log2(total_core))-1)
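
For example, with hypothetical numbers, two nodes of 24 cores each give total_core = 48, and the formula yields:

>>> 2**(math.ceil(math.log2(48))-1)
32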

We compile only this configuration (problem size A, with this number of processes):

>>> conf = Process('echo "lu\tA\t%i" > NPB3.3-MPI/config/suite.def' % p_size, shell = True)
>>> conf.run()

We also need to slightly modify the makefile to get it to compile (adding -fallow-argument-mismatch to the Fortran flags):

>>> makefilepatch = Process('sed -i "s/^FFLAGS\t= -O3$/FFLAGS\t= -O3 -fallow-argument-mismatch/" NPB3.3-MPI/config/make.def').run()

Now compile it on a node (not on the frontend, because that is forbidden ;-) and because we need mpif77):

>>> compilation = SshProcess('cd execo_tutorial/NPB3.3-MPI && make clean && make suite', nodes[0])
>>> compilation.run()

We can see a summary of the compilation process:

>>> print(compilation)

It should be ok:

>>> compilation.ok
True

Also check the detailed compilation outputs, since the compilation process may return without error while still being unsuccessful:

>>> print(compilation.stdout)
>>> print(compilation.stderr)

If everything is OK, the program is ready to be run.

Run the benchmark program

We run the benchmark with the default OpenMPI of the Grid'5000 compute nodes, using mpirun. For prototyping, let's use a default fallback OpenMPI connector configuration which will run on most clusters (see FAQ#MPI_options_to_use).

>>> bench = SshProcess('mpirun -H %s -n %i --mca pml ob1 --mca btl tcp,self ~/execo_tutorial/NPB3.3-MPI/bin/lu.A.%i' % (",".join(["%s:%i" % (node.address, n_core) for node in nodes]), p_size, p_size), nodes[0])
>>> bench.run()
>>> bench.ok
>>> print(bench.stdout)
Note: be sure to check https://www.grid5000.fr/w/Run_MPI_On_Grid'5000 for cluster-specific settings and options for MPI.
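
The interesting figures (timings, Mop/s) are in the benchmark output. As an illustrative sketch, assuming the NPB report contains a "Mop/s total" line (check your actual output), it could be extracted with something like:

>>> import re
>>> m = re.search(r'Mop/s total\s*=\s*([\d.]+)', bench.stdout)
>>> float(m.group(1)) if m else None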

Release your resources

>>> oardel(jobs)

Summary

Up to this point, there is no real added value in using execo versus doing the same thing in the Unix shell. The real added value is that we can now assemble all these steps into an automatic experiment script, with many benefits compared to a shell script: more control over the experiment flow, easier handling of errors, easier handling of concurrency, and the full power of a generic programming language (Python).

Transform this prototype in an automated experiment engine

In this section, we show how each step previously prototyped interactively can be integrated into a fully automated experiment engine.

The skeleton: an (as yet empty) experiment engine

Execo ships with a generic Engine class (https://mimbert.gitlabpages.inria.fr/execo/execo_engine.html#engine) which is well suited for automating an experimental workflow with varying parameters. We define our experiment engine by inheriting from it, with the following experimental workflow:

  • define the parameters to explore: number of parallel nodes, which problem sizes, which clusters
  • prepare the bench according to the parameters (bench compilation)
  • run the bench on the various parameter combinations (and of course, save the results)

Create a new file ~/execo_tutorial/mpi_bench.py with the following code in it, which is the skeleton of our experimental workflow:

from execo_engine import Engine

class mpi_bench(Engine):

    def run(self):
        """Inherited method, put here the code for running the engine"""
        self.define_parameters()
        if self.prepare_bench():
            self.run_xp()
 
    def define_parameters(self):
        """Create the iterator that contains the parameters to be explored """
        pass

    def prepare_bench(self):
        """Copy required files on frontend(s) and compile bench suite, adapted for 
        the clusters to be benchmarked """
        pass

    def run_xp(self):
        """Iterate over the parameters and execute the bench """
        pass

Now let's fill in the blanks and implement these 3 steps:

Define the parameter space

For this tutorial, we fix the number of nodes used to 4 (to avoid using too many resources). We need to compute the number of cores available on these 4 nodes, which depends on the cluster, using the function get_host_attributes. We need to do that for each involved cluster, which gives us the maximum number of cores that can be used in our parameter space. For example:

>>> clusters = ['petitprince', 'edel', 'paravance', 'stremi']
>>> n_nodes = 4
>>> max_core = n_nodes * max([
...         get_host_attributes(cluster + '-1')['architecture']['nb_cores']
...         for cluster in clusters])
>>> max_core
96

On all clusters, we will run the bench with a varying number of parallel cores, among powers of two and up to the number of cores of 4 nodes. For example:

>>> from itertools import takewhile, count
>>> list(filter(lambda i: i >= n_nodes,
...        list(takewhile(lambda i: i<max_core,
...                       (2**i for i in count(0, 1))))))
[4, 8, 16, 32, 64]

We use the sweep function to generate all the parameter combinations, and the ParamSweeper class to store and iterate over the parameter space and to checkpoint progress. So we propose this implementation of the define_parameters method:

import os
from execo_g5k import get_host_attributes
from itertools import takewhile, count
from execo_engine import logger, ParamSweeper, sweep

    def define_parameters(self):
        """Create the iterator that contains the parameters to be explored """
        self.n_nodes = 4
        # Choose a list of clusters
        clusters = ['petitprince', 'edel', 'paravance', 'stremi']
        # Compute the maximum number of cores 
        max_core = self.n_nodes * max([
                get_host_attributes(cluster + '-1')['architecture']['nb_cores']
                for cluster in clusters])
        # Define the parameter space
        self.parameters = {
            'cluster' : clusters,
            'n_core': list(filter(lambda i: i >= self.n_nodes,
                             list(takewhile(lambda i: i<max_core,
                                            (2**i for i in count(0, 1)))))),
            'size' : ['A', 'B', 'C']
            }
        logger.info(self.parameters)
        self.sweeper = ParamSweeper(os.path.join(self.result_dir, "sweeps"), sweep(self.parameters))
        logger.info('Number of parameters combinations %s', len(self.sweeper.get_remaining()))
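
To see what sweep produces, here is a small illustrative sketch with a reduced, hypothetical parameter space (the exact ordering and formatting of the combinations may differ):

>>> from execo_engine import sweep
>>> sweep({'cluster': ['edel'], 'n_core': [4, 8], 'size': ['A']})
[{'cluster': 'edel', 'n_core': 4, 'size': 'A'},
 {'cluster': 'edel', 'n_core': 8, 'size': 'A'}]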

Prepare the bench

The bench preparation steps are:

  • reserve a node
  • extract the bench tgz (no need to copy it to the node, we use the NFS to access our home directory)
  • configure bench compilation: number of cores and problem sizes
  • compile the bench
  • copy the compiled bench to all involved frontends

from execo_g5k import get_cluster_site, oarsub, OarSubmission, \
  wait_oar_job_start, get_oar_job_nodes, oardel, get_host_site
from execo import Put, Remote
from execo_engine import logger

   def prepare_bench(self):
       """bench configuration and compilation, copy binaries to frontends
       
       return True if preparation is ok
       """
       logger.info("preparation: configure and compile benchmark")
       # the involved sites. We will do the compilation on the first of these.
       sites = list(set(map(get_cluster_site, self.parameters['cluster'])))
       # generate the bench compilation configuration
       bench_list = '\n'.join([ 'lu\t%s\t%s' % (size, n_core)
                                for n_core in self.parameters['n_core']
                                for size in self.parameters['size'] ])
       # Reserving a node because compiling on the frontend is forbidden
       # and because we need mpif77
       jobs = oarsub([(OarSubmission(resources = "nodes=1",
                                     walltime ='0:10:00'), sites[0])])
       job_id, site = jobs[0]
       if job_id:
           try:
               copy_bench = Put([site], ['NPB3.3-MPI.tar.bz2']).run()
               extract_bench = Remote('tar -xjf NPB3.3-MPI.tar.bz2', [site]).run()
               wait_oar_job_start(job_id, site)
               nodes = get_oar_job_nodes(job_id, site)
               conf_bench = Remote('echo "%s" > ~/NPB3.3-MPI/config/suite.def' % bench_list, nodes).run()
               makefilepatch = Remote('sed -i "s/^FFLAGS\t= -O3$/FFLAGS\t= -O3 -fallow-argument-mismatch/" ~/NPB3.3-MPI/config/make.def', nodes).run()
               compilation = Remote('cd NPB3.3-MPI && make clean && make suite', nodes).run()
           except:
               logger.error("unable to compile bench")
               return False
           finally:
               oardel(jobs)
       # Copying binaries to all other frontends
       frontends = sites[1:]
       rsync = Remote('rsync -avuP ~/NPB3.3-MPI/ {{frontends}}:NPB3.3-MPI', 
                      [get_host_site(nodes[0])] * len(frontends)) 
       rsync.run()
       return compilation.ok and rsync.ok
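
The {{frontends}} pattern in the rsync command uses execo's substitution mechanism: an expression between double braces is evaluated for each remote process, and when it evaluates to a list, each process receives the element matching its position in the host list (see the execo documentation for the exact rules). A small illustrative sketch with hypothetical host names:

from execo import Remote

targets = ['host-a', 'host-b']
# two processes run on 'gateway': the first one echoes host-a, the second one host-b
Remote('echo syncing to {{targets}}', ['gateway'] * len(targets)).run()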

The experimental workflow

The experimental workflow (the run_xp of our engine) consists in iterating over the parameter combinations, and for each, running the MPI bench and collecting the results.

First of all, as we run the bench on several clusters, we need a function which gives us the appropriate mpirun options depending on the cluster:

def get_mpi_opts(cluster):
    # MPI configuration depends on the cluster
    # see https://www.grid5000.fr/mediawiki/index.php/FAQ#MPI_options_to_use
    if cluster in ['parapluie', 'parapide', 'griffon',
                   'graphene', 'edel', 'adonis', 'genepi' ]:
        mpi_opts = '--mca btl openib,sm,self --mca pml ^cm'
    elif cluster in ['suno', 'chinqchint']:
        mpi_opts = '--mca pml ob1 --mca btl tcp,self'
    elif cluster in ['sol']:
        mpi_opts = '--mca pml cm'
    else:
        mpi_opts = '--mca pml ob1 --mca btl tcp,self'
    return mpi_opts

To understand the iteration over the parameter space, you can look at the ParamSweeper class documentation (https://mimbert.gitlabpages.inria.fr/execo/execo_engine.html#paramsweeper). We use the following methods (a small standalone sketch follows the list):

  • get_remaining: get the list of unexplored parameter combinations.
  • get_next: get the next available unexplored parameter combination (and mark it as in progress).
  • skip: skip and mark a parameter combination as ignored.
  • cancel: put an in progress parameter combination back in the queue of unexplored.
  • done: mark a parameter combination as done (and remove it from the queue of in progress).
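
To make this life-cycle concrete, here is a minimal standalone sketch (with a reduced, hypothetical parameter space; it is not part of the engine):

import tempfile
from execo_engine import ParamSweeper, sweep

sweeper = ParamSweeper(tempfile.mkdtemp(),
                       sweep({'cluster': ['edel'], 'n_core': [4, 8], 'size': ['A']}))
while len(sweeper.get_remaining()) > 0:
    comb = sweeper.get_next()
    try:
        # ... run the experiment for comb here ...
        sweeper.done(comb)      # mark the combination as successfully explored
    except Exception:
        sweeper.cancel(comb)    # put it back in the queue for a later retry

The actual run_xp implementation, with the imports it needs, follows:
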
from execo_g5k import get_host_attributes, get_cluster_site, \
  oarsub, OarSubmission, wait_oar_job_start, get_oar_job_nodes, oardel
from execo import SshProcess
from execo_engine import slugify, logger

    def run_xp(self):
        """Iterate over the parameters and execute the bench"""
        while len(self.sweeper.get_remaining()) > 0:
            comb = self.sweeper.get_next()
            if comb['n_core'] > get_host_attributes(comb['cluster']+'-1')['architecture']['nb_cores'] * self.n_nodes:
                self.sweeper.skip(comb)
                continue
            logger.info('Processing new combination %s' % (comb,))
            site = get_cluster_site(comb['cluster'])
            jobs = oarsub([(OarSubmission(resources = "{cluster='" + comb['cluster'] + "'}/nodes=" + str(self.n_nodes),
                                          walltime = '0:10:00'),
                            site)])
            if jobs[0][0]:
                try:
                    wait_oar_job_start(*jobs[0])
                    nodes = get_oar_job_nodes(*jobs[0])
                    bench_cmd = 'mpirun -H %s -n %i %s ~/NPB3.3-MPI/bin/lu.%s.%i' % (
                        ",".join(["%s:%i" % (node.address,
                                             get_host_attributes(comb['cluster']+'-1')['architecture']['nb_cores'])
                                  for node in nodes]),
                        comb['n_core'],
                        get_mpi_opts(comb['cluster']),
                        comb['size'],
                        comb['n_core'])
                    lu_bench = SshProcess(bench_cmd, nodes[0])
                    lu_bench.stdout_handlers.append(self.result_dir + '/' + slugify(sorted(comb.items())) + '.out')
                    lu_bench.run()
                    if lu_bench.ok:
                        logger.info("comb ok: %s" % (comb,))
                        self.sweeper.done(comb)
                        continue
                finally:
                    oardel(jobs)
            logger.info("comb NOT ok: %s" % (comb,))
            self.sweeper.cancel(comb)

An interesting detail is the use of the stdout_handlers field of the lu_bench SshProcess: appending a file name to the stdout_handlers list of an execo Process redirects the stdout of the process to that file. Note that the SshProcess is a remote process, but the file is a local file; thus there is no need to copy the results, they are directly written to the result directory. Also, slugify is a convenience function for generating, from a parameter combination, a string which can be used as a filename.

Launch the engine

Just add the following lines to make your engine runnable:

if __name__ == "__main__":
    engine = mpi_bench()
    engine.start()

Your code should be complete now. For your convenience, the full code is available at https://api.grid5000.fr/sid/sites/lyon/public/mimbert/execo_tutorial/mpi_bench.py and you can get it with:

$ wget --no-check-certificate https://api.grid5000.fr/sid/sites/lyon/public/mimbert/execo_tutorial/mpi_bench.py

You can then run it with a simple command such as

$ python3 ./mpi_bench.py

You can run the following to get the list of command line options:

$ python3 ./mpi_bench.py --help

For better debugging, if needed, you can also run it in ipython with

$ ipython3 --pdb -i ./mpi_bench.py

Thanks to the execo_engine.ParamSweeper, the progress of the benches is saved to disk, and the run can later be resumed by running mpi_bench.py with option -c.

The full bench run takes close to 90 minutes with the list of clusters given in the example. If you want to test it more quickly, you can reduce the number of problem sizes (removing sizes B and C) or reduce the number of clusters. Also, in this example, all benches are run sequentially. It is possible to run benches for different clusters in parallel; this can be done easily by inheriting from the g5k_cluster_engine class (https://github.com/lpouillo/execo-g5k-tools/tree/master/engines/g5k_cluster_engine).

Using the results

Using the following Python/matplotlib based script, you can generate graphs showing the results of the experiment. You can run it on Grid'5000 this way:

$ wget --no-check-certificate https://api.grid5000.fr/sid/sites/lyon/public/mimbert/execo_tutorial/draw_mpi_bench.py
$ python3 draw_mpi_bench.py -f result.png <experiment_dir>

It then generates a file result_<N>.png for each cluster. You can also use interactive visualization by connecting to Grid'5000 with ssh -X and not using option -f.

Here are some examples of graphs generated with this experiment script:

(Example result graphs: execo_mpi_bench_result_0.png to execo_mpi_bench_result_3.png, one per cluster.)

Conclusion

During this tutorial, we have learned how to use the execo API to quickly create an automated and reproducible experiment on the Grid'5000 platform. Many other features, such as advanced deployment management, resource planning, a scalable number of concurrent SSH connections (thanks to TakTuk), and network topology information, can be found in execo. If interested, you can join the execo mailing list (see the execo README at https://mimbert.gitlabpages.inria.fr/execo/readme.html).