Working with Different Data Sources: APIs, Databases, Files, and More
Here’s the reality of data engineering: your data never lives in just one nice, tidy place. It’s scattered across CSV files, JSON responses from APIs, database tables, Excel spreadsheets your marketing team “just needs to send you,” and probably a few places you haven’t even discovered yet.
Learning to wrangle data from different sources is like being a data detective. Each source has its own quirks, formats, and gotchas. But once you master the patterns, you’ll be able to pull data from anywhere and turn it into something useful.
The File-Based Data World
Let’s start with files - they’re everywhere, and they come in all shapes and sizes. Files are often the starting point for data pipelines because they’re how systems export data and humans share information.
CSV Files: The Workhorses of Data
CSV files are like the pickup trucks of data formats - not fancy, but they get the job done. You’ll work with them constantly, and pandas makes it surprisingly straightforward:
```python
import pandas as pd

# Basic CSV reading
df = pd.read_csv('customer_data.csv')

# But real-world CSVs are messy, so you need options
df = pd.read_csv('messy_export.csv',
                 sep=';',                     # European CSVs use semicolons
                 encoding='latin-1',          # Different character encodings
                 skiprows=2,                  # Skip header rows
                 usecols=['date', 'amount'],  # Only read specific columns
                 na_values=['N/A', ''],       # Define what counts as missing
                 parse_dates=['date'],        # Auto-convert date columns
                 dtype={'customer_id': str})  # Force data types
```
The key insight here is that `read_csv()` has dozens of parameters because real-world CSV files are chaos. You'll use `sep` for different delimiters, `encoding` for international characters, `skiprows` for files with metadata at the top, and `dtype` to prevent pandas from guessing wrong about your data types.
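When you hit a file you don't recognize, it can help to peek at the raw bytes before guessing at parameters. Here's a minimal sketch, reusing the hypothetical messy_export.csv from above:

```python
# Peek at an unfamiliar file before choosing read_csv options
# ('messy_export.csv' is the hypothetical file from the example above)
with open('messy_export.csv', 'rb') as f:
    raw = f.read(500)  # the first few hundred bytes are usually enough

print(raw[:200])                                      # spot the delimiter and any metadata rows
print(raw.decode('latin-1', errors='replace')[:200])  # try a candidate encoding

# After loading, confirm pandas guessed the types you expected
df = pd.read_csv('messy_export.csv', sep=';', encoding='latin-1', skiprows=2)
print(df.dtypes)
print(df.head())
```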
JSON Files: Structured but Complex
JSON is the language of web APIs and modern applications. It’s more structured than CSV but can be deeply nested:
```python
# Simple flat JSON
df = pd.read_json('customer_orders.json')

# For nested JSON, you need to flatten it
import json

with open('nested_api_response.json') as f:
    data = json.load(f)

# Flatten nested JSON into a DataFrame
df = pd.json_normalize(data['customers'])

# Handle deeply nested structures
df = pd.json_normalize(data,
                       record_path=['orders', 'items'],
                       meta=['customer_id', 'order_date'])
```
`pd.json_normalize()` is your secret weapon for nested JSON. It flattens complex structures into tabular data. The `record_path` parameter tells it where to find the array of records, and `meta` preserves higher-level information.
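If `record_path` and `meta` feel abstract, a tiny made-up payload makes the behavior concrete:

```python
import pandas as pd

# A hypothetical payload: each customer has orders, each order has items
data = [
    {
        'customer_id': 'C001',
        'order_date': '2024-01-15',
        'orders': [
            {'items': [{'sku': 'A1', 'qty': 2}, {'sku': 'B7', 'qty': 1}]}
        ],
    }
]

flat = pd.json_normalize(
    data,
    record_path=['orders', 'items'],     # walk into orders, then items
    meta=['customer_id', 'order_date'],  # carry these top-level fields onto each item row
)
print(flat)
#   sku  qty customer_id  order_date
# 0  A1    2        C001  2024-01-15
# 1  B7    1        C001  2024-01-15
```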
Excel Files: Because Business Loves Spreadsheets
Every company has Excel files, and they’re usually more complex than simple CSV exports:
```python
# Basic Excel reading
df = pd.read_excel('quarterly_report.xlsx')

# Multiple sheets and complex formatting
df = pd.read_excel('complex_report.xlsx',
                   sheet_name='Sales Data',  # Specific sheet
                   header=2,                 # Data starts at row 3
                   skipfooter=5,             # Ignore last 5 rows
                   usecols='A:F')            # Only columns A through F

# Read multiple sheets at once
excel_data = pd.read_excel('annual_report.xlsx', sheet_name=None)
sales_df = excel_data['Sales']
inventory_df = excel_data['Inventory']
```
Excel files often come with formatting, multiple sheets, and metadata that make them tricky. The `sheet_name`, `header`, and `skipfooter` parameters help you extract just the data you need.
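One habit that saves time: list the sheet names before you read anything, so you're not guessing. A small sketch, reusing the hypothetical annual_report.xlsx:

```python
# List the sheets in a workbook before deciding what to read
xlsx = pd.ExcelFile('annual_report.xlsx')
print(xlsx.sheet_names)  # e.g. ['Sales', 'Inventory', 'Notes']

# Then read only the sheet you actually need
sales_df = pd.read_excel(xlsx, sheet_name='Sales')
```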
Working with APIs: Real-Time Data Sources
APIs are how systems talk to each other in real-time. As a data engineer, you’ll often need to pull data from APIs and convert it into DataFrames for processing:
```python
import requests
import pandas as pd

def fetch_api_data(endpoint, params=None):
    """Fetch data from an API and convert to DataFrame"""
    try:
        response = requests.get(endpoint, params=params)
        response.raise_for_status()  # Raise an error for bad status codes

        data = response.json()
        return pd.json_normalize(data['results'])

    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return pd.DataFrame()  # Return empty DataFrame on error

# Fetch customer data from an API
customers_df = fetch_api_data('https://api.company.com/customers',
                              params={'limit': 1000, 'active': True})
```
The pattern here is: make the HTTP request, parse the JSON response, and use `json_normalize()` to flatten it into a DataFrame. Always include error handling because APIs fail, rate limits kick in, and networks are unreliable.
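Two details worth adding in practice are a request timeout and some handling for rate limits. Here's one way you might extend the pattern; the header name, token handling, and retry delay are assumptions, so check your API's documentation:

```python
import time
import requests
import pandas as pd

def fetch_api_data_robust(endpoint, params=None, token=None, timeout=10):
    """Like fetch_api_data, but with a timeout, auth header, and basic rate-limit handling."""
    headers = {'Authorization': f'Bearer {token}'} if token else {}  # assumed auth scheme

    try:
        response = requests.get(endpoint, params=params,
                                headers=headers, timeout=timeout)

        if response.status_code == 429:  # rate limited
            retry_after = int(response.headers.get('Retry-After', 5))
            time.sleep(retry_after)
            response = requests.get(endpoint, params=params,
                                    headers=headers, timeout=timeout)

        response.raise_for_status()
        return pd.json_normalize(response.json()['results'])

    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return pd.DataFrame()
```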
Handling Paginated APIs
Most APIs limit how much data they return at once, so you need to paginate through results:
```python
import time

def fetch_all_pages(base_url, params=None):
    """Fetch all pages from a paginated API"""
    all_data = []
    page = 1

    while True:
        current_params = params.copy() if params else {}
        current_params['page'] = page

        response = requests.get(base_url, params=current_params)
        data = response.json()

        if not data['results']:
            break  # No more data

        all_data.extend(data['results'])
        page += 1

        # Be nice to the API
        time.sleep(0.1)

    return pd.json_normalize(all_data)

# Get all customer records, not just the first page
all_customers = fetch_all_pages('https://api.company.com/customers')
```
This pattern handles pagination by looping through pages until there's no more data. The `time.sleep()` call prevents you from overwhelming the API server.
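Not every API paginates with page numbers; plenty use a limit/offset pair (or a cursor token) instead. Here's a sketch of the offset style, with the parameter names as assumptions:

```python
import time
import requests
import pandas as pd

def fetch_all_offset(base_url, params=None, page_size=100):
    """Paginate an API that uses limit/offset parameters (hypothetical names)."""
    all_data = []
    offset = 0

    while True:
        current_params = dict(params or {}, limit=page_size, offset=offset)
        response = requests.get(base_url, params=current_params)
        response.raise_for_status()
        batch = response.json()['results']

        if not batch:
            break  # ran out of records

        all_data.extend(batch)
        offset += page_size
        time.sleep(0.1)  # stay under the rate limit

    return pd.json_normalize(all_data)
```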
Database Connections: Where the Real Data Lives
Databases are where companies store their important data. Pandas can connect to virtually any database system:
```python
import sqlalchemy

# Create database connection
engine = sqlalchemy.create_engine('sqlite:///company_data.db')

# Read data with SQL
customers_df = pd.read_sql('SELECT * FROM customers WHERE active = 1', engine)

# More complex queries
sales_summary = pd.read_sql("""
    SELECT
        DATE(order_date) as date,
        COUNT(*) as order_count,
        SUM(amount) as total_revenue
    FROM orders
    WHERE order_date >= DATE('now', '-30 days')
    GROUP BY DATE(order_date)
    ORDER BY date
""", engine)
```
`pd.read_sql()` lets you run SQL queries and get DataFrames back. This is incredibly powerful because you can do filtering and aggregation in the database before bringing data into Python.
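Two `read_sql()` features worth knowing once queries get serious: passing parameters instead of formatting strings yourself, and reading large results in chunks. A sketch, assuming the same orders table and the engine from above (parameter syntax varies a bit by driver):

```python
from sqlalchemy import text

# Parameterized query: values are bound safely instead of pasted into the SQL
recent_orders = pd.read_sql(
    text('SELECT * FROM orders WHERE order_date >= :cutoff'),
    engine,
    params={'cutoff': '2024-01-01'},
)

# For results too big for memory, stream them in chunks and aggregate as you go
totals = []
for chunk in pd.read_sql('SELECT customer_id, amount FROM orders', engine, chunksize=50_000):
    totals.append(chunk.groupby('customer_id')['amount'].sum())

customer_totals = pd.concat(totals).groupby(level=0).sum()
```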
Writing Data Back to Databases
Sometimes you need to save your processed data back to a database:
```python
# Save DataFrame to database table
processed_data.to_sql('processed_sales',
                      engine,
                      if_exists='replace',  # Replace existing table
                      index=False)

# Append to existing table
new_records.to_sql('daily_sales',
                   engine,
                   if_exists='append',
                   index=False)
```
`to_sql()` is the mirror of `read_sql()`. Use `if_exists='replace'` to overwrite existing tables, or `if_exists='append'` to add new records.
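For big DataFrames, writing everything in one statement can be slow or hit size limits, so `to_sql()` can write in batches. A sketch reusing processed_data from above (driver support for `method='multi'` varies, so treat it as a starting point):

```python
# Write in batches of 10,000 rows; method='multi' packs many rows into each INSERT
processed_data.to_sql('processed_sales',
                      engine,
                      if_exists='replace',
                      index=False,
                      chunksize=10_000,
                      method='multi')
```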
Combining Data from Multiple Sources
Real data engineering often means combining data from different sources. Here’s how to merge data from files, APIs, and databases:
```python
def build_customer_360_view():
    """Combine customer data from multiple sources"""

    # Load base customer data from database
    engine = sqlalchemy.create_engine('sqlite:///crm.db')
    customers = pd.read_sql('SELECT * FROM customers', engine)

    # Get transaction history from CSV files
    transactions = pd.read_csv('transaction_export.csv',
                               parse_dates=['transaction_date'])

    # Fetch recent activity from API
    api_data = fetch_api_data('https://api.analytics.com/customer-activity')

    # Merge everything together
    # Start with customers as the base
    result = customers.copy()

    # Add transaction summaries
    transaction_summary = transactions.groupby('customer_id').agg({
        'amount': ['sum', 'count', 'mean'],
        'transaction_date': 'max'
    }).round(2)

    # Flatten column names after groupby
    transaction_summary.columns = ['total_spent', 'transaction_count',
                                   'avg_transaction', 'last_transaction']
    transaction_summary = transaction_summary.reset_index()

    result = result.merge(transaction_summary, on='customer_id', how='left')

    # Add API activity data
    if not api_data.empty:
        result = result.merge(api_data[['customer_id', 'last_login', 'page_views']],
                              on='customer_id', how='left')

    return result

# Create comprehensive customer view
customer_360 = build_customer_360_view()
```
This pattern is common: load data from different sources, aggregate or summarize as needed, then merge everything together using customer ID or another key field.
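Merges are also where silent data bugs creep in, so it's worth verifying that the join keys behave the way you expect. pandas can check that for you; a sketch using the frames from the function above:

```python
# Verify the join instead of trusting it
result = customers.merge(
    transaction_summary,
    on='customer_id',
    how='left',
    validate='one_to_one',  # raise if customer_id is duplicated on either side
    indicator=True,         # adds a _merge column: 'both', 'left_only', 'right_only'
)

# How many customers had no transactions at all?
print(result['_merge'].value_counts())
```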
Handling Different File Formats in Batch
Sometimes you need to process dozens or hundreds of files at once. Here’s how to handle that systematically:
```python
import glob
from pathlib import Path

def process_multiple_files(pattern, processor_func):
    """Process multiple files matching a pattern"""
    files = glob.glob(pattern)
    all_data = []

    for file_path in files:
        try:
            # Extract metadata from filename
            filename = Path(file_path).stem
            date_str = filename.split('_')[-1]  # Assume date at end

            # Process the file
            df = processor_func(file_path)
            df['source_file'] = filename
            df['file_date'] = pd.to_datetime(date_str, errors='coerce')

            all_data.append(df)
            print(f"Processed {file_path}: {len(df)} records")

        except Exception as e:
            print(f"Failed to process {file_path}: {e}")

    if not all_data:
        return pd.DataFrame()  # No files matched the pattern (or all failed)

    return pd.concat(all_data, ignore_index=True)

def process_sales_file(file_path):
    """Process a single sales file"""
    return pd.read_csv(file_path,
                       parse_dates=['order_date'],
                       dtype={'customer_id': str})

# Process all sales files from the last month
all_sales = process_multiple_files('data/sales_export_*.csv', process_sales_file)
```
This pattern uses `glob` to find files matching a pattern, processes each one with a custom function, adds metadata about the source file, then concatenates everything into one big DataFrame.
Streaming Data: Processing Data as It Arrives
Sometimes you need to process data continuously as it arrives, rather than in batches:
```python
import time
from pathlib import Path

def monitor_and_process_files(watch_directory, output_directory, sleep_interval=60):
    """Monitor a directory and process new files as they appear"""
    processed_files = set()

    while True:
        # Check for new files
        current_files = set(Path(watch_directory).glob('*.csv'))
        new_files = current_files - processed_files

        for file_path in new_files:
            try:
                # Process the new file
                df = pd.read_csv(file_path)

                # Add processing timestamp
                df['processed_at'] = pd.Timestamp.now()

                # Save processed data
                output_path = Path(output_directory) / f"processed_{file_path.name}"
                df.to_csv(output_path, index=False)

                processed_files.add(file_path)
                print(f"Processed {file_path}")

            except Exception as e:
                print(f"Failed to process {file_path}: {e}")

        time.sleep(sleep_interval)

# Monitor for new sales files every minute
# monitor_and_process_files('incoming/', 'processed/', 60)
```
This creates a simple file watcher that processes new CSV files as they appear in a directory.
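One gotcha with this approach: a file can show up in the directory before the upstream system has finished writing it. A crude but effective guard is to check that the file size has stopped changing before you read it; a minimal sketch:

```python
import os
import time

def is_file_stable(file_path, wait_seconds=2):
    """Return True if the file's size stays constant over a short wait,
    a rough signal that the writer has finished. Tune the wait to match
    how your upstream system delivers files."""
    size_before = os.path.getsize(file_path)
    time.sleep(wait_seconds)
    return os.path.getsize(file_path) == size_before

# Inside the monitoring loop, skip files that are still being written:
# if not is_file_stable(file_path):
#     continue
```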
Best Practices for Multi-Source Pipelines
After years of wrestling with data from different sources, here are the patterns that actually work:
1. Standardize Early
```python
def standardize_date_columns(df):
    """Convert date columns to consistent format"""
    date_columns = ['created_date', 'updated_date', 'order_date']
    for col in date_columns:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')
    return df

def standardize_customer_ids(df):
    """Ensure customer IDs are strings and properly formatted"""
    if 'customer_id' in df.columns:
        df['customer_id'] = df['customer_id'].astype(str).str.strip().str.upper()
    return df
```
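A usage sketch: chain the standardizers with `.pipe()` so every frame gets the same cleanup no matter where it came from (the file name here is just the one from the earlier example):

```python
# Run every incoming frame through the same cleanup steps
raw_orders = pd.read_csv('transaction_export.csv')
clean_orders = (raw_orders
                .pipe(standardize_date_columns)
                .pipe(standardize_customer_ids))
```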
2. Build Robust Connectors
```python
class DataSourceConnector:
    """Base class for data source connections"""

    def __init__(self, source_config):
        self.config = source_config
        self.last_successful_run = None

    def extract_data(self, **kwargs):
        """Override in subclasses"""
        raise NotImplementedError

    def test_connection(self):
        """Test if the data source is accessible"""
        try:
            sample_data = self.extract_data(limit=1)
            return len(sample_data) >= 0
        except Exception:
            return False

class CSVConnector(DataSourceConnector):
    def extract_data(self, limit=None, **kwargs):
        # limit maps to nrows so test_connection() can read a single row
        return pd.read_csv(self.config['path'], nrows=limit, **kwargs)

class APIConnector(DataSourceConnector):
    def extract_data(self, **kwargs):
        response = requests.get(self.config['endpoint'], params=kwargs)
        response.raise_for_status()
        return pd.json_normalize(response.json()['data'])
```
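Usage looks the same no matter which connector you're holding, which is the point; a sketch with hypothetical configs:

```python
# Hypothetical configs; the keys are whatever each connector's extract_data expects
sources = [
    CSVConnector({'path': 'data/transactions.csv'}),
    APIConnector({'endpoint': 'https://api.company.com/customers'}),
]

for source in sources:
    if source.test_connection():
        df = source.extract_data()
        print(f"{type(source).__name__}: {len(df)} records")
    else:
        print(f"{type(source).__name__}: not reachable, skipping")
```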
3. Handle Errors Gracefully
```python
def safe_data_extraction(source_func, fallback_value=None, max_retries=3):
    """Wrapper to handle data extraction errors"""
    for attempt in range(max_retries):
        try:
            return source_func()
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                print("All retries exhausted, using fallback")
                return fallback_value if fallback_value is not None else pd.DataFrame()
            time.sleep(2 ** attempt)  # Exponential backoff
```
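A usage sketch: wrap each extraction step in a lambda so the retry loop controls when it actually runs (the file path here is made up):

```python
# Retry a flaky CSV drop and a flaky API with the same wrapper
daily_sales = safe_data_extraction(
    lambda: pd.read_csv('incoming/daily_sales.csv', parse_dates=['order_date'])
)

customers = safe_data_extraction(
    lambda: fetch_api_data('https://api.company.com/customers'),
    max_retries=5
)
```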
The Big Picture: Data Source Strategy
Here’s what I’ve learned about working with multiple data sources: the technical part (reading files, calling APIs) is usually the easy part. The hard part is understanding what the data means, how fresh it is, and how reliable it is.
Always ask these questions:
- How often does this data update? Real-time, hourly, daily?
- What happens when the source is unavailable? Do you have fallbacks?
- How do you know if the data quality is good? Are there validation checks? (See the sketch after this list.)
- Who owns this data source? Who do you call when it breaks?
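On the validation question, even a few cheap checks after each load will catch most of the ugly surprises. A minimal sketch, with placeholder thresholds and column names you'd tune per source:

```python
def basic_quality_checks(df, required_columns, max_null_fraction=0.1):
    """Cheap sanity checks to run on any freshly loaded frame.
    Thresholds and required columns are placeholders; tune them per source."""
    problems = []

    missing = [col for col in required_columns if col not in df.columns]
    if missing:
        problems.append(f"missing columns: {missing}")

    if df.empty:
        problems.append("no rows returned")

    present = [col for col in required_columns if col in df.columns]
    if not df.empty and present:
        null_fractions = df[present].isna().mean()
        too_null = null_fractions[null_fractions > max_null_fraction]
        if not too_null.empty:
            problems.append(f"too many nulls: {too_null.to_dict()}")

    return problems

# Example: check the customer frame before merging it with anything else
# ('email' is a placeholder column name)
issues = basic_quality_checks(customers_df, ['customer_id', 'email'])
if issues:
    print(f"Data quality issues: {issues}")
```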
Building Your Data Integration Toolkit
As you work with more data sources, you’ll develop reusable patterns:
```python
# Configuration-driven data loading
DATA_SOURCES = {
    'customers': {
        'type': 'database',
        'connection': 'postgresql://user:pass@localhost/crm',
        'query': 'SELECT * FROM customers WHERE active = true'
    },
    'transactions': {
        'type': 'csv',
        'path': 'data/transactions/*.csv',
        'parse_dates': ['transaction_date']
    },
    'web_analytics': {
        'type': 'api',
        'endpoint': 'https://api.analytics.com/events',
        'auth_token': 'your_token_here'
    }
}

def load_all_sources():
    """Load data from all configured sources"""
    data = {}
    for name, config in DATA_SOURCES.items():
        try:
            data[name] = load_data_source(config)
            print(f"Loaded {name}: {len(data[name])} records")
        except Exception as e:
            print(f"Failed to load {name}: {e}")
            data[name] = pd.DataFrame()
    return data
```
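The `load_data_source()` helper that the loop above calls is left for you to write; here's a rough sketch of what a dispatcher could look like, with the auth handling and error policy as assumptions you'd flesh out:

```python
def load_data_source(config):
    """Dispatch on the configured source type (a sketch; a real version
    would add auth handling, retries, and logging)."""
    if config['type'] == 'database':
        engine = sqlalchemy.create_engine(config['connection'])
        return pd.read_sql(config['query'], engine)

    if config['type'] == 'csv':
        files = glob.glob(config['path'])
        frames = [pd.read_csv(path, parse_dates=config.get('parse_dates', False))
                  for path in files]
        return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    if config['type'] == 'api':
        headers = {'Authorization': f"Bearer {config['auth_token']}"}  # assumed auth scheme
        response = requests.get(config['endpoint'], headers=headers, timeout=10)
        response.raise_for_status()
        return pd.json_normalize(response.json())

    raise ValueError(f"Unknown source type: {config['type']}")
```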
The key is building systems that are configurable, testable, and resilient to failure.
Your Next Steps
Don’t try to master every data source at once. Start with the sources you encounter most often - probably CSV files and maybe a simple API. Get comfortable with the pandas functions that read and write data. Then gradually expand to databases, more complex APIs, and real-time streams.
Remember, every data source is just another input to your pipeline. The pandas operations you use to clean, transform, and combine the data are the same regardless of where it came from. Focus on mastering those fundamentals, and you’ll be ready for whatever data sources come your way.