Parallelization of R code

Introduction

To make use of multi-core processors, or even just hyperthreading, you have to write your code in such a way that the blocks that are to run in parallel can be executed in any order.

Blocks modifying the same object, in this case a file

One obstacle I found in my own code was that several blocks loaded and saved the same file. Run in parallel, only the changes made by the block that finished last would be saved. Here is an example of this (add_something() and add_something_else() stand for arbitrary computations that each produce new columns):

block.one <- function() {
  load("foo.RData")
  foo <- cbind(foo, add_something(foo))
  save(foo, file = "foo.RData")
}
block.two <- function() {
  load("foo.RData")
  foo <- cbind(foo, add_something_else(foo))
  save(foo, file = "foo.RData")
}
block.one()
block.two()

If we ran block.one() and block.two() in parallel, the result would not be deterministic. This kind of code must be rewritten so that each block gets foo as input and returns only its own additions. A separate super block then merges the results back into foo.

block.one <- function(foo) {
  return(add_something(foo))
}
block.two <- function(foo) {
  return(add_something_else(foo))
}
block.super <- function() {
  library(multicore)
  load("foo.RData")
  ## parallel() forks a child process that evaluates the expression
  my.one.job <- parallel(block.one(foo))
  my.two.job <- parallel(block.two(foo))
  ## collect() waits for a job to finish and returns its result in a list
  foo <- cbind(foo, collect(my.one.job)[[1]], collect(my.two.job)[[1]])
  save(foo, file = "foo.RData")
}
block.super()

Let's test this with some real data. For this test, we leave out the loading and saving to disk, because they would dominate the results. Had disk I/O been included, the parallelized version would have come out even better, since it loads and saves the file only once instead of once per block.

foo <- log(sample(1E7))
## Parallelized version
block.one <- function(foo) {
  return(cos(foo) + tan(foo) - var(log(foo) * sin(foo)))
}
block.two <- function(foo) {
  return(sin(foo) + log(foo) - var(tan(foo) * cos(foo)))
}
block.super <- function(){
  library(multicore)
  ## load("foo.RData")
  my.one.job <- parallel(block.one(foo))
  my.two.job <- parallel(block.two(foo))
  foo <- cbind(foo, collect(my.one.job)[[1]], collect(my.two.job)[[1]])
}
## Serial version, without disk I/O
block.one.serial <- function(foo) {
  foo <- cbind(foo, cos(foo) + tan(foo) - var(log(foo) * sin(foo)))
}
block.two.serial <- function(foo) {
  foo <- cbind(foo, sin(foo) + log(foo) - var(tan(foo) * cos(foo)))
}
system.time(block.super())
   user  system elapsed
  7.392   0.896   4.444
system.time(block.one.serial(foo))
   user  system elapsed
  3.308   0.056   3.362
system.time(block.two.serial(foo))
   user  system elapsed
  3.364   0.052   3.414

So, 4.4 seconds versus 3.4 + 3.4 = 6.8 seconds means the elapsed time drops by about 35 percent, without counting the time spent on disk I/O. If you have more than two cores, and more than two blocks that can be run in parallel, the decrease in execution time will of course be larger.
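
If there are many blocks, spelling out one parallel()/collect() pair per block gets tedious. Here is a sketch of the same idea using mclapply() from the multicore package; block.one and block.two are the functions defined above, and further independent blocks can simply be added to the list:

library(multicore)
blocks <- list(block.one, block.two)   # append further independent blocks here
## run each block on foo in its own forked process
results <- mclapply(blocks, function(block) block(foo))
## merge the original data with every block's result
foo <- cbind(foo, do.call(cbind, results))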

In a real-world example with three blocks, this time including the time saved on disk I/O, I got the following:

system.time(my.serial.func())
   user  system elapsed
  8.572   0.188   8.783
system.time(my.parallel.func())
   user  system elapsed
  2.988   0.136   3.593
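
For reference, my.parallel.func() was built on the same pattern as block.super() above. A rough sketch of that shape, with block.a(), block.b() and block.c() standing in for the real blocks, which are not shown here:

my.parallel.func <- function() {
  library(multicore)
  load("foo.RData")
  ## one forked child per independent block
  job.a <- parallel(block.a(foo))
  job.b <- parallel(block.b(foo))
  job.c <- parallel(block.c(foo))
  ## wait for all three, then merge and save the file once
  foo <- cbind(foo, collect(job.a)[[1]], collect(job.b)[[1]], collect(job.c)[[1]])
  save(foo, file = "foo.RData")
}

Because the super block loads and saves the file only once, part of the gain comes from the avoided disk I/O on top of the parallel execution of the blocks.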
