Design Data Pipeline
Designs a comprehensive data pipeline, detailing architecture, ingestion, processing, warehousing, quality, and orchestration for specified requirements.
How to use
Replace {{args}} with your specific data pipeline requirements, such as data sources, volume, velocity, latency, and desired outputs. The prompt will generate a detailed design including code examples.
Prompt
Design Data Pipeline
Please design a comprehensive data pipeline for the following requirements:
{{args}}
Data Pipeline Framework
1. Pipeline Architecture
Batch Processing Pipeline
Data Sources → Ingestion → Storage → Processing → Analytics → Output
Streaming Pipeline
Sources → Kafka/Pulsar → Stream Processing → Real-time Analytics → Alerts
2. Data Ingestion
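Producers usually key `user-events` records by user, so all of one user's events land on a single partition and keep their order. The sketch below shows that key-to-partition idea and the write-availability arithmetic for the topic configured below (12 partitions, 3 replicas, `min.insync.replicas: 2`). It uses `zlib.crc32` as a stand-in hash. Kafka's default partitioner hashes keys with murmur2, so real partition numbers will differ. The key format `user-42` is hypothetical.

```python
import zlib

NUM_PARTITIONS = 12  # matches spec.partitions in the topic config


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a record key to a partition; identical keys always map to the same partition.

    Stand-in hash (CRC32), not Kafka's murmur2-based default partitioner.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def tolerated_replica_failures(replicas: int, min_insync: int) -> int:
    """Replicas that can be unavailable while acks=all producers can still write."""
    return replicas - min_insync


# Hypothetical key: every event for this user goes to the same partition
print(partition_for("user-42"))
print(tolerated_replica_failures(3, 2))  # one replica may be down
```

Keying by user trades even load for per-user ordering; a few very active users can make some partitions hotter than others.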
Apache Kafka Configuration
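```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: user-events
  labels:
    strimzi.io/cluster: my-cluster  # assumed Kafka cluster name; the Topic Operator only manages labeled topics
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 604800000   # 7 days
    cleanup.policy: delete
    min.insync.replicas: 2    # with acks=all, writes continue with one replica down
```
3. Data Processing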
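The Spark job below groups events with `window("timestamp", "1 hour")`, which assigns each event to a fixed, non-overlapping hourly bucket (a tumbling window). The same grouping over an in-memory list, as a plain-Python sketch with hypothetical sample events, looks like this. It ignores watermarks and late data, which Spark handles for the streaming case.

```python
from collections import Counter
from datetime import datetime


def hourly_counts(events):
    """Count events per (window_start, event_type) using 1-hour tumbling windows."""
    counts = Counter()
    for ts, event_type in events:
        # Truncating to the hour gives the start of the window that contains ts
        window_start = ts.replace(minute=0, second=0, microsecond=0)
        counts[(window_start, event_type)] += 1
    return counts


# Hypothetical sample events: (event time, event type)
events = [
    (datetime(2024, 1, 1, 9, 15), "click"),
    (datetime(2024, 1, 1, 9, 45), "click"),
    (datetime(2024, 1, 1, 10, 5), "view"),
]
# Two clicks fall in the 09:00 window, one view in the 10:00 window
print(hourly_counts(events))
```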
Apache Spark Job
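```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, from_json, window
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName("ETL Job").getOrCreate()

# Kafka delivers the payload as bytes in `value`; assumes a JSON body with an event_type field
event_schema = StructType([StructField("event_type", StringType())])

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user-events") \
    .load() \
    .select(col("timestamp"),  # Kafka record timestamp
            from_json(col("value").cast("string"), event_schema)["event_type"].alias("event_type"))

# The watermark lets Spark finalize each hourly window and emit it in append mode
df_aggregated = df.withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "1 hour"), "event_type") \
    .agg(count("*").alias("event_count"))

# Spark has no streaming JDBC sink, so each micro-batch is written with the batch JDBC writer.
# Requires the PostgreSQL JDBC driver on the classpath; add user/password options as needed.
def write_batch(batch_df, batch_id):
    batch_df.select(col("window.start").alias("window_start"),
                    col("window.end").alias("window_end"),
                    "event_type", "event_count") \
        .write.format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/analytics") \
        .option("dbtable", "hourly_events") \
        .mode("append") \
        .save()

# Example checkpoint path; use durable storage in production
df_aggregated.writeStream \
    .outputMode("append") \
    .foreachBatch(write_batch) \
    .option("checkpointLocation", "/tmp/checkpoints/hourly_events") \
    .start() \
    .awaitTermination()
```
4. Data Warehouse Design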
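In a star schema, the fact table holds measures (`quantity`, `total_amount`) keyed to dimension tables, and analytical queries join facts to dimensions and then aggregate. The in-memory sketch below uses Python's built-in `sqlite3` with trimmed versions of the tables defined below. The `dim_date` table and all sample rows are hypothetical, added only so `date_key` has something to join to.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_users (user_id INTEGER PRIMARY KEY, country TEXT);
-- Hypothetical date dimension keyed by integer date_key
CREATE TABLE dim_date (date_key INTEGER PRIMARY KEY, full_date TEXT, month INTEGER);
CREATE TABLE fact_orders (
    order_id INTEGER PRIMARY KEY,
    user_id INTEGER REFERENCES dim_users(user_id),
    date_key INTEGER REFERENCES dim_date(date_key),
    quantity INTEGER,
    total_amount REAL  -- SQLite has no true DECIMAL type
);
INSERT INTO dim_users VALUES (1, 'DE'), (2, 'US');
INSERT INTO dim_date VALUES (20240115, '2024-01-15', 1), (20240203, '2024-02-03', 2);
INSERT INTO fact_orders VALUES
    (10, 1, 20240115, 2, 40.00),
    (11, 2, 20240115, 1, 15.50),
    (12, 1, 20240203, 3, 60.00);
""")

# Typical star-schema query: join facts to dimensions, then aggregate the measures
rows = conn.execute("""
SELECT u.country, d.month, SUM(f.quantity), SUM(f.total_amount)
FROM fact_orders f
JOIN dim_users u ON u.user_id = f.user_id
JOIN dim_date d ON d.date_key = f.date_key
GROUP BY u.country, d.month
ORDER BY u.country, d.month
""").fetchall()
print(rows)
```

Keeping descriptive attributes in the dimensions keeps the fact table narrow, so new slicing attributes (such as `segment`) can be added without touching the facts.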
Star Schema
```sql
-- Dimension Tables
CREATE TABLE dim_users (
    user_id BIGINT PRIMARY KEY,
    username VARCHAR(100),
    email VARCHAR(255),
    country VARCHAR(100),
    signup_date DATE,
    segment VARCHAR(50)
);

-- Fact Table
CREATE TABLE fact_orders (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT REFERENCES dim_users(user_id),
    date_key INT,  -- key for a date dimension (no dim_date table is defined here)
    quantity INT,
    total_amount DECIMAL(12,2),
    created_at TIMESTAMP DEFAULT NOW()
);
```
5. Data Quality
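The two expectations below are row-level checks. Independently of Great Expectations, a library-free sketch that applies the same two rules to a list of order dicts and reports which rows fail might look like this. The dict-based record shape and the failure messages are assumptions.

```python
def validate_orders(rows):
    """Return (row_index, rule) pairs for rows that break the order rules."""
    failures = []
    for i, row in enumerate(rows):
        # Same rule as expect_column_values_to_not_be_null on order_id
        if row.get("order_id") is None:
            failures.append((i, "order_id is null"))
        qty = row.get("quantity")
        # Same range as expect_column_values_to_be_between on quantity; nulls are not checked here
        if qty is not None and not (1 <= qty <= 100):
            failures.append((i, "quantity out of range [1, 100]"))
    return failures


# Hypothetical sample rows
orders = [
    {"order_id": 1, "quantity": 5},
    {"order_id": None, "quantity": 0},
    {"order_id": 3, "quantity": 101},
]
print(validate_orders(orders))
```

Collecting every failure, rather than stopping at the first, makes it easier to decide whether to quarantine bad rows or fail the whole batch.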
Great Expectations
```python
import great_expectations as gx

# Great Expectations 1.x API; get_context() returns an ephemeral context if no project exists
context = gx.get_context()
suite = context.suites.add(gx.ExpectationSuite(name="orders_suite"))

# Every order must have an ID
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
# Quantities outside 1..100 are treated as invalid
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="quantity", min_value=1, max_value=100
    )
)
```
6. Orchestration
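An orchestrator runs tasks in dependency order and can run independent tasks in parallel; in Airflow, `extract >> transform` declares such an edge. The sketch below uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) to group a hypothetical extract/transform/validate/load DAG into stages whose tasks could run concurrently. The task names are illustrative and do not appear in the DAG below.

```python
from graphlib import TopologicalSorter

# Hypothetical tasks; each maps to the set of tasks it depends on
dependencies = {
    "transform": {"extract_orders", "extract_users"},
    "validate": {"transform"},
    "load": {"validate"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()  # also raises CycleError if the graph is not a DAG
stages = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # tasks whose dependencies are all done
    stages.append(ready)
    sorter.done(*ready)

print(stages)
```

Placing a validation step between transform and load means bad data fails the run before it reaches the warehouse.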
Apache Airflow DAG
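```python
import subprocess
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_spark_job():
    # Hypothetical job path; check=True fails the task if spark-submit exits non-zero
    subprocess.run(["spark-submit", "/opt/jobs/etl_job.py"], check=True)


dag = DAG(
    'daily_etl',
    schedule='0 2 * * *',  # daily at 02:00; Airflow < 2.4 uses schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not backfill every run since start_date
    default_args={'owner': 'data-eng', 'retries': 2, 'retry_delay': timedelta(minutes=5)},
)

run_etl = PythonOperator(
    task_id='run_etl_spark_job',
    python_callable=run_spark_job,
    dag=dag,
)
```
7. Output Format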
Provide:
- Pipeline Architecture: High-level design
- Ingestion Layer: Kafka, Connectors
- Processing Logic: Spark, Flink
- Data Warehouse: Schema design
- Data Quality: Validation rules
- Orchestration: Airflow DAGs
- Monitoring: Pipeline health
Generate a complete data pipeline design.