Spark + R = SparkR
Spark is winning more and more hearts, and no wonder: reports from various sources describe speed-ups of an order of magnitude for the analysis of big datasets. Its well-developed system for caching objects in memory lets us avoid torturing hard disks during iterative operations on the same data.
Until recently, Spark applications had to be written in Java, Scala or Python, which I saw as a slight drawback of the platform. These are fine languages, but I use some advanced statistical algorithms available only in R, and I do not feel like rewriting them in Python or Java.
Fortunately, we now have a connector for R which allows us to integrate R and Spark seamlessly. The project started over a year ago on the initiative of Shivaram Venkataraman from Berkeley, who has been joined by many people contributing to the development of the SparkR package (see GitHub: http://amplab-extras.github.io/SparkR-pkg/).
Today I would like to share my first impressions after using that package.
Use-case
‘The problem’ seemed to be an ideal match for the processing profile of Spark. Briefly: I have MAAAAANY short time series (each consisting of several hundred observations) and I want to fit a regression model to each series. Then I would like to fit an ARIMA-type model to the residuals to obtain better predictions.
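For a single series the modelling itself is plain R; just to fix ideas, here is a minimal sketch with made-up column names (t for time, demand for the measured value) rather than my actual model:

# one short series: a linear trend plus an ARIMA-type model for the residuals
series <- data.frame(t = 1:200, demand = cumsum(rnorm(200)))

trend     <- lm(demand ~ t, data = series)                 # regression model for the series
res_arima <- arima(residuals(trend), order = c(1, 0, 1))   # ARIMA-type model for the residuals

# prediction: trend forecast corrected by the residual model
new_t <- data.frame(t = 201:210)
pred  <- predict(trend, newdata = new_t) + predict(res_arima, n.ahead = 10)$pred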
I started my tests with a local standalone installation consisting of only 1 master and 1 worker (a toy setup, really), each with 8GB of RAM. The Spark version I tested is a pre-built distribution for Hadoop 2.3; see https://spark.apache.org/downloads.html for details.
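For the record, connecting to such a setup boils down to passing the master URL to sparkR.init(); the snippet below is how I recall the API, with a placeholder host name:

library(SparkR)
# local mode, handy for quick experiments
sc <- sparkR.init(master = "local")
# for a standalone cluster (the host name is a placeholder):
# sc <- sparkR.init(master = "spark://master-host:7077")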
At first I tried to work with the MapR Sandbox (from http://doc.mapr.com/display/MapR/MapR+Sandbox+for+Hadoop, where you can download a preinstalled Hadoop from MapR along with the whole zoo), but for some reason this version did not work satisfactorily. In the end I used plain Spark on a local file system; after all, my main goal was to test SparkR.
The beginnings were difficult. SparkR often threw exceptions, and the Java stack traces did not explain much either. Something was grinding somewhere in the machinery.
The documentation for the SparkR package is rather mediocre, so it took a while to figure out what the limitations of key-value pairs (the basic Spark structure) are and what happens when lists contain more than two slots.
Similar problems were caused by the fact that functions in the SparkR package are rather naughty and collide with functions from other packages, such as dplyr. If you load dplyr after SparkR, the collect function from dplyr (an S3 generic) masks the collect function from SparkR (an S4 generic), and you get a rather uninformative error.
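A simple workaround is to call the function with an explicit namespace prefix (mapRes below stands for any RDD created with SparkR):

library(SparkR)
library(dplyr)            # loaded after SparkR, so dplyr's collect masks SparkR's

# collect(mapRes)         # dispatches to dplyr::collect and fails with a cryptic error
SparkR::collect(mapRes)   # an explicit namespace prefix avoids the clash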
But after the phase of errors came the time when things actually started working. The basic structure you work with in Spark is the key-value pair. It is really convenient that both the key and the value may be an object of any type or structure, so you can pass a data.frame, a regression model or any other R object as the value.
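To illustrate, on the R side a key-value pair is just a two-element list, so nothing stops you from putting, say, a fitted model into the value slot (a toy example with simulated data):

# a key-value pair in SparkR is simply a two-element list: list(key, value)
df    <- data.frame(t = 1:100, y = rnorm(100))
model <- lm(y ~ t, data = df)
pair  <- list("series_42", model)   # key: an id, value: a whole fitted model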
It is also convenient that just a few lines of code are enough to launch R code on Spark. Most of the initialisation, serialisation and deserialisation is hidden inside the SparkR package.
I managed to implement the planned use case in a dozen or so lines of code, which I present below with the unnecessary details removed. Once you get through the initial difficulties, you discover that using SparkR is really very pleasant.
A few comments on the attached code:
The sparkR.init() function initiates a connection with the Spark backend; textFile() creates a connection with the data source, whether it lives in HDFS or in the local directory structure.
The flatMap() function converts the collection of lines into a collection of vectors of words. For each line it produces a vector of words (or, to be more precise, a list containing one vector, which is the format Spark expects).
You can then launch the lapply() function on this object; the supplied function is applied to each vector separately (remotely, on Spark). It should extract the most important elements from the vector of words and return them as a list of two elements: key and value. In my data each row is one measurement of the demand for property X of object Y at time T. As the key I chose the column with the X id, and as the value I saved a list with Y and T.
The groupByKey() function reshuffles the data so that pairs with the same key (the same X id) end up grouped together.
Next, map() processes the groups. The argument it receives is a list with the key as the first element and the list of values (here: a list of vectors) as the second element. It is fairly easy to transform such a list into a data table and run the predictions on it (a rough sketch of this step follows the code listing below).
Finally, you can use the collect() function to download the results back into R.
The whole procedure worked. Unfortunately, the standalone version of Spark accessed through SparkR is tragically, tragically, tragically slow.
Now I am going to test a bigger, real Spark cluster, and I hope that the change of backend will speed up the processing considerably.
# install_github("amplab-extras/SparkR-pkg", subdir="pkg")
library(SparkR)

# you need to have a properly configured Spark installation
sc <- sparkR.init()

# read data from local disk
linesRDD <- textFile(sc, "data.txt")

# preprocessing of the text file: split by tab, return a list of columns
words <- flatMap(linesRDD,
                 function(line) {
                   list(strsplit(line, split = "\t")[[1]])
                 })

# now extract keys and values
demandWithKey <- lapply(words,
                        function(word) {
                          list( ---here-key---, ---here-value--- )
                        })

# group by key
groups <- groupByKey(demandWithKey, 1L)

# do the mapping
mapRes <- map(groups,
              function(x) {
                # x[[1]] is the key
                # x[[2]] is the list of values
                # write your logic here
              })

# collect results, download them to R
collect(mapRes)
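To give a feel for the parts I stripped out, here is a rough sketch of how the elided key/value extraction and the per-group map step could look. The column positions (word[2] as the X id, word[3] and word[4] as Y and the demand measured at time T) and the trend-plus-ARIMA model are illustrative assumptions, not the exact code I ran:

# hypothetical key/value extraction: word[2] is the X id (key),
# word[3] and word[4] are Y and the demand measured at time T (value)
demandWithKey <- lapply(words, function(word) {
  list(word[2], c(y = word[3], demand = word[4]))
})

groups <- groupByKey(demandWithKey, 1L)

# hypothetical per-group step: rebuild a data.frame, fit a linear trend,
# then an ARIMA model for the residuals, and return both together with the key
mapRes <- map(groups, function(x) {
  vals <- x[[2]]                                   # list of c(y, demand) vectors
  df   <- data.frame(demand = as.numeric(sapply(vals, `[`, "demand")))
  df$t <- seq_len(nrow(df))
  trend     <- lm(demand ~ t, data = df)
  res_model <- arima(residuals(trend), order = c(1, 0, 0))
  list(x[[1]], list(trend = trend, residuals = res_model))
})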
Przemyslaw Biecek