Hi all, I'm currently using rasterio for time series analysis of raster data. Because the raster tiles are too big, I'm trying to implement a windowed function with parallel processing. I tried your tutorial on concurrent processing (https://github.com/mapbox/rasterio/blob/master/docs/topics/concurrency.rst) and it works beautifully with my own functions. I'm just wondering why it only works with window sizes of 128 (profile.update(blockxsize=128, blockysize=128, tiled=True)), just like in the example. If I change these values to e.g. 200 or 500 it no longer works. I'm currently testing with a 3,000 x 3,000 raster. Because loops are slow, I assume that increasing the window size a bit could help. The error message I receive when I change blockxsize is:
Traceback (most recent call last):
File "TSA.py", line 252, in <module>
main(args.inputImage, args.outputImage, args.j)
File "TSA.py", line 225, in main
parallel_process(infile, outfile, n_jobs)
File "TSA.py", line 187, in parallel_process
with rasterio.open(outfile, "w", **profile) as dst:
File "/home/javier/miniconda3/lib/python3.6/site-packages/rasterio/env.py", line 398, in wrapper
return f(*args, **kwds)
File "/home/javier/miniconda3/lib/python3.6/site-packages/rasterio/__init__.py", line 226, in open
**kwargs)
File "rasterio/_io.pyx", line 1129, in rasterio._io.DatasetWriterBase.__init__
File "rasterio/_err.pyx", line 194, in rasterio._err.exc_wrap_pointer
rasterio._err.CPLE_AppDefinedError: _TIFFVSetField:/home/javier/Documents/SF_delta/Sentinel/TSA/X-004_Y-001/2015-2019_001-365_LEVEL4_TSA_SEN2L_EVI_C0_S0_FAVG_TY_C95T_FBY_TSA.tif: Bad value 500 for "TileWidth" tag
The function I use is below (see the whole script at https://github.com/JavierLopatin/Python-Remote-Sensing-Scripts/blob/master/TSA.py):
def parallel_process(infile, outfile, n_jobs):
    """
    Process infile block-by-block with parallel processing
    and write to a new file.
    """
    from tqdm import tqdm  # progress bar

    with rasterio.Env():
        with rasterio.open(infile) as src:
            # Create a destination dataset based on source params. The
            # destination will be tiled, and we'll process the tiles
            # concurrently.
            profile = src.profile
            profile.update(blockxsize=128, blockysize=128,
                           count=6, dtype='float64', tiled=True)
            with rasterio.open(outfile, "w", **profile) as dst:
                # Materialize a list of destination block windows
                # that we will use in several statements below.
                windows = [window for ij, window in dst.block_windows()]
                # This generator comprehension gives us raster data
                # arrays for each window. Later we will zip a mapping
                # of it with the windows list to get (window, result)
                # pairs.
                data_gen = (src.read(window=window) for window in windows)
                with concurrent.futures.ProcessPoolExecutor(
                    max_workers=n_jobs
                ) as executor:
                    # We map the TSA() function over the raster
                    # data generator, zip the resulting iterator with
                    # the windows list, and as pairs come back we
                    # write data to the destination dataset.
                    for window, result in zip(
                        tqdm(windows), executor.map(TSA, data_gen)
                    ):
                        dst.write(result, window=window)

Hope you guys can help. Cheers, Javier
Hi Javier,
The blocksize is not a dynamic attribute. It represents the blocksize of the file on disk. So, if you want to change the blocksize, you will need to write it to a new file with a new blocksize.
Hopefully this helps, Alan
Alan: the function does indeed write to a new file with blocksizes set appropriately.
Javier: the TIFF spec states that tile width (and this goes for height, I presume) must be a multiple of 16. Neither GDAL's GeoTIFF format page nor Rasterio docs make this clear. I'll add a note. I think that if you try 512 instead of 128, you'll have success.
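For illustration, a small guard before updating the profile would catch bad sizes up front (a minimal sketch; blocksize is just an illustrative variable):

# GeoTIFF tile dimensions must be multiples of 16 per the TIFF spec.
blocksize = 512  # 128, 256, 512, ... are valid; 200 or 500 are not
assert blocksize % 16 == 0, "tile width/height must be a multiple of 16"
profile.update(blockxsize=blocksize, blockysize=blocksize, tiled=True)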
|
|
You are indeed correct Sean. Thanks for catching that and providing the correct answer! It seems I skimmed through the original message too fast.
Hi Sean, thanks a lot. This was very helpful indeed, and the performance improved a bit when changing the size to 512. Great job on the library, congrats! Cheers, Javier
--
Javier Lopatin
Institute of Geography and Geoecology
Karlsruhe Institute of Technology (KIT)
javier.lopatin@...
javier lopatin <javierlopatin@...>
Hi Guys!
I'm still working with the parallelization of raster processes with rasterio, and I came across a problem that you might know about. I have some raster stacks of time series data from which I want to obtain an interpolation signature. In practice, I have a function to process the raster chunks in parallel with rasterio (calling a function main()):
def _parallel_process(inData, outData, main, count, n_jobs=4, chuckSize=256):
    with rasterio.Env():
        with rasterio.open(inData) as src:
            profile = src.profile
            profile.update(blockxsize=chuckSize, blockysize=chuckSize,
                           count=count, dtype='float64', tiled=True)
            with rasterio.open(outData, "w", **profile) as dst:
                windows = [window for ij, window in dst.block_windows()]
                data_gen = (src.read(window=window) for window in windows)
                with concurrent.futures.ProcessPoolExecutor(
                    max_workers=n_jobs
                ) as executor:
                    for window, result in zip(
                        tqdm(windows), executor.map(main, data_gen)
                    ):
                        dst.write(result, window=window)
And I have another function that calls _parallel_process:
def PhenoShape(inData, outData, dates=None, nan_replace=None,
               rollWindow=None, nGS=46, chuckSize=256, n_jobs=4):
    def main(dstack):
        # load raster as an xarray
        xarray = xr.DataArray(dstack)
        xarray.coords['dim_0'] = dates
        xarray.coords['doy'] = xarray.dim_0.dt.dayofyear
        # sort bands according to day-of-the-year
        xarray = xarray.sortby('doy')
        # rearrange time dimension for smoothing and interpolation
        xarray['time'] = xarray['doy']
        # turn a value to NaN
        if nan_replace is not None:
            xarray = xarray.where(xarray.values != nan_replace)
        # rolling average using moving window
        if rollWindow is not None:
            xarray = xarray.rolling(dim_0=rollWindow, center=True).mean()
        # prepare inputs to getPheno
        x = xarray.doy.values
        y = xarray.values
        # get phenology shape across the time axis
        return np.apply_along_axis(_getPheno, 0, y, x, nGS)
    # apply PhenoShape with parallel processing
    try:
        _parallel_process(inData, outData, main, nGS, n_jobs, chuckSize)
    except AttributeError:
        print('ERROR somehow.. :(')

This works really fine! But when I import the script as a library (functions inside phenology.py):
import phenology as phen

phen.PhenoShape(inData=inData, outData=outData, dates=dates, rollWindow=3,
                nan_replace=-32767, nGS=46, chuckSize=256, n_jobs=4)
I get the following error:
File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed obj = _ForkingPickler.dumps(obj) File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) AttributeError: Can't pickle local object 'PhenoShape.<locals>.main' Traceback (most recent call last): File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed obj = _ForkingPickler.dumps(obj) File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) AttributeError: Can't pickle local object 'PhenoShape.<locals>.main' Traceback (most recent call last): File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed obj = _ForkingPickler.dumps(obj) File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) AttributeError: Can't pickle local object 'PhenoShape.<locals>.main' Traceback (most recent call last): File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed obj = _ForkingPickler.dumps(obj) File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) AttributeError: Can't pickle local object 'PhenoShape.<locals>.main' Traceback (most recent call last): File "/home/javier/miniconda3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed obj = _ForkingPickler.dumps(obj) File "/home/javier/miniconda3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) AttributeError: Can't pickle local object 'PhenoShape.<locals>.main'
Any ideas? The main idea here is to call these functions to have a cleaner script at the end. Many thanks in advance. Cheers, Javier L.
Dion Häfner <dion.haefner@...>
Hey Javier,
this is not really a rasterio problem, mostly a Python peculiarity.
In general, you cannot pass functions defined in a local closure to a different process. Just move your main function to global scope and you should be fine. You can use functools.partial to fix some optional arguments of your main function and achieve the same thing as with your closure.
Best,
Dion
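A minimal sketch of the constraint (illustrative names only; anything handed to a ProcessPoolExecutor must be picklable, and locally defined functions are not):

import concurrent.futures

def top_level(x):
    # picklable: defined at module scope
    return x * 2

def run():
    def local(x):
        # not picklable: defined inside another function
        return x * 2
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as ex:
        print(list(ex.map(top_level, [1, 2, 3])))  # works: [2, 4, 6]
        list(ex.map(local, [1, 2, 3]))  # fails: Can't pickle local object 'run.<locals>.local'

if __name__ == '__main__':
    run()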
javier lopatin <javierlopatin@...>
Hi Dion, thanks for the tip. I thought that might be the problem. Any ideas of how to pass a function to executor.map that needs several arguments? So far I'm only able to pass functions with a dstack as the only argument, like:
def main(x):
    ...
    return something

executor.map(main, x)
So if I move the function main out to the global scope, I now need several args in the function:
def main(x, arg1, arg2, arg3, ...):
    ...
    return something

I have no idea how to pass these args to executor.map(). Any ideas? I really appreciate this! Cheers, Javier
Dion Häfner <dion.haefner@...>
As I mentioned, the canonical way to do this is to leverage functools.partial. For example:

from functools import partial
from concurrent.futures import ProcessPoolExecutor

def worker_function(x, arg1, arg2):
    ...

if __name__ == '__main__':
    do_work = partial(worker_function, arg1=0, arg2='foo')
    with ProcessPoolExecutor(4) as executor:
        executor.map(do_work, [1, 2, 3])

The arguments have to be picklable, I think, but they should be in your case.
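Applied to the phenology functions from earlier in the thread, that might look roughly like this (a sketch only: _main stands for the old closure body moved to module scope, with the formerly captured variables turned into explicit keyword arguments):

from functools import partial

def _main(dstack, dates=None, nan_replace=None, rollWindow=None, nGS=46):
    # former PhenoShape.<locals>.main, now at module scope and picklable
    ...

def PhenoShape(inData, outData, dates=None, nan_replace=None,
               rollWindow=None, nGS=46, chuckSize=256, n_jobs=4):
    # bind the extra arguments, then pass the resulting one-argument
    # callable to _parallel_process / executor.map as before
    do_work = partial(_main, dates=dates, nan_replace=nan_replace,
                      rollWindow=rollWindow, nGS=nGS)
    _parallel_process(inData, outData, do_work, nGS, n_jobs, chuckSize)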
javier lopatin <javierlopatin@...>
Dear Dion, thanks! This was perfect. I didn't get it the first time, but now it runs smoothly. Many thanks! Javier