Migrating data from a NoSQL database to a relational database is rarely a one-click operation. It usually begins with a business requirement: reporting needs better SQL support, or analytics tools expect structured tables. In AWS environments this challenge often appears when teams decide to move data from Amazon DocumentDB to Amazon RDS.
DocumentDB works well for flexible schemas and fast application development, but over time data models tend to grow complex. Nested fields, arrays and dynamic keys become common. When reporting or analytics enters the picture these same strengths turn into obstacles, because SQL-based systems work best with flattened schemas and predictable data types.
This is where AWS Glue becomes extremely useful. Glue provides a serverless ETL framework that can read data from DocumentDB, process it with Apache Spark and load clean, structured data into Amazon RDS. In this article I walk through a real-world AWS Glue ETL job that migrates data from DocumentDB to MySQL on RDS. This is not a toy example. It reflects a production-style pipeline designed to handle unknown schemas, nested structures and dynamically changing collections.
The objective is not just data movement. The focus is schema normalization, security best practices and operational simplicity.
Why Use AWS Glue for DocumentDB to RDS Migration
There are several ways to move data between databases. Custom scripts, Lambda functions and third-party tools are often evaluated. In practice, AWS Glue offers a more scalable and maintainable approach.
Glue is fully serverless, which removes the need to manage infrastructure. It scales automatically with data volume and integrates tightly with AWS services such as Secrets Manager, IAM and CloudWatch. Most importantly, Glue is built on Apache Spark, which excels at processing semi-structured data.
DocumentDB collections frequently contain nested objects, arrays and inconsistent fields across documents. Spark DataFrames make it possible to inspect and transform such schemas dynamically, without writing custom parsing logic for each collection.
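As a standalone illustration, independent of the Glue job shown later, the minimal sketch below builds a tiny DataFrame with a nested struct and an array (placeholder data) and discovers the complex columns directly from the schema. This is the same pattern the migration job relies on.

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType,
                               ArrayType, MapType)

spark = SparkSession.builder.appName("schema-inspection-demo").getOrCreate()

# A tiny sample shaped like a DocumentDB document: a nested object and an array
schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer", StructType([
        StructField("name", StringType()),
        StructField("tier", StringType()),
    ])),
    StructField("items", ArrayType(StringType())),
])
df = spark.createDataFrame([("A1", ("Jane", "gold"), ["sku1", "sku2"])], schema)

df.printSchema()

# Columns with nested types would need flattening before a SQL load
complex_cols = [f.name for f in df.schema.fields
                if isinstance(f.dataType, (StructType, ArrayType, MapType))]
print(complex_cols)  # ['customer', 'items']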
Glue jobs can also be parameterized. This makes it possible to reuse the same job across multiple databases and environments without modifying the code.
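For example, the job shown later in this article reads MONGO_DBNAME and TARGET_RDS_DB as run arguments, so the same script can be pointed at a different source database and target schema per run. A minimal sketch using boto3 (the job name and database names here are placeholders):

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Run the same Glue job against a different source/target pair
# simply by changing the job arguments; no code changes required.
glue.start_job_run(
    JobName="docdb-to-rds-migration",           # placeholder job name
    Arguments={
        "--MONGO_DBNAME": "orders_service",     # source DocumentDB database
        "--TARGET_RDS_DB": "orders_reporting"   # target MySQL database on RDS
    }
)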
High Level Architecture of the Pipeline
At a high level, the pipeline works as follows. An AWS Glue Spark job retrieves the DocumentDB and RDS credentials from AWS Secrets Manager, connects to the DocumentDB cluster and discovers every collection in the source database. Each collection is read into a Spark DataFrame, where nested structures are flattened and column types are normalized. The job then recreates the target MySQL database on Amazon RDS, creates one table per collection with a schema derived from the DataFrame, and loads the data over JDBC.
Prerequisites
Before running the AWS Glue ETL job to migrate data from Amazon DocumentDB to Amazon RDS, ensure the following prerequisites are in place.
AWS Glue Job Configuration: A Glue Spark ETL job with enough worker capacity for the size of the source collections. The script shown below is the job's entry point.
Network and Glue Connections: The job must run inside a VPC (via a Glue connection) with network access to both the DocumentDB cluster and the RDS MySQL instance, since DocumentDB is only reachable from within a VPC.
Required Libraries and Drivers: The script uses pymongo and pymysql in addition to PySpark, so these Python modules must be available to the job (for example through the --additional-python-modules job parameter), along with the MySQL JDBC driver referenced as com.mysql.cj.jdbc.Driver.
Job Parameters: Two job parameters are expected: MONGO_DBNAME, the source DocumentDB database, and TARGET_RDS_DB, the MySQL database to recreate and load on RDS.
AWS Secrets Manager: Two secrets hold the credentials, one for DocumentDB and one for RDS. The script references them as prod-mongodb-awsglue-extractjob-cred and MonitoringDB-RDS-GlueUser-Cred.
IAM Permissions: The Glue job role needs secretsmanager:GetSecretValue on those two secrets, in addition to the standard Glue service permissions and CloudWatch Logs access.
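The script expects each secret to contain keys named username, password, host, port and, for DocumentDB, ssl. A minimal sketch of creating the two secrets with boto3; the names come from the script, while the values are placeholders for your own environment:

import json
import boto3

sm = boto3.client("secretsmanager", region_name="us-east-1")

# DocumentDB credentials read by the Glue job (placeholder values)
sm.create_secret(
    Name="prod-mongodb-awsglue-extractjob-cred",
    SecretString=json.dumps({
        "username": "docdb_user",
        "password": "REPLACE_ME",
        "host": "docdb-cluster.cluster-xxxx.us-east-1.docdb.amazonaws.com",
        "port": 27017,
        "ssl": True
    })
)

# RDS MySQL credentials read by the Glue job (placeholder values)
sm.create_secret(
    Name="MonitoringDB-RDS-GlueUser-Cred",
    SecretString=json.dumps({
        "username": "glue_user",
        "password": "REPLACE_ME",
        "host": "mydb.xxxx.us-east-1.rds.amazonaws.com",
        "port": 3306
    })
)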
Glue ETL Pipeline Overview
import sys
import json
import re
import boto3
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pymongo import MongoClient
import pymysql
# =====================================================
# Read AWS Glue Job Parameters
# MONGO_DBNAME : Source MongoDB / Amazon DocumentDB database name
# TARGET_RDS_DB : Target Amazon RDS (MySQL) database to be recreated and loaded
# =====================================================
args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'MONGO_DBNAME', 'TARGET_RDS_DB']
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
print(" Glue Job Started")
# =====================================================
# Securely fetch/Load Secrets from AWS Secrets Manager
# =====================================================
REGION = "us-east-1"
sm = boto3.client("secretsmanager", region_name=REGION)
mongo_secret = json.loads(
    sm.get_secret_value(
        SecretId="prod-mongodb-awsglue-extractjob-cred"
    )["SecretString"]
)
rds_secret = json.loads(
    sm.get_secret_value(
        SecretId="MonitoringDB-RDS-GlueUser-Cred"
    )["SecretString"]
)
# MongoDB values
m_user = mongo_secret["username"]
m_pass = mongo_secret["password"]
m_host = mongo_secret["host"]
m_port = mongo_secret["port"]
m_ssl = str(mongo_secret.get("ssl", True)).lower()
m_db = args["MONGO_DBNAME"]
# RDS values
rds_user = rds_secret["username"]
rds_pass = rds_secret["password"]
rds_host = rds_secret["host"]
rds_port = int(rds_secret.get("port", 3306))
rds_db = args["TARGET_RDS_DB"]
print(" Secrets loaded successfully")
# =====================================================
# MongoDB Connection Initialization & Collection Discovery
# Connect to DocumentDB and fetch all collections dynamically
# =====================================================
mongo_uri = (
    f"mongodb://{m_user}:{m_pass}@{m_host}:{m_port}/"
    f"{m_db}?ssl={m_ssl}"
)
client = MongoClient(mongo_uri)
collections = client[m_db].list_collection_names()
print(" Collections discovered:", collections)
# =====================================================
# Data Preparation & Schema Handling Functions
# =====================================================
# - Sanitize names for MySQL
# - Flatten nested structures
# - Convert NullType columns
# - Map Spark types to MySQL-safe types
# =====================================================
def sanitize(name):
    # Replace any character MySQL identifiers do not allow with an underscore
    return re.sub(r"[^0-9a-zA-Z_]", "_", name)
def recursive_flatten(df):
    # Flatten repeatedly until no struct, array or map columns remain:
    # structs are expanded into parent_child columns, arrays/maps become JSON strings
    complex_cols = [
        f.name for f in df.schema.fields
        if isinstance(f.dataType, (StructType, ArrayType, MapType))
    ]
    while complex_cols:
        col = complex_cols[0]
        dtype = df.schema[col].dataType
        if isinstance(dtype, StructType):
            for field in dtype.fields:
                df = df.withColumn(
                    f"{col}_{field.name}",
                    F.col(f"{col}.{field.name}")
                )
            df = df.drop(col)
        else:
            df = df.withColumn(col, F.to_json(F.col(col)))
        complex_cols = [
            f.name for f in df.schema.fields
            if isinstance(f.dataType, (StructType, ArrayType, MapType))
        ]
    return df
def cast_nulltype_to_string(df):
    # Columns that are entirely null are inferred as NullType, which JDBC cannot write
    for field in df.schema.fields:
        if isinstance(field.dataType, NullType):
            df = df.withColumn(field.name, F.col(field.name).cast("string"))
    return df
def spark_type_to_mysql(field):
    name = sanitize(field.name)
    dtype = field.dataType
    if isinstance(dtype, StringType):
        return f"`{name}` LONGTEXT"
    if isinstance(dtype, BooleanType):
        return f"`{name}` TINYINT(1)"
    if isinstance(dtype, TimestampType):
        return f"`{name}` DATETIME"
    if isinstance(dtype, DateType):
        return f"`{name}` DATE"
    if isinstance(dtype, (IntegerType, LongType)):
        return f"`{name}` BIGINT"
    # Any other type (doubles, decimals, etc.) falls back to LONGTEXT
    return f"`{name}` LONGTEXT"
# =====================================================
# Drop Existing and Recreate Target RDS Database for Fresh Load
# =====================================================
def recreate_database():
    conn = pymysql.connect(
        host=rds_host,
        user=rds_user,
        password=rds_pass,
        port=rds_port,
        autocommit=True
    )
    cur = conn.cursor()
    cur.execute(f"DROP DATABASE IF EXISTS `{rds_db}`;")
    cur.execute(
        f"CREATE DATABASE `{rds_db}` "
        f"CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
    )
    cur.close()
    conn.close()
    print(f"RDS database `{rds_db}` recreated")
recreate_database()
# =====================================================
# Process Each MongoDB Collection
# =====================================================
for coll in collections:
    print(f"\nProcessing collection: {coll}")
    # Read MongoDB collection into Spark
    dyn = glueContext.create_dynamic_frame.from_options(
        connection_type="mongodb",
        connection_options={
            "connection.uri": mongo_uri,
            "database": m_db,
            "collection": coll,
            "ssl": m_ssl
        }
    )
    df = dyn.toDF()
# Flatten & clean schema
df = recursive_flatten(df)
df = cast_nulltype_to_string(df)
#Dynamically Create table in RDS
columns_sql = ", ".join(
[spark_type_to_mysql(f) for f in df.schema.fields]
)
    conn = pymysql.connect(
        host=rds_host,
        user=rds_user,
        password=rds_pass,
        port=rds_port,
        db=rds_db,
        autocommit=True
    )
    cur = conn.cursor()
    cur.execute(
        f"CREATE TABLE IF NOT EXISTS `{sanitize(coll)}` "
        f"({columns_sql}) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"
    )
    cur.close()
    conn.close()
    # Write data into RDS using JDBC
    dynf = DynamicFrame.fromDF(df, glueContext, coll)
    glueContext.write_dynamic_frame.from_options(
        frame=dynf,
        connection_type="mysql",
        connection_options={
            "url": f"jdbc:mysql://{rds_host}:{rds_port}/{rds_db}?useSSL=false",
            "user": rds_user,
            "password": rds_pass,
            "dbtable": sanitize(coll),
            "driver": "com.mysql.cj.jdbc.Driver"
        }
    )
    print(f"Loaded `{coll}` into RDS")
# =====================================================
# Commit Glue Job
# =====================================================
job.commit()
print(" Glue Job Completed Successfully")
How the AWS Glue Pipeline Works
Here is an overview of how the AWS Glue ETL pipeline works.
1. Connecting to DocumentDB and Discovering Collections
The job builds a MongoDB connection URI from the credentials stored in Secrets Manager and uses pymongo to list every collection in the source database. Because the collection list is discovered at runtime, new collections are picked up automatically on the next run without any code changes.
2. Handling Schema Challenges in NoSQL Data
Each collection is read into a Spark DataFrame through Glue's MongoDB connection type. The recursive_flatten function then walks the schema: struct fields are promoted to top-level columns named parent_field, while arrays and maps are serialized to JSON strings. The loop repeats until no complex types remain, so arbitrarily nested documents end up as flat, SQL-friendly rows, as sketched below.
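A minimal standalone sketch of what the flattening produces for a document with a nested customer object and an items array; the data is placeholder, and the real job performs these steps generically via recursive_flatten:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (StructType, StructField, StringType, ArrayType)

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer", StructType([
        StructField("name", StringType()),
        StructField("tier", StringType()),
    ])),
    StructField("items", ArrayType(StringType())),
])
df = spark.createDataFrame([("A1", ("Jane", "gold"), ["sku1", "sku2"])], schema)

# Struct fields are promoted to top-level columns with a parent_child name...
df = (df
      .withColumn("customer_name", F.col("customer.name"))
      .withColumn("customer_tier", F.col("customer.tier"))
      .drop("customer")
      # ...while arrays (and maps) are serialized to a JSON string column.
      .withColumn("items", F.to_json(F.col("items"))))

df.show(truncate=False)
# +--------+---------------+-------------+-------------+
# |order_id|items          |customer_name|customer_tier|
# +--------+---------------+-------------+-------------+
# |A1      |["sku1","sku2"]|Jane         |gold         |
# +--------+---------------+-------------+-------------+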
3. Sanitizing Column and Table Names
MongoDB allows field and collection names that MySQL identifiers do not. The sanitize helper replaces every character outside digits, letters and underscores with an underscore, so a field such as order-items.total becomes order_items_total. The same rule is applied to table names derived from collection names.
4. Handling NullType Columns
Columns that contain only null values in the source data are inferred as NullType by Spark, and a NullType column cannot be written over JDBC. cast_nulltype_to_string casts these columns to strings so the load does not fail, as the short sketch below illustrates.
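A minimal standalone sketch of why this step is needed, using a literal None to force a NullType column:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# A literal None produces a NullType ("void") column, which JDBC writers reject
df = spark.range(1).withColumn("missing", F.lit(None))
print(df.schema["missing"].dataType)   # NullType()

# Casting to string makes the column writable over JDBC
df = df.withColumn("missing", F.col("missing").cast("string"))
print(df.schema["missing"].dataType)   # StringType()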
5. Mapping Spark Types to MySQL Types
spark_type_to_mysql converts each Spark field into a MySQL column definition: strings (including the JSON produced during flattening) become LONGTEXT, booleans TINYINT(1), timestamps DATETIME, dates DATE, and integer or long values BIGINT. Any other type, such as doubles or decimals, falls back to LONGTEXT, which keeps the load robust at the cost of some type precision.
6. Recreating the Target RDS Database
recreate_database drops the target MySQL database if it exists and recreates it with utf8mb4 encoding. This is a deliberate full-refresh design: every run starts from a clean schema, which keeps the pipeline simple but means the target should be treated as a rebuildable reporting copy rather than a system of record.
7. Writing Data into Amazon RDS
For each collection, the job generates a CREATE TABLE IF NOT EXISTS statement from the flattened DataFrame schema and runs it with pymysql, then converts the DataFrame back into a DynamicFrame and writes it to the new table through Glue's MySQL JDBC writer.
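An optional sanity check, not part of the job above, is to compare document counts in DocumentDB with row counts in the target tables after the load. A minimal sketch, assuming the same credentials and the same sanitize naming rule the job uses (connection strings are placeholders):

import re
import pymysql
from pymongo import MongoClient

def sanitize(name):
    # Same naming rule as the Glue job
    return re.sub(r"[^0-9a-zA-Z_]", "_", name)

# Placeholder connection details; reuse the values stored in Secrets Manager.
mongo = MongoClient("mongodb://user:pass@docdb-host:27017/source_db?ssl=true")
mysql_conn = pymysql.connect(host="rds-host", user="user",
                             password="pass", db="target_db")

cur = mysql_conn.cursor()
for coll in mongo["source_db"].list_collection_names():
    source_count = mongo["source_db"][coll].count_documents({})
    cur.execute(f"SELECT COUNT(*) FROM `{sanitize(coll)}`")
    target_count = cur.fetchone()[0]
    status = "OK" if source_count == target_count else "MISMATCH"
    print(f"{coll}: source={source_count} target={target_count} {status}")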
Final Thoughts
This pipeline shows that moving from DocumentDB to RDS does not have to mean hand-written parsers or per-collection mapping code. By letting Spark discover and flatten the schema, keeping credentials in Secrets Manager and parameterizing the job, the same Glue script can migrate any DocumentDB database into a clean, queryable MySQL schema with minimal operational overhead.