This page was generated from a notebook available in the repository for this website.

Python multiprocessing and hierarchical data

In this exercise we will adapt the epidemic model from last time for execution in a single python process. In contrast to the parallelism we discussed last time (thanks to either SLURM or GNU parallel), this method offers the opportunity to save all of the data to a single file. We will use the h5py library to do this. The work pattern in this example is generally useful to researchers who wish to write codes which can use an entire compute node and efficiently write the results in one place.

Requirements

We are using the following requrements file for an Anaconda environment.

dependencies:
  - python=3.7
  - matplotlib
  - scipy
  - numpy
  - nb_conda_kernels
  - au-eoed::gnu-parallel
  - h5py
  - pip

Recall that you can review the instructions for creating an environment.

In [1]:

import os,sys
import h5py
import json
import time
import functools
import numpy as np
import multiprocessing as mp

Define a simple numerical experiment

The following code was rescued from last time, thanks to a course by Chris Meyers.

In [2]:

class StochasticSIR:

    def __init__(self,beta,gamma,S,I,R):
        self.S = S
        self.I = I
        self.R = R
        self.beta = beta
        self.gamma = gamma
        self.t = 0.
        self.N = S + I + R
        self.trajectory = np.array([[self.S, self.I, self.R]])
        self.times = None

    def step(self):
        transition = None
        # define rates
        didt = self.beta * self.S * self.I
        drdt = self.gamma * self.I
        total_rate = didt + drdt
        if total_rate == 0.:
            return transition, self.t
        # get a random number
        rand = np.random.random()
        # rates determine the event
        if rand < didt/drdt:
            self.S -= 1
            self.I += 1
            transition = 1
        else:
            self.I -= 1
            self.R += 1
            transition = 2
        # the event happens in the future
        dt = np.random.exponential(1./total_rate,1)[0]
        self.t += dt
        return transition, self.t

    def run(self, T=None, make_traj=True):
        """The Gillespie algorithm."""
        if T is None:
            T = sys.maxsize
        self.times = [0.]
        t0 = self.t
        transition = 1
        while self.t < t0 + T:
            transition, t = self.step()
            if not transition:
                return self.t
            if make_traj: self.trajectory = np.concatenate(
                (self.trajectory, [[self.S,self.I,self.R]]), axis=0)
            self.times.append(self.t)
        return

We slightly modify the study_beta function so that it also returns a list of the parameters. We will use these later to keep track of our experiments.

In [3]:

def study_beta(
    beta = .005,gamma = 10,
    N = 100,I = 1,n_expts = 1000):
    """
    Run many experiments for a particular beta 
    to see how many infections spread.
    """
    R = 0
    S = N-I
    result = np.zeros((n_expts,))
    for expt_num in range(len(result)):
        model = StochasticSIR(beta=beta,gamma=gamma,S=S,I=I,R=R)
        model.run()
        result[expt_num] = model.trajectory[-1][2]
    params = dict(beta=beta,gamma=gamma,N=N,I=I,n_expts=n_expts)
    return result,params

In [4]:

# generate some example data
result,params = study_beta()

In [5]:

# we delete the example output files during testing
# h5py will not overwrite files and must always close a file when finished
! rm -f example.hdf5

In the following example, we will save only the resulting data using h5py and then read it back in. We find that the hdf5 objects are represented slightly differently than numpy objects, but we can recast them easily.

In [6]:

result,params = study_beta()
fn = 'example.hdf5'
if not os.path.isfile(fn):
    fobj = h5py.File(fn,'w')
    fobj.create_dataset('result',data=result)
    fobj.close()
else: raise Exception('file exists: %s'%fn)

In [7]:

# read the result
with h5py.File(fn,'r') as fp:
    print(fp)
    for key in fp:
        print('found key: %s'%key)
        # the file object acts like a dict
        print(fp[key])
        # recast as an array
        print(np.array(fp[key])[:10])
<HDF5 file "example.hdf5" (mode r)>
found key: result
<HDF5 dataset "result": shape (1000,), type "<f8">
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]

Now that we have established that we can write different datasets to a single file, we can build our multiprocessing script.

Aside: large parameter sweeps

In this example we will only recapitulate a one-parameter sweep from last time. Many experiments require a much larger search. After we complete a minimum working example (MWE), you may wish to use the following code to expand our one- parameter sweep to two parameters. It’s important to keep in mind that the utility of this would be limited for the SIR model as we have written it, since that model benefits from using the basic reproduction number instead.

In [8]:

import itertools
beta_sweep = np.arange(0.001,0.02,0.001)
gamma_sweep = 1./np.arange(1.0,10+1)
combos = itertools.product(beta_sweep,gamma_sweep)

A minimal multiprocessing example

First we define a simple parameter sweep.

In [9]:

n_expts = 1000
r0_vals = np.arange(0.1,5,0.1)
gamma = 10.0
beta_sweep = r0_vals/gamma
beta_sweep
array([0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1 , 0.11,
       0.12, 0.13, 0.14, 0.15, 0.16, 0.17, 0.18, 0.19, 0.2 , 0.21, 0.22,
       0.23, 0.24, 0.25, 0.26, 0.27, 0.28, 0.29, 0.3 , 0.31, 0.32, 0.33,
       0.34, 0.35, 0.36, 0.37, 0.38, 0.39, 0.4 , 0.41, 0.42, 0.43, 0.44,
       0.45, 0.46, 0.47, 0.48, 0.49])

In [10]:

# check the number of processors
print("we have %d processors"%mp.cpu_count())
we have 8 processors

The following code includes an MWE for using a multiprocessing pool along with a hierarchical data file. It contains several notable features:

  • We remove the output file during testing.
  • We check to make sure the file does not exist already (h5py will not let you overwrite it).
  • We manipulate files inside of a with block to ensure they are closed correctly.
  • We tested the serial method alongside the parallel one. The asynchronous parallel operations would “fail silently” which made them very hard to debug!
  • We use a “blocking” callback to write the results.
  • The function that writes the data is “decorated” so that it can receive the file pointer.

We will discuss this example further in class. The first order of business is to make sure it goes faster in parallel.

In [11]:

! rm -f out.hdf5

if os.path.isfile('out.hdf5'):
    raise Exception('file exists!')

do_parallel = False

def writer(incoming,fp=None):
    print('.',end='')
    result,params = incoming
    index = len(fp.attrs)
    dset = fp.create_dataset(str(index),data=result)
    fp.attrs[str(index)] = np.string_(json.dumps(params))

start_time = time.time()
if do_parallel:
    with h5py.File('out.hdf5','w') as fp:
        def handle_output(x): return writer(x,fp=fp)
        pool = mp.Pool(mp.cpu_count())
        for beta in beta_sweep:
            pool.apply_async(
                functools.partial(study_beta,gamma=gamma),
                (beta,),
                callback=handle_output)
        pool.close()
        pool.join()
        fp.close()
else:
    # develop in serial to catch silent async errors
    with h5py.File('out.hdf5','w') as fp:
        for index,beta in enumerate(beta_sweep):
            print('.',end='')
            output = study_beta(beta=beta,n_expts=n_expts,gamma=gamma)
            writer(output,fp=fp)
# compare the parallel time to the serial time (~35s)
print('time: %.1fs'%(time.time()-start_time))
..................................................................................................time: 94.9s

Now that we have a useful parallel code, we can analyze the data.

In [12]:

# unpack the data
meta,data = [],[]
with h5py.File('out.hdf5','r') as fp:
    for index,key in enumerate(fp):
        meta.append(json.loads(fp.attrs[key]))
        data.append(np.array(fp[key]))
# look at the data
for index,(m,d) in enumerate(zip(meta,data)):
    print("Experiment %d"%index)
    print("Metadata: %s"%str(m))
    print("Result: %s"%str(d.mean()))
    # we got the idea so break
    if index>2: break
    #
# reformulate the relevant values
epidemic_rate = []
afflicted = []
for index,(m,d) in enumerate(zip(meta,data)):
    epidemic_rate.append((m['beta']*gamma,np.mean(d>1)))
    afflicted.append((m['beta']*gamma,np.mean(d)))
# sort it, since it comes out all jumbled (but why?)
epidemic_rate = sorted(epidemic_rate,key=lambda x:x[0])
afflicted = sorted(afflicted,key=lambda x:x[0])
Experiment 0
Metadata: {'beta': 0.01, 'gamma': 10.0, 'N': 100, 'I': 1, 'n_expts': 1000}
Result: 1.122
Experiment 1
Metadata: {'beta': 0.02, 'gamma': 10.0, 'N': 100, 'I': 1, 'n_expts': 1000}
Result: 1.31
Experiment 2
Metadata: {'beta': 0.11000000000000001, 'gamma': 10.0, 'N': 100, 'I': 1, 'n_expts': 1000}
Result: 83.872
Experiment 3
Metadata: {'beta': 0.12000000000000002, 'gamma': 10.0, 'N': 100, 'I': 1, 'n_expts': 1000}
Result: 87.219

In [13]:

# imports for plotting
import matplotlib as mpl
%matplotlib inline
import matplotlib.pyplot as plt

In [14]:

fig = plt.figure(figsize=(6,6))
ax = fig.add_subplot(211)
ax.plot(*zip(*epidemic_rate))
ax.set_ylabel('probability of an epidemic')
ax = fig.add_subplot(212)
ax.plot(*zip(*afflicted))
ax.set_ylabel('afflicted population')
ax.set_xlabel('basic reproduction rate')
plt.show()

png

This concludes the exercise. Interested readers are welcome to expand the example above so that it performs a sweep over more than one parameter.