
Airflow DAG Example for Paginated REST API

We’re creating a DAG (Directed Acyclic Graph) in Apache Airflow to automate the process of fetching data from a generic API and storing it in an S3 bucket. This DAG will consist of three main tasks:

  1. Fetch Authentication Token: Obtain a token required for API access.
  2. Fetch Paginated Data: Use the token to retrieve data from the API.
  3. Store Data in S3: Upload the retrieved data to an S3 bucket.
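
All of the snippets below are assumed to live in a single DAG file. A minimal set of imports covering them (assuming the apache-airflow-providers-http and apache-airflow-providers-amazon packages are installed) might look like this:

import json
import logging
import urllib.parse
from datetime import datetime, timedelta

from airflow.decorators import dag
from airflow.models import Variable
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.providers.http.operators.http import HttpOperator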

Understanding API Pagination

Pagination is a method used by APIs to split large datasets into smaller, more manageable chunks, or “pages.” This is crucial for performance and usability, as it prevents overwhelming both the server and the client with too much data at once.

Typical API Response Structure

When you make a request to a paginated API, the response might look something like this:

{
    "count": 1000,
    "offset": 0,
    "next_offset": 200,
    "results_id": "abc123",
    "results": [
        {"id": 1, "name": "Item 1"},
        {"id": 2, "name": "Item 2"},
        ...
        {"id": 200, "name": "Item 200"}
    ]
}
  • count: Total number of items available across all pages.
  • offset: The starting index of the current page.
  • next_offset: The starting index for the next page.
  • results_id: A unique identifier for the query, used to maintain state across requests.
  • results: The actual data items for the current page.
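
To make the bookkeeping concrete, here is the arithmetic behind the sample response above, using its hypothetical values:

# Hypothetical values from the sample response above
count, offset, page_size = 1000, 0, 200

# The server signals the next page explicitly via next_offset; equivalently,
# more pages remain as long as offset + page_size < count
has_more = offset + page_size < count  # 0 + 200 < 1000 -> True
# Successive pages would start at offsets 0, 200, 400, 600, and 800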

Pagination Logic

The goal of pagination logic is to determine if there are more pages to fetch and, if so, construct the endpoint for the next page.

Steps to Implement Pagination Logic

  1. Extract Response Content: Parse the JSON response to access the pagination details.
  2. Check for More Pages: Determine whether a next_offset is present. If it exists, there are more pages to fetch.
  3. Construct Next Page Endpoint: Use the next_offset and results_id to build the URL for the next page request.
  4. Return the Endpoint or None: If there are more pages, return the constructed endpoint; if not, return None to indicate that all pages have been fetched.

Step 1: Setting Up Default Arguments

default_args = {
    'owner': 'saikat',
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}
  • Purpose: Define default arguments for the DAG.
  • Owner: Specifies who owns the DAG.
  • Retries: Number of times to retry a task if it fails.
  • Retry Delay: Time to wait between retries.
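
The values above favor fast failure during development. For a production run you would typically allow retries; a possible variant (illustrative, not part of the original DAG):

default_args = {
    'owner': 'saikat',
    'retries': 3,                       # retry transient API or S3 failures
    'retry_delay': timedelta(minutes=1),
    'retry_exponential_backoff': True,  # wait 1m, 2m, 4m, ... between attempts
}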

Step 2: Pagination Logic

def paginate(response) -> dict | None:
    content = response.json()
    total: int = content['count']    # total items across all pages
    offset: int = content['offset']  # starting index of the current page
    next_offset: int | None = content.get('next_offset')

    if next_offset is not None:
        results_id: str = content['results_id']
        encoded_results_id = urllib.parse.quote(results_id)
        # Return the request parameters HttpOperator should use for the next page
        return dict(endpoint=f"/search?format=object&limit=100&results_id={encoded_results_id}&offset={next_offset}")
    return None  # no next_offset means all pages have been fetched
  • Purpose: Handle pagination for API responses.
  • Response Handling: Extracts necessary information from the API response.
  • Next Offset: Determines if there are more pages of data to fetch.
  • Return: Provides the endpoint for the next page if available.
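
One way to sanity-check paginate outside of Airflow is to feed it a stubbed response object; the stub below is purely illustrative:

from unittest.mock import Mock

# Stub a response whose .json() returns the sample payload from earlier
page = Mock()
page.json.return_value = {"count": 1000, "offset": 0, "next_offset": 200,
                          "results_id": "abc123", "results": []}
print(paginate(page))
# {'endpoint': '/search?format=object&limit=100&results_id=abc123&offset=200'}

last_page = Mock()
last_page.json.return_value = {"count": 1000, "offset": 800,
                               "results_id": "abc123", "results": []}
print(paginate(last_page))  # None: no next_offset, so pagination stops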

Step 3: Defining the DAG

@dag(
    default_args=default_args,
    schedule='@daily',
    start_date=datetime(2025, 3, 1),
    catchup=False,
    description="DAG to fetch data from a generic API and store in S3",
    tags=['data_fetch', 's3_upload']
)
def discovery_dag():
    ...  # placeholder; the tasks from Steps 4 to 7 go here
  • Decorator: @dag is used to define a DAG.
  • Schedule: Runs daily.
  • Start Date: Specifies when the DAG should start.
  • Catchup: With catchup=False, Airflow skips backfilling runs between the start date and the current date.
  • Description and Tags: Provide metadata about the DAG.

Step 4: Task 1 - Fetch Authentication Token

logging.info("Fetching API credentials from Airflow Variables")
username = Variable.get("API_USERNAME")
password = Variable.get("API_PASSWORD")

fetch_token = HttpOperator(
    task_id='fetch_auth_token',
    http_conn_id='api_conn',
    endpoint='/token',
    method='POST',
    data={"username": username, "password": password, "grant_type": "password"},
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    extra_options={"verify": False},  # disables TLS verification; only acceptable for trusted internal endpoints
    response_filter=lambda response: {'token': response.json()['access_token']}
)
  • Airflow Variables: Fetches the API credentials stored in Airflow Variables.
  • HttpOperator: Makes an HTTP POST request to obtain an authentication token.
  • Response Filter: Extracts the access_token from the response and stores it in XCom for later use.
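
This task assumes an Airflow HTTP connection named api_conn pointing at the API's base URL. One way to supply it, along with the credentials used above (all values here are placeholders), is:

# Define the HTTP connection as an environment variable (JSON form, Airflow 2.3+)
export AIRFLOW_CONN_API_CONN='{"conn_type": "http", "host": "https://api.example.com"}'

# Store the credentials as Airflow Variables via the CLI
airflow variables set API_USERNAME 'my-user'
airflow variables set API_PASSWORD 'my-secret'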

Step 5: Task 2 - Fetch Paginated Data

search_query = Variable.get("API_SEARCH_QUERY")

fetch_data = HttpOperator(
    task_id='fetch_data',
    http_conn_id='api_conn',
    endpoint='/search?format=object&limit=100',
    method='POST',
    data=json.dumps({"query": search_query}),
    headers={"Authorization": "Bearer {{ ti.xcom_pull(task_ids='fetch_auth_token')['token'] }}", "Content-Type": "application/json"},
    extra_options={"verify": False},  # see the note on verify=False in Step 4
    # With pagination_function set, response_filter receives the list of all page responses
    response_filter=lambda responses: json.dumps([response.json() for response in responses]),
    pagination_function=paginate
)
  • Search Query: Retrieves the query from Airflow Variables.
  • HttpOperator: Makes a POST request to fetch data using the token from Task 1.
  • Authorization: Uses the token fetched in Task 1.
  • Pagination: Handles paginated responses using the paginate function.
  • Response Filter: Collects and formats the data from all pages.
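
As written, the XCom value is a JSON array of full page payloads. If only the items themselves are needed, a variant of the response_filter (a sketch, not the original behavior) could flatten the results lists across pages:

# Variant: merge the 'results' items from every page into one flat list
response_filter=lambda responses: json.dumps(
    [item for response in responses for item in response.json()["results"]]
)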

Step 6: Task 3 - Store Data in S3

upload_to_s3 = S3CreateObjectOperator(
    task_id="upload_to_s3",
    s3_bucket="airflow-demo",
    s3_key="api_data.json",
    data="{{ ti.xcom_pull(task_ids='fetch_data') }}",
    replace=True,
    aws_conn_id="s3_conn"
)
  • S3CreateObjectOperator: Uploads the fetched data to an S3 bucket.
  • XCom: Retrieves the data from Task 2 using ti.xcom_pull.
  • Replace: Overwrites the existing file if it exists.
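
Since s3_key is a templated field, a common variation is to partition uploads by logical date so each daily run writes its own object; for example:

upload_to_s3 = S3CreateObjectOperator(
    task_id="upload_to_s3",
    s3_bucket="airflow-demo",
    s3_key="api_data/{{ ds }}.json",  # e.g. api_data/2025-03-01.json
    data="{{ ti.xcom_pull(task_ids='fetch_data') }}",
    replace=True,
    aws_conn_id="s3_conn"
)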

Step 7: Task Dependencies

fetch_token >> fetch_data >> upload_to_s3
  • Task Dependencies: Defines the order of task execution.
  • Chaining: Ensures that fetch_data runs after fetch_token, and upload_to_s3 runs after fetch_data.

Step 8: Instantiate the DAG

discovery_dag()
  • Instantiation: Creates an instance of the DAG, making it ready for execution.
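
Once the file is in the dags folder, a single run can be exercised locally without the scheduler using the Airflow CLI:

airflow dags test discovery_dag 2025-03-01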

Conclusion

This DAG automates the process of fetching data from an API and storing it in S3. By breaking down each task and understanding its role, we can see how Airflow orchestrates complex workflows efficiently. Adjust the connection IDs and variables as needed for your specific use case.