Airflow DAG Example for Paginated REST API
We’re creating a DAG (Directed Acyclic Graph) in Apache Airflow to automate the process of fetching data from a generic API and storing it in an S3 bucket. This DAG will consist of three main tasks:
- Fetch Authentication Token: Obtain a token required for API access.
- Fetch Paginated Data: Use the token to retrieve data from the API.
- Store Data in S3: Upload the retrieved data to an S3 bucket.
Understanding API Pagination
Pagination is a method used by APIs to split large datasets into smaller, more manageable chunks, or “pages.” This is crucial for performance and usability, as it prevents overwhelming both the server and the client with too much data at once.
Typical API Response Structure
When you make a request to a paginated API, the response might look something like this:
{
  "count": 1000,
  "offset": 0,
  "next_offset": 200,
  "results_id": "abc123",
  "results": [
    {"id": 1, "name": "Item 1"},
    {"id": 2, "name": "Item 2"},
    ...
    {"id": 200, "name": "Item 200"}
  ]
}
- count: Total number of items available across all pages.
- offset: The starting index of the current page.
- next_offset: The starting index for the next page.
- results_id: A unique identifier for the query, used to maintain state across requests.
- results: The actual data items for the current page.
Pagination Logic
The goal of pagination logic is to determine if there are more pages to fetch and, if so, construct the endpoint for the next page.
Steps to Implement Pagination Logic
Extract Response Content:
- Parse the JSON response to access the pagination details.
Check for More Pages:
- Determine if there is a next_offset. If it exists, there are more pages to fetch.
Construct Next Page Endpoint:
- Use the next_offset and results_id to build the URL for the next page request.
Return the Endpoint or None:
- If there are more pages, return the constructed endpoint.
- If not, return None to indicate that all pages have been fetched.
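Before wiring this logic into Airflow, it can help to see the same loop as a standalone script. The sketch below uses the requests library against a hypothetical https://api.example.com host with the /search endpoint from the sample response; authentication is omitted for brevity.

import urllib.parse
import requests

BASE_URL = "https://api.example.com"   # placeholder host, not a real API
endpoint = "/search?format=object&limit=100"
all_results = []

while endpoint:
    page = requests.post(BASE_URL + endpoint, json={"query": "example"}).json()
    # The sample response above is a single JSON object; the paginate() function
    # shown later indexes [0] because that API appears to wrap it in a one-element list.
    all_results.extend(page["results"])
    next_offset = page.get("next_offset")
    if next_offset:
        results_id = urllib.parse.quote(page["results_id"])
        endpoint = f"/search?format=object&limit=100&results_id={results_id}&offset={next_offset}"
    else:
        endpoint = None    # no next_offset: all pages fetched

print(f"Fetched {len(all_results)} items")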
Step 1: Setting Up Default Arguments
default_args = {
    'owner': 'saikat',
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}
- Purpose: Define default arguments for the DAG.
- Owner: Specifies who owns the DAG.
- Retries: Number of times to retry a failed task (set to 0 here, so failures are not retried).
- Retry Delay: Time to wait between retries.
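The snippets in the following steps assume imports like these at the top of the DAG file. The exact provider import paths can vary between Airflow and provider versions (older HTTP provider releases expose SimpleHttpOperator instead of HttpOperator), so treat this as a sketch for a recent Airflow 2.x setup with the HTTP and Amazon providers installed.

import json
import logging
import urllib.parse
from datetime import datetime, timedelta

from airflow.decorators import dag
from airflow.models import Variable
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.providers.http.operators.http import HttpOperator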
Step 2: Pagination Logic
def paginate(response) -> dict:
    # The API wraps the payload in a one-element list, so take the first element.
    content = response.json()[0]
    total: int = content['count']
    offset: int = content['offset']
    next_offset: int = content.get('next_offset')
    if next_offset:
        # More pages remain: build the endpoint for the next request.
        results_id: str = content.get('results_id')
        encoded_results_id = urllib.parse.quote(results_id)
        return dict(endpoint=f"/search?format=object&limit=100&results_id={encoded_results_id}&offset={next_offset}")
    # No next_offset means every page has been fetched.
    return None
- Purpose: Handle pagination for API responses.
- Response Handling: Extracts necessary information from the API response.
- Next Offset: Determines if there are more pages of data to fetch.
- Return: Provides the endpoint for the next page if available.
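A quick way to sanity-check paginate() locally is to call it with a stubbed response object. SimpleNamespace and the sample payloads below are purely illustrative.

from types import SimpleNamespace

# Fake response whose .json() returns the one-element list paginate() expects.
sample_page = [{"count": 1000, "offset": 0, "next_offset": 200,
                "results_id": "abc123", "results": []}]
print(paginate(SimpleNamespace(json=lambda: sample_page)))
# {'endpoint': '/search?format=object&limit=100&results_id=abc123&offset=200'}

last_page = [{"count": 1000, "offset": 800, "next_offset": None,
              "results_id": "abc123", "results": []}]
print(paginate(SimpleNamespace(json=lambda: last_page)))  # None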
Step 3: Defining the DAG
@dag(
    default_args=default_args,
    schedule='@daily',
    start_date=datetime(2025, 3, 1),
    catchup=False,
    description="DAG to fetch data from a generic API and store in S3",
    tags=['data_fetch', 's3_upload']
)
def discovery_dag():
    # Tasks will be defined here
    ...
- Decorator: @dag is used to define a DAG.
- Schedule: Runs daily.
- Start Date: Specifies when the DAG should start.
- Catchup: Setting catchup=False prevents Airflow from backfilling runs between the start date and today.
- Description and Tags: Provide metadata about the DAG.
Step 4: Task 1 - Fetch Authentication Token
logging.info("Extracting environment variables")
username = Variable.get("API_USERNAME")
password = Variable.get("API_PASSWORD")

fetch_token = HttpOperator(
    task_id='fetch_auth_token',
    http_conn_id='api_conn',
    endpoint='/token',
    method='POST',
    data={"username": username, "password": password, "grant_type": "password"},
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    extra_options={"verify": False},
    response_filter=lambda response: {'token': response.json()['access_token']}
)
- Environment Variables: Fetches API credentials stored in Airflow Variables.
- HttpOperator: Makes an HTTP POST request to obtain an authentication token.
- Response Filter: Extracts the access_token from the response and stores it in XCom for later use.
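The credentials (and the search query used in the next step) must exist as Airflow Variables before the DAG runs. A minimal way to seed them, assuming the placeholder values below, is via the Variable API; the Airflow UI or CLI work equally well.

from airflow.models import Variable

# Placeholder values; store real secrets in a secrets backend in production.
Variable.set("API_USERNAME", "my_user")
Variable.set("API_PASSWORD", "my_password")
Variable.set("API_SEARCH_QUERY", "example search terms")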
Step 5: Task 2 - Fetch Paginated Data
search_query = Variable.get("API_SEARCH_QUERY")

fetch_data = HttpOperator(
    task_id='fetch_data',
    http_conn_id='api_conn',
    endpoint='/search?format=object&limit=100',
    method='POST',
    data=json.dumps({"query": search_query}),
    headers={
        "Authorization": "Bearer {{ ti.xcom_pull(task_ids='fetch_auth_token')['token'] }}",
        "Content-Type": "application/json"
    },
    extra_options={"verify": False},
    response_filter=lambda responses: json.dumps([response.json() for response in responses]),
    pagination_function=paginate
)
- Search Query: Retrieves the query from Airflow Variables.
- HttpOperator: Makes a POST request to fetch data using the token from Task 1.
- Authorization: Uses the token fetched in Task 1.
- Pagination: Handles paginated responses using the paginate function.
- Response Filter: Collects and formats the data from all pages.
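Conceptually, when pagination_function is set, the HTTP operator keeps issuing requests until paginate() returns None, then hands the whole list of responses to response_filter. A simplified sketch of that loop (not the provider's actual source) looks like this:

def run_with_pagination(call, first_kwargs, paginate, response_filter):
    # call: any function that performs one HTTP request and returns a response
    responses = []
    kwargs = dict(first_kwargs)
    while True:
        response = call(**kwargs)              # one request per page
        responses.append(response)
        overrides = paginate(response)         # e.g. {'endpoint': ...}, or None
        if not overrides:
            break
        kwargs = {**first_kwargs, **overrides} # next page reuses the base call with overrides
    return response_filter(responses)          # all collected pages handed over at once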
Step 6: Task 3 - Store Data in S3
upload_to_s3 = S3CreateObjectOperator(
    task_id="upload_to_s3",
    s3_bucket="airflow-demo",
    s3_key="api_data.json",
    data="{{ ti.xcom_pull(task_ids='fetch_data') }}",
    replace=True,
    aws_conn_id="s3_conn"
)
- S3CreateObjectOperator: Uploads the fetched data to an S3 bucket.
- XCom: Retrieves the data from Task 2 using ti.xcom_pull.
- Replace: Overwrites the existing file if it exists.
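To spot-check the upload after a run, the object can be read back through the same connection, for example with S3Hook (shown here outside the DAG, assuming the s3_conn connection and the bucket above exist):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

hook = S3Hook(aws_conn_id="s3_conn")
body = hook.read_key(key="api_data.json", bucket_name="airflow-demo")
print(body[:200])  # first few hundred characters of the uploaded JSON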
Step 7: Task Dependencies
fetch_token >> fetch_data >> upload_to_s3
- Task Dependencies: Defines the order of task execution.
- Chaining: Ensures that fetch_data runs after fetch_token, and upload_to_s3 runs after fetch_data.
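The >> operator is shorthand; the same dependency can also be expressed with Airflow's chain() helper, which some teams prefer for longer pipelines.

from airflow.models.baseoperator import chain

chain(fetch_token, fetch_data, upload_to_s3)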
Step 8: Instantiate the DAG
discovery_dag()
- Instantiation: Creates an instance of the DAG, making it ready for execution.
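For a quick local run outside the scheduler, the DAG object returned by the decorated function can be executed directly with dag.test() (available in Airflow 2.5+); the airflow dags test CLI command does the same.

dag = discovery_dag()

if __name__ == "__main__":
    # Runs all three tasks in-process for a single logical date; handy for debugging.
    dag.test()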
Conclusion
This DAG automates the process of fetching data from an API and storing it in S3. By breaking down each task and understanding its role, we can see how Airflow orchestrates complex workflows efficiently. Adjust the connection IDs and variables as needed for your specific use case.