| Title: | Fast Interactive Data Analysis Pipelines |
|---|---|
| Description: | A lightweight and intuitive framework for building interactive data analysis pipelines. You add R functions one by one, and 'pipeflow' wires them into a pipeline that stays consistent as you go. Modify, remove, or insert steps at any stage, manage all parameters in one place, and rely on fast execution (C++-powered DAG) for interactive use and Shiny backends. |
| Authors: | Roman Pahl [aut, cre] |
| Maintainer: | Roman Pahl <[email protected]> |
| License: | MIT + file LICENSE |
| Version: | 0.3.0.9002 |
| Built: | 2026-06-27 19:25:50 UTC |
| Source: | https://github.com/rpahl/pipeflow |
Returns a new pipeline containing selected steps and all required upstream dependencies.
Extracts values from a pipeline using one or two indices.
With a single string name, named fields such as "pipeline" or "name"
are returned first; anything else returns the matching step-table column.
With two indices (row, column), a single cell is extracted.
## S3 method for class 'pipeflow_pip'
x[i, ...]

## S3 method for class 'pipeflow_pip'
x[[i, j, ...]]
x |
A pipeflow pipeline object. |
i |
integer (row indices) or character vector (step names) of steps to select |
... |
not used |
j |
column names to select |
For [: a new pipeflow pipeline object.
For [[: the extracted value(s), depending on i and j.
p <- pip_new() |>
  pip_add("load", \(n = 5) seq_len(n)) |>
  pip_add("square", \(x = ~load) x^2) |>
  pip_add("total", \(x = ~square) sum(x))

# Select by step name; upstream deps are pulled in automatically.
# Selecting only "total" still includes "load" and "square".
sub <- p["total"]
sub[["pipeline"]][["step"]] # "load", "square", "total"

# Select a subset of steps by name vector
p[c("load", "square")][["pipeline"]][["step"]] # "load", "square"

# Select by integer row index
p[1:2][["pipeline"]][["step"]] # "load", "square"

p <- pip_new() |>
  pip_add("load", \(x = 1) x) |>
  pip_add("fit", \(x = ~load) x + 1)

# Access internal objects by name
p[["pipeline"]] # the full step table
p[["name"]] # "pipe"

# Shorthand column access (equivalent to p[["pipeline"]][["step"]])
p[["step"]]

# Two-index form: p[[row, column]] extracts a single cell
p[["fit", "depends"]] # "load"
p[[2, "state"]] # state of the second step
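The automatic inclusion of upstream dependencies can be pictured as a transitive closure over the dependency links. The following base-R sketch is only an illustration and not pipeflow's implementation (the package description mentions a C++-powered DAG); the `deps` list and the `upstream_closure()` helper are hypothetical names introduced here.

```r
# Hypothetical dependency table: each step maps to the steps it reads from.
deps <- list(
  load   = character(0),
  square = "load",
  total  = "square",
  extra  = "load"
)

# Collect the requested steps plus everything they depend on, directly or
# indirectly, and return them in the original pipeline order.
upstream_closure <- function(steps, deps) {
  keep <- character(0)
  todo <- steps
  while (length(todo) > 0) {
    s <- todo[[1]]
    todo <- todo[-1]
    if (!s %in% keep) {
      keep <- c(keep, s)
      todo <- c(todo, deps[[s]])
    }
  }
  names(deps)[names(deps) %in% keep]
}

selected <- upstream_closure("total", deps)
print(selected)
```

Selecting only "total" in this toy table also yields "load" and "square", while the unrelated "extra" step is left out.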
Length of a pipeflow pipeline or view
## S3 method for class 'pipeflow_pip'
length(x)

## S3 method for class 'pipeflow_view'
length(x)
x |
A pipeflow pipeline or view |
Number of steps as an integer.
p <- pip_new() |>
  pip_add("s1", \(x = 1) x) |>
  pip_add("s2", \(x = ~s1) x + 1) |>
  pip_add("s3", \(x = ~s2) x * 2)

length(p) # 3 (total steps in the pipeline)

# A view reports only the number of selected (visible) steps
v <- pip_view(p, i = c("s2", "s3"))
length(v) # 2
Adds a named step to the pipeline. Each step is a function whose parameters
either hold constant defaults or reference the output of a prior step using
formula notation (~step_name). Dependencies are validated when the step
is added.
pip_add(x, step, fun, tags = character(0), after = length(x), exec = "auto")
x |
A pipeflow pipeline object. |
step |
Unique step name. |
fun |
Function to execute for the step. Each function parameter must
have a default value. Default values that are simple constants are resolved
immediately. Default values that are formulas like |
tags |
Optional character vector of tags belonging to the step.
Can also be adjusted later using |
after |
Optional position after which the new step should be inserted (defaults to last position). Can be a step name or an integer index. If set to 0, the new step will be inserted at the beginning of the pipeline. |
exec |
Execution mode for this step. One of "auto", "split",
"reduce" or "plain".
Using execution mode
|
If after was specified, the new step will be inserted after the given
step or position. Be aware that in contrast to adding a step at the end,
inserting a step in the middle is a rather expensive operation as it
requires re-wiring parts of the internal pipeline structure, especially
if the new step is inserted at an early position.
The updated pipeline, invisibly.
# --- Tags and view filtering ---
p <- pip_new("analysis") |>
  pip_add("load", \(n = 5) seq_len(n), tags = c("io", "raw")) |>
  pip_add("clean", \(x = ~load) x * 2, tags = c("io", "process")) |>
  pip_add("fit", \(x = ~clean) sum(x), tags = c("model", "core", "daily")) |>
  pip_add("report", \(x = ~fit) paste("result:", x), tags = "report")

pip_run(p)
p

# Filter by tag using pip_view; keeps steps with any matching tag
pip_view(p, tags = "daily")
pip_view(p, tags = "core")
pip_view(p, tags = c("raw", "report"))

# --- Split / reduce execution modes ---
q <- pip_new("split-demo") |>
  pip_add("data", \(x = iris) x) |>
  pip_add("split", \(x = ~data) split(x, x$Species), exec = "split") |>
  pip_add("stats", \(x = ~split) summary(x)) |>
  pip_add("combine", \(x = ~stats) do.call(rbind, x), exec = "reduce")

pip_run(q)
q[["stats", "out"]] # partitioned list: one summary per species
q[["combine", "out"]] # combined table
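The position semantics of `after` (a step name or an integer index, with 0 meaning the very beginning) resemble those of base R's `append()`. The sketch below only models the ordering of step names; it does not perform the dependency re-wiring that makes real mid-pipeline insertion expensive, and `insert_step()` is a hypothetical helper, not a pipeflow function.

```r
# Step names of a hypothetical three-step pipeline, in execution order.
steps <- c("load", "clean", "fit")

# Resolve `after` given as a step name or an integer index, then insert.
insert_step <- function(steps, new, after = length(steps)) {
  if (is.character(after)) {
    after <- match(after, steps)
    if (is.na(after)) stop("cannot resolve `after`: unknown step name")
  }
  append(steps, new, after = after)
}

at_end   <- insert_step(steps, "report")                    # default: last position
in_mid   <- insert_step(steps, "validate", after = "load")  # by step name
at_start <- insert_step(steps, "setup", after = 0)          # 0 = first position

print(at_end)
print(in_mid)
print(at_start)
```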
Copies one step from pipeline y into pipeline x, preserving its
function, parameters, tags, and dependency links.
pip_add_from(x, y, step)
x |
Target pipeflow pipeline object. |
y |
Source pipeflow pipeline object. |
step |
Step name to copy from |
The updated target pipeline, invisibly.
# Build a source pipeline with reusable steps
src <- pip_new("source") |>
  pip_add("load", \(n = 3) seq_len(n)) |>
  pip_add("square", \(x = ~load) x^2)

# Copy steps into a new pipeline one at a time.
# The dependency of "square" on "load" is re-established automatically.
dst <- pip_new("target")
pip_add_from(dst, src, "load")
pip_add_from(dst, src, "square")

pip_run(dst)
pip_collect_out(dst)
Bind two pipelines together by concatenating their steps. If both pipelines have steps with the same name, the step names of the second pipeline will be automatically adapted to avoid name clashes.
pip_bind(x, y)
x |
A pipeflow pipeline object. |
y |
A pipeflow pipeline object. |
A new pipeflow pipeline object representing the bound pipelines.
a <- pip_new("a") |>
  pip_add("prep", \(x = 1) x * 2) |>
  pip_add("fit", \(x = ~prep) x + 10)

# "prep" exists in both pipelines; the one from b gets a numeric suffix
b <- pip_new("b") |>
  pip_add("prep", \(x = 5) x * 3)

ab <- pip_bind(a, b)
ab[["step"]] # "prep", "fit", "prep2" (step name conflict auto-resolved)
ab
Creates an independent copy of the pipeline. Changes to the cloned pipeline do not affect the original pipeline, and vice versa.
pip_clone(x, name = NULL)
x |
A pipeflow pipeline object. |
name |
Optional name for the cloned pipeline. If |
A cloned pipeflow pipeline object.
p <- pip_new("original") |>
  pip_add("s1", \(x = 1) x) |>
  pip_add("s2", \(x = ~s1) x + 1)

# Clone produces a fully independent copy
cp <- pip_clone(p, name = "copy")
pip_add(cp, "s3", \(x = ~s2) x * 10)

# As a result, the clone has the new step ...
cp

# ... while the original is left unchanged
p
Returns the outputs of all pipeline steps as a flat named list keyed by
step name. Use pip_view() to narrow the selection before collecting,
and compose calls if grouped output is needed.
pip_collect_out(x)
x |
A pipeflow pip or view. |
A named list of outputs, one element per step.
p <- pip_new() |>
  pip_add("load", \(x = 1) x, tags = "io") |>
  pip_add("clean", \(x = ~load) x + 1, tags = "io") |>
  pip_add("model", \(x = ~clean) x * 2, tags = "model")

pip_run(p)

# Flat named list with one entry per step
pip_collect_out(p)

# Combine with pip_view to collect output for specific tags
grouped <- list(
  io = pip_view(p, tags = "io") |> pip_collect_out(),
  model = pip_view(p, tags = "model") |> pip_collect_out()
)
grouped
Builds graph data (nodes and edges) describing the pipeline's step
structure, suitable for visualisation with visNetwork::visNetwork().
pip_get_graph(x, include_upstream = FALSE)
x |
A pipeflow pip or view. |
include_upstream |
Logical. Only relevant for views. If |
Node shapes reflect execution mode:
auto/plain: hexagon
reduce: dot
split: star
A named list with two data.frames: nodes and edges.
p <- pip_new()
pip_add(p, "load", \(x = 1) x, tags = "io")
pip_add(p, "clean", \(x = ~load) x + 1, tags = "io")
pip_add(p, "fit", \(x = ~clean) x * 2, tags = "model")

graph <- pip_get_graph(p)
graph$nodes # data.frame: id, label, shape, color
graph$edges # data.frame: from, to, arrows

# For a view, include_upstream = TRUE adds upstream deps to the graph
v <- pip_view(p, i = "fit")
pip_get_graph(v, include_upstream = TRUE)

if (require("visNetwork", quietly = TRUE)) {
  do.call(what = visNetwork::visNetwork, args = graph)
}
Returns the current default values of all tunable (non-dependency)
parameters across the pipeline. These are the parameters that can be
updated via pip_set_params(). Parameters wired to another step's output
via ~step_name are excluded.
pip_get_params(x)
x |
A pipeflow pip or view |
Named list of tunable parameter values. If the same parameter name appears in multiple steps, the first occurrence in pipeline order is returned.
p <- pip_new() |>
  pip_add("load", \(n = 100, seed = 42) seq_len(n)) |>
  pip_add("model", \(x = ~load, lambda = 0.1) x * lambda)

# ~load is a dependency; only non-dependency params are returned
pip_get_params(p) # list(n = 100, seed = 42, lambda = 0.1)

# Useful as a guide for pip_set_params()
pip_set_params(p, params = list(n = 20, lambda = 0.5))
pip_run(p) |> pip_collect_out()
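The two rules stated above (formula defaults such as `~load` are excluded, and the first occurrence of a repeated parameter name wins) can be mimicked in base R by inspecting function defaults with `formals()`. This is a minimal sketch under those assumptions, not pipeflow's actual code; `steps`, `is_dependency()` and `collect_params()` are hypothetical names.

```r
# Hypothetical step functions; `seed` appears in both steps on purpose.
steps <- list(
  load  = \(n = 100, seed = 42) seq_len(n),
  model = \(x = ~load, lambda = 0.1, seed = 7) x * lambda
)

# TRUE if a default is an unevaluated formula call such as `~load`.
is_dependency <- function(default) {
  is.call(default) && identical(default[[1]], as.name("~"))
}

collect_params <- function(steps) {
  # Concatenate the defaults of all step functions in pipeline order.
  defaults <- do.call(c, unname(lapply(steps, \(f) as.list(formals(f)))))
  tunable <- defaults[!vapply(defaults, is_dependency, logical(1))]
  # Keep only the first occurrence of each parameter name.
  tunable[!duplicated(names(tunable))]
}

params <- collect_params(steps)
str(params)
```

Here `x` is dropped because its default is a formula, and `seed` keeps the value from the earlier step.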
Check whether a step exists
pip_has_step(x, step)
x |
A pipeflow pip |
step |
A step name |
Logical indicating if the step exists
p <- pip_new() |>
  pip_add("load", \(x = 1) x) |>
  pip_add("fit", \(x = ~load) x + 1)

pip_has_step(p, "load") # TRUE
pip_has_step(p, "fit") # TRUE
pip_has_step(p, "predict") # FALSE (step not yet added)
Locks all steps of the pipeline. If p is a view, only the steps covered
by the view are locked.
pip_lock(p)
p |
A pipeflow pip or view. |
The updated pipeline or view, invisibly.
p <- pip_new() |>
  pip_add("load", \(x = 10) x) |>
  pip_add("fit", \(x = ~load) x * 2)
pip_run(p, lgr = NULL)

# Lock only "load" via a view so it won't be re-executed or overwritten
pip_lock(pip_view(p, i = "load"))
p[["pipeline"]][["locked"]] # TRUE, FALSE

# Locked steps are silently skipped during pip_run()
pip_run(p, lgr = NULL, force = TRUE)
p[["pipeline"]][["out"]][[1]] # still 10 (locked, not re-executed)

pip_unlock(p)
p[["pipeline"]][["locked"]] # FALSE, FALSE
Creates a new, empty pipeline. Add steps with pip_add() and execute
them with pip_run().
pip_new(name = "pipe")
name |
Single name used for printing and for derived view names. |
A pipeflow pipeline object.
# Create a named pipeline
p <- pip_new("my_analysis")
p[["name"]] # "my_analysis"

# Build a simple pipeline and run it
pip_add(p, "load", \(n = 5) seq_len(n))
pip_add(p, "double", \(x = ~load) x * 2) # x depends on load's output
p

pip_run(p)
p[["out"]] # list of outputs, one per step
Removes a step from the pipeline. If other steps depend on the step to
be removed, an error is raised and the removal is blocked, unless
recursive is set to TRUE. In recursive mode, the selected step and all
downstream dependent steps are removed together.
pip_remove(x, step, recursive = FALSE)
x |
A pipeflow pip |
step |
|
recursive |
|
The updated pipeline, invisibly.
p <- pip_new() |>
  pip_add("load", \(x = 1) x) |>
  pip_add("transform", \(x = ~load) x * 2) |>
  pip_add("model", \(x = ~transform) x + 10)

# Removing a leaf step (nothing depends on it) works directly
pip_remove(p, "model")
p # "load", "transform"

# Trying to remove a step that others depend on raises an error:
# pip_remove(p, "load") # Error!

# recursive = TRUE removes the step and all its downstream dependents
pip_remove(p, "load", recursive = TRUE)
p # pipeline is now empty
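The blocking rule and the recursive mode both rest on finding every step that directly or indirectly depends on the one being removed. The base-R sketch below illustrates that idea on a plain dependency list; it is not how pipeflow implements removal, and `deps`, `downstream_of()` and `remove_step()` are hypothetical names.

```r
# Hypothetical dependency table: each step maps to the steps it reads from.
deps <- list(load = character(0), transform = "load", model = "transform")

# All steps that directly or indirectly depend on `step`.
downstream_of <- function(step, deps) {
  found <- character(0)
  frontier <- step
  while (length(frontier) > 0) {
    direct <- names(deps)[vapply(deps, \(d) any(d %in% frontier), logical(1))]
    frontier <- setdiff(direct, found)
    found <- c(found, frontier)
  }
  found
}

# Refuse to remove a step that is still needed unless recursive = TRUE.
remove_step <- function(deps, step, recursive = FALSE) {
  dependents <- downstream_of(step, deps)
  if (length(dependents) > 0 && !recursive) {
    stop("step '", step, "' is still required by: ", toString(dependents))
  }
  deps[setdiff(names(deps), c(step, dependents))]
}

after_leaf <- remove_step(deps, "model")
blocked    <- inherits(try(remove_step(deps, "load"), silent = TRUE), "try-error")
after_all  <- remove_step(deps, "load", recursive = TRUE)

print(names(after_leaf))
print(blocked)
print(length(after_all))
```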
Renames the selected step and updates dependency references in downstream steps.
pip_rename(x, from, to)
x |
A pipeflow pip |
from |
Existing step name |
to |
New step name |
The updated pipeline, invisibly.
p <- pip_new() |>
  pip_add("s1", \(x = 1) x) |>
  pip_add("s2", \(x = ~s1) x + 1) # "s2" depends on "s1"

# Downstream dependency references are updated automatically
pip_rename(p, from = "s1", to = "load_data")
p

# Trying to rename to an existing step name raises an error:
try(pip_rename(p, "load_data", to = "s2")) # step 's2' already exists!
Replaces a step's function while keeping it in the same position in the
pipeline. Downstream steps are automatically marked as outdated and will
re-run on the next pip_run().
pip_replace(x, step, fun, tags = character(0))
x |
A pipeflow pipeline object. |
step |
Step name. |
fun |
Function to execute for the step. |
tags |
Optional character vector of tags belonging to the step.
Can also be adjusted later using |
The updated pipeline, invisibly.
p <- pip_new() |>
  pip_add("load", \(n = 5) seq_len(n)) |>
  pip_add("double", \(x = ~load) x * 2)

pip_run(p)
p

# Replace "load"; downstream steps are automatically marked "outdated"
pip_replace(p, "load", \(n = 3) seq_len(n))
p

# Re-run to bring everything up to date
pip_run(p)
p
Executes all pending steps in order. Steps already in state "done" are
skipped unless force = TRUE.
pip_run(
  x,
  lgr = pipeflow_lgr,
  force = FALSE,
  progress = NULL,
  recursive = FALSE
)
x |
A pipeflow pip or view |
lgr |
A logging function of the form |
force |
Logical indicating if all steps should be forced to run, regardless of whether they are outdated or not. |
progress |
Optional callback of the form
|
recursive |
If |
When x is a view, requested rows are run together with required
upstream dependencies.
The updated pipeline or view, invisibly.
See vignette("v06-self-modify-pipeline", package = "pipeflow")
for an advanced example of recursive/dynamic pipelines.
p <- pip_new() |>
  pip_add("load", \(n = 3) seq_len(n)) |>
  pip_add("square", \(x = ~load) x^2) |>
  pip_add("total", \(x = ~square) sum(x))

pip_run(p)
p

# Already-done steps are skipped on a second run
pip_run(p) # all steps skipped

# lgr = NULL suppresses log output
pip_run(p, lgr = NULL)

# force = TRUE re-executes every step regardless of state
pip_run(p, force = TRUE)

# Run only a subset of steps via a view;
# upstream dependencies are automatically included
v <- pip_view(p, i = "total")
pip_run(v)
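How step state, `force`, and locking interact when deciding what to execute can be summarised in a few lines of base R. This is a simplified model based only on the behaviour described in this manual (steps in state "done" are skipped unless forced, locked steps are skipped even when forced); the `steps` table and `steps_to_run()` are hypothetical and do not reflect pipeflow internals.

```r
# Hypothetical step table; the state names mirror those used in this manual.
steps <- data.frame(
  step   = c("load", "square", "total"),
  state  = c("done", "outdated", "new"),
  locked = c(FALSE, FALSE, TRUE)
)

# Which steps would a run execute?
steps_to_run <- function(steps, force = FALSE) {
  # Without force, only steps that are not "done" are pending.
  pending <- if (force) rep(TRUE, nrow(steps)) else steps$state != "done"
  # Locked steps are skipped in either case.
  steps$step[pending & !steps$locked]
}

normal <- steps_to_run(steps)
forced <- steps_to_run(steps, force = TRUE)

print(normal)
print(forced)
```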
Updates the default values of tunable parameters across the pipeline. Affected steps and their downstream dependents are automatically marked as outdated.
pip_set_params(p, params = list())
p |
A pipeflow pip or view |
params |
Named list of parameters to set. |
Parameters of locked steps are never changed and their state remains unchanged.
The updated pipeline or view, invisibly.
p <- pip_new() |>
  pip_add("load", \(n = 10) seq_len(n)) |>
  pip_add("scale", \(x = ~load, factor = 0.5) x * factor)

# See all adjustable parameters before running
pip_get_params(p) # list(n = 10, factor = 0.5)

# Updating params marks affected steps (and their dependents) outdated
pip_set_params(p, params = list(n = 5, factor = 2.0))
p

pip_run(p)
p
Adds the given tags to the existing tags of all steps in the pipeline.
If p is a view, tags are added only to the steps covered by the view.
Locked steps are skipped and not updated.
pip_tag(p, tags = character())
p |
A pipeflow pip or view. |
tags |
Character vector of tags to add for each selected step. |
The updated pipeline or view, invisibly.
p <- pip_new() |>
  pip_add("load", \(x = 1) x) |>
  pip_add("fit", \(x = ~load) x + 1)

# Tag every step in the pipeline at once
pip_tag(p, tags = c("daily", "core"))
p[["pipeline"]][["tags"]] # both steps have c("daily", "core")

# Add an extra tag to only one step via a view
v <- pip_view(p, i = "fit")
pip_tag(v, tags = "model")
p[["pipeline"]][["tags"]] # "fit" also has "model"
Unlocks all steps of the pipeline. If p is a view, only the steps covered
by the view are unlocked.
pip_unlock(p)
p |
A pipeflow pip or view. |
The updated pipeline or view, invisibly.
p <- pip_new() |>
  pip_add("load", \(x = 1) x) |>
  pip_add("fit", \(x = ~load) x * 2)

# Lock all steps, then unlock to restore normal execution
pip_lock(p)
p[["pipeline"]][["locked"]] # TRUE, TRUE

pip_unlock(p)
p[["pipeline"]][["locked"]] # FALSE, FALSE
Removes the given tags from all steps in the pipeline. If p is a view,
tags are removed only from the steps covered by the view.
Locked steps are skipped and not updated.
pip_untag(p, tags = character())
p |
A pipeflow pip or view. |
tags |
Character vector of tags to remove for each selected step. |
The updated pipeline or view, invisibly.
p <- pip_new() |>
  pip_add("load", \(x = 1) x, tags = c("daily", "core")) |>
  pip_add("fit", \(x = ~load) x + 1, tags = c("daily", "model"))

# Remove "daily" from all steps
pip_untag(p, tags = "daily")

# "load" retains "core"; "fit" retains "model"
p[["pipeline"]][["tags"]]
Creates a filtered view showing only a selected subset of steps.
A view references the underlying pipeline without copying it, so
operations like pip_run() and pip_set_params() applied to a view
affect only the selected steps.
pip_view(
  x,
  i = integer(),
  filter = list(),
  tags = character(),
  fixed = TRUE,
  ...
)
x |
A pipeflow pipeline or view. |
i |
Optional row indices or step names to keep. |
filter |
A named list of filters to apply. Each element can be a
character vector specifying the values to keep for the corresponding
property or, if |
tags |
Tag filter (character). Keeps steps with any matching tag. |
fixed |
If TRUE, values in |
... |
further args passed to |
A pipeflow_view object.
p <- pip_new()
pip_add(p, "load_raw", \(x = 1) x, tags = c("io", "core", "daily"))
pip_add(p, "fit_model", \(x = 2) x + 1, tags = c("model"))
pip_add(p, "eval_model", \(x = ~fit_model) x, tags = c("model", "daily", "report"))

# Filter by a fixed column value (one or more states)
pip_view(p, filter = list(state = "new"))

# Combine filters: step pattern AND state
pip_view(p, filter = list(step = "model", state = "new"))

# Filter by tag; keeps steps that have *any* of the given tags
pip_view(p, tags = "daily")

# Combine explicit step selection with a filter (intersection)
pip_view(p, i = c("load_raw", "fit_model"), filter = list(state = "new"))

# Select by integer row indices
pip_view(p, i = c(1L, 2L), filter = list(state = "new"))

# Use a regex pattern to match step names
pip_view(p, filter = list(step = "_model$"), fixed = FALSE)

# Views are composable: create a view-of-view for progressive narrowing
v1 <- pip_view(p, tags = "daily")
print(v1) # load_raw, eval_model
v2 <- pip_view(v1, tags = "report")
print(v2) # eval_model only
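The "any matching tag" rule and the progressive narrowing of a view-of-view can be reproduced on a plain list of tag vectors. The following base-R sketch uses the same step names and tags as the example above, but `tags` and `filter_by_tags()` are hypothetical stand-ins and not part of pipeflow.

```r
# Hypothetical tag column: one character vector of tags per step.
tags <- list(
  load_raw   = c("io", "core", "daily"),
  fit_model  = "model",
  eval_model = c("model", "daily", "report")
)

# Keep the steps that carry at least one of the wanted tags.
filter_by_tags <- function(tags, wanted) {
  tags[vapply(tags, \(t) any(t %in% wanted), logical(1))]
}

daily  <- filter_by_tags(tags, "daily")
report <- filter_by_tags(daily, "report")            # narrows the previous result
either <- filter_by_tags(tags, c("core", "report"))  # any of the two tags

print(names(daily))
print(names(report))
print(names(either))
```

Filtering an already filtered set can only shrink it, which is why a view-of-view narrows progressively.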
Print pipeflow objects
## S3 method for class 'pipeflow_pip'
print(
  x,
  rows = integer(),
  cols = getOption("pipeflow.print.cols", default = "core"),
  topn = getOption("pipeflow.print.topn", default = 5),
  nrows = getOption("pipeflow.print.nrows", default = 50),
  row.names = getOption("pipeflow.print.rownames", default = TRUE),
  class = getOption("pipeflow.print.class", default = FALSE),
  header = TRUE,
  ...
)

## S3 method for class 'pipeflow_view'
print(x, header = TRUE, ...)
x |
A pipeflow pipeline or view. |
rows |
Row indices to be printed. If empty, all rows are printed. |
cols |
The columns to be printed. Can be either one of
|
topn |
The number of rows to be printed from the beginning
and end of tables with more than |
nrows |
The number of rows printed before truncation is enforced. |
row.names |
If TRUE, row indices will be printed alongside x. |
class |
If TRUE, the resulting output will include above each column its storage class (or a self-evident abbreviation thereof). |
header |
If TRUE, a header with the pipeline name and number of steps will be printed. |
... |
Other arguments passed to |
Invisibly returns x.
p <- pip_new("demo") |>
  pip_add("load", \(n = 5) seq_len(n), tags = c("io", "raw")) |>
  pip_add("square", \(x = ~load) x^2, tags = "compute") |>
  pip_add("total", \(x = ~square) sum(x), tags = "compute")

print(p) # core columns: step, depends, tags, out, state
print(p, cols = "all") # all non-hidden columns
print(p, rows = 2:3) # print only steps 2 and 3

p <- pip_new() |>
  pip_add("s1", \(x = 1) x, tags = "io") |>
  pip_add("s2", \(x = ~s1) x + 1, tags = "model")

# A view header shows how many steps are selected out of the total
v <- pip_view(p, tags = "model")
print(v) # "<pipeflow_view> pipe view (1 of 2 steps)"
Sets the basic layout of the pipeline's log output. For fine-grained
control of the logger, which you can retrieve via
lgr::get_logger("pipeflow"), see e.g. the logger_config function from
the lgr package.
set_log_layout(layout = c("text", "json"))
layout |
Layout name, which at this point can be either 'text' or 'json'. |
Invisibly returns a Logger object.
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$run()

lg <- set_log_layout("json")
print(lg)
p$run()

set_log_layout("text")
p$run()