
AWS Glue ETL Job to Migrate Data from Amazon DocumentDB to Amazon RDS

Migrating data from a NoSQL database to a relational database is rarely a one-click operation. It usually begins with a business requirement: reporting needs better SQL support, or analytics tools expect structured tables. In AWS environments this challenge often appears when teams decide to move data from Amazon DocumentDB to Amazon RDS.

DocumentDB works well for flexible schemas and fast application development. Over time, data models tend to grow complex: nested fields, arrays, and dynamic keys become common. When reporting or analytics enters the picture, these same strengths turn into obstacles, because SQL-based systems work best with flattened schemas and predictable data types.

This is where AWS Glue becomes extremely useful. Glue provides a serverless ETL framework that can read data from DocumentDB, process it using Apache Spark, and load clean, structured data into Amazon RDS. In this article I walk through a real-world AWS Glue ETL job that migrates data from DocumentDB to MySQL on RDS. This is not a toy example. It reflects a production-style pipeline designed to handle unknown schemas, nested structures, and dynamically changing collections.

The objective is not just data movement. The focus is schema normalization, security best practices and operational simplicity.

Why Use AWS Glue for DocumentDB to RDS Migration

There are several ways to move data between databases. Custom scripts, Lambda functions, and third-party tools are often evaluated. In practice, AWS Glue offers a more scalable and maintainable approach.

Glue is fully serverless, which removes the need to manage infrastructure. It scales automatically based on data volume and integrates tightly with AWS services such as Secrets Manager, IAM, and CloudWatch. Most importantly, Glue is built on Apache Spark, which excels at processing semi-structured data.

DocumentDB collections frequently contain nested objects, arrays, and inconsistent fields across documents. Spark DataFrames allow dynamic schema inspection and transformation without writing custom parsing logic for each collection.

Glue jobs can also be parameterized. This makes it possible to reuse the same job across multiple databases and environments without modifying the code.

High Level Architecture of the Pipeline

The pipeline follows a clear flow.
    1. AWS Glue job starts with runtime parameters.
    2. Secrets are securely fetched from AWS Secrets Manager.
    3. Glue connects to DocumentDB and discovers collections automatically.
    4. Each collection is read into Spark.
    5. Nested structures are flattened.
    6. Schemas are sanitized for MySQL compatibility.
    7. A fresh database is created in Amazon RDS.
    8. Tables are created dynamically based on inferred schema.
    9. Data is written into MySQL using JDBC.
    10. The job commits and exits cleanly.
This design ensures consistency and repeatability. Each execution produces a clean RDS database that reflects the current state of DocumentDB.

Prerequisites

Before running the AWS Glue ETL job to migrate data from Amazon DocumentDB to Amazon RDS, ensure the following prerequisites are in place.

AWS Glue Job Configuration:

An AWS Glue ETL job must be created using a Python Spark runtime. The job script should be stored in Amazon S3 and referenced in the Glue job configuration. Timeout and retry values should be adjusted based on data volume. For full reload migrations, a short timeout with zero retries works well when the source data size is predictable.
CloudWatch job metrics and observability metrics should be enabled to track execution time, resource usage and failures.
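For reference, the job definition can also be created with boto3. The sketch below is a minimal example and not part of the pipeline itself; the job name, role ARN, script location, and worker sizing are placeholders.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Minimal sketch: a Glue ETL job with a Python Spark runtime, a short timeout,
# zero retries, and job/observability metrics enabled. All names and ARNs
# below are placeholders.
glue.create_job(
    Name="documentdb-to-rds-migration",
    Role="arn:aws:iam::123456789012:role/glue-migration-role",     # placeholder role
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-glue-scripts/docdb_to_rds.py",  # placeholder script path
        "PythonVersion": "3",
    },
    GlueVersion="4.0",
    WorkerType="G.1X",
    NumberOfWorkers=5,
    Timeout=60,        # minutes; a short timeout suits a predictable full reload
    MaxRetries=0,
    DefaultArguments={
        "--enable-metrics": "true",                # CloudWatch job metrics
        "--enable-observability-metrics": "true",  # Glue observability metrics
    },
)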

Network and Glue Connections:

The Glue job must run within the same VPC where DocumentDB and RDS are accessible.
Two Glue connections are required.
  • A MongoDB connection for Amazon DocumentDB
  • A JDBC connection for Amazon RDS MySQL
Both connections must reference the correct VPC, subnet and security groups. Security groups should allow outbound access from Glue to DocumentDB and RDS on their respective ports.
Without these connections the Glue job will fail to establish database connectivity even when credentials are correct.
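As a sketch, the two connections could be created with boto3 as shown below. The endpoints, subnet, security group, and availability zone are placeholders; credentials are referenced through the Secrets Manager secrets described later rather than stored directly on the connection.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Sketch: MongoDB-type connection pointing at the DocumentDB cluster.
# Endpoint, subnet, security group, and AZ values are placeholders.
glue.create_connection(
    ConnectionInput={
        "Name": "docdb-source-connection",
        "ConnectionType": "MONGODB",
        "ConnectionProperties": {
            "CONNECTION_URL": "mongodb://docdb-cluster.cluster-xxxx.us-east-1.docdb.amazonaws.com:27017",
            "SECRET_ID": "prod-mongodb-awsglue-extractjob-cred",
        },
        "PhysicalConnectionRequirements": {
            "SubnetId": "subnet-0abc1234",
            "SecurityGroupIdList": ["sg-0abc1234"],
            "AvailabilityZone": "us-east-1a",
        },
    }
)

# Sketch: JDBC connection for the Amazon RDS MySQL target.
glue.create_connection(
    ConnectionInput={
        "Name": "rds-mysql-target-connection",
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": "jdbc:mysql://rds-endpoint.us-east-1.rds.amazonaws.com:3306/mysql",
            "SECRET_ID": "MonitoringDB-RDS-GlueUser-Cred",
        },
        "PhysicalConnectionRequirements": {
            "SubnetId": "subnet-0abc1234",
            "SecurityGroupIdList": ["sg-0abc1234"],
            "AvailabilityZone": "us-east-1a",
        },
    }
)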

Required Libraries and Drivers:

Since the job connects to external databases, additional libraries must be provided.
  • PyMongo uploaded to S3 and configured in the Python library path
  • MySQL JDBC driver uploaded to S3 and configured in the dependent JARs path
These dependencies are required for DocumentDB extraction and MySQL loading. Library versions should be pinned to avoid runtime compatibility issues.
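These paths are wired in through the job's default arguments. A sketch, with placeholder bucket names and versions; the exact PyMongo and driver versions should match your Glue version.

# Sketch: default arguments that attach the Python library and JDBC driver.
# Bucket names, file names, and versions are placeholders.
extra_dependency_arguments = {
    "--extra-py-files": "s3://my-glue-dependencies/pymongo-4.6.1.tar.gz",
    "--extra-jars": "s3://my-glue-dependencies/mysql-connector-j-8.0.33.jar",
    # Alternatively, "--additional-python-modules": "pymongo==4.6.1" installs
    # the library from PyPI instead of S3.
}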

Job Parameters:

The Glue job must be configured with runtime parameters to control source and target databases.
Required parameters include
  • --MONGO_DBNAME for the source DocumentDB database
  • --TARGET_RDS_DB for the target Amazon RDS database
These parameters allow the same job to be reused across environments without changing the script.
Optional parameters, such as an SNS topic ARN for job notifications, can be added if needed.
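With those parameters in place, a run can be triggered from boto3 as sketched below; the job and database names are placeholders.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Sketch: start one migration run with a specific source and target database.
response = glue.start_job_run(
    JobName="documentdb-to-rds-migration",       # placeholder job name
    Arguments={
        "--MONGO_DBNAME": "app_production",      # placeholder source database
        "--TARGET_RDS_DB": "app_reporting",      # placeholder target database
    },
)
print(response["JobRunId"])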

AWS Secrets Manager:

Secrets must exist in AWS Secrets Manager and be accessible by the Glue job IAM role.
One secret is required for DocumentDB containing
  • Username
  • Password
  • Host
  • Port
  • SSL configuration
Another secret is required for Amazon RDS MySQL containing
  • Username
  • Password
  • Host
  • Port
This approach ensures credentials are never hardcoded and can be rotated safely without redeploying the job.
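The script shown later in this article reads the secret keys listed above, so the secrets should be created with exactly those fields. A sketch using boto3, with placeholder hosts and credentials:

import json
import boto3

sm = boto3.client("secretsmanager", region_name="us-east-1")

# Sketch: DocumentDB secret with the keys the Glue script expects.
# Host, username, and password values are placeholders.
sm.create_secret(
    Name="prod-mongodb-awsglue-extractjob-cred",
    SecretString=json.dumps({
        "username": "glue_reader",
        "password": "REPLACE_ME",
        "host": "docdb-cluster.cluster-xxxx.us-east-1.docdb.amazonaws.com",
        "port": 27017,
        "ssl": True,
    }),
)

# Sketch: RDS MySQL secret with the keys the Glue script expects.
sm.create_secret(
    Name="MonitoringDB-RDS-GlueUser-Cred",
    SecretString=json.dumps({
        "username": "glue_writer",
        "password": "REPLACE_ME",
        "host": "rds-endpoint.us-east-1.rds.amazonaws.com",
        "port": 3306,
    }),
)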

IAM Permissions:

The Glue job IAM role must have permissions to
  • Read secrets from AWS Secrets Manager
  • Access S3 buckets for scripts and dependencies
  • Write logs and metrics to CloudWatch
  • Use Glue connections and VPC resources
Missing permissions are one of the most common causes of Glue job failures during initial setup.
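A sketch of how those permissions might be attached with boto3 is shown below. The AWS managed AWSGlueServiceRole policy covers CloudWatch logging, Glue connections, and the network interfaces needed for VPC access; the inline policy adds the job-specific secrets and buckets. The role name, account ID, and bucket names are placeholders, and the secret names are those used in this article's examples.

import json
import boto3

iam = boto3.client("iam")

ROLE_NAME = "glue-migration-role"  # placeholder role name

# The managed service policy covers logs, Glue connections, and VPC networking.
iam.attach_role_policy(
    RoleName=ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
)

# Sketch of an inline policy for the job-specific secrets and S3 objects.
inline_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["secretsmanager:GetSecretValue"],
            "Resource": [
                "arn:aws:secretsmanager:us-east-1:123456789012:secret:prod-mongodb-awsglue-extractjob-cred-*",
                "arn:aws:secretsmanager:us-east-1:123456789012:secret:MonitoringDB-RDS-GlueUser-Cred-*",
            ],
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": [
                "arn:aws:s3:::my-glue-scripts/*",
                "arn:aws:s3:::my-glue-dependencies/*",
            ],
        },
    ],
}

iam.put_role_policy(
    RoleName=ROLE_NAME,
    PolicyName="documentdb-to-rds-migration-access",
    PolicyDocument=json.dumps(inline_policy),
)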

Glue ETL Pipeline Overview

The Glue ETL script begins by reading job parameters and initializing the Spark and Glue contexts. Secrets are loaded securely from AWS Secrets Manager. A connection to DocumentDB is established and all collections are discovered dynamically.
Each collection is processed independently. Data is flattened, schemas are normalized, and MySQL-safe table definitions are generated. The target RDS database is recreated to ensure a clean load. Tables are created dynamically and data is written using JDBC.
Once all collections are processed the job commits and exits successfully.

import sys
import json
import re
import boto3

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F
from pyspark.sql.types import *

from pymongo import MongoClient
import pymysql

# =====================================================
# Read AWS Glue Job Parameters
# MONGO_DBNAME    : Source MongoDB / Amazon DocumentDB database name
# TARGET_RDS_DB   : Target Amazon RDS (MySQL) database to be recreated and loaded
# =====================================================

args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'MONGO_DBNAME', 'TARGET_RDS_DB']
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print(" Glue Job Started")

# =====================================================
# Securely load secrets from AWS Secrets Manager
# =====================================================
REGION = "us-east-1"
sm = boto3.client("secretsmanager", region_name=REGION)

mongo_secret = json.loads(
    sm.get_secret_value(
        SecretId="prod-mongodb-awsglue-extractjob-cred"
    )["SecretString"]
)

rds_secret = json.loads(
    sm.get_secret_value(
        SecretId="MonitoringDB-RDS-GlueUser-Cred"
    )["SecretString"]
)

# MongoDB values
m_user = mongo_secret["username"]
m_pass = mongo_secret["password"]
m_host = mongo_secret["host"]
m_port = mongo_secret["port"]
m_ssl  = str(mongo_secret.get("ssl", True)).lower()
m_db   = args["MONGO_DBNAME"]

# RDS values
rds_user = rds_secret["username"]
rds_pass = rds_secret["password"]
rds_host = rds_secret["host"]
rds_port = int(rds_secret.get("port", 3306))
rds_db   = args["TARGET_RDS_DB"]

print(" Secrets loaded successfully")

# =====================================================
# MongoDB Connection Initialization & Collection Discovery
# Connect to DocumentDB and fetch all collections dynamically
# =====================================================

mongo_uri = (
    f"mongodb://{m_user}:{m_pass}@{m_host}:{m_port}/"
    f"{m_db}?ssl={m_ssl}"
)

client = MongoClient(mongo_uri)
collections = client[m_db].list_collection_names()

print(" Collections discovered:", collections)

# =====================================================
# Data Preparation & Schema Handling Functions
# =====================================================
# - Sanitize names for MySQL
# - Flatten nested structures
# - Convert NullType columns
# - Map Spark types to MySQL-safe types
# =====================================================
def sanitize(name):
    return re.sub(r"[^0-9a-zA-Z_]", "_", name)

def recursive_flatten(df):
    complex_cols = [
        f.name for f in df.schema.fields
        if isinstance(f.dataType, (StructType, ArrayType, MapType))
    ]

    while complex_cols:
        col = complex_cols[0]
        dtype = df.schema[col].dataType

        if isinstance(dtype, StructType):
            for field in dtype.fields:
                df = df.withColumn(
                    f"{col}_{field.name}",
                    F.col(f"{col}.{field.name}")
                )
            df = df.drop(col)
        else:
            df = df.withColumn(col, F.to_json(F.col(col)))

        complex_cols = [
            f.name for f in df.schema.fields
            if isinstance(f.dataType, (StructType, ArrayType, MapType))
        ]

    return df

def cast_nulltype_to_string(df):
    for field in df.schema.fields:
        if isinstance(field.dataType, NullType):
            df = df.withColumn(field.name, F.col(field.name).cast("string"))
    return df

def spark_type_to_mysql(field):
    name = sanitize(field.name)
    dtype = field.dataType

    if isinstance(dtype, StringType):
        return f"`{name}` LONGTEXT"
    if isinstance(dtype, BooleanType):
        return f"`{name}` TINYINT(1)"
    if isinstance(dtype, TimestampType):
        return f"`{name}` DATETIME"
    if isinstance(dtype, DateType):
        return f"`{name}` DATE"
    if isinstance(dtype, (IntegerType, LongType)):
        return f"`{name}` BIGINT"

    return f"`{name}` LONGTEXT"

# =====================================================
# Drop Existing and Recreate Target RDS Database for Fresh Load
# =====================================================
def recreate_database():
    conn = pymysql.connect(
        host=rds_host,
        user=rds_user,
        password=rds_pass,
        port=rds_port,
        autocommit=True
    )
    cur = conn.cursor()
    cur.execute(f"DROP DATABASE IF EXISTS `{rds_db}`;")
    cur.execute(
        f"CREATE DATABASE `{rds_db}` "
        f"CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
    )
    cur.close()
    conn.close()
    print(f" RDS database `{rds_db}` recreated")

recreate_database()

# =====================================================
# Process Each MongoDB Collection
# =====================================================
for coll in collections:
    print(f"\n Processing collection: {coll}")

    # Read MongoDB collection into Spark
    dyn = glueContext.create_dynamic_frame.from_options(
        connection_type="mongodb",
        connection_options={
            "connection.uri": mongo_uri,
            "database": m_db,
            "collection": coll,
            "ssl": m_ssl
        }
    )

    df = dyn.toDF()

    # Flatten & clean schema
    df = recursive_flatten(df)
    df = cast_nulltype_to_string(df)

    # Dynamically create the table in RDS
    columns_sql = ", ".join(
        [spark_type_to_mysql(f) for f in df.schema.fields]
    )

    conn = pymysql.connect(
        host=rds_host,
        user=rds_user,
        password=rds_pass,
        port=rds_port,
        db=rds_db,
        autocommit=True
    )

    cur = conn.cursor()
    cur.execute(
        f"CREATE TABLE IF NOT EXISTS `{sanitize(coll)}` "
        f"({columns_sql}) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"
    )
    cur.close()
    conn.close()

    # Write data into RDS using JDBC
    dynf = DynamicFrame.fromDF(df, glueContext, coll)

    glueContext.write_dynamic_frame.from_options(
        frame=dynf,
        connection_type="mysql",
        connection_options={
            "url": f"jdbc:mysql://{rds_host}:{rds_port}/{rds_db}?useSSL=false",
            "user": rds_user,
            "password": rds_pass,
            "dbtable": sanitize(coll),
            "driver": "com.mysql.cj.jdbc.Driver"
        }
    )

    print(f" Loaded `{coll}` into RDS")

# =====================================================
# Commit Glue Job
# =====================================================
job.commit()
print(" Glue Job Completed Successfully")

How the AWS Glue Pipeline Works

Here is an overview of how the AWS Glue ETL pipeline works.

1. Connecting to DocumentDB and Discovering Collections

One of the strengths of this pipeline is that it does not assume fixed collections. The job connects to DocumentDB using MongoClient and retrieves all collection names dynamically.
This design makes the pipeline resilient to schema changes and newly added collections. Any new collection appearing in DocumentDB is automatically included in the next Glue run.
The MongoDB connection URI is built using secure credentials and SSL settings, ensuring encrypted communication. Discovered collections are logged to improve observability and troubleshooting.
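Depending on how TLS is enforced on the cluster, a DocumentDB connection may also need the Amazon-provided CA bundle. The snippet below is a sketch of an explicit PyMongo connection using that bundle; the endpoint, credentials, and bundle path are placeholders, and this variant is not part of the job above, which relies on the ssl parameter in the URI.

from pymongo import MongoClient

# Sketch: explicit TLS connection to DocumentDB using the Amazon CA bundle
# (global-bundle.pem). Endpoint and credentials are placeholders.
client = MongoClient(
    host="docdb-cluster.cluster-xxxx.us-east-1.docdb.amazonaws.com",
    port=27017,
    username="glue_reader",
    password="REPLACE_ME",
    tls=True,
    tlsCAFile="global-bundle.pem",
    retryWrites=False,  # Amazon DocumentDB does not support retryable writes
)
print(client["app_production"].list_collection_names())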

2. Handling Schema Challenges in NoSQL Data

DocumentDB collections often contain deeply nested structures. Arrays and maps are common. Some fields may be present in only a subset of documents.
Relational databases require fixed schemas, which creates a fundamental mismatch during migration. This pipeline addresses the challenge through a set of structured transformations.

3. Sanitizing Column and Table Names

MySQL does not allow certain characters in column or table names. Spaces and special characters can cause SQL failures.
A sanitization function replaces unsupported characters with underscores. This ensures that all generated table and column names are MySQL safe and prevents runtime errors during table creation.
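A few hypothetical inputs illustrate how the sanitize() helper defined in the script behaves:

# Illustrative calls to the sanitize() function from the script above.
# The input names are made up; the outputs follow from its regex.
sanitize("user-profile.data")    # -> "user_profile_data"
sanitize("order items (2024)")   # -> "order_items__2024_"
sanitize("metrics.cpu%usage")    # -> "metrics_cpu_usage"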

4. Handling NullType Columns

Spark may infer a NullType when all values in a column are null. MySQL cannot create columns without defined types.
The pipeline scans the schema and casts NullType columns to strings. This avoids schema creation failures and ensures no data is lost.

5. Mapping Spark Types to MySQL Types

Spark data types do not map directly to MySQL types. A mapping function converts Spark schema fields into appropriate MySQL column definitions.
Strings are stored as LONGTEXT. Booleans use TINYINT. Timestamps map to DATETIME. Integers map to BIGINT, and any type without an explicit mapping falls back to LONGTEXT. This strategy balances simplicity and flexibility while preventing truncation.
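If finer numeric control is needed, the mapping can be extended. The helper below is a hypothetical addition, not part of the job above, which stores unmapped numeric types as LONGTEXT; it reuses the sanitize() function from the script.

from pyspark.sql.types import DecimalType, DoubleType, FloatType

def numeric_mysql_type(field):
    # Hypothetical helper: return a native numeric MySQL column definition for
    # floating-point and decimal Spark fields, or None to keep the LONGTEXT
    # fallback used by spark_type_to_mysql() in the script above.
    dtype = field.dataType
    if isinstance(dtype, (FloatType, DoubleType)):
        return f"`{sanitize(field.name)}` DOUBLE"
    if isinstance(dtype, DecimalType):
        return f"`{sanitize(field.name)}` DECIMAL({dtype.precision},{dtype.scale})"
    return None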

6. Recreating the Target RDS Database

Before loading data the pipeline drops and recreates the target RDS database. This ensures a clean environment free of stale tables or outdated schemas.
The database is created with UTF8MB4 encoding, which supports full Unicode including emojis. This is important because DocumentDB often stores user-generated content.
This approach works well for batch migrations and scheduled refresh jobs. Incremental migration would require a different strategy.
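As a rough illustration of what such a strategy could look like, the sketch below reads only documents changed since a stored watermark and appends them to the existing table. It assumes each document carries an updated_at timestamp and that the watermark from the previous run is persisted somewhere, such as a control table; neither assumption holds for the full-reload pipeline above.

from datetime import datetime, timezone

# Hypothetical incremental variant, reusing client, spark, and the helper
# functions from the script above. `updated_at` and the stored watermark
# are assumptions for illustration only.
last_watermark = datetime(2024, 1, 1, tzinfo=timezone.utc)  # e.g. loaded from a control table

changed_docs = list(client[m_db][coll].find({"updated_at": {"$gt": last_watermark}}))
for doc in changed_docs:
    doc["_id"] = str(doc["_id"])  # ObjectId is not a Spark-friendly type

if changed_docs:
    incremental_df = spark.createDataFrame(changed_docs)
    incremental_df = cast_nulltype_to_string(recursive_flatten(incremental_df))

    # Append to the existing table instead of dropping and recreating it.
    (incremental_df.write
        .format("jdbc")
        .option("url", f"jdbc:mysql://{rds_host}:{rds_port}/{rds_db}?useSSL=false")
        .option("dbtable", sanitize(coll))
        .option("user", rds_user)
        .option("password", rds_pass)
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .mode("append")
        .save())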

7. Writing Data into Amazon RDS

Once tables are created, data is written using the Glue JDBC connector. Glue manages batching, retries and parallelism internally.
Each successful collection load is logged, making it easy to track progress and identify failures.

Final Thoughts

Migrating data from Amazon DocumentDB to Amazon RDS is not just a data transfer task. It is a transformation problem that requires careful handling of schemas, data types, and security at every stage. AWS Glue provides a powerful and scalable platform to address these challenges in a controlled and repeatable way. By combining Spark based transformations, dynamic schema handling, and native AWS integrations, this pipeline delivers a solution that is both reliable and easy to operate at scale.
This approach removes the need for manual schema management, minimizes operational risk, and simplifies ongoing maintenance. It allows teams to spend less time managing migration complexity and more time focusing on analytics, reporting, and extracting real value from their data.
