Best way of doing concurrent window processing

Pablo Sanfilippo
 

I've been looking for the best way of processing large raster files while achieving both maximum processing parallelism/speed and a low memory footprint. To achieve a low memory footprint, it is enough to iterate over windows and read, process, and write in the same loop:

import rasterio

with rasterio.open(infile, 'r') as src:
    with rasterio.open(outfile, 'w', **src.profile) as dst:
        # One window in memory at a time: read, compute, write, repeat.
        for ij, window in src.block_windows():
            array = src.read(window=window)
            result = compute(array)
            dst.write(result, window=window)

This has a low memory footprint: it has only one window in memory at a time. But it doesn't have concurrency.

So I looked at the concurrency examples (the one using asyncio coroutines and the one using ThreadPoolExecutor), which look like this:

import concurrent.futures

with rasterio.open(infile, 'r') as src:
    with rasterio.open(outfile, "w", **src.profile) as dst:
        windows = [window for ij, window in dst.block_windows()]
        data_gen = (src.read(window=window) for window in windows)

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers
        ) as executor:
            for window, result in zip(windows, executor.map(compute, data_gen)):
                dst.write(result, window=window)

Now we have concurrency in the compute step. The comment on the original example made it seem like read -> compute -> write was being done in a streaming manner, with all of those operations overlapping:

We map the compute() function over the raster data generator, zip the resulting iterator with the windows list, and as pairs come back we write data to the destination dataset.

Which is not true. A verbose test showed that computing was overlapped with reading and also (later) with writing, but reading and writing were not overlapped with each other, meaning that the whole file was in memory at some point. Monitoring memory usage confirmed this.

This made more sense after reading the documentation of ThreadPoolExecutor.map: "the iterables are collected immediately rather than lazily." In other words, it consumes the data_gen iterator eagerly, putting all the data in memory if necessary.

The example that uses asyncio coroutines shows the same behavior.
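
To see the eagerness in isolation, here is a minimal sketch independent of rasterio (produce and slow_double are just illustrative stand-ins):

import concurrent.futures
import time

def produce():
    for i in range(5):
        print("producing item", i)
        yield i

def slow_double(x):
    time.sleep(0.5)
    return 2 * x

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # All five "producing" lines print before the first "consuming" line,
    # because map() submits (and therefore consumes) everything up front.
    for result in executor.map(slow_double, produce()):
        print("consuming result", result)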

My next idea was to put the read, compute, and write operations all together in the function passed to ThreadPoolExecutor.map so they can be overlapped too, like so:

with rasterio.open(infile) as src:
    with rasterio.open(outfile, "w", **src.profile) as dst:
        windows = [window for ij, window in dst.block_windows()]

        def process(window):
            src_array = src.read(window=window)
            result = compute(src_array)
            dst.write(result, window=window)

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers
        ) as executor:
            executor.map(process, windows)

This obviously caused a race condition because reading and writing are not thread-safe operations, so the output was filled with GDAL read errors.

So I fixed that by using locks for the read and write operations:

import threading

with rasterio.open(infile) as src:
    with rasterio.open(outfile, "w", **src.profile) as dst:
        windows = [window for ij, window in dst.block_windows()]

        read_lock = threading.Lock()
        write_lock = threading.Lock()

        def process(window):
            # Serialize access to the shared dataset handles; compute()
            # itself still runs in parallel across threads.
            with read_lock:
                src_array = src.read(window=window)
            result = compute(src_array)
            with write_lock:
                dst.write(result, window=window)

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers
        ) as executor:
            executor.map(process, windows)

That fixed the race conditions. Computing is now done in parallel, reading and writing happen in a streaming fashion, and the maximum memory footprint is window_array_size * num_threads.

My questions are:

  • Is this the right way to solve this problem? Am I missing something?
  • If this is the right way, would you accept a PR with a topic write up about this (maybe enhancements in the concurrency topic)?

Matthew Perry
 

Pablo,

I think your approach would make a great example and addition to the concurrency docs. Being able to do input, output and computation in parallel is a huge benefit. The locks seem effective at avoiding race conditions, and I get a decent speedup (~3x on 4 cores) for a few test cases. So while I don't think there is or ever will be a single "right" way to solve concurrency, this is certainly a very viable option.

The only potential issue I see is that the `compute` function is required to release the GIL to get the full benefit. Most numpy array methods do this, but if you've got additional calculations in pure Python or call non-GIL-aware C functions, there's a chance that compute will block. If blocking computation significantly outweighs IO, you might not see much performance benefit, if any. Those users would be better served by some form of multiprocessing.

It's hard to know whether arbitrary Python code will hold the GIL, which makes it tricky to predict the performance characteristics of this technique. I think a note in the docs re: the compute function and the GIL would be sufficient to warn users of any pitfalls.
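
As a rough sketch of that alternative (assuming compute is a picklable, module-level function; the filenames and batching are illustrative), one could keep all raster I/O in the main process and farm only the computation out to worker processes:

import concurrent.futures
import rasterio

def main(infile, outfile, num_workers=4):
    with rasterio.open(infile) as src:
        with rasterio.open(outfile, "w", **src.profile) as dst:
            windows = [window for ij, window in dst.block_windows()]
            with concurrent.futures.ProcessPoolExecutor(
                max_workers=num_workers
            ) as executor:
                # Reads and writes stay in the main process; only compute()
                # runs in the workers. Handling windows in batches keeps at
                # most num_workers arrays (plus results) in memory at once.
                for i in range(0, len(windows), num_workers):
                    batch = windows[i:i + num_workers]
                    arrays = [src.read(window=w) for w in batch]
                    for window, result in zip(batch, executor.map(compute, arrays)):
                        dst.write(result, window=window)

if __name__ == "__main__":
    # "input.tif"/"output.tif" are placeholders; compute must be defined
    # at module level so it can be pickled for the worker processes.
    main("input.tif", "output.tif")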

- Matt

Sean Gillies
 

Hi Pablo,

I really appreciate your analysis of the concurrency example! Things have changed quite a bit in the newest versions of Python since that example was written, and it needs an update.

While researching the non-laziness of ThreadPoolExecutor.map I discovered a few interesting tickets in the Python tracker. One of them, https://bugs.python.org/issue34168, has a suggestion from Tim Peters for chunking the input and using a combination of c.f.ThreadPoolExecutor.submit and c.f.as_completed, as in this example: https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example. The chunk size (not to be confused with c.f.ThreadPoolExecutor.map's chunksize parameter) would set the upper limit on memory consumption. This could be worth a try. I'm going to try it soon myself.
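
Roughly, I imagine something like the following untested sketch, where CHUNK and chunkify are illustrative rather than anything in rasterio:

import concurrent.futures
import itertools
import rasterio

CHUNK = 100  # upper bound on in-flight window arrays

def chunkify(iterable, chunk=CHUNK):
    it = iter(iterable)
    while True:
        piece = list(itertools.islice(it, chunk))
        if not piece:
            return
        yield piece

with rasterio.open(infile) as src:
    with rasterio.open(outfile, "w", **src.profile) as dst:
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers
        ) as executor:
            for chunk in chunkify(w for ij, w in dst.block_windows()):
                # Reads and writes stay in the main thread (no locks needed);
                # only compute() runs in the pool. Memory is bounded by CHUNK.
                future_to_window = {
                    executor.submit(compute, src.read(window=w)): w
                    for w in chunk
                }
                for future in concurrent.futures.as_completed(future_to_window):
                    dst.write(future.result(), window=future_to_window[future])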

I feel like introducing locks is a step back from the nice c.f. abstraction to raw threading. However, an example that uses threading only could be nice for users who are still on Python 2.7.

Yours,

--
Sean Gillies

Pablo Sanfilippo
 

Hi Sean! Oh, that's really interesting. Thanks for sharing this. I'm going to do my own tests and then update the PR I made with the example.



Guy Doulberg
 

Hi guys

Wanted to share my 10 cents on this subject.

I am trying to support concurrent reads by creating a VRT per window. VRTs are just XML, so they can be created eagerly in the map execution; moreover, each VRT is a separate file, so accessing the underlying file parts doesn't require any lock: GDAL handles that for you.
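
A rough sketch of the same lock-free-read idea, using one source handle per worker thread instead of my VRT construction:

import concurrent.futures
import threading
import rasterio

thread_local = threading.local()

with rasterio.open(infile) as src:
    with rasterio.open(outfile, "w", **src.profile) as dst:
        windows = [window for ij, window in dst.block_windows()]
        write_lock = threading.Lock()

        def process(window):
            # Each worker thread opens and keeps its own source handle,
            # so reads need no lock. (Handle cleanup is left to the GC.)
            if not hasattr(thread_local, "src"):
                thread_local.src = rasterio.open(infile)
            result = compute(thread_local.src.read(window=window))
            with write_lock:
                dst.write(result, window=window)

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers
        ) as executor:
            executor.map(process, windows)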

Guy

Sean Gillies
 

Hi Pablo,

I don't want to preempt you, but here's what I've come up with: https://gist.github.com/sgillies/b90a79917d7ec5ca0c074b5f6f4857e3#file-cfrw-py. If the window reads are done by the worker threads, the memory usage is proportional to the number of workers plus whatever is in the sequence of complete but unwritten futures. If writing a window is faster than computing on it, this allocation will be small. If computing is faster than writing, one could employ the chunkify() helper (which I got from Peters' example) to cap the number of futures and result arrays.

Opening a dataset for each window adds some overhead. A possible optimization: create a pool of dataset objects up front that is at least as large as the number of workers and then the workers could pop a dataset object from this pool and return it when done reading.
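
Something like this, perhaps (again an untested sketch):

import concurrent.futures
import queue
import threading
import rasterio

with rasterio.open(infile) as src:
    with rasterio.open(outfile, "w", **src.profile) as dst:
        windows = [window for ij, window in dst.block_windows()]
        write_lock = threading.Lock()

        # Pool of open source handles, one per worker; each read borrows
        # a handle and returns it as soon as the read is done.
        handles = queue.Queue()
        for _ in range(num_workers):
            handles.put(rasterio.open(infile))

        def process(window):
            src_handle = handles.get()
            try:
                data = src_handle.read(window=window)
            finally:
                handles.put(src_handle)
            result = compute(data)
            with write_lock:
                dst.write(result, window=window)

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers
        ) as executor:
            executor.map(process, windows)

        while not handles.empty():
            handles.get().close()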

--
Sean Gillies