BEE
Backend Engineering Essentials

[BEE-30085] ML Data Validation and Pipeline Quality Gates


Data validation enforces explicit assertions about data — schema, statistical properties, freshness, completeness — before that data enters a training pipeline, feature store, or model. A pipeline quality gate is a decision point that blocks or reroutes execution when validation fails. Without these gates, bad data reaches models silently, producing degraded predictions that are expensive to diagnose weeks later.

Context

A Dimensional Research survey found 96% of organizations encounter data quality problems when training AI models, and VentureBeat's analysis reported that 87% of data science projects never reach production, with inadequate data quality as the leading cause. Google's MLOps reference architecture for continuous training places data validation as a mandatory step before model training, checking for schema skews (unexpected or missing features) and data value skews (statistical changes that signal model staleness), with explicit guidance to stop the pipeline when problems are detected (https://docs.cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning).

The core problem is that data issues are silent. A model that trains without errors on data containing 40% null values in a critical feature does not crash — it produces subtly wrong predictions. Detection happens downstream when business metrics degrade, at which point root cause analysis is expensive.
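
This failure mode is easy to reproduce with nothing but pandas and NumPy. In the illustrative sketch below, a hypothetical feature with a ~40% null rate is imputed with zeros by a careless transform step; nothing raises, but the statistic a model would learn from has silently collapsed:

```python
import numpy as np
import pandas as pd

# Hypothetical feature: true distribution ~N(50, 5), but ~40% of values are null.
rng = np.random.default_rng(0)
values = rng.normal(loc=50.0, scale=5.0, size=1_000)
mask = rng.random(1_000) < 0.4
feature = pd.Series(np.where(mask, np.nan, values))

# A careless transform imputes nulls with 0 — no exception, no warning...
imputed = feature.fillna(0.0)

# ...but the feature's distribution is now badly distorted.
print(f"null rate:          {feature.isna().mean():.0%}")
print(f"mean (non-null):    {feature.dropna().mean():.1f}")
print(f"mean after fillna:  {imputed.mean():.1f}")
```

A validation gate asserting `missing_percent < 1%` before this step would have blocked the run instead of letting the distorted feature reach training.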

The tooling landscape has converged around three complementary layers: Pandera for schema-level validation applied directly to Python DataFrames in training code; Great Expectations for expectation suites and human-readable validation reports across files, databases, and DataFrames; and Soda for YAML-defined data quality checks run against SQL data sources at the pipeline boundary. These tools operate at different points in the data lifecycle and are often used in combination.

The Six Dimensions of Data Quality

Six dimensions define what "good data" means. Every validation rule maps to at least one of them:

| Dimension | Definition | Typical check |
| --- | --- | --- |
| Completeness | Required fields and records are present | missing_percent(feature) < 1% |
| Validity | Values conform to defined formats, types, domain rules | invalid_percent(email) < 0.5% |
| Uniqueness | Each entity recorded only once | duplicate_count(user_id) = 0 |
| Consistency | Values agree across related fields and systems | Referential integrity, cross-dataset match rate |
| Accuracy | Data correctly represents real-world entities | Cross-referenced against authoritative source |
| Freshness | Data is sufficiently recent | freshness(updated_at) < 24h |

Completeness and validity failures are the most common and most detectable automatically. Accuracy typically requires domain-specific cross-reference checks or manual sampling.
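
Several of these dimensions can be measured with plain pandas before reaching for a dedicated tool. The helper below is an illustrative sketch (the column names `user_id` and `updated_at` are assumptions) covering the completeness, uniqueness, and freshness rows of the table:

```python
import pandas as pd

def quality_metrics(df: pd.DataFrame, key: str = "user_id",
                    ts_col: str = "updated_at") -> dict:
    """Compute completeness, uniqueness, and freshness metrics for one batch."""
    now = pd.Timestamp.now(tz="UTC")
    return {
        "missing_percent": float(df[key].isna().mean() * 100),
        "duplicate_count": int(df[key].duplicated().sum()),
        "freshness_hours": (now - df[ts_col].max()).total_seconds() / 3600,
    }

df = pd.DataFrame({
    "user_id": ["a", "b", "b", None],
    "updated_at": pd.to_datetime(["2025-04-15T00:00:00Z"] * 4, utc=True),
})
metrics = quality_metrics(df)
print(metrics)  # missing_percent=25.0, duplicate_count=1
```

A gate would then compare each metric against a threshold (e.g. `missing_percent < 1`), which is exactly what the tools below formalize.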

Pandera: In-Process DataFrame Validation

Pandera validates DataFrames at the function boundary — where data enters or exits a transformation step. It supports pandas, Polars, PySpark, Dask, and Modin, and follows a schema-declaration style analogous to Pydantic for data models.

Created by Niels Bantilan in 2018 and now an open-source project under Union.ai, Pandera uses two equivalent APIs: an object-based DataFrameSchema and a class-based DataFrameModel.

python
import pandera.pandas as pa
from pandera.typing import Series

# Class-based schema — declarative, self-documenting
class TrainingFeaturesSchema(pa.DataFrameModel):
    user_id: Series[str] = pa.Field(nullable=False, unique=True)
    age: Series[int] = pa.Field(ge=0, le=120)
    purchase_count_30d: Series[int] = pa.Field(ge=0)
    label: Series[int] = pa.Field(isin=[0, 1])

    @pa.check("purchase_count_30d")
    @classmethod
    def purchase_count_reasonable(cls, s: Series[int]) -> Series[bool]:
        # Custom vectorized check: no single user can have >10k purchases in 30 days
        return s <= 10_000

# Validate — raises SchemaError immediately on first failure (default)
validated_df = TrainingFeaturesSchema.validate(df)

# Lazy validation — collect ALL failures before raising
try:
    TrainingFeaturesSchema.validate(df, lazy=True)
except pa.errors.SchemaErrors as e:
    # e.failure_cases contains a DataFrame of all failing rows and checks
    print(e.failure_cases)
    raise

The @pa.check_input and @pa.check_output decorators apply validation at the function boundary without modifying the function body — useful for validating feature engineering functions without changing their signatures:

python
import pandas as pd
import pandera.pandas as pa

input_schema = pa.DataFrameSchema({
    "event_timestamp": pa.Column("datetime64[ns]", nullable=False),
    "user_id": pa.Column(str, nullable=False),
    "raw_score": pa.Column(float, pa.Check.between(0.0, 1.0)),
})

output_schema = pa.DataFrameSchema({
    "user_id": pa.Column(str, nullable=False),
    "normalized_score": pa.Column(float, pa.Check.between(0.0, 1.0)),
})

@pa.check_input(input_schema)
@pa.check_output(output_schema)
def compute_normalized_score(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["normalized_score"] = (df["raw_score"] - df["raw_score"].min()) / (
        df["raw_score"].max() - df["raw_score"].min()
    )
    return df[["user_id", "normalized_score"]]

Great Expectations: Expectation Suites and Checkpoints

Great Expectations (GX) operates at a higher level than Pandera — it manages named Expectation Suites (collections of rules), runs them as Checkpoints in production, and generates Data Docs (human-readable HTML reports) that serve as living documentation of data quality. GX v1.0 GA was released August 22, 2024; the v1.x API is a Python Fluent API that replaced YAML-based configuration entirely.

An Expectation is a verifiable assertion: "column user_id contains no nulls." An Expectation Suite is a named collection applied as a unit. A Checkpoint executes one or more suites and triggers Actions based on results — Slack notifications, Data Docs updates, blocking exceptions.

python
import great_expectations as gx
import pandas as pd

# GX v1.x Fluent API
context = gx.get_context()

# Connect to a pandas source
data_source = context.data_sources.add_pandas("training_data")
data_asset = data_source.add_dataframe_asset("daily_features")
batch_definition = data_asset.add_batch_definition_whole_dataframe("batch")

# Define expectation suite
suite = context.suites.add(
    gx.ExpectationSuite(name="training_features_suite")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="user_id")
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=10_000, max_value=10_000_000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="age",
        min_value=0,
        max_value=120,
        mostly=0.99,  # allow up to 1% outside range (soft constraint)
    )
)

# Validate
df = pd.read_parquet("s3://ml-data/features/2025-04-15.parquet")
batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
validation_result = batch.validate(suite)

if not validation_result["success"]:
    failing = [
        r for r in validation_result["results"] if not r["success"]
    ]
    raise RuntimeError(
        f"Validation failed: {len(failing)} expectations not met. "
        f"First failure: {failing[0]['expectation_config']['type']}"
    )

The mostly parameter (0.0 to 1.0) converts an absolute assertion into a partial-pass threshold. mostly=0.99 means 99% of rows must satisfy the check — useful for noisy real-world data where perfect compliance is unrealistic but extreme violations must still be caught.
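
The semantics reduce to a single threshold comparison; the plain-Python sketch below illustrates the behavior (it is not GX's internal implementation):

```python
def mostly_passes(results: list[bool], mostly: float) -> bool:
    """A check with `mostly` succeeds when the passing fraction meets the threshold."""
    return sum(results) / len(results) >= mostly

# 990 of 1,000 rows in range: passes at mostly=0.99, fails at mostly=1.0
results = [True] * 990 + [False] * 10
print(mostly_passes(results, mostly=0.99))  # True
print(mostly_passes(results, mostly=1.0))   # False
```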

Checkpoints with Actions enable production-grade validation pipelines where the validation result triggers notifications and documentation updates automatically:

python
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="daily_feature_validation",
        data=batch_definition,
        suite=suite,
    )
)

checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="daily_feature_checkpoint",
        validation_definitions=[validation_definition],
        actions=[
            gx.checkpoint.SlackNotificationAction(
                name="slack_alert",
                slack_webhook="${SLACK_WEBHOOK_URL}",
                notify_on="failure",   # "all" | "success" | "failure"
            ),
            gx.checkpoint.UpdateDataDocsAction(name="update_data_docs"),
        ],
        result_format={"result_format": "COMPLETE"},
    )
)

checkpoint_result = checkpoint.run()

Soda: SQL-Layer Quality Gates

Soda Core defines data quality checks in YAML (the Soda Checks Language, SodaCL) and runs them directly against SQL data sources. Soda, the company behind the tool, was founded in Brussels in 2018; SodaCL supports over 25 built-in metrics covering all six quality dimensions. Soda is particularly useful for validating data in data warehouses (Snowflake, BigQuery, Redshift) and dbt-adjacent pipelines where validation happens after SQL transformations.

yaml
# checks/feature_checks.yml
checks for training_features:
  - row_count > 10000
  - missing_count(user_id) = 0
  - duplicate_count(user_id) = 0
  - missing_percent(age) < 1 %
  - freshness(updated_at) < 24h
  - invalid_percent(email) < 0.5 %:
      valid format: email
  - schema:
      fail:
        when required column missing:
          - user_id
          - event_timestamp
          - label

# Two-tier alerting: warn before fail
checks for raw_events:
  - duplicate_count(event_id):
      warn: when > 5
      fail: when > 100

bash
soda scan \
  -d snowflake_prod \
  -c soda/configuration.yml \
  checks/feature_checks.yml
# Exit code: 0=all pass, 1=warnings only, 2=failures

The Python API enables programmatic integration:

python
from soda.scan import Scan


def run_soda_gate(data_source: str, checks_file: str) -> None:
    """Run Soda checks and raise on failure. Logs warnings without blocking."""
    scan = Scan()
    scan.set_data_source_name(data_source)
    scan.add_configuration_yaml_file("./soda/configuration.yml")
    scan.add_sodacl_yaml_file(checks_file)

    scan.execute()
    print(scan.get_logs_text())

    if scan.has_failures():
        raise RuntimeError(
            f"Soda data quality gate failed for '{data_source}'. "
            f"Blocking pipeline execution."
        )
    if scan.has_warnings():
        import logging
        logging.warning("Soda checks: warnings detected, pipeline continues.")

Pipeline Integration

Severity Levels

Quality gates SHOULD implement at least three severity levels:

  • CRITICAL — block pipeline, page on-call. Examples: primary key is null, row count is zero, schema column missing.
  • ERROR — block pipeline, send alert. Examples: null rate exceeds 5% on a required feature, data freshness > 48h.
  • WARNING — continue pipeline, log and alert. Examples: null rate increased from 0.1% to 0.5% within acceptable bounds.
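
One way to encode these levels is a small dispatcher that maps each finding's severity to a pipeline action. The sketch below is illustrative; in a real deployment the `print` calls would be replaced by paging and alerting hooks:

```python
from enum import Enum

class Severity(Enum):
    CRITICAL = "critical"  # block pipeline, page on-call
    ERROR = "error"        # block pipeline, send alert
    WARNING = "warning"    # continue pipeline, log and alert

def apply_gate(findings: list[tuple[Severity, str]]) -> None:
    """Log warnings; raise if any finding blocks the pipeline."""
    blocking = [(s, msg) for s, msg in findings if s is not Severity.WARNING]
    for severity, msg in findings:
        if severity is Severity.WARNING:
            print(f"WARNING (pipeline continues): {msg}")
    if blocking:
        raise RuntimeError(
            f"{len(blocking)} blocking finding(s); first: {blocking[0][1]}"
        )

apply_gate([(Severity.WARNING, "null rate rose to 0.5%")])  # logs, continues
```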

Airflow Integration

GX and Soda both integrate natively with Airflow. The GXValidateDataFrameOperator blocks downstream tasks when validation fails. ShortCircuitOperator provides a lightweight gate for simple row count or freshness checks:

python
import great_expectations as gx
from airflow.operators.python import ShortCircuitOperator, BranchPythonOperator
from great_expectations_provider.operators.great_expectations import (
    GXValidateDataFrameOperator,
)


def check_minimum_row_count() -> bool:
    """Returns False to stop DAG; True to proceed."""
    count = query_db("SELECT COUNT(*) FROM feature_store_daily")
    return count > 10_000  # returns False → all downstream tasks skipped


def route_on_quality() -> str:
    """BranchPythonOperator: route to training or quarantine."""
    if data_quality_score() >= 0.95:
        return "train_model"
    return "quarantine_data"


with dag:
    # Hard gate: stops DAG if row count too low
    row_count_gate = ShortCircuitOperator(
        task_id="row_count_gate",
        python_callable=check_minimum_row_count,
    )

    # GX validation: blocks downstream on expectation failure
    gx_validation = GXValidateDataFrameOperator(
        task_id="validate_features",
        configure_dataframe=lambda: load_feature_df(),
        configure_expectations=lambda df: gx.ExpectationSuite(
            expectations=[
                gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id"),
                gx.expectations.ExpectTableRowCountToBeBetween(
                    min_value=1_000, max_value=50_000_000
                ),
            ]
        ),
    )

    # Soft gate: route bad data to quarantine instead of failing DAG
    quality_branch = BranchPythonOperator(
        task_id="quality_branch",
        python_callable=route_on_quality,
    )

    row_count_gate >> gx_validation >> quality_branch

The ShortCircuitOperator should be reserved for non-negotiable conditions only; overuse causes chain-reaction skips across large DAGs. Use BranchPythonOperator when bad data should be quarantined (routed to a separate storage path for human review) rather than failing the entire pipeline run.
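
The quarantine branch itself can be as simple as splitting a batch on a validity mask and writing the failing partition to a separate location. A minimal sketch (the example columns and thresholds are assumptions):

```python
import pandas as pd

def split_quarantine(df: pd.DataFrame, valid_mask: pd.Series):
    """Return (clean, quarantined) partitions of a batch."""
    return df[valid_mask].copy(), df[~valid_mask].copy()

df = pd.DataFrame({"user_id": ["a", None, "c"], "age": [30, 25, -1]})
valid = df["user_id"].notna() & df["age"].between(0, 120)
clean, quarantined = split_quarantine(df, valid)
# clean: 1 row; quarantined: 2 rows (null user_id, out-of-range age)
```

In the Airflow DAG above, the quarantine task would persist `quarantined` to a dedicated storage prefix for review while `clean` proceeds to training.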

Logging Validation Results to Experiment Tracking

Linking validation state to the model training run enables reproducibility audits — if a model degrades, you can trace whether the training data passed or failed validation at training time:

python
import mlflow
from soda.scan import Scan

with mlflow.start_run():
    # Run Soda validation and log outcome
    scan = Scan()
    scan.set_data_source_name("snowflake_prod")
    scan.add_sodacl_yaml_file("./checks/features.yml")
    scan.execute()

    mlflow.log_param("data_validation_tool", "soda-core")
    mlflow.log_metric("soda_checks_passed", scan.get_checks_count() - scan.get_checks_failing_count())
    mlflow.log_metric("soda_checks_failed", scan.get_checks_failing_count())
    mlflow.set_tag("data_validation_status", "passed" if not scan.has_failures() else "failed")

    if scan.has_failures():
        raise RuntimeError("Data quality gate failed — training blocked.")

    # Proceed to training only if validation passed
    train_model(...)

Common Mistakes

Writing validation rules only for known problems. Validation suites written reactively — after a data issue caused a model failure — miss future unknown failure modes. Write assertions for the full schema and all statistical properties at pipeline design time, not after incidents.

Using only blocking gates without warnings. A binary pass/fail gate that blocks on the first anomaly causes unnecessary pipeline failures during periods of natural data variation. Two-tier alerting (warn at 1% null rate, fail at 5%) prevents alert fatigue while still catching severe anomalies.
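
Stripped of any particular tool, the two-tier pattern is just two thresholds; the sketch below mirrors the 1%/5% figures above:

```python
def tiered_gate(null_rate: float, warn_at: float = 0.01,
                fail_at: float = 0.05) -> str:
    """Return 'fail', 'warn', or 'pass' for a two-tier null-rate gate."""
    if null_rate > fail_at:
        return "fail"
    if null_rate > warn_at:
        return "warn"
    return "pass"

print(tiered_gate(0.002))  # pass
print(tiered_gate(0.03))   # warn
print(tiered_gate(0.12))   # fail
```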

Skipping freshness checks. A pipeline that processes stale data without detecting it will train a model on yesterday's data while thinking it used today's. freshness(updated_at) < 24h is a one-line SodaCL check; omitting it is a common oversight with significant consequences.

Validating only training data, not serving data. Training data validation catches issues at model build time. Serving data validation catches issues at prediction time — when feature values fed to the model diverge from what training data looked like. Both MUST be validated; serving validation happens in the online serving path and must be fast (< 5ms budget for most cases).
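
DataFrame-based validators are generally too slow for a single-digit-millisecond serving budget, so online validation is often a hand-rolled check on plain dicts. A sketch under that assumption (the feature names mirror the Pandera schema earlier):

```python
def validate_serving_row(row: dict) -> list[str]:
    """Cheap per-request checks; returns a list of violations (empty = valid)."""
    errors = []
    if not isinstance(row.get("user_id"), str) or not row["user_id"]:
        errors.append("user_id: missing or not a non-empty string")
    age = row.get("age")
    if not isinstance(age, int) or not (0 <= age <= 120):
        errors.append("age: missing or out of range [0, 120]")
    return errors

print(validate_serving_row({"user_id": "u1", "age": 34}))  # []
print(validate_serving_row({"user_id": "", "age": 150}))   # two violations
```

Violations in the serving path are typically logged and counted rather than raised, so that a monitoring alert fires when the violation rate spikes.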

Using Pandera's default lazy=False in production pipelines. With lazy=False, Pandera raises on the first failure, hiding all other failures in the same batch. Always use lazy=True in pipeline contexts to collect the full failure picture before stopping execution.
