| Title: | Lightweight, General-Purpose Data Analysis Pipelines |
|---|---|
| Description: | A lightweight yet powerful framework for building robust data analysis pipelines. With 'pipeflow', you initialize a pipeline with your dataset and construct workflows step by step by adding R functions. You can modify, remove, or insert steps and parameters at any stage, while 'pipeflow' ensures the pipeline's integrity. Overall, this package offers a beginner-friendly framework that simplifies and streamlines the development of data analysis pipelines by making them modular, intuitive, and adaptable. |
| Authors: | Roman Pahl [aut, cre] |
| Maintainer: | Roman Pahl <[email protected]> |
| License: | GPL-3 |
| Version: | 0.2.3.9005 |
| Built: | 2026-05-13 08:08:10 UTC |
| Source: | https://github.com/rpahl/pipeflow |
A pipeline consists of a series of steps, which usually
are added one by one. Each step is made up of a function computing
something once the pipeline is run. This function can be an existing
R function (e.g. mean()) or an anonymous/lambda function specifically
defined for the pipeline. One useful feature is that function
parameters can refer to results of earlier pipeline steps using the
syntax x = ~earlier_step_name (see the Examples for more details).
pipe_add( pip, step, fun, params = list(), description = "", group = step, tags = character() )pipe_add( pip, step, fun, params = list(), description = "", group = step, tags = character() )
pip |
|
step |
|
fun |
|
params |
|
description |
|
group |
|
tags |
|
returns the Pipeline object invisibly
# Add steps with lambda functions p <- pipe_new("myPipe", data = 1) pipe_add(p, "s1", \(x = ~data) 2*x) # use input data pipe_add(p, "s2", \(x = ~data, y = ~s1) x * y) try(pipe_add(p, "s2", \(z = 3) 3)) # error: step 's2' exists already try(pipe_add(p, "s3", \(z = ~foo) 3)) # dependency 'foo' not found p # Add step with existing function p <- pipe_new("myPipe", data = c(1, 2, NA, 3, 4)) try(pipe_add(p, "calc_mean", mean)) # default value for x is missing pipe_add(p, "calc_mean", mean, params = list(x = ~data, na.rm = TRUE)) p |> pipe_run() |> pipe_get_out("calc_mean") # Step description p <- pipe_new("myPipe", data = 1:10) pipe_add(p, "s1", \(x = ~data) 2*x, description = "multiply by 2") print(p, verbose = TRUE) # print all columns including description # Group output p <- pipe_new("myPipe", data = data.frame(x = 1:2, y = 3:4)) pipe_add(p, "prep_x", \(data = ~data) data$x, group = "prep") pipe_add(p, "prep_y", \(data = ~data) (data$y)^2, group = "prep") pipe_add(p, "sum", \(x = ~prep_x, y = ~prep_y) x + y) p |> pipe_run() |> pipe_collect_out()# Add steps with lambda functions p <- pipe_new("myPipe", data = 1) pipe_add(p, "s1", \(x = ~data) 2*x) # use input data pipe_add(p, "s2", \(x = ~data, y = ~s1) x * y) try(pipe_add(p, "s2", \(z = 3) 3)) # error: step 's2' exists already try(pipe_add(p, "s3", \(z = ~foo) 3)) # dependency 'foo' not found p # Add step with existing function p <- pipe_new("myPipe", data = c(1, 2, NA, 3, 4)) try(pipe_add(p, "calc_mean", mean)) # default value for x is missing pipe_add(p, "calc_mean", mean, params = list(x = ~data, na.rm = TRUE)) p |> pipe_run() |> pipe_get_out("calc_mean") # Step description p <- pipe_new("myPipe", data = 1:10) pipe_add(p, "s1", \(x = ~data) 2*x, description = "multiply by 2") print(p, verbose = TRUE) # print all columns including description # Group output p <- pipe_new("myPipe", data = data.frame(x = 1:2, y = 3:4)) pipe_add(p, "prep_x", \(data = ~data) data$x, group = "prep") pipe_add(p, "prep_y", \(data = 
~data) (data$y)^2, group = "prep") pipe_add(p, "sum", \(x = ~prep_x, y = ~prep_y) x + y) p |> pipe_run() |> pipe_collect_out()
When appending, pipeflow takes care of potential name
clashes with respect to step names and dependencies, that is, if
needed, it will automatically adapt step names and dependencies to
make sure they are unique in the merged pipeline.
pipe_append(pip, p, outAsIn = FALSE, tryAutofixNames = TRUE, sep = ".")pipe_append(pip, p, outAsIn = FALSE, tryAutofixNames = TRUE, sep = ".")
pip |
|
p |
|
outAsIn |
|
tryAutofixNames |
|
sep |
|
returns new combined Pipeline object.
# Append pipeline p1 <- pipe_new("pipe1") pipe_add(p1, "step1", \(x = 1) x) p2 <- pipe_new("pipe2") pipe_add(p2, "step2", \(y = 1) y) p1 |> pipe_append(p2) # Append pipeline with potential name clashes p3 <- pipe_new("pipe3") pipe_add(p3, "step1", \(z = 1) z) p1 |> pipe_append(p2) |> pipe_append(p3) # Use output of first pipeline as input for second pipeline p1 <- pipe_new("pipe1", data = 8) p2 <- pipe_new("pipe2") pipe_add(p1, "square", \(x = ~data) x^2) pipe_add(p2, "log2", \(x = ~data) log2(x)) p12 <- p1 |> pipe_append(p2, outAsIn = TRUE) p12 |> pipe_run() |> pipe_get_out("log2") p12 # Custom name separator for adapted step names p1 |> pipe_append(p2, sep = "___")# Append pipeline p1 <- pipe_new("pipe1") pipe_add(p1, "step1", \(x = 1) x) p2 <- pipe_new("pipe2") pipe_add(p2, "step2", \(y = 1) y) p1 |> pipe_append(p2) # Append pipeline with potential name clashes p3 <- pipe_new("pipe3") pipe_add(p3, "step1", \(z = 1) z) p1 |> pipe_append(p2) |> pipe_append(p3) # Use output of first pipeline as input for second pipeline p1 <- pipe_new("pipe1", data = 8) p2 <- pipe_new("pipe2") pipe_add(p1, "square", \(x = ~data) x^2) pipe_add(p2, "log2", \(x = ~data) log2(x)) p12 <- p1 |> pipe_append(p2, outAsIn = TRUE) p12 |> pipe_run() |> pipe_get_out("log2") p12 # Custom name separator for adapted step names p1 |> pipe_append(p2, sep = "___")
Appends a string to all step names and updates the step dependencies accordingly.
pipe_append_to_step_names(pip, postfix, sep = ".")pipe_append_to_step_names(pip, postfix, sep = ".")
pip |
|
postfix |
|
sep |
|
returns the Pipeline object invisibly
p <- pipe_new("pipe") pipe_add(p, "step1", \(x = 1) x) pipe_add(p, "step2", \(y = 1) y) pipe_append_to_step_names(p, "new") p pipe_append_to_step_names(p, "foo", sep = "__") pp <- pipe_new("pipe") pipe_add(p, "step1", \(x = 1) x) pipe_add(p, "step2", \(y = 1) y) pipe_append_to_step_names(p, "new") p pipe_append_to_step_names(p, "foo", sep = "__") p
Creates a copy of a pipeline object.
pipe_clone(pip, deep = FALSE)pipe_clone(pip, deep = FALSE)
pip |
|
deep |
|
returns the copied Pipeline object
p1 <- pipe_new("pipe") pipe_add(p1, "step1", \(x = 1) x) p2 <- pipe_clone(p1) pipe_add(p2, "step2", \(y = 1) y) p1 p2p1 <- pipe_new("pipe") pipe_add(p1, "step1", \(x = 1) x) p2 <- pipe_clone(p1) pipe_add(p2, "step2", \(y = 1) y) p1 p2
Collect outputs produced by the pipeline run.
Only steps that were not skipped contribute results.
The output is grouped by the user-defined group names
(see group parameter in function pipe_add()), which by default
are identical to the step names, that is, trivial groups of
size 1. Use groupBy = "state" to group results by the step's
state instead.
pipe_collect_out(pip, groupBy = c("group", "state"))pipe_collect_out(pip, groupBy = c("group", "state"))
pip |
|
groupBy |
|
list containing the output, named after the groups, which,
by default, are the step names.
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "step1", \(x = ~data) x + 2) pipe_add(p, "step2", \(x = ~step1) x + 2) pipe_run(p) pipe_collect_out(p) # Grouped output p <- pipe_new("pipe", data = 1:2) pipe_add(p, "step1", \(x = ~data) x + 2, group = "add") pipe_add(p, "step2", \(x = ~step1, y = 2) x + y, group = "add") pipe_add(p, "step3", \(x = ~data) x * 3, group = "mult") pipe_add(p, "step4", \(x = ~data, y = 2) x * y, group = "mult") p pipe_run(p) pipe_collect_out(p) |> str() # Grouped by state pipe_set_params(p, list(y = 5)) p pipe_collect_out(p, groupBy = "state") |> str()p <- pipe_new("pipe", data = 1:2) pipe_add(p, "step1", \(x = ~data) x + 2) pipe_add(p, "step2", \(x = ~step1) x + 2) pipe_run(p) pipe_collect_out(p) # Grouped output p <- pipe_new("pipe", data = 1:2) pipe_add(p, "step1", \(x = ~data) x + 2, group = "add") pipe_add(p, "step2", \(x = ~step1, y = 2) x + y, group = "add") pipe_add(p, "step3", \(x = ~data) x * 3, group = "mult") pipe_add(p, "step4", \(x = ~data, y = 2) x * y, group = "mult") p pipe_run(p) pipe_collect_out(p) |> str() # Grouped by state pipe_set_params(p, list(y = 5)) p pipe_collect_out(p, groupBy = "state") |> str()
Discard all steps that match a given pattern.
pipe_discard_steps(pip, pattern, recursive = FALSE, fixed = TRUE, ...)pipe_discard_steps(pip, pattern, recursive = FALSE, fixed = TRUE, ...)
pip |
|
pattern |
|
recursive |
|
fixed |
|
... |
further arguments passed to |
the Pipeline object invisibly
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~add1) x + 2) pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) p pipe_discard_steps(p, "mult") p # Re-add steps pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) p # Discarding 'add1' does not work ... try(pipe_discard_steps(p, "add1")) # ... unless we enforce to remove its downstream dependencies as well pipe_discard_steps(p, "add1", recursive = TRUE) p # Trying to discard non-existent steps is just ignored pipe_discard_steps(p, "non-existent")p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~add1) x + 2) pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) p pipe_discard_steps(p, "mult") p # Re-add steps pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) p # Discarding 'add1' does not work ... try(pipe_discard_steps(p, "add1")) # ... unless we enforce to remove its downstream dependencies as well pipe_discard_steps(p, "add1", recursive = TRUE) p # Trying to discard non-existent steps is just ignored pipe_discard_steps(p, "non-existent")
Get the data set for the pipeline
pipe_get_data(pip)pipe_get_data(pip)
pip |
|
the output defined in the data step, which by default is
the first step of the pipeline
p <- pipe_new("pipe", data = 1:2) pipe_get_data(p) pipe_set_data(p, 3:4) pipe_get_data(p)p <- pipe_new("pipe", data = 1:2) pipe_get_data(p) pipe_set_data(p, 3:4) pipe_get_data(p)
Get step dependencies
pipe_get_depends(pip) pipe_get_depends_down(pip, step, recursive = TRUE) pipe_get_depends_up(pip, step, recursive = TRUE)pipe_get_depends(pip) pipe_get_depends_down(pip, step, recursive = TRUE) pipe_get_depends_up(pip, step, recursive = TRUE)
pip |
|
step |
|
recursive |
|
pipe_get_depends: named list of dependencies for each step
pipe_get_depends_down: list of downstream dependencies
pipe_get_depends_up: list of upstream dependencies
pipe_get_depends: get all dependencies for all steps defined
in the pipeline
pipe_get_depends_down: get all downstream dependencies of a
given step, by default descending recursively.
pipe_get_depends_up: get all upstream dependencies of a
given step, by default ascending recursively.
# pipe_get_depends p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_get_depends(p) # pipe_get_depends_down p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) pipe_get_depends_down(p, "add1") pipe_get_depends_down(p, "add1", recursive = FALSE) # pipe_get_depends_up p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) pipe_get_depends_up(p, "mult4") pipe_get_depends_up(p, "mult4", recursive = FALSE)# pipe_get_depends p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_get_depends(p) # pipe_get_depends_down p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) pipe_get_depends_down(p, "add1") pipe_get_depends_down(p, "add1", recursive = FALSE) # pipe_get_depends_up p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) pipe_get_depends_up(p, "mult4") pipe_get_depends_up(p, "mult4", recursive = FALSE)
Get the pipeline as a graph with nodes and edges.
pipe_get_graph(pip, groups = NULL)pipe_get_graph(pip, groups = NULL)
pip |
|
groups |
|
list with two data frames, one for nodes and one for edges
ready to be used with the visNetwork::visNetwork() function of the
visNetwork package.
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = ~add1) x + y) pipe_add(p, "mult1", \(x = ~add1, y = ~add2) x * y) graph <- pipe_get_graph(p) graph if (require("visNetwork", quietly = TRUE)) { do.call(visNetwork, args = graph) }p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = ~add1) x + y) pipe_add(p, "mult1", \(x = ~add1, y = ~add2) x * y) graph <- pipe_get_graph(p) graph if (require("visNetwork", quietly = TRUE)) { do.call(visNetwork, args = graph) }
Get output of given step
pipe_get_out(pip, step)pipe_get_out(pip, step)
pip |
|
step |
|
the output at the given step.
See also pipe_collect_out() to collect the output of multiple steps.
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_run(p) pipe_get_out(p, "add1") pipe_get_out(p, "add2")p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_run(p) pipe_get_out(p, "add1") pipe_get_out(p, "add2")
Retrieves unbound function parameters defined in the pipeline where 'unbound' means parameters that are not linked to other steps.
pipe_get_params(pip, ignoreHidden = TRUE) pipe_get_params_at_step(pip, step, ignoreHidden = TRUE) pipe_get_params_unique(pip, ignoreHidden = TRUE)pipe_get_params(pip, ignoreHidden = TRUE) pipe_get_params_at_step(pip, step, ignoreHidden = TRUE) pipe_get_params_unique(pip, ignoreHidden = TRUE)
pip |
|
ignoreHidden |
|
step |
|
pipe_get_params: list of parameters, sorted and named by step -
steps with no parameters are filtered out
pipe_get_params_at_step: list of parameters at given step
pipe_get_params_unique: list of parameters where each parameter
is only listed once. The values of the parameters will be the values
of the first step where the parameters were defined, respectively.
# pipe_get_params p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = 2, .z = 3) x + y + .z) pipe_add(p, "add3", \() 1 + 2) pipe_get_params(p, ) |> str() pipe_get_params(p, ignoreHidden = FALSE) |> str() # pipe_get_params_at_step pipe_get_params_at_step(p, "add2") pipe_get_params_at_step(p, "add2", ignoreHidden = FALSE) pipe_get_params_at_step(p, "add3") # pipe_get_params_unique p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = 2, .z = 3) x + y + .z) pipe_add(p, "mult1", \(x = 4, y = 5, .z = 6, b = ~add2) x * y * b) pipe_get_params_unique(p) pipe_get_params_unique(p, ignoreHidden = FALSE)# pipe_get_params p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = 2, .z = 3) x + y + .z) pipe_add(p, "add3", \() 1 + 2) pipe_get_params(p, ) |> str() pipe_get_params(p, ignoreHidden = FALSE) |> str() # pipe_get_params_at_step pipe_get_params_at_step(p, "add2") pipe_get_params_at_step(p, "add2", ignoreHidden = FALSE) pipe_get_params_at_step(p, "add3") # pipe_get_params_unique p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = 2, .z = 3) x + y + .z) pipe_add(p, "mult1", \(x = 4, y = 5, .z = 6, b = ~add2) x * y * b) pipe_get_params_unique(p) pipe_get_params_unique(p, ignoreHidden = FALSE)
Get step information
pipe_get_step(pip, step) pipe_get_step_names(pip) pipe_get_step_number(pip, step) pipe_has_step(pip, step)pipe_get_step(pip, step) pipe_get_step_names(pip) pipe_get_step_number(pip, step) pipe_has_step(pip, step)
pip |
|
step |
|
pipe_get_step: data.table row containing the step
pipe_get_step_names: character vector of step names
pipe_get_step_number: the step number in the pipeline
pipe_has_step: whether step exists
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = 2, z = ~add1) x + y + z) pipe_run(p) # pipe_get_step_names pipe_get_step_names(p) # get_step_number pipe_get_step_number(p, "add1") pipe_get_step_number(p, "add2") # pipe_has_step pipe_has_step(p, "add1") pipe_has_step(p, "foo") # pipe_get_step add1 <- pipe_get_step(p, "add1") add1 add1[["params"]] add1[["fun"]] try(p$get_step("foo")) # error: step 'foo' does not existp <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = 2, z = ~add1) x + y + z) pipe_run(p) # pipe_get_step_names pipe_get_step_names(p) # get_step_number pipe_get_step_number(p, "add1") pipe_get_step_number(p, "add2") # pipe_has_step pipe_has_step(p, "add1") pipe_has_step(p, "foo") # pipe_get_step add1 <- pipe_get_step(p, "add1") add1 add1[["params"]] add1[["fun"]] try(p$get_step("foo")) # error: step 'foo' does not exist
Get a specific field/entry of a step
pipe_get_step_field(pip, step, what)pipe_get_step_field(pip, step, what)
pip |
|
step |
|
what |
|
the requested entry at the given step
Insert step
pipe_insert_after(pip, afterStep, step, ...) pipe_insert_before(pip, beforeStep, step, ...)pipe_insert_after(pip, afterStep, step, ...) pipe_insert_before(pip, beforeStep, step, ...)
pip |
|
afterStep |
|
step |
|
... |
further arguments passed to |
beforeStep |
|
returns the Pipeline object invisibly
pipe_insert_after: insert step after a certain step of the pipeline
pipe_insert_before: insert step before a certain step of the pipeline
# pipe_insert_after p <- pipe_new("pipe", data = 1) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(x = ~f1) x) pipe_insert_after(p, "f1", step = "after_f1", \(x = ~f1) x) p # insert_before pipe_insert_before(p, "f2", step = "before_f2", \(x = ~f1) 2 * x) p# pipe_insert_after p <- pipe_new("pipe", data = 1) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(x = ~f1) x) pipe_insert_after(p, "f1", step = "after_f1", \(x = ~f1) x) p # insert_before pipe_insert_before(p, "f2", step = "before_f2", \(x = ~f1) 2 * x) p
Length of the pipeline
pipe_length(pip)pipe_length(pip)
pip |
|
numeric length of pipeline, that is, the total number of steps
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(y = 1) y) p pipe_length(p)p <- pipe_new("pipe", data = 1:2) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(y = 1) y) p pipe_length(p)
Locking a step means that both its parameters and its
output (given it has output) are locked such that neither
setting new pipeline parameters nor future pipeline runs can change
the current parameter and output content. To unlock a locked step,
use pipe_unlock_step().
pipe_lock_step(pip, step) pipe_unlock_step(pip, step)pipe_lock_step(pip, step) pipe_unlock_step(pip, step)
pip |
|
step |
|
the Pipeline object invisibly
# pipe_lock_step p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = 1, data = ~data) x + data) pipe_add(p, "add2", \(x = 1, data = ~data) x + data) pipe_run(p) pipe_get_out(p, "add1") pipe_get_out(p, "add2") pipe_lock_step(p, "add1") pipe_set_data(p, 3) pipe_set_params(p, list(x = 3)) pipe_run(p) pipe_get_out(p, "add1") pipe_get_out(p, "add2") # pipe_unlock_step pipe_unlock_step(p, "add1") pipe_set_params(p, list(x = 3)) pipe_run(p) pipe_get_out(p, "add1")# pipe_lock_step p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = 1, data = ~data) x + data) pipe_add(p, "add2", \(x = 1, data = ~data) x + data) pipe_run(p) pipe_get_out(p, "add1") pipe_get_out(p, "add2") pipe_lock_step(p, "add1") pipe_set_data(p, 3) pipe_set_params(p, list(x = 3)) pipe_run(p) pipe_get_out(p, "add1") pipe_get_out(p, "add2") # pipe_unlock_step pipe_unlock_step(p, "add1") pipe_set_params(p, list(x = 3)) pipe_run(p) pipe_get_out(p, "add1")
A new pipeline is always initialized with one 'data' step, which is basically a function returning the data.
pipe_new(name, data = NULL, logger = NULL)pipe_new(name, data = NULL, logger = NULL)
name |
the name of the Pipeline |
data |
optional data used at the start of the pipeline. The
data also can be set later using the |
logger |
custom logger to be used for logging. If no logger is provided, the default logger is used, which should be sufficient for most use cases. If you do want to use your own custom log function, you need to provide a function that obeys the following form:
The Note that with the default logger, the log layout can be altered
any time via |
returns the Pipeline object invisibly
data <- data.frame(x = 1:2, y = 3:4) p <- pipe_new("myPipe", data = data) p |> pipe_run() |> pipe_get_out("data") # Setting data later p <- pipe_new("myPipe") pipe_get_data(p) p <- pipe_set_data(p, data) pipe_get_data(p) p |> pipe_run() |> pipe_get_out("data") # Initialize with custom logger my_logger <- function(level, msg, ...) { cat(level, msg, "\n") } p <- pipe_new("myPipe", data = data, logger = my_logger) p |> pipe_run() |> pipe_get_out("data")data <- data.frame(x = 1:2, y = 3:4) p <- pipe_new("myPipe", data = data) p |> pipe_run() |> pipe_get_out("data") # Setting data later p <- pipe_new("myPipe") pipe_get_data(p) p <- pipe_set_data(p, data) pipe_get_data(p) p |> pipe_run() |> pipe_get_out("data") # Initialize with custom logger my_logger <- function(level, msg, ...) { cat(level, msg, "\n") } p <- pipe_new("myPipe", data = data, logger = my_logger) p |> pipe_run() |> pipe_get_out("data")
Use this function to drop steps from the end of the pipeline.
pipe_pop_step(pip) pipe_pop_steps_after(pip, step) pipe_pop_steps_from(pip, step)pipe_pop_step(pip) pipe_pop_steps_after(pip, step) pipe_pop_steps_from(pip, step)
pip |
|
step |
|
the name(s) of the removed step(s) as character string(s)
pipe_pop_step: drop last step from the pipeline
pipe_pop_steps_after: drop all steps after the given step
pipe_pop_steps_from: drop all steps starting from, and including, the given step
# pipe_pop_step p <- pipe_new("pipe", data = 1:2) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(y = 1) y) p pipe_pop_step(p) p # pipe_pop_steps_after pipe_add(p, "f2", \(y = 1) y) pipe_add(p, "f3", \(z = 1) z) p pipe_pop_steps_after(p, "f1") p # pipe_pop_steps_from pipe_add(p, "f2", \(y = 1) y) pipe_add(p, "f3", \(z = 1) z) p pipe_pop_steps_from(p, "f1") p# pipe_pop_step p <- pipe_new("pipe", data = 1:2) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(y = 1) y) p pipe_pop_step(p) p # pipe_pop_steps_after pipe_add(p, "f2", \(y = 1) y) pipe_add(p, "f3", \(z = 1) z) p pipe_pop_steps_after(p, "f1") p # pipe_pop_steps_from pipe_add(p, "f2", \(y = 1) y) pipe_add(p, "f3", \(z = 1) z) p pipe_pop_steps_from(p, "f1") p
Print the pipeline as a table
pipe_print(pip, verbose = FALSE)pipe_print(pip, verbose = FALSE)
pip |
|
verbose |
|
the Pipeline object invisibly
p <- pipe_new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) pipe_print(p) pipe_print(p, verbose = TRUE) # Also works with standard print function print(p) print(p, verbose = TRUE)p <- pipe_new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) pipe_print(p) pipe_print(p, verbose = TRUE) # Also works with standard print function print(p) print(p, verbose = TRUE)
Can be used to remove any given step.
If other steps depend on the step to be removed, an error is
given and the removal is blocked, unless recursive was set to TRUE.
pipe_remove_step(pip, step, recursive = FALSE)pipe_remove_step(pip, step, recursive = FALSE)
pip |
|
step |
|
recursive |
|
the Pipeline object invisibly
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = ~add1) x + y) pipe_add(p, "mult1", \(x = 1, y = ~add2) x * y) p pipe_remove_step(p, "mult1") p try(pipe_remove_step(p, "add1")) pipe_remove_step(p, "add1", recursive = TRUE) pp <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = ~add1) x + y) pipe_add(p, "mult1", \(x = 1, y = ~add2) x * y) p pipe_remove_step(p, "mult1") p try(pipe_remove_step(p, "add1")) pipe_remove_step(p, "add1", recursive = TRUE) p
Safely rename a step in the pipeline. If the new step name would result in a name clash, an error is given.
pipe_rename_step(pip, from, to)pipe_rename_step(pip, from, to)
pip |
|
from |
|
to |
|
the Pipeline object invisibly
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = ~add1) x + y) p try(pipe_rename_step(p, from = "add1", to = "add2")) pipe_rename_step(p, from = "add1", to = "first_add") pp <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = ~add1) x + y) p try(pipe_rename_step(p, from = "add1", to = "add2")) pipe_rename_step(p, from = "add1", to = "first_add") p
Replaces an existing pipeline step.
pipe_replace_step( pip, step, fun, params = list(), description = "", group = step, tags = character() )pipe_replace_step( pip, step, fun, params = list(), description = "", group = step, tags = character() )
pip |
|
step |
|
fun |
|
params |
|
description |
|
group |
|
tags |
|
returns the Pipeline object invisibly
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "add2", \(x = ~data, y = 2) x + y) pipe_add(p, "mult", \(x = 1, y = 2) x * y) pipe_run(p) |> pipe_collect_out() pipe_replace_step(p, "mult", \(x = ~add1, y = ~add2) x * y) pipe_run(p) |> pipe_collect_out() try(pipe_replace_step(p, "foo", \(x = 1) x)) # step 'foo' does not existp <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "add2", \(x = ~data, y = 2) x + y) pipe_add(p, "mult", \(x = 1, y = 2) x * y) pipe_run(p) |> pipe_collect_out() pipe_replace_step(p, "mult", \(x = ~add1, y = ~add2) x * y) pipe_run(p) |> pipe_collect_out() try(pipe_replace_step(p, "foo", \(x = 1) x)) # step 'foo' does not exist
Resets the pipeline to the state before it was run. This means that all output is removed and the state of all steps is reset to 'New'.
pipe_reset(pip)pipe_reset(pip)
pip |
|
returns the Pipeline object invisibly
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(y = 1) y) pipe_run(p, ) p pipe_reset(p) pp <- pipe_new("pipe", data = 1:2) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(y = 1) y) pipe_run(p, ) p pipe_reset(p) p
Runs all new and/or outdated pipeline steps.
pipe_run(pip, force = FALSE, recursive = TRUE, progress = NULL, showLog = TRUE)pipe_run(pip, force = FALSE, recursive = TRUE, progress = NULL, showLog = TRUE)
pip |
|
force |
|
recursive |
|
progress |
|
showLog |
|
returns the Pipeline object invisibly
# Simple pipeline p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "add2", \(x = ~add1, z = 2) x + z) pipe_add(p, "final", \(x = ~add1, y = ~add2) x * y) p |> pipe_run() |> pipe_collect_out() pipe_set_params(p, list(z = 4)) # outdates steps add2 and final p p |> pipe_run() |> pipe_collect_out() # Recursive pipeline (for advanced users) p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "new_pipe", \(x = ~add1) { p2 <- pipe_new("new_pipe", data = x) pipe_add(p2, "add1", \(x = ~data) x + 1) pipe_add(p2, "add2", \(x = ~add1) x + 2) } ) p |> pipe_run() |> pipe_collect_out() # Run pipeline with progress bar p <- pipe_new("pipe", data = 1) pipe_add(p, "first step", \() Sys.sleep(0.5)) pipe_add(p, "second step", \() Sys.sleep(0.5)) pipe_add(p, "last step", \() Sys.sleep(0.5)) pb <- txtProgressBar(min = 1, max = pipe_length(p), style = 3) fprogress <- function(value, detail) { setTxtProgressBar(pb, value) } pipe_run(p, progress = fprogress, showLog = FALSE)# Simple pipeline p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "add2", \(x = ~add1, z = 2) x + z) pipe_add(p, "final", \(x = ~add1, y = ~add2) x * y) p |> pipe_run() |> pipe_collect_out() pipe_set_params(p, list(z = 4)) # outdates steps add2 and final p p |> pipe_run() |> pipe_collect_out() # Recursive pipeline (for advanced users) p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "new_pipe", \(x = ~add1) { p2 <- pipe_new("new_pipe", data = x) pipe_add(p2, "add1", \(x = ~data) x + 1) pipe_add(p2, "add2", \(x = ~add1) x + 2) } ) p |> pipe_run() |> pipe_collect_out() # Run pipeline with progress bar p <- pipe_new("pipe", data = 1) pipe_add(p, "first step", \() Sys.sleep(0.5)) pipe_add(p, "second step", \() Sys.sleep(0.5)) pipe_add(p, "last step", \() Sys.sleep(0.5)) pb <- txtProgressBar(min = 1, max = pipe_length(p), style = 3) fprogress <- function(value, 
detail) { setTxtProgressBar(pb, value) } pipe_run(p, progress = fprogress, showLog = FALSE)
Run given pipeline step possibly together with upstream and/or downstream dependencies.
pipe_run_step(pip, step, upstream = TRUE, downstream = FALSE)pipe_run_step(pip, step, upstream = TRUE, downstream = FALSE)
pip |
|
step |
|
upstream |
|
downstream |
|
returns the Pipeline object invisibly
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "add2", \(x = ~add1, z = 2) x + z) pipe_add(p, "mult", \(x = ~add1, y = ~add2) x * y) pipe_run_step(p, "add2") pipe_run_step(p, "add2", downstream = TRUE) pipe_run_step(p, "mult", upstream = TRUE)p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "add2", \(x = ~add1, z = 2) x + z) pipe_add(p, "mult", \(x = ~add1, y = ~add2) x * y) pipe_run_step(p, "add2") pipe_run_step(p, "add2", downstream = TRUE) pipe_run_step(p, "mult", upstream = TRUE)
Set the data in the first step of the pipeline.
pipe_set_data(pip, data)pipe_set_data(pip, data)
pip |
|
data |
initial data set. |
returns the Pipeline object invisibly
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) p |> pipe_run() |> pipe_collect_out() pipe_set_data(p, 3) p |> pipe_run() |> pipe_collect_out()p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) p |> pipe_run() |> pipe_collect_out() pipe_set_data(p, 3) p |> pipe_run() |> pipe_collect_out()
Set unbound function parameters defined in the pipeline where 'unbound' means parameters that are not linked to other steps. Attempts to set parameters that don't exist in the pipeline are ignored, by default with a warning.
pipe_set_params(pip, params, warnUndefined = TRUE)pipe_set_params(pip, params, warnUndefined = TRUE)
pip |
|
params |
|
warnUndefined |
|
returns the Pipeline object invisibly
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 2) x + y) pipe_add(p, "add2", \(x = ~data, y = 3) x + y) pipe_add(p, "mult", \(x = 4, z = 5) x * z) pipe_get_params(p) pipe_set_params(p, params = list(x = 3, y = 3)) pipe_get_params(p) pipe_set_params(p, params = list(x = 5, z = 3)) pipe_get_params(p) suppressWarnings( pipe_set_params(p, list(foo = 3)) # gives warning as 'foo' is undefined ) pipe_set_params(p, list(foo = 3), warnUndefined = FALSE)p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 2) x + y) pipe_add(p, "add2", \(x = ~data, y = 3) x + y) pipe_add(p, "mult", \(x = 4, z = 5) x * z) pipe_get_params(p) pipe_set_params(p, params = list(x = 3, y = 3)) pipe_get_params(p) pipe_set_params(p, params = list(x = 5, z = 3)) pipe_get_params(p) suppressWarnings( pipe_set_params(p, list(foo = 3)) # gives warning as 'foo' is undefined ) pipe_set_params(p, list(foo = 3), warnUndefined = FALSE)
Set unbound function parameters defined at given pipeline step where 'unbound' means parameters that are not linked to other steps. If one or more parameters don't exist, an error is given.
pipe_set_params_at_step(pip, step, params)pipe_set_params_at_step(pip, step, params)
pip |
|
step |
|
params |
|
returns the Pipeline object invisibly
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 2, z = 3) x + y) pipe_set_params_at_step(p, step = "add1", params = list(y = 5, z = 6)) pipe_get_params(p) try( pipe_set_params_at_step(p, step = "add1", params = list(foo = 3)) )p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 2, z = 3) x + y) pipe_set_params_at_step(p, step = "add1", params = list(y = 5, z = 6)) pipe_get_params(p) try( pipe_set_params_at_step(p, step = "add1", params = list(foo = 3)) )
Skips all steps that belong to the specified group.
Works like calling skip_step on every step in that group. Skipped
steps are not executed during run() and their outputs (if any)
are not considered for collect_out().
pipe_skip_group(pip, group) pipe_unskip_group(pip, group)pipe_skip_group(pip, group) pipe_unskip_group(pip, group)
pip |
|
group |
|
the Pipeline object invisibly
p <- pipe_new("pipe", data = 15) |> pipe_add("f1", \(data = ~data, x = 1) data + x) |> pipe_add("log2", \(x = ~f1) log2(x), group = "prep") |> pipe_add("sqrt", \(x = ~log2) sqrt(x), group = "prep") |> pipe_add("final", \(x = ~sqrt, y = ~f1) x + y) p |> pipe_run() |> pipe_collect_out() p |> pipe_set_params_at_step("f1", list(x = 5)) |> pipe_skip_group("prep") p |> pipe_run() |> pipe_collect_out() p |> pipe_unskip_group("prep") |> pipe_run() |> pipe_collect_out()p <- pipe_new("pipe", data = 15) |> pipe_add("f1", \(data = ~data, x = 1) data + x) |> pipe_add("log2", \(x = ~f1) log2(x), group = "prep") |> pipe_add("sqrt", \(x = ~log2) sqrt(x), group = "prep") |> pipe_add("final", \(x = ~sqrt, y = ~f1) x + y) p |> pipe_run() |> pipe_collect_out() p |> pipe_set_params_at_step("f1", list(x = 5)) |> pipe_skip_group("prep") p |> pipe_run() |> pipe_collect_out() p |> pipe_unskip_group("prep") |> pipe_run() |> pipe_collect_out()
Skipping a step means that it is skipped during a
pipeline run and therefore its output (if existing) remains
untouched. In addition, it is skipped when collecting output via
collect_out, that is, its output will not be part of the
collected output list.
In contrast to lock_step, skipping a step does not "protect" the
step against changing the step's parameters.
pipe_skip_step(pip, step) pipe_unskip_step(pip, step)pipe_skip_step(pip, step) pipe_unskip_step(pip, step)
pip |
|
step |
|
the Pipeline object invisibly
p <- pipe_new("pipe", data = 1) |> pipe_add("add1", \(x = 2, data = ~data) x + data) |> pipe_add("add2", \(x = 2, data = ~add1) x + data) p |> pipe_run() |> pipe_collect_out() p |> pipe_set_params(list(x = 3)) |> pipe_skip_step("add1") p |> pipe_run() |> pipe_collect_out() p |> pipe_unskip_step("add1") |> pipe_run() |> pipe_collect_out()p <- pipe_new("pipe", data = 1) |> pipe_add("add1", \(x = 2, data = ~data) x + data) |> pipe_add("add2", \(x = 2, data = ~add1) x + data) p |> pipe_run() |> pipe_collect_out() p |> pipe_set_params(list(x = 3)) |> pipe_skip_step("add1") p |> pipe_run() |> pipe_collect_out() p |> pipe_unskip_step("add1") |> pipe_run() |> pipe_collect_out()
Splits pipeline into its independent parts. This can be useful, for example, to split up the pipeline in order to run each part in parallel.
pipe_split(pip)pipe_split(pip)
pip |
|
list of Pipeline objects
# Example for two independent calculation paths p <- pipe_new("pipe", data = 1) pipe_add(p, "f1", \(x = ~data) x) pipe_add(p, "f2", \(x = 1) x) pipe_add(p, "f3", \(x = ~f1) x) pipe_add(p, "f4", \(x = ~f2) x) pipe_split(p)# Example for two independent calculation paths p <- pipe_new("pipe", data = 1) pipe_add(p, "f1", \(x = ~data) x) pipe_add(p, "f2", \(x = 1) x) pipe_add(p, "f3", \(x = ~f1) x) pipe_add(p, "f4", \(x = ~f2) x) pipe_split(p)
This class implements an analysis pipeline. A pipeline consists of a sequence of analysis steps, which can be added one by one. Each added step may or may not depend on one or more previous steps. The pipeline keeps track of the dependencies among these steps and will ensure that all dependencies are met on creation of the pipeline, that is, before the pipeline is run. Once the pipeline is run, the output is stored in the pipeline along with each step and can be accessed later. Different pipelines can be bound together while preserving all dependencies within each pipeline.
namestring name of the pipeline
pipelinedata.table the pipeline where each row represents one step.
new()
constructor
Pipeline$new(name, data = NULL, logger = NULL)
namethe name of the Pipeline
dataoptional data used at the start of the pipeline. The
data also can be set later using the set_data function.
loggercustom logger to be used for logging. If no logger is provided, the default logger is used, which should be sufficient for most use cases. If you do want to use your own custom log function, you need to provide a function that obeys the following form:
function(level, msg, ...) { your custom logging code here }
The level argument is a string and will be one of info, warn,
or error. The msg argument is a string containing the message
to be logged. The ... argument is a list of named parameters,
which can be used to add additional information to the log message.
Currently, this is only used to add the context in case of a step
giving a warning or error.
Note that with the default logger, the log layout can be altered
any time via set_log_layout().
returns the Pipeline object invisibly
p <- Pipeline$new("myPipe", data = data.frame(x = 1:8))
p
# Passing custom logger
my_logger <- function(level, msg, ...) {
cat(level, msg, "\n")
}
p <- Pipeline$new("myPipe", logger = my_logger)
add()
Add pipeline step
Pipeline$add( step, fun, params = list(), description = "", group = step, tags = character(0) )
stepstring the name of the step. Each step name must
be unique.
funfunction or name of the function to be applied at
the step. Both existing and anonymous/lambda functions can be used.
All function parameters must have default values. If a parameter
is missing a default value in the function signature, alternatively,
it can be set via the params argument (see Examples section with
mean() function).
paramslist list of parameters to set or overwrite
parameters of the passed function.
descriptionstring optional description of the step
groupstring output collected after pipeline execution
(see function collect_out) is grouped by the defined group
names. By default, this is the name of the step, which comes in
handy when the pipeline is copy-appended multiple times to keep
the results of the same function/step grouped at one place.
tagscharacter Optional tags associated with the step.
Tags can be used later to select certain parts of a pipeline,
for example, to collect output from or skip steps of a certain tag.
returns the Pipeline object invisibly
# Add steps with lambda functions
p <- Pipeline$new("myPipe", data = 1)
p$add("s1", \(x = ~data) 2*x) # use input data
p$add("s2", \(x = ~data, y = ~s1) x * y)
try(p$add("s2", \(z = 3) 3)) # error: step 's2' exists already
try(p$add("s3", \(z = ~foo) 3)) # dependency 'foo' not found
p
# Add step with existing function
p <- Pipeline$new("myPipe", data = c(1, 2, NA, 3, 4))
p$add("calc_mean", mean, params = list(x = ~data, na.rm = TRUE))
p$run()$get_out("calc_mean")
# Step description
p <- Pipeline$new("myPipe", data = 1:10)
p$add("s1", \(x = ~data) 2*x, description = "multiply by 2")
print(p)
print(p, verbose = TRUE) # print all columns
# Group output
p <- Pipeline$new("myPipe", data = data.frame(x = 1:5, y = 1:5))
p$add("prep_x", \(data = ~data) data$x, group = "prep")
p$add("prep_y", \(data = ~data) (data$y)^2, group = "prep")
p$add("sum", \(x = ~prep_x, y = ~prep_y) x + y)
p$run()$collect_out()
append()
Append another pipeline
When appending, pipeflow takes care of potential name clashes with
respect to step names and dependencies, that is, if needed, it will
automatically adapt step names and dependencies to make sure they
are unique in the merged pipeline.
Pipeline$append(p, outAsIn = FALSE, tryAutofixNames = TRUE, sep = ".")
pPipeline object to be appended.
outAsInlogical if TRUE, output of first pipeline is used
as input for the second pipeline.
tryAutofixNameslogical if TRUE, pipeflow tries to resolve name
clashes automatically by appending the 2nd pipeline's name.
Only set to FALSE if you know what you are doing.
sepstring separator used when auto-resolving step names
returns new combined Pipeline.
# Append pipeline
p1 <- Pipeline$new("pipe1")
p1$add("step1", \(x = 1) x)
p2 <- Pipeline$new("pipe2")
p2$add("step2", \(y = 1) y)
p1$append(p2)
# Append pipeline with potential name clashes
p3 <- Pipeline$new("pipe3")
p3$add("step1", \(z = 1) z)
p1$append(p2)$append(p3)
# Use output of first pipeline as input for second pipeline
p1 <- Pipeline$new("pipe1", data = 8)
p2 <- Pipeline$new("pipe2")
p1$add("square", \(x = ~data) x^2)
p2$add("log2", \(x = ~data) log2(x))
p12 <- p1$append(p2, outAsIn = TRUE)
p12$run()$get_out("log2")
p12
# Custom name separator
p1$append(p2, sep = "___")
append_to_step_names()
Appends string to all step names and takes care of updating step dependencies accordingly.
Pipeline$append_to_step_names(postfix, sep = ".")
postfixstring to be appended to each step name.
sepstring separator between step name and postfix.
returns the Pipeline object invisibly
p <- Pipeline$new("pipe")
p$add("step1", \(x = 1) x)
p$add("step2", \(y = 1) y)
p$append_to_step_names("new")
p
p$append_to_step_names("foo", sep = "__")
p
collect_out()
Collect outputs produced by the pipeline run.
Only steps that were not skipped contribute results.
The output is grouped by the user-defined group names
(see group parameter in function add()), which by default
are identical to the step names, that is, trivial groups of
size 1. Use groupBy = "state" to group results by the step's
state instead.
Pipeline$collect_out(groupBy = c("group", "state"))
groupBystring field of pipeline by which to group the
output.
list containing the output, named after the groups, which,
by default, are the steps.
p <- Pipeline$new("pipe", data = 1:2)
p$add("step1", \(x = ~data) x + 2)
p$add("step2", \(x = ~step1) x + 2)
p$run()
p$collect_out()
# Grouped output
p <- Pipeline$new("pipe", data = 1:2)
p$add("step1", \(x = ~data) x + 2, group = "add")
p$add("step2", \(x = ~step1, y = 2) x + y, group = "add")
p$add("step3", \(x = ~data) x * 3, group = "mult")
p$add("step4", \(x = ~data, y = 2) x * y, group = "mult")
p
p$run()$collect_out() |> str()
# Grouped by state
p$set_params(list(y = 5))
p
p$collect_out(groupBy = "state") |> str()
discard_steps()
Discard all steps that match a given pattern.
Pipeline$discard_steps(pattern, recursive = FALSE, fixed = TRUE, ...)
patternstring containing a regular expression (or
character string for fixed = TRUE) to be matched.
recursivelogical if TRUE the step is removed together
with all its downstream dependencies.
fixedlogical If TRUE, pattern is a string to
be matched as is. Overrides all conflicting arguments.
...further arguments passed to grep().
the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~add1) x + 2)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p
p$discard_steps("mult")
p
# Re-add steps
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p
# Discarding 'add1' does not work ...
try(p$discard_steps("add1"))
# ... unless we enforce to remove its downstream dependencies as well
p$discard_steps("add1", recursive = TRUE) # this works
p
# Trying to discard non-existent steps is just ignored
p$discard_steps("non-existent")
get_data()
Get data
Pipeline$get_data()
the output defined in the data step, which by default is
the first step of the pipeline
p <- Pipeline$new("pipe", data = 1:2)
p$get_data()
p$set_data(3:4)
p$get_data()
get_depends()
Get all dependencies defined in the pipeline
Pipeline$get_depends()
named list of dependencies for each step
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$get_depends()
get_depends_down()
Get all downstream dependencies of given step, by default descending recursively.
Pipeline$get_depends_down(step, recursive = TRUE)
stepstring name of step
recursivelogical if TRUE, dependencies of dependencies
are also returned.
list of downstream dependencies
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p$get_depends_down("add1")
p$get_depends_down("add1", recursive = FALSE)
get_depends_up()
Get all upstream dependencies of given step, by default traversing recursively.
Pipeline$get_depends_up(step, recursive = TRUE)
stepstring name of step
recursivelogical if TRUE, dependencies of dependencies
are also returned.
list of upstream dependencies
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p$get_depends_up("mult4")
p$get_depends_up("mult4", recursive = FALSE)
get_graph()
Visualize the pipeline as a graph.
Pipeline$get_graph(groups = NULL)
groupscharacter if not NULL, only steps belonging to the
given groups are considered.
two data frames, one for nodes and one for edges ready to be
used with the visNetwork package.
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p$add("mult1", \(x = ~add1, y = ~add2) x * y)
graph <- pipe_get_graph(p)
graph
if (require("visNetwork", quietly = TRUE)) {
do.call(visNetwork, args = p$get_graph())
}
get_out()
Get output of given step
Pipeline$get_out(step)
stepstring name of step
the output at the given step.
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$run()
p$get_out("add1")
p$get_out("add2")
get_params()
Get unbound function parameters defined in the pipeline where 'unbound' means parameters that are not linked to other steps.
Pipeline$get_params(ignoreHidden = TRUE)
ignoreHiddenlogical if TRUE, hidden parameters (i.e. all
names starting with a dot) are ignored and thus not returned.
list of parameters, sorted and named by step. Steps with
no parameters are filtered out.
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("add3", \() 1 + 2)
p$get_params() |> str()
p$get_params(ignoreHidden = FALSE) |> str()
get_params_at_step()
Get all unbound (i.e. not referring to other steps) parameters at the given step.
Pipeline$get_params_at_step(step, ignoreHidden = TRUE)
stepstring name of step
ignoreHiddenlogical if TRUE, hidden parameters (i.e. all
names starting with a dot) are ignored and thus not returned.
list of parameters defined at given step.
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("add3", \() 1 + 2)
p$get_params_at_step("add2")
p$get_params_at_step("add2", ignoreHidden = FALSE)
p$get_params_at_step("add3")
get_params_unique()
Get all unbound (i.e. not referring to other steps)
parameters defined in the pipeline,
but only list each parameter once. The values of the parameters
will be the values of the first step where the parameter was defined.
This is particularly useful after the parameters were set using
the set_params function, which will set the same value
for all steps.
Pipeline$get_params_unique(ignoreHidden = TRUE)
ignoreHiddenlogical if TRUE, hidden parameters (i.e. all
names starting with a dot) are ignored and thus not returned.
list of unique parameters
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b)
p$get_params_unique()
p$get_params_unique(ignoreHidden = FALSE)
get_step()
Get step of pipeline
Pipeline$get_step(step)
stepstring name of step
data.table row containing the step.
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, z = ~add1) x + y + z)
p$run()
add1 <- p$get_step("add1")
print(add1)
add1[["params"]]
add1[["fun"]]
try()
try(p$get_step("foo")) # error: step 'foo' does not exist
get_step_field()
Get a specific field/entry of a step
Pipeline$get_step_field(step, what)
stepstring name of step
whatstring name of the pipeline column to return
the requested entry at the given step
get_step_names()
Get step names of pipeline
Pipeline$get_step_names()
character vector of step names
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$get_step_names()
get_step_number()
Get step number
Pipeline$get_step_number(step)
stepstring name of step
the step number in the pipeline
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$get_step_number("f2")
has_step()
Check if pipeline has given step
Pipeline$has_step(step)
stepstring name of step
logical whether step exists
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$has_step("f2")
p$has_step("foo")
insert_after()
Insert step after a certain step
Pipeline$insert_after(afterStep, step, ...)
afterStepstring name of step after which to insert
stepstring name of step to insert
...further arguments passed to add method of the pipeline
returns the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = 1) x)
p$add("f2", \(x = ~f1) x)
p$insert_after("f1", "f3", \(x = ~f1) x)
p
insert_before()
Insert step before a certain step
Pipeline$insert_before(beforeStep, step, ...)
beforeStepstring name of step before which to insert
stepstring name of step to insert
...further arguments passed to add method of the pipeline
returns the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = 1) x)
p$add("f2", \(x = ~f1) x)
p$insert_before("f2", "f3", \(x = ~f1) x)
p
length()
Length of the pipeline aka number of pipeline steps.
Pipeline$length()
numeric length of pipeline.
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$length()
lock_step()
Locking a step means that both its parameters and its output (given it has output) are locked such that neither setting new pipeline parameters nor future pipeline runs can change the current parameter and output content.
Pipeline$lock_step(step)
stepstring name of step
the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 1, data = ~data) x + data)
p$add("add2", \(x = 1, data = ~data) x + data)
p$run()
p$get_out("add1")
p$get_out("add2")
p$lock_step("add1")
p$set_data(3)
p$set_params(list(x = 3))
p$run()
p$get_out("add1")
p$get_out("add2")
pop_step()
Drop last step from the pipeline.
Pipeline$pop_step()
string the name of the step that was removed
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p
p$pop_step() # "f2"
p
pop_steps_after()
Drop all steps after the given step.
Pipeline$pop_steps_after(step)
stepstring name of step
character vector of steps that were removed.
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$add("f3", \(z = 1) z)
p$pop_steps_after("f1") # "f2", "f3"
p
pop_steps_from()
Drop all steps from and including the given step.
Pipeline$pop_steps_from(step)
stepstring name of step
character vector of steps that were removed.
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$add("f3", \(z = 1) z)
p$pop_steps_from("f2") # "f2", "f3"
p
print()
Print the pipeline as a table.
Pipeline$print(verbose = FALSE)
verboselogical if TRUE, print all columns of the
pipeline, otherwise only the most relevant columns are displayed.
the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$print()
remove_step()
Remove certain step from the pipeline.
If other steps depend on the step to be removed, an error is
given and the removal is blocked, unless recursive was set to
TRUE.
Pipeline$remove_step(step, recursive = FALSE)
stepstring the name of the step to be removed.
recursivelogical if TRUE the step is removed together
with all its downstream dependencies.
the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p$add("mult1", \(x = 1, y = ~add2) x * y)
p$remove_step("mult1")
p
try(p$remove_step("add1")) # fails because "add2" depends on "add1"
p$remove_step("add1", recursive = TRUE) # removes "add1" and "add2"
p
rename_step()
Safely rename a step in the pipeline. If new step name would result in a name clash, an error is given.
Pipeline$rename_step(from, to)
fromstring the name of the step to be renamed.
tostring the new name of the step.
the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p
try(p$rename_step("add1", "add2")) # fails because "add2" exists
p$rename_step("add1", "first_add") # Ok
p
replace_step()
Replaces an existing pipeline step.
Pipeline$replace_step( step, fun, params = list(), description = "", group = step, tags = character(0) )
stepstring the name of the step to be replaced. Step must
exist.
funstring or function operation to be applied at the
step. Both existing and lambda/anonymous functions can be used.
paramslist list of parameters to overwrite default
parameters of existing functions.
descriptionstring optional description of the step
groupstring grouping information (by default the same as
the name of the step). Any output collected later (see function
collect_out) is, by default, put together by these group names. This,
for example, comes in handy when the pipeline is copy-appended
multiple times to keep the results of the same function/step at one
place.
tagscharacter Optional tags associated with the step.
Tags can be used later to select certain parts of a pipeline,
for example, to collect output from or skip steps of a certain tag.
the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~data, y = 2) x + y)
p$add("mult", \(x = 1, y = 2) x * y)
p$run()$collect_out()
p$replace_step("mult", \(x = ~add1, y = ~add2) x * y)
p$run()$collect_out()
try(p$replace_step("foo", \(x = 1) x)) # step 'foo' does not exist
reset()
Resets the pipeline to the state before it was run. This means that all output is removed and the state of all steps is reset to 'New'.
Pipeline$reset()
returns the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$run()
p
p$reset()
p
run()
Run all new and/or outdated pipeline steps.
Pipeline$run(force = FALSE, recursive = TRUE, progress = NULL, showLog = TRUE)
forcelogical if TRUE all steps are run regardless of
whether they are outdated or not.
recursivelogical if TRUE and a step returns a new
pipeline, the run of the current pipeline is aborted and the
new pipeline is run recursively.
progressfunction this parameter can be used to provide a
custom progress function of the form function(value, detail),
which will show the progress of the pipeline run for each step,
where value is the current step number and detail is the name
of the step.
showLoglogical should the steps be logged during the
pipeline run?
returns the Pipeline object invisibly
# Simple pipeline
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~add1, z = 2) x + z)
p$add("final", \(x = ~add1, y = ~add2) x * y)
p$run()$collect_out()
p$set_params(list(z = 4)) # outdates steps add2 and final
p
p$run()$collect_out() |> str()
p
# Recursive pipeline
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("new_pipe", \(x = ~add1) {
pp <- Pipeline$new("new_pipe", data = x)
pp$add("add1", \(x = ~data) x + 1)
pp$add("add2", \(x = ~add1) x + 2)
}
)
p$run(recursive = TRUE)$collect_out() |> str()
# Run pipeline with progress bar
p <- Pipeline$new("pipe", data = 1)
p$add("first step", \() Sys.sleep(1))
p$add("second step", \() Sys.sleep(1))
p$add("last step", \() Sys.sleep(1))
pb <- txtProgressBar(min = 1, max = p$length(), style = 3)
fprogress <- function(value, detail) {
setTxtProgressBar(pb, value)
}
p$run(progress = fprogress, showLog = FALSE)
run_step()
Run given pipeline step possibly together with upstream and downstream dependencies.
Pipeline$run_step(step, upstream = TRUE, downstream = FALSE)
stepstring name of step
upstreamlogical if TRUE, run all dependent upstream
steps first.
downstreamlogical if TRUE, run all dependent
downstream steps afterwards.
returns the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~add1, z = 2) x + z)
p$add("mult", \(x = ~add1, y = ~add2) x * y)
p$run_step("add2")
p$run_step("add2", downstream = TRUE)
p$run_step("mult", upstream = TRUE)
set_data()
Convenience function to set data in first step of pipeline.
Pipeline$set_data(data)
datainitial data set
returns the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$run()$collect_out()
p$set_data(3)
p$run()$collect_out()
set_params()
Set parameters in the pipeline. If a parameter occurs in several steps, the parameter is set commonly in all steps. Trying to set parameters that don't exist in the pipeline is ignored, by default, with a warning.
Pipeline$set_params(params, warnUndefined = TRUE)
paramslist of parameters to be set
warnUndefinedlogical whether to give a warning when trying
to set a parameter that is not defined in the pipeline.
returns the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 2) x + y)
p$add("add2", \(x = ~data, y = 3) x + y)
p$add("mult", \(x = 4, z = 5) x * z)
p$get_params()
p$set_params(list(x = 3, y = 3))
p$get_params()
p$set_params(list(x = 5, z = 3))
p$get_params()
suppressWarnings(
p$set_params(list(foo = 3)) # gives warning as 'foo' is undefined
)
p$set_params(list(foo = 3), warnUndefined = FALSE)
set_params_at_step()
Set unbound function parameters defined at given pipeline step where 'unbound' means parameters that are not linked to other steps.
Pipeline$set_params_at_step(step, params)
stepstring the name of the step
paramslist of parameters to be set
returns the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 2, z = 3) x + y)
p$set_params_at_step("add1", list(y = 5, z = 6))
p$get_params()
try(p$set_params_at_step("add1", list(foo = 3))) # foo not defined
skip_group()
Skips all steps that belong to the specified group.
Works like calling skip_step on every step in that group. Skipped
steps are not executed during run() and their outputs (if any)
are not considered for collect_out().
Pipeline$skip_group(group)
groupstring name of the group whose steps should be
skipped.
the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 15)
p$add("f1", \(data = ~data, x = 1) data + x)
p$add("log2", \(x = ~f1) log2(x), group = "prep")
p$add("sqrt", \(x = ~log2) sqrt(x), group = "prep")
p$add("final", \(x = ~sqrt, y = ~f1) x + y)
p$run()$collect_out() |> str()
p$set_params_at_step("f1", list(x = 5))$skip_group("prep")
p$run()$collect_out()
p$unskip_group("prep")$run()$collect_out() |> str()
skip_step()
Skipping a step means that it is skipped during a
pipeline run and therefore its output (if existing) remains
untouched. In addition, it is skipped when collecting output via
collect_out, that is, its output will not be part of the
collected output list.
In contrast to lock_step, skipping a step does not "protect" the
step against changing the step's parameters.
Pipeline$skip_step(step)
stepstring name of step
the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 2, data = ~data) x + data)
p$add("add2", \(x = 2, data = ~add1) x + data)
p$run()$collect_out() |> str()
p$skip_step("add1")$set_params(list(x = 3))
p$run()$collect_out() |> str()
split()
Splits pipeline into its independent parts.
Pipeline$split()
list of Pipeline objects
# Example for two independent calculation paths
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = ~data) x)
p$add("f2", \(x = 1) x)
p$add("f3", \(x = ~f1) x)
p$add("f4", \(x = ~f2) x)
p$split()
unlock_step()
Unlock previously locked step. If step was not locked, the command is ignored and a message is printed hinting at that.
Pipeline$unlock_step(step)
stepstring name of step
the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 1, data = ~data) x + data)
p$add("add2", \(x = 1, data = ~data) x + data)
p$lock_step("add1")
p$set_params(list(x = 3))
p$get_params()
p$unlock_step("add1")
p$set_params(list(x = 3))
p$get_params()
unskip_group()
Unskips all steps that belong to the specified group.
Works like calling unskip_step on every step in that group.
Pipeline$unskip_group(group)
groupstring name of the group whose steps should be
unskipped.
the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 15)
p$add("f1", \(data = ~data, x = 1) data + x)
p$add("log2", \(x = ~f1) log2(x), group = "prep")
p$add("sqrt", \(x = ~log2) sqrt(x), group = "prep")
p$add("final", \(x = ~sqrt, y = ~f1) x + y)
p$run()$collect_out() |> str()
p$set_params_at_step("f1", list(x = 5))$skip_group("prep")
p$run()$collect_out() |> str()
p$unskip_group("prep")$run()$collect_out() |> str()
unskip_step()
Unskip previously skipped step. If step was not skipped, the command is ignored, but a warning is given.
Pipeline$unskip_step(step)
stepstring name of step
the Pipeline object invisibly
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 2, data = ~data) x + data)
p$add("add2", \(x = 2, data = ~add1) x + data)
p$run()$collect_out() |> str()
p$skip_step("add1")$set_params(list(x = 3))
p$run()$collect_out() |> str()
p$unskip_step("add1")$run()$collect_out() |> str()
clone()
The objects of this class are cloneable with this method.
Pipeline$clone(deep = FALSE)
deepWhether to make a deep clone.
Roman Pahl
## ------------------------------------------------
## Method `Pipeline$new`
## ------------------------------------------------

p <- Pipeline$new("myPipe", data = data.frame(x = 1:8))
p

# Pass a custom logger
my_logger <- function(level, msg, ...) {
  cat(level, msg, "\n")
}
p <- Pipeline$new("myPipe", logger = my_logger)

## ------------------------------------------------
## Method `Pipeline$add`
## ------------------------------------------------

# Add steps with lambda functions
p <- Pipeline$new("myPipe", data = 1)
p$add("s1", \(x = ~data) 2 * x)  # use input data
p$add("s2", \(x = ~data, y = ~s1) x * y)
try(p$add("s2", \(z = 3) 3))  # error: step 's2' exists already
try(p$add("s3", \(z = ~foo) 3))  # dependency 'foo' not found
p

# Add step with existing function
p <- Pipeline$new("myPipe", data = c(1, 2, NA, 3, 4))
p$add("calc_mean", mean, params = list(x = ~data, na.rm = TRUE))
p$run()$get_out("calc_mean")

# Step description
p <- Pipeline$new("myPipe", data = 1:10)
p$add("s1", \(x = ~data) 2 * x, description = "multiply by 2")
print(p)
print(p, verbose = TRUE)  # print all columns

# Group output
p <- Pipeline$new("myPipe", data = data.frame(x = 1:5, y = 1:5))
p$add("prep_x", \(data = ~data) data$x, group = "prep")
p$add("prep_y", \(data = ~data) (data$y)^2, group = "prep")
p$add("sum", \(x = ~prep_x, y = ~prep_y) x + y)
p$run()$collect_out()

## ------------------------------------------------
## Method `Pipeline$append`
## ------------------------------------------------

# Append pipeline
p1 <- Pipeline$new("pipe1")
p1$add("step1", \(x = 1) x)
p2 <- Pipeline$new("pipe2")
p2$add("step2", \(y = 1) y)
p1$append(p2)

# Append pipeline with potential name clashes
p3 <- Pipeline$new("pipe3")
p3$add("step1", \(z = 1) z)
p1$append(p2)$append(p3)

# Use output of first pipeline as input for second pipeline
p1 <- Pipeline$new("pipe1", data = 8)
p2 <- Pipeline$new("pipe2")
p1$add("square", \(x = ~data) x^2)
p2$add("log2", \(x = ~data) log2(x))
p12 <- p1$append(p2, outAsIn = TRUE)
p12$run()$get_out("log2")
p12

# Custom name separator
p1$append(p2, sep = "___")

## ------------------------------------------------
## Method `Pipeline$append_to_step_names`
## ------------------------------------------------

p <- Pipeline$new("pipe")
p$add("step1", \(x = 1) x)
p$add("step2", \(y = 1) y)
p$append_to_step_names("new")
p
p$append_to_step_names("foo", sep = "__")
p

## ------------------------------------------------
## Method `Pipeline$collect_out`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("step1", \(x = ~data) x + 2)
p$add("step2", \(x = ~step1) x + 2)
p$run()
p$collect_out()

# Grouped output
p <- Pipeline$new("pipe", data = 1:2)
p$add("step1", \(x = ~data) x + 2, group = "add")
p$add("step2", \(x = ~step1, y = 2) x + y, group = "add")
p$add("step3", \(x = ~data) x * 3, group = "mult")
p$add("step4", \(x = ~data, y = 2) x * y, group = "mult")
p
p$run()$collect_out() |> str()

# Grouped by state
p$set_params(list(y = 5))
p
p$collect_out(groupBy = "state") |> str()

## ------------------------------------------------
## Method `Pipeline$discard_steps`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~add1) x + 2)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p
p$discard_steps("mult")
p

# Re-add steps
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p

# Discarding 'add1' does not work ...
try(p$discard_steps("add1"))

# ... unless we force removal of its downstream dependencies as well
p$discard_steps("add1", recursive = TRUE)  # this works
p

# Trying to discard non-existent steps is just ignored
p$discard_steps("non-existent")

## ------------------------------------------------
## Method `Pipeline$get_data`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$get_data()
p$set_data(3:4)
p$get_data()

## ------------------------------------------------
## Method `Pipeline$get_depends`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$get_depends()

## ------------------------------------------------
## Method `Pipeline$get_depends_down`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p$get_depends_down("add1")
p$get_depends_down("add1", recursive = FALSE)

## ------------------------------------------------
## Method `Pipeline$get_depends_up`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p$get_depends_up("mult4")
p$get_depends_up("mult4", recursive = FALSE)

## ------------------------------------------------
## Method `Pipeline$get_graph`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p$add("mult1", \(x = ~add1, y = ~add2) x * y)
graph <- p$get_graph()
graph

# Render the graph only if the optional 'visNetwork' package is available;
# calling it via '::' avoids attaching the package as a side effect.
if (requireNamespace("visNetwork", quietly = TRUE)) {
  do.call(visNetwork::visNetwork, args = graph)
}

## ------------------------------------------------
## Method `Pipeline$get_out`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$run()
p$get_out("add1")
p$get_out("add2")

## ------------------------------------------------
## Method `Pipeline$get_params`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("add3", \() 1 + 2)
p$get_params() |> str()
p$get_params(ignoreHidden = FALSE) |> str()

## ------------------------------------------------
## Method `Pipeline$get_params_at_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("add3", \() 1 + 2)
p$get_params_at_step("add2")
p$get_params_at_step("add2", ignoreHidden = FALSE)
p$get_params_at_step("add3")

## ------------------------------------------------
## Method `Pipeline$get_params_unique`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b)
p$get_params_unique()
p$get_params_unique(ignoreHidden = FALSE)

## ------------------------------------------------
## Method `Pipeline$get_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, z = ~add1) x + y + z)
p$run()
add1 <- p$get_step("add1")
print(add1)
add1[["params"]]
add1[["fun"]]
try(p$get_step("foo"))  # error: step 'foo' does not exist

## ------------------------------------------------
## Method `Pipeline$get_step_names`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$get_step_names()

## ------------------------------------------------
## Method `Pipeline$get_step_number`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$get_step_number("f2")

## ------------------------------------------------
## Method `Pipeline$has_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$has_step("f2")
p$has_step("foo")

## ------------------------------------------------
## Method `Pipeline$insert_after`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = 1) x)
p$add("f2", \(x = ~f1) x)
p$insert_after("f1", "f3", \(x = ~f1) x)
p

## ------------------------------------------------
## Method `Pipeline$insert_before`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = 1) x)
p$add("f2", \(x = ~f1) x)
p$insert_before("f2", "f3", \(x = ~f1) x)
p

## ------------------------------------------------
## Method `Pipeline$length`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$length()

## ------------------------------------------------
## Method `Pipeline$lock_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 1, data = ~data) x + data)
p$add("add2", \(x = 1, data = ~data) x + data)
p$run()
p$get_out("add1")
p$get_out("add2")
p$lock_step("add1")
p$set_data(3)
p$set_params(list(x = 3))
p$run()
p$get_out("add1")
p$get_out("add2")

## ------------------------------------------------
## Method `Pipeline$pop_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p
p$pop_step()  # "f2"
p

## ------------------------------------------------
## Method `Pipeline$pop_steps_after`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$add("f3", \(z = 1) z)
p$pop_steps_after("f1")  # "f2", "f3"
p

## ------------------------------------------------
## Method `Pipeline$pop_steps_from`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$add("f3", \(z = 1) z)
p$pop_steps_from("f2")  # "f2", "f3"
p

## ------------------------------------------------
## Method `Pipeline$print`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$print()

## ------------------------------------------------
## Method `Pipeline$remove_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p$add("mult1", \(x = 1, y = ~add2) x * y)
p$remove_step("mult1")
p
try(p$remove_step("add1"))  # fails because "add2" depends on "add1"
p$remove_step("add1", recursive = TRUE)  # removes "add1" and "add2"
p

## ------------------------------------------------
## Method `Pipeline$rename_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p
try(p$rename_step("add1", "add2"))  # fails because "add2" exists
p$rename_step("add1", "first_add")  # Ok
p

## ------------------------------------------------
## Method `Pipeline$replace_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~data, y = 2) x + y)
p$add("mult", \(x = 1, y = 2) x * y)
p$run()$collect_out()
p$replace_step("mult", \(x = ~add1, y = ~add2) x * y)
p$run()$collect_out()
try(p$replace_step("foo", \(x = 1) x))  # step 'foo' does not exist

## ------------------------------------------------
## Method `Pipeline$reset`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$run()
p
p$reset()
p

## ------------------------------------------------
## Method `Pipeline$run`
## ------------------------------------------------

# Simple pipeline
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~add1, z = 2) x + z)
p$add("final", \(x = ~add1, y = ~add2) x * y)
p$run()$collect_out()
p$set_params(list(z = 4))  # outdates steps add2 and final
p
p$run()$collect_out() |> str()
p

# Recursive pipeline
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("new_pipe", \(x = ~add1) {
  pp <- Pipeline$new("new_pipe", data = x)
  pp$add("add1", \(x = ~data) x + 1)
  pp$add("add2", \(x = ~add1) x + 2)
})
p$run(recursive = TRUE)$collect_out() |> str()

# Run pipeline with progress bar
p <- Pipeline$new("pipe", data = 1)
p$add("first step", \() Sys.sleep(1))
p$add("second step", \() Sys.sleep(1))
p$add("last step", \() Sys.sleep(1))
pb <- txtProgressBar(min = 1, max = p$length(), style = 3)
fprogress <- function(value, detail) {
  setTxtProgressBar(pb, value)
}
p$run(progress = fprogress, showLog = FALSE)
close(pb)  # finish the progress bar line once the run is done

## ------------------------------------------------
## Method `Pipeline$run_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~add1, z = 2) x + z)
p$add("mult", \(x = ~add1, y = ~add2) x * y)
p$run_step("add2")
p$run_step("add2", downstream = TRUE)
p$run_step("mult", upstream = TRUE)

## ------------------------------------------------
## Method `Pipeline$set_data`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$run()$collect_out()
p$set_data(3)
p$run()$collect_out()

## ------------------------------------------------
## Method `Pipeline$set_params`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 2) x + y)
p$add("add2", \(x = ~data, y = 3) x + y)
p$add("mult", \(x = 4, z = 5) x * z)
p$get_params()
p$set_params(list(x = 3, y = 3))
p$get_params()
p$set_params(list(x = 5, z = 3))
p$get_params()
suppressWarnings(
  p$set_params(list(foo = 3))  # gives warning as 'foo' is undefined
)
p$set_params(list(foo = 3), warnUndefined = FALSE)

## ------------------------------------------------
## Method `Pipeline$set_params_at_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 2, z = 3) x + y)
p$set_params_at_step("add1", list(y = 5, z = 6))
p$get_params()
try(p$set_params_at_step("add1", list(foo = 3)))  # foo not defined

## ------------------------------------------------
## Method `Pipeline$skip_group`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 15)
p$add("f1", \(data = ~data, x = 1) data + x)
p$add("log2", \(x = ~f1) log2(x), group = "prep")
p$add("sqrt", \(x = ~log2) sqrt(x), group = "prep")
p$add("final", \(x = ~sqrt, y = ~f1) x + y)
p$run()$collect_out() |> str()
p$set_params_at_step("f1", list(x = 5))$skip_group("prep")
p$run()$collect_out() |> str()
p$unskip_group("prep")$run()$collect_out() |> str()

## ------------------------------------------------
## Method `Pipeline$skip_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 2, data = ~data) x + data)
p$add("add2", \(x = 2, data = ~add1) x + data)
p$run()$collect_out() |> str()
p$skip_step("add1")$set_params(list(x = 3))
p$run()$collect_out() |> str()

## ------------------------------------------------
## Method `Pipeline$split`
## ------------------------------------------------

# Example for two independent calculation paths
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = ~data) x)
p$add("f2", \(x = 1) x)
p$add("f3", \(x = ~f1) x)
p$add("f4", \(x = ~f2) x)
p$split()

## ------------------------------------------------
## Method `Pipeline$unlock_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 1, data = ~data) x + data)
p$add("add2", \(x = 1, data = ~data) x + data)
p$lock_step("add1")
p$set_params(list(x = 3))
p$get_params()
p$unlock_step("add1")
p$set_params(list(x = 3))
p$get_params()

## ------------------------------------------------
## Method `Pipeline$unskip_group`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 15)
p$add("f1", \(data = ~data, x = 1) data + x)
p$add("log2", \(x = ~f1) log2(x), group = "prep")
p$add("sqrt", \(x = ~log2) sqrt(x), group = "prep")
p$add("final", \(x = ~sqrt, y = ~f1) x + y)
p$run()$collect_out() |> str()
p$set_params_at_step("f1", list(x = 5))$skip_group("prep")
p$run()$collect_out() |> str()
p$unskip_group("prep")$run()$collect_out() |> str()

## ------------------------------------------------
## Method `Pipeline$unskip_step`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 2, data = ~data) x + data)
p$add("add2", \(x = 2, data = ~add1) x + data)
p$run()$collect_out() |> str()
p$skip_step("add1")$set_params(list(x = 3))
p$run()$collect_out() |> str()
p$unskip_step("add1")$run()$collect_out() |> str()
This function provides an easy way to set the basic layout of the
pipeline's log output. For fine-grained control of the logger,
which you can retrieve via lgr::get_logger("pipeflow"), see e.g. the
logger_config function from the lgr package.
set_log_layout(layout = c("text", "json"))
layout |
Layout name; currently either 'text' or 'json'. |
invisibly returns a Logger object
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$run()

# Switch the log output to JSON layout
lg <- set_log_layout("json")
print(lg)
p$run()

# Switch back to the text layout
set_log_layout("text")
p$run()