La publicación es muy corta. Mucha gente piensa que la computación en paralelo en R es muy difícil y no es aplicable a sus tareas actuales.
Si y no. Si no entra deliberadamente en la teoría, el hardware y todo tipo de detalles, puede dibujar "3 y 1/2" de recetas casi universales. Estos ejemplos son deliberadamente similares a tareas productivas, y no un par de líneas castradas de sintéticos.
Es una continuación de una serie de publicaciones anteriores .
Paquetes usados
Cargando paquetes
library(tidyverse)
library(magrittr)
library(stringi)
library(glue)
library(dqrng)
library(iterators)
library(future)
library(foreach)
library(doFuture)
library(tictoc)
library(futile.logger)
library(lgr) # `lgr`
library(hrbrthemes)
Patrones de paralelización
Patrón 1. Paralelización de cálculos tidyverse
Situación. Hay un script que contiene muchas canalizaciones para tidyverse
.
Tarea de ejemplo. Calculemos el promedio de la suma de los cuadrados de los números. Para mejorar la eficiencia de la computación en paralelo, es importante reducir la cantidad de transferencia de datos entre subprocesos. Usamos el paquete furrr .
canalización `tidyverse`
registerDoFuture()
# future::plan(multiprocess)
workers <- parallel::detectCores() - 1
future::plan(multisession, workers = workers)
num_row <- 1:10^6
ff_seq <- function(x) x^2
ff_par <- function(x) mean(x^2)
tic(" ")
lst1 <- num_row %>%
purrr::map_dbl(ff_seq) %>%
mean()
toc()
tic(" , 1")
lst2 <- num_row %>%
furrr::future_map_dbl(ff_seq) %>%
mean()
toc()
tic(" , 2")
lst2 <- num_row %>%
split(cut(seq_along(.), workers, labels = FALSE)) %>%
furrr::future_map_dbl(ff_par) %>%
mean()
toc()
Naturalmente, el resultado depende de la plataforma de hardware y el sistema operativo en el que se ejecuta todo. En una ejecución de prueba, tengo este diseño:
: 7.23 sec elapsed
, 1: 3.43 sec elapsed
, 2: 0.64 sec elapsed
Windows y Linux son bastante diferentes en cómo se paralelizan. Linux en producción es muy preferido sobre Windows.
Patrón 2. Paralelización manual local
. . , . , %<-%
.
# , 20 10^5
nn <- 10^5
tic("Generating sample data.frame")
df <- 100 %>%
# stri_rand_strings(length = 10, pattern = "[a-z]") %>%
sample(10^4:10^5, .) %>%
sample(20 * nn, replace = TRUE) %>%
matrix(byrow = TRUE, ncol = 20) %>%
as_tibble(.name_repair = "universal") %>%
mutate(user_id = as.character(sample(1:as.integer(nn/10), n(), replace = TRUE))) %>%
#
mutate(ver = sample(1:20, n(), replace = TRUE)) %>%
select(user_id, ver, everything())
toc()
#
demo_fpath <- here::here("temp", "demo_data.xlsx")
openxlsx::write.xlsx(df, demo_fpath, asTable = TRUE)
plan(multisession, workers = parallel::detectCores() - 2)
# plan(sequential)
# https://github.com/HenrikBengtsson/future
# , 2
tic(" ")
tic(" ")
res_lst <- list()
for (j in 1:6) {
res_lst[[j]] <- { readxl::read_excel(demo_fpath) %>% head(5)}
}
toc()
seq_df <- bind_rows(res_lst)
toc()
tic(" ")
tic(" ")
df1 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df2 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df3 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df4 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df5 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df6 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
toc()
par_df <- bind_rows(df1, df2, df3, df4, df5, df6)
toc()
all_equal(seq_df, par_df)
. . :
: 46.23 sec elapsed
: 37.82 sec elapsed
3.
. , , .
.
. $C_n^k$
. .
.
#
flog_logname <- here::here("log", "job_futile.log")
lgr_logname <- here::here("log", "job_lgr.log")
initLogging <- function(log_file){
lgr <- get_logger_glue("logger")
lgr$set_propagate(FALSE)
lgr$set_threshold("all")
lgr$set_appenders(list(
console = AppenderConsole$new(
threshold = "info"
),
file = AppenderFile$new(
file = log_file,
threshold = "all"
)
))
lgr
}
invisible(flog.appender(appender.tee(flog_logname)))
invisible(flog.threshold(INFO))
lgr <- initLogging(lgr_logname)
"Start batch processing" %T>%
flog.info() %T>%
lgr$info()
#
# https://github.com/HenrikBengtsson/doFuture
# https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html
registerDoFuture()
# future::plan(multiprocess)
future::plan(multisession, workers = parallel::detectCores())
# future::plan(sequential)
# plan(future.callr::callr)
tic("Batch processing")
start_time <- Sys.time()
foreach(it = iter(jobs_tbl, by = "row"), .export = c("start_time"),
# .packages = 'futile.logger',
.verbose = FALSE, .inorder = FALSE, .errorhandling = "remove") %dopar% {
start <- Sys.time() - start_time
#
flog.appender(appender.tee(flog_logname))
lgr <- initLogging(lgr_logname)
res <- arrangements::npermutations(k = it$k, n = it$n, bigz = TRUE)
# https://www.jottr.org/2020/11/06/future-1.20.1-the-future-just-got-a-bit-brighter/
message(" message from thread")
glue("Step {it$idx_str} finished. RAM used {capture.output(pryr::mem_used())}.",
"PID: {Sys.getpid()}",
"Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->",
.sep = " ") %T>%
flog.info() %T>%
lgr$info()
#
return(list(pid = Sys.getpid(), start = start, finish = Sys.time() - start_time))
} -> output_lst
flog.info("Foreach finished")
checkmate::assertList(output_lst, any.missing = FALSE, null.ok = FALSE, min.len = 1)
output_tbl <- dplyr::bind_rows(output_lst)
# rm(output_lst)
# --------------
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc()))
() () . windows.
.
#
output_tbl %>%
mutate_at("pid", as.factor) %>%
mutate_at(vars(start, finish), as.numeric) %>%
ggplot(aes(start, pid, colour = pid)) +
geom_point(size = 3, alpha = .7) +
geom_point(aes(x=finish), shape = 4, size = 3, colour = "black") +
geom_vline(aes(xintercept = start, colour = pid), lty = "dashed", alpha = 0.7) +
ggthemes::scale_fill_tableau("Tableau 10") +
theme_ipsum_rc() +
xlim(c(0, 5))
, , , . , « »:
(worker) . (, , …), . () .
, core - 1, . , reduce , . .
.
, . , , ( , , API ..). .
, . .
Para una serie de tareas relacionadas con solicitudes largas y sincrónicas de sistemas externos (los representantes típicos son REST API / Web scrapping), puede crear muchas más calculadoras que los núcleos disponibles. Todavía cuelgan la mayor parte del tiempo en modo de espera. Se puede ejecutar como procesos de SO separados configurando el backend apropiado.
registerDoFuture();
plan(future.callr::callr).
Esta es la mitad restante de la receta.
Publicación anterior - "¿Matices de las soluciones R operativas en un entorno empresarial?" ...