Re: Best way of doing concurrent window processing

Pablo Sanfilippo

"I don't want to preempt you"

Not at all! This is great stuff. I'm a little short of time right now, but as soon as I can I'll test all of this. I don't know how much overhead opening a dataset per window adds, but a dataset pool sounds like a good idea.

On Sun, Dec 9, 2018 at 1:56 PM Sean Gillies via Groups.Io <sean=mapbox.com@groups.io> wrote:
Hi Pablo,

I don't want to preempt you, but here's what I've come up with: https://gist.github.com/sgillies/b90a79917d7ec5ca0c074b5f6f4857e3#file-cfrw-py. If the window reads are done by the worker threads, the memory usage is proportional to the number of workers plus whatever is in the sequence of complete but unwritten futures. If writing a window is faster than computing on it, this allocation will be small. If computing is faster than writing, one could employ the chunkify() helper (which I got from Tim Peters' example) to cap the number of futures and result arrays.
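Roughly, the pattern looks like this (a sketch of the idea, not the gist's exact code; read_and_compute is a hypothetical stand-in for a function that reads one window and runs compute() on it, and windows, dst, and num_workers are as in the snippets later in the thread):

import concurrent.futures
import itertools

def chunkify(iterable, chunk_size):
    # Yield successive lists of up to chunk_size items.
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, chunk_size))
        if not chunk:
            return
        yield chunk

with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
    for chunk in chunkify(windows, num_workers * 2):
        # Only len(chunk) futures (and result arrays) exist at any time,
        # which caps memory no matter how fast computing outruns writing.
        future_to_window = {
            executor.submit(read_and_compute, window): window
            for window in chunk
        }
        for future in concurrent.futures.as_completed(future_to_window):
            dst.write(future.result(), window=future_to_window[future])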

Opening a dataset for each window adds some overhead. A possible optimization: create a pool of dataset objects up front, at least as large as the number of workers; workers pop a dataset object from this pool and return it when done reading.
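Something like this, using a Queue (an untested sketch; read_window is a hypothetical worker function, and infile and num_workers are assumed from the snippets later in the thread):

import queue
import rasterio

# Fill the pool before starting any workers.
dataset_pool = queue.Queue()
for _ in range(num_workers):
    dataset_pool.put(rasterio.open(infile))

def read_window(window):
    src = dataset_pool.get()       # blocks until a dataset is free
    try:
        return src.read(window=window)
    finally:
        dataset_pool.put(src)      # return the dataset for reuse

Each worker borrows a handle instead of opening the file per window, trading a little startup cost for less per-window overhead.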

On Sat, Dec 8, 2018 at 9:14 AM Pablo Sanfilippo <sanfilippopablo@...> wrote:
Hi Sean! Oh, that's really interesting. Thanks for sharing this. I'm going to do my own tests and then update the PR I made with the example.

On Fri, Dec 7, 2018 at 8:34 PM Sean Gillies via Groups.Io <sean=mapbox.com@groups.io> wrote:
Hi Pablo,

I really appreciate your analysis of the concurrency example! Things have changed quite a bit in the newest versions of Python since it was written, and it needs an update.

While researching the non-laziness of ThreadPoolExecutor.map I discovered a few interesting tickets in the Python tracker. One of them, https://bugs.python.org/issue34168, has a suggestion from Tim Peters for chunking the input and using a combination of c.f.ThreadPoolExecutor.submit and c.f.as_completed, as in this example: https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example. The chunk size (not to be confused with c.f.ThreadPoolExecutor.map's chunksize) would set the upper limit on memory consumption. This could be worth a try; I'm going to try it soon myself.

I feel like introducing locks is a step back from the nice c.f. abstraction to raw threading. However, an example that used threading only could be nice for users who are still on Python 2.7.

Yours,

On Tue, Dec 4, 2018 at 1:26 PM Pablo Sanfilippo <sanfilippopablo@...> wrote:

I've been looking for the best way of processing large raster files while achieving both maximum processing parallelism/speed and a low memory footprint. For a low memory footprint, it is enough to iterate over windows and read, process, and write within the same loop:

with rasterio.open(infile, 'r') as src:
    with rasterio.open(outfile, 'w', **src.profile) as dst:
        for ij, window in src.block_windows():
            array = src.read(window=window)
            result = compute(array)  # compute() is the user's processing function
            dst.write(result, window=window)

This has a low memory footprint: only one window's data is in memory at a time. But it has no concurrency.

So I looked at the concurrency examples (the one using asyncio coroutines and the one using ThreadPoolExecutor), which look like this:

with rasterio.open(infile, 'r') as src:
    with rasterio.open(outfile, "w", **src.profile) as dst:
        windows = [window for ij, window in dst.block_windows()]
        data_gen = (src.read(window=window) for window in windows)

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers
        ) as executor:
            for window, result in zip(windows, executor.map(compute, data_gen)):
                dst.write(result, window=window)

Now we have concurrent computation. The comment on the original example made it seem like read -> compute -> write was being done in a streaming manner, overlapping all those operations:

"We map the compute() function over the raster data generator, zip the resulting iterator with the windows list, and as pairs come back we write data to the destination dataset."

That turned out not to be true. A verbose test showed that computing overlapped with reading and, later, with writing, but reading and writing never overlapped each other, meaning the whole file was in memory at some point. Monitoring memory usage confirmed this.

This made more sense after reading the documentation of ThreadPoolExecutor.map: the iterables are "collected immediately rather than lazily". That means it consumes the data_gen generator eagerly, putting all the data in memory if necessary.
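A tiny self-contained demonstration (mine, not from the example) makes the eagerness visible; the generator stands in for the window reads:

import concurrent.futures

def data_gen():
    for i in range(3):
        print(f"reading window {i}")  # stands in for src.read()
        yield i

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(lambda x: x * 2, data_gen())
    # By the time map() returns, all three "reading window" lines have
    # already printed: the input was drained up front, not lazily.
    for result in results:
        print("result:", result)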

The example that uses asyncio coroutines shows the same behavior.

My next idea was to put the read, compute, and write operations together in the function passed to ThreadPoolExecutor.map so they could be overlapped too, like so:

with rasterio.open(infile) as src:
    with rasterio.open(outfile, "w", **src.profile) as dst:
        windows = [window for ij, window in dst.block_windows()]

        def process(window):
            src_array = src.read(window=window)
            result = compute(src_array)
            dst.write(result, window=window)

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers
        ) as executor:
            executor.map(process, windows)

This obviously caused race conditions because reading and writing are not thread-safe operations, so the output was filled with GDAL read errors.

So I fixed that by using locks for the read and write operations:

with rasterio.open(infile) as src:
    with rasterio.open(outfile, "w", **src.profile) as dst:
        windows = [window for ij, window in dst.block_windows()]

        read_lock = threading.Lock()
        write_lock = threading.Lock()

        def process(window):
            with read_lock:
                src_array = src.read(window=window)
            result = compute(src_array)
            with write_lock:
                dst.write(result, window=window)

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers
        ) as executor:
            # Consume the iterator so exceptions raised in process()
            # aren't silently dropped.
            list(executor.map(process, windows))

That fixed the race conditions. Computing is now done in parallel, reading and writing are streamed, and the maximum memory footprint is window_array_size * num_threads.

My questions are:

  • Is this the right way to solve this problem? Am I missing something?
  • If this is the right way, would you accept a PR with a topic write-up about this (maybe as enhancements to the concurrency topic)?



--
Sean Gillies



--
Sean Gillies
