Commit 79b26d79 authored by Mark Hymers

Merge branch '1-write-mpi-docs' into 'master'

Resolve "Write MPI docs"

Closes #1

See merge request !1
parents f9c8e1db 3a0be4b8
@@ -81,7 +81,7 @@ class MPI4PyImplementor(object):
         # Get type
         t = self.mpi.bcast(None, root=root)
         if t == 'ANAM':
-            clsname = self.mpi.bcast(root=root)
+            clsname = self.mpi.bcast(None, root=root)
             cls = find_class(clsname)
             if cls is None:
                 raise RuntimeError("Cannot find class %s during "
@@ -121,6 +121,8 @@ class MPI4PyImplementor(object):
             if j < (num_pts % num_nodes):
                 jump += 1
+            jump = int(jump)
             ret.append((cur, cur + jump, ))
             cur += jump
@@ -147,7 +149,7 @@ class MPI4PyImplementor(object):
         data_type = info[-1]
         # Calculate share size for this node
-        p = data_shape[0] / self.mpi.size
+        p = int(data_shape[0] / self.mpi.size)
         if self.mpi.rank < (data_shape[0] % self.mpi.size):
             p += 1
@@ -162,7 +164,7 @@ class MPI4PyImplementor(object):
         rsz = prod(data_shape[1:]).astype(int)
         if self.mpi.rank == root:
             # share out jobs as equally as possible
-            counts = array([data_shape[0]/self.mpi.size]*self.mpi.size)
+            counts = array([int(data_shape[0]/self.mpi.size)]*self.mpi.size)
             counts[:data_shape[0] % self.mpi.size] += 1
             # Calculate total data elements for each node
             counts *= rsz
@@ -15,6 +15,26 @@ Contents:
    reference

+Introduction
+------------
+
+`anamnesis` is a Python module which provides the ability to easily
+serialise/unserialise Python classes to and from the HDF5 file format.
+It aims to be trivial to incorporate (normally requiring only a single
+extra class variable to be added to your classes) and flexible.
+
+The library also extends the HDF5 serialisation/unserialisation
+capabilities to the MPI framework. This allows objects to be trivially
+passed between nodes in an MPI computation. The library also provides
+some wrapper routines to make it simpler to perform scatter and
+gather operations on arrays and lists (lists may even contain
+objects to be transferred).
+
+`anamnesis` was originally written as part of the `NeuroImaging Analysis
+Framework`, a library intended for use in MEG theory work written at
+York NeuroImaging Centre, University of York, UK, but it has been split
+out in order to make it more generically useful.
+
 Indices and tables
 ==================
#!/usr/bin/python3
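# test_mpiclasses.py -- shared classes used by the MPI tutorial scripts below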
from anamnesis import AbstractAnam, register_class


class ComplexPerson(AbstractAnam):
    hdf5_outputs = ['name', 'age']
    hdf5_defaultgroup = 'person'

    def __init__(self, name='Unknown', age=0):
        AbstractAnam.__init__(self)

        self.name = name
        self.age = age

register_class(ComplexPerson)


class ComplexPlace(AbstractAnam):
    hdf5_outputs = ['location']
    hdf5_defaultgroup = 'place'

    def __init__(self, location='Somewhere'):
        AbstractAnam.__init__(self)

        self.location = location

register_class(ComplexPlace)


class ComplexTrain(AbstractAnam):
    hdf5_outputs = ['destination']
    hdf5_defaultgroup = 'train'

    def __init__(self, destination='Edinburgh'):
        AbstractAnam.__init__(self)

        self.destination = destination

    def init_from_hdf5(self):
        print("ComplexTrain.init_from_hdf5")
        print("We have already set destination: {}".format(self.destination))

register_class(ComplexTrain)
#!/usr/bin/python3
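# test_script1.py -- Tutorial 1: broadcasting objects from the master node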
from anamnesis import MPIHandler

from test_mpiclasses import ComplexPerson, ComplexPlace, ComplexTrain

# All nodes must perform this
m = MPIHandler(use_mpi=True)

if m.rank == 0:
    # We are the master node
    print("Master node")

    # Create a person, place and train to broadcast
    s_person = ComplexPerson('Fred', 42)
    s_place = ComplexPlace('York')
    s_train = ComplexTrain('Disastersville')

    print("Master: Person: {} {}".format(s_person.name, s_person.age))
    print("Master: Place: {}".format(s_place.location))
    print("Master: Train to: {}".format(s_train.destination))

    m.bcast(s_person)
    m.bcast(s_place)
    m.bcast(s_train)
else:
    # We are a slave node
    print("Slave node {}".format(m.rank))

    # Wait for our objects to be ready
    s_person = m.bcast()
    s_place = m.bcast()
    s_train = m.bcast()

    print("Slave node {}: Person: {} {}".format(m.rank, s_person.name, s_person.age))
    print("Slave node {}: Place: {}".format(m.rank, s_place.location))
    print("Slave node {}: Train: {}".format(m.rank, s_train.destination))

# We need to make sure that we finalise MPI otherwise
# we will get an error on exit
m.done()
#!/usr/bin/python3
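# test_script2.py -- Tutorial 2: sending objects to/from different nodes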
import sys

from anamnesis import MPIHandler

from test_mpiclasses import ComplexPerson, ComplexPlace, ComplexTrain

# All nodes must perform this
m = MPIHandler(use_mpi=True)

# We need at least three nodes for this
if m.size < 3:
    print("Error: This example needs at least three MPI nodes")
    m.done()
    sys.exit(1)

if m.rank == 0:
    # We are the master node
    print("Master node")

    # Create a person to broadcast
    s_person = ComplexPerson('Fred', 42)
    print("Master: Created Person: {} {}".format(s_person.name, s_person.age))

    m.bcast(s_person)

    s_place = m.bcast(root=1)
    print("Master: Received Place: {}".format(s_place.location))
elif m.rank == 1:
    # We are slave node 1
    print("Slave node {}".format(m.rank))

    # Wait for our broadcast object to be ready
    s_person = m.bcast()
    print("Slave node {}: Received Person: {} {}".format(m.rank, s_person.name, s_person.age))

    # Now create our own object and broadcast it to the other nodes
    s_place = ComplexPlace('Manchester')
    print("Slave node {}: Created place: {}".format(m.rank, s_place.location))

    m.bcast(s_place, root=1)

    s_train = m.recv(source=2)
    print("Slave node {}: Received Train: {}".format(m.rank, s_train.destination))
else:
    # We are slave node 2
    print("Slave node {}".format(m.rank))

    # Wait for our first broadcast object to be ready
    s_person = m.bcast()
    print("Slave node {}: Received Person: {} {}".format(m.rank, s_person.name, s_person.age))

    # Wait for our second broadcast object to be ready
    s_place = m.bcast(root=1)
    print("Slave node {}: Received Place: {}".format(m.rank, s_place.location))

    # Create a train and send to node 1 only
    s_train = ComplexTrain('Durham')
    print("Slave node {}: Created train: {}".format(m.rank, s_train.destination))

    m.send(s_train, dest=1)

# We need to make sure that we finalise MPI otherwise
# we will get an error on exit
m.done()
#!/usr/bin/python3
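# test_script3a.py -- Tutorial 3: scattering and gathering numpy arrays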
import sys

import numpy as np

from anamnesis import MPIHandler

# All nodes must perform this
m = MPIHandler(use_mpi=True)

# We need at least three nodes for this
if m.size < 3:
    print("Error: This example needs at least three MPI nodes")
    m.done()
    sys.exit(1)

# Create a matrix of data for scattering
# Pretend that we have 300 points of data which we want to scatter,
# each of which is a vector of dimension 20
# This creates a matrix containing 0-19 in row 1,
# 100-119 in row 2, etc
data_dim = 20
num_pts = 300

if m.rank == 0:
    # We are the master node
    print("Master node")

    data = np.tile(np.arange(num_pts) * 100, (data_dim, 1)).T + np.arange(data_dim)[None, :]
    print("Master node: Full data array: ({}, {})".format(*data.shape))

    # 1. Scatter using scatter_array
    m1_data = m.scatter_array(data)
    print("Master node M1: m1_data shape: ({}, {})".format(*m1_data.shape))

    # 2. Scatter manually, using indices
    # Send the data to all nodes
    m.bcast(data)

    # Calculate which indices each node should work on and send them around
    scatter_indices = m.get_scatter_indices(data.shape[0])
    m.bcast(scatter_indices)

    indices = range(*scatter_indices[m.rank])
    m2_data = data[indices, :]
    print("Master node M2: m2_data shape: ({}, {})".format(*m2_data.shape))

    # 3. Gather using the gather function
    # Create some fake data to gather
    ret_data = (np.arange(m2_data.shape[0]) + m.rank * 100)[:, None]
    print("Master node: data to gather shape: ({}, {})".format(*ret_data.shape))
    print("Master node: first 10 elements: ", ret_data[0:10, 0])

    all_ret_data = m.gather(ret_data)
    print("Master node: gathered data shape: ({}, {})".format(*all_ret_data.shape))
    print("all_ret_data 0:10: ", all_ret_data[0:10, 0])
    print("all_ret_data 100:110: ", all_ret_data[100:110, 0])
    print("all_ret_data 200:210: ", all_ret_data[200:210, 0])
else:
    # We are a slave node
    print("Slave node {}".format(m.rank))

    # 1. Scatter using scatter_array
    m1_data = m.scatter_array(None)
    print("Slave node {} M1: data shape: ({}, {})".format(m.rank, *m1_data.shape))

    # 2. Scatter manually, using indices
    # Receive the full dataset
    data = m.bcast()

    # Get our indices
    scatter_indices = m.bcast()

    # Extract our data to work on
    indices = range(*scatter_indices[m.rank])
    m2_data = data[indices, :]
    print("Slave node {} M2: data shape: ({}, {})".format(m.rank, *m2_data.shape))

    # 3. Gather using the gather function
    # Create some fake data to gather
    ret_data = (np.arange(m2_data.shape[0]) + m.rank * 100)[:, None]
    print("Slave node {}: data to gather shape: ({}, {})".format(m.rank, *ret_data.shape))
    print("Slave node {}: first 10 elements: ".format(m.rank), ret_data[0:10, 0])

    m.gather(ret_data)

# We need to make sure that we finalise MPI otherwise
# we will get an error on exit
m.done()
#!/usr/bin/python3
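# test_script3b.py -- Tutorial 3: scattering and gathering lists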
import sys

from anamnesis import MPIHandler

# All nodes must perform this
m = MPIHandler(use_mpi=True)

# We need at least three nodes for this
if m.size < 3:
    print("Error: This example needs at least three MPI nodes")
    m.done()
    sys.exit(1)

# Create a list of data for scattering
# Pretend that we have 300 points of data which we want to scatter
num_pts = 300

if m.rank == 0:
    # We are the master node
    print("Master node")

    data = [str(x) for x in range(num_pts)]
    print("Master node: Full data array: len: {}".format(len(data)))

    # Scatter using scatter_list
    m1_data = m.scatter_list(data)
    print("Master node M1: m1_data len: {}".format(len(m1_data)))

    # Gather list back together again
    all_ret_data = m.gather_list(m1_data, num_pts, return_all=True)
    print("Master node: gathered list len: {}".format(len(all_ret_data)))
    print("all_ret_data 0:10: ", all_ret_data[0:10])
    print("all_ret_data 100:110: ", all_ret_data[100:110])
    print("all_ret_data 200:210: ", all_ret_data[200:210])
else:
    # We are a slave node
    print("Slave node {}".format(m.rank))

    # Scatter using scatter_list
    m1_data = m.scatter_list(None)
    print("Slave node {}: data len: {}".format(m.rank, len(m1_data)))

    # Gather using the gather_list function
    m.gather_list(m1_data, num_pts)

# We need to make sure that we finalise MPI otherwise
# we will get an error on exit
m.done()
Tutorial 1 - Broadcasting classes using MPI
===========================================

As well as serialisation to and from HDF5, Anamnesis provides wrapper functionality
to allow information to be sent between MPI nodes.

The use of the MPI functions in anamnesis requires the availability of the
`mpi4py` module. If this is not available, you will not be able to use
the MPI functions fully. You can, however, set `use_mpi=True` when creating
the `MPIHandler()` object (see below) and then continue to use the functions.
This allows you to write a single code base which will work both when doing
multi-processing using MPI and when running on a single machine.

The MPI functions require the same setup (primarily the `hdf5_outputs` class variable)
as is used for the HDF5 serialisation / unserialisation, so we suggest that you
work through the Serialisation tutorials first.

We are going to re-use some of the classes from the previous example.
We place this code in `test_mpiclasses.py`:

.. literalinclude:: test_mpiclasses.py
   :language: python
We now write a simple Python script which uses the Anamnesis MPI interface. We
will design this code so that the master node creates an instance of each of the
classes and the slave nodes receive copies of these.

.. literalinclude:: test_script1.py
   :language: python

To run this code, we need to execute it in an MPI environment. As usual, make
sure that anamnesis is on the `PYTHONPATH`.

We can then call `mpirun` directly:

.. code-block:: console
   $ mpirun -np 2 python3 test_script1.py
   Master node
   Master: Person: Fred 42
   Master: Place: York
   Master: Train to: Disastersville
   Slave node 1
   Slave node 1: Person: Fred 42
   Slave node 1: Place: York
   Slave node 1: Train: Disastersville

If you are using a cluster of some form (for instance `gridengine`), you
will need to make sure that you have a queue with MPI enabled and that
you submit your job to that queue. `Gridengine` in particular has
good, tight MPI integration which will transparently handle setting
up the necessary hostlists.

The first thing which we need to do in the script is to initialise our
`MPIHandler`. This is a singleton object and the `use_mpi` argument
is only examined on the first use. This means that in future calls,
you can call it without passing any argument.

.. code-block:: python

   m = MPIHandler(use_mpi=True)
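   # Any later call returns the same singleton, so the argument can be
   # omitted (a sketch of the behaviour described above; `m_again` is an
   # illustrative name):
   m_again = MPIHandler()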

In MPI, each instance of the script gets given a node number. By convention,
we consider node 0 as the master node. All other nodes are designated as
slave nodes. In order to decide whether we are the master node, we can
therefore check whether our `rank` (stored on our `MPIHandler` object) is
0.

If we are the master, we then create three objects (a Person, a Place
and a Train), set their attributes and print them out for reference.
We then broadcast each of them in turn to our slave node or nodes.

On the slave node(s), we simply wait to receive the objects which are
being sent from the master. There are two things to note. First, we
do not need to specify the object type on the slave; this information
is included in the MPI transfer. Second, we *must* make sure that
our transmit and receive code is lined up; i.e. if we broadcast three
items, every slave must receive three items. Code errors of this form
are one of the most common MPI debugging problems. Try to keep your
transmit / receive logic tidy and well understood in order to avoid
long debugging sessions [#f1]_.
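
A minimal sketch of the matching pattern (assuming `m` is the `MPIHandler`
from above; `obj_a` and `obj_b` are illustrative names for instances of
registered classes):

.. code-block:: python

   if m.rank == 0:
       m.bcast(obj_a)     # first broadcast
       m.bcast(obj_b)     # second broadcast
   else:
       obj_a = m.bcast()  # matches the first broadcast
       obj_b = m.bcast()  # matches the second broadcast
       # Receiving only one of the two here would leave the nodes
       # waiting on each other.
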
Once we have received the objects, we can simply use them as we normally
would. Note that the objects are *not* shared between the processes;
you now have distinct copies of each object.

Finally, it is important to call the `MPIHandler.done()` method to
indicate to the MPI library that you have successfully finished.

.. rubric:: Footnotes

.. [#f1] Note that mpi4py under Python 3 has an unfortunate tendency to
   swallow error messages which can make debugging frustrating.
   This seems to have got worse since the Python 2 version.
   Any suggestions as to how to improve this situation would be
   gratefully received by the anamnesis authors.
Tutorial 2 - Sending to/from different nodes
============================================

In many cases, we will not want to send data just from the master
node to all other nodes. We can use a combination of `bcast`, `send`
and `recv` to flexibly send objects around.

Again, for this example we are going to re-use some of the classes from the
previous example, which must be in `test_mpiclasses.py` (see Tutorial 1
for details).

Our new script looks like this:

.. literalinclude:: test_script2.py
   :language: python

Again, we need to run this code under an MPI environment (refer back to
Tutorial 1 for details). We will get the following output:

.. code-block:: console
   Master node
   Master: Created Person: Fred 42
   Slave node 1
   Slave node 2
   Slave node 1: Received Person: Fred 42
   Slave node 1: Created place: Manchester
   Slave node 2: Received Person: Fred 42
   Slave node 2: Received Place: Manchester
   Master: Received Place: Manchester
   Slave node 2: Created train: Durham
   ComplexTrain.init_from_hdf5
   We have already set destination: Durham
   Slave node 1: Received Train: Durham

In order, our script does the following:

1. Set up MPI
2. Create a Person on node 0 (master) and `bcast` it to nodes 1 and 2
3. Create a Place on node 1 and `bcast` it to nodes 0 and 2
4. Create a Train on node 2 and `send` it to node 1 only (on which we call `recv`)

Using these examples, you should be able to see how we can flexibly send
objects around our system.
Tutorial 3 - Scattering and Gathering data
==========================================

Scattering and gathering numpy arrays
-------------------------------------

As well as broadcasting and transferring objects, we may wish to split data up
for analysis. This is done using the `scatter_array` and `gather` functions.

In this script, we look at two ways of scattering data and then how to
gather the data back up for consolidation:

.. literalinclude:: test_script3a.py
   :language: python

Scattering Method 1: scatter_array
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The simplest way to scatter data is to use the `scatter_array` function. This
function always operates on the first dimension. I.e., if you have three nodes
and a dataset of size `(100, 15, 23)`, the first node will receive data of size
`(34, 15, 23)` and the remaining two nodes `(33, 15, 23)`.
The code will automatically split the array unequally if necessary.
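
As a rough sketch of how such share sizes can be computed (this mirrors the
integer-division split used by the library; the variable names here are
purely illustrative):

.. code-block:: python

   import numpy as np

   n_rows, n_nodes = 100, 3
   counts = np.array([n_rows // n_nodes] * n_nodes)  # [33, 33, 33]
   counts[:n_rows % n_nodes] += 1                     # [34, 33, 33]
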
Scattering Method 2: indices
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

It is sometimes more useful to broadcast an entire dataset to all nodes using
`bcast` and then have the nodes split the data up themselves (for instance, if
they need all of the data for part of the computation but should only work
on some of the data for the full computation).

To do this, we can use the `get_scatter_indices` function. This must be called
with the size of the data which we are "scattering". In the example in the text
above, we would call this function with the argument `100`. The function then
returns a list containing a set of arguments to pass to the `range` function.
In the example above, this would be:

.. code-block:: python

   [(0, 34), (34, 67), (67, 100)]

There is an entry in the list for each MPI node. We broadcast this list to
all MPI nodes which are then responsible for extracting just the part of the
data required, for example (assuming that `m` is our `MPIHandler`):

.. code-block:: python

   all_indices = m.bcast(None)
   my_indices = range(*all_indices[m.rank])

Note that these indices are congruent with the indices used during gather,
so you can safely gather data which has been manually collated in this way.
Gathering numpy arrays
^^^^^^^^^^^^^^^^^^^^^^

Gathering arrays is straightforward. Use the `gather` function, passing the
partial array from each node. There is an example of this in
`test_script3a.py` above. (Note that by default, the data is gathered to the
root node.)

Scattering and gathering lists
------------------------------

Scattering and gathering lists is similar to the process for arrays. There are
two differences. The first is that you need to use the `scatter_list` and
`gather_list` routines. The second is that the `gather_list` routine needs
to be told the total length of the combined list, and on nodes where you
want to receive the full list (including the master), you must pass
`return_all` as `True` (the default is `False`).
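
For reference, a minimal sketch of that calling pattern (assuming `m` is the
`MPIHandler`, `part` is this node's portion of the list and `num_pts` is the
total length of the combined list):

.. code-block:: python

   if m.rank == 0:
       # The master wants the recombined list back
       full = m.gather_list(part, num_pts, return_all=True)
   else:
       # Other nodes just contribute their portion
       m.gather_list(part, num_pts)
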
An example script can be seen below:

.. literalinclude:: test_script3b.py
   :language: python
@@ -15,4 +15,9 @@ Serialisation
 MPI
 ---
-MPI tutorials will be coming soon.
+.. toctree::
+   :maxdepth: 1
+
+   mpitutorials/tutorial1
+   mpitutorials/tutorial2
+   mpitutorials/tutorial3