Why R&D pipelines fail
The typical R&D data pipeline starts as a script that works. Someone writes analysis_v1.R. It runs. The results are used. Six months later, a new scientist joins the team and cannot reproduce the results because the script depends on a local file path that no longer exists, was run with a version of R that is no longer installed, and produces different output when run on a different operating system.
This is not a rare failure mode. It is the default outcome of building analysis pipelines without reproducibility as a first-class requirement. The fix is not technical sophistication; it is discipline about five specific properties, enforced from the first commit.
The five properties of a reproducible pipeline
- Determinism: given the same input, the pipeline always produces the same output, regardless of who runs it or when.
- Portability: the pipeline runs on any machine with the documented dependencies, without configuration changes.
- Auditability: every transformation is documented, versioned, and traceable from input to output.
- Modularity: each stage of the pipeline is independent and can be re-run without running the entire pipeline.
- Observability: when something breaks, you know exactly where and why.
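Determinism in particular is easy to claim and easy to check: hash the outputs of two runs on the same input and compare. A minimal stdlib sketch (the directory layout and `output_fingerprint` helper are illustrative, not part of any pipeline framework):

```python
import hashlib
from pathlib import Path

def output_fingerprint(output_dir: str) -> str:
    """Hash every file under output_dir (sorted by path) into one digest.

    Two runs of the pipeline are deterministic iff their fingerprints match.
    """
    digest = hashlib.sha256()
    for path in sorted(Path(output_dir).rglob("*")):
        if path.is_file():
            # Include the relative path so renamed files change the digest too.
            digest.update(path.relative_to(output_dir).as_posix().encode())
            digest.update(path.read_bytes())
    return digest.hexdigest()

# Usage: compare two runs of the same pipeline on the same input
# assert output_fingerprint("run_a/results") == output_fingerprint("run_b/results")
```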
Layer 1: data ingestion and versioning
Raw data should be treated as immutable. Once ingested, it is never modified. Store it in a versioned object store (S3 with versioning enabled, or DVC on top of any storage) and record a checksum at ingestion time:
```python
import boto3
import hashlib

def ingest_and_checksum(local_path: str, s3_bucket: str, s3_key: str) -> str:
    """Upload file to S3 and return SHA-256 checksum."""
    with open(local_path, "rb") as f:
        content = f.read()
    checksum = hashlib.sha256(content).hexdigest()
    s3 = boto3.client("s3")
    s3.put_object(
        Bucket=s3_bucket,
        Key=s3_key,
        Body=content,
        Metadata={"sha256": checksum},
    )
    return checksum

# Usage
checksum = ingest_and_checksum(
    "data/raw/assay_results_2025-03-01.csv",
    "my-rd-bucket",
    "raw/assay_results_2025-03-01.csv",
)
print(f"Ingested with checksum: {checksum}")
```
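Ingestion-time checksums only pay off if they are re-verified at read time. A sketch of the counterpart check, shown against a local file (an S3 reader would compare against the stored `sha256` object metadata instead; `verify_checksum` is an illustrative helper, not a boto3 API):

```python
import hashlib

def verify_checksum(local_path: str, expected_sha256: str) -> bytes:
    """Read a file and fail loudly if its contents drifted since ingestion."""
    with open(local_path, "rb") as f:
        content = f.read()
    actual = hashlib.sha256(content).hexdigest()
    if actual != expected_sha256:
        raise ValueError(
            f"Checksum mismatch for {local_path}: "
            f"expected {expected_sha256}, got {actual}"
        )
    return content
```

Calling this at the top of every downstream stage turns silent data drift into an immediate, diagnosable failure.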
Layer 2: transformation with dbt
Data transformation (cleaning, reshaping, applying business rules, joining sources) belongs in dbt SQL models. Each model is a single SELECT statement, is tested, and is documented. The transformation graph is explicit and visualizable.
```sql
-- models/staging/stg_assay_results.sql
SELECT
    CAST(sample_id AS VARCHAR(50)) AS sample_id,
    CAST(assay_date AS DATE) AS assay_date,
    CAST(result_value AS FLOAT) AS result_value,
    LOWER(TRIM(unit)) AS unit,
    CASE
        WHEN result_value < lloq THEN lloq / 2.0
        ELSE result_value
    END AS result_corrected,
    CASE
        WHEN result_value < lloq THEN 'BLQ'
        ELSE ''
    END AS blq_flag
FROM {{ source('raw', 'assay_results') }}
WHERE sample_id IS NOT NULL
```
```yaml
# schema.yml - every column documented and tested
version: 2

models:
  - name: stg_assay_results
    columns:
      - name: sample_id
        tests:
          - not_null
          - unique
      - name: result_value
        tests:
          - not_null
          - dbt_utils.accepted_range:  # requires the dbt_utils package
              min_value: 0
```
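The BLQ rule in `stg_assay_results` (substitute LLOQ/2 when a result falls below the lower limit of quantification) is exactly the kind of business rule worth stating in more than one place. A plain-Python equivalent of the same logic, useful as an independent unit-testable reference for the SQL; `correct_blq` is an illustrative helper, not part of the pipeline:

```python
def correct_blq(result_value: float, lloq: float) -> tuple[float, str]:
    """Mirror of the SQL BLQ rule: values below LLOQ are imputed as LLOQ/2."""
    if result_value < lloq:
        return lloq / 2.0, "BLQ"
    return result_value, ""
```

If the SQL and the reference implementation ever disagree on the same inputs, one of them has drifted from the documented rule.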
Layer 3: statistical analysis with R and renv
Statistical analysis belongs in R, with every dependency locked by renv:
```r
# R/analysis.R
renv::restore()  # ensures all packages match lockfile before running

library(readr); library(lme4); library(emmeans)

# Read from dbt output (already cleaned and tested)
data <- read_csv("data/processed/analysis_dataset.csv")

model <- lmer(
  result_corrected ~ treatment * timepoint + (1 | subject_id),
  data = data
)

results <- emmeans(model, ~ treatment | timepoint) |>
  contrast("revpairwise") |>
  as.data.frame()

write_csv(results, "results/primary_analysis.csv")
```
Layer 4: orchestration with Dagster
Dagster orchestrates the pipeline: it knows the dependencies between stages, tracks execution history, and re-runs only what needs to change when an upstream stage changes.
```python
import subprocess

from dagster import asset, AssetIn

@asset
def raw_assay_data() -> str:
    """Ingest raw assay results from S3."""
    return ingest_from_s3("raw/assay_results_2025-03-01.csv")

@asset(ins={"raw": AssetIn("raw_assay_data")})
def cleaned_assay_data(raw: str) -> str:
    """Run dbt transformations."""
    subprocess.run(["dbt", "run", "--models", "stg_assay_results"], check=True)
    # Return the output path so the downstream asset has an explicit handle.
    return "data/processed/analysis_dataset.csv"

@asset(ins={"cleaned": AssetIn("cleaned_assay_data")})
def primary_analysis_results(cleaned: str) -> str:
    """Run R statistical analysis."""
    subprocess.run(["Rscript", "R/analysis.R"], check=True)
    return "results/primary_analysis.csv"
```
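The selective re-run behavior is worth understanding on its own: when one asset changes, only that asset and everything downstream of it becomes stale. A toy dependency walk that illustrates the idea in plain Python (this is an illustration of the concept, not Dagster's actual scheduler):

```python
def stale_assets(deps: dict[str, list[str]], changed: str) -> set[str]:
    """Return the changed asset plus everything downstream of it.

    deps maps each asset to the assets it depends on (its upstream edges).
    """
    stale = {changed}
    added = True
    while added:  # keep sweeping until no new downstream asset is found
        added = False
        for asset_name, upstream in deps.items():
            if asset_name not in stale and any(u in stale for u in upstream):
                stale.add(asset_name)
                added = True
    return stale

pipeline = {
    "raw_assay_data": [],
    "cleaned_assay_data": ["raw_assay_data"],
    "primary_analysis_results": ["cleaned_assay_data"],
    "study_report": ["primary_analysis_results"],
}
# Changing the analysis stage leaves ingestion and cleaning untouched:
# stale_assets(pipeline, "primary_analysis_results")
# → {"primary_analysis_results", "study_report"}
```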
Layer 5: reporting with Quarto
The final layer converts analysis results to formatted outputs:
````markdown
---
title: "Study Report"
params:
  results_path: "results/primary_analysis.csv"
---

```{r}
library(readr); library(gt)
results <- read_csv(params$results_path)
gt(results) |> tab_header("Primary Endpoint Results")
```
````
Putting it together
The complete pipeline runs with one Dagster job trigger. Every stage is logged, versioned, and traceable. When a result changes unexpectedly, you can query Dagster's execution history to find exactly when it changed and which upstream change caused it.
For GxP contexts, add signed Git tags at pipeline lock points and store the renv lockfile alongside the analysis outputs. This gives you the audit trail without requiring a validated LIMS.
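That lock-point record can be as simple as a checksum manifest written next to the outputs. A stdlib sketch (`write_lockpoint_manifest` and the file paths are illustrative assumptions, not part of renv or Dagster):

```python
import hashlib
import json
from pathlib import Path

def write_lockpoint_manifest(lockfile: str, outputs: list[str],
                             manifest_path: str) -> dict:
    """Record SHA-256 digests of the renv lockfile and each analysis output."""
    manifest = {
        Path(p).name: hashlib.sha256(Path(p).read_bytes()).hexdigest()
        for p in [lockfile, *outputs]
    }
    Path(manifest_path).write_text(json.dumps(manifest, indent=2))
    return manifest

# Usage at a lock point, alongside the signed Git tag:
# write_lockpoint_manifest("renv.lock",
#                          ["results/primary_analysis.csv"],
#                          "results/lockpoint_manifest.json")
```

An auditor can then confirm, from the manifest alone, that a given output was produced under a given dependency set.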
Key takeaway
A reproducible R&D pipeline is five layers: immutable data ingestion, tested dbt transformations, renv-locked R analysis, Dagster orchestration, and Quarto reporting. Each layer solves a specific failure mode. Together they turn an ad hoc script collection into an auditable, maintainable system.