
How to build a reproducible R&D data pipeline from scratch


TL;DR

Most R&D data pipelines start as ad hoc scripts and accumulate technical debt until they become unmaintainable. Building reproducibility in from day one is not harder than building without it; it just requires different habits.


Why R&D pipelines fail

The typical R&D data pipeline starts as a script that works. Someone writes analysis_v1.R. It runs. The results are used. Six months later, a new scientist joins the team and cannot reproduce the results because the script depends on a local file path that no longer exists, was run with a version of R that is no longer installed, and produces different output when run on a different operating system.

This is not a rare failure mode. It is the default outcome of building analysis pipelines without reproducibility as a first-class requirement. The fix is not technical sophistication: it is discipline about five specific properties from the first commit.

The five properties of a reproducible pipeline

  1. Determinism: given the same input, the pipeline always produces the same output, regardless of who runs it or when.
  2. Portability: the pipeline runs on any machine with the documented dependencies, without configuration changes.
  3. Auditability: every transformation is documented, versioned, and traceable from input to output.
  4. Modularity: each stage of the pipeline is independent and can be re-run without running the entire pipeline.
  5. Observability: when something breaks, you know exactly where and why.
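Determinism is the one property on the list that can be checked mechanically: fingerprint the pipeline's outputs and compare across runs. A minimal stdlib sketch (the output paths, and the idea of re-running the pipeline between calls, are illustrative):

```python
import hashlib
from pathlib import Path

def output_fingerprint(paths: list[str]) -> str:
    """Combine the SHA-256 digests of all output files into one fingerprint."""
    digest = hashlib.sha256()
    for p in sorted(paths):  # sort so file order never changes the result
        digest.update(Path(p).read_bytes())
    return digest.hexdigest()
```

Run the pipeline twice, call `output_fingerprint` on the same output list after each run, and any mismatch points you at a non-deterministic stage.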

Layer 1: data ingestion and versioning

Raw data should be treated as immutable. Once ingested, it is never modified. Store it in a versioned object store (S3 with versioning enabled, or DVC on top of any storage) and record a checksum at ingestion time:

import boto3
import hashlib

def ingest_and_checksum(local_path: str, s3_bucket: str, s3_key: str) -> str:
    """Upload file to S3 and return SHA-256 checksum."""
    with open(local_path, "rb") as f:
        content = f.read()
        checksum = hashlib.sha256(content).hexdigest()

    s3 = boto3.client("s3")
    s3.put_object(
        Bucket=s3_bucket,
        Key=s3_key,
        Body=content,
        Metadata={"sha256": checksum},
    )
    return checksum

# Usage
checksum = ingest_and_checksum(
    "data/raw/assay_results_2025-03-01.csv",
    "my-rd-bucket",
    "raw/assay_results_2025-03-01.csv"
)
print(f"Ingested with checksum: {checksum}")
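For large raw files, reading the whole file into memory before hashing (as above) can be costly. A chunked variant bounds memory use and produces the identical digest, so checksums stay comparable; a sketch under the same assumptions:

```python
import hashlib

def sha256_stream(path: str, chunk_size: int = 1 << 20) -> str:
    """Compute a file's SHA-256 in 1 MiB chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()
```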

Layer 2: transformation with dbt

Data transformation (cleaning, reshaping, applying business rules, joining sources) belongs in dbt SQL models. Each model is a single SELECT statement, tested, and documented. The transformation graph is explicit and visualizable.

-- models/staging/stg_assay_results.sql
SELECT
    CAST(sample_id    AS VARCHAR(50)) AS sample_id,
    CAST(assay_date   AS DATE)        AS assay_date,
    CAST(result_value AS FLOAT)       AS result_value,
    LOWER(TRIM(unit))                 AS unit,
    CASE
        WHEN result_value < lloq THEN lloq / 2.0
        ELSE result_value
    END AS result_corrected,
    CASE
        WHEN result_value < lloq THEN 'BLQ'
        ELSE ''
    END AS blq_flag
FROM {{ source('raw', 'assay_results') }}
WHERE sample_id IS NOT NULL

# schema.yml - every column documented and tested
version: 2
models:
  - name: stg_assay_results
    columns:
      - name: sample_id
        tests: [not_null, unique]
      - name: result_value
        tests: [not_null, {dbt_utils.accepted_range: {min_value: 0}}]

Layer 3: statistical analysis with R and renv

Statistical analysis belongs in R, with every dependency locked by renv:

# R/analysis.R
renv::restore()  # ensures all packages match lockfile before running

library(readr); library(lme4); library(emmeans)

# Read from dbt output (already cleaned and tested)
data <- read_csv("data/processed/analysis_dataset.csv")

model <- lmer(
  result_corrected ~ treatment * timepoint + (1 | subject_id),
  data = data
)

results <- emmeans(model, ~ treatment | timepoint) |>
  contrast("revpairwise") |>
  as.data.frame()

write_csv(results, "results/primary_analysis.csv")

Layer 4: orchestration with Dagster

Dagster orchestrates the pipeline: it knows the dependencies between stages, tracks execution history, and re-runs only what needs to change when an upstream stage changes.

import subprocess

from dagster import asset, AssetIn

@asset
def raw_assay_data():
    """Ingest raw assay results from S3."""
    return ingest_from_s3("raw/assay_results_2025-03-01.csv")

@asset(ins={"raw": AssetIn("raw_assay_data")})
def cleaned_assay_data(raw):
    """Run dbt transformations."""
    subprocess.run(["dbt", "run", "--models", "stg_assay_results"], check=True)

@asset(ins={"cleaned": AssetIn("cleaned_assay_data")})
def primary_analysis_results(cleaned):
    """Run R statistical analysis."""
    subprocess.run(["Rscript", "R/analysis.R"], check=True)
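The selective re-execution Dagster provides comes down to walking the asset dependency graph. A toy sketch of the idea using the standard library's `graphlib` (asset names mirror the example above; this is a simplification of what Dagster actually does, not its implementation):

```python
from graphlib import TopologicalSorter

# Each asset maps to the set of assets it depends on.
DEPS = {
    "raw_assay_data": set(),
    "cleaned_assay_data": {"raw_assay_data"},
    "primary_analysis_results": {"cleaned_assay_data"},
}

def assets_to_rerun(changed: set[str]) -> list[str]:
    """Return the changed assets plus everything downstream, in execution order."""
    stale = set(changed)
    order = list(TopologicalSorter(DEPS).static_order())  # dependencies first
    for asset in order:
        # An asset becomes stale as soon as any of its dependencies is stale.
        if DEPS[asset] & stale:
            stale.add(asset)
    return [a for a in order if a in stale]
```

If only `cleaned_assay_data` changes, `assets_to_rerun` skips the ingestion step and schedules just the transformation and the analysis.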

Layer 5: reporting with Quarto

The final layer converts analysis results to formatted outputs:

---
title: "Study Report"
params:
  results_path: "results/primary_analysis.csv"
---

```{r}
library(readr); library(gt)

results <- read_csv(params$results_path)
gt(results) |> tab_header("Primary Endpoint Results")
```
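From the orchestrator's side, rendering this report is one CLI call. A small helper that assembles the command (the Quarto CLI accepts `-P key:value` to override document params; the `report.qmd` file name is illustrative):

```python
import subprocess

def render_report(qmd_path: str, params: dict[str, str]) -> list[str]:
    """Build a `quarto render` command with -P parameter overrides."""
    cmd = ["quarto", "render", qmd_path]
    for key, value in params.items():
        cmd += ["-P", f"{key}:{value}"]
    return cmd

# Hypothetical usage; uncomment to actually render:
# subprocess.run(render_report("report.qmd",
#     {"results_path": "results/primary_analysis.csv"}), check=True)
```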

Putting it together

The complete pipeline runs with one Dagster job trigger. Every stage is logged, versioned, and traceable. When a result changes unexpectedly, you can query Dagster's execution history to find exactly when it changed and which upstream change caused it.

For GxP contexts, add signed Git tags at pipeline lock points and store the renv lockfile alongside the analysis outputs. This gives you the audit trail without requiring a validated LIMS.
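The lock-point record itself can be a plain JSON manifest pairing each artifact with its checksum, committed alongside the outputs and covered by the signed tag. A hedged stdlib sketch (the file names are illustrative, and the `git tag -s` step happens separately):

```python
import hashlib
import json
from pathlib import Path

def write_lock_manifest(artifacts: list[str], manifest_path: str) -> dict:
    """Record SHA-256 checksums of lock-point artifacts in a JSON manifest."""
    manifest = {
        p: hashlib.sha256(Path(p).read_bytes()).hexdigest()
        for p in sorted(artifacts)
    }
    Path(manifest_path).write_text(json.dumps(manifest, indent=2))
    return manifest
```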


Key takeaway

A reproducible R&D pipeline is five layers: immutable data ingestion, tested dbt transformations, renv-locked R analysis, Dagster orchestration, and Quarto reporting. Each layer solves a specific failure mode. Together they turn an ad hoc script collection into an auditable, maintainable system.


Aslane Mortreau

Freelance Data & AI specialist working with pharmaceutical, biotech, and cosmetic R&D teams. Statistical modeling, analytical pipelines, and custom applications.
