Two days ago Hadley Wickham tweeted a link to an introduction to his new package multidplyr. Basically, it is a tool for taking advantage of many cores in dplyr operations. Let's see how to play with it.
What can you do with multidplyr?
As described on its GitHub page, multidplyr is a library for processing data that is distributed across many cores, using dplyr verbs. The idea is somewhat similar to Spark. Similar solutions exist for R, and some of them have been available for years (like RMPI, distribute, parallel or many others from the list at https://cran.r-project.org/web/views/HighPerformanceComputing.html). The problem with them is that they are made mainly for hackers: it is not unusual to get an error with twenty lines of traceback and no hint of what went wrong. Packages from the Hadleyverse come with a nicer design (as Apple products sometimes do) and explode less often; with slightly smaller functionality we get more fun. multidplyr is still in the development phase and sometimes it can make you really angry, but there are a lot of things that you can do well with it, and often you can do them really fast. In the multidplyr vignette you will find examples built on the flights dataset. With just 300k+ observations, the overhead of distributing the data turns out to be larger than the computation time, but for larger datasets or more complicated calculations you should expect some gain from putting additional cores to work.
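To get a feel for the workflow before the real use case, here is a minimal sketch in the spirit of that vignette: build a small cluster, partition the flights data by a grouping variable, summarise on the nodes and collect the results. It follows the development-version API used later in this post (create_cluster, partition, collect), so treat it as an illustration rather than a definitive reference.

library(dplyr)
library(multidplyr)
library(nycflights13)            # the flights dataset used in the vignette

cluster = create_cluster(4)      # a small cluster, one node per core
set_default_cluster(cluster)

flights %>%
  partition(dest) %>%            # shard the data so that each destination stays on one node
  summarise(mean_delay = mean(dep_delay, na.rm = TRUE)) %>%
  collect()                      # bring the per-destination summaries back to the master session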
Use case

Right now I am playing with log files from many bizarre devices. The point is that there are a lot of rows of data (a few hundred million) and the logs are stored in many, many relatively small files. So I use multidplyr to read the data in parallel and to do some initial pre-processing. The cluster is built on 15 cores and everything is done in plain R. It turns out that I can reduce the processing time from one day to two hours, so it is an improvement, even if you count the time needed to learn multidplyr (not that much if you already know dplyr and Spark).

Let's see the example step by step. First, initiate a cluster with 15 nodes (one node per core).

cluster = create_cluster(15)
## Initializing 15 core cluster.
set_default_cluster(cluster)
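(The 15 matches the machine used here. A more portable way to pick the cluster size, not from the original post, is to ask R how many cores are available and leave one free for the system.)

library(parallel)
cluster = create_cluster(max(1, detectCores() - 1))   # use all but one of the available cores
set_default_cluster(cluster)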
Find all files with the extension 'log'. The data is there.

lf = data.frame(files = list.files(pattern = 'log', recursive = TRUE),
                stringsAsFactors = FALSE)

Now I need to define a function that reads a file and does some preprocessing. This function is then sent to all nodes in the cluster.
readAndExtractTimepoints = function(x) {
  # read the raw log, keep only the lines about entering a scene
  # and extract the syslog-style timestamp (the first 15 characters)
  tmp  = readLines(as.character(x)[1])
  ftmp = grep(tmp, pattern = 'Entering scene', value = TRUE)
  substr(ftmp, 1, 15)
}

cluster_assign_value(cluster, 'readAndExtractTimepoints', readAndExtractTimepoints)
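The function uses only base R. If it relied on an additional package, that package would also have to be loaded on every node; multidplyr provides cluster_library() for this. A hypothetical example, not needed in this use case, assuming the helper used stringr:

cluster_library(cluster, 'stringr')   # hypothetical: load stringr on every node of the cluster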
Time to initiate some calculations. The list of file names is partitioned across the nodes and readAndExtractTimepoints is executed for each file. The result is an object of class party_df (again, one file per row).

lf_distr = lf %>%
  partition() %>%
  group_by(files) %>%
  do(timepoints = readAndExtractTimepoints(.$files))

lf_distr
## Source: party_df [897 x 3]
## Groups: PARTITION_ID, files
## Shards: 15 [59--60 rows]
##
##    PARTITION_ID                     files timepoints
##           (dbl)                     (chr)      (chr)
## 1             1 2013/01/cnk02a/cnk02a.log
## 2             1 2013/01/cnk02b/cnk02b.log
## 3             1   2013/01/cnk06/cnk06.log
## 4             1   2013/01/cnk07/cnk07.log
## 5             1   2013/01/cnk09/cnk09.log
## 6             1   2013/01/cnk10/cnk10.log
## 7             1 2013/01/cnk100/cnk100.log
## 8             1   2013/01/cnk11/cnk11.log
## 9             1   2013/01/cnk15/cnk15.log
## 10            1   2013/01/cnk16/cnk16.log

Results are ready to be collected and transformed into a classical list.
timeP = collect(lf_distr)
str(timeP$timepoints)
## List of 897
##  $ : chr [1:144830] "Jan 1 08:15:57 " "Jan 1 18:04:37 " "Jan 1 18:05:44 " "Jan 2 08:15:57 " ...
##  $ : chr [1:123649] "Jan 1 08:16:05 " "Jan 2 08:16:05 " "Jan 2 09:46:08 " "Jan 2 09:46:13 " ...
##  $ : chr [1:137661] "Jan 1 08:15:57 " "Jan 2 08:15:57 " "Jan 2 09:34:47 " "Jan 2 09:35:45 " ...
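A possible next step, my addition rather than part of the original pipeline, is to parse these syslog-style strings into proper date-times. The year is not part of the string, so strptime() would silently assume the current one; the sketch below pastes in the year taken from the beginning of the file path (e.g. 2013/...), relying on the files and timepoints columns returned by collect() above.

year   = substr(timeP$files[1], 1, 4)                  # '2013' from paths like '2013/01/...'
parsed = as.POSIXct(paste(year, timeP$timepoints[[1]]),
                    format = '%Y %b %e %H:%M:%S')      # e.g. '2013 Jan 1 08:15:57'
head(parsed)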