problem with window sizes when parallelizing a funciton


javier lopatin
 

Hi all,
I'm currently using rasterio to process time series analysis of data. Because the raster tiles are too big, I'm trying to implement a windowed function with parallel processing. I tried your tutorial on concurrent processing (
https://github.com/mapbox/rasterio/blob/master/docs/topics/concurrency.rst) and it works beautify with my own functions. I'm just wondering why it is only working with window sizes of 128 (profile.update(blockxsize=128,blockysize=128tiled=True)​), just like in the example. If I change these values to e.g. 200 or 500 it does not work anymore. I'm currently trying with a 3,000 X 3,000 raster size. Because loops are slow, I assume that increasing a bit the window size could be helpful. The error message that I receive if I change blockxsize is:

Traceback (most recent call last):
  File "TSA.py", line 252, in <module>
    main(args.inputImage, args.outputImage, args.j)
  File "TSA.py", line 225, in main
    parallel_process(infile, outfile, n_jobs)
  File "TSA.py", line 187, in parallel_process
    with rasterio.open(outfile, "w", **profile) as dst:
  File "/home/javier/miniconda3/lib/python3.6/site-packages/rasterio/env.py", line 398, in wrapper
    return f(*args, **kwds)
  File "/home/javier/miniconda3/lib/python3.6/site-packages/rasterio/__init__.py", line 226, in open
    **kwargs)
  File "rasterio/_io.pyx", line 1129, in rasterio._io.DatasetWriterBase.__init__
  File "rasterio/_err.pyx", line 194, in rasterio._err.exc_wrap_pointer
rasterio._err.CPLE_AppDefinedError: _TIFFVSetField:/home/javier/Documents/SF_delta/Sentinel/TSA/X-004_Y-001/2015-2019_001-365_LEVEL4_TSA_SEN2L_EVI_C0_S0_FAVG_TY_C95T_FBY_TSA.tif: Bad value 500 for "TileWidth" tag
 

The used function is below (see the whole script at 
https://github.com/JavierLopatin/Python-Remote-Sensing-Scripts/blob/master/TSA.py):

def parallel_process(infile, outfile, n_jobs):
    """
    Process infile block-by-block with parallel processing
    and write to a new file.
    """
    from tqdm import tqdm # progress bar
 
    with rasterio.Env():
 
        with rasterio.open(infile) as src:
 
            # Create a destination dataset based on source params. The
            # destination will be tiled, and we'll process the tiles
            # concurrently.
            profile = src.profile
            profile.update(blockxsize=128, blockysize=128,
                           count=6, dtype='float64', tiled=True)
 
            with rasterio.open(outfile, "w", **profile) as dst:
 
                # Materialize a list of destination block windows
                # that we will use in several statements below.
                windows = [window for ij, window in dst.block_windows()]
 
                # This generator comprehension gives us raster data
                # arrays for each window. Later we will zip a mapping
                # of it with the windows list to get (window, result)
                # pairs.
                data_gen = (src.read(window=window) for window in windows)
 
                with concurrent.futures.ProcessPoolExecutor(
                    max_workers=n_jobs
                ) as executor:
 
                    # We map the TSA() function over the raster
                    # data generator, zip the resulting iterator with
                    # the windows list, and as pairs come back we
                    # write data to the destination dataset.
                    for window, result in zip(
                        tqdm(windows), executor.map(TSA, data_gen)
                    ):
                        dst.write(result, window=window)

Hope you guys can help. 

Cheers, Javier


Alan Snow
 

Hi Javier,

The blocksize is not a dynamic attribute. It represents the blocksize of the file on disk. So, if you want to change the blocksize, you will need to write it to a  new file with a new blocksize.

Hopefully this helps,
Alan


Sean Gillies
 

Alan: the function does indeed write to a new file with blocksizes set appropriately.

Javier: the TIFF spec states that tile width (and this goes for height, I presume) must be a multiple of 16. Neither GDAL's GeoTIFF format page nor Rasterio docs make this clear. I'll add a note. I think that if you try 512 instead of 128, you'll have success.


On Wed, May 22, 2019 at 7:29 AM Alan Snow <alansnow21@...> wrote:
Hi Javier,

The blocksize is not a dynamic attribute. It represents the blocksize of the file on disk. So, if you want to change the blocksize, you will need to write it to a  new file with a new blocksize.

Hopefully this helps,
Alan



--
Sean Gillies


Alan Snow
 

You are indeed correct Sean. Thanks for catching that and providing the correct answer! It seems I skimmed through the original message too fast.


javier lopatin
 

Hi Sean, thanks a lot. This was very helpfull indeed, and the performance improved a bit when changing the size to 512. 
Grate job on the library, congrats!
Cheers,
Javier

El mié., 22 may. 2019 a las 23:36, Alan Snow (<alansnow21@...>) escribió:
You are indeed correct Sean. Thanks for catching that and providing the correct answer! It seems I skimmed through the original message too fast.



--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...


javier lopatin
 

Hi Guys!

I'm still working with the parallelization of raster processes with rasterio, and I came across a problem that you might know about. I've some raster stacks of time series data from which I want to obtain a interpolation signature. In practice I've a function to process the raster chunks in parallel with rasterio (calling a function main()):

def _parallel_process(inData, outData, main, count,  n_jobs=4, chuckSize=256):
    with rasterio.Env():
        with rasterio.open(inData) as src:
            profile = src.profile
            profile.update(blockxsize=chuckSize, blockysize=chuckSize,
                           count=count, dtype='float64', tiled=True)
            with rasterio.open(outData, "w", **profile) as dst:
                windows = [window for ij, window in dst.block_windows()]
                data_gen = (src.read(window=window) for window in windows)
                with concurrent.futures.ProcessPoolExecutor(
                    max_workers=n_jobs
                ) as executor:
                    for window, result in zip(
                        tqdm(windows), executor.map(main, data_gen)
                    ):
                        dst.write(result, window=window)

And I have another function that calls _parallel_process:

def PhenoShape(inData, outData, dates=None, nan_replace=None, rollWindow=None, nGS=46, chuckSize=256, n_jobs=4):
    def main(dstack):
        # load raster as a xarray
        xarray = xr.DataArray(dstack)
        xarray.coords['dim_0'] = dates
        xarray.coords['doy'] = xarray.dim_0.dt.dayofyear
        # sort basds according to day-of-the-year
        xarray = xarray.sortby('doy')
        # rearrange time dimension for smoothing and interpolation
        xarray['time'] = xarray['doy']
        # turn a value to NaN
        if nan_replace is not None:
            xarray = xarray.where(xarray.values != nan_replace)
        # rolling average using moving window
        if rollWindow is not None:
            xarray = xarray.rolling(dim_0=rollWindow, center=True).mean()
        # prepare inputs to getPheno
        x = xarray.doy.values
        y = xarray.values
        # get phenology shape accross the time axis
        return np.apply_along_axis(_getPheno, 0, y, x, nGS)
    # apply PhenoShape with parallel processing
    try:
        _parallel_process(inData, outData, main, nGS, n_jobs, chuckSize)
    except AttributeError:
        print('ERROR somehow.. :(')
 
This works really fine! But when I import the script as a library (functions inside phenology.py):

import phenology as phen
phen.PhenoShape(inData=inData, outData=outData, dates=dates, rollWindow=3,
    nan_replace=-32767, nGS=46, chuckSize=256, n_jobs=4)

I get the following error:

  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'

Any ideas? The main idea here is to call this functions to have cleaner script at the end.
Many thanks in advance.
Cheers,
Javier L.

El jue., 23 may. 2019 a las 11:13, javier lopatin via Groups.Io (<javierlopatin=gmail.com@groups.io>) escribió:
Hi Sean, thanks a lot. This was very helpfull indeed, and the performance improved a bit when changing the size to 512. 
Grate job on the library, congrats!
Cheers,
Javier

El mié., 22 may. 2019 a las 23:36, Alan Snow (<alansnow21@...>) escribió:
You are indeed correct Sean. Thanks for catching that and providing the correct answer! It seems I skimmed through the original message too fast.



--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...



--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...


Dion Häfner <dion.haefner@...>
 

Hey Javier,

this is not really a rasterio problem, mostly a Python peculiarity.

In general, you cannot pass functions defined in a local closure to a different process. Just move your main function to global scope and you should be fine. You can use functools.partial to fix some optional arguments of your main function and achieve the same thing as with your closure.

Best,
Dion

On 13/06/2019 15.05, javier lopatin via Groups.Io wrote:
Hi Guys!

I'm still working with the parallelization of raster processes with rasterio, and I came across a problem that you might know about. I've some raster stacks of time series data from which I want to obtain a interpolation signature. In practice I've a function to process the raster chunks in parallel with rasterio (calling a function main()):

def _parallel_process(inData, outData, main, count,  n_jobs=4, chuckSize=256):
    with rasterio.Env():
        with rasterio.open(inData) as src:
            profile = src.profile
            profile.update(blockxsize=chuckSize, blockysize=chuckSize,
                           count=count, dtype='float64', tiled=True)
            with rasterio.open(outData, "w", **profile) as dst:
                windows = [window for ij, window in dst.block_windows()]
                data_gen = (src.read(window=window) for window in windows)
                with concurrent.futures.ProcessPoolExecutor(
                    max_workers=n_jobs
                ) as executor:
                    for window, result in zip(
                        tqdm(windows), executor.map(main, data_gen)
                    ):
                        dst.write(result, window=window)

And I have another function that calls _parallel_process:

def PhenoShape(inData, outData, dates=None, nan_replace=None, rollWindow=None, nGS=46, chuckSize=256, n_jobs=4):
    def main(dstack):
        # load raster as a xarray
        xarray = xr.DataArray(dstack)
        xarray.coords['dim_0'] = dates
        xarray.coords['doy'] = xarray.dim_0.dt.dayofyear
        # sort basds according to day-of-the-year
        xarray = xarray.sortby('doy')
        # rearrange time dimension for smoothing and interpolation
        xarray['time'] = xarray['doy']
        # turn a value to NaN
        if nan_replace is not None:
            xarray = xarray.where(xarray.values != nan_replace)
        # rolling average using moving window
        if rollWindow is not None:
            xarray = xarray.rolling(dim_0=rollWindow, center=True).mean()
        # prepare inputs to getPheno
        x = xarray.doy.values
        y = xarray.values
        # get phenology shape accross the time axis
        return np.apply_along_axis(_getPheno, 0, y, x, nGS)
    # apply PhenoShape with parallel processing
    try:
        _parallel_process(inData, outData, main, nGS, n_jobs, chuckSize)
    except AttributeError:
        print('ERROR somehow.. :(')
 
This works really fine! But when I import the script as a library (functions inside phenology.py):

import phenology as phen
phen.PhenoShape(inData=inData, outData=outData, dates=dates, rollWindow=3,
    nan_replace=-32767, nGS=46, chuckSize=256, n_jobs=4)

I get the following error:

  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'

Any ideas? The main idea here is to call this functions to have cleaner script at the end.
Many thanks in advance.
Cheers,
Javier L.

El jue., 23 may. 2019 a las 11:13, javier lopatin via Groups.Io (<javierlopatin=gmail.com@groups.io>) escribió:
Hi Sean, thanks a lot. This was very helpfull indeed, and the performance improved a bit when changing the size to 512. 
Grate job on the library, congrats!
Cheers,
Javier

El mié., 22 may. 2019 a las 23:36, Alan Snow (<alansnow21@...>) escribió:
You are indeed correct Sean. Thanks for catching that and providing the correct answer! It seems I skimmed through the original message too fast.


--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...


--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...


javier lopatin
 

Hi Dion, Thanks for the tip. I thought that that might be the problem. Any ideas of how to pass a function to executor.map that need several arguments? so far I'm only able to pass functions with a dstack as only argument, like:

def main(x):
    ...
    return something
executor.map(main, x)

so if I put the function main outside in the global scope I now need to have several args in the function:

def main(x, arg1, arg2, arg3, ...):
    ...
   return something
 
I have no idea how to pass these args to executor.map(). Any ideas?
I really appreciate this!
Cheers,
Javier

El jue., 13 jun. 2019 a las 15:12, Dion Häfner (<dion.haefner@...>) escribió:

Hey Javier,

this is not really a rasterio problem, mostly a Python peculiarity.

In general, you cannot pass functions defined in a local closure to a different process. Just move your main function to global scope and you should be fine. You can use functools.partial to fix some optional arguments of your main function and achieve the same thing as with your closure.

Best,
Dion

On 13/06/2019 15.05, javier lopatin via Groups.Io wrote:
Hi Guys!

I'm still working with the parallelization of raster processes with rasterio, and I came across a problem that you might know about. I've some raster stacks of time series data from which I want to obtain a interpolation signature. In practice I've a function to process the raster chunks in parallel with rasterio (calling a function main()):

def _parallel_process(inData, outData, main, count,  n_jobs=4, chuckSize=256):
    with rasterio.Env():
        with rasterio.open(inData) as src:
            profile = src.profile
            profile.update(blockxsize=chuckSize, blockysize=chuckSize,
                           count=count, dtype='float64', tiled=True)
            with rasterio.open(outData, "w", **profile) as dst:
                windows = [window for ij, window in dst.block_windows()]
                data_gen = (src.read(window=window) for window in windows)
                with concurrent.futures.ProcessPoolExecutor(
                    max_workers=n_jobs
                ) as executor:
                    for window, result in zip(
                        tqdm(windows), executor.map(main, data_gen)
                    ):
                        dst.write(result, window=window)

And I have another function that calls _parallel_process:

def PhenoShape(inData, outData, dates=None, nan_replace=None, rollWindow=None, nGS=46, chuckSize=256, n_jobs=4):
    def main(dstack):
        # load raster as a xarray
        xarray = xr.DataArray(dstack)
        xarray.coords['dim_0'] = dates
        xarray.coords['doy'] = xarray.dim_0.dt.dayofyear
        # sort basds according to day-of-the-year
        xarray = xarray.sortby('doy')
        # rearrange time dimension for smoothing and interpolation
        xarray['time'] = xarray['doy']
        # turn a value to NaN
        if nan_replace is not None:
            xarray = xarray.where(xarray.values != nan_replace)
        # rolling average using moving window
        if rollWindow is not None:
            xarray = xarray.rolling(dim_0=rollWindow, center=True).mean()
        # prepare inputs to getPheno
        x = xarray.doy.values
        y = xarray.values
        # get phenology shape accross the time axis
        return np.apply_along_axis(_getPheno, 0, y, x, nGS)
    # apply PhenoShape with parallel processing
    try:
        _parallel_process(inData, outData, main, nGS, n_jobs, chuckSize)
    except AttributeError:
        print('ERROR somehow.. :(')
 
This works really fine! But when I import the script as a library (functions inside phenology.py):

import phenology as phen
phen.PhenoShape(inData=inData, outData=outData, dates=dates, rollWindow=3,
    nan_replace=-32767, nGS=46, chuckSize=256, n_jobs=4)

I get the following error:

  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'

Any ideas? The main idea here is to call this functions to have cleaner script at the end.
Many thanks in advance.
Cheers,
Javier L.

El jue., 23 may. 2019 a las 11:13, javier lopatin via Groups.Io (<javierlopatin=gmail.com@groups.io>) escribió:
Hi Sean, thanks a lot. This was very helpfull indeed, and the performance improved a bit when changing the size to 512. 
Grate job on the library, congrats!
Cheers,
Javier

El mié., 22 may. 2019 a las 23:36, Alan Snow (<alansnow21@...>) escribió:
You are indeed correct Sean. Thanks for catching that and providing the correct answer! It seems I skimmed through the original message too fast.


--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...


--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...



--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...


Dion Häfner <dion.haefner@...>
 

As I mentioned, the canonical way to do this is to leverage functools.partial.

For example:


    
from functools import partial
from concurrent.futures import ProcessPoolExecutor

    
def worker_function(x, arg1, arg2):
    ...

    
if __name__ == '__main__':
    do_work = partial(worker_function, arg1=0, arg2='foo')
    with ProcessPoolExecutor(4) as executor:
        executor.map(do_work, [1, 2, 3])


The arguments have to be picklable, I think, but they should be in your case.


On 13/06/2019 16.29, javier lopatin via Groups.Io wrote:
Hi Dion, Thanks for the tip. I thought that that might be the problem. Any ideas of how to pass a function to executor.map that need several arguments? so far I'm only able to pass functions with a dstack as only argument, like:

def main(x):
    ...
    return something
executor.map(main, x)

so if I put the function main outside in the global scope I now need to have several args in the function:

def main(x, arg1, arg2, arg3, ...):
    ...
   return something
 
I have no idea how to pass these args to executor.map(). Any ideas?
I really appreciate this!
Cheers,
Javier

El jue., 13 jun. 2019 a las 15:12, Dion Häfner (<dion.haefner@...>) escribió:

Hey Javier,

this is not really a rasterio problem, mostly a Python peculiarity.

In general, you cannot pass functions defined in a local closure to a different process. Just move your main function to global scope and you should be fine. You can use functools.partial to fix some optional arguments of your main function and achieve the same thing as with your closure.

Best,
Dion

On 13/06/2019 15.05, javier lopatin via Groups.Io wrote:
Hi Guys!

I'm still working with the parallelization of raster processes with rasterio, and I came across a problem that you might know about. I've some raster stacks of time series data from which I want to obtain a interpolation signature. In practice I've a function to process the raster chunks in parallel with rasterio (calling a function main()):

def _parallel_process(inData, outData, main, count,  n_jobs=4, chuckSize=256):
    with rasterio.Env():
        with rasterio.open(inData) as src:
            profile = src.profile
            profile.update(blockxsize=chuckSize, blockysize=chuckSize,
                           count=count, dtype='float64', tiled=True)
            with rasterio.open(outData, "w", **profile) as dst:
                windows = [window for ij, window in dst.block_windows()]
                data_gen = (src.read(window=window) for window in windows)
                with concurrent.futures.ProcessPoolExecutor(
                    max_workers=n_jobs
                ) as executor:
                    for window, result in zip(
                        tqdm(windows), executor.map(main, data_gen)
                    ):
                        dst.write(result, window=window)

And I have another function that calls _parallel_process:

def PhenoShape(inData, outData, dates=None, nan_replace=None, rollWindow=None, nGS=46, chuckSize=256, n_jobs=4):
    def main(dstack):
        # load raster as a xarray
        xarray = xr.DataArray(dstack)
        xarray.coords['dim_0'] = dates
        xarray.coords['doy'] = xarray.dim_0.dt.dayofyear
        # sort basds according to day-of-the-year
        xarray = xarray.sortby('doy')
        # rearrange time dimension for smoothing and interpolation
        xarray['time'] = xarray['doy']
        # turn a value to NaN
        if nan_replace is not None:
            xarray = xarray.where(xarray.values != nan_replace)
        # rolling average using moving window
        if rollWindow is not None:
            xarray = xarray.rolling(dim_0=rollWindow, center=True).mean()
        # prepare inputs to getPheno
        x = xarray.doy.values
        y = xarray.values
        # get phenology shape accross the time axis
        return np.apply_along_axis(_getPheno, 0, y, x, nGS)
    # apply PhenoShape with parallel processing
    try:
        _parallel_process(inData, outData, main, nGS, n_jobs, chuckSize)
    except AttributeError:
        print('ERROR somehow.. :(')
 
This works really fine! But when I import the script as a library (functions inside phenology.py):

import phenology as phen
phen.PhenoShape(inData=inData, outData=outData, dates=dates, rollWindow=3,
    nan_replace=-32767, nGS=46, chuckSize=256, n_jobs=4)

I get the following error:

  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'

Any ideas? The main idea here is to call this functions to have cleaner script at the end.
Many thanks in advance.
Cheers,
Javier L.

El jue., 23 may. 2019 a las 11:13, javier lopatin via Groups.Io (<javierlopatin=gmail.com@groups.io>) escribió:
Hi Sean, thanks a lot. This was very helpfull indeed, and the performance improved a bit when changing the size to 512. 
Grate job on the library, congrats!
Cheers,
Javier

El mié., 22 may. 2019 a las 23:36, Alan Snow (<alansnow21@...>) escribió:
You are indeed correct Sean. Thanks for catching that and providing the correct answer! It seems I skimmed through the original message too fast.


--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...


--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...


--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...


javier lopatin
 

Dear Dior, Thanks!, this was perfect. I did't get it the first time, but know it runs smoothly.
Many thanks! 
Javier

El jue., 13 jun. 2019 a las 16:35, Dion Häfner (<dion.haefner@...>) escribió:

As I mentioned, the canonical way to do this is to leverage functools.partial.

For example:


    
from functools import partial
from concurrent.futures import ProcessPoolExecutor

    
def worker_function(x, arg1, arg2):
    ...

    
if __name__ == '__main__':
    do_work = partial(worker_function, arg1=0, arg2='foo')
    with ProcessPoolExecutor(4) as executor:
        executor.map(do_work, [1, 2, 3])


The arguments have to be picklable, I think, but they should be in your case.


On 13/06/2019 16.29, javier lopatin via Groups.Io wrote:
Hi Dion, Thanks for the tip. I thought that that might be the problem. Any ideas of how to pass a function to executor.map that need several arguments? so far I'm only able to pass functions with a dstack as only argument, like:

def main(x):
    ...
    return something
executor.map(main, x)

so if I put the function main outside in the global scope I now need to have several args in the function:

def main(x, arg1, arg2, arg3, ...):
    ...
   return something
 
I have no idea how to pass these args to executor.map(). Any ideas?
I really appreciate this!
Cheers,
Javier

El jue., 13 jun. 2019 a las 15:12, Dion Häfner (<dion.haefner@...>) escribió:

Hey Javier,

this is not really a rasterio problem, mostly a Python peculiarity.

In general, you cannot pass functions defined in a local closure to a different process. Just move your main function to global scope and you should be fine. You can use functools.partial to fix some optional arguments of your main function and achieve the same thing as with your closure.

Best,
Dion

On 13/06/2019 15.05, javier lopatin via Groups.Io wrote:
Hi Guys!

I'm still working with the parallelization of raster processes with rasterio, and I came across a problem that you might know about. I've some raster stacks of time series data from which I want to obtain a interpolation signature. In practice I've a function to process the raster chunks in parallel with rasterio (calling a function main()):

def _parallel_process(inData, outData, main, count,  n_jobs=4, chuckSize=256):
    with rasterio.Env():
        with rasterio.open(inData) as src:
            profile = src.profile
            profile.update(blockxsize=chuckSize, blockysize=chuckSize,
                           count=count, dtype='float64', tiled=True)
            with rasterio.open(outData, "w", **profile) as dst:
                windows = [window for ij, window in dst.block_windows()]
                data_gen = (src.read(window=window) for window in windows)
                with concurrent.futures.ProcessPoolExecutor(
                    max_workers=n_jobs
                ) as executor:
                    for window, result in zip(
                        tqdm(windows), executor.map(main, data_gen)
                    ):
                        dst.write(result, window=window)

And I have another function that calls _parallel_process:

def PhenoShape(inData, outData, dates=None, nan_replace=None, rollWindow=None, nGS=46, chuckSize=256, n_jobs=4):
    def main(dstack):
        # load raster as a xarray
        xarray = xr.DataArray(dstack)
        xarray.coords['dim_0'] = dates
        xarray.coords['doy'] = xarray.dim_0.dt.dayofyear
        # sort basds according to day-of-the-year
        xarray = xarray.sortby('doy')
        # rearrange time dimension for smoothing and interpolation
        xarray['time'] = xarray['doy']
        # turn a value to NaN
        if nan_replace is not None:
            xarray = xarray.where(xarray.values != nan_replace)
        # rolling average using moving window
        if rollWindow is not None:
            xarray = xarray.rolling(dim_0=rollWindow, center=True).mean()
        # prepare inputs to getPheno
        x = xarray.doy.values
        y = xarray.values
        # get phenology shape accross the time axis
        return np.apply_along_axis(_getPheno, 0, y, x, nGS)
    # apply PhenoShape with parallel processing
    try:
        _parallel_process(inData, outData, main, nGS, n_jobs, chuckSize)
    except AttributeError:
        print('ERROR somehow.. :(')
 
This works really fine! But when I import the script as a library (functions inside phenology.py):

import phenology as phen
phen.PhenoShape(inData=inData, outData=outData, dates=dates, rollWindow=3,
    nan_replace=-32767, nGS=46, chuckSize=256, n_jobs=4)

I get the following error:

  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Traceback (most recent call last):
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'

Any ideas? The main idea here is to call this functions to have cleaner script at the end.
Many thanks in advance.
Cheers,
Javier L.

El jue., 23 may. 2019 a las 11:13, javier lopatin via Groups.Io (<javierlopatin=gmail.com@groups.io>) escribió:
Hi Sean, thanks a lot. This was very helpfull indeed, and the performance improved a bit when changing the size to 512. 
Grate job on the library, congrats!
Cheers,
Javier

El mié., 22 may. 2019 a las 23:36, Alan Snow (<alansnow21@...>) escribió:
You are indeed correct Sean. Thanks for catching that and providing the correct answer! It seems I skimmed through the original message too fast.


--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...


--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...


--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...



--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...