ETL framework

Plugin-based Python ETL framework with Jinja2 templating and parameter validation

Python · Lambda · Event-driven
🔧 100+ daily workflows

Building a Serverless ETL Framework: Python Plugin Architecture for Data Integration

How we built Diablo, a flexible ETL framework handling 100+ daily workflows with plugin-based step executors and Jinja2 templating

Note: Company-specific details have been anonymized to maintain confidentiality while preserving the technical integrity of this case study.


Introduction

Here's a problem every data team faces: You need to move data between dozens of systems, but every integration is a snowflake.

MySQL → S3, MSSQL → MySQL, S3 → SFTP, Excel → Slack, API → DynamoDB... each with unique connection parameters, transformation logic, and error handling.

The naive solution? Write a separate Lambda function for each integration. Result: 100+ Lambda functions with 90% duplicated code, deployment nightmares, and impossible maintenance.

So I built Diablo: a serverless ETL framework powered by AWS Step Functions that handles 100+ daily data integration workflows using a plugin architecture, Jinja2 templating for dynamic SQL, and a parameter validation system that catches configuration errors before execution.

We'll explore plugin-based step executors, decorator patterns for observability, AWS Parameter Store integration for credentials, and how to build a framework that scales from simple data copies to complex multi-step transformations.


The Problem: ETL Sprawl

Real-World Requirements

A data integration platform must handle:

  1. Diverse Sources/Destinations:

    • Databases: MySQL, MSSQL, PostgreSQL, DynamoDB
    • Storage: S3, SFTP, Network file systems
    • APIs: REST endpoints with various auth methods
    • Messaging: SQS, SNS, Slack
    • Files: CSV, JSON, Excel
  2. Common Patterns:

    • Database to S3 exports
    • S3 to database imports
    • Database to database syncing
    • File format transformations
    • API data extraction
  3. Operational Requirements:

    • Dynamic SQL with templating
    • Connection pooling and retry logic
    • Credential management (no hardcoded secrets)
    • Observability (execution time, errors, data volume)
    • Parameter validation (catch errors before execution)

The Lambda Sprawl Anti-Pattern

Attempt 1: Separate Lambda per integration

lambdas/
  ├── mysql-to-s3-product-export/
  ├── mysql-to-s3-pricing-export/
  ├── mssql-to-mysql-orders-sync/
  ├── s3-to-mysql-inventory-import/
  ├── api-to-s3-weather-fetch/
  └── ... (95 more functions)

Problems:

  • 100+ Lambda functions with duplicated code
  • Deployment complexity (100 separate deployments)
  • No code reuse (connection logic copied 100 times)
  • Hard to add features (update 100 functions)
  • Configuration scattered across 100 Lambda environment variables

Solution: Plugin-Based ETL Framework

Core Architecture

Single Lambda function with plugin-based step executors:

Step Functions Workflow
  ├── Step 1: MySqlToS3 (plugin)
  ├── Step 2: S3ToMysql (plugin)
  └── Step 3: SlackSendMsg (plugin)

All steps execute via single Lambda:
  - Input: { StepType: "MySqlToS3", Parameters: { ... } }
  - Output: { success: true, result: { ... } }

Benefits:

  • Single Lambda deployment (all plugins included)
  • Code reuse via base classes
  • Centralized credential management
  • Parameter validation framework
  • Observability via decorators

Implementation: Real Production Code

1. Step Executor (Plugin Manager)

import sys

from diablo.step_types.mysql_to_s3 import *
from diablo.step_types.s3_to_mysql import *
from diablo.step_types.slack_send_msg import *
# ... import all step types

class StepExecutor:
    def __init__(self, **kwargs):
        pass

    @timer  # Decorator for execution timing
    def execute_step(self, step_name, parameters):
        try:
            # Get formal parameter definition for validation
            # (e.g. "MySqlToS3" -> MySqlToS3Parameters)
            step_type_parameters_def = getattr(sys.modules[__name__], f"{step_name}Parameters")
        except AttributeError:
            step_type_parameters_def = None
            if global_vars.global_config["require_formal_parameters"]:
                raise Exception(f"Formal parameters are not defined for step '{step_name}'")
            log.warning(f"No formal parameters definition for step '{step_name}'. Skipping validation...")

        if step_type_parameters_def is not None:
            # Fail fast, before any connections are opened or data is moved
            self.validate_parameters(step_type_parameters_def, parameters)

        # Dynamically load step type class
        step_type = getattr(sys.modules[__name__], step_name)

        # Execute step
        result = step_type(**parameters).execute({})

        return result

    def validate_parameters(self, step_parameters_dict, parameters):
        # Validate required parameters, types, formats
        for param_name, param_def in step_parameters_dict.items():
            if param_def.get("required", False) and param_name not in parameters:
                raise ValueError(f"Missing required parameter: {param_name}")

            if param_name in parameters:
                actual_type = type(parameters[param_name])
                expected_type = param_def.get("type")
                if expected_type and actual_type != expected_type:
                    raise TypeError(f"Parameter {param_name} must be {expected_type}, got {actual_type}")

Key pattern: Dynamic plugin loading

  • getattr(sys.modules[__name__], step_name) → Load plugin class by name
  • Single Lambda handles all step types
  • No hardcoded plugin list (just import and use)
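
If the dynamic-lookup trick is new to you, here's a minimal, self-contained sketch of the idea; the SlackSendMsg class and run_step helper are illustrative stand-ins, not the framework's real code. Any class imported into the module can be resolved by name at runtime and executed through a common interface:

import sys

class SlackSendMsg:
    """Illustrative stand-in for a real step type plugin."""
    def __init__(self, **kwargs):
        self.parameters = kwargs

    def execute(self, context):
        return {"sent": True, "channel": self.parameters.get("channel")}

def run_step(step_name, parameters):
    # Resolve the plugin class by name from this module's namespace
    step_cls = getattr(sys.modules[__name__], step_name)
    return step_cls(**parameters).execute({})

print(run_step("SlackSendMsg", {"channel": "#data-alerts"}))
# {'sent': True, 'channel': '#data-alerts'}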

2. Base Step Type (Plugin Interface)

from abc import ABC, abstractmethod

class BaseStepType(ABC):
    def __init__(self, **kwargs):
        self.parameters = kwargs

    @abstractmethod
    def execute(self, context):
        """Execute the step. Must be implemented by subclasses."""
        pass

    def get_connection(self, connection_name):
        """Fetch connection config from Parameter Store cache."""
        return global_vars.diablo_connections_cache.get(connection_name)

    def log_info(self, message, **kwargs):
        log.info(message, **kwargs)

    def log_error(self, message, **kwargs):
        log.error(message, **kwargs)

Why base class:

  • Common functionality (logging, connection management)
  • Enforces interface (all plugins have execute method)
  • Simplified plugin development

3. MySQL to S3 Plugin Example

import csv
import io
import json

import boto3
import mysql.connector

MySqlToS3Parameters = {
    "connection_name": {"type": str, "required": True},
    "query": {"type": str, "required": True},
    "s3_bucket": {"type": str, "required": True},
    "s3_key": {"type": str, "required": True},
    "file_format": {"type": str, "required": False, "default": "csv"},
}

class MySqlToS3(BaseStepType):
    def execute(self, context):
        # 1. Get connection config from Parameter Store
        connection_config = self.get_connection(self.parameters["connection_name"])

        # 2. Render query with Jinja2 (dynamic SQL)
        query = self.render_query(self.parameters["query"], context)

        # 3. Connect to MySQL
        connection = mysql.connector.connect(
            host=connection_config["host"],
            user=connection_config["user"],
            password=connection_config["password"],
            database=connection_config["database"],
        )

        try:
            # 4. Execute query
            cursor = connection.cursor(dictionary=True)
            cursor.execute(query)
            results = cursor.fetchall()
            cursor.close()
        finally:
            connection.close()

        self.log_info(f"Fetched {len(results)} rows from MySQL")

        # 5. Convert to desired format
        if self.parameters.get("file_format") == "json":
            file_content = json.dumps(results, default=str)
        elif results:
            # CSV format
            output = io.StringIO()
            writer = csv.DictWriter(output, fieldnames=results[0].keys())
            writer.writeheader()
            writer.writerows(results)
            file_content = output.getvalue()
        else:
            file_content = ""

        # 6. Upload to S3
        s3_client = boto3.client("s3")
        s3_client.put_object(
            Bucket=self.parameters["s3_bucket"],
            Key=self.parameters["s3_key"],
            Body=file_content.encode("utf-8"),
        )

        self.log_info(f"Uploaded {len(results)} rows to s3://{self.parameters['s3_bucket']}/{self.parameters['s3_key']}")

        return {
            "rows_exported": len(results),
            "s3_uri": f"s3://{self.parameters['s3_bucket']}/{self.parameters['s3_key']}",
        }

    def render_query(self, query_template, context):
        """Render Jinja2 template with context variables."""
        from jinja2 import Template
        template = Template(query_template)
        return template.render(**context)

Key features:

  • Parameter validation (enforced by MySqlToS3Parameters)
  • Dynamic SQL with Jinja2 templating
  • Credential fetching from Parameter Store
  • S3 upload with boto3
  • Structured logging with row counts
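
The "catch errors before execution" part is easy to see in isolation. A small sketch, reusing the MySqlToS3Parameters schema above and a trimmed-down copy of the executor's validation loop, shows a broken config failing fast, before any database connection is opened:

def validate_parameters(schema, parameters):
    # Same checks the StepExecutor performs: required keys and expected types
    for name, definition in schema.items():
        if definition.get("required", False) and name not in parameters:
            raise ValueError(f"Missing required parameter: {name}")
        expected_type = definition.get("type")
        if name in parameters and expected_type and not isinstance(parameters[name], expected_type):
            raise TypeError(f"Parameter {name} must be {expected_type.__name__}")

broken_config = {
    "connection_name": "mysql-products-db",
    "query": "SELECT * FROM products",
    # s3_bucket and s3_key are missing
}

try:
    validate_parameters(MySqlToS3Parameters, broken_config)
except ValueError as err:
    print(err)  # Missing required parameter: s3_bucket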

4. Lambda Handler (Entry Point)

import jsonpickle

step_executor = StepExecutor()

def execute_step(event, context):
    # Extract Step Functions execution context
    if "ExecutionId" in event:
        global_vars.workflow_execution_context = event["ExecutionId"]
        execution_id = event["ExecutionId"]["Execution"]["Name"]
        global_vars.workflow_execution_context["execution_id"] = execution_id

        if "StateMachine" in event["ExecutionId"]:
            state_machine_name = event["ExecutionId"]["StateMachine"]["Name"]
            global_vars.workflow_execution_context["statemachine"] = state_machine_name

    log.info("Entered Diablo Handler")

    # Extract step type and parameters from event
    step = event["StepType"]
    parameters = event["Parameters"]

    log.info("Details", step_type=step, parameters=parameters, event_json=event)

    try:
        # Execute step via plugin system
        result = step_executor.execute_step(step, parameters)
        log.info("Successfully executed Diablo.")

        # Return success result
        return jsonpickle.encode(StepResult(True, "The step succeeded.", result=result))

    except Exception as e:
        log.error("Step execution failed", error=str(e))
        # Re-raise so Step Functions marks the state as failed, with the
        # serialized StepResult as the error payload
        raise Exception(jsonpickle.encode(StepResult(False, "The step failed.", exception=e)))

Step Functions integration:

  • Event contains StepType and Parameters
  • Execution context tracked across steps
  • Results serialized with jsonpickle
  • Errors propagated to Step Functions
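
One detail the handler assumes but this post doesn't show is the StepResult class. A minimal shape consistent with how it's used above (success flag, message, optional result or exception) might look like this; the real implementation may carry more fields:

class StepResult:
    """Assumed minimal result envelope returned to Step Functions."""
    def __init__(self, success, message, result=None, exception=None):
        self.success = success
        self.message = message
        self.result = result
        # Store the exception as text so the jsonpickle output stays readable
        self.exception = str(exception) if exception else None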

5. Timer Decorator for Observability

import functools
import time

def timer(func):
    """Decorator to measure execution time."""
    @functools.wraps(func)  # preserve the wrapped function's name for logging
    def wrapper(*args, **kwargs):
        start_time = time.time()

        try:
            result = func(*args, **kwargs)
            elapsed = time.time() - start_time

            log.info(
                "Step execution completed",
                function=func.__name__,
                duration_seconds=round(elapsed, 2),
                success=True,
            )

            return result

        except Exception as e:
            elapsed = time.time() - start_time

            log.error(
                "Step execution failed",
                function=func.__name__,
                duration_seconds=round(elapsed, 2),
                error=str(e),
                success=False,
            )

            raise

    return wrapper

Usage:

@timer
def execute_step(self, step_name, parameters):
    ...  # execution logic

Benefits:

  • Automatic execution time tracking
  • Consistent logging format
  • Error tracking
  • No manual timing code

6. Parameter Store Helper (Credential Management)

import json

import boto3

class ParameterStoreHelper:
    def __init__(self, prefix):
        self.prefix = prefix  # e.g., "/diablo/prod/connections/"
        self.cache = {}
        self.ssm_client = boto3.client("ssm")

    def get(self, connection_name):
        """Fetch connection config from Parameter Store with caching."""
        cache_key = f"{self.prefix}{connection_name}"

        if cache_key in self.cache:
            return self.cache[cache_key]

        # Fetch from Parameter Store
        response = self.ssm_client.get_parameter(
            Name=cache_key,
            WithDecryption=True  # Decrypt SecureString values with KMS
        )

        connection_config = json.loads(response["Parameter"]["Value"])
        self.cache[cache_key] = connection_config

        return connection_config

Why Parameter Store:

  • No hardcoded credentials
  • Centralized credential management
  • Encrypted storage (KMS)
  • IAM-based access control
  • Caching for performance
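
For completeness, here's roughly how a connection config gets seeded into Parameter Store as an encrypted JSON blob. The path and values are illustrative, and in practice this usually lives in infrastructure-as-code rather than an ad-hoc script:

import json

import boto3

ssm = boto3.client("ssm")
ssm.put_parameter(
    Name="/diablo/prod/connections/mysql-products-db",
    Value=json.dumps({
        "host": "prod-db.example.internal",
        "user": "etl_user",
        "password": "rotate-me-regularly",
        "database": "products",
    }),
    Type="SecureString",  # encrypted at rest with KMS
    Overwrite=True,
)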

Step Functions Workflow Example

{
  "Comment": "Daily product export workflow",
  "StartAt": "ExportProductsToS3",
  "States": {
    "ExportProductsToS3": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:diablo-executor",
      "Parameters": {
        "StepType": "MySqlToS3",
        "Parameters": {
          "connection_name": "mysql-products-db",
          "query": "SELECT * FROM products WHERE updated_at > '{{ execution_date }}'",
          "s3_bucket": "data-exports",
          "s3_key": "products/{{ execution_date }}.csv",
          "file_format": "csv"
        }
      },
      "Next": "SendSlackNotification"
    },
    "SendSlackNotification": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:diablo-executor",
      "Parameters": {
        "StepType": "SlackSendMsg",
        "Parameters": {
          "webhook_url": "https://hooks.slack.com/...",
          "message": "Product export completed: {{ execution_date }}"
        }
      },
      "End": true
    }
  }
}

Benefits:

  • Visual workflow representation
  • Error handling (retries, fallbacks)
  • Parallel execution support
  • Execution history and logging
  • No custom orchestration code
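
Workflows like the one above are typically triggered by EventBridge schedules, but they can also be kicked off on demand. A quick sketch with boto3 (the ARN and input are illustrative):

import json

import boto3

sfn = boto3.client("stepfunctions")
response = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:daily-product-export",
    name="manual-2024-01-15",  # execution names must be unique per state machine
    input=json.dumps({"execution_date": "2024-01-15"}),
)
print(response["executionArn"])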

Performance & Production Results

Metrics (18 months in production)

Scale:

  • 100+ Step Functions workflows (daily, weekly, on-demand)
  • 500+ step executions/day
  • 30+ step types (plugins)
  • 50+ database connections managed

Performance:

  • P50 step execution: 2.5 seconds
  • P95 step execution: 12 seconds (large datasets)
  • P99 step execution: 45 seconds
  • Success rate: 98.5% (failures = data quality issues, not framework bugs)

Cost:

  • Lambda: $80/month (500+ daily executions)
  • Step Functions: $15/month
  • Parameter Store: $0 (free tier)
  • Total: $95/month for 100+ workflows

Developer Experience:

  • New step type development: 1-2 hours
  • Workflow creation: 15 minutes
  • Onboarding: 1 day (vs. 1 week for custom Lambdas)

Lessons Learned

1. Plugin Architecture Scales

Before (separate Lambdas):

  • 100 Lambda functions
  • 100 deployments
  • Duplicated code everywhere

After (plugins):

  • 1 Lambda function
  • 1 deployment
  • Shared base classes

Adding new step type:

  1. Create plugin class
  2. Define parameter schema
  3. Import in step_executor.py
  4. Done! (One redeploy of the shared Lambda picks up the new plugin; no new infrastructure or pipelines)

2. Jinja2 Templating for Dynamic SQL

Static SQL:

SELECT * FROM products WHERE updated_at > '2024-01-01'

Dynamic SQL with Jinja2:

SELECT * FROM products
WHERE updated_at > '{{ start_date }}'
  AND category IN ({% for cat in categories %}'{{ cat }}'{% if not loop.last %},{% endif %}{% endfor %})

Benefits:

  • Workflows pass context variables
  • Cleaner than ad-hoc string concatenation (but not automatically SQL-injection safe: template values must come from trusted workflow inputs)
  • Reusable query templates
  • Date math with filters (see the sketch below)
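
"Date math with filters" deserves a quick illustration. A custom filter (the days_ago name is made up for this example) lets templates compute relative dates instead of receiving them pre-computed:

from datetime import date, timedelta

from jinja2 import Environment

env = Environment()
env.filters["days_ago"] = lambda days: (date.today() - timedelta(days=days)).isoformat()

template = env.from_string(
    "SELECT * FROM products WHERE updated_at > '{{ 7 | days_ago }}'"
)
print(template.render())
# e.g. SELECT * FROM products WHERE updated_at > '2024-01-08'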

3. Parameter Store for Secrets

Bad (hardcoded):

connection = mysql.connector.connect(
    host="prod-db.us-east-1.rds.amazonaws.com",
    user="admin",
    password="SuperSecret123!",  # 😱
)

Good (Parameter Store):

connection_config = parameter_store.get("mysql-products-db")
connection = mysql.connector.connect(**connection_config)

Benefits:

  • No secrets in code
  • Centralized rotation
  • IAM-based access control
  • Audit logs

4. Decorators for Cross-Cutting Concerns

@timer decorator:

@timer
def execute_step(self, step_name, parameters):
    ...  # business logic

Automatically logs:

INFO: Step execution completed | function=execute_step | duration_seconds=2.3 | success=true

Other decorators we use:

  • @retry - Automatic retry on transient failures
  • @cache - Cache expensive operations
  • @validate - Parameter validation

5. Step Functions > Custom Orchestration

Why Step Functions:

  • Visual workflow editor
  • Built-in error handling (retries, exponential backoff)
  • Execution history (90 days)
  • CloudWatch integration
  • No orchestration code to maintain

Cost:

  • First 4,000 state transitions/month = free
  • $0.025 per 1,000 transitions after that
  • Cheaper than running orchestrator Lambda 24/7

Takeaways for Developers

When to Build an ETL Framework

✅ Perfect for:

  • 10+ similar integration workflows
  • Shared patterns (DB to S3, S3 to DB, etc.)
  • Team with Python expertise
  • Need for centralized credential management
  • Observability requirements

āŒ Not ideal for:

  • <5 workflows (just write Lambdas)
  • Completely unique workflows (no shared patterns)
  • Real-time streaming (use Kinesis/Kafka)
  • Complex transformations (use Spark/EMR)

Key Patterns

  1. Plugin architecture for code reuse
  2. Base classes for common functionality
  3. Parameter validation to catch errors early
  4. Jinja2 templating for dynamic SQL
  5. Parameter Store for credentials
  6. Decorators for observability
  7. Step Functions for orchestration

Quick Start Guide

1. Create base class:

from abc import ABC, abstractmethod

class BaseStepType(ABC):
    def __init__(self, **kwargs):
        self.parameters = kwargs

    @abstractmethod
    def execute(self, context):
        pass

2. Create plugin:

MySqlToS3Parameters = {
    "query": {"type": str, "required": True},
    "s3_bucket": {"type": str, "required": True},
}

class MySqlToS3(BaseStepType):
    def execute(self, context):
        ...  # implementation: query MySQL, write to S3

3. Register plugin:

import sys

from my_plugins import MySqlToS3

# "parameters" comes from the incoming Step Functions event
step_type = getattr(sys.modules[__name__], "MySqlToS3")
result = step_type(**parameters).execute({})
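
4. Wrap it in a Lambda handler:

A minimal handler sketch to tie the steps together (the production handler shown earlier adds execution context, structured logging, and jsonpickle serialization):

import sys

def handler(event, context):
    # event comes from Step Functions, e.g.:
    # {"StepType": "MySqlToS3", "Parameters": {"query": "...", "s3_bucket": "..."}}
    step_cls = getattr(sys.modules[__name__], event["StepType"])
    return step_cls(**event["Parameters"]).execute({})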

Conclusion

The Diablo ETL framework transformed our data integration from 100+ scattered Lambda functions into a unified plugin-based system handling 500+ daily executions.

The impact:

  • 95% reduction in deployment complexity
  • 80% code reuse across workflows
  • 70% faster new integration development
  • $400/month cost savings

But the real win? Data engineers can ship integrations in hours, not weeks. No infrastructure setup, no boilerplate, just write a plugin and deploy.

If you're building data integration platforms with repetitive patterns, a plugin-based framework is worth the investment. Your team (and your CI/CD pipeline) will thank you.


Related Articles:

  • "Building Serverless Data Pipelines: AWS Step Functions Guide"
  • "Python Plugin Architecture: Lessons from 30+ Plugins"
  • "Parameter Store Patterns: Managing Secrets at Scale"
