Ingesting Data to Amazon RDS via Airbyte with Infrastructure as Code | Sela.
Sela. | Cloud Better.

Ingesting Data to Amazon RDS via Airbyte with Infrastructure as Code

In the realm of data management and analytics, the efficiency of ingesting data plays a pivotal role in driving insights and making informed business decisions. With the rapid expansion of cloud services and the increasing complexity of data sources, it’s becoming more and more challenging for businesses to streamline their data pipelines.

Emily Armstrong, Cloud Engineer, Sela US

Fortunately, solutions like Airbyte have emerged to address these challenges head-on, offering a streamlined approach to data ingestion and transformation. Airbyte is an open-source data integration platform designed to facilitate the movement of data between sources and destinations. Whether it’s databases, APIs, file storage, SaaS platforms, etc., Airbyte provides connectors that enable seamless data extraction and transformation between data sources and data warehouses/databases.  

 

A recent addition to the Airbyte solution is the Airbyte Terraform provider, which enables developers to programmatically manage Airbyte resources through code. This allows development teams to integrate their Airbyte configuration within their existing infrastructure, simplifying collaboration and enabling streamlined deployments.  

 

Integrating Airbyte with RDS allows users to ingest data from various sources into their relational databases in AWS. Airbyte can efficiently extract, transform, and load (ETL) data into RDS, ensuring its availability for analysis and reporting.  

 

Background 

The team at Sela had the opportunity to architect and implement a solution for an ad tech company aiming to enhance its marketing platform. The prevailing challenge was to streamline the data ingestion process, ensuring both efficiency and reliability in funneling data into a centralized system where it could undergo transformation and integration into the customer’s existing platform. Previously, the customer’s reliance on manual processes for data configuration was not only error-prone, but also constrained their team’s ability to scale and innovate within their platform.  

 

To address this, our team leveraged the Airbyte Open-Source solution to aggregate data from a variety of sources, like the Facebook Marketing and Google Ads APIs, into an Amazon RDS database. We found that there were numerous strengths, and a few limitations, to the platform from a developer perspective, which I’ll share with you below. 

 

The process of ingesting data into Amazon RDS with Airbyte involves the following steps:  

Step 1: Configure the Underlying Infrastructure 

To deploy Airbyte Open Source to AWS, the documentation recommends provisioning an EC2 instance to host the platform. For data to be ingested into RDS, you must also configure an RDS instance in your AWS account. We’ll be using Terraform to deploy both.   

The Terraform Registry provides many preconfigured modules for deploying common infrastructure configurations for EC2 and RDS, or you can write your own custom modules.   

One way to configure Airbyte on the EC2 instance is to utilize the `user_data` argument in the `aws_instance` resource. This argument can point to a shell script that installs and configures both Docker and the Airbyte platform on the instance during the bootstrapping process. This script can be modified to include specific configuration details for the Airbyte platform.   

 

Here is an example of  the shell script that we used to install Docker, install Airbyte from a Docker Image, and set our own credentials for access to the Airbyte UI.  

 

Step 2: Configure the Airbyte Platform 

With the Airbyte Terraform provider, you can create a separate stack configuration for all the resources associated with deploying your Airbyte configuration to the EC2 instance. Data sources, destinations, and the connectors between them can be defined programmatically as resources, and users can create custom Terraform modules to further encapsulate infrastructure configurations and streamline provisioning processes.  

For example, let’s examine a sample configuration to ingest data from the Facebook Marketing API into Amazon RDS. Our use case will pull data from multiple Facebook Marketing customer accounts into one RDS MySQL Database.   

 

 

First, we’ll write the Terraform code to provision resources for the Facebook Marketing source. Let’s create a custom module to define the source resource, outputs, providers, and variables. The source configuration will look something like this: 

 

source.tf 

resource "airbyte_source_facebook_marketing" "facebook_marketing" { 

  name         = "${var.name} ${var.env}" 

  workspace_id = var.workspace_id 

  configuration = { 

    access_token                  = var.access_token 

    account_id                    = var.account_id 

    action_breakdowns_allow_empty = var.action_breakdowns_allow_empty 

    custom_insights = [ 

      for custom_insight in var.custom_insights  

      :  

      { 

        action_breakdowns = custom_insight.action_breakdowns 

        breakdowns = custom_insight.breakdowns 

        start_date = custom_insight.start_date 

        end_date = custom_insight.end_date 

        fields = custom_insight.fields 

        insights_lookback_window = custom_insight.insights_lookback_window 

        level = custom_insight.level 

        name = custom_insight.name 

        time_increment = custom_insight.time_increment 

      } 

    ] 

    start_date               = var.start_date 

    end_date                 = var.end_date 

    fetch_thumbnail_images   = var.fetch_thumbnail_images 

    include_deleted          = var.include_deleted 

    insights_lookback_window = var.insights_lookback_window 

    max_batch_size           = var.max_batch_size 

    page_size                = var.page_size 

    source_type              = "facebook-marketing" 

  } 

} 

 

 

 

When referring to this custom module in your Airbyte stack, you can use locals to define a path to a JSON file where you set the values of each argument for each of your Facebook Marketing accounts. Here’s a sample example.json file where you can replace the empty strings with those values and add an additional object into the map for each individual Facebook Marketing account.  individual Facebook Marketing account.  

 

When you refer to the module in your Airbyte infrastructure stack, you only need to call the module once rather than writing repetitive code to provision a resource for each Facebook Marketing account. This will save you time and headaches moving forward, as you’ll only need to make updates in one place rather than across your code. The `for_each` Meta-Argument in Terraform allows you to create an instance of the Facebook Marketing source for each item in the “facebook_marketing” set from your json file.  

 

  

sources.tf 

module "facebook_marketing_source" { 
    for_each = local.facebook_marketing_connectors
    source = "../modules/facebook-marketing-source"   
    env = local.data.tenant.env
    name = each.value.name
    workspace_id = local.data.tenant.airbyte.workspace_id
    access_token = each.value.access_token
    account_id = each.value.account_id
    start_date = each.value.start_date
    end_date = each.value.end_date
    custom_insights = each.value.custom_insights
    action_breakdowns_allow_empty = each.value.action_breakdowns_allow_empty
    fetch_thumbnail_images = each.value.fetch_thumbnail_images
    include_deleted = each.value.include_deleted
    insights_lookback_window = each.value.insights_lookback_window
    max_batch_size = each.value.max_batch_size
    page_size = each.value.page_size
 

    providers = {
        airbyte = airbyte
    } 

} 

 

You can use the same methodology to create a custom module if you need to send data from the source to multiple destinations, however our use case calls for just one RDS MySQL database, so we’ll keep it simple. The Terraform code to provision the destination resource will look something like this: 

 

destination.tf 

resource "airbyte_destination_mysql" "my_destination_mysql" { 

  configuration = { 

    database         = local.data.tenant.destination.db_name 

    destination_type = "mysql" 

    host             = local.data.tenant.destination.db_host 

    username         = local.data.tenant.destination.db_username 

    password         = local.data.tenant.destination.db_pw 

    port             = local.data.tenant.destination.db_port 

    tunnel_method = { 

      destination_mysql_ssh_tunnel_method_no_tunnel = { 

        tunnel_method = "NO_TUNNEL" 

      } 

    } 

  } 

  name         = "MySQL DB Destination ${local.data.tenant.env}" 

  workspace_id = local.data.tenant.airbyte.workspace_id 

} 

  

You can add the configuration details for RDS to the example.json file that you created previously and refer to those values as locals as well.  

 

Finally, to create a connection between the source and destination and ingest data into RDS from the Facebook Marketing API, we’ll define a connector resource for each source that ties it to the corresponding destination. Since we configured the source using a Terraform module, we don’t have to waste time writing code for a connector corresponding to each individual Facebook Marketing account. Instead, your configuration will look something like this:  

 

resource "airbyte_connection" "facebook_marketing" { 

  for_each       = module.facebook_marketing_source 

  name           = "${each.value.source_name} Facebook Marketing Connection ${local.data.tenant.name} ${local.data.tenant.env}" 

  source_id      = each.value.source_id 

  destination_id = airbyte_destination_mysql.my_destination_mysql.destination_id 

  configurations = { 

    streams = [ { 

      name = "ads_insights" 

      sync_mode = "incremental_append" 

    } ] 

  } 

  schedule = { 

    schedule_type = "manual" 

  } 

} 

  

In the connector resource block, you can set the stream configuration and schedule the frequency of data ingestion programmatically. This lets you define what data is being synced to your RDS database, how often the sync occurs, and the behavior of the sync. For example, `incremental_append` replicates only new or modified data in a sync for any stream, rather than fetching data that you have already replicated from a source.  

 

The supported streams and sync modes for each source in Airbyte are listed in its corresponding documentation (for example, Facebook Marketing).  

 

Limitations to IaC with Airbyte  

One of the downsides to using the Airbyte Terraform provider is how new it is, meaning that there are some features of the Airbyte platform that are not able to be managed via infrastructure as code.  

 

For example, only a limited number of sources, connectors, and destinations are supported within the Terraform provider, meaning that all others must be configured manually from within the Airbyte UI. Airbyte’s public roadmap has support for custom connectors via their Terraform provider slated for release in early 2024, so hopefully this will not be an issue for much longer.  

 

Another example of this limitation that directly impacts ingesting data into Amazon RDS with  Airbyte is the lack of support for custom data transformations via IaC.  

 

Let’s look back at our example use case: ingesting data to Amazon RDS from the Facebook Marketing API. By default, the data from your Airbyte sync will be pushed to your Amazon RDS database as a table, where each row is comprised of a timestamp (`_airbyte_emitted_at_`), a JSON blob containing the response object from the Facebook Marketing API, and a unique ID (`_airbyte_ab_id`). For any typical use case, you would need the JSON blob to be normalized so that each key/value pair is its own column with a corresponding value.  

 

To accomplish this, Airbyte internally uses dbt (Data Build Tool), a comprehensive toolkit for data transformation and modeling. dbt simplifies the process of transforming raw data into analytics-ready datasets using SQL-based transformations.  

 

If your destination supports basic normalization through Airbyte, you can configure those settings in your destination. However, if you want to perform more complex transformations on your data, or your destination does not support basic normalization, you can create a custom dbt app and add it to the normalization configuration of your connector on your deployed Airbyte instance.  

 

As previously stated, this configuration cannot be deployed via infrastructure as code, only via the Airbyte UI. However, the addition of a dbt app to your Airbyte configuration is an effective tool that should not be discounted, especially if you need to manipulate your data prior to ingesting it into a database/data warehouse.  

 

In addition to allowing for data type casting and standardization of datasets, dbt enables you to implement data cleansing and validation rules to identify and handle anomalies or errors in your datasets. You can filter out invalid records, handle missing values, or flag outliers for further investigation. You’re also able to easily compute aggregates across different dimensions of your data, and create calculated fields using expressions or functions, enabling the generation of custom metrics tailored to your specific analytical requirements.  

 

Another powerful feature of dbt is the ability to perform join operations to merge data from multiple sources based on common keys or relationships.  

 

In our prior example of syncing data from the Facebook Marketing API, we specifically pull from the `ads_insights` stream during the sync specified in our connector resource. The response object contains a dynamic number of nested objects related to each Ad Set within a campaign, and each object contains a value for either `post_engagement` or `unique_post_engagement`.  

 

Let’s say that you’d like to return `total_post_engagement` in your resulting MySQL table as the sum of all values of `post_engagement` and `unique_post_engagement` for a particular Ad Set. You have the added challenge that your database is built on MySQL 5.7, so handling JSON objects is a difficult task.  

 

With dbt, you can create multiple temporary tables to unnest the values of the `post_engagement` and `unique_post_engagement` objects and return the sum, which can be referenced in the final table via an inner join on the `airbyte_ab_id`. Below is a sample dbt model that references those temporary tables to return `po.total_post_engagement AS post_engagement`.  

 

{{ config(materialized='table') }} 

 

SELECT 

    ads._airbyte_ab_id AS _airbyte_ab_id, 

    CAST(JSON_UNQUOTE(JSON_EXTRACT(ads._airbyte_data, '$.date_stop')) AS DATE) AS date 

    JSON_UNQUOTE(JSON_EXTRACT(ads._airbyte_data, '$.account_name')) AS account_name, 

    JSON_UNQUOTE(JSON_EXTRACT(ads._airbyte_data, '$.campaign_name')) AS campaign_name, 

    JSON_UNQUOTE(JSON_EXTRACT(ads._airbyte_data, '$.adset_name')) AS adset_name, 

    JSON_UNQUOTE(JSON_EXTRACT(ads._airbyte_data, '$.ad_name')) AS ad_name, 

    JSON_UNQUOTE(JSON_EXTRACT(ads._airbyte_data, '$.ad_id')) AS ad_id, 

    pe.total_post_engagement AS post_engagement, 

 

-- joins the values of the post_engagement_sum and unique_post_engagement_sum from the temporary total_post_engagement table on the _airbyte_ab_id and returns the sum as the total_post_engagement value 

INNER JOIN ( 

    SELECT _airbyte_ab_id,  

        COALESCE(tpe.post_engagement_sum, 0) + COALESCE(tpe.unique_post_engagement_sum, 0) AS total_post_engagement 

    FROM {{ ref('total_post_engagement') }} as tpe 

) AS pe ON pe._airbyte_ab_id = ads._airbyte_ab_id 

 

 

This level of data manipulation is far beyond what could have been achieved with simple normalization. By performing these additional operations through transformations via a custom dbt app, you can shape raw data into actionable insights that drive decision-making and business value.  

 

Hopefully this feature will be included in the Airbyte Terraform provider in a future release, so that all configurations needed for the ETL pipeline are able to be defined as code.  

 

Outcomes  

The implementation of Airbyte with Amazon RDS under an IaC framework significantly improved our customer’s data management capabilities. It enabled the efficient aggregation of data from multiple sources into a centralized RDS database, which was instrumental in enhancing the analytical prowess of the customer’s marketing platform. While the solution offered numerous benefits, there were still areas for improvement, particularly concerning the limitations of the Airbyte Terraform provider and the need for manual interventions in certain configurations.  

 

Conclusion 

Ingesting data into Amazon RDS via Airbyte with Infrastructure as Code represents a powerful approach to modern data integration and management. Moreover, the integration of custom transformations with dbt amplifies the transformative power of Airbyte, enabling developers to fine-tune data using familiar SQL queries as a part of the ETL process.  

 

When the Airbyte Terraform provider is updated to include this feature, Airbyte as a platform will be an even more powerful tool to automate the configuration and deployment of data infrastructure and transformations end-to-end. For now, it’s still an excellent (and free) tool that requires only a bit of manual intervention to save on a lot of operational overhead.