Loops using the foreach package

The foreach package improves the way in which we run loops in R, and provides a construct to run loops in parallel.

The basic structure of loops with the package is:

# Without parallelization --> %do%
output <- foreach(i = 'some object to iterate over', 'options') %do% {some r code}

# With parallelization --> %dopar%
output <- foreach(i = 'some object to iterate over', 'options') %dopar% {some r code}

As a first example, we can use foreach just like a for loop without parallelization

library(foreach)
result <- foreach(x = c(4,9,16)) %do% sqrt(x)
result
## [[1]]
## [1] 2
## 
## [[2]]
## [1] 3
## 
## [[3]]
## [1] 4

Note that, unlike a regular for loop, foreach returns an object (by default a list) that contains the results compiled across all iterations.

We can change the object returned by specifying the function used to combine results across iterations with the .combine option:

result <- foreach(x = c(4,9,16), .combine = 'c') %do% sqrt(x)
class(result)
## [1] "numeric"

Other options for .combine are: cbind, rbind, +, *:

# cbind...
result <- foreach(x = c(4,9,16), .combine = 'cbind') %do% c(sqrt(x), log(x), x^2)
class(result)
## [1] "matrix"
result
##       result.1  result.2   result.3
## [1,]  2.000000  3.000000   4.000000
## [2,]  1.386294  2.197225   2.772589
## [3,] 16.000000 81.000000 256.000000
# rbind
result <- foreach(x = c(4,9,16), .combine = 'rbind') %do% c(sqrt(x), log(x), x^2)
class(result)
## [1] "matrix"
result
##          [,1]     [,2] [,3]
## result.1    2 1.386294   16
## result.2    3 2.197225   81
## result.3    4 2.772589  256
# sum
result <- foreach(x = c(4,9,16), .combine = '+') %do% sqrt(x)
class(result)
## [1] "numeric"
result
## [1] 9

Parallelizing our loops using foreach and doParallel

Before we can parallelize our code, we need to declare a “cluster” – that is, we need to tell R that we have multiple cores – so that R knows how to execute the code. These are the steps involved in this process:

  1. Create the cluster. Note that we need to load the doParallel package to extend the functionality of foreach.
library(doParallel)
## Loading required package: iterators
## Loading required package: parallel
myCluster <- makeCluster(3, # number of cores to use
                         type = "PSOCK") # type of cluster

First, we choose the number of cores we want to use. You can check how many your computer has by running detectCores(). One good rule of thumb is to always leave one core unused for other tasks.

detectCores()
## [1] 4

We can choose between two types of clusters:

  1. Register the cluster with the ‘foreach’ package
registerDoParallel(myCluster)

If you’re running this locally, you can check your Monitor App to see that new instances of R were launched in your computer.

  1. And now we’re ready to use our cluster! We only have to change %do% to %dopar%
output <- foreach(i = 'some object to iterate over', 'options') %dopar% {some r code}

For example:

result <- foreach(x = c(4,9,16), .combine = 'c') %dopar% sqrt(x)
  1. Always remember to stop the cluster when you have finished!
stopCluster(myCluster)

Let’s run some tests to see the improvement in performance. We’ll be using bootstrapping to compute the confidence intervals for a regression coefficient.

d <- read.csv("~/data/UK-tweets.csv", stringsAsFactors=FALSE)
nsims <- 500

# without parallelization

system.time({
r <- foreach(1:nsims, .combine='c') %do% {
  smp <- sample(1:nrow(d), replace=TRUE)
  reg <- lm(log(favourites_count+1) ~ 
              communication + followers_count, data=d[smp,])
  coef(reg)[2]
}})
##    user  system elapsed 
##   4.219   0.042   4.298
quantile(r, probs=c(.025, 0.975))
##      2.5%     97.5% 
## 0.4444164 0.7563428
# with parallelization

myCluster <- makeCluster(3, type = "FORK") # why "FORK"?
registerDoParallel(myCluster)

system.time({
r <- foreach(1:nsims, .combine='c') %dopar% {
  smp <- sample(1:nrow(d), replace=TRUE)
  reg <- lm(log(favourites_count+1) ~ communication + followers_count, data=d[smp,])
  coef(reg)[2]
}})
##    user  system elapsed 
##   0.303   0.053   2.423
stopCluster(myCluster)

quantile(r, probs=c(.025, 0.975))
##      2.5%     97.5% 
## 0.4179464 0.7473848

Why isn’t the total running time 1/ncores the original running time?

Let’s run another example: here we run through all the inaugural presidential speeches in the US and identify the most unique words for each president (something to which we will come back later in the course).

library(quanteda)
## Warning: package 'quanteda' was built under R version 3.4.4
## Package version: 1.3.0
## Parallel computing: 2 of 4 threads used.
## See https://quanteda.io for tutorials and examples.
## 
## Attaching package: 'quanteda'
## The following object is masked from 'package:utils':
## 
##     View
library(readtext)
inaug <- corpus(readtext("~/data/inaugural/*"))
inaugdfm <- dfm(inaug, remove_punct=TRUE)

# regular loop
init <- Sys.time()

speeches <- docnames(inaug)
words <- list()
for (sp in speeches){
  words[[sp]] <- head(textstat_keyness(inaugdfm, target=sp)$feature, n=10)
}

Sys.time() - init
## Time difference of 3.420589 secs
# parallelized loop

myCluster <- makeCluster(3, type = "FORK") # why "FORK"?
registerDoParallel(myCluster)

init <- Sys.time()

words <- foreach(sp = speeches) %dopar% {
  head(textstat_keyness(inaugdfm, target=sp)$feature, n=10)
}

Sys.time() - init
## Time difference of 2.281371 secs
stopCluster(myCluster)