Sun, 27 Sep 2009 14:52:50 EST
Processing the Sieve in Python
In a previous post I discussed four methods to multi-thread the Sieve of Erasthones in Python. I concluded that multi-threading didn't increase performance, and in fact could have a significant adverse effect. The global interpretor lock (GIL) prevents threads from running concurrently and thus limits the upside of threading. The use of locks or avoiding the use of shared data can than decrease performance quite a bit.
In this section I'll be using Python's multiprocessing module to 'multi-thread' the Sieve.
The multiprocessing module spawns a number of processes and distributes the calculation amongst them.There is no equivalent to the GIL so I should be able to see some gain in performance as the number of processes increases. On the other hand, spawning processes means that there is startup overhead which may offset any performance gain due to the distribution of its execution across multiple processes. However, I should still be able to investigate how performance scales with the number of processes, and whether the multiprocessing module is able to take advantage of multiple cores. In this post I'll discuss four approaches to distributing the Sieve algorithm, basically following the approaches I discussed earlier when using the multi-threading package.The various approaches differ in the way the load is balanced and whether the state of the sieve is shared.
The source for the code discussed here and in the previous post can be found in prime_share.py in the blog-code package on github.
Sieve of Erasthones
A more extensive discussion can be found in the first post. To find the number of primes below a threshold the Sieve algorithm uses a list of booleans (or 0/1 values). It's the sharing of this list which is a distinction between the approaches discussed below.
The test configuration is the same as in the previous post :A Toshiba A215-S4747 with an AMD Turion 64 X2 (dual-core) processor running Ubuntu 8.04.3 and Python 2.6.2. I won't discuss the performance under Jython or IronPython as they don't have support the multiprocessing module as of yet.
In this section I'll discuss four different implementations of the distributed Sieve. Each implementation is labeled with the name of the implementation in prime_share.py which is part of the blog-code package on github.
- main_smp: no shared data; split the work evenly between the processes.
- main_smp_alt: no shared data; split the sieve between the processes.
- main_smp_shared: share the sieve between the processes.
- main_smp_shared_2: share the sieve between the processes using shared memory.
All implementations, except the last, use the map method of the Pools class in the multiprocessing package. The distributed map method has the same interface as the map built-in function :
map(func, iterable)When the sieve list is not shared some post processing is required. I use the built-in reduce function so we have a map-reduce pattern.
In the last two examples the sieve list is shared amongst the processes. In the first of these the manager class in the multiprocessing package is used. This class manages a server process from which the shared data data is proxied to the processes accessing it. The alternative is using shared memory which is done in the last example.
main_smp : no shared data; split the work evenly
This is the first implementation which uses the Pool class in the multiprocessing module. The Pool class's map method is used to distribute the Sieve across various processes. The results of each calculation are captured and returned in a list by the map function.
def main_smp(top, nthreads) : n = int(top) nthreads = int(nthreads) B = smp_load_balance(nthreads, n) p = multiprocessing.Pool(nthreads) K = p.map(dowork_smp, map(lambda lst : (n, lst, nthreads), B)) PR = transpose(K) prime = p.map(reduce_chunk, PR) return count_primes(prime) def dowork_smp(args) : n, nexti_smp, chunks = args nk = 0 ops = 0 k = nexti_smp L = ( n + 1) *  lim = len(nexti_smp) while 1 : k = nexti_smp[nk] if L[k] == 1 : r = n / k for i in range(nexti_smp, r+1) : ops += 1 L[i*k] = 0 nk += 1 if nk >= lim : break len_L = n + 1 split = len_L / chunks K = range(0, len_L - split + 1, split)+[len_L] Z = [ L[k:k] for k in zip(K, K[1:]) ] return Z def smp_load_balance(th , n) : def operations(t) : return int((n / t) + 1 - t) def find_min(thr_alloc) : min, lst = thr_alloc if min == 0 : return 0 midx = 0 for index in range(1, len(thr_alloc)) : count, lst = thr_alloc[index] if count < min : min = count midx = index return midx lim = int(math.sqrt(n)) + 1 nexti_lb = range(2, lim, 1) if th < 2 : return [nexti_lb] thr_allocs = map(lambda i : (0,  ), range(th)) Z = map(operations, nexti_lb) L = zip(map(operations, nexti_lb), nexti_lb) for i in L : ops, index = i mindex = find_min(thr_allocs) cnt, lst = thr_allocs[mindex] cnt += ops lst.append(index) thr_allocs[mindex] = (cnt, lst) return map(lambda p: p, thr_allocs) def list_reduce(l1, l2) : return map(lambda p : p*p, zip(l1,l2)) def reduce_chunk(C) : return reduce(lambda x, y : x + y, reduce(list_reduce, C)) def transpose(K) : nthreads = len(K) chunks = len(K) X = [ (l, k) for k in range(0, chunks) for l in range(0, nthreads)] len_X = len(X) S = [ X[k:k+nthreads] for k in range(0, len_X, nthreads)] PR = [ [ K[p][p] for p in S[s]] for s in range(0, chunks) ] return PR
Each pool works on an independent set of sieve indices.The indices to work on are chosen such that the amount of work done by each process is roughly the same. This is done in the smp_load_balance procedure, which was discussed in more detail in my previous post. The statement
K = p.map(dowork_smp, map(lambda lst : (n, lst, nthreads), B))
is where the work is distributed between the processes. The results are returned in K.Since the 'sieve list' is not shared the results of each process in the pool need to be combined to form the final result.
This is done by examining each list in the result set. If an index is flagged as a composite number in any one of them, the resulting sieve location is also flagged as a composite number. In other words, each index value across the result lists is treated as a boolean and the results in each list are 'or-ed' together to arrive at the final value. This is expressed as a reduce operation on the results of the map operation. Needless to say, this reduction is going to be time consuming. So I'm distributing this reduction across the pool processes used in the map phase.
In order to speed up the reduction process, the results are not returned as one list,but as a list of partitionings or chunks. Each partitioning corresponds to a contiguous part of the results list. The number of partionings is equal to the number of processes.These partitionings are then combined with equivalent partitionings returned by the other processes in the pool. This is done in the transpose function.
Say we have two processes, and the first one returns [[1,1,1],[1,0,1]] and the second one returns [1,1,1. First of all, if we ignore the chunking, the combination (reduction) of the two results is in fact [1,1,1,1,0,1]. What transpose does is generate a list of equivalent pieces whose reduction can be distributed : [[[1,1,1] , [1,1,1]], [[1,0,1] , [1,1,1]]]. This reduction is distributed amongst the processes in the pool:
prime = p.map(reduce_chunk, PR)
The distributed reduction processes work on a subset of the results returned by the mapping operation earlier. Each process in the distributed reduction returns the number of primes it found in the chunks it received as arguments. In this example that would be [3, 2], resulting in a determination of three primes (zero and one are excluded).timeit package to determine the run time.
For each example, the graph also shows the time taken in the *startup* phase, the *map* phase and the time taken to complete the calculation. *startup* is considered everything up to and including the load balancing.The *map* phase includes the *startup* phase and the first *Pool map* operation.
The increase in *startup* time is proportional to the number of processes that need to be started. The time the *map phase* takes is primarily a function of the *startup* time. However, if you look closely you'll notice that the *map* phases rise somewhat steeper that the *startup* phase. This is due to the fact that the number of operations increases slightly as the number of processes increases. This is explained in my previous post where I used a similar implementation when multi-threading the sieve. Lastly, the total time taken rises much more steeply than either of the two previous phases. This is entirely due to the reduction phase. As the number of processes in increases the number of chunks to be reduced increases as well. This reduction phase seems to be somewhat quadratic (it involves after all transposing a matrix), driving down performance even more.
To summarize, there is no gain to had in distributing the algorithm this way for two reasons: The increase startup time due to the increase in size of the processing pool, is not offset by an increase in processing speed. In addition, the lack of shared data introduces a very inefficient reduction phase.
main_smp_alt: no shared data; split the sieve evenly
The second implementation I looked at is similar to the one called main_nolocks_alt discussed in the
threading post. The sieve is split and each member of the multiprocessing pool receives a piece to process. The code is shown here:
def main_smp_alt(top, nthreads) : n = int(top) nthreads = int(nthreads) ind = range(2, n, 1) B = load_balance(ind, nthreads) p = multiprocessing.Pool(nthreads) K = p.map(dowork_smp_alt, map(lambda lst : (n, lst), B)) prime_smp_alt = reduce(lambda l,r : l + r, K) return count_primes(prime_smp_alt) def dowork_smp_alt(args) : n, irange = args k = 2 lim = int(math.sqrt(n)) + 1 istart, iend = irange L = ( n + 1) *  ifrom = 999999 ito = -1 while 1 : if not (k < lim) : break if not (k < iend) : break if k < istart : s = (istart / k ) + 1 r = (iend / k) + 1 for i in range(s, r) : index = i * k if ifrom > index : ifrom = index if ito < index : ito = index L[i*k] = 0 elif L[k] == 1 : s = 2 r = (iend / k) + 1 for i in range(s, r) : index = i * k if ifrom > index : ifrom = index if ito < index : ito = index L[i*k] = 0 k = k + 1 if ifrom == 4 : ifrom = 0 return L[ifrom: ito + 1]
The reduction phase is very simple. The function do_work_smp_alt returns the part of the sieve it worked on. Consequently the Sieve is the concatenation of the lists returned by the map process. The result are shown here.
Notice that the startup time of this and the previously discussed implementation are roughly in line. The difference is probably due to a slight difference in load on the box as these tests were done at different times.
The second thing to notice is that the performance increases quite a bit from one to two processes. This is not real, but an artifact of the way the test is done.
In this graph I show two runs where I varied the number of processes in the pool from high to low. I start the run with a pool size of fifteen and nine processes respectively, and repeat the calculation as I lower the number of processors in the pool. Any startup effect would then be seen at right hand side of the graph, and the results on the left hand side would be more consistent. You can see that this startup effect clearly in the graph. I have no explanation for it, but it doesn't invalidate the main conclusion which is that the performance of this approach decreases as the number of processes increases.
The reason for this decrease in performance is that the number of operations increases as the number of processes in the pool increases. All processes, except the one working on the first part of the Sieve, have to zero out position in the Sieve, without being able to take advantage of the prime numbers 'discovered' in the first part of the Sieve. The same observation was made when the multi-threaded implementation was discussed. Not having shared data basically hurts performance.
main_smp_shared : share the sieve between the processes.
def main_smp_shared(top, nthreads) : n = int(top) nthreads = int(nthreads) manager = multiprocessing.Manager() prime_s = (n + 1) *  B = smp_load_balance(nthreads, n) p = multiprocessing.Pool(nthreads) L_m = manager.list(prime_s) K = p.map(dowork_smp_shared, map(lambda lst : (n, lst, L_m), B)) return count_primes(L_m) def dowork_smp_shared(args) : n, nexti_shared, L = args nk = 0 ops = 0 k = nexti_shared lim = len(nexti_shared) while 1 : k = nexti_shared[nk] if L[k] == 1 : r = n / k for i in range(nexti_shared, r+1) : ops += 1 L[i*k] = 0 nk += 1 if nk >= lim : break return 
Here's how the shared sieve is created:
prime_s = (n + 1) *  L_m = manager.list(prime_s)
The load balancing was discussed before. There is no reduction phase. The Sieve is shared and the primes can simply be determined by inspection.
The startup phase includes the everything up and including the creation of the process pools. phase 1 adds the creation of the shared list through the list manager. Clearly initializing the manager this quite a bit of processing time and I've reduced the number of repetitions from forty to five. Nevertheless the run times are significantly higher than the cases discussed previously due to the use of the manager to generate a shared data structure. According to the documentation the manager adds an additional process to manage the shared data structure. This is useful if the calculation was distributed across multiple machines, but it's clearly overkill here.
That said, notice that performance improves as the number of threads increases from one to two and three. The AMD Turion processor is a two core processor so you'd expect that. Beyond three processes the returns diminish as processes start interfering with each other. Because the sieve is shared between processes the number of operations remains the same regardless of the size of the process pool. Therefore, the increase in performance is entirely due to the fact that the multiprocessing module takes advantage of the multiple cores on a machine.
Still, in absolute terms the performance here is not particularly good. So let's move on to the next and final approach
main_smp_shared_2 : share the sieve between the processes using shared memory.
The other way to share data between processes is through shared ctypes ojects.ctypes use shared memory which, according to the documentation, 'can be inherited by child processes'. In order to use them, we need to abandon Pool.map and rewrite the process slightly using Process class.
If you try to use ctypes with Pools you get this exception thrown at you if you do:
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance
The rewrite is trivial and in fact looks very similar to the map approach used in the previous examples.
def main_smp_shared_2(top, nthreads) : n = int(top) nthreads = int(nthreads) prime = (n + 1) *  B = smp_load_balance(nthreads, n) arr = multiprocessing.Array('i',prime) procs = map(create_process, map(lambda lst : (n, lst, arr), B)) map(lambda p : p.start(), procs) map(lambda p : p.join(), procs) prime = arr[:] return count_primes(prime) def create_process(argv) : return multiprocessing.Process(target=dowork_smp_shared_2, args=(argv,)) def dowork_smp_shared_2(args) : n, nexti_sh2, L = args nk = 0 ops = 0 k = nexti_sh2 lim = len(nexti_sh2) while 1 : k = nexti_sh2[nk] if L[k] == 1 : r = n / k for i in range(nexti_sh2, r+1) : ops += 1 L[i*k] = 0 nk += 1 if nk >= lim : break return L
The results of the test are shown here.
There is no reduction phase as the processes share the sieve list. Therefore any change in performance is due to the distribution of the calculation across multiple instances. Notice the increase in performance when the number of processors increase from one to two. Performance starts to decrease after that as adding more processes creates more load on the box.
This post and the previous post use the distribution of the Sieve of Erasthones as way to explore multi-threading and multi-processing in Python.
I explored two simple ways to avoid the use of shared data. Both approaches lead to an increase in the number of total operations required to get the final Sieve. In both cases additional processing steps are required which can add significant processing time, offsetting any potential performance gain due to the distribution of the calculation, regardless of whether I use multi-threading or multi-processing.
When shared data is used, the [global interpretor lock (GIL)](a href=www.dabeaz.com/python/GIL.pdf) in the multi-threading module puts a hard floor on any potential performance increase.
The multiprocessing module does show a performance when the number of processes is roughly equal to the number of cores. However, an algorithm as simple as the Sieve doesn't allow for amortization of the startup cost, and the performance is significantly worse when multi-threading is used.
However, in cases where the startup costs can be amortized successfully the multiprocessing module may well lead to a gain in performance.