"post"

Sun, 27 Sep 2009 14:52:50 EST

## Processing the Sieve in Python

In a previous post I discussed four methods to multi-thread the Sieve of Erasthones in Python. I concluded that multi-threading didn't increase performance, and in fact could have a significant adverse effect. The global interpretor lock (GIL) prevents threads from running concurrently and thus limits the upside of threading. The use of locks or avoiding the use of shared data can than decrease performance quite a bit.

In this section I'll be using Python's multiprocessing module to 'multi-thread' the *Sieve*.

The multiprocessing module spawns a number of processes and distributes the calculation amongst them.There is no equivalent to the GIL so I should be able to see some gain in performance as the number of processes increases. On the other hand, spawning processes means that there is startup overhead which may offset any performance gain due to the distribution of its execution across multiple processes. However, I should still be able to investigate how performance scales with the number of processes, and whether the multiprocessing module is able to take advantage of multiple cores. In this post I'll discuss four approaches to distributing the *Sieve* algorithm, basically following the approaches I discussed earlier when using the multi-threading package.The various approaches differ in the way the load is balanced and whether the state of the sieve is shared.

The source for the code discussed here and in the previous post can be found in *prime_share.py* in the blog-code package on github.

## Sieve of Erasthones

A more extensive discussion can be found in the first post. To find the number of primes below a threshold the *Sieve* algorithm uses a list of booleans (or 0/1 values). It's the sharing of this list which is a distinction between the approaches discussed below.

## Test Configuration

The test configuration is the same as in the previous post :A Toshiba A215-S4747 with an AMD Turion 64 X2 (dual-core) processor running Ubuntu 8.04.3 and Python 2.6.2. I won't discuss the performance under Jython or IronPython as they don't have support the multiprocessing module as of yet.

## Various Implementations

In this section I'll discuss four different implementations of the distributed *Sieve*. Each implementation is labeled with the name of the implementation in *prime_share.py* which is part of the blog-code package on github.

- main_smp: no shared data; split the work evenly between the processes.
- main_smp_alt: no shared data; split the sieve between the processes.
- main_smp_shared: share the sieve between the processes.
- main_smp_shared_2: share the sieve between the processes using shared memory.

All implementations, except the last, use the map method of the Pools class in the multiprocessing package. The distributed map method has the same interface as the map built-in function : `map(func, iterable)`

When the sieve list is not shared some post processing is required. I use the built-in reduce function so we have a map-reduce pattern.

In the last two examples the sieve list is shared amongst the processes. In the first of these the manager class in the multiprocessing package is used. This class manages a server process from which the shared data data is proxied to the processes accessing it. The alternative is using shared memory which is done in the last example.

### main_smp : no shared data; split the work evenly

This is the first implementation which uses the *Pool* class in the multiprocessing module. The *Pool* class's *map* method is used to distribute the *Sieve* across various processes. The results of each calculation are captured and returned in a list by the map function.

```
def main_smp(top, nthreads) :
n = int(top)
nthreads = int(nthreads)
B = smp_load_balance(nthreads, n)
p = multiprocessing.Pool(nthreads)
K = p.map(dowork_smp, map(lambda lst : (n, lst, nthreads), B))
PR = transpose(K)
prime = p.map(reduce_chunk, PR)
return count_primes(prime)
def dowork_smp(args) :
n, nexti_smp, chunks = args
nk = 0
ops = 0
k = nexti_smp[0]
L = ( n + 1) * [1]
lim = len(nexti_smp)
while 1 :
k = nexti_smp[nk]
if L[k] == 1 :
r = n / k
for i in range(nexti_smp[0], r+1) :
ops += 1
L[i*k] = 0
nk += 1
if nk >= lim : break
len_L = n + 1
split = len_L / chunks
K = range(0, len_L - split + 1, split)+[len_L]
Z = [ L[k[0]:k[1]] for k in zip(K, K[1:]) ]
return Z
def smp_load_balance(th , n) :
def operations(t) :
return int((n / t) + 1 - t)
def find_min(thr_alloc) :
min, lst = thr_alloc[0]
if min == 0 :
return 0
midx = 0
for index in range(1, len(thr_alloc)) :
count, lst = thr_alloc[index]
if count < min :
min = count
midx = index
return midx
lim = int(math.sqrt(n)) + 1
nexti_lb = range(2, lim, 1)
if th < 2 :
return [nexti_lb]
thr_allocs = map(lambda i : (0, [] ), range(th))
Z = map(operations, nexti_lb)
L = zip(map(operations, nexti_lb), nexti_lb)
for i in L :
ops, index = i
mindex = find_min(thr_allocs)
cnt, lst = thr_allocs[mindex]
cnt += ops
lst.append(index)
thr_allocs[mindex] = (cnt, lst)
return map(lambda p: p[1], thr_allocs)
def list_reduce(l1, l2) :
return map(lambda p : p[0]*p[1], zip(l1,l2))
def reduce_chunk(C) :
return reduce(lambda x, y : x + y, reduce(list_reduce, C))
def transpose(K) :
nthreads = len(K)
chunks = len(K[0])
X = [ (l, k) for k in range(0, chunks) for l in range(0, nthreads)]
len_X = len(X)
S = [ X[k:k+nthreads] for k in range(0, len_X, nthreads)]
PR = [ [ K[p[0]][p[1]] for p in S[s]] for s in range(0, chunks) ]
return PR
```

Each pool works on an independent set of sieve indices.The indices to work on are chosen such that the amount of work done by each process is roughly the same. This is done in the *smp_load_balance* procedure, which was discussed in more detail in my previous post. The statement

`K = p.map(dowork_smp, map(lambda lst : (n, lst, nthreads), B)) `

is where the work is distributed between the processes. The results are returned in K.Since the 'sieve list' is not shared the results of each process in the pool need to be combined to form the final result.

This is done by examining each list in the result set. If an index is flagged as a composite number in any one of them, the resulting sieve location is also flagged as a composite number. In other words, each index value across the result lists is treated as a boolean and the results in each list are 'or-ed' together to arrive at the final value. This is expressed as a reduce operation on the results of the map operation. Needless to say, this reduction is going to be time consuming. So I'm distributing this reduction across the pool processes used in the map phase.

In order to speed up the reduction process, the results are not returned as one list,but as a list of partitionings or chunks. Each partitioning corresponds to a contiguous part of the results list. The number of partionings is equal to the number of processes.These partitionings are then combined with equivalent partitionings returned by the other processes in the pool. This is done in the *transpose* function.

Say we have two processes, and the first one returns [[1,1,1],[1,0,1]] and the second one returns [1,1,1. First of all, if we ignore the chunking, the combination (reduction) of the two results is in fact [1,1,1,1,0,1]. What *transpose* does is generate a list of equivalent pieces whose reduction can be distributed : [[[1,1,1] , [1,1,1]], [[1,0,1] , [1,1,1]]]. This reduction is distributed amongst the processes in the pool:

` prime = p.map(reduce_chunk, PR) `

The distributed reduction processes work on a subset of the results returned by the mapping operation earlier. Each process in the distributed reduction returns the number of primes it found in the chunks it received as arguments. In this example that would be [3, 2], resulting in a determination of three primes (zero and one are excluded).

This graph shows the results for this example, labelled as 'smp' and the next one, labelled as 'smp alt'. The x-axis shows the number of processes that are part of the pool. The y-axis shows the execution time for forty repeats of the*Sieve*algorithm to determine the number of primes less than 10000. I used Python's timeit package to determine the run time.

For each example, the graph also shows the time taken in the *startup* phase, the *map* phase and the time taken to complete the calculation. *startup* is considered everything up to and including the load balancing.The *map* phase includes the *startup* phase and the first *Pool map* operation.

The increase in *startup* time is proportional to the number of processes that need to be started. The time the *map phase* takes is primarily a function of the *startup* time. However, if you look closely you'll notice that the *map* phases rise somewhat steeper that the *startup* phase. This is due to the fact that the number of operations increases slightly as the number of processes increases. This is explained in my previous post where I used a similar implementation when multi-threading the sieve. Lastly, the total time taken rises much more steeply than either of the two previous phases. This is entirely due to the reduction phase. As the number of processes in increases the number of chunks to be reduced increases as well. This reduction phase seems to be somewhat quadratic (it involves after all transposing a matrix), driving down performance even more.

To summarize, there is no gain to had in distributing the algorithm this way for two reasons: The increase startup time due to the increase in size of the processing pool, is not offset by an increase in processing speed. In addition, the lack of shared data introduces a very inefficient reduction phase.

### main_smp_alt: no shared data; split the sieve evenly

The second implementation I looked at is similar to the one called *main_nolocks_alt* discussed in the

threading post. The sieve is split and each member of the multiprocessing pool receives a piece to process. The code is shown here:

```
def main_smp_alt(top, nthreads) :
n = int(top)
nthreads = int(nthreads)
ind = range(2, n, 1)
B = load_balance(ind, nthreads)
p = multiprocessing.Pool(nthreads)
K = p.map(dowork_smp_alt, map(lambda lst : (n, lst), B))
prime_smp_alt = reduce(lambda l,r : l + r, K)
return count_primes(prime_smp_alt)
def dowork_smp_alt(args) :
n, irange = args
k = 2
lim = int(math.sqrt(n)) + 1
istart, iend = irange
L = ( n + 1) * [1]
ifrom = 999999
ito = -1
while 1 :
if not (k < lim) : break
if not (k < iend) : break
if k < istart :
s = (istart / k ) + 1
r = (iend / k) + 1
for i in range(s, r) :
index = i * k
if ifrom > index :
ifrom = index
if ito < index :
ito = index
L[i*k] = 0
elif L[k] == 1 :
s = 2
r = (iend / k) + 1
for i in range(s, r) :
index = i * k
if ifrom > index :
ifrom = index
if ito < index :
ito = index
L[i*k] = 0
k = k + 1
if ifrom == 4 :
ifrom = 0
return L[ifrom: ito + 1]
```

The reduction phase is very simple. The function do_work_smp_alt returns the part of the sieve it worked on. Consequently the Sieve is the concatenation of the lists returned by the map process. The result are shown here.

Notice that the *startup* time of this and the previously discussed implementation are roughly in line. The difference is probably due to a slight difference in load on the box as these tests were done at different times.

The second thing to notice is that the performance increases quite a bit from one to two processes. This is not real, but an artifact of the way the test is done.

In this graph I show two runs where I varied the number of processes in the pool from high to low. I start the run with a pool size of fifteen and nine processes respectively, and repeat the calculation as I lower the number of processors in the pool. Any startup effect would then be seen at right hand side of the graph, and the results on the left hand side would be more consistent. You can see that this startup effect clearly in the graph. I have no explanation for it, but it doesn't invalidate the main conclusion which is that the performance of this approach decreases as the number of processes increases.

The reason for this decrease in performance is that the number of operations increases as the number of processes in the pool increases. All processes, except the one working on the first part of the Sieve, have to zero out position in the Sieve, without being able to take advantage of the prime numbers 'discovered' in the first part of the Sieve. The same observation was made when the multi-threaded implementation was discussed. Not having shared data basically hurts performance.

### main_smp_shared : share the sieve between the processes.

In this version the Sieve is shared between the processes in the pool. The shared sieve is created using the manager class in the multiprocessing package.

```
def main_smp_shared(top, nthreads) :
n = int(top)
nthreads = int(nthreads)
manager = multiprocessing.Manager()
prime_s = (n + 1) * [1]
B = smp_load_balance(nthreads, n)
p = multiprocessing.Pool(nthreads)
L_m = manager.list(prime_s)
K = p.map(dowork_smp_shared, map(lambda lst : (n, lst, L_m), B))
return count_primes(L_m)
def dowork_smp_shared(args) :
n, nexti_shared, L = args
nk = 0
ops = 0
k = nexti_shared[0]
lim = len(nexti_shared)
while 1 :
k = nexti_shared[nk]
if L[k] == 1 :
r = n / k
for i in range(nexti_shared[0], r+1) :
ops += 1
L[i*k] = 0
nk += 1
if nk >= lim : break
return []
```

Here's how the shared sieve is created:

```
prime_s = (n + 1) * [1]
L_m = manager.list(prime_s)
```

The load balancing was discussed before. There is no reduction phase. The Sieve is shared and the primes can simply be determined by inspection.

The *startup* phase includes the everything up and including the creation of the process pools. *phase 1* adds the creation of the shared list through the list manager. Clearly initializing the *manager* this quite a bit of processing time and I've reduced the number of repetitions from forty to five. Nevertheless the run times are significantly higher than the cases discussed previously due to the use of the manager to generate a shared data structure. According to the documentation the *manager* adds an additional process to manage the shared data structure. This is useful if the calculation was distributed across multiple machines, but it's clearly overkill here.

That said, notice that performance improves as the number of threads increases from one to two and three. The AMD Turion processor is a two core processor so you'd expect that. Beyond three processes the returns diminish as processes start interfering with each other. Because the sieve is shared between processes the number of operations remains the same regardless of the size of the process pool. Therefore, the increase in performance is entirely due to the fact that the multiprocessing module takes advantage of the multiple cores on a machine.

Still, in absolute terms the performance here is not particularly good. So let's move on to the next and final approach

### main_smp_shared_2 : share the sieve between the processes using shared memory.

The other way to share data between processes is through shared ctypes ojects.*ctypes* use shared memory which, according to the documentation, 'can be inherited by child processes'. In order to use them, we need to abandon *Pool.map* and rewrite the process slightly using Process class.

If you try to use *ctypes* with *Pools* you get this exception thrown at you if you do:

` RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance `

The rewrite is trivial and in fact looks very similar to the map approach used in the previous examples.

```
def main_smp_shared_2(top, nthreads) :
n = int(top)
nthreads = int(nthreads)
prime = (n + 1) * [1]
B = smp_load_balance(nthreads, n)
arr = multiprocessing.Array('i',prime)
procs = map(create_process, map(lambda lst : (n, lst, arr), B))
map(lambda p : p.start(), procs)
map(lambda p : p.join(), procs)
prime = arr[:]
return count_primes(prime)
def create_process(argv) :
return multiprocessing.Process(target=dowork_smp_shared_2, args=(argv,))
def dowork_smp_shared_2(args) :
n, nexti_sh2, L = args
nk = 0
ops = 0
k = nexti_sh2[0]
lim = len(nexti_sh2)
while 1 :
k = nexti_sh2[nk]
if L[k] == 1 :
r = n / k
for i in range(nexti_sh2[0], r+1) :
ops += 1
L[i*k] = 0
nk += 1
if nk >= lim : break
return L
```

The results of the test are shown here.

I should note that ran the test by starting at a higher process count than shown here and counting down. I dropped the results for the higher process count, as it clearly showed the startup effect mentioned in the discussion of the results of the*main_smp_alt*implementation. The

*startup*phase in this example includes everything up to and including the creation of the shared array.

*phase 1*adds the process creation.

There is no reduction phase as the processes share the sieve list. Therefore any change in performance is due to the distribution of the calculation across multiple instances. Notice the increase in performance when the number of processors increase from one to two. Performance starts to decrease after that as adding more processes creates more load on the box. In this graph I show the performance as a function of the number of processes from 1 to 20. Again, changed the number of processes from hight to low, to remove startup effects on the lower end of the graph. You can clearly see the improvement in performance as you change from one to two and three processes, but that improvement diminished rapidly.

## Conclusions

This post and the previous post use the distribution of the Sieve of Erasthones as way to explore multi-threading and multi-processing in Python.

I explored two simple ways to avoid the use of shared data. Both approaches lead to an increase in the number of total operations required to get the final *Sieve*. In both cases additional processing steps are required which can add significant processing time, offsetting any potential performance gain due to the distribution of the calculation, regardless of whether I use multi-threading or multi-processing.

When shared data is used, the [global interpretor lock (GIL)](a href=www.dabeaz.com/python/GIL.pdf) in the multi-threading module puts a hard floor on any potential performance increase.

The multiprocessing module does show a performance when the number of processes is roughly equal to the number of cores. However, an algorithm as simple as the *Sieve* doesn't allow for amortization of the startup cost, and the performance is significantly worse when multi-threading is used.

However, in cases where the startup costs can be amortized successfully the multiprocessing module may well lead to a gain in performance.

3463066370