Building a Serverless ETL Framework: Python Plugin Architecture for Data Integration
How we built Diablo, a flexible ETL framework handling 100+ daily workflows with plugin-based step executors and Jinja2 templating
Note: Company-specific details have been anonymized to maintain confidentiality while preserving the technical integrity of this case study.
Introduction
Here's a problem every data team faces: You need to move data between dozens of systems, but every integration is a snowflake.
MySQL → S3, MSSQL → MySQL, S3 → SFTP, Excel → Slack, API → DynamoDB... each with unique connection parameters, transformation logic, and error handling.
The naive solution? Write a separate Lambda function for each integration. Result: 100+ Lambda functions with 90% duplicated code, deployment nightmares, and impossible maintenance.
To solve it, I built Diablo: a serverless ETL framework powered by AWS Step Functions that handles 100+ daily data integration workflows using a plugin architecture, Jinja2 templating for dynamic SQL, and a parameter validation system that catches configuration errors before execution.
We'll explore plugin-based step executors, decorator patterns for observability, AWS Parameter Store integration for credentials, and how to build a framework that scales from simple data copies to complex multi-step transformations.
The Problem: ETL Sprawl
Real-World Requirements
A data integration platform must handle:
- Diverse Sources/Destinations:
  - Databases: MySQL, MSSQL, PostgreSQL, DynamoDB
  - Storage: S3, SFTP, network file systems
  - APIs: REST endpoints with various auth methods
  - Messaging: SQS, SNS, Slack
  - Files: CSV, JSON, Excel
- Common Patterns:
  - Database to S3 exports
  - S3 to database imports
  - Database to database syncing
  - File format transformations
  - API data extraction
- Operational Requirements:
  - Dynamic SQL with templating
  - Connection pooling and retry logic
  - Credential management (no hardcoded secrets)
  - Observability (execution time, errors, data volume)
  - Parameter validation (catch errors before execution)
The Lambda Sprawl Anti-Pattern
Attempt 1: Separate Lambda per integration
lambdas/
├── mysql-to-s3-product-export/
├── mysql-to-s3-pricing-export/
├── mssql-to-mysql-orders-sync/
├── s3-to-mysql-inventory-import/
├── api-to-s3-weather-fetch/
└── ... (95 more functions)
Problems:
- 100+ Lambda functions with duplicated code
- Deployment complexity (100 separate deployments)
- No code reuse (connection logic copied 100 times)
- Hard to add features (update 100 functions)
- Configuration scattered across 100 Lambda environment variables
Solution: Plugin-Based ETL Framework
Core Architecture
Single Lambda function with plugin-based step executors:
Step Functions Workflow
├── Step 1: MySqlToS3 (plugin)
├── Step 2: S3ToMysql (plugin)
└── Step 3: SlackSendMsg (plugin)
All steps execute through a single Lambda:
- Input: { StepType: "MySqlToS3", Parameters: { ... } }
- Output: { success: true, result: { ... } }
Benefits:
- Single Lambda deployment (all plugins included)
- Code reuse via base classes
- Centralized credential management
- Parameter validation framework
- Observability via decorators
Implementation: Real Production Code
1. Step Executor (Plugin Manager)
import sys

from diablo.step_types.mysql_to_s3 import *
from diablo.step_types.s3_to_mysql import *
from diablo.step_types.slack_send_msg import *
# ... import all step types


class StepExecutor:
    def __init__(self, **kwargs):
        pass

    @timer  # Decorator for execution timing
    def execute_step(self, step_name, parameters):
        try:
            # Look up the formal parameter definition for validation
            step_type_parameters_def = getattr(sys.modules[__name__], f"{step_name}Parameters")
        except AttributeError:
            if global_vars.global_config["require_formal_parameters"]:
                raise Exception(f"Formal parameters are not defined for step '{step_name}'")
            log.warning(f"No formal parameters definition for step '{step_name}'. Skipping validation...")
        else:
            self.validate_parameters(step_type_parameters_def, parameters)

        # Dynamically load the step type class by name
        step_type = getattr(sys.modules[__name__], step_name)

        # Execute the step
        result = step_type(**parameters).execute({})
        return result

    def validate_parameters(self, step_parameters_dict, parameters):
        # Validate required parameters, types, formats
        for param_name, param_def in step_parameters_dict.items():
            if param_def.get("required", False) and param_name not in parameters:
                raise ValueError(f"Missing required parameter: {param_name}")
            if param_name in parameters:
                actual_type = type(parameters[param_name])
                expected_type = param_def.get("type")
                if expected_type and actual_type != expected_type:
                    raise TypeError(f"Parameter {param_name} must be {expected_type}, got {actual_type}")
Key pattern: Dynamic plugin loading
- getattr(sys.modules[__name__], step_name) loads the plugin class by name
- Single Lambda handles all step types
- No hardcoded plugin list (just import and use)
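For local testing you can drive the executor directly; a quick sketch (the step name and parameters mirror the MySqlToS3 plugin shown later in this post):
# Illustrative local invocation; in production this runs inside the Lambda handler.
executor = StepExecutor()
result = executor.execute_step(
    "MySqlToS3",
    {
        "connection_name": "mysql-products-db",
        "query": "SELECT * FROM products",
        "s3_bucket": "data-exports",
        "s3_key": "products/latest.csv",
    },
)
print(result)  # e.g. {"rows_exported": ..., "s3_uri": "s3://data-exports/products/latest.csv"}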
2. Base Step Type (Plugin Interface)
from abc import ABC, abstractmethod


class BaseStepType(ABC):
    def __init__(self, **kwargs):
        self.parameters = kwargs

    @abstractmethod
    def execute(self, context):
        """Execute the step. Must be implemented by subclasses."""
        ...

    def get_connection(self, connection_name):
        """Fetch connection config from the Parameter Store cache."""
        return global_vars.diablo_connections_cache.get(connection_name)

    def log_info(self, message, **kwargs):
        log.info(message, **kwargs)

    def log_error(self, message, **kwargs):
        log.error(message, **kwargs)
Why base class:
- Common functionality (logging, connection management)
- Enforces the interface (every plugin implements execute)
- Simplified plugin development (see the sketch below)
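To see how little a plugin needs, here is a hedged sketch of what a SlackSendMsg step could look like on top of this base class. The production version isn't shown in this post; the webhook call and parameter names are assumptions based on the workflow example further down.
import requests  # assumption: Slack incoming webhook called over plain HTTP POST

SlackSendMsgParameters = {
    "webhook_url": {"type": str, "required": True},
    "message": {"type": str, "required": True},
}


class SlackSendMsg(BaseStepType):
    def execute(self, context):
        # Post the message to a Slack incoming webhook
        response = requests.post(
            self.parameters["webhook_url"],
            json={"text": self.parameters["message"]},
            timeout=10,
        )
        response.raise_for_status()
        self.log_info("Sent Slack message")
        return {"status_code": response.status_code}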
3. MySQL to S3 Plugin Example
import csv
import io
import json

import boto3
import mysql.connector


MySqlToS3Parameters = {
    "connection_name": {"type": str, "required": True},
    "query": {"type": str, "required": True},
    "s3_bucket": {"type": str, "required": True},
    "s3_key": {"type": str, "required": True},
    "file_format": {"type": str, "required": False, "default": "csv"},
}


class MySqlToS3(BaseStepType):
    def execute(self, context):
        # 1. Get connection config from Parameter Store
        connection_config = self.get_connection(self.parameters["connection_name"])

        # 2. Render the query with Jinja2 (dynamic SQL)
        query = self.render_query(self.parameters["query"], context)

        # 3. Connect to MySQL
        connection = mysql.connector.connect(
            host=connection_config["host"],
            user=connection_config["user"],
            password=connection_config["password"],
            database=connection_config["database"],
        )

        # 4. Execute the query
        cursor = connection.cursor(dictionary=True)
        cursor.execute(query)
        results = cursor.fetchall()
        self.log_info(f"Fetched {len(results)} rows from MySQL")

        # 5. Convert to the desired format
        if self.parameters.get("file_format") == "json":
            file_content = json.dumps(results, default=str)
        elif results:
            # CSV format
            output = io.StringIO()
            writer = csv.DictWriter(output, fieldnames=results[0].keys())
            writer.writeheader()
            writer.writerows(results)
            file_content = output.getvalue()
        else:
            file_content = ""

        # 6. Upload to S3
        s3_client = boto3.client("s3")
        s3_client.put_object(
            Bucket=self.parameters["s3_bucket"],
            Key=self.parameters["s3_key"],
            Body=file_content.encode("utf-8"),
        )
        s3_uri = f"s3://{self.parameters['s3_bucket']}/{self.parameters['s3_key']}"
        self.log_info(f"Uploaded {len(results)} rows to {s3_uri}")

        cursor.close()
        connection.close()

        return {
            "rows_exported": len(results),
            "s3_uri": s3_uri,
        }

    def render_query(self, query_template, context):
        """Render a Jinja2 template with context variables."""
        from jinja2 import Template

        template = Template(query_template)
        return template.render(**context)
Key features:
- Parameter validation (enforced by MySqlToS3Parameters)
- Dynamic SQL with Jinja2 templating
- Credential fetching from Parameter Store
- S3 upload with boto3
- Structured logging with row counts
4. Lambda Handler (Entry Point)
import jsonpickle


step_executor = StepExecutor()


def execute_step(event, context):
    log.info("Entered Diablo Handler")

    # Extract Step Functions execution context
    if "ExecutionId" in event:
        global_vars.workflow_execution_context = event["ExecutionId"]
        execution_id = event["ExecutionId"]["Execution"]["Name"]
        global_vars.workflow_execution_context["execution_id"] = execution_id

        if "StateMachine" in event["ExecutionId"]:
            state_machine_name = event["ExecutionId"]["StateMachine"]["Name"]
            global_vars.workflow_execution_context["statemachine"] = state_machine_name

    # Extract the step type and parameters from the event
    step = event["StepType"]
    parameters = event["Parameters"]
    log.info("Details", step_type=step, parameters=parameters, event_json=event)

    try:
        # Execute the step via the plugin system
        result = step_executor.execute_step(step, parameters)
        log.info("Successfully executed Diablo.")

        # Return the success result
        return jsonpickle.encode(StepResult(True, "The step succeeded.", result=result))
    except Exception as e:
        log.error("Step execution failed", error=str(e))

        # Propagate a serialized failure result to Step Functions
        raise Exception(jsonpickle.encode(StepResult(False, "The step failed.", exception=e)))
Step Functions integration:
- The event contains StepType and Parameters
- Execution context is tracked across steps
- Results are serialized with jsonpickle (see the StepResult sketch below)
- Errors propagate to Step Functions
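The StepResult class itself isn't shown in this post; a minimal sketch of what that envelope might look like, assuming it only carries the fields jsonpickle needs to serialize:
class StepResult:
    """Sketch of the result envelope returned to Step Functions (the real class isn't shown here)."""

    def __init__(self, success, message, result=None, exception=None):
        self.success = success
        self.message = message
        self.result = result
        # Keep the exception message rather than the object so the serialized output stays readable
        self.exception = str(exception) if exception else None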
5. Timer Decorator for Observability
import functools
import time


def timer(func):
    """Decorator to measure execution time."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            elapsed = time.time() - start_time
            log.info(
                "Step execution completed",
                function=func.__name__,
                duration_seconds=round(elapsed, 2),
                success=True,
            )
            return result
        except Exception as e:
            elapsed = time.time() - start_time
            log.error(
                "Step execution failed",
                function=func.__name__,
                duration_seconds=round(elapsed, 2),
                error=str(e),
                success=False,
            )
            raise

    return wrapper
Usage:
@timer
def execute_step(self, step_name, parameters):
    # ... execution logic ...
Benefits:
- Automatic execution time tracking
- Consistent logging format
- Error tracking
- No manual timing code
6. Parameter Store Helper (Credential Management)
import json

import boto3


class ParameterStoreHelper:
    def __init__(self, prefix):
        self.prefix = prefix  # e.g., "/diablo/prod/connections/"
        self.cache = {}
        self.ssm_client = boto3.client("ssm")

    def get(self, connection_name):
        """Fetch connection config from Parameter Store, with caching."""
        cache_key = f"{self.prefix}{connection_name}"
        if cache_key in self.cache:
            return self.cache[cache_key]

        # Fetch from Parameter Store and decrypt SecureString values
        response = self.ssm_client.get_parameter(
            Name=cache_key,
            WithDecryption=True,
        )
        connection_config = json.loads(response["Parameter"]["Value"])
        self.cache[cache_key] = connection_config
        return connection_config
Why Parameter Store:
- No hardcoded credentials
- Centralized credential management
- Encrypted storage (KMS)
- IAM-based access control
- Caching for performance
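Creating a connection entry is a one-time setup step. Something like the following works, assuming the /diablo/prod/connections/ prefix shown above and an illustrative payload:
import json

import boto3

ssm = boto3.client("ssm")

# Illustrative connection entry; the name follows the helper's prefix convention
ssm.put_parameter(
    Name="/diablo/prod/connections/mysql-products-db",
    Type="SecureString",  # encrypted at rest with KMS
    Value=json.dumps({
        "host": "prod-db.example.com",
        "user": "etl_user",
        "password": "example-only",
        "database": "products",
    }),
    Overwrite=True,
)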
Step Functions Workflow Example
{
  "Comment": "Daily product export workflow",
  "StartAt": "ExportProductsToS3",
  "States": {
    "ExportProductsToS3": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:diablo-executor",
      "Parameters": {
        "StepType": "MySqlToS3",
        "Parameters": {
          "connection_name": "mysql-products-db",
          "query": "SELECT * FROM products WHERE updated_at > '{{ execution_date }}'",
          "s3_bucket": "data-exports",
          "s3_key": "products/{{ execution_date }}.csv",
          "file_format": "csv"
        }
      },
      "Next": "SendSlackNotification"
    },
    "SendSlackNotification": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:diablo-executor",
      "Parameters": {
        "StepType": "SlackSendMsg",
        "Parameters": {
          "webhook_url": "https://hooks.slack.com/...",
          "message": "Product export completed: {{ execution_date }}"
        }
      },
      "End": true
    }
  }
}
Benefits:
- Visual workflow representation
- Error handling (retries, fallbacks)
- Parallel execution support
- Execution history and logging
- No custom orchestration code
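Retries and fallbacks are declared in the state machine rather than in plugin code. A hedged sketch of adding a retry policy to the export task (other fields omitted, interval and attempt values are illustrative):
"ExportProductsToS3": {
  "Type": "Task",
  "Resource": "arn:aws:lambda:us-east-1:123456789012:function:diablo-executor",
  "Retry": [
    {
      "ErrorEquals": ["States.TaskFailed"],
      "IntervalSeconds": 30,
      "MaxAttempts": 3,
      "BackoffRate": 2.0
    }
  ],
  "Next": "SendSlackNotification"
}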
Performance & Production Results
Metrics (18 months in production)
Scale:
- 100+ Step Functions workflows (daily, weekly, on-demand)
- 500+ step executions/day
- 30+ step types (plugins)
- 50+ database connections managed
Performance:
- P50 step execution: 2.5 seconds
- P95 step execution: 12 seconds (large datasets)
- P99 step execution: 45 seconds
- Success rate: 98.5% (failures = data quality issues, not framework bugs)
Cost:
- Lambda: $80/month (500+ daily executions)
- Step Functions: $15/month
- Parameter Store: $0 (free tier)
- Total: $95/month for 100+ workflows
Developer Experience:
- New step type development: 1-2 hours
- Workflow creation: 15 minutes
- Onboarding: 1 day (vs. 1 week for custom Lambdas)
Lessons Learned
1. Plugin Architecture Scales
Before (separate Lambdas):
- 100 Lambda functions
- 100 deployments
- Duplicated code everywhere
After (plugins):
- 1 Lambda function
- 1 deployment
- Shared base classes
Adding a new step type:
- Create the plugin class
- Define its parameter schema
- Import it in step_executor.py
- Done! No changes to the deployment setup (see the example below)
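For example, a hypothetical S3-to-S3 copy step (not one of the production plugins) needs only a parameter schema and an execute method:
S3CopyParameters = {
    "source_bucket": {"type": str, "required": True},
    "source_key": {"type": str, "required": True},
    "dest_bucket": {"type": str, "required": True},
    "dest_key": {"type": str, "required": True},
}


class S3Copy(BaseStepType):
    def execute(self, context):
        import boto3

        s3 = boto3.client("s3")
        # Server-side copy: the data never passes through the Lambda
        s3.copy_object(
            CopySource={
                "Bucket": self.parameters["source_bucket"],
                "Key": self.parameters["source_key"],
            },
            Bucket=self.parameters["dest_bucket"],
            Key=self.parameters["dest_key"],
        )
        dest_uri = f"s3://{self.parameters['dest_bucket']}/{self.parameters['dest_key']}"
        self.log_info(f"Copied object to {dest_uri}")
        return {"copied_to": dest_uri}
Import it alongside the other step types in step_executor.py and every workflow can call it immediately.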
2. Jinja2 Templating for Dynamic SQL
Static SQL:
SELECT * FROM products WHERE updated_at > '2024-01-01'
Dynamic SQL with Jinja2:
SELECT * FROM products
WHERE updated_at > '{{ start_date }}'
AND category IN ({% for cat in categories %}'{{ cat }}'{% if not loop.last %},{% endif %}{% endfor %})
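Rendering that template from workflow context takes only a few lines of Jinja2 (the context values here are illustrative):
from jinja2 import Template

template = Template(
    "SELECT * FROM products "
    "WHERE updated_at > '{{ start_date }}' "
    "AND category IN ({% for cat in categories %}'{{ cat }}'"
    "{% if not loop.last %},{% endif %}{% endfor %})"
)

query = template.render(start_date="2024-01-01", categories=["toys", "games"])
# -> SELECT * FROM products WHERE updated_at > '2024-01-01' AND category IN ('toys','games')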
Benefits:
- Workflows pass context variables
- No manual string concatenation (note: Jinja2 renders plain strings, so template values must come from trusted workflow context, not end users)
- Reusable query templates
- Date math with filters
3. Parameter Store for Secrets
Bad (hardcoded):
connection = mysql.connector.connect(
    host="prod-db.us-east-1.rds.amazonaws.com",
    user="admin",
    password="SuperSecret123!",  # hardcoded secret, never do this
)
Good (Parameter Store):
connection_config = parameter_store.get("mysql-products-db")
connection = mysql.connector.connect(**connection_config)
Benefits:
- No secrets in code
- Centralized rotation
- IAM-based access control
- Audit logs
4. Decorators for Cross-Cutting Concerns
@timer decorator:
@timer
def execute_step(self, step_name, parameters):
    # Business logic
Automatically logs:
INFO: Step execution completed | function=execute_step | duration_seconds=2.3 | success=true
Other decorators we use:
- @retry: automatic retry on transient failures (sketch below)
- @cache: cache expensive operations
- @validate: parameter validation
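The @retry implementation isn't shown in this post; a minimal sketch of what one could look like:
import functools
import time


def retry(max_attempts=3, delay_seconds=2):
    """Hypothetical retry decorator; the production version may differ."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt == max_attempts:
                        raise
                    log.warning(f"Attempt {attempt} failed: {exc}. Retrying in {delay_seconds}s...")
                    time.sleep(delay_seconds)
        return wrapper
    return decorator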
5. Step Functions > Custom Orchestration
Why Step Functions:
- Visual workflow editor
- Built-in error handling (retries, exponential backoff)
- Execution history (90 days)
- CloudWatch integration
- No orchestration code to maintain
Cost:
- First 4,000 state transitions/month = free
- $0.025 per 1,000 transitions after that
- Cheaper than running orchestrator Lambda 24/7
Takeaways for Developers
When to Build an ETL Framework
✅ Perfect for:
- 10+ similar integration workflows
- Shared patterns (DB to S3, S3 to DB, etc.)
- Team with Python expertise
- Need for centralized credential management
- Observability requirements
❌ Not ideal for:
- <5 workflows (just write Lambdas)
- Completely unique workflows (no shared patterns)
- Real-time streaming (use Kinesis/Kafka)
- Complex transformations (use Spark/EMR)
Key Patterns
- Plugin architecture for code reuse
- Base classes for common functionality
- Parameter validation to catch errors early
- Jinja2 templating for dynamic SQL
- Parameter Store for credentials
- Decorators for observability
- Step Functions for orchestration
Quick Start Guide
1. Create base class:
from abc import ABC, abstractmethod


class BaseStepType(ABC):
    def __init__(self, **kwargs):
        self.parameters = kwargs

    @abstractmethod
    def execute(self, context):
        pass
2. Create plugin:
MySqlToS3Parameters = {
    "query": {"type": str, "required": True},
    "s3_bucket": {"type": str, "required": True},
}


class MySqlToS3(BaseStepType):
    def execute(self, context):
        # ... implementation ...
        ...
3. Register plugin:
import sys

from my_plugins import MySqlToS3  # importing binds the class into this module's namespace

# Resolve the plugin class by name and run it with validated parameters
step_type = getattr(sys.modules[__name__], "MySqlToS3")
result = step_type(**parameters).execute({})
Conclusion
The Diablo ETL framework transformed our data integration from 100+ scattered Lambda functions into a unified plugin-based system handling 500+ daily executions.
The impact:
- 95% reduction in deployment complexity
- 80% code reuse across workflows
- 70% faster new integration development
- $400/month cost savings
But the real win? Data engineers can ship integrations in hours, not weeks. No infrastructure setup, no boilerplate, just write a plugin and deploy.
If you're building data integration platforms with repetitive patterns, a plugin-based framework is worth the investment. Your team (and your CI/CD pipeline) will thank you.
Related Articles:
- "Building Serverless Data Pipelines: AWS Step Functions Guide"
- "Python Plugin Architecture: Lessons from 30+ Plugins"
- "Parameter Store Patterns: Managing Secrets at Scale"