Massively parallel processing

Run massively parallel R jobs cheaply

Thanks to its integration with the future package, you can run large parallel computing tasks on a Google Compute Engine cluster with just a few lines of code.

More examples of using future, with fractals as the example, can be found here.

For other platforms, see also an Azure example on Revolution Analytics here.

Remote R cluster

This workflow takes advantage of the future integration to run your local R functions within a cluster of GCE machines.
You can use this for expensive computations by spinning up a cluster and tearing it down again once you are done.

In summary, this workflow:

  1. Creates a GCE cluster
  2. Lets you perform computations
  3. Stops the VMs

Create the cluster

The example below uses the default rocker/r-parallel image, but you can also specify a custom dynamic_image pulled from the Container Registry if required.
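
For example, a custom image hosted in the Container Registry can be referenced with gce_tag_container(), as in the forecasting example later in this article (the image and project names here are placeholders):

my_docker <- gce_tag_container("custom-image", project = "my-project")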

Instead of the more generic gce_vm(), which is geared towards interactive use, we create the instances directly using gce_vm_cluster().

This creates a cluster, uploads any SSH settings you have, tests the connection, and then returns a list of VMs suitable for passing to future::as.cluster().

By default it makes a cluster of three VMs named r-cluster-1, r-cluster-2 and r-cluster-3:

library(future)
library(googleComputeEngineR)

vms <- gce_vm_cluster()
#2019-03-29 23:24:54> # Creating cluster with these arguments: template = r-base, dynamic_image = rocker/r-parallel, wait = FALSE, predefined_type = n1-standard-1
#2019-03-29 23:25:04> Operation running...
#2019-03-29 23:25:07> Operation running...
#2019-03-29 23:25:10> Operation running...
#2019-03-29 23:25:17> Operation complete in 13 secs
#2019-03-29 23:25:20> Operation complete in 13 secs
#2019-03-29 23:25:23> Operation complete in 14 secs
#2019-03-29 23:25:25> r-cluster-1 VM running
#2019-03-29 23:25:27> r-cluster-2 VM running
#2019-03-29 23:25:29> r-cluster-3 VM running
#2019-03-29 23:25:37> Public SSH key uploaded to instance
#2019-03-29 23:25:45> Public SSH key uploaded to instance
#2019-03-29 23:25:53> Public SSH key uploaded to instance
#2019-03-29 23:25:53> # Testing cluster:
#Warning: Permanently added '35.233.25.199' (ED25519) to the list of known hosts.
#r-cluster-1 ssh working
#Warning: Permanently added '35.187.54.41' (ED25519) to the list of known hosts.
#r-cluster-2 ssh working
#Warning: Permanently added '35.205.66.124' (ED25519) to the list of known hosts.
#r-cluster-3 ssh working

We now register the VMs as a future cluster, as per the details given in the future README:

## make a future cluster
plan(cluster, workers = as.cluster(vms))

You can pass in your own arguments to gce_vm_cluster(), such as which Docker image to use, the VM name prefix, and any custom SSH arguments you may have. See the function documentation for details, and the sketch below.
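
For instance, a sketch that reuses arguments appearing later in this article (all values are illustrative):

## e.g. a custom-named cluster of 5 cheaper, preemptible VMs
vms <- gce_vm_cluster(
  vm_prefix = "my-cluster",
  cluster_size = 5,
  predefined_type = "n1-standard-2",
  scheduling = list(preemptible = TRUE)
)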

Using your own Docker image

The default uses rocker/r-parallel as its image, but if you want your own custom image then create your own Docker image based on that one, for example via this tutorial using Google Build Triggers.

This will give you a Docker image name such as gcr.io/my-project/my-r. Use a version of the code below to point your cluster at it.

You can also customise the Rscript command that launches your script, but always make sure to include --net=host, as shown in the default arguments, so that the Docker container shares the host VM's network and can use the SSH ports your connection relies on.

plan(cluster, workers = as.cluster(vms, docker_image="gcr.io/my-project/my-r"))
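
If you do override the launch command, it might look something like the sketch below; the rscript argument here is an assumption modelled on the default behaviour described above, so check the as.cluster() documentation before relying on it:

plan(cluster, 
     workers = as.cluster(
       vms, 
       docker_image = "gcr.io/my-project/my-r",
       rscript = c("docker", "run", "--net=host",
                   "gcr.io/my-project/my-r", "Rscript")))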

Using the cluster

The cluster is now ready to receive jobs. You can send them by simply using %<-% instead of <-. Another useful function is future.apply::future_lapply(), which lets you loop over the cluster; consult the future.apply documentation for details.

## use %<-% to send functions to work on cluster
## See future README for details: https://github.com/HenrikBengtsson/future
a %<-% Sys.getpid()

## make a big function to run asynchronously
f <- function(my_data, args = NULL){
   ## ....expensive...computations
   
   result
}

## send to cluster
result %<-% f(my_data) 

For long-running jobs you can use future::resolved() to check whether the result is ready yet.

## check if resolved
resolved(result)
#[1] TRUE
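
As mentioned above, future.apply::future_lapply() lets you distribute a whole loop across the cluster rather than sending jobs one at a time. A minimal sketch, where my_data_list is a hypothetical list of inputs (a fuller example appears in the forecasting section below):

library(future.apply)

## each element of my_data_list is handled by a cluster worker
results <- future_lapply(my_data_list, f)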

Nested parallelization

(Contributed by Grant McDermott.)

The above setup will parallelize across the VMs in your cluster. However, each VM will still only run tasks sequentially. In order to parallelize tasks on the VMs too, we need to tell our remote cluster (via future) to use a nested parallelization strategy. At a high level this will involve two steps:

  1. Parallelization across the remote VMs in our cluster.
  2. Parallelization within each VM, making sure that we “chunk” the input data appropriately to avoid duplication.

To illustrate, consider an example where we wish to run a slow function across a cluster of three VMs that each have eight cores (yielding 24 cores in total). We again use gce_vm_cluster() to set this up. Note in passing that the default instantiation of the rocker/r-parallel Docker image on each VM is what allows us to run parallel processes on these machines themselves without having to install any additional packages.

library(googleComputeEngineR)
library(future.apply)

## Emulate a slow function that can be sped up in parallel
slow_square <- 
  function(x = 1) {
    x_sq <- x^2 
    Sys.sleep(5)
    return(x_sq)
    }

## Set up our cluster: 3 VMs with 8 cores each
vms_nested <- 
  gce_vm_cluster(
    vm_prefix = "nested-cluster",        
    cluster_size = 3,                    
    #docker_image = "rocker/r-parallel",  ## Default 
    predefined_type = "n1-highcpu-8",     
    scheduling = list(preemptible = TRUE) ## Optional: Use cheaper, preemptible machines
    )

#2019-10-25 12:56:21> # Creating cluster with settings: predefined_type = n1-highcpu-8, scheduling = list(preemptible = TRUE), template = r-base, dynamic_image = rocker/r-parallel, wait = FALSE
#2019-10-25 12:56:29> Operation running...
#2019-10-25 12:56:35> Operation complete in 6 secs
#2019-10-25 12:56:38> Operation complete in 4 secs
#2019-10-25 12:56:42> Operation complete in 6 secs
#2019-10-25 12:56:43> nested-cluster1 VM running
#2019-10-25 12:56:44> nested-cluster2 VM running
#2019-10-25 12:56:46> nested-cluster3 VM running
#2019-10-25 12:56:54> Public SSH key uploaded to instance
#2019-10-25 12:57:01> Public SSH key uploaded to instance
#2019-10-25 12:57:09> Public SSH key uploaded to instance
#2019-10-25 12:57:09> # Testing cluster:
#Warning: Permanently added 'XX.XX.XX.XXX' (ED25519) to the list of known hosts.
#nested-cluster1 ssh working
#Warning: Permanently added 'YY.YY.YYY.YY' (ED25519) to the list of known hosts.
#nested-cluster2 ssh working
#Warning: Permanently added 'ZZ.ZZ.ZZZ.ZZ' (ED25519) to the list of known hosts.
#nested-cluster3 ssh working

Next, we must tweak the cluster plan so that future is aware of the nested parallel structure. Nesting in the future framework is operationalised by defining a series of so-called future “topologies”. In this case, we are going to define two topology layers:

  • Topology 1. The “outer” plan, which tells future to use the cluster of three remote VMs.
  • Topology 2. The “inner” plan, which tells future to use all eight cores on each VM via the multiprocess option.
plan(list( 
  ## Topology 1: Use the cluster of remote VMs 
  tweak(cluster, workers = as.cluster(vms_nested)),  
  ## Topology 2: Use all CPUs on each VM
  tweak(multiprocess)  
  ))

The final thing that we’ll do before using our cluster is to define a convenience function for chunking the input arguments (i.e. data) to our function. Remember, we need to do this or else each VM will duplicate effort by working on the same parts of the problem. R provides lots of ways to split data into chunks, but here’s one that I’ve borrowed from StackOverflow. The advantages of this particular function are that it only requires base R operations and is robust to complications like unequal chunk lengths and different input types (e.g. factors vs numeric).

chunk_func <- function(x, n) split(x, cut(seq_along(x), n, labels = FALSE)) 
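
As a quick sanity check, chunking the 48 inputs used below across our three VMs yields three blocks of 16:

str(chunk_func(1:48, 3))
#List of 3
# $ 1: int [1:16] 1 2 3 4 5 6 7 8 9 10 ...
# $ 2: int [1:16] 17 18 19 20 21 22 23 24 25 26 ...
# $ 3: int [1:16] 33 34 35 36 37 38 39 40 41 42 ...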

We are now ready to use all 24 cores of the cluster. For this particular example, I’m going to loop over 48 iterations of our slow_square() function. (Note that this would take four minutes to run sequentially.) I’ll use future.apply::future_sapply() to run things in parallel and will also use the tictoc package to record timing. I’m going to comment the next code block quite extensively, but just to quickly highlight the key conceptual stages:

  • First, we’ll parallelize across the three VMs in our cluster via an outer future_sapply() call.
  • Next, we’ll split our input data into chunks so that each VM works on a separate part of the problem (i.e. VM1 gets 1:16, VM2 gets 17:32, and VM3 gets 33:48).
  • Finally, we’ll parallelize internally so that each VM uses all of its available cores via an inner future_sapply() call.
## Input data (vector to be iterated over by our function)
input_data <- 1:48

## Run the function in nested parallel on our cluster and record timing
tictoc::tic()
ans <-
   ## Parallelise over the three VMs
   future_sapply(seq_along(vms_nested), function(v) {
   
      ## Split the input data into distinct chunks for each VM
      input_chunk <- chunk_func(input_data, length(vms_nested))[[v]] 
   
      ## Parallelise within each of the VMs  
      future_sapply(input_chunk, slow_square)
   })
tictoc::toc()
#11.451 sec elapsed

## Show that it worked
as.vector(ans)
# [1]    1    4    9   16   25   36   49   64   81  100  121  144  169  196  225
#[16]  256  289  324  361  400  441  484  529  576  625  676  729  784  841  900
#[31]  961 1024 1089 1156 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025
#[46] 2116 2209 2304

And there you have it: a 21x speed up compared to the sequential option. (There’s a tiny bit of overhead, which is why we don’t achieve the full theoretical speed-up of 24x. But those margins will improve as the scale of the problem increases.) The same general approach demonstrated here can be adapted fairly easily to greatly reduce computation times for even complex forms of analysis.

Since we have been using preemptible machines for this example, they will automatically be stopped within 24 hours. However, we’ll delete them manually since there’s no point incurring additional charges now that we’re done with them.

gce_vm_delete(vms_nested)

More examples

Forecasting a large data set

The example below splits a dataset into chunks that are each run on a separate VM, using a custom Docker image that has the necessary packages installed, for instance via build triggers. To optimise start-up, include the future package in these Docker images.

library(future.apply) ## Will automatically load future too
library(googleComputeEngineR)

my_docker <- gce_tag_container("custom-image", project = "my-project")

vms <- gce_vm_cluster("r-vm", cluster_size = 3, docker_image = my_docker)
                
## create the future cluster
plan(cluster, 
     workers = as.cluster(vms, 
                          docker_image=my_docker))
                          
## create the list of data to run on the cluster
## here we assume they are in a folder of CSVs
## and there are as many files as VMs to run it upon
my_files <- list.files("myfolder", full.names = TRUE)

my_data <- lapply(my_files, read.csv)

## make a big function to run asynchronously
cluster_f <- function(my_data, args = 4){
   
   forecast::forecast(forecast::auto.arima(ts(my_data, frequency = args)))
   
}

## send to cluster
result <- future.apply::future_lapply(my_data, cluster_f, args = 4) 

## future_lapply blocks until all jobs are complete, so this will be TRUE
resolved(result)

## your list of forecasts is now available
result

Rasters in parallel

This is from @ctlamb’s GitHub issue #93 which uses a custom Dockerfile to install the raster package.

The custom Dockerfile was set up in this GitHub repo, then made into an image with Build Trigger settings.

Make sure the VMs are created in the same project as the build triggers to ensure authentication is smooth.

The example code is shown below, assuming your custom Docker image is available at gcr.io/your-project/raster:

library(raster)
library(googleComputeEngineR)
library(future)
library(future.apply)
library(SpaDES.tools)

gce_global_project("your-project")

## create raster
row <- 8
col <- 8
r <- raster(nrows=row, ncols=col,
            xmn=0, xmx=row, 
            ymn=0, ymx=col, 
            vals=c(1:(row*col)))
plot(r)

## Split
r_split <- splitRaster(r, nx=2, ny=2)

## create model
df <- data.frame(y=c(1:10),layer=c(1:5,7,6,8:10))
mod <- glm(y~layer, data=df)


## create the cluster VMs - here we customise the machine type
vms <- gce_vm_cluster("myvms", predefined_type = "n1-highmem-2")

## once all launched, add to cluster with the custom Docker image
## use plan(sequential) for local testing
my_image <- "gcr.io/your-project/raster"
plan(cluster, workers = as.cluster(vms, docker_image = my_image))

## make the vector of stuff to send to nodes
o <- lapply(r_split, readAll)

## the action you want to perform on the elements in the cluster
my_single_function <- function(x){
  raster::predict(x, mod)
}

## run the prediction over the raster chunks in parallel
result <- future_lapply(o, my_single_function)

## tidy up
gce_vm_stop(vms)

Cleanup

Remember to shut down your cluster. You are charged per second of uptime, per instance.

## shutdown instances when finished
gce_vm_stop(vms)

# or delete them
gce_vm_delete(vms)

Pre-emptible VMs

Preemptible VMs are a lot cheaper (around 80%) than normal instances, but Google reserves the right to stop them at any time. They are intended for non-critical jobs where, if a VM shuts down, you can account for it and launch another.

To create them, pass scheduling = list(preemptible = TRUE) to gce_vm_create() or the other functions in the VM creation family.

Make sure your workflow can cope with results not being returned: over-provision the VMs and ensure your script can redo jobs that did not complete.
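
A minimal sketch of launching a single preemptible VM (the name and machine type are illustrative):

vm <- gce_vm("my-preemptible-vm",
             template = "r-base",
             predefined_type = "n1-standard-1",
             scheduling = list(preemptible = TRUE))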

Quotas

You can launch as many VMs as you have quota for in your account. Quotas vary by region, from roughly 240 to 720. You can apply for more quota if you need it.
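
One way to inspect your current project-level quotas from R is via the project resource, which the Compute Engine API returns with a quotas field (a sketch, assuming gce_get_project() returns that resource as-is):

## list quota metrics, limits and current usage for the project
proj <- gce_get_project()
proj$quotas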