
How To Pass Parameters Other Than Data Through Pool.imap() Function For Multiprocessing In Python?

I am working on hyperspectral images. To reduce the noise in the image I am applying a wavelet transform using the pywt package. When I do this normally (serial processing) it's

Solution 1:

[This isn't a direct answer to the question, but a follow-up query that is easier to make clearly here than in the small comment box.]

As a quick check, pass an index counter into spec_trans and return it along with your result, collect those indices in a separate list (transformedXseq or similar), and then compare that list with your input sequence, i.e.:

def spec_trans(d,wav_fam,threshold_val,thresh_type, iCount):

    data=np.array(d,dtype=np.float64)
    data_dec=decomposition(data,wav_fam)
    data_t=thresholding(data_dec,threshold_val,thresh_type)
    data_rec=reconstruction(data_t,wav_fam)

    return data_rec, iCount

and then within main

jobs=[]
iJobs = 0
for dataBand in xmp:
    jobs.append(p.apply_async(spec_trans, args=(dataBand, wav_fam, threshold_val, thresh_type, iJobs)))
    iJobs = iJobs + 1

transformedX=[]
transformedXseq=[]
for jobBit in jobs:
    res = jobBit.get()
    transformedX.append(res[0])
    transformedXseq.append(res[1])

...and then check whether transformedXseq lists the jobs in the same order in which you submitted them. It should match!
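The same ordering check can be tried in isolation. This is a minimal sketch in which a hypothetical stand-in worker, fake_transform, replaces spec_trans (so it runs without pywt or your image data). It tags each task with its submission index and shows that calling .get() on the AsyncResult objects in submission order returns the results in that order.

```python
import multiprocessing as mp


def fake_transform(band, scale, i_count):
    # Hypothetical stand-in for spec_trans: any per-band computation works here.
    # The submission index is returned untouched alongside the result.
    return [scale * x for x in band], i_count


def run_order_check(bands, scale=2.0, processes=4):
    with mp.Pool(processes) as pool:
        # Submit one task per band, tagging each with its position.
        jobs = [
            pool.apply_async(fake_transform, args=(band, scale, i))
            for i, band in enumerate(bands)
        ]
        # .get() blocks until that particular job is done, so iterating over
        # `jobs` in submission order yields results in submission order.
        results = [job.get() for job in jobs]
    transformed = [r[0] for r in results]
    transformed_seq = [r[1] for r in results]
    return transformed, transformed_seq


if __name__ == "__main__":
    bands = [[float(i), float(i + 1)] for i in range(10)]
    transformed, seq = run_order_check(bands)
    print(seq == list(range(len(bands))))  # True
```

The `if __name__ == "__main__":` guard matters here: on platforms that start workers by re-importing the main module, creating the pool at module level would fail.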

Solution 2:

Assuming wav_fam, threshold_val and thresh_type do not vary from call to call, first reorder the parameters of the worker function spec_trans so that these three come first:

def spec_trans(wav_fam, threshold_val, thresh_type, d):
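The reason for moving the fixed parameters to the front is that functools.partial binds its positional arguments to the leftmost parameters, leaving the last one (d) to be supplied by imap. A tiny sketch with hypothetical stub functions (not your real spec_trans) shows the binding, plus a keyword-based alternative:

```python
from functools import partial


def spec_trans_stub(wav_fam, threshold_val, thresh_type, d):
    # Hypothetical stand-in with the same parameter order as the reordered
    # spec_trans; it just reports what it received.
    return (wav_fam, threshold_val, thresh_type, d)


# Bind the three fixed settings positionally; the result takes only `d`.
worker = partial(spec_trans_stub, "db1", 0.5, "soft")
print(worker([1.0, 2.0]))  # ('db1', 0.5, 'soft', [1.0, 2.0])


def spec_trans_orig_order(d, wav_fam, threshold_val, thresh_type):
    # Same stub, but with the data parameter first (the original order).
    return (wav_fam, threshold_val, thresh_type, d)


# Binding by keyword also works with the original order, because imap passes
# each item as the single positional argument.
worker_kw = partial(spec_trans_orig_order, wav_fam="db1", threshold_val=0.5, thresh_type="soft")
print(worker_kw([1.0, 2.0]))  # ('db1', 0.5, 'soft', [1.0, 2.0])
```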

I don't see where in your pool-creation block you define xmp, but presumably it is an iterable. Modify that code as follows:

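import multiprocessing as mp
import time
from functools import partial

import pywt
import tifffile

# spec_trans (reordered as above) and its helpers are defined as in your question

def compute_chunksize(pool_size, iterable_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize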

if __name__ == '__main__':

    X=tifffile.imread('data/Classification/university.tif')
    # take parameters
    threshold_val=float(input("Enter the value for image thresholding: "))
    print("The available wavelet functions:",pywt.wavelist())
    wav_fam=input("Choose a wavelet function for transformation: ")
    threshold_type=['hard','soft']
    print("The available thresholding types:",threshold_type)
    thresh_type=input("Choose a type for thresholding technique: ")

    start=time.time()
    p = mp.Pool(4)
    # first 3 arguments to spec_trans will be wav_fam, threshold_val and thresh_type 
    worker = partial(spec_trans, wav_fam, threshold_val, thresh_type)
    suitable_chunksize = compute_chunksize(4, len(xmp))
    transformedX = list(p.imap(worker, xmp, chunksize=suitable_chunksize))
    end=time.time()
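Because xmp and the pywt helpers are not shown, here is a self-contained sketch of the same partial + imap pattern that can be run as-is. The worker threshold_band is a hypothetical stand-in for spec_trans (it applies plain hard/soft thresholding to each band rather than a wavelet transform), and the list of bands stands in for xmp; compute_chunksize is the function from this answer.

```python
import math
import multiprocessing as mp
from functools import partial


def compute_chunksize(pool_size, iterable_size):
    # Same heuristic as in the answer: about 4 chunks per worker process.
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize


def threshold_band(threshold_val, thresh_type, band):
    # Hypothetical stand-in for spec_trans: fixed settings first, data last.
    if thresh_type == "hard":
        # Keep values above the threshold, zero out the rest.
        return [x if abs(x) > threshold_val else 0.0 for x in band]
    # "soft": shrink every value toward zero by threshold_val.
    return [math.copysign(max(abs(x) - threshold_val, 0.0), x) for x in band]


def denoise_all(bands, threshold_val, thresh_type, processes=4):
    worker = partial(threshold_band, threshold_val, thresh_type)
    chunksize = compute_chunksize(processes, len(bands))
    with mp.Pool(processes) as pool:
        # imap preserves input order, so result i corresponds to bands[i].
        # list() consumes everything before the pool is shut down.
        return list(pool.imap(worker, bands, chunksize=chunksize))


if __name__ == "__main__":
    bands = [[-1.5, 0.2, 3.0], [0.4, -0.1, 2.5]] * 50
    result = denoise_all(bands, threshold_val=0.5, thresh_type="soft")
    print(len(result))  # 100
```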

To get better performance than with apply_async, you need to pass imap a suitable chunksize value. The function compute_chunksize computes one from the size of your pool (4) and the size of the iterable passed to imap (len(xmp)). If xmp is small enough that the computed chunksize is 1, I don't see how imap would be significantly faster than apply_async.
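To make the numbers concrete, here is compute_chunksize evaluated for a few iterable sizes with a pool of 4 (the sizes are arbitrary examples, not the dimensions of your image). As far as I can tell from CPython's multiprocessing/pool.py, Pool.map applies the same divmod(len(iterable), 4 * processes) rule internally when no chunksize is given.

```python
def compute_chunksize(pool_size, iterable_size):
    # Aim for roughly 4 chunks per worker; round up so no items are left over.
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize


for n in (10, 16, 100, 1000):
    print(n, compute_chunksize(4, n))
# 10 1
# 16 1
# 100 7
# 1000 63
```

With 10 or 16 items the result is 1, which is the case where imap offers little over apply_async. Note also that an empty iterable gives 0, and imap rejects a chunksize below 1 with a ValueError, so wrapping the call in max(1, ...) may be worthwhile.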

Of course, you might as well just use:

    transformedX = p.map(worker, xmp)

And let the pool compute a suitable chunksize itself. imap has an advantage over map when the iterable is very large and not already a list: to compute a chunksize, map first has to convert the iterable to a list just to get its length, which can be memory-inefficient. imap never computes a chunksize for you (its default is 1), but if you know the length (or approximate length) of the iterable, you can pass an explicit chunksize without converting the iterable to a list. The other advantage of imap (and imap_unordered) over map is that you can process the results of individual tasks as they become available (in submission order with imap, in completion order with imap_unordered), whereas with map you only get results once all submitted tasks are complete.
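A sketch of that point, assuming the bands come from a generator whose length you know in advance (generate_bands and band_mean are hypothetical names, and the data is synthetic): imap_unordered is given the generator directly, with a chunksize computed from the known length, and each result is handled as soon as it arrives. The index is carried through each task so that the unordered results can be put back in place.

```python
import multiprocessing as mp


def generate_bands(n, width=4):
    # Hypothetical lazy source: yields one synthetic band at a time.
    for i in range(n):
        yield [float(i)] * width


def band_mean(indexed_band):
    # Carry the index through so results can be re-ordered later.
    i, band = indexed_band
    return i, sum(band) / len(band)


def process_stream(n, processes=4):
    # Ceiling division, equivalent to compute_chunksize(processes, n).
    chunksize = max(1, -(-n // (4 * processes)))
    means = [None] * n
    with mp.Pool(processes) as pool:
        # imap_unordered does not call len() or list() on its input,
        # so a generator can be passed as-is.
        for i, m in pool.imap_unordered(band_mean, enumerate(generate_bands(n)),
                                        chunksize=chunksize):
            means[i] = m  # handle each result as soon as it arrives
    return means


if __name__ == "__main__":
    print(process_stream(20)[:3])  # [0.0, 1.0, 2.0]
```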

Update

If you want to catch exceptions raised by individual tasks submitted to your worker function, stick with imap and iterate over its results explicitly with next(), as in the following code:

    # transformedX = list(p.imap(worker, xmp, chunksize=suitable_chunksize))
    transformedX = []
    results = p.imap(worker, xmp, chunksize=suitable_chunksize)
    import traceback
    while True:
        try:
            result = next(results)
        except StopIteration:  # no more results
            break
        except Exception as e:
            print('Exception occurred:', e)
            traceback.print_exc()  # print stack trace
        else:
            transformedX.append(result)
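One caveat, based on my reading of CPython's multiprocessing/pool.py rather than on the documentation: with chunksize greater than 1, a failing item fails its whole chunk, and imap returns a plain generator that flattens the chunks, so once an exception has propagated through next() that generator is finished and the loop above exits on StopIteration, dropping the remaining results. A sketch that avoids depending on this is to catch exceptions inside the worker and return them as values; safe_call and risky_worker are hypothetical names used only for illustration.

```python
import multiprocessing as mp
import traceback
from functools import partial


def risky_worker(threshold_val, d):
    # Hypothetical worker that fails on some inputs.
    if d < 0:
        raise ValueError(f"negative input: {d}")
    return d * threshold_val


def safe_call(func, item):
    # Run func(item) and return (ok, value) instead of letting the exception
    # propagate, so one bad item cannot abort the rest of its chunk.
    try:
        return True, func(item)
    except Exception:
        return False, traceback.format_exc()


def run_all(items, processes=4, chunksize=4):
    worker = partial(safe_call, partial(risky_worker, 2.0))
    results, errors = [], []
    with mp.Pool(processes) as pool:
        for i, (ok, value) in enumerate(pool.imap(worker, items, chunksize=chunksize)):
            if ok:
                results.append(value)
            else:
                errors.append((i, value))  # value is the formatted traceback
    return results, errors


if __name__ == "__main__":
    results, errors = run_all([1, 2, -3, 4, 5, -6, 7])
    print(results)                 # [2.0, 4.0, 8.0, 10.0, 14.0]
    print([i for i, _ in errors])  # [2, 5]
```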
