Execo Practical Session

From Grid5000

Overview

The goal of this practical session is to show how to use execo to quickly and easily prototype / develop reproducible experiments. The aim of this session is to show the issues of experiment development as faced by typical experimenters when using grid5000, and how execo can help them be more productive and get more reproducible results.

This practical session will start by showing how to use execo to interactively develop the different steps of an experiment; it will then show how to use execo to transform this prototype into a fully automated, configurable, robust experiment engine, producing reproducible results and able to run on a much larger parameter space.

Tool: execo

execo offers a Python API for local or remote, standalone or parallel, unix process execution. It is especially well suited for quickly and easily scripting workflows of parallel/distributed operations on local or remote hosts: automating a scientific workflow, conducting computer science experiments, performing automated tests, etc. The core python package is execo. The execo_g5k package provides a set of tools and extensions for the grid5000 testbed. The execo_engine package provides tools to ease the development of computer science experiments.

Tutorial requirements

This tutorial requires users to know basic python and be reasonably familiar with grid5000 usage (this is not an absolute beginner session). During the tutorial, users will need to reserve a few nodes on at least two different clusters.

Detailed session program

The use case (the experiment which we will use as a support to illustrate execo functionality) is to benchmark an MPI application on different grid5000 clusters.

execo introduction

slides

execo installation

On a grid5000 frontend, run:

$ export http_proxy="http://proxy:3128"
$ export https_proxy="http://proxy:3128"
$ easy_install --user execo

To check that everything is setup correctly, run a simple hello world:

$ ipython
In [1]: import execo
In [2]: execo.Process("echo 'hello, world'").run().stdout
Out[2]: 'hello, world\n'

Prototype the experiment interactively

Let's start by creating a directory for the tutorial, on the frontend:

$ mkdir ~/execo_tutorial && cd ~/execo_tutorial

From now on, all commands prefixed by >>> are to be run in a python shell, preferably ipython, which is more user-friendly. All python sessions should import the execo modules:

$ ipython
>>> from execo import *
>>> from execo_g5k import *

Reserve some grid5000 compute nodes

Let's reserve some nodes on a site, for example lyon:

>>> jobs = oarsub([(OarSubmission("cluster=1/nodes=2",
...                               walltime=7200,
...                               job_type="allow_classic_ssh"), "lyon")])
>>> jobs
[(<jobid>, 'lyon')]

We can get information about the job:

>>> get_oar_job_info(*jobs[0])

And get the list of nodes:

>>> nodes = get_oar_job_nodes(*jobs[0])
>>> nodes
[Host('<node1>.lyon.grid5000.fr'),
 Host('<node2>.lyon.grid5000.fr')]

Configure, compile and install the benchmark program on one node

We will use one of the NPB benchmarks, namely LU, which solves a linear system using LU factorization. Download the benchmark and extract it:

$ wget http://public.lyon.grid5000.fr/~lpouilloux/NPB3.3-MPI.tar.bz2
$ tar -xjf NPB3.3-MPI.tar.bz2

Prior to compiling it, we must configure the build according to the total number of cores which will be used. So we first need to retrieve the number of cores per node of the cluster, thanks to the grid5000 API:

>>> n_core = get_host_attributes(nodes[0])['architecture']['smt_size']
>>> total_core = n_core * len(nodes)
>>> total_core
<num total cores>
>>> conf = Process('echo "lu\tA\t%i" > NPB3.3-MPI/config/suite.def' % total_core)
>>> conf.shell = True
>>> conf.run()
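The conf.shell = True step matters: by default a Process runs its command directly, without a shell, so shell features like the > redirection above would not work. A stdlib analogy (using subprocess rather than execo, with a hypothetical core count) illustrates the same point:

```python
import os
import subprocess
import tempfile

total_core = 16  # hypothetical value
line = "lu\tA\t%i" % total_core  # "\t" is already a real tab in the Python string

path = os.path.join(tempfile.mkdtemp(), "suite.def")
# Without shell=True, ">" would be passed to echo as a literal argument;
# with shell=True, /bin/sh performs the redirection, just as conf.shell = True
# makes execo do for the Process above.
subprocess.run('echo "%s" > %s' % (line, path), shell=True, check=True)

with open(path) as f:
    content = f.read()
print(repr(content))  # 'lu\tA\t16\n'
```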

Now compile it on a node (not on a frontend, because it's forbidden ;-)... and because we need mpif77):

>>> compilation = SshProcess('cd execo_tutorial/NPB3.3-MPI && make clean && make suite', nodes[0])
>>> compilation.run()

We can see a summary of the compilation process:

>>> print compilation

It should be ok:

>>> compilation.ok

We can also have a detailed look at compilation outputs if needed:

>>> print compilation.stdout
>>> print compilation.stderr

The program is ready to be run.

Run the benchmark program

>>> bench = SshProcess('mpirun -H %s -n %i --mca pml ob1 --mca btl tcp,self ~/execo_tutorial/NPB3.3-MPI/bin/lu.A.%i' % (
...                        ",".join([node.address for node in nodes]),
...                        total_core, total_core),
...                    nodes[0]) 
>>> bench.run()
>>> bench.ok
>>> print bench.stdout
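To make the command construction above concrete, here is the string it builds for two hypothetical 12-core Lyon nodes (the taurus-* host names are illustrative only):

```python
# Build the same mpirun command line as above, with hypothetical values.
node_addresses = ["taurus-1.lyon.grid5000.fr", "taurus-2.lyon.grid5000.fr"]
total_core = 24  # 2 nodes x 12 cores

cmd = 'mpirun -H %s -n %i --mca pml ob1 --mca btl tcp,self ~/execo_tutorial/NPB3.3-MPI/bin/lu.A.%i' % (
    ",".join(node_addresses), total_core, total_core)
print(cmd)
```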

Release your resources

>>> oardel(jobs)

todo: add the complete, fully automated script somewhere

Transform this prototype into an automated experiment engine

Execo ships with a generic Engine class which is well suited for automating an experimental workflow with varying parameters. Open a file and define a new engine:

import os
from itertools import takewhile, count

from execo import *
from execo_g5k import *
from execo_engine import Engine, ParamSweeper, sweep, slugify, logger

class mpi_bench(Engine):

   def run(self):
       """Inherited method, put here the code for running the engine"""
       self.define_parameters()
       if self.prepare_bench():
           self.run_xp()
 
   def define_parameters(self):
       """Create the iterator that contains the parameters to be explored """
       pass

   def prepare_bench(self):
       """Copy required files on frontend(s) and compile bench suite, adapted for 
       the clusters to be benchmarked """
       pass

   def run_xp(self):
       """Iterate over the parameters and execute the bench """
       pass

Define the parameters space

Implement the method define_parameters using, for example:

   def define_parameters(self):
       """Create the iterator that contains the parameters to be explored """
       # Choose a list of clusters
       self.n_nodes = 4
       clusters = ['graphene', 'petitprince', 'edel', 'paradent', 'stremi']
       #clusters = ['petitprince', 'paradent']
       # Determine the maximum number of cores 
       max_core = self.n_nodes * max([get_host_attributes(cluster + '-1')['architecture']['smt_size']
             for cluster in clusters])
       self.parameters = {
           'cluster' : clusters,
           'n_core': filter(lambda i: i >= self.n_nodes,
                            list(takewhile(lambda i: i<max_core,
                                           (2**i for i in count(0, 1))))),
           'size' : ['A', 'B', 'C']
           }
       logger.info(self.parameters)
       self.sweeper = ParamSweeper(os.path.join(self.result_dir, "sweeps"), sweep(self.parameters))
       logger.info('Number of parameters combinations %s', len(self.sweeper.get_remaining()))
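The n_core expression above is rather dense. A standalone sketch of what it computes (with a hypothetical max_core) may help: it enumerates powers of two below the total core count and keeps those large enough to occupy at least one core per node:

```python
from itertools import count, takewhile

n_nodes = 4
max_core = 4 * 24  # hypothetical: 4 nodes with 24 cores each

# Powers of two strictly below max_core, keeping only values >= n_nodes.
n_core = [i for i in takewhile(lambda i: i < max_core, (2 ** k for k in count()))
          if i >= n_nodes]
print(n_core)  # [4, 8, 16, 32, 64]
```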

Prepare the bench

We need to:

  • copy the bench code to the involved grid5000 sites
  • extract it on all frontends
  • configure the bench compilation
  • build the bench on every site
   def prepare_bench(self):
       """Copy required files on frontend(s) and compile bench suite, adapted for 
       the clusters to be benchmarked """
       sites = list(set(map(get_cluster_site, self.parameters['cluster'])))
       copy_bench = Put(sites, ['NPB3.3-MPI.tar.bz2']).run()
       extract_bench = Remote('tar -xjf NPB3.3-MPI.tar.bz2', sites).run()
       bench_list = '\n'.join(['lu\t%s\t%i' % (size, n_core)
                               for size in self.parameters['size']
                               for n_core in self.parameters['n_core']])
       conf_bench = Remote('echo "%s" > ~/NPB3.3-MPI/config/suite.def' % bench_list, sites).run()
       compilation = Remote('cd NPB3.3-MPI && make clean && make suite', sites).run()
       return compilation.ok
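The suite.def file lists one "lu &lt;class&gt; &lt;n_core&gt;" line per benchmark binary to build; with a small hypothetical parameter set, its content looks like this:

```python
# Hypothetical sizes and core counts, just to show the suite.def layout.
sizes = ['A', 'B']
n_cores = [4, 8]

bench_list = '\n'.join(['lu\t%s\t%i' % (size, n)
                        for size in sizes for n in n_cores])
print(bench_list)
```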


  • Inherit a class from g5k_cluster_engine, which is a supplied, generic and reusable execo experiment engine automating the workflow of submitting jobs in parallel to grid5000 clusters / sites. It is well suited for bag-of-tasks kinds of jobs, where the cluster is one of the experiment parameters, e.g. benching flops, benching storage, network, etc.
  • Automate the workflow of section #Prototype the experiment interactively, for one cluster
  • Choose a parameter to explore, and hard-code the variation of this parameter.
  • Show how to use the ParamSweeper facility to easily explore a much larger parameter space, with the benefit of check-pointing the progress, allowing stopping and restarting the experiment.
  • Draw the same figure as in section #Prototype the experiment interactively, with much more data.

Create the experimental workflow

   def run_xp(self):
       """Iterate over the parameters and execute the bench """
       while len(self.sweeper.get_remaining()) > 0:
           comb = self.sweeper.get_next()
           logger.info('Treating new combination %s', slugify(comb))
           if comb['n_core'] > get_host_attributes(comb['cluster']+'-1')['architecture']['smt_size'] * self.n_nodes: 
               self.sweeper.skip(comb)
               continue
           site = get_cluster_site(comb['cluster'])
           jobs = oarsub([(OarSubmission(resources = "{cluster='" + comb['cluster']+"'}/nodes=" + str(self.n_nodes),
                                   job_type = 'allow_classic_ssh', 
                                   walltime ='0:10:00'), 
                   site)])
           if jobs[0][0]:
               try:
                   wait_oar_job_start(*jobs[0])
                   nodes = get_oar_job_nodes(*jobs[0])
                   if comb['cluster'] in ['paradent', 'parapide', 'griffon', 'graphene', 'edel', 'adonis', 'genepi' ]:
                       mpirun_opts = '--mca btl openib,sm,self --mca pml ^cm'
                   elif comb['cluster'] in ['parapluie', 'suno', 'chinqchint', 'chicon']:
                       mpirun_opts = '--mca pml ob1 --mca btl tcp,self'
                   elif comb['cluster'] in ['sol']:
                       mpirun_opts = '--mca pml cm'
                   else:
                       mpirun_opts = '--mca pml ob1 --mca btl tcp,self'
                    bench_cmd = 'mpirun -H %s -n %i %s ~/NPB3.3-MPI/bin/lu.%s.%i' % (
                            ",".join([node.address for node in nodes]),
                            comb['n_core'], mpirun_opts, comb['size'], comb['n_core'])
                   lu_bench = SshProcess(bench_cmd, nodes[0])
                   lu_bench.stdout_handlers.append(self.result_dir + '/' + slugify(comb) + '.out')
                   lu_bench.run()
                   if lu_bench.ok:
                       self.sweeper.done(comb)
                       continue
               finally:
                   oardel(jobs)
           self.sweeper.cancel(comb)

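The sweeper drives the whole loop: sweep() enumerates the cartesian product of the parameter lists, and each combination ends up done, skipped, or cancelled (cancelled combinations go back into the remaining set). Here is a minimal stdlib sketch of this lifecycle, on a hypothetical reduced parameter space; it is not execo's implementation, which additionally checkpoints progress to disk so the experiment can be stopped and restarted:

```python
from itertools import product

# Hypothetical, reduced parameter space.
parameters = {'cluster': ['graphene', 'edel'], 'n_core': [4, 8], 'size': ['A']}
keys = sorted(parameters)
combs = [dict(zip(keys, values))
         for values in product(*(parameters[k] for k in keys))]

remaining = list(combs)   # sweeper.get_remaining()
done, skipped = [], []
while remaining:
    comb = remaining.pop(0)                    # sweeper.get_next()
    if comb['cluster'] == 'edel' and comb['n_core'] > 4:
        skipped.append(comb)                   # sweeper.skip(comb)
        continue
    done.append(comb)                          # sweeper.done(comb)
print(len(combs), len(done), len(skipped))  # 4 3 1
```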
Launch the engine