An API (Application Programming Interface) is a structured interface that allows software programs to communicate with each other — in data science, APIs are the primary mechanism for accessing real-time and third-party data from web services. Using Python’s requests library, you send HTTP requests to an API endpoint (a URL), receive a response (usually in JSON format), and parse that response into a pandas DataFrame for analysis. Nearly every major data source — financial markets, social media, weather services, government databases, and machine learning platforms — exposes its data through an API.
Introduction
Imagine you want to analyze cryptocurrency price movements, build a weather-aware demand forecasting model, enrich your customer data with company information, or track sentiment about your product on social media. In each case, the data you need isn’t sitting in a file you can download — it’s live, updating constantly, and accessible only through an API.
APIs have become the universal data delivery mechanism of the modern web. Every major platform — Twitter/X, GitHub, Spotify, OpenWeatherMap, Alpha Vantage, Google Maps, Stripe, Salesforce — exposes APIs that give programmatic access to their data. For data scientists, this represents an enormous resource: terabytes of real-world, continuously updated data, accessible through a few lines of Python.
What distinguishes professional API usage from amateur usage isn’t just knowing how to make a request — it’s understanding authentication, pagination, rate limiting, error handling, caching, and building pipelines that work reliably over time rather than breaking on the first network hiccup or API version change.
This article covers the complete arc: from making your first request through handling authentication schemes, navigating pagination, respecting rate limits, dealing with errors gracefully, and building production-quality API data pipelines that you can rely on.
Understanding APIs: The Concepts
Before writing a single line of Python, understanding the underlying concepts makes everything else click into place.
What an API Is and Isn’t
An API defines the interface through which two software systems communicate — what requests can be made, in what format, and what responses to expect. In the context of web APIs (which is what data scientists almost always mean), this communication happens over HTTP, the same protocol your browser uses to load web pages.
The critical difference between browsing a website and calling an API:
- When you browse a website, the server returns HTML designed for a human to read in a browser
- When you call an API, the server returns structured data (usually JSON) designed for a program to parse
The server is often the same — many web services have both a human-facing website and a machine-facing API backed by the same database.
REST: The Dominant API Style
Most modern web APIs are RESTful (Representational State Transfer) — they follow a set of architectural conventions:
- Resources are identified by URLs: https://api.example.com/customers/12345
- HTTP methods indicate the action:
  - GET: retrieve a resource (read-only, safe)
  - POST: create a new resource
  - PUT/PATCH: update an existing resource
  - DELETE: remove a resource
- Stateless: Each request contains all information needed; the server doesn’t remember previous requests
- Responses are typically JSON
For data scientists, the vast majority of API work is GET requests — retrieving data. POST requests appear when submitting data to ML inference endpoints.
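As a rough illustration of these conventions, the sketch below composes a resource URL and a query string using only the standard library. The `resource_url` helper is hypothetical, and the `api.example.com` host simply mirrors the example URL above.

```python
from urllib.parse import quote, urlencode


def resource_url(base_url, collection, resource_id=None, params=None):
    """Compose a REST-style URL: base/collection[/id][?query]."""
    parts = [base_url.rstrip("/"), quote(collection)]
    if resource_id is not None:
        parts.append(quote(str(resource_id)))
    url = "/".join(parts)
    if params:
        # Filters and options travel as query parameters
        url += "?" + urlencode(params)
    return url


# A single resource, identified by its URL
print(resource_url("https://api.example.com", "customers", 12345))
# A filtered view of a collection
print(resource_url("https://api.example.com", "customers",
                   params={"country": "US", "page": 2}))
```

In practice `requests` performs the query-string encoding itself through its `params=` argument, so a helper like this is mostly useful for seeing what ends up on the wire.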
Anatomy of an API Request and Response
Request:
  Method: GET
  URL: https://api.openweathermap.org/data/2.5/weather
  Params: ?q=Austin&units=metric&appid=YOUR_API_KEY
  Headers: Accept: application/json
Response:
  Status: 200 OK
  Headers: Content-Type: application/json
  Body: {
    "name": "Austin",
    "main": {"temp": 28.4, "humidity": 65},
    "weather": [{"description": "clear sky"}],
    "wind": {"speed": 3.2}
  }
Status codes indicate whether the request succeeded:
| Status Code | Meaning | What to Do |
|---|---|---|
| 200 OK | Success | Parse response |
| 201 Created | Resource created | Parse if response body returned |
| 400 Bad Request | Your request is malformed | Check parameters |
| 401 Unauthorized | Missing or invalid authentication | Check API key/token |
| 403 Forbidden | Authenticated but not permitted | Check permissions/plan |
| 404 Not Found | Resource doesn’t exist | Check URL and IDs |
| 429 Too Many Requests | Rate limit exceeded | Back off and retry |
| 500 Internal Server Error | Server-side problem | Retry later |
| 503 Service Unavailable | Server temporarily down | Retry later |
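The "What to Do" column can be condensed into a small dispatch helper. This is a minimal sketch: the `next_action` name and its return labels are made up for illustration, and treating every 5xx code as retryable is a simplifying assumption rather than a rule.

```python
def next_action(status_code: int) -> str:
    """Map an HTTP status code to a coarse handling strategy."""
    if 200 <= status_code < 300:
        return "parse"        # Success: read the body
    if status_code == 429 or 500 <= status_code < 600:
        return "retry"        # Rate limited or server-side trouble: back off, try again
    if 400 <= status_code < 500:
        return "fix_request"  # The request itself is wrong: retrying will not help
    return "inspect"          # 1xx/3xx and anything unexpected


for code in (200, 404, 429, 503):
    print(code, next_action(code))
```

The distinction that matters most in pipelines is between "retry" and "fix_request": retrying a 401 or 404 only burns through the rate limit.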
The requests Library: Python’s HTTP Client
The requests library is the de facto standard third-party package for HTTP in Python: clean, intuitive, and feature-complete. It is not part of the standard library, so install it first:
pip install requests
Your First API Call
Let’s start with Open-Meteo, a free weather API that requires no authentication:
import requests
import pandas as pd
# The simplest possible API call
response = requests.get(
    "https://api.open-meteo.com/v1/forecast",
    params={
        "latitude": 30.2672,  # Austin, TX
        "longitude": -97.7431,
        "hourly": "temperature_2m,precipitation",
        "temperature_unit": "celsius",
        "forecast_days": 7
    }
)
# Always check status before using the response
print(f"Status code: {response.status_code}")
print(f"Response type: {response.headers['Content-Type']}")
# Parse JSON response
data = response.json()
print(type(data))   # <class 'dict'>
print(data.keys())  # dict_keys(['latitude', 'longitude', 'hourly', ...])
Examining the Response Object
response = requests.get("https://api.open-meteo.com/v1/forecast", params={...})
# Status code
response.status_code # 200
# Response headers
response.headers # Dict of response headers
response.headers["Content-Type"] # "application/json; charset=utf-8"
# Response body as text
response.text # Raw JSON string
# Response body parsed as JSON (dict or list)
response.json() # Python dict/list
# Response body as bytes (for binary data like images)
response.content # bytes
# URL that was actually requested (after redirects)
response.url # "https://api.open-meteo.com/v1/forecast?latitude=30.27..."
# Whether the request succeeded (True for any status code below 400)
response.ok # True if status_code < 400
# Raise an exception immediately if status code indicates error
response.raise_for_status() # Raises requests.HTTPError if status >= 400
Parsing JSON into a DataFrame
The response JSON must be navigated to reach the tabular data you need:
import requests
import pandas as pd
response = requests.get(
    "https://api.open-meteo.com/v1/forecast",
    params={
        "latitude": 30.2672,
        "longitude": -97.7431,
        "hourly": "temperature_2m,precipitation,windspeed_10m",
        "forecast_days": 7
    }
)
response.raise_for_status()
data = response.json()
# The hourly data is nested under 'hourly'
hourly = data["hourly"]
# hourly = {"time": [...], "temperature_2m": [...], "precipitation": [...]}
# Convert to DataFrame directly
df = pd.DataFrame(hourly)
df["time"] = pd.to_datetime(df["time"])
df = df.set_index("time")
print(df.head())
print(f"Shape: {df.shape}")  # (168, 3): 7 days × 24 hours
# Basic analysis immediately available
print(f"Max temperature: {df['temperature_2m'].max():.1f}°C")
print(f"Total expected precipitation: {df['precipitation'].sum():.1f}mm")
Authentication: Proving Who You Are
Most APIs require authentication before they’ll return data. There are several authentication mechanisms you’ll encounter.
API Keys: The Most Common Method
An API key is a secret token that identifies you (or your application) to the API. You obtain it by registering for an account with the API provider.
In query parameters (simplest, but the key appears in URLs and logs):
import requests
import os
API_KEY = os.environ["OPENWEATHER_API_KEY"]  # Never hardcode!
response = requests.get(
    "https://api.openweathermap.org/data/2.5/weather",
    params={
        "q": "Austin,US",
        "units": "metric",
        "appid": API_KEY  # Key in query string
    }
)
In headers (more secure: the key stays out of the URL, so it is far less likely to end up in logs):
response = requests.get(
    "https://api.example.com/data",
    headers={
        "X-API-Key": API_KEY,  # Common header names vary by API
        # or: "Authorization": f"ApiKey {API_KEY}",
        # or: "api-key": API_KEY
    }
)
Always read the API documentation for the exact header name and format; it varies between providers.
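When a key has to travel in the query string, it helps to mask it before any URL is written to a log. The sketch below does this with the standard library only. The `redact_url` helper and the set of parameter names it treats as sensitive are assumptions for illustration; adjust the names to whatever the API in question actually uses.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

# Assumed parameter names; real APIs differ
SENSITIVE_PARAMS = frozenset({"appid", "apikey", "api_key", "key", "token"})


def redact_url(url: str, sensitive=SENSITIVE_PARAMS) -> str:
    """Return the URL with the values of sensitive query parameters masked."""
    parts = urlsplit(url)
    query = [
        (name, "REDACTED" if name.lower() in sensitive else value)
        for name, value in parse_qsl(parts.query, keep_blank_values=True)
    ]
    return urlunsplit(parts._replace(query=urlencode(query)))


print(redact_url(
    "https://api.openweathermap.org/data/2.5/weather?q=Austin&units=metric&appid=abc123"
))
```

Logging `redact_url(response.url)` instead of `response.url` keeps the request traceable without exposing the credential.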
Bearer Token Authentication (OAuth 2.0)
OAuth 2.0 is a more sophisticated authorization framework used by major platforms (Twitter/X, GitHub, Spotify, Google). In the client-credentials variant shown here, the flow is:
- Exchange your credentials for an access token
- Include that token in the Authorization header as Bearer <token>
import requests
import os
# Step 1: Get an access token using your client credentials
auth_response = requests.post(
    "https://api.example.com/oauth2/token",
    data={
        "grant_type": "client_credentials",
        "client_id": os.environ["API_CLIENT_ID"],
        "client_secret": os.environ["API_CLIENT_SECRET"]
    }
)
auth_response.raise_for_status()
access_token = auth_response.json()["access_token"]
token_expires_in = auth_response.json().get("expires_in", 3600)  # Seconds
# Step 2: Use the token in subsequent requests
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.get(
    "https://api.example.com/data/endpoint",
    headers=headers
)
data = response.json()
Token management in practice:
import time
from dataclasses import dataclass
from typing import Optional
import requests

@dataclass
class TokenManager:
    """Manages OAuth token lifecycle including automatic refresh."""
    client_id: str
    client_secret: str
    token_url: str
    _token: Optional[str] = None
    _expires_at: float = 0.0

    def get_token(self) -> str:
        """Return a valid token, refreshing if expired."""
        # Refresh 60 seconds early so a token does not expire mid-request
        if self._token is None or time.time() >= self._expires_at - 60:
            self._refresh_token()
        return self._token

    def _refresh_token(self) -> None:
        response = requests.post(
            self.token_url,
            data={
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret
            }
        )
        response.raise_for_status()
        data = response.json()
        self._token = data["access_token"]
        self._expires_at = time.time() + data.get("expires_in", 3600)

    def auth_headers(self) -> dict:
        return {"Authorization": f"Bearer {self.get_token()}"}
HTTP Basic Authentication
Some older APIs use HTTP Basic Auth — a username and password encoded in the request header:
import requests
from requests.auth import HTTPBasicAuth
import os
# Method 1: requests.auth.HTTPBasicAuth object
response = requests.get(
    "https://api.example.com/data",
    auth=HTTPBasicAuth(os.environ["API_USER"], os.environ["API_PASSWORD"])
)
# Method 2: Tuple shorthand (equivalent)
response = requests.get(
    "https://api.example.com/data",
    auth=(os.environ["API_USER"], os.environ["API_PASSWORD"])
)
Using a Session Object
For multiple requests to the same API, use a requests.Session to reuse the connection and share authentication headers across all calls:
import requests
import os
# Create a session with shared authentication
session = requests.Session()
session.headers.update({
    "Authorization": f"Bearer {os.environ['API_TOKEN']}",
    "Accept": "application/json",
    "User-Agent": "MyDataPipeline/1.0"
})
# All requests through the session share headers and reuse the connection
response1 = session.get("https://api.example.com/customers")
response2 = session.get("https://api.example.com/orders")
response3 = session.get("https://api.example.com/products")
# Close the session when done
session.close()
# Better: use as context manager
with requests.Session() as session:
    session.headers.update({...})
    response = session.get("https://api.example.com/data")
# Session automatically closed on exit
Handling Pagination: Getting All the Data
APIs rarely return all data in a single response — they paginate results to protect server resources. Knowing how to navigate pagination is essential for extracting complete datasets.
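Whatever the pagination style, the loop has the same shape: request a page, collect its records, and ask whether there is more. The sketch below captures that shape as a generator. Everything in it is hypothetical: `fetch_page` stands in for a real HTTP call, and the `"records"` and `"next"` keys are placeholder names for whatever the API actually returns.

```python
from typing import Callable, Iterator, Optional


def iter_records(
    fetch_page: Callable[[Optional[str]], dict],
    max_pages: int = 1000,
) -> Iterator[dict]:
    """Yield records page by page until the API reports nothing further."""
    cursor = None
    for _ in range(max_pages):
        page = fetch_page(cursor)
        yield from page.get("records", [])
        cursor = page.get("next")
        if not cursor:
            return
    # Guard against an API that never stops handing out cursors
    raise RuntimeError(f"Stopped after {max_pages} pages without reaching the end")


# Stand-in for a real API: two pages keyed by cursor
FAKE_PAGES = {
    None: {"records": [{"id": 1}, {"id": 2}], "next": "page-2"},
    "page-2": {"records": [{"id": 3}], "next": None},
}

records = list(iter_records(lambda cursor: FAKE_PAGES[cursor]))
print(records)  # [{'id': 1}, {'id': 2}, {'id': 3}]
```

Separating the loop from the HTTP call makes the pagination logic testable without network access, and the `max_pages` cap keeps a misbehaving endpoint from producing an endless loop.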
Style 1: Page Number Pagination
The simplest form: requests include a page parameter, responses include total page count or total record count.
import requests
import pandas as pd
import os

def fetch_all_customers(api_key: str, page_size: int = 100) -> pd.DataFrame:
    """
    Fetch all customers from a paginated API.
    Parameters
    ----------
    api_key : str
        API authentication key.
    page_size : int
        Records to fetch per request.
    Returns
    -------
    pd.DataFrame
        All customers combined into a single DataFrame.
    """
    all_records = []
    page = 1
    total_pages = None
    session = requests.Session()
    session.headers.update({"X-API-Key": api_key})
    while True:
        response = session.get(
            "https://api.example.com/customers",
            params={"page": page, "per_page": page_size}
        )
        response.raise_for_status()
        data = response.json()
        # Parse this page's records
        records = data["customers"]
        all_records.extend(records)
        # Determine if there are more pages
        total_pages = data["meta"]["total_pages"]
        print(f"Fetched page {page}/{total_pages} ({len(records)} records)")
        if page >= total_pages:
            break
        page += 1
    print(f"Total records fetched: {len(all_records):,}")
    return pd.DataFrame(all_records)

# Usage
df = fetch_all_customers(api_key=os.environ["API_KEY"])
Style 2: Cursor / Token-Based Pagination
Many APIs return a cursor or next_page_token in each response that must be passed to get the next page. This is more efficient for large datasets because it doesn’t require counting total records.
import requests

def fetch_all_transactions(session: requests.Session, start_date: str) -> list:
    """Fetch all transactions using cursor-based pagination."""
    all_transactions = []
    params = {
        "start_date": start_date,
        "limit": 200
    }
    page_num = 1
    while True:
        response = session.get(
            "https://api.example.com/transactions",
            params=params
        )
        response.raise_for_status()
        data = response.json()
        transactions = data.get("data", [])
        all_transactions.extend(transactions)
        print(f"Page {page_num}: fetched {len(transactions)} transactions "
              f"(total so far: {len(all_transactions):,})")
        # Check for next page cursor
        next_cursor = data.get("pagination", {}).get("next_cursor")
        if not next_cursor:
            break  # No more pages
        # Pass cursor to get next page
        params["cursor"] = next_cursor
        params.pop("start_date", None)  # Some APIs don't want date with cursor
        page_num += 1
    return all_transactions
Style 3: Link Header Pagination
Some APIs (including GitHub) encode next page URLs in the HTTP Link response header:
import requests

def fetch_github_repos(username: str, token: str) -> list:
    """Fetch all public repos for a GitHub user."""
    all_repos = []
    url = f"https://api.github.com/users/{username}/repos"
    params = {"per_page": 100}  # Only needed on the first request
    session = requests.Session()
    session.headers.update({
        "Authorization": f"token {token}",
        "Accept": "application/vnd.github.v3+json"
    })
    while url:
        response = session.get(url, params=params)
        response.raise_for_status()
        all_repos.extend(response.json())
        # GitHub encodes the next URL in the Link header
        # Link: <https://api.github.com/user/repos?page=2>; rel="next"
        link_header = response.headers.get("Link", "")
        next_url = None
        for part in link_header.split(","):
            if 'rel="next"' in part:
                next_url = part.split(";")[0].strip().strip("<>")
                break
        url = next_url
        params = None  # The next URL already carries its own query string
    return all_repos
Style 4: Offset Pagination
Similar to page-based, but uses offset (number of records to skip) instead of page number:
def fetch_with_offset(session, endpoint: str, limit: int = 100) -> list:
    """Fetch all records using offset pagination."""
    all_records = []
    offset = 0
    while True:
        response = session.get(
            endpoint,
            params={"limit": limit, "offset": offset}
        )
        response.raise_for_status()
        data = response.json()
        records = data.get("results", [])
        if not records:  # Empty page means we're done
            break
        all_records.extend(records)
        print(f"Fetched offset {offset}: {len(records)} records")
        if len(records) < limit:
            break  # Partial page = last page
        offset += limit
    return all_records
Rate Limiting: Respecting API Limits
Rate limits restrict how many requests you can make in a time period — typically expressed as requests per minute, per hour, or per day. Exceeding them results in 429 Too Many Requests responses, temporary bans, or permanent API key revocation.
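A 429 response often carries a Retry-After header, and HTTP allows it in two forms: a number of seconds or an HTTP date. The sketch below normalizes both to a wait in seconds using only the standard library. The `retry_after_seconds` name is made up for illustration, and returning `None` for an unparseable value is one possible policy among several.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(value: Optional[str],
                        now: Optional[datetime] = None) -> Optional[float]:
    """Convert a Retry-After header value into seconds to wait, or None."""
    if not value:
        return None
    value = value.strip()
    # Form 1: delay in seconds, e.g. "120"
    try:
        return max(0.0, float(value))
    except ValueError:
        pass
    # Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None  # Not a recognizable date either
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


print(retry_after_seconds("120"))  # 120.0
fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(retry_after_seconds("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now))  # 60.0
```

A caller can fall back to its own backoff schedule whenever the function returns `None`.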
Reading Rate Limit Headers
Most APIs communicate your current rate limit status in response headers:
response = requests.get("https://api.github.com/rate_limit",
                        headers={"Authorization": f"token {token}"})
# Common rate limit header names
headers = response.headers
print(headers.get("X-RateLimit-Limit"))      # Total allowed requests
print(headers.get("X-RateLimit-Remaining"))  # Requests left in current window
print(headers.get("X-RateLimit-Reset"))      # Unix timestamp when window resets
print(headers.get("Retry-After"))            # Seconds to wait after 429 error
Implementing Rate-Limited Requests
import requests
import time
from typing import Optional

def rate_limited_get(
    session: requests.Session,
    url: str,
    params: Optional[dict] = None,
    requests_per_second: float = 1.0,
    _last_call: list = [0.0]  # Mutable default, deliberately shared across calls
) -> requests.Response:
    """
    Make a GET request respecting a per-second rate limit.
    Parameters
    ----------
    session : requests.Session
        Requests session to use.
    url : str
        Endpoint URL.
    params : dict, optional
        Query parameters.
    requests_per_second : float
        Maximum request rate. Default: 1.0 (one per second).
    Returns
    -------
    requests.Response
        The API response.
    """
    # Calculate minimum time between requests
    min_interval = 1.0 / requests_per_second
    # Wait if we're making requests too quickly
    elapsed = time.time() - _last_call[0]
    if elapsed < min_interval:
        time.sleep(min_interval - elapsed)
    response = session.get(url, params=params)
    _last_call[0] = time.time()
    return response
Retry with Exponential Backoff
When you hit a rate limit or server error, wait and retry — with increasing wait time (exponential backoff) so you don’t hammer a struggling server:
import random
import requests
import time
import logging
logger = logging.getLogger(__name__)

def get_with_backoff(
    url: str,
    session: requests.Session,
    params: dict = None,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
) -> requests.Response:
    """
    Make a GET request with exponential backoff retry logic.
    Automatically retries on rate limit (429) and server errors (5xx).
    Each retry waits exponentially longer: 1s, 2s, 4s, 8s, 16s...
    Parameters
    ----------
    url : str
        Request URL.
    session : requests.Session
        Session to use for the request.
    params : dict, optional
        Query parameters.
    max_retries : int
        Maximum retry attempts before raising. Default 5.
    base_delay : float
        Initial delay in seconds. Default 1.0.
    max_delay : float
        Maximum delay in seconds. Default 60.0.
    Returns
    -------
    requests.Response
        Successful response.
    Raises
    ------
    requests.HTTPError
        If max retries exhausted or a non-retryable error occurs.
    """
    retryable_status_codes = {429, 500, 502, 503, 504}
    for attempt in range(max_retries + 1):
        response = session.get(url, params=params)
        if response.ok:
            return response
        if response.status_code not in retryable_status_codes:
            # Client error (400, 401, 403, 404): retrying will not help
            response.raise_for_status()
        if attempt == max_retries:
            logger.error(f"Max retries ({max_retries}) exceeded for {url}")
            response.raise_for_status()
        # Prefer the server's Retry-After hint when it is a plain number of seconds
        retry_after = response.headers.get("Retry-After")
        try:
            delay = float(retry_after) if retry_after else None
        except ValueError:
            delay = None  # Header held an HTTP date; fall back to backoff
        if delay is None:
            # Exponential backoff with jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            delay += random.uniform(0, delay * 0.1)  # Up to +10% jitter
        logger.warning(
            f"Status {response.status_code} from {url}. "
            f"Retry {attempt + 1}/{max_retries} in {delay:.1f}s..."
        )
        time.sleep(delay)
    # Should never reach here, but satisfies type checkers
    response.raise_for_status()
Using requests-ratelimiter for Production
For production pipelines, the requests-ratelimiter library handles rate limiting automatically:
pip install requests-ratelimiter
from requests_ratelimiter import LimiterSession
# Automatically limits requests to 5 per second
session = LimiterSession(per_second=5)
# Use exactly like a normal session
response = session.get("https://api.example.com/data")
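To see what a limiter of this kind has to keep track of, here is a minimal token-bucket sketch. It illustrates one common rate-limiting technique and is not a description of how requests-ratelimiter works internally. The `TokenBucket` class and its injectable `clock` argument are assumptions made for this example.

```python
import time


class TokenBucket:
    """Allow bursts up to `capacity`, refilled at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity
        self._updated = clock()

    def _refill(self) -> None:
        now = self._clock()
        earned = (now - self._updated) * self.rate
        self._tokens = min(self.capacity, self._tokens + earned)
        self._updated = now

    def try_acquire(self) -> bool:
        """Take one token if available; return whether a request may proceed."""
        self._refill()
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False

    def wait_time(self) -> float:
        """Seconds until the next token is available (0.0 if one is ready)."""
        self._refill()
        if self._tokens >= 1:
            return 0.0
        return (1 - self._tokens) / self.rate


# Drive the bucket with a fake clock so the behaviour is deterministic
fake_now = [0.0]
bucket = TokenBucket(rate=5, capacity=5, clock=lambda: fake_now[0])
print([bucket.try_acquire() for _ in range(6)])  # five allowed, sixth refused
fake_now[0] += 0.25                              # a quarter second passes
print(bucket.try_acquire())                      # True: more than one token refilled
```

A real client would sleep for `wait_time()` before sending instead of dropping the request. Passing the clock in as a parameter is a design choice that keeps the limiter testable without actual waiting.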
Real-World Examples: APIs Data Scientists Use
Example 1: Financial Data with Alpha Vantage
import requests
import pandas as pd
import os

def fetch_stock_daily(symbol: str, output_size: str = "compact") -> pd.DataFrame:
    """
    Fetch daily OHLCV data for a stock from Alpha Vantage.
    Parameters
    ----------
    symbol : str
        Stock ticker symbol (e.g., 'AAPL', 'MSFT').
    output_size : str
        'compact' (last 100 days) or 'full' (20+ years). Default 'compact'.
    Returns
    -------
    pd.DataFrame
        Daily OHLCV data with datetime index, sorted oldest first.
    """
    response = requests.get(
        "https://www.alphavantage.co/query",
        params={
            "function": "TIME_SERIES_DAILY",
            "symbol": symbol,
            "outputsize": output_size,
            "apikey": os.environ["ALPHAVANTAGE_API_KEY"]
        }
    )
    response.raise_for_status()
    data = response.json()
    # Alpha Vantage reports some problems inside a 200 response body
    if "Error Message" in data:
        raise ValueError(f"API error: {data['Error Message']}")
    if "Note" in data:
        raise RuntimeError(f"Rate limit hit: {data['Note']}")
    # Parse the nested JSON structure
    time_series = data["Time Series (Daily)"]
    df = pd.DataFrame.from_dict(time_series, orient="index")
    df.index = pd.to_datetime(df.index)
    df.index.name = "date"
    # Rename columns and convert types
    df.columns = ["open", "high", "low", "close", "volume"]
    df = df.astype({"open": float, "high": float, "low": float,
                    "close": float, "volume": int})
    df = df.sort_index()  # Oldest first
    return df

# Usage
aapl = fetch_stock_daily("AAPL")
print(aapl.tail())
# iloc[-31] is the close 30 trading days before the latest one
print(f"\n30-trading-day return: {(aapl['close'].iloc[-1] / aapl['close'].iloc[-31] - 1):.2%}")
Example 2: Weather Data for Feature Engineering
import requests
import pandas as pd
from datetime import datetime, timedelta

def fetch_historical_weather(
    latitude: float,
    longitude: float,
    start_date: str,
    end_date: str
) -> pd.DataFrame:
    """
    Fetch historical hourly weather data from Open-Meteo (free, no auth).
    Parameters
    ----------
    latitude, longitude : float
        Location coordinates.
    start_date, end_date : str
        Date range in 'YYYY-MM-DD' format.
    Returns
    -------
    pd.DataFrame
        Hourly weather with datetime index.
    """
    response = requests.get(
        "https://archive-api.open-meteo.com/v1/archive",
        params={
            "latitude": latitude,
            "longitude": longitude,
            "start_date": start_date,
            "end_date": end_date,
            "hourly": "temperature_2m,precipitation,windspeed_10m,cloudcover",
            "timezone": "auto"
        }
    )
    response.raise_for_status()
    data = response.json()
    df = pd.DataFrame(data["hourly"])
    df["time"] = pd.to_datetime(df["time"])
    df = df.set_index("time")
    return df

# Fetch Austin weather for last 90 days
end_date = datetime.today().strftime("%Y-%m-%d")
start_date = (datetime.today() - timedelta(days=90)).strftime("%Y-%m-%d")
weather_df = fetch_historical_weather(
    latitude=30.2672,
    longitude=-97.7431,
    start_date=start_date,
    end_date=end_date
)
# Daily aggregates, useful as ML features
daily_weather = weather_df.resample("D").agg({
    "temperature_2m": ["mean", "max", "min"],
    "precipitation": "sum",
    "windspeed_10m": "mean",
    "cloudcover": "mean"
})
daily_weather.columns = ["_".join(col) for col in daily_weather.columns]
print(daily_weather.tail())
Example 3: GitHub API for Portfolio Analysis
import requests
import pandas as pd
import os

def analyze_github_activity(username: str) -> dict:
    """
    Analyze a GitHub user's public repository activity.
    Returns a summary of repositories, languages, stars, and activity.
    """
    token = os.environ.get("GITHUB_TOKEN")  # Optional but increases rate limit
    session = requests.Session()
    session.headers.update({
        "Accept": "application/vnd.github.v3+json",
        **({"Authorization": f"token {token}"} if token else {})
    })
    # Fetch all repos (handles pagination)
    all_repos = []
    url = f"https://api.github.com/users/{username}/repos"
    params = {"per_page": 100, "type": "owner"}  # Only needed on the first request
    while url:
        response = session.get(url, params=params)
        response.raise_for_status()
        all_repos.extend(response.json())
        # Follow Link header to next page
        links = response.headers.get("Link", "")
        url = None
        for part in links.split(","):
            if 'rel="next"' in part:
                url = part.split(";")[0].strip().strip("<>")
        params = None  # The next URL already carries its own query string
    df = pd.DataFrame(all_repos)
    # Extract useful fields
    summary = pd.DataFrame({
        "name": df["name"],
        "language": df["language"],
        "stars": df["stargazers_count"],
        "forks": df["forks_count"],
        "open_issues": df["open_issues_count"],
        "created_at": pd.to_datetime(df["created_at"]),
        "last_pushed": pd.to_datetime(df["pushed_at"]),
        "is_fork": df["fork"]
    })
    # Filter to original repos only
    original = summary[~summary["is_fork"]]
    return {
        "total_repos": len(original),
        "total_stars": int(original["stars"].sum()),
        "top_languages": original["language"].value_counts().head(5).to_dict(),
        "most_starred": original.nlargest(3, "stars")[["name", "stars", "language"]].to_dict("records"),
        "recently_active": original.nlargest(5, "last_pushed")[["name", "last_pushed"]].to_dict("records")
    }

result = analyze_github_activity("torvalds")
print(f"Total public repos: {result['total_repos']}")
print(f"Total stars: {result['total_stars']:,}")
print(f"Top languages: {result['top_languages']}")
POST Requests: Sending Data to APIs
While most data collection uses GET, POST requests are common for ML inference APIs and data submission endpoints.
Calling an ML Inference API
import requests
import pandas as pd
import os

def score_batch_via_api(
    features_df: pd.DataFrame,
    batch_size: int = 100
) -> pd.DataFrame:
    """
    Send feature data to a model inference API and collect predictions.
    Parameters
    ----------
    features_df : pd.DataFrame
        Features to score. Must match model's expected schema.
    batch_size : int
        Number of records per API call. Default 100.
    Returns
    -------
    pd.DataFrame
        Original features with appended prediction columns.
    """
    session = requests.Session()
    session.headers.update({
        "Authorization": f"Bearer {os.environ['ML_API_TOKEN']}",
        "Content-Type": "application/json"
    })
    all_predictions = []
    for i in range(0, len(features_df), batch_size):
        batch = features_df.iloc[i:i + batch_size]
        # Convert batch to JSON-serializable format
        payload = {
            "instances": batch.to_dict(orient="records")
        }
        response = session.post(
            "https://ml-api.company.com/v1/predict",
            json=payload  # 'json=' auto-sets Content-Type and serializes
        )
        response.raise_for_status()
        result = response.json()
        predictions = result["predictions"]
        all_predictions.extend(predictions)
        print(f"Scored batch {i // batch_size + 1}: "
              f"{len(batch)} records (total: {len(all_predictions):,})")
    # Attach predictions back to original DataFrame
    features_df = features_df.copy()
    features_df["churn_probability"] = [p["churn_probability"] for p in all_predictions]
    features_df["predicted_label"] = [p["label"] for p in all_predictions]
    return features_df
Sending Data with Different Content Types
# JSON payload (most common for REST APIs)
response = requests.post(
    "https://api.example.com/events",
    json={"event_type": "page_view", "user_id": "USR_001", "timestamp": "2024-09-15T14:23:00Z"}
)
# Form-encoded data (older APIs, OAuth token endpoints)
response = requests.post(
    "https://api.example.com/oauth/token",
    data={"grant_type": "client_credentials", "client_id": "...", "client_secret": "..."}
)
# File upload (multipart form data)
with open("report.pdf", "rb") as f:
    response = requests.post(
        "https://api.example.com/documents",
        files={"file": ("report.pdf", f, "application/pdf")},
        data={"document_type": "report", "year": "2024"}
    )
Caching: Avoid Redundant API Calls
For data that doesn’t change frequently — daily weather, monthly economic indicators, static reference data — caching API responses avoids unnecessary calls, reduces costs, and speeds up development.
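The core idea fits in a few lines: remember each result together with the time it was fetched, and reuse it until it is older than a chosen time-to-live. The in-memory sketch below shows that logic in isolation. The `TTLCache` class, its `get_or_fetch` method, and the injectable `clock` are all hypothetical names introduced for this illustration.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Keep values in memory and refetch them once they are older than the TTL."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self._clock = clock
        self._store: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_fetch(self, key: Hashable, fetch: Callable[[], Any]) -> Any:
        now = self._clock()
        entry = self._store.get(key)
        if entry is not None and now - entry[0] < self.ttl_seconds:
            return entry[1]              # Fresh enough: skip the API call
        value = fetch()                  # Missing or stale: call the API
        self._store[key] = (now, value)
        return value


# Count how often the "API" is really called, using a fake clock
calls = []
fake_now = [0.0]

def fake_api_call():
    calls.append(fake_now[0])
    return {"temp": 28.4}

cache = TTLCache(ttl_seconds=3600, clock=lambda: fake_now[0])
cache.get_or_fetch("austin", fake_api_call)   # miss: fetches
cache.get_or_fetch("austin", fake_api_call)   # hit: served from memory
fake_now[0] = 4000.0                          # more than an hour later
cache.get_or_fetch("austin", fake_api_call)   # stale: fetches again
print(len(calls))  # 2
```

An in-memory cache disappears when the process exits, which is why persistent variants store the same (timestamp, value) pair on disk or in a database.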
Simple File-Based Cache
import json
import os
import hashlib
import time
from pathlib import Path
from typing import Optional
import requests

CACHE_DIR = Path(".api_cache")
CACHE_DIR.mkdir(exist_ok=True)

def cached_get(
    session: requests.Session,
    url: str,
    params: Optional[dict] = None,
    ttl_seconds: int = 3600,  # Cache valid for 1 hour by default
) -> dict:
    """
    Make a GET request, returning cached result if available and fresh.
    Parameters
    ----------
    session : requests.Session
        Requests session.
    url : str
        Request URL.
    params : dict, optional
        Query parameters.
    ttl_seconds : int
        Cache time-to-live in seconds. Default 3600 (1 hour).
    Returns
    -------
    dict
        Parsed JSON response (from cache or fresh API call).
    """
    # Create a unique cache key from URL and params
    # (sort_keys makes the key independent of parameter order)
    cache_key = hashlib.md5(
        f"{url}?{json.dumps(params, sort_keys=True)}".encode()
    ).hexdigest()
    cache_file = CACHE_DIR / f"{cache_key}.json"
    # Return cached result if fresh
    if cache_file.exists():
        age = time.time() - cache_file.stat().st_mtime
        if age < ttl_seconds:
            with open(cache_file) as f:
                print(f"Cache hit (age: {age:.0f}s)")
                return json.load(f)
    # Fetch fresh data
    response = session.get(url, params=params)
    response.raise_for_status()
    data = response.json()
    # Save to cache
    with open(cache_file, "w") as f:
        json.dump(data, f)
    print("Cache miss: fetched fresh data")
    return data

# Usage: daily weather data cached for 6 hours
session = requests.Session()
data = cached_get(
    session,
    "https://api.openweathermap.org/data/2.5/weather",
    params={"q": "Austin", "appid": os.environ["OWM_KEY"]},
    ttl_seconds=21600  # 6 hours
)
requests-cache for Automatic Caching
The requests-cache library transparently caches all requests — no code changes needed after setup:
pip install requests-cache
import requests
import requests_cache
# Install cache globally; all calls made through requests are now cached
requests_cache.install_cache(
    "api_cache",        # Cache file name
    backend="sqlite",   # Storage backend: sqlite, redis, memory
    expire_after=3600   # 1 hour TTL
)
# From here, requests work exactly as before, but cached automatically
response = requests.get("https://api.example.com/data")
print(response.from_cache)  # True if served from cache
# Clear cache when needed
requests_cache.clear()
Building a Robust API Data Pipeline
Combining all the pieces — authentication, pagination, rate limiting, error handling, and caching — into a production-quality pipeline:
"""
api_pipeline.py
A production-ready pipeline that:
1. Authenticates with an API using bearer token
2. Fetches all records across pages with rate limiting
3. Retries on transient failures with exponential backoff
4. Caches responses to minimize redundant calls
5. Saves results to parquet for downstream analysis
"""
import os
import time
import json
import random
import logging
import hashlib
import requests
import pandas as pd
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
class APIClient:
    """
    Robust API client with authentication, pagination, rate limiting,
    caching, and retry logic.
    """

    def __init__(
        self,
        base_url: str,
        api_key: str,
        rate_limit_rps: float = 2.0,
        cache_dir: str = ".api_cache",
        cache_ttl: int = 3600
    ):
        self.base_url = base_url.rstrip("/")
        self.rate_limit_rps = rate_limit_rps
        self.cache_dir = Path(cache_dir)
        self.cache_ttl = cache_ttl
        self.cache_dir.mkdir(exist_ok=True)
        self._last_request_time = 0.0
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
            "User-Agent": "DataSciencePipeline/1.0"
        })

    def _throttle(self) -> None:
        """Enforce rate limit between requests."""
        min_interval = 1.0 / self.rate_limit_rps
        elapsed = time.time() - self._last_request_time
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)
        self._last_request_time = time.time()

    def _cache_key(self, url: str, params: dict) -> str:
        return hashlib.md5(
            f"{url}?{json.dumps(params or {}, sort_keys=True)}".encode()
        ).hexdigest()

    def _get_cached(self, cache_key: str) -> dict | None:
        cache_file = self.cache_dir / f"{cache_key}.json"
        if cache_file.exists():
            age = time.time() - cache_file.stat().st_mtime
            if age < self.cache_ttl:
                with open(cache_file) as f:
                    return json.load(f)
        return None

    def _save_cache(self, cache_key: str, data: dict) -> None:
        cache_file = self.cache_dir / f"{cache_key}.json"
        with open(cache_file, "w") as f:
            json.dump(data, f)

    def get(
        self,
        endpoint: str,
        params: dict = None,
        use_cache: bool = True,
        max_retries: int = 4
    ) -> dict:
        """Make a single GET request with caching and retry logic."""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        cache_key = self._cache_key(url, params)
        if use_cache:
            cached = self._get_cached(cache_key)
            if cached is not None:
                logger.debug(f"Cache hit: {endpoint}")
                return cached
        for attempt in range(max_retries + 1):
            self._throttle()
            response = self.session.get(url, params=params)
            if response.ok:
                data = response.json()
                if use_cache:
                    self._save_cache(cache_key, data)
                return data
            if response.status_code in (429, 500, 502, 503, 504):
                if attempt == max_retries:
                    response.raise_for_status()
                retry_after = float(response.headers.get("Retry-After", 0))
                delay = retry_after or min(1.0 * (2 ** attempt), 60.0)
                delay += random.uniform(0, delay * 0.1)
                logger.warning(f"Status {response.status_code}. "
                               f"Retry {attempt + 1}/{max_retries} in {delay:.1f}s")
                time.sleep(delay)
            else:
                response.raise_for_status()

    def get_all_pages(
        self,
        endpoint: str,
        params: dict = None,
        records_key: str = "data",
        page_param: str = "page",
        page_size: int = 100
    ) -> list:
        """Fetch all pages of a paginated endpoint."""
        all_records = []
        page = 1
        base_params = {**(params or {}), "per_page": page_size}
        while True:
            page_params = {**base_params, page_param: page}
            data = self.get(endpoint, params=page_params, use_cache=False)
            records = data.get(records_key, [])
            if not records:
                break
            all_records.extend(records)
            logger.info(f"{endpoint} page {page}: {len(records)} records "
                        f"(total: {len(all_records):,})")
            if len(records) < page_size:
                break
            page += 1
        logger.info(f"Fetched {len(all_records):,} total records from {endpoint}")
        return all_records
def run_pipeline():
    """Run the complete data collection pipeline."""
    client = APIClient(
        base_url="https://api.example.com/v2",
        api_key=os.environ["API_KEY"],
        rate_limit_rps=2.0
    )
    # Fetch all data
    customers = client.get_all_pages("customers", records_key="customers")
    orders = client.get_all_pages("orders", records_key="orders",
                                  params={"status": "completed"})
    products = client.get("products/catalog", use_cache=True)
    # Convert to DataFrames
    df_customers = pd.DataFrame(customers)
    df_orders = pd.DataFrame(orders)
    df_products = pd.DataFrame(products["items"])
    # Save outputs
    output_dir = Path("data/raw/api_extract")
    output_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M")
    df_customers.to_parquet(output_dir / f"customers_{timestamp}.parquet", index=False)
    df_orders.to_parquet(output_dir / f"orders_{timestamp}.parquet", index=False)
    df_products.to_parquet(output_dir / f"products_{timestamp}.parquet", index=False)
    logger.info(f"Pipeline complete. Saved to {output_dir}")
    return df_customers, df_orders, df_products

if __name__ == "__main__":
    run_pipeline()
Common Mistakes and Best Practices
| Mistake | Problem | Best Practice |
|---|---|---|
| Hardcoding API keys in code | Security breach, key exposed in Git | Use environment variables or secrets manager |
| Ignoring status codes | Parsing error responses as valid data | Always call raise_for_status() or check response.ok |
| No pagination handling | Only getting first page, missing most data | Always check for and follow pagination |
| No rate limit handling | 429 errors, temporary/permanent API ban | Throttle requests; implement exponential backoff |
| No caching | Redundant API calls during development | Cache responses that don’t change frequently |
| No timeout | Requests hang indefinitely on slow servers | Set timeout=30 on every request |
| String concatenation in params | Ugly URLs, encoding errors | Always use params={} dict, not manual URL building |
| Missing error handling | Pipeline crashes on first API error | Wrap in try/except, log errors, retry intelligently |
| Fetching all data when you need a sample | Slow development, unnecessary API usage | Use limit params for exploration; full fetch only for production |
| Not reading the API docs | Wasted time reinventing documented patterns | Read the auth, pagination, and rate limit docs first |
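The "String concatenation in params" row is easy to underestimate, so here is a minimal offline sketch of what goes wrong. It uses only the standard library's urllib.parse so that it runs without network access; passing a params dict to requests applies the same kind of percent-encoding. The endpoint URL and the parameter names q and region are hypothetical, chosen purely for illustration.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

# Hypothetical endpoint and query values, for illustration only
BASE_URL = "https://api.example.com/v2/search"
query = {"q": "R&D budget 2024", "region": "New York"}

# Manual string building: the "&" inside the value is read as a separator
manual_url = f"{BASE_URL}?q={query['q']}&region={query['region']}"

# Proper encoding: reserved characters are escaped before they reach the URL
encoded_url = f"{BASE_URL}?{urlencode(query)}"

# Parse both URLs back, the way a server would, to see what actually arrives
manual_parsed = parse_qs(urlsplit(manual_url).query)
encoded_parsed = parse_qs(urlsplit(encoded_url).query)

print("manual :", manual_parsed["q"])
print("encoded:", encoded_parsed["q"])
```

In the manually built URL the search term is cut off at the ampersand, so the server would receive only part of the query, while the encoded version round-trips intact. With requests, the equivalent is to pass the dictionary as params=query and let the library build the URL.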
Always Set a Timeout
# Without timeout — can hang forever if server is slow or unresponsive
response = requests.get(url)
# With timeout — raises requests.Timeout after 30 seconds
response = requests.get(url, timeout=30)
# Separate connect and read timeouts
response = requests.get(url, timeout=(5, 30))
# (5 seconds to establish connection, 30 seconds for the server to send response)
Summary
APIs are one of the richest data sources available to data scientists — providing real-time access to financial markets, weather data, social media signals, government statistics, e-commerce transactions, and virtually every domain of human activity. Mastering API access in Python means understanding the REST architecture, using the requests library competently, authenticating with API keys and OAuth tokens, navigating pagination patterns to retrieve complete datasets, implementing rate limiting and retry logic to keep pipelines reliable, and caching responses to minimize redundant calls.
The investment in building robust API client code pays dividends immediately: data pipelines that were breaking daily become reliable, rate limit bans become rare, and the development cycle of explore-then-scale works smoothly. The APIClient class pattern presented in this article — encapsulating throttling, caching, retries, and pagination into a reusable object — is the kind of professional infrastructure that separates production data pipelines from one-off notebook experiments.
Key Takeaways
- APIs communicate over HTTP using standard methods — GET (retrieve), POST (create/submit) — and return structured JSON responses; status codes signal success (2xx) or failure (4xx/5xx)
- The requests library is Python’s de facto standard HTTP client: requests.get(url, params={}, headers={}) is the basic pattern, always followed by response.raise_for_status() and response.json()
- API authentication comes in three main forms: API keys (in headers or query params), Bearer tokens (OAuth 2.0 flow), and HTTP Basic Auth; credentials must always come from environment variables or a secrets manager, never from values hardcoded in source
- Pagination is mandatory for complete data extraction: APIs implement it via page numbers, cursors/tokens, Link headers, or offsets; always loop until you’ve collected all pages
- Rate limiting must be respected: throttle requests to stay within limits, read the Retry-After and X-RateLimit-Remaining headers, and implement exponential backoff retry logic for 429 and 5xx responses
- requests.Session() is more efficient than individual requests.get() calls for multiple requests to the same API: it reuses connections and allows shared headers across all calls
- Caching API responses (file-based or with requests-cache) eliminates redundant calls during development and reduces costs; cache data that doesn’t change frequently using a reasonable TTL
- Always set timeout on every request to prevent pipelines from hanging indefinitely on unresponsive servers; timeout=(5, 30) sets separate connect and read timeouts
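The APIClient class in this article pages by number only, while the takeaways also mention cursor-based pagination. As a rough sketch of that second pattern, the generator below keeps requesting pages until the response stops returning a cursor. The field names data and next_cursor, the max_pages safety cap, and the fetch_page callable are all assumptions made for illustration; real APIs name these differently, so check the provider's documentation. The demo runs against an in-memory stand-in instead of a live service.

```python
from typing import Any, Callable, Iterator, Optional


def iter_cursor_records(
    fetch_page: Callable[[Optional[str]], dict],
    records_key: str = "data",        # assumed name of the records field
    cursor_key: str = "next_cursor",  # assumed name of the cursor field
    max_pages: int = 1000,            # safety cap against endless loops
) -> Iterator[Any]:
    """Yield records page by page until the API stops returning a cursor."""
    cursor: Optional[str] = None
    for _ in range(max_pages):
        page = fetch_page(cursor)
        yield from page.get(records_key, [])
        cursor = page.get(cursor_key)
        if not cursor:
            return
    raise RuntimeError(f"Stopped after {max_pages} pages; cursor never ended")


# Offline stand-in for an API: maps a cursor to the page it returns
FAKE_PAGES = {
    None: {"data": [1, 2], "next_cursor": "abc"},
    "abc": {"data": [3, 4], "next_cursor": "def"},
    "def": {"data": [5], "next_cursor": None},
}

records = list(iter_cursor_records(lambda cursor: FAKE_PAGES[cursor]))
print(records)
```

Taking the page fetcher as a callable keeps the loop independent of any particular HTTP client. Against a real service, fetch_page could be a small function that calls the client's get method with the cursor passed as a query parameter (the parameter name would again depend on the API) and with caching disabled.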








