Parallelization
===============

Introduction
------------

Many computations on ``SpatRaster`` (and to a lesser extent ``SpatVector``) objects are slow because of the large (file) size of the raster. The computations are also embarrassingly parallel: the same operation is applied to many cells, many features, or many tiles, and the work for each piece is independent.

There are several ways to take advantage of multiple cores and/or multiple machines in a cluster when working with terra, and the right one depends on (a) the nature of the operation, (b) whether you are on a single computer or on a cluster, and (c) whether the data are in memory, in a single file, or scattered across many files.

In some cases parallelization is essential and easy — for example when processing many files on a cluster, or for tasks that are repeated often enough that the setup cost is amortised. In other cases the gain is marginal and not worth the effort, because setting up a parallel workflow can take a while (sometimes making it slower than a sequential process!), introduce subtle errors, and complicate debugging. It pays to first time a single-threaded run, estimate the best-case speed-up, and only then decide whether to parallelize. Sometimes it makes more sense to let the computer run and have some fun while it is working for you.

This chapter discusses the available approaches and their trade-offs for parallelization support in terra.

The constraint: terra objects hold a C++ pointer
------------------------------------------------

``SpatRaster``, ``SpatVector``, ``SpatRasterDataset``, ``SpatRasterCollection``, ``SpatExtent`` and ``SpatGraticule`` are *thin* R objects: each holds an external pointer to an underlying C++ object that lives in the memory of the R session that created it.
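The consequence can be seen without any cluster: a plain ``serialize()``/``unserialize()`` round trip within a single session already invalidates the pointer. The sketch below assumes this behaviour; the exact error message depends on the terra version.

```r
library(terra)
r <- rast(system.file("ex/elev.tif", package="terra"))
# round-trip through R's serialization; the external pointer does not survive
r2 <- unserialize(serialize(r, NULL))
# any attempt to use r2 now fails with an invalid external pointer error
res <- try(global(r2, "mean", na.rm=TRUE), silent=TRUE)
inherits(res, "try-error")
```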
This allows for fast code but, as a consequence, these objects **cannot be serialized**, and cannot be sent directly to a worker process with functions like ``parallel::clusterExport``, ``parLapply``, ``mclapply``, or ``future``. After serialization the pointer would be a dangling address in the worker’s memory. If you try, the worker either errors out, returns ``NULL``, or crashes:

.. code:: r

    library(terra)
    library(parallel)
    r <- rast(system.file("ex/elev.tif", package="terra"))
    cl <- makeCluster(2)
    parLapply(cl, 1:2, function(i) global(r, "mean", na.rm=TRUE))  # FAILS

This is not a show stopper, and the next sections describe four strategies to work around this limitation.

Strategy 1 — ``wrap()`` and ``unwrap()``
----------------------------------------

``wrap()`` converts a terra object into a regular R object (called ``PackedSpatRaster``, ``PackedSpatVector``, etc.) that holds the data and the metadata in plain R memory. Such packed objects can be sent to workers, or returned across ``future`` boundaries. On the worker side, ``unwrap()`` reconstructs a usable ``SpatRaster`` / ``SpatVector``. The terra version of ``saveRDS`` writes such packed objects to disk.

This example does the same thing four times, in parallel (not something you would want to do in practice!):

.. code:: r

    library(terra)
    library(parallel)
    r <- rast(system.file("ex/elev.tif", package="terra"))
    pr <- wrap(r, proxy=FALSE)  # values in memory; fully serializable

    cl <- makeCluster(4)
    clusterEvalQ(cl, library(terra))  # workers need the package loaded
    results <- parLapply(cl, 1:4, function(i, p) {
        x <- unwrap(p)  # rebuild the SpatRaster on the worker
        global(x, "sum", na.rm=TRUE)
    }, p = pr)
    stopCluster(cl)
    unlist(results)
    global(r, "sum", na.rm=TRUE)

For large rasters it is usually important to use the ``wrap`` argument ``proxy=TRUE``, so that ``wrap()`` does not read all values into memory (which may not be possible). The packed object then only references the source file(s), which assumes that the workers can read from the same (shared) file system as the parent process.
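The ``saveRDS`` route mentioned above can be sketched like this (assuming a terra version that provides the ``saveRDS`` method for terra objects; ``unwrap()`` simply returns its input when the object is already unpacked, so it is safe to call in either case):

```r
library(terra)
r <- rast(system.file("ex/elev.tif", package="terra"))
f <- tempfile(fileext=".rds")
saveRDS(r, f)             # terra method: packs the object, then writes it
r2 <- unwrap(readRDS(f))  # unwrap() is a no-op if already a SpatRaster
global(r2, "mean", na.rm=TRUE)
```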
Strategy 2 — pass filenames, open them on the worker
----------------------------------------------------

When the data live in a file (GeoTIFF, NetCDF, …) the cheapest “serialization” is the filename. Each worker creates its own ``SpatRaster`` from the file(s) it gets. terra is thread-safe at the file-open level (each worker gets its own dataset handle), and only the grid cells that are actually needed by a worker are read from disk.

.. code:: r

    library(terra)
    library(parallel)
    fname <- system.file("ex/elev.tif", package="terra")

    cl <- makeCluster(4)
    clusterEvalQ(cl, library(terra))
    results <- parLapply(cl, 1:4, function(i, f) {
        r <- rast(f)  # cheap: only metadata is loaded
        global(r, "mean", na.rm=TRUE)
    }, f = fname)
    stopCluster(cl)
    str(results)

This pattern is clearly better than ``wrap()`` whenever:

- the dataset is too large to comfortably fit in worker memory,
- you have many files (one per worker, or one per task), or
- you are on an HPC cluster where all nodes can see the same shared file system.

Strategy 3 — partition by tiles
-------------------------------

terra facilitates this approach with the ``tile_apply()`` method. In its simplest form:

.. code:: r

    library(terra)
    r <- rast(system.file("ex/elev.tif", package="terra"))
    out <- tile_apply(r, function(x) x * 2)
    out
    minmax(r * 2)

``tile_apply()`` splits ``r`` into a small number of tiles, applies ``fun`` to each (with ``window()`` set under the hood, so no values are copied), writes every per-tile result to a temporary GeoTIFF, and returns a ``SpatRaster`` backed by a VRT that stitches those files together. No cell values cross the worker boundary; the parent process only sees filenames.

Picking tiles automatically
~~~~~~~~~~~~~~~~~~~~~~~~~~~

The default tile size is chosen to (a) align with the source’s GDAL block size when one is reported, (b) fit ``cores`` workers concurrently inside ``terraOptions("memfrac")`` of free RAM, and (c) leave a few tiles per worker for load balancing.
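To see a tiling for a given raster you can compute tile extents yourself with ``getTileExtents()``. The sketch below uses an explicit tile size of 30 rows/columns; the automatic choice made by ``tile_apply()`` follows the criteria above and will generally differ.

```r
library(terra)
r <- rast(system.file("ex/elev.tif", package="terra"))
# a 4-column matrix (xmin, xmax, ymin, ymax), one row per tile
tiles <- getTileExtents(r, 30)
nrow(tiles)  # number of tiles for this raster and tile size
```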
Because of criterion (b), a larger number of ``cores`` produces smaller tiles automatically:

.. code:: r

    out <- tile_apply(r, function(x) focal(x, w=21, fun="mean"), cores=8)

You can override the default by passing ``tiles =`` (a ``SpatExtent``, a list of them, a 4-column matrix of extents, a template ``SpatRaster``/``SpatVector``, or one or two integers giving rows/columns *per tile*); see ``?getTileExtents``.

Edge effects: ``buffer =`` for ``focal``-style operations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

For neighbourhood operations like ``focal``, processing each tile in isolation produces erroneous results at tile boundaries (cells near the seam see fewer neighbours than they should). Pass ``buffer =`` to read the additional number of rows/columns that are needed around each tile. The function then runs ``fun`` on the enlarged region, and crops the result back to the original tile before writing. Per-tile outputs stay non-overlapping.

.. code:: r

    out <- tile_apply(r, function(x) focal(x, w=11, fun="mean"), buffer=5, cores=4)

``buffer`` is only honoured with the auto-tile path; if you supply ``tiles =`` yourself, build the overlap into the tiles with ``getTileExtents(..., buffer=)`` and pass ``overlap_fun =`` (e.g. ``"mean"``) so that the assembly uses ``mosaic()`` to blend the overlap.

Memory safety
~~~~~~~~~~~~~

Workers (and the sequential code path when ``cores = 1``) write each tile result straight to disk, so the peak RAM in any one process is just one tile’s worth of values — independent of how big the raster is. The assembly is either a ``vrt()`` (free, no second pass over the data) or a ``mosaic()`` that streams through terra’s standard chunked I/O.

Writing the final result
~~~~~~~~~~~~~~~~~~~~~~~~

Pass ``filename =`` to materialise the assembled raster on disk in one step:
.. code:: r

    tile_apply(rast("/data/big.tif"), function(x) terrain(x, "slope"),
               cores=8, buffer=1, filename="/data/slope.tif",
               wopt=list(datatype="FLT4S"))

When ``filename`` is empty, the per-tile files are kept for the rest of the session because they back the returned VRT; when it is set, the intermediates are removed automatically once the output file is written.

Doing it by hand
~~~~~~~~~~~~~~~~

``tile_apply()`` is just a convenience over the manual pattern below, which uses ``window(x) <- ext(...)`` to set *read windows*: subsequent operations on ``x`` behave as if ``x`` had only that extent, but no data are copied or moved. This is the natural way to split a large raster into tiles that are processed independently and then assembled back together. Build it yourself when you need bespoke control (e.g. a different back-end or per-tile bookkeeping):

.. code:: r

    library(terra)
    library(parallel)
    fname <- system.file("ex/elev.tif", package="terra")
    r <- rast(fname)
    tiles <- getTileExtents(r, 50, buffer=5)

    cl <- makeCluster(4)
    clusterEvalQ(cl, library(terra))
    files <- parLapply(cl, asplit(tiles, 1), function(e, f) {
        x <- rast(f)
        window(x) <- ext(e)
        out <- focal(x, w=11, fun="mean")
        fout <- tempfile(fileext=".tif")
        writeRaster(out, fout)
        fout
    }, f = fname)
    stopCluster(cl)

    result <- mosaic(sprc(lapply(files, rast)), fun="mean")  # blend the overlap

Strategy 4 — exchange raw values
--------------------------------

For some workflows it is simplest to step out of the terra type system entirely: extract the cell values as a plain R object (a ``numeric`` vector, ``matrix``, or ``array``), send those numbers to the workers, do the computation, ship the numbers back, and put them back into a ``SpatRaster`` on the parent.
For rasters that fit comfortably in memory you can extract everything at once with ``values()`` (returns a matrix with one column per layer) or ``as.array()`` (returns a 3-D array ``[row, col, layer]``), split the data along whichever dimension makes sense, process the pieces in parallel, and assign the result back with ``values(x) <- ...``.

.. code:: r

    library(terra)
    library(parallel)
    r <- rast(system.file("ex/elev.tif", package="terra"))
    v <- values(r)  # plain matrix
    chunks <- split(seq_len(nrow(v)), cut(seq_len(nrow(v)), 4))

    cl <- makeCluster(4)
    parts <- parLapply(cl, chunks, function(i, v) {
        sqrt(v[i, , drop=FALSE])  # any vectorized R code
    }, v = v)
    stopCluster(cl)

    out <- rast(r)
    values(out) <- do.call(rbind, parts)

For rasters that do *not* fit in memory the same idea applies, but you process one block at a time using terra’s chunking machinery (``readStart`` / ``readValues`` / ``writeStart`` / ``writeValues`` / ``writeStop``). Each block read returns a numeric vector; you can dispatch that vector to a worker, receive the result, and write it straight to the output file. The workers never see a ``SpatRaster``, only numbers.

.. code:: r

    library(terra)
    library(parallel)
    r <- rast("/data/big.tif")
    out <- rast(r)
    b <- writeStart(out, "/data/big_out.tif", overwrite=TRUE)
    readStart(r)
    cl <- makeCluster(4)

    # dispatch the blocks in batches of four, so that at most
    # four blocks are held in memory at any one time
    for (start in seq(1, b$n, by=4)) {
        idx <- start:min(start + 3, b$n)
        blocks <- lapply(idx, function(i) readValues(r, b$row[i], b$nrows[i]))
        results <- parLapply(cl, blocks, function(v) sqrt(v))
        for (j in seq_along(idx)) {
            writeValues(out, results[[j]], b$row[idx[j]], b$nrows[idx[j]])
        }
    }
    writeStop(out)
    readStop(r)
    stopCluster(cl)

This approach is attractive when:

- the per-cell computation is a fast vectorized R expression,
- you want to use a non-terra parallel back-end (``future``, ``mirai``, ``BiocParallel``, …) without adding a ``wrap()``/``unwrap()`` step, or
- the workers should not need terra installed at all.
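Because the workers only ever see numeric data, the dispatch pattern itself can be prototyped and tested without terra installed at all. A base-R sketch of the same split/compute/recombine cycle, with a random matrix standing in for ``values(r)``:

```r
library(parallel)
v <- matrix(runif(1000), ncol=4)  # stand-in for values(r)
chunks <- split(seq_len(nrow(v)), cut(seq_len(nrow(v)), 4))

cl <- makeCluster(2)
parts <- parLapply(cl, chunks, function(i, v) sqrt(v[i, , drop=FALSE]), v=v)
stopCluster(cl)

# chunks are returned in order, so rbind restores the original layout
out <- do.call(rbind, parts)
all.equal(out, sqrt(v))  # TRUE
```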
For very large rasters you can combine it with Strategy 3 (read the values from a windowed ``SpatRaster``) to keep memory bounded while still avoiding any ``wrap()`` round-trip.

Function-level parallelism: the ``cores`` argument
--------------------------------------------------

Some functions that apply a user-supplied R function take a ``cores`` argument and use ``parallel::makeCluster`` (PSOCK) under the hood. Currently this includes ``app``, ``tapp``, ``lapp``, ``predict``, ``interpolate``, ``regress``, ``aggregate``, and ``focal``.

.. code:: r

    r <- rast(system.file("ex/elev.tif", package="terra"))
    out <- app(r, fun=function(v) median(v, na.rm=TRUE), cores=4)

With these functions, terra creates the cluster, ships the function (and any extra named arguments) to each worker, and processes the raster block-by-block with the workers in parallel. For ``predict`` you can also pass ``cpkgs=`` to preload packages on each worker, which is necessary when the prediction function comes from a package such as **randomForest** or **mgcv**.

Built-in functions
~~~~~~~~~~~~~~~~~~

Note that with built-in functions that can be supplied as a character value (such as ``"sum"``, ``"mean"``, ``"min"``, ``"max"``, ``"modal"``, and ``"first"``), the ``cores`` argument is ignored. For these, terra uses optimized C++ implementations, and parallelism is controlled via TBB, provided that ``terraOptions()$parallel`` is ``TRUE`` (see the next section).

Important caveats
~~~~~~~~~~~~~~~~~

- Extra ``...`` arguments **must be named** when ``cores > 1``, because the workers receive them by name through ``clusterExport``.
- The function ``fun`` is serialized to each worker; it should not capture large objects from its enclosing environment, and any terra objects it uses must be passed in via ``wrap()`` or as filenames.
- The cluster is created and torn down per call, and that takes time.
  For repeated calls with many small tasks, create your own cluster once and pass it to the ``cores`` argument.

Within-process parallelism: TBB
-------------------------------

A growing number of terra’s C++ code paths are parallelized internally with Intel TBB. When TBB is available at build time (which is the default on Windows and on most Linux distributions) you can opt into TBB-based threading by setting the ``parallel`` write option (this is the default in version >= 1.9-21):

.. code:: r

    # is TBB available?
    terra:::.have_TBB()

    # per call
    focal(r, w=21, fun="mean", wopt=list(parallel=TRUE))

    # session-wide default
    terraOptions(parallel=TRUE)
    focal(r, w=21, fun="mean")

By default TBB schedules across all logical CPU cores. To cap the number of threads (for example to leave room for other processes, or to keep a forked worker from spawning hundreds of children), set the ``threads`` option. ``threads = 0`` (the default) means “no cap, use the TBB default”. Any positive value caps the number of threads at that count. The option is honoured by every TBB-parallel kernel in terra and is also forwarded to GDAL’s warp engine (``project()``, ``resample()``, ``warp()``).

.. code:: r

    terraOptions(parallel=TRUE, threads=4)  # use at most 4 threads
    focal(r, w=21, fun="mean")              # capped at 4
    terraOptions(threads=0)                 # back to "all CPUs"

Functions that currently benefit from TBB include parts of ``arith``, distance calculations on ``SpatVector``, and ``focal`` with built-in functions such as ``mean`` and ``sum``. TBB has no per-call setup cost and shares the process memory, so it is the most efficient way to use multiple cores **on a single machine** for one operation. It can be combined with Strategy 2 or 3 to use multi-core processing across machines.

Single computer vs. HPC cluster
-------------------------------

The right choice depends on where the cores are.

One computer, multiple cores
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In rough order of preference:
1. **TBB** (``wopt=list(parallel=TRUE)``) — zero setup cost, no serialization, lowest overhead. Use whenever the operation supports it.
2. **The ``cores=`` argument** in ``app``/``lapp``/``tapp``/``predict``/etc. — for user-defined R functions on a single raster.
3. **``tile_apply(x, fun, cores=, buffer=)``** — when one operation on one large raster can be split spatially. Particularly effective for ``focal``, ``aggregate``, and ``predict`` on rasters that do not fit in memory.
4. **Manual ``parallel::parLapply`` with ``wrap()`` or filenames** — for custom workflows that loop over rasters, layers, models, or tiles, or when you need a non-PSOCK back-end.
5. **Raw values via ``values()`` / ``readValues``** — when the per-cell computation is a fast vectorized R expression and you would rather not move terra objects across process boundaries at all.

``mclapply`` and ``future::multicore`` (forked workers) also work, on macOS and Linux. They share memory copy-on-write, so ``wrap()`` is unnecessary; the C++ pointer must still be valid on the worker, which it is for forked processes that did not modify the object after the fork. Avoid them on Windows and in RStudio (where they fall back to sequential processing).

HPC cluster of nodes (each with multiple cores)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

On an HPC system you typically combine **two levels** of parallelism:

- **Across nodes**: one R process per node (or per task), launched via the scheduler (SLURM, PBS, …) using ``Rscript``, ``batchtools``, ``future.batchtools``, MPI (via ``Rmpi``/``pbdMPI``), or simply by partitioning input filenames or tiles across array jobs.
- **Within node**: TBB (set ``terraOptions(parallel=TRUE)`` at the top of the worker script), ``cores=`` for user functions, and/or ``tile_apply()`` for tiled neighbourhood operations.

The most reliable inter-node “serialization” of raster data is **the filename on a shared file system**.
Build the ``SpatRaster`` on each node from the path; never ``wrap()`` and ship raster values across the network if the file is reachable.

.. code:: r

    # slurm_array.R, dispatched as one task per tile
    args <- commandArgs(trailingOnly=TRUE)
    i <- as.integer(args[1])

    library(terra)
    terraOptions(parallel=TRUE)  # use TBB within this node
    r <- rast("/shared/dem.tif")
    window(r) <- tiles[[i]]  # tiles defined in a shared script
    out <- focal(r, w=11, fun="mean")
    writeRaster(out, sprintf("/shared/out/tile_%03d.tif", i))

Then mosaic the tiles in a final, single-process step.
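That final step can be sketched as follows, with two small synthetic tiles standing in for the files that the array tasks wrote (the tile contents here are made up for the example):

```r
library(terra)
# two adjacent tiles, as the array tasks would have written them
t1 <- rast(nrows=10, ncols=10, xmin=0,  xmax=10, ymin=0, ymax=10, vals=1)
t2 <- rast(nrows=10, ncols=10, xmin=10, xmax=20, ymin=0, ymax=10, vals=2)
f1 <- tempfile(fileext=".tif"); f2 <- tempfile(fileext=".tif")
writeRaster(t1, f1); writeRaster(t2, f2)

# the single-process assembly: collect the files, mosaic them
files <- c(f1, f2)
out <- mosaic(sprc(lapply(files, rast)), fun="mean")
dim(out)
```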