[BEE-30085] ML Data Validation and Pipeline Quality Gates
Data validation enforces explicit assertions about data — schema, statistical properties, freshness, completeness — before that data enters a training pipeline, feature store, or model. A pipeline quality gate is a decision point that blocks or reroutes execution when validation fails. Without these gates, bad data reaches models silently, producing degraded predictions that are expensive to diagnose weeks later.
Context
A Dimensional Research survey found 96% of organizations encounter data quality problems when training AI models, and VentureBeat's analysis reported that 87% of data science projects never reach production, with inadequate data quality as the leading cause. Google's MLOps reference architecture for continuous training places data validation as a mandatory step before model training, checking for schema skews (unexpected or missing features) and data value skews (statistical changes that signal model staleness), with explicit guidance to stop the pipeline when problems are detected (https://docs.cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning).
The core problem is that data issues are silent. A model that trains without errors on data containing 40% null values in a critical feature does not crash — it produces subtly wrong predictions. Detection happens downstream when business metrics degrade, at which point root cause analysis is expensive.
The tooling landscape has converged around three complementary layers: Pandera for schema-level validation applied directly to Python DataFrames in training code; Great Expectations for expectation suites and human-readable validation reports across files, databases, and DataFrames; and Soda for YAML-defined data quality checks run against SQL data sources at the pipeline boundary. These tools operate at different points in the data lifecycle and are often used in combination.
The Six Dimensions of Data Quality
Six dimensions define what "good data" means. Every validation rule maps to at least one of them:
| Dimension | Definition | Typical check |
|---|---|---|
| Completeness | Required fields and records are present | missing_percent(feature) < 1% |
| Validity | Values conform to defined formats, types, domain rules | invalid_percent(email) < 0.5% |
| Uniqueness | Each entity recorded only once | duplicate_count(user_id) = 0 |
| Consistency | Values agree across related fields and systems | Referential integrity, cross-dataset match rate |
| Accuracy | Data correctly represents real-world entities | Cross-referenced against authoritative source |
| Freshness | Data is sufficiently recent | freshness(updated_at) < 24h |
Completeness and validity failures are the most common and most detectable automatically. Accuracy typically requires domain-specific cross-reference checks or manual sampling.
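These dimensions are ordinary computations over a dataset before any framework enters the picture. A minimal pandas sketch of completeness and freshness checks, with illustrative column names and thresholds (not taken from any of the tools below):
import pandas as pd

def completeness_percent(df: pd.DataFrame, column: str) -> float:
    """Completeness: percentage of non-null values in a column."""
    return 100.0 * df[column].notna().mean()

def freshness_hours(df: pd.DataFrame, column: str) -> float:
    """Freshness: hours since the most recent timestamp in a column."""
    return (pd.Timestamp.now(tz="UTC") - df[column].max()).total_seconds() / 3600.0

# Illustrative gate: fail on >1% missing values or data older than 24 hours
df = pd.DataFrame({
    "age": [34, 29, 57, 41],
    "updated_at": [pd.Timestamp.now(tz="UTC")] * 4,
})
assert completeness_percent(df, "age") >= 99.0, "completeness gate failed"
assert freshness_hours(df, "updated_at") < 24.0, "freshness gate failed"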
Pandera: In-Process DataFrame Validation
Pandera validates DataFrames at the function boundary — where data enters or exits a transformation step. It supports pandas, Polars, PySpark, Dask, and Modin, and follows a schema-declaration style analogous to Pydantic for data models.
Created by Niels Bantilan in 2018 and now an open-source project under Union.ai, Pandera uses two equivalent APIs: an object-based DataFrameSchema and a class-based DataFrameModel.
import pandera.pandas as pa
from pandera.typing import Series
# Class-based schema — declarative, self-documenting
class TrainingFeaturesSchema(pa.DataFrameModel):
user_id: Series[str] = pa.Field(nullable=False, unique=True)
age: Series[int] = pa.Field(ge=0, le=120)
purchase_count_30d: Series[int] = pa.Field(ge=0)
label: Series[int] = pa.Field(isin=[0, 1])
@pa.check("purchase_count_30d")
@classmethod
def purchase_count_reasonable(cls, s: Series[int]) -> Series[bool]:
# Custom vectorized check: no single user can have >10k purchases in 30 days
return s <= 10_000
# Validate — raises SchemaError immediately on first failure (default)
validated_df = TrainingFeaturesSchema.validate(df)
# Lazy validation — collect ALL failures before raising
try:
TrainingFeaturesSchema.validate(df, lazy=True)
except pa.errors.SchemaErrors as e:
# e.failure_cases contains a DataFrame of all failing rows and checks
print(e.failure_cases)
    raise
The @pa.check_input and @pa.check_output decorators apply validation at the function boundary without modifying the function body — useful for validating feature engineering functions without changing their signatures:
import pandas as pd
import pandera.pandas as pa
input_schema = pa.DataFrameSchema({
"event_timestamp": pa.Column("datetime64[ns]", nullable=False),
"user_id": pa.Column(str, nullable=False),
"raw_score": pa.Column(float, pa.Check.between(0.0, 1.0)),
})
output_schema = pa.DataFrameSchema({
"user_id": pa.Column(str, nullable=False),
"normalized_score": pa.Column(float, pa.Check.between(0.0, 1.0)),
})
@pa.check_input(input_schema)
@pa.check_output(output_schema)
def compute_normalized_score(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
df["normalized_score"] = (df["raw_score"] - df["raw_score"].min()) / (
df["raw_score"].max() - df["raw_score"].min()
)
return df[["user_id", "normalized_score"]]Great Expectations: Expectation Suites and Checkpoints
Great Expectations (GX) operates at a higher level than Pandera — it manages named Expectation Suites (collections of rules), runs them as Checkpoints in production, and generates Data Docs (human-readable HTML reports) that serve as living documentation of data quality. GX v1.0 GA was released August 22, 2024; the v1.x API is a Python Fluent API that replaced YAML-based configuration entirely.
An Expectation is a verifiable assertion: "column user_id contains no nulls." An Expectation Suite is a named collection applied as a unit. A Checkpoint executes one or more suites and triggers Actions based on results — Slack notifications, Data Docs updates, blocking exceptions.
import great_expectations as gx
import pandas as pd
# GX v1.x Fluent API
context = gx.get_context()
# Connect to a pandas source
data_source = context.data_sources.add_pandas("training_data")
data_asset = data_source.add_dataframe_asset("daily_features")
batch_definition = data_asset.add_batch_definition_whole_dataframe("batch")
# Define expectation suite
suite = context.suites.add(
gx.ExpectationSuite(name="training_features_suite")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="user_id")
)
suite.add_expectation(
gx.expectations.ExpectTableRowCountToBeBetween(
min_value=10_000, max_value=10_000_000
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="age",
min_value=0,
max_value=120,
mostly=0.99, # allow up to 1% outside range (soft constraint)
)
)
# Validate
df = pd.read_parquet("s3://ml-data/features/2025-04-15.parquet")
batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
validation_result = batch.validate(suite)
if not validation_result["success"]:
failing = [
r for r in validation_result["results"] if not r["success"]
]
raise RuntimeError(
f"Validation failed: {len(failing)} expectations not met. "
f"First failure: {failing[0]['expectation_config']['type']}"
    )
The mostly parameter (0.0 to 1.0) converts an absolute assertion into a partial-pass threshold. mostly=0.99 means 99% of rows must satisfy the check — useful for noisy real-world data where perfect compliance is unrealistic but extreme violations must still be caught.
Checkpoints with Actions enable production-grade validation pipelines where the validation result triggers notifications and documentation updates automatically:
validation_definition = context.validation_definitions.add(
gx.ValidationDefinition(
name="daily_feature_validation",
data=batch_definition,
suite=suite,
)
)
checkpoint = context.checkpoints.add(
gx.Checkpoint(
name="daily_feature_checkpoint",
validation_definitions=[validation_definition],
actions=[
gx.checkpoint.SlackNotificationAction(
name="slack_alert",
slack_webhook="${SLACK_WEBHOOK_URL}",
notify_on="failure", # "all" | "success" | "failure"
),
gx.checkpoint.UpdateDataDocsAction(name="update_data_docs"),
],
result_format={"result_format": "COMPLETE"},
)
)
checkpoint_result = checkpoint.run()
Soda: SQL-Layer Quality Gates
Soda Core defines data quality checks in YAML (the Soda Checks Language, SodaCL) and runs them directly against SQL data sources. Founded in Brussels in 2018, it supports over 25 built-in metrics covering all six quality dimensions. Soda is particularly useful for validating data in data warehouses (Snowflake, BigQuery, Redshift) and dbt-adjacent pipelines where validation happens after SQL transformations.
# checks/feature_checks.yml
checks for training_features:
- row_count > 10000
- missing_count(user_id) = 0
- duplicate_count(user_id) = 0
- missing_percent(age) < 1 %
- freshness(updated_at) < 24h
- invalid_percent(email) < 0.5 %:
valid format: email
- schema:
fail:
when required column missing:
- user_id
- event_timestamp
- label
# Two-tier alerting: warn before fail
checks for raw_events:
- duplicate_count(event_id):
warn: when > 5
      fail: when > 100
soda scan \
-d snowflake_prod \
-c soda/configuration.yml \
checks/feature_checks.yml
# Exit codes: 0=all pass, 1=warnings only, 2=failures
The Python API enables programmatic integration:
import logging
from soda.scan import Scan

def run_soda_gate(data_source: str, checks_file: str) -> None:
    """Run Soda checks and raise on failure. Logs warnings without blocking."""
    scan = Scan()
    scan.set_data_source_name(data_source)
    scan.add_configuration_yaml_file("./soda/configuration.yml")
    scan.add_sodacl_yaml_file(checks_file)
    scan.execute()
    print(scan.get_logs_text())
    if scan.has_check_fails():
        raise RuntimeError(
            f"Soda data quality gate failed for '{data_source}'. "
            "Blocking pipeline execution."
        )
    if scan.has_checks_warn_or_fail():
        # No failures past this point, so this branch means warnings only
        logging.warning("Soda checks: warnings detected, pipeline continues.")
Pipeline Integration
Severity Levels
Quality gates SHOULD implement at least three severity levels (a sketch of the gating logic follows the list):
- CRITICAL — block pipeline, page on-call. Examples: primary key is null, row count is zero, schema column missing.
- ERROR — block pipeline, send alert. Examples: null rate exceeds 5% on a required feature, data freshness > 48h.
- WARNING — continue pipeline, log and alert. Examples: null rate increased from 0.1% to 0.5% within acceptable bounds.
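A minimal sketch of how these tiers might be wired into a gate function; Severity, CheckResult, and apply_quality_gate are illustrative names, not part of Pandera, GX, or Soda:
import logging
from dataclasses import dataclass
from enum import Enum

class Severity(Enum):
    CRITICAL = "critical"  # block pipeline, page on-call
    ERROR = "error"        # block pipeline, send alert
    WARNING = "warning"    # continue pipeline, log and alert

@dataclass
class CheckResult:
    name: str
    passed: bool
    severity: Severity

def apply_quality_gate(results: list[CheckResult]) -> None:
    """Raise on failed CRITICAL/ERROR checks; log failed WARNING checks and continue."""
    failed = [r for r in results if not r.passed]
    for r in (r for r in failed if r.severity is Severity.WARNING):
        logging.warning("Quality check '%s' warned; pipeline continues.", r.name)
    blocking = [r for r in failed if r.severity is not Severity.WARNING]
    if blocking:
        # A real gate would also page on-call here for CRITICAL results
        raise RuntimeError(f"Quality gate blocked by: {[r.name for r in blocking]}")
Assigning each validation rule a severity at design time keeps the blocking set small and the warning set informative.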
Airflow Integration
GX and Soda both integrate natively with Airflow. The GXValidateDataFrameOperator blocks downstream tasks when validation fails. ShortCircuitOperator provides a lightweight gate for simple row count or freshness checks:
import great_expectations as gx
from airflow.operators.python import ShortCircuitOperator, BranchPythonOperator
from great_expectations_provider.operators.great_expectations import (
    GXValidateDataFrameOperator,
)
# query_db, data_quality_score, and load_feature_df are assumed pipeline helpers
def check_minimum_row_count() -> bool:
"""Returns False to stop DAG; True to proceed."""
count = query_db("SELECT COUNT(*) FROM feature_store_daily")
return count > 10_000 # returns False → all downstream tasks skipped
def route_on_quality() -> str:
"""BranchPythonOperator: route to training or quarantine."""
if data_quality_score() >= 0.95:
return "train_model"
return "quarantine_data"
with dag:
# Hard gate: stops DAG if row count too low
row_count_gate = ShortCircuitOperator(
task_id="row_count_gate",
python_callable=check_minimum_row_count,
)
# GX validation: blocks downstream on expectation failure
gx_validation = GXValidateDataFrameOperator(
task_id="validate_features",
configure_dataframe=lambda: load_feature_df(),
configure_expectations=lambda df: gx.ExpectationSuite(
expectations=[
gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id"),
gx.expectations.ExpectTableRowCountToBeBetween(
min_value=1_000, max_value=50_000_000
),
]
),
)
# Soft gate: route bad data to quarantine instead of failing DAG
quality_branch = BranchPythonOperator(
task_id="quality_branch",
python_callable=route_on_quality,
)
    row_count_gate >> gx_validation >> quality_branch
The ShortCircuitOperator should be reserved for non-negotiable conditions only; overuse causes chain-reaction skips across large DAGs. Use BranchPythonOperator when bad data should be quarantined (routed to a separate storage path for human review, as sketched below) rather than failing an entire pipeline run.
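A quarantine target can be as simple as the failing batch copied to a timestamped prefix with a sidecar manifest. A sketch of a hypothetical quarantine_batch helper, assuming fsspec-compatible storage (for example s3fs for s3:// paths); the paths and layout are illustrative:
import pandas as pd
from datetime import datetime, timezone

def quarantine_batch(df: pd.DataFrame, reason: str,
                     base_path: str = "s3://ml-data/quarantine") -> str:
    """Write a failing batch plus a manifest to a timestamped quarantine path."""
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    path = f"{base_path}/{ts}"
    df.to_parquet(f"{path}/batch.parquet")
    # Sidecar manifest makes the quarantined batch self-describing for reviewers
    manifest = {"quarantined_at": ts, "reason": reason, "row_count": len(df)}
    pd.DataFrame([manifest]).to_json(f"{path}/manifest.json", orient="records")
    return path
The quarantine_data task in the DAG above would call a helper like this and log the returned path for reviewers.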
Logging Validation Results to Experiment Tracking
Linking validation state to the model training run enables reproducibility audits — if a model degrades, you can trace whether the training data passed or failed validation at training time:
import mlflow
from soda.scan import Scan

with mlflow.start_run():
    # Run Soda validation and log outcome
    scan = Scan()
    scan.set_data_source_name("snowflake_prod")
    scan.add_configuration_yaml_file("./soda/configuration.yml")
    scan.add_sodacl_yaml_file("./checks/features.yml")
    scan.execute()
    # Derive pass/fail counts from the scan results dictionary
    checks = scan.get_scan_results()["checks"]
    failed_count = sum(1 for c in checks if c["outcome"] == "fail")
    mlflow.log_param("data_validation_tool", "soda-core")
    mlflow.log_metric("soda_checks_passed", len(checks) - failed_count)
    mlflow.log_metric("soda_checks_failed", failed_count)
    mlflow.set_tag("data_validation_status", "failed" if scan.has_check_fails() else "passed")
    if scan.has_check_fails():
        raise RuntimeError("Data quality gate failed — training blocked.")
    # Proceed to training only if validation passed
    train_model(...)
Common Mistakes
Writing validation rules only for known problems. Validation suites written reactively — after a data issue caused a model failure — miss future unknown failure modes. Write assertions for the full schema and all statistical properties at pipeline design time, not after incidents.
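One way to get ahead of unknown failure modes is to bootstrap a schema from a known-good sample at design time and then tighten it by hand. A sketch using Pandera's schema inference, assuming infer_schema is exposed under the pandera.pandas namespace used earlier; the sample data is illustrative:
import pandas as pd
import pandera.pandas as pa

# Known-good sample drawn from the feature pipeline (illustrative data)
sample = pd.DataFrame({
    "user_id": ["u1", "u2", "u3"],
    "age": [34, 29, 57],
    "label": [0, 1, 0],
})
# Infer column types and basic checks from the sample, then refine by hand
schema = pa.infer_schema(sample)
print(schema.to_script())  # emits Python schema code to commit to the repo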
Using only blocking gates without warnings. A binary pass/fail gate that blocks on the first anomaly causes unnecessary pipeline failures during periods of natural data variation. Two-tier alerting (warn at 1% null rate, fail at 5%) prevents alert fatigue while still catching severe anomalies.
Skipping freshness checks. A pipeline that processes stale data without detecting it will train a model on yesterday's data while thinking it used today's. freshness(updated_at) < 24h is a one-line SodaCL check; omitting it is a common oversight with significant consequences.
Validating only training data, not serving data. Training data validation catches issues at model build time. Serving data validation catches issues at prediction time — when feature values fed to the model diverge from what training data looked like. Both MUST be validated; serving validation happens in the online serving path and must be fast (< 5ms budget for most cases).
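A full framework pass per request rarely fits that budget; a handful of precompiled null and range checks usually does. A minimal sketch with illustrative feature names and bounds that would be derived offline from the training data profile:
from typing import Any

# Bounds derived offline from the training data profile (illustrative values)
FEATURE_BOUNDS: dict[str, tuple[float, float]] = {
    "age": (0, 120),
    "purchase_count_30d": (0, 10_000),
}

def validate_serving_features(features: dict[str, Any]) -> list[str]:
    """Fast online check: required features present and within training-time bounds."""
    violations = []
    for name, (lo, hi) in FEATURE_BOUNDS.items():
        value = features.get(name)
        if value is None:
            violations.append(f"{name}: missing")
        elif not lo <= value <= hi:
            violations.append(f"{name}: {value} outside [{lo}, {hi}]")
    return violations

# Reject or flag the request instead of scoring on bad input
violations = validate_serving_features({"age": 212, "purchase_count_30d": 3})
if violations:
    raise ValueError(f"Serving validation failed: {violations}")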
Using Pandera's default lazy=False in production pipelines. With lazy=False, Pandera raises on the first failure, hiding all other failures in the same batch. Always use lazy=True in pipeline contexts to collect the full failure picture before stopping execution.
Related BEEs
- BEE-30081 AI Feature Stores for ML Inference — feature infrastructure where validation gates are applied before materialization
- BEE-30083 ML Monitoring and Drift Detection — reactive monitoring of distribution changes in production (distinct from proactive validation)
- BEE-30084 ML Experiment Tracking and Model Registry — linking validation results to training runs via MLflow tags
- BEE-6007 Database Migrations — schema evolution that must be paired with validation suite updates
- BEE-30027 AI Workflow Orchestration — orchestration frameworks where quality gates are embedded
References
- Great Expectations, GX Core Overview. https://docs.greatexpectations.io/docs/core/introduction/gx_overview
- Great Expectations, v1.0 GA announcement, August 2024. https://greatexpectations.io/blog/the-next-step-for-gx-oss-1-0/
- Pandera documentation. https://pandera.readthedocs.io/
- Pandera, Lazy Validation. https://pandera.readthedocs.io/en/stable/lazy_validation.html
- Soda, SodaCL Overview. https://docs.soda.io/soda-v3/soda-cl-overview
- Soda, SodaCL Metrics and Checks. https://docs.soda.io/soda-v3/sodacl-reference/metrics-and-checks
- Astronomer, "Data quality and Airflow." https://www.astronomer.io/docs/learn/data-quality
- Astronomer, "Orchestrate GX with Airflow." https://www.astronomer.io/docs/learn/airflow-great-expectations
- Google Cloud, "MLOps: Continuous delivery and automation pipelines in machine learning." https://docs.cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
- dbt Labs, "Add data tests to your DAG." https://docs.getdbt.com/docs/build/data-tests
- Collibra, "The 6 Dimensions of Data Quality." https://www.collibra.com/blog/the-6-dimensions-of-data-quality
- arXiv:2207.14529, "Effects of Data Quality Problems on ML Model Performance." https://arxiv.org/abs/2207.14529
- TensorFlow Data Validation Guide. https://www.tensorflow.org/tfx/guide/tfdv