Data Quality Testing: Strategies and Implementation

A comprehensive guide to implementing data quality testing across the data pipeline, from schema validation and freshness checks to statistical anomaly detection and business rule enforcement.

technical · 8 min read · By Klivvr Engineering

Data quality issues are insidious. Unlike application bugs that produce visible errors, data quality problems often propagate silently until they surface in a quarterly report, a customer complaint, or a regulatory audit. By then, the damage is done: decisions were made on bad data, reports were filed incorrectly, and trust in the data platform is eroded. Klivvr's Data Platform treats data quality testing as a first-class concern, not an afterthought, implementing multiple layers of validation that catch issues at the earliest possible point in the pipeline. This article covers the testing strategies we use, from basic schema validation to statistical anomaly detection, with practical implementations in dbt and Python.

The Testing Pyramid for Data

Software engineering has the test pyramid: unit tests at the base, integration tests in the middle, and end-to-end tests at the top. Data engineering has an analogous structure, but the layers correspond to different types of data validation:

# The data quality testing pyramid
#
# Layer 3: Business Rule Tests (fewest, most expensive)
#   - Cross-table consistency checks
#   - Business logic validation
#   - Reconciliation with source systems
#
# Layer 2: Statistical Tests (moderate count)
#   - Volume anomaly detection
#   - Distribution drift monitoring
#   - Freshness and latency checks
#
# Layer 1: Schema Tests (most numerous, cheapest)
#   - Not null constraints
#   - Uniqueness constraints
#   - Type validation
#   - Accepted values
#   - Referential integrity
 
# Implementation priority: start at the base and work up
TESTING_LAYERS = {
    "schema": {
        "cost": "low",
        "coverage": "broad",
        "implementation_time": "hours",
        "catches": "structural data issues",
    },
    "statistical": {
        "cost": "medium",
        "coverage": "moderate",
        "implementation_time": "days",
        "catches": "data drift and anomalies",
    },
    "business_rules": {
        "cost": "high",
        "coverage": "narrow but deep",
        "implementation_time": "weeks",
        "catches": "semantic correctness issues",
    },
}

Most teams start at the top with expensive business rule tests because those catch the issues they have already been burned by. This is reactive and unsustainable. Starting at the base with schema tests provides broad coverage cheaply and catches the most common issues: null values in required fields, duplicate records, and invalid foreign keys.
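The base-first rollout can be derived directly from the TESTING_LAYERS mapping above; a small sketch (the numeric cost ranking is the only assumption added here):

```python
# Order layers cheapest-first to get the recommended rollout sequence
TESTING_LAYERS = {
    "schema": {"cost": "low"},
    "statistical": {"cost": "medium"},
    "business_rules": {"cost": "high"},
}

COST_RANK = {"low": 0, "medium": 1, "high": 2}

rollout = sorted(TESTING_LAYERS, key=lambda layer: COST_RANK[TESTING_LAYERS[layer]["cost"]])
print(rollout)  # → ['schema', 'statistical', 'business_rules']
```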

Schema Tests in dbt

dbt's built-in schema testing is the fastest way to establish a quality baseline. Every model should have at minimum uniqueness and not-null tests on its primary key:

# models/staging/stripe/_stripe__models.yml
version: 2
 
models:
  - name: stg_stripe__payments
    description: "Staged Stripe payment data with standardized types"
    columns:
      - name: payment_id
        description: "Unique payment identifier from Stripe"
        tests:
          - unique
          - not_null
 
      - name: customer_id
        description: "Foreign key to the customer"
        tests:
          - not_null
          - relationships:
              to: ref('stg_app__customers')
              field: customer_id
              config:
                severity: warn  # Don't fail; Stripe may have customers not yet in app DB
 
      - name: amount
        description: "Payment amount in the original currency"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
              config:
                where: "status = 'succeeded'"
 
      - name: status
        description: "Payment processing status"
        tests:
          - not_null
          - accepted_values:
              values: ['succeeded', 'failed', 'pending', 'refunded', 'disputed', 'canceled']
 
      - name: currency
        description: "ISO 4217 currency code"
        tests:
          - not_null
          - dbt_utils.not_constant  # Ensure we're getting multiple currencies
 
      - name: created_at
        description: "Payment creation timestamp"
        tests:
          - not_null
          - dbt_utils.recency:
              datepart: hour
              field: created_at
              interval: 24  # Alert if no payments in the last 24 hours

The dbt_utils package extends dbt's built-in tests with practical utilities like accepted_range, recency, and not_constant. Install it via packages.yml and run dbt deps to make these tests available.
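A minimal packages.yml for pulling in dbt_utils might look like the following (the version range is illustrative; check dbt Hub for the current release):

```yaml
# packages.yml (project root) -- version range is illustrative
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]
```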

The severity: warn configuration is important for referential integrity tests. In a data warehouse, referential integrity violations are common because data arrives from different sources at different times. A payment record may reference a customer who has not yet been replicated. Warnings flag these for investigation without blocking the pipeline.

Statistical Anomaly Detection

Schema tests catch structural issues, but they cannot detect a sudden 80% drop in payment volume or a shift in the distribution of transaction amounts. Statistical tests fill this gap:

-- tests/statistical/assert_payment_volume_within_bounds.sql
-- This test detects unusual daily payment volumes by comparing
-- today's count against the rolling 30-day average and standard deviation
 
with daily_counts as (
    select
        created_at::date as payment_date,
        count(*) as payment_count
    from {{ ref('stg_stripe__payments') }}
    where created_at >= current_date - interval '31 days'
    group by 1
),
 
stats as (
    select
        avg(payment_count) as avg_count,
        stddev(payment_count) as stddev_count
    from daily_counts
    where payment_date < current_date  -- Exclude today from the baseline
),
 
today as (
    select payment_count
    from daily_counts
    where payment_date = current_date
)
 
-- Alert if today's count is more than 3 standard deviations from the mean
select
    today.payment_count,
    stats.avg_count,
    stats.stddev_count,
    (today.payment_count - stats.avg_count) / nullif(stats.stddev_count, 0) as z_score
from today
cross join stats
where abs((today.payment_count - stats.avg_count) / nullif(stats.stddev_count, 0)) > 3

For more sophisticated anomaly detection, we use Python-based tests that can apply time series analysis:

# tests/python/volume_anomaly_detector.py
import pandas as pd
from scipy import stats as scipy_stats
 
def detect_volume_anomalies(
    daily_counts: pd.DataFrame,
    column: str = "count",
    lookback_days: int = 30,
    sensitivity: float = 3.0,
) -> pd.DataFrame:
    """
    Detect anomalous volumes using a rolling z-score approach
    with day-of-week seasonality adjustment.
    """
    df = daily_counts.copy()
    df["day_of_week"] = df["date"].dt.dayofweek
 
    anomalies = []
 
    for _, row in df.iterrows():
        # Get historical data for the same day of week
        historical = df[
            (df["date"] < row["date"])
            & (df["date"] >= row["date"] - pd.Timedelta(days=lookback_days))
            & (df["day_of_week"] == row["day_of_week"])
        ]
 
        if len(historical) < 3:
            continue  # Not enough data for statistical comparison
 
        mean = historical[column].mean()
        std = historical[column].std()
 
        if std == 0:
            continue  # Constant values, no anomaly possible
 
        z_score = (row[column] - mean) / std
 
        if abs(z_score) > sensitivity:
            anomalies.append({
                "date": row["date"],
                "actual": row[column],
                "expected_mean": round(mean, 1),
                "z_score": round(z_score, 2),
                "severity": "critical" if abs(z_score) > 5 else "warning",
            })
 
    return pd.DataFrame(anomalies)
 
 
def detect_distribution_drift(
    current: pd.Series,
    baseline: pd.Series,
    threshold: float = 0.05,
) -> dict:
    """
    Detect distribution drift using the Kolmogorov-Smirnov test.
    Returns drift information if the distributions are significantly different.
    """
    ks_statistic, p_value = scipy_stats.ks_2samp(current, baseline)
 
    return {
        "drifted": p_value < threshold,
        "ks_statistic": round(ks_statistic, 4),
        "p_value": round(p_value, 6),
        "current_mean": round(current.mean(), 2),
        "baseline_mean": round(baseline.mean(), 2),
        "current_std": round(current.std(), 2),
        "baseline_std": round(baseline.std(), 2),
    }

Business Rule Tests

Business rule tests validate the semantic correctness of data. They encode domain knowledge that cannot be captured by schema or statistical tests:

-- tests/business_rules/assert_revenue_reconciles_with_stripe.sql
-- Revenue in our data warehouse should match Stripe's reported revenue
-- within a 1% tolerance (differences arise from timing and currency conversion)
 
with our_revenue as (
    select
        sum(amount_usd) as total_revenue_usd
    from {{ ref('fct_daily_revenue') }}
    where revenue_date = current_date - interval '1 day'
),
 
stripe_revenue as (
    select
        sum(amount) / 100.0 as total_revenue_usd  -- Stripe stores amounts in cents
    from {{ source('stripe', 'balance_transactions') }}
    where type = 'charge'
      and created::date = current_date - interval '1 day'
)
 
select
    o.total_revenue_usd as our_total,
    s.total_revenue_usd as stripe_total,
    abs(o.total_revenue_usd - s.total_revenue_usd) as difference,
    abs(o.total_revenue_usd - s.total_revenue_usd) / nullif(s.total_revenue_usd, 0) as pct_difference
from our_revenue o
cross join stripe_revenue s
where abs(o.total_revenue_usd - s.total_revenue_usd) / nullif(s.total_revenue_usd, 0) > 0.01

-- tests/business_rules/assert_no_negative_balances.sql
-- Customer balances should never go negative in a prepaid system
 
select
    customer_id,
    balance_date,
    running_balance
from {{ ref('fct_customer_balances') }}
where running_balance < 0

-- tests/business_rules/assert_kyc_before_transaction.sql
-- Every customer with a completed transaction must have passed KYC verification
-- This is a regulatory requirement, not just a data quality preference
 
select
    t.customer_id,
    t.transaction_id,
    t.created_at as transaction_date,
    k.verified_at as kyc_date
from {{ ref('fct_transactions') }} t
left join {{ ref('dim_kyc_status') }} k on t.customer_id = k.customer_id
where t.status = 'completed'
  and (k.verified_at is null or k.verified_at > t.created_at)

Business rule tests are the most valuable tests in the suite because they catch the errors that have the highest impact. A duplicate payment ID is a data issue; a transaction without KYC verification is a compliance violation.

Implementing a Test Severity Framework

Not all test failures are equal. A null value in an optional description field is less urgent than a duplicate primary key. Klivvr's Data Platform uses a severity framework that routes test failures to appropriate response channels:

# dbt_project.yml: global test severity configuration
tests:
  klivvr_data_platform:
    +severity: error  # Default: block the pipeline
 
    # Override for specific test types
    staging:
      +severity: warn  # Staging issues are warnings by default
    marts:
      +severity: error  # Mart issues block the pipeline

# Pipeline orchestration: route test failures by severity
 
def handle_test_results(results: list[dict]) -> None:
    """Route test failures to appropriate channels based on severity."""
    errors = [r for r in results if r["severity"] == "error" and r["status"] == "fail"]
    warnings = [r for r in results if r["severity"] == "warn" and r["status"] == "fail"]
 
    if errors:
        # Critical: block pipeline and alert on-call
        send_pagerduty_alert(
            title="Data Quality Errors Detected",
            body=format_test_failures(errors),
            severity="critical",
        )
        raise DataQualityError(f"{len(errors)} critical test(s) failed")
 
    if warnings:
        # Non-critical: notify in Slack but continue pipeline
        send_slack_notification(
            channel="#data-quality-warnings",
            message=format_test_failures(warnings),
        )

Conclusion

Data quality testing is not a feature to add after the pipeline is built. It is a fundamental part of the pipeline's architecture, as important as the transformations themselves. The testing pyramid provides a structured approach: start with cheap, broad schema tests that catch structural issues; add statistical tests that detect anomalies and drift; and implement business rule tests that validate semantic correctness.

The most important lesson from building Klivvr's data quality testing suite is that coverage matters more than sophistication. A simple not-null test on every primary key column catches more real issues than a sophisticated machine learning anomaly detector on a single metric. Start simple, cover broadly, and add sophistication where the data tells you it is needed. Every test you write is an investment in trust, and trust is the foundation of every data-driven decision.
