multidplyr: first impressions
Two days ago Hadley Wickham tweeted a link to an introduction to his new package multidplyr. Basically, it is a tool for taking advantage of multiple cores in dplyr operations. Let’s see how to play with it.
What can you do with multidplyr?
As described on its GitHub page, multidplyr is a package for processing data that is distributed across many cores, using dplyr verbs.
The idea is somewhat similar to Spark. Similar solutions exist for R, and some of them have been available for years (like Rmpi, distribute, parallel or many others from the list at https://cran.r-project.org/web/views/HighPerformanceComputing.html). But the problem with them is that they are aimed mainly at hackers. It is not that unusual to get an error with 20 lines of traceback and no warning.
Packages from the Hadleyverse come with a nicer design (as Apple products sometimes do) and explode less often. With slightly less functionality, we get more fun.
multidplyr is still in the development phase and sometimes it can make you really angry. But there are a lot of things that you can do well with it, and often you can do them really fast.
In the multidplyr vignette you will find examples based on the flights dataset. It has just 300k+ observations, so the overhead related to distributing the data turns out to be larger than the computation time. But for larger datasets, or for more complicated calculations, you should expect some gain from heating up additional cores.
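To get a feeling for the basic workflow, here is a minimal sketch in the spirit of the vignette example (based on my reading of the dev version; partition() with a grouping variable and the implicit default cluster are assumptions on my side):

library(dplyr)
library(multidplyr)
library(nycflights13)

# spread the rows across the cluster, grouping by destination,
# so that all rows for a given airport end up on the same node
by_dest = partition(flights, dest)

# dplyr verbs run on every node in parallel; collect() brings the
# pieces back into a single local data frame
delays = by_dest %>%
  summarise(mean_delay = mean(dep_delay, na.rm = TRUE)) %>%
  collect()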
Use case
Right now I am playing with log files from many bizarre devices. The point is that there are a lot of rows of data (a few hundred million) and the logs are stored in many, many relatively small files. So I use multidplyr to read the data in parallel and do some initial pre-processing. The cluster is built on 15 cores and everything is done in plain R. It turns out that I can reduce the processing time from one day to two hours. So, it is an improvement, even if you count the time needed to learn multidplyr (not that much if you know dplyr and Spark).
Let’s see the example step by step.
First, initiate a cluster with 15 nodes (one node per core).
cluster = create_cluster(15)
## Initializing 15 core cluster.
set_default_cluster(cluster)
Find all files with the extension ‘log’. That is where the data is.
lf = data.frame(files=list.files(pattern = 'log', recursive = TRUE), stringsAsFactors = FALSE)
Now I need to define a function that reads a file and does some preprocessing. This function is then sent to all nodes in the cluster.
readAndExtractTimepoints = function(x) {
  tmp = readLines(as.character(x)[1])
  ftmp = grep(tmp, pattern = 'Entering scene', value = TRUE)
  substr(ftmp, 1, 15)
}
cluster_assign_value(cluster, 'readAndExtractTimepoints', readAndExtractTimepoints)
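Note that each node is a separate R session, so everything the function needs has to be shipped there explicitly. readAndExtractTimepoints uses only base R; if it relied on other packages, those would have to be attached on the nodes too, for example with cluster_library() (a hypothetical call here, not needed in this case):

## hypothetical: only needed if the helper used non-base packages
# cluster_library(cluster, 'stringi')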
Time to initiate some calculations. The list of file names is partitioned across the nodes and readAndExtractTimepoints is executed for each file. Since the data is grouped by file name, each do() call processes a single file on its shard. The result is an object of class party_df (again, one file per row).
lf_distr = lf %>%
  partition() %>%
  group_by(files) %>%
  do(timepoints = readAndExtractTimepoints(.$files))

lf_distr
## Source: party_df [897 x 3]
## Groups: PARTITION_ID, files
## Shards: 15 [59--60 rows]
##
##    PARTITION_ID                      files timepoints
##           (dbl)                      (chr)      (chr)
## 1             1  2013/01/cnk02a/cnk02a.log
## 2             1  2013/01/cnk02b/cnk02b.log
## 3             1    2013/01/cnk06/cnk06.log
## 4             1    2013/01/cnk07/cnk07.log
## 5             1    2013/01/cnk09/cnk09.log
## 6             1    2013/01/cnk10/cnk10.log
## 7             1  2013/01/cnk100/cnk100.log
## 8             1    2013/01/cnk11/cnk11.log
## 9             1    2013/01/cnk15/cnk15.log
## 10            1    2013/01/cnk16/cnk16.log
The results are ready to be collected and transformed into an ordinary list.
timeP = collect(lf_distr)

str(timeP$timepoints)
## List of 897
##  $ : chr [1:144830] "Jan 1 08:15:57 " "Jan 1 18:04:37 " "Jan 1 18:05:44 " "Jan 2 08:15:57 " ...
##  $ : chr [1:123649] "Jan 1 08:16:05 " "Jan 2 08:16:05 " "Jan 2 09:46:08 " "Jan 2 09:46:13 " ...
##  $ : chr [1:137661] "Jan 1 08:15:57 " "Jan 2 08:15:57 " "Jan 2 09:34:47 " "Jan 2 09:35:45 " ...
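After collect() the timepoints column is an ordinary list of character vectors, so the rest of the analysis can be done in plain R. A tiny, hypothetical sanity check:

# number of extracted timepoints per log file
nEvents = sapply(timeP$timepoints, length)
summary(nEvents)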
General impressions
I guess one could speed up the whole process even further with Python or Spark. But if the dataset is not huge, it is much easier to maintain a process that uses just a single technology/language.
Overall, I like multidplyr even if it still looks like a prototype. Sometimes things get nasty, for example when you try to chain a few different do() operations. But knowing the ‘Hadley’ effect, I expect that it will get better and better with every version.
Finally, it looks like we will soon have a solution for parallel processing that can be used by normal people, not only by hackers.