DAG Pipeline Design: Principles for Data Engineering

Core principles for designing directed acyclic graph (DAG) pipelines that are maintainable, observable, and resilient, with practical examples from production data engineering systems.

technical · 8 min read · By Klivvr Engineering

Every data pipeline is a directed acyclic graph, whether the team designed it that way or not. Data flows from sources through transformations to destinations, and the order of those transformations is constrained by dependencies. Making the DAG explicit, designing it intentionally, and maintaining it carefully is the difference between a pipeline that scales gracefully and one that becomes an untouchable nightmare. This article covers the principles we follow when designing DAG pipelines at Klivvr, from task granularity and dependency management to failure handling and performance optimization.

Task Granularity: Finding the Right Unit of Work

The most common mistake in DAG design is choosing the wrong level of granularity. Tasks that are too coarse (a single task that extracts, transforms, and loads all data) are impossible to retry partially and hard to debug. Tasks that are too fine (one task per SQL statement) create excessive orchestration overhead and obscure the pipeline's logical structure.

The right granularity aligns with the logical boundaries of the data flow. Each task should represent a meaningful, independently retriable unit of work:

# Bad: One monolithic task that does everything
def process_all_data():
    extract_from_stripe()
    extract_from_app_db()
    transform_payments()
    transform_customers()
    build_revenue_report()
    send_to_dashboard()
 
# Good: Granular tasks with clear responsibilities
# Each task is independently retriable and observable
 
from airflow.decorators import dag, task
from datetime import datetime
 
@dag(
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["finance", "daily"],
)
def daily_revenue_pipeline():
 
    @task()
    def extract_stripe_payments(execution_date: str) -> dict:
        """Extract payment events from Stripe API for the given date."""
        payments = stripe_client.list_payments(
            created_gte=execution_date,
            created_lt=next_day(execution_date),
        )
        write_to_staging("stripe_payments", payments)
        return {"count": len(payments), "date": execution_date}
 
    @task()
    def extract_app_customers() -> dict:
        """Extract customer data from the application database."""
        customers = app_db.query("SELECT * FROM customers WHERE updated_at >= CURRENT_DATE - 1")
        write_to_staging("app_customers", customers)
        return {"count": len(customers)}
 
    @task()
    def run_dbt_staging():
        """Run dbt staging models to clean and standardize raw data."""
        run_dbt(["run", "--select", "tag:staging"])
 
    @task()
    def run_dbt_marts():
        """Run dbt mart models to produce business-ready datasets."""
        run_dbt(["run", "--select", "tag:marts"])
 
    @task()
    def run_dbt_tests() -> dict:
        """Run dbt tests to validate data quality."""
        run_dbt(["test", "--select", "tag:marts"])
        return {"status": "passed"}  # run_dbt raises if any test fails
 
    @task()
    def notify_stakeholders(test_results: dict):
        """Send pipeline completion notification."""
        send_slack_message(
            channel="#data-alerts",
            message=f"Daily revenue pipeline complete. Tests: {test_results}"
        )
 
    # Define the DAG structure
    payments = extract_stripe_payments("{{ ds }}")
    customers = extract_app_customers()

    staging = run_dbt_staging()
    marts = run_dbt_marts()
    tests = run_dbt_tests()

    [payments, customers] >> staging
    staging >> marts >> tests
    notify_stakeholders(tests)
 
daily_revenue_pipeline()

Each task in this DAG can be retried independently. If the Stripe API is temporarily unavailable, only the extraction task fails; the customer extraction runs in parallel and succeeds. When the Stripe issue is resolved, the operator retries just that one task without re-running anything else.
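The retry-only-what-failed behavior falls out of tracking per-task state. As a minimal plain-Python sketch (no Airflow; task and dependency names are illustrative), here is a scheduler loop that walks the DAG in topological order, skips tasks that already succeeded on a previous run, and marks tasks whose upstreams failed:

```python
from graphlib import TopologicalSorter


def run_dag(tasks: dict, deps: dict, state: dict) -> dict:
    """Attempt tasks in dependency order, mutating `state` in place.

    tasks: task name -> callable
    deps:  task name -> set of upstream task names
    state: task name -> 'success' | 'failed' | 'upstream_failed'
    """
    for name in TopologicalSorter(deps).static_order():
        if state.get(name) == "success":
            continue  # already done on a previous run; never re-execute
        if any(state.get(up) != "success" for up in deps.get(name, ())):
            state[name] = "upstream_failed"
            continue
        try:
            tasks[name]()
            state[name] = "success"
        except Exception:
            state[name] = "failed"
    return state
```

Running this once with a failing Stripe extraction, then again after the fix, re-executes only the failed task and its descendants; the parallel customer extraction runs exactly once.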

Dependency Patterns: Narrow Waists and Wide Fans

Well-designed DAGs have a characteristic shape: wide fan-out at the extraction layer, a narrow waist at the transformation layer, and wide fan-out again at the consumption layer. This shape minimizes bottlenecks and maximizes parallelism:

# Visualizing the DAG shape:
#
# Wide fan-out (extraction):
#   stripe_payments ─┐
#   app_customers   ─┤
#   crm_contacts    ─┤
#   forex_rates     ─┘
#                    │
# Narrow waist (transformation):
#                    ├── dbt_staging
#                    │
#                    ├── dbt_intermediate
#                    │
#                    ├── dbt_marts
#                    │
# Wide fan-out (consumption):
#                    ├── export_to_dashboard
#                    ├── export_to_ml_features
#                    ├── export_to_finance_report
#                    └── export_to_data_warehouse_views
 
def build_pipeline_dag():
    """
    Build a DAG that follows the wide-narrow-wide pattern.
    """
    # Extraction layer: runs in parallel
    extractors = [
        extract_stripe_payments,
        extract_app_customers,
        extract_crm_contacts,
        extract_forex_rates,
    ]
 
    # Transformation layer: sequential, with clear phases
    # Transformation layer: sequential, with clear phases
    transformations = [
        ("staging", ["run", "--select", "tag:staging"]),
        ("intermediate", ["run", "--select", "tag:intermediate"]),
        ("marts", ["run", "--select", "tag:marts"]),
        ("tests", ["test", "--select", "tag:marts"]),
    ]
 
    # Consumption layer: runs in parallel after transformations
    consumers = [
        export_to_dashboard,
        export_to_ml_features,
        export_to_finance_report,
        refresh_data_warehouse_views,
    ]
 
    return extractors, transformations, consumers

The narrow waist is important because it creates a clear boundary where data quality can be validated. All data passes through the transformation layer, and tests at this layer act as a gate that prevents bad data from reaching consumers.
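The gate at the waist can be as simple as a task that inspects test results and raises on any failure, so nothing downstream ever runs against bad data. A minimal plain-Python sketch (the `test_results` shape is an assumption; real dbt output would need parsing):

```python
class DataQualityGateError(Exception):
    """Raised when mart-level tests fail, blocking all consumers."""


def quality_gate(test_results: list[dict]) -> dict:
    """Pass only if every test succeeded; otherwise fail loudly.

    test_results: e.g. [{"name": "unique_payment_id", "status": "pass"}, ...]
    """
    failures = [t["name"] for t in test_results if t["status"] != "pass"]
    if failures:
        raise DataQualityGateError(
            f"{len(failures)} test(s) failed at the narrow waist: {failures}"
        )
    return {"tests": len(test_results), "status": "pass"}
```

Because every consumer depends on this single task, one failing test stops all four exports at once, rather than letting some consumers read stale or broken data.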

Failure Handling and Retry Strategies

Failures in data pipelines are not exceptional events; they are routine. External APIs go down, database connections time out, and warehouse queries exceed resource limits. The DAG must handle these failures gracefully:

from datetime import timedelta

from airflow.utils.trigger_rule import TriggerRule
 
# Retry configuration for different failure modes
RETRY_CONFIGS = {
    "api_extraction": {
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "retry_exponential_backoff": True,
        "max_retry_delay": timedelta(minutes=30),
    },
    "dbt_transformation": {
        "retries": 2,
        "retry_delay": timedelta(minutes=2),
        "retry_exponential_backoff": False,
    },
    "export": {
        "retries": 3,
        "retry_delay": timedelta(minutes=1),
        "retry_exponential_backoff": True,
        "max_retry_delay": timedelta(minutes=15),
    },
}
 
@task(
    retries=RETRY_CONFIGS["api_extraction"]["retries"],
    retry_delay=RETRY_CONFIGS["api_extraction"]["retry_delay"],
    retry_exponential_backoff=RETRY_CONFIGS["api_extraction"]["retry_exponential_backoff"],
    max_retry_delay=RETRY_CONFIGS["api_extraction"]["max_retry_delay"],
)
def extract_with_resilience(source_name: str, execution_date: str) -> dict:
    """
    Extract data with circuit-breaker pattern.
    After repeated failures, mark the source as degraded
    rather than blocking the entire pipeline.
    """
    try:
        data = extract_source(source_name, execution_date)
        return {"status": "success", "count": len(data)}
    except TemporaryError:
        raise  # Let Airflow retry
    except PermanentError as e:
        # Log the failure but don't block downstream tasks
        log_extraction_failure(source_name, str(e))
        return {"status": "degraded", "count": 0, "error": str(e)}
 
 
@task(trigger_rule=TriggerRule.ALL_DONE)
def assess_data_completeness(extraction_results: list[dict]) -> dict:
    """
    After all extractions complete (success or failure),
    assess whether enough data is available to proceed.
    """
    succeeded = [r for r in extraction_results if r["status"] == "success"]
    degraded = [r for r in extraction_results if r["status"] == "degraded"]
 
    completeness = len(succeeded) / len(extraction_results)
 
    if completeness < 0.5:
        raise ValueError(
            f"Data completeness too low: {completeness:.0%}. "
            f"Degraded sources: {[r.get('error') for r in degraded]}"
        )
 
    return {
        "completeness": completeness,
        "degraded_sources": len(degraded),
        "proceed": True,
    }

The trigger_rule=TriggerRule.ALL_DONE is critical. By default, Airflow tasks only run when all upstream tasks succeed. But in data pipelines, partial data is often better than no data. The assessment task runs regardless of upstream success or failure and makes an informed decision about whether to proceed.
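To make the retry timings concrete: with exponential backoff, the wait roughly doubles between attempts until max_retry_delay caps it. A simplified schedule calculation (Airflow's actual implementation also adds random jitter, omitted here):

```python
from datetime import timedelta


def backoff_schedule(retry_delay: timedelta, retries: int,
                     max_retry_delay: timedelta) -> list[timedelta]:
    """Delay before each retry attempt, doubling up to the configured cap."""
    schedule = []
    delay = retry_delay
    for _ in range(retries):
        schedule.append(min(delay, max_retry_delay))
        delay *= 2
    return schedule


# With the api_extraction config above (5 min base, 3 retries, 30 min cap),
# the waits are 5, 10, and 20 minutes between attempts.
```

The cap matters for short-lived incidents: without it, a flaky API that recovers in an hour could push the last retry far past the point where the data is still useful for the daily run.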

Idempotency: The Non-Negotiable Property

Every task in a data pipeline must be idempotent: running it multiple times with the same input must produce the same output. Without idempotency, retrying a failed task can produce duplicates, and backfilling historical data becomes dangerous:

-- Non-idempotent: appends duplicates on retry
INSERT INTO revenue_daily (date, amount)
SELECT date, sum(amount) FROM payments
WHERE date = '2025-10-26'
GROUP BY date;
 
-- Idempotent: uses MERGE to upsert
MERGE INTO revenue_daily AS target
USING (
    SELECT
        date,
        sum(amount) as amount
    FROM payments
    WHERE date = '2025-10-26'
    GROUP BY date
) AS source
ON target.date = source.date
WHEN MATCHED THEN UPDATE SET amount = source.amount
WHEN NOT MATCHED THEN INSERT (date, amount) VALUES (source.date, source.amount);
-- In dbt, incremental models achieve idempotency through the unique_key
-- config, one of the most important configurations to get right:
--
--   {{ config(materialized='incremental', unique_key='transaction_id') }}
--
-- This generates a MERGE statement under the hood, ensuring that
-- re-processing the same data does not create duplicates.

dbt's incremental models with unique_key are idempotent by design. For custom extraction tasks, implement idempotency through upsert patterns or by writing to date-partitioned tables and replacing entire partitions on each run.

Performance Optimization

DAG performance optimization focuses on two dimensions: reducing individual task duration and maximizing parallelism across tasks.

For individual task performance, the most impactful optimization is pushing computation into the warehouse rather than pulling data into the orchestrator:

# Slow: pulling data into Python for transformation
@task()
def transform_in_python():
    df = warehouse.query("SELECT * FROM raw_payments")  # Pulls millions of rows
    df["amount_usd"] = df.apply(convert_currency, axis=1)  # Slow Python loop
    warehouse.write("transformed_payments", df)  # Pushes millions of rows back
 
# Fast: executing transformation in the warehouse
@task()
def transform_in_warehouse():
    warehouse.execute("""
        CREATE OR REPLACE TABLE transformed_payments AS
        SELECT
            payment_id,
            amount * exchange_rate AS amount_usd,
            created_at
        FROM raw_payments
        JOIN exchange_rates USING (currency, date)
    """)

For parallelism, structure the DAG so that independent tasks run concurrently. Use resource pools to limit concurrency when warehouse resources are constrained:

# Airflow pool configuration to manage warehouse concurrency
# Create a pool named 'warehouse_slots' with 8 slots
# Tasks that run heavy warehouse queries are assigned to this pool
 
@task(pool="warehouse_slots", pool_slots=2)
def run_heavy_transformation():
    """This task uses 2 of the 8 available warehouse slots."""
    run_dbt(["run", "--select", "fct_revenue_detailed"])
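Conceptually, a pool is just a counter of available slots: a task declaring pool_slots=2 consumes two of them for its whole duration, and the scheduler queues any task whose request would exceed the total. A toy model of the slot accounting (not Airflow's implementation):

```python
class SlotPool:
    """Minimal slot accounting: acquire returns False when the pool is full."""

    def __init__(self, slots: int):
        self.slots = slots
        self.in_use = 0

    def acquire(self, n: int = 1) -> bool:
        if self.in_use + n > self.slots:
            return False  # the scheduler would queue the task instead
        self.in_use += n
        return True

    def release(self, n: int = 1) -> None:
        self.in_use = max(0, self.in_use - n)


pool = SlotPool(8)
assert pool.acquire(2)      # heavy transformation takes 2 slots
assert pool.acquire(6)      # six light tasks fill the remaining slots
assert not pool.acquire(1)  # a ninth slot request has to wait
pool.release(2)             # heavy transformation finishes
assert pool.acquire(1)      # queued task can now run
```

This is why pool sizing should mirror the warehouse's real concurrency limit: the pool is the only thing standing between a wide fan-out and a pile of queued or failing warehouse queries.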

Conclusion

DAG pipeline design is the backbone of reliable data engineering. The principles outlined here (appropriate task granularity, intentional dependency patterns, robust failure handling, mandatory idempotency, and disciplined performance optimization) are not optional best practices. They are requirements for any pipeline that needs to run reliably in production.

The most important takeaway is that DAG design is a continuous process, not a one-time decision. Pipelines evolve as data sources are added, business requirements change, and data volumes grow. A well-designed DAG accommodates this evolution gracefully because its principles are sound. Start with clear task boundaries and idempotent operations. Add monitoring and alerting early. Optimize for parallelism where the DAG shape allows it. And always design for the retry case, because in production data engineering, retries are not the exception; they are the rule.
