Design Data Pipeline

Designs a comprehensive data pipeline for the specified requirements, covering architecture, ingestion, processing, warehousing, data quality, orchestration, and monitoring.

How to use

Replace {{args}} with your data pipeline requirements, such as data sources, volume, velocity, latency, and desired outputs. The prompt then produces a detailed design that includes code examples.

Prompt

Design Data Pipeline

Please design a comprehensive data pipeline for the following requirements:

{{args}}

Data Pipeline Framework

1. Pipeline Architecture

Batch Processing Pipeline

Data Sources → Ingestion → Storage → Processing → Analytics → Output
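The batch flow can be read as a chain of stages, each consuming the previous stage's output. The sketch below assumes in-memory records and hypothetical stage functions named after the diagram. It only illustrates the shape of the pipeline. Real stages would read from sources and write to storage.

```python
from typing import Callable, Iterable

# Each stage is a plain function; names mirror the diagram and are illustrative only.
Record = dict
Stage = Callable[[list[Record]], list[Record]]


def ingest() -> list[Record]:
    # Hypothetical source: in practice this would read files, APIs or database extracts
    return [
        {"order_id": 1, "country": "DE", "amount": 20.0},
        {"order_id": 2, "country": "FR", "amount": 35.5},
        {"order_id": 3, "country": "DE", "amount": 10.0},
    ]


def store(records: list[Record]) -> list[Record]:
    # Stand-in for landing raw data in object storage or a staging table
    return list(records)


def process(records: list[Record]) -> list[Record]:
    # Cleansing step: keep only rows with a positive amount
    return [r for r in records if r.get("amount", 0) > 0]


def analyze(records: list[Record]) -> list[Record]:
    # Aggregate revenue per country
    totals: dict[str, float] = {}
    for r in records:
        totals[r["country"]] = totals.get(r["country"], 0.0) + r["amount"]
    return [{"country": c, "revenue": v} for c, v in sorted(totals.items())]


def run_batch(stages: Iterable[Stage]) -> list[Record]:
    data = ingest()
    for stage in stages:
        data = stage(data)
    return data  # Output: hand the result to a report, dashboard or export


result = run_batch([store, process, analyze])
print(result)  # [{'country': 'DE', 'revenue': 30.0}, {'country': 'FR', 'revenue': 35.5}]
```

Keeping every stage a pure function makes each one testable in isolation and easy to rerun for a given batch.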

Streaming Pipeline

Sources → Kafka/Pulsar → Stream Processing → Real-time Analytics → Alerts
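In the streaming path, records are handled one at a time as they arrive, and alerts fire on rolling conditions. The sketch below assumes an in-memory list standing in for a Kafka/Pulsar consumer and a hypothetical rule: "more than `threshold` events of one type within the trailing `window_s` seconds". It shows a per-event sliding-window count feeding an alert.

```python
from collections import deque
from typing import Iterator, NamedTuple


class Event(NamedTuple):
    ts: float        # event time in seconds
    event_type: str


def stream_alerts(events: Iterator[Event], window_s: float, threshold: int) -> Iterator[str]:
    """Yield an alert whenever an event type exceeds `threshold` occurrences
    within the trailing `window_s` seconds (a sliding window)."""
    recent: dict[str, deque] = {}
    for ev in events:
        q = recent.setdefault(ev.event_type, deque())
        q.append(ev.ts)
        # Evict timestamps that have fallen out of the trailing window
        while q and q[0] <= ev.ts - window_s:
            q.popleft()
        if len(q) > threshold:
            yield f"ALERT {ev.event_type}: {len(q)} events in {window_s}s at t={ev.ts}"


# In-memory events stand in for a Kafka/Pulsar consumer (hypothetical data)
events = [Event(0, "error"), Event(1, "error"), Event(2, "login"),
          Event(3, "error"), Event(20, "error")]
alerts = list(stream_alerts(iter(events), window_s=10, threshold=2))
print(alerts)  # ['ALERT error: 3 events in 10s at t=3']
```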

2. Data Ingestion

Apache Kafka Configuration

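apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: user-events
  labels:
    strimzi.io/cluster: my-cluster  # placeholder: name of the Strimzi Kafka cluster that owns the topic
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 604800000      # 7 days
    cleanup.policy: delete
    min.insync.replicas: 2       # with acks=all, writes still succeed with one replica down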
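The topic settings determine ordering, retention, and fault tolerance. Keying records (for example by user ID) sends every record with the same key to the same partition, which preserves per-key order. The sketch below uses SHA-256 as a simplified stand-in for Kafka's default murmur2 partitioner, so the partition numbers will not match a real cluster; only the "same key, same partition" property carries over. It also works out what the retention and ISR values above imply.

```python
import hashlib

PARTITIONS = 12            # spec.partitions
REPLICAS = 3               # spec.replicas
MIN_INSYNC = 2             # min.insync.replicas
RETENTION_MS = 604_800_000  # retention.ms


def partition_for(key: str, partitions: int = PARTITIONS) -> int:
    # Simplified stand-in: Kafka's default partitioner uses murmur2, not SHA-256,
    # but both map a given key deterministically to exactly one partition.
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % partitions


retention_days = RETENTION_MS / (1000 * 60 * 60 * 24)
# With acks=all, a write needs MIN_INSYNC in-sync replicas, so this many can be down
tolerated_broker_failures = REPLICAS - MIN_INSYNC

print(retention_days, tolerated_broker_failures)  # 7.0 1
print(partition_for("user-42") == partition_for("user-42"))  # True
```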

3. Data Processing

Apache Spark Job

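from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, from_json, window
from pyspark.sql.types import StringType, StructField, StructType

# Needs the spark-sql-kafka-0-10 package and the PostgreSQL JDBC driver on the classpath
spark = SparkSession.builder \
    .appName("ETL Job") \
    .getOrCreate()

# Assumed message payload: JSON with an "event_type" field
event_schema = StructType([StructField("event_type", StringType())])

raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user-events") \
    .load()

# Kafka delivers the payload as bytes in `value`; `timestamp` is the record timestamp
events = raw.select(
    col("timestamp"),
    from_json(col("value").cast("string"), event_schema).getField("event_type").alias("event_type"),
)

# The watermark lets Spark close hourly windows and emit them in append mode
df_aggregated = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "1 hour"), "event_type") \
    .agg(count("*").alias("event_count")) \
    .select(col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            "event_type", "event_count")

# JDBC is not a streaming sink, so each micro-batch is written with the batch JDBC writer
def write_to_postgres(batch_df, batch_id):
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/analytics") \
        .option("dbtable", "hourly_events") \
        .option("driver", "org.postgresql.Driver") \
        .mode("append") \
        .save()

query = df_aggregated.writeStream \
    .outputMode("append") \
    .foreachBatch(write_to_postgres) \
    .option("checkpointLocation", "/tmp/checkpoints/hourly_events") \
    .start()

query.awaitTermination()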
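Windowed streaming aggregates in append mode are only final once a watermark has passed the end of the window. The pure-Python sketch below mimics that behavior for 1-hour tumbling windows with a 10-minute allowed lateness. It is a simplification: Spark advances the watermark per micro-batch, not per record, and the event times here are hypothetical seconds.

```python
from collections import defaultdict

WINDOW_S = 3600   # 1-hour tumbling windows
LATENESS_S = 600  # watermark delay: accept events up to 10 minutes late


def window_start(ts: int) -> int:
    # Tumbling windows are aligned to multiples of the window length
    return ts - ts % WINDOW_S


class HourlyCounter:
    """Counts events per (window_start, event_type). A window is emitted once the
    watermark (max event time seen minus LATENESS_S) passes its end; later events
    for an emitted window are dropped."""

    def __init__(self) -> None:
        self.counts: dict[tuple[int, str], int] = defaultdict(int)
        self.max_ts = 0  # assumes non-negative event times

    def add(self, ts: int, event_type: str) -> list[tuple[int, str, int]]:
        start = window_start(ts)
        if start + WINDOW_S <= self.max_ts - LATENESS_S:
            return []  # window already closed by the watermark
        self.counts[(start, event_type)] += 1
        self.max_ts = max(self.max_ts, ts)
        watermark = self.max_ts - LATENESS_S
        closed = sorted(k for k in self.counts if k[0] + WINDOW_S <= watermark)
        return [(w, et, self.counts.pop((w, et))) for w, et in closed]


counter = HourlyCounter()
emitted = []
for ts, event_type in [(10, "click"), (20, "view"), (1800, "click"),
                       (3700, "click"), (4300, "click"), (100, "click")]:
    emitted += counter.add(ts, event_type)
print(emitted)  # [(0, 'click', 2), (0, 'view', 1)]
```

The late event at t=100 is dropped because its window was already emitted. The window starting at 3600 stays open until the watermark passes 7200.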

4. Data Warehouse Design

Star Schema

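-- Dimension Tables
CREATE TABLE dim_users (
    user_id BIGINT PRIMARY KEY,
    username VARCHAR(100),
    email VARCHAR(255),
    country VARCHAR(100),
    signup_date DATE,
    segment VARCHAR(50)
);

CREATE TABLE dim_date (
    date_key INT PRIMARY KEY,        -- e.g. 20240115 for 2024-01-15
    full_date DATE NOT NULL,
    year SMALLINT,
    quarter SMALLINT,
    month SMALLINT,
    day_of_week SMALLINT
);

-- Fact Table
CREATE TABLE fact_orders (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT REFERENCES dim_users(user_id),
    date_key INT REFERENCES dim_date(date_key),
    quantity INT,
    total_amount DECIMAL(12,2),
    created_at TIMESTAMP DEFAULT NOW()  -- load time (PostgreSQL syntax)
);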
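A star schema pays off in queries that join the fact table to its dimensions and group by dimension attributes. The sketch below runs such a query against an in-memory SQLite database. It makes several assumptions: a trimmed set of columns, a minimal date dimension for `date_key`, types adapted to SQLite (which has no `DECIMAL` or `NOW()`), and made-up sample rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_users (user_id INTEGER PRIMARY KEY, country TEXT, segment TEXT);
CREATE TABLE dim_date (date_key INTEGER PRIMARY KEY, full_date TEXT, year INTEGER, month INTEGER);
CREATE TABLE fact_orders (
    order_id INTEGER PRIMARY KEY,
    user_id INTEGER REFERENCES dim_users(user_id),
    date_key INTEGER REFERENCES dim_date(date_key),
    quantity INTEGER,
    total_amount REAL  -- SQLite stand-in for DECIMAL(12,2)
);
""")
conn.executemany("INSERT INTO dim_users VALUES (?, ?, ?)",
                 [(1, "DE", "retail"), (2, "FR", "retail"), (3, "DE", "b2b")])
conn.executemany("INSERT INTO dim_date VALUES (?, ?, ?, ?)",
                 [(20240115, "2024-01-15", 2024, 1), (20240203, "2024-02-03", 2024, 2)])
conn.executemany("INSERT INTO fact_orders VALUES (?, ?, ?, ?, ?)",
                 [(100, 1, 20240115, 2, 40.0), (101, 2, 20240115, 1, 15.0),
                  (102, 3, 20240203, 5, 250.0), (103, 1, 20240203, 1, 20.0)])

# Typical star join: measures from the fact table, grouped by dimension attributes
rows = conn.execute("""
SELECT d.year, d.month, u.country, SUM(f.total_amount) AS revenue
FROM fact_orders f
JOIN dim_users u ON u.user_id = f.user_id
JOIN dim_date d ON d.date_key = f.date_key
GROUP BY d.year, d.month, u.country
ORDER BY d.year, d.month, u.country
""").fetchall()
print(rows)  # [(2024, 1, 'DE', 40.0), (2024, 1, 'FR', 15.0), (2024, 2, 'DE', 270.0)]
```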

5. Data Quality

Great Expectations

import great_expectations as gx

# Written against the GX 1.x API, where each expectation is a class
suite = gx.ExpectationSuite(name="orders_suite")
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="quantity", min_value=1, max_value=100
    )
)
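The two rules above (non-null `order_id`, `quantity` between 1 and 100) can be illustrated without any library. The dependency-free sketch below reports which rows violate each rule. It is not Great Expectations' implementation. Rule names and sample rows are hypothetical, and null quantities are skipped here so that nullness stays a separate rule.

```python
from typing import Any


def validate_orders(rows: list[dict[str, Any]]) -> dict[str, list[int]]:
    """Return, per rule, the indexes of rows that violate it."""
    failures: dict[str, list[int]] = {"order_id_not_null": [], "quantity_between_1_100": []}
    for i, row in enumerate(rows):
        if row.get("order_id") is None:
            failures["order_id_not_null"].append(i)
        qty = row.get("quantity")
        # This sketch skips null quantities; a not-null rule would cover them
        if qty is not None and not (1 <= qty <= 100):
            failures["quantity_between_1_100"].append(i)
    return failures


batch = [
    {"order_id": 1, "quantity": 3},
    {"order_id": None, "quantity": 5},
    {"order_id": 3, "quantity": 0},
    {"order_id": 4, "quantity": 250},
]
report = validate_orders(batch)
passed = all(not bad for bad in report.values())
print(report, passed)
# {'order_id_not_null': [1], 'quantity_between_1_100': [2, 3]} False
```

Reporting row indexes rather than a single boolean makes it possible to quarantine bad rows instead of failing the whole batch.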

6. Orchestration

Apache Airflow DAG

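from datetime import datetime, timedelta
import subprocess

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_spark_job():
    # Hypothetical application path; a non-zero exit from spark-submit fails the task
    subprocess.run(["spark-submit", "/opt/jobs/etl_job.py"], check=True)


with DAG(
    dag_id="daily_etl",
    schedule="0 2 * * *",  # every day at 02:00 (Airflow 2.4+; older versions use schedule_interval)
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not backfill runs between start_date and now
    default_args={
        "owner": "data-eng",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
) as dag:
    run_etl = PythonOperator(
        task_id="run_etl_spark_job",
        python_callable=run_spark_job,
    )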
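A real pipeline DAG usually has several tasks, and the scheduler starts each one only after its upstream tasks succeed. The sketch below uses Python's standard `graphlib` to show which tasks could run together in each wave. The graph is hypothetical, built around the `run_etl_spark_job` task. In Airflow the same edges would be declared with `>>`, e.g. `run_etl >> [validate_orders, compute_metrics]`.

```python
from graphlib import TopologicalSorter

# Hypothetical task graph: each task maps to the set of upstream tasks it waits for
dependencies = {
    "extract": set(),
    "run_etl_spark_job": {"extract"},
    "validate_orders": {"run_etl_spark_job"},
    "compute_metrics": {"run_etl_spark_job"},
    "load_warehouse": {"validate_orders"},
    "notify": {"load_warehouse", "compute_metrics"},
}


def execution_waves(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves; every task in a wave has all upstream tasks done."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # also raises CycleError if the graph is not a DAG
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # pretend the whole wave succeeded
    return waves


waves = execution_waves(dependencies)
print(waves)
# [['extract'], ['run_etl_spark_job'], ['compute_metrics', 'validate_orders'],
#  ['load_warehouse'], ['notify']]
```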

7. Output Format

Provide:

  1. Pipeline Architecture: High-level design
  2. Ingestion Layer: Kafka, Connectors
  3. Processing Logic: Spark, Flink
  4. Data Warehouse: Schema design
  5. Data Quality: Validation rules
  6. Orchestration: Airflow DAGs
  7. Monitoring: Pipeline health
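Pipeline health is commonly tracked through consumer lag (how far the stream processor trails the topic) and data freshness (time since the last successful load). The sketch below computes both from plain dictionaries. The offsets, timestamps, and thresholds are hypothetical. A 26-hour staleness limit allows two hours of slack for a job scheduled daily at 02:00. The sketch also assumes each partition's log starts at offset 0.

```python
from datetime import datetime, timedelta, timezone


def consumer_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> dict[int, int]:
    # Lag per partition = latest offset in the log minus the group's committed offset
    return {p: end_offsets[p] - committed.get(p, 0) for p in end_offsets}


def health_report(end_offsets: dict[int, int], committed: dict[int, int],
                  last_load: datetime, now: datetime, max_lag: int = 1000,
                  max_staleness: timedelta = timedelta(hours=26)) -> dict:
    lag = consumer_lag(end_offsets, committed)
    return {
        "total_lag": sum(lag.values()),
        "lagging_partitions": sorted(p for p, lag_p in lag.items() if lag_p > max_lag),
        "stale": now - last_load > max_staleness,
    }


report = health_report(
    end_offsets={0: 5000, 1: 4200, 2: 3900},
    committed={0: 4990, 1: 2500, 2: 3900},
    last_load=datetime(2024, 1, 15, 2, 30, tzinfo=timezone.utc),
    now=datetime(2024, 1, 16, 6, 0, tzinfo=timezone.utc),
)
print(report)  # {'total_lag': 1710, 'lagging_partitions': [1], 'stale': True}
```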

Generate a complete data pipeline design.