Working with Different Data Sources: APIs, Databases, Files, and More

Here’s the reality of data engineering: your data never lives in just one nice, tidy place. It’s scattered across CSV files, JSON responses from APIs, database tables, Excel spreadsheets your marketing team “just needs to send you,” and probably a few places you haven’t even discovered yet.

Learning to wrangle data from different sources is like being a data detective. Each source has its own quirks, formats, and gotchas. But once you master the patterns, you’ll be able to pull data from anywhere and turn it into something useful.

The File-Based Data World

Let’s start with files - they’re everywhere, and they come in all shapes and sizes. Files are often the starting point for data pipelines because they’re how systems export data and humans share information.

CSV Files: The Workhorses of Data

CSV files are like the pickup trucks of data formats - not fancy, but they get the job done. You’ll work with them constantly, and pandas makes it surprisingly straightforward:

import pandas as pd

# Basic CSV reading
df = pd.read_csv('customer_data.csv')

# But real-world CSVs are messy, so you need options
df = pd.read_csv('messy_export.csv',
                 sep=';',                    # Many European exports use semicolons
                 encoding='latin-1',         # Non-UTF-8 character encoding
                 skiprows=2,                 # Skip 2 metadata lines above the header
                 usecols=['customer_id', 'date', 'amount'],  # Only read specific columns
                 na_values=['N/A', ''],      # Define what counts as missing
                 parse_dates=['date'],       # Auto-convert date columns
                 dtype={'customer_id': str}) # Force data types

The key insight here is that read_csv() has dozens of parameters because real-world CSV files are chaos. You’ll use sep for different delimiters, encoding for international characters, skiprows for files with metadata at the top, and dtype to prevent pandas from guessing wrong about your data types.
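
To see why dtype matters, here is a minimal sketch that reads an in-memory CSV twice. The sample data and column names are made up for illustration. Without dtype, pandas infers an integer column and the leading zeros in the IDs disappear:

```python
import io

import pandas as pd

# Hypothetical sample data: semicolon-delimited, IDs with leading zeros
raw = "customer_id;amount\n00123;10.50\n00456;N/A\n"

# Let pandas guess: customer_id is inferred as an integer column
guessed = pd.read_csv(io.StringIO(raw), sep=';')

# Force customer_id to stay a string so the leading zeros survive
forced = pd.read_csv(io.StringIO(raw), sep=';',
                     dtype={'customer_id': str},
                     na_values=['N/A'])

print(guessed['customer_id'].tolist())  # integers, zeros lost
print(forced['customer_id'].tolist())   # strings, zeros kept
```

The same trick applies to ZIP codes, phone numbers, and any other identifier that only looks numeric.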

JSON Files: Structured but Complex

JSON is the language of web APIs and modern applications. It’s more structured than CSV but can be deeply nested:

import json

import pandas as pd

# Simple flat JSON
df = pd.read_json('customer_orders.json')

# For nested JSON, load it first and then flatten it
with open('nested_api_response.json') as f:
    data = json.load(f)

# Flatten nested JSON into a DataFrame
df = pd.json_normalize(data['customers'])

# Handle deeply nested structures
# (assumes each customer has an 'orders' list, and each order has
# an 'order_date' and an 'items' list)
df = pd.json_normalize(data['customers'],
                       record_path=['orders', 'items'],
                       meta=['customer_id', ['orders', 'order_date']])

pd.json_normalize() is your secret weapon for nested JSON. It flattens complex structures into tabular data. The record_path parameter tells it where to find the array of records, and meta preserves higher-level information.
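
Here is a small sketch of record_path and meta working together on an in-memory payload. The structure (customers containing orders containing items) is an assumption for illustration; your API's shape will differ. Note that a meta field that lives one level down needs its full path as a list:

```python
import pandas as pd

# Hypothetical payload: each customer has orders, each order has items
data = [
    {
        'customer_id': 'C1',
        'orders': [
            {'order_date': '2024-01-05',
             'items': [{'sku': 'A', 'qty': 2}, {'sku': 'B', 'qty': 1}]},
            {'order_date': '2024-02-10',
             'items': [{'sku': 'C', 'qty': 5}]},
        ],
    },
]

# One row per order: record_path points at the list, meta keeps the parent ID
orders = pd.json_normalize(data, record_path='orders', meta=['customer_id'])

# One row per item: record_path walks two levels down, and the
# order-level field is addressed with a nested path in meta
items = pd.json_normalize(
    data,
    record_path=['orders', 'items'],
    meta=['customer_id', ['orders', 'order_date']],
)

print(items)
```

The nested meta field comes back as a column named orders.order_date, so you will usually want to rename it afterwards.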

Excel Files: Because Business Loves Spreadsheets

Every company has Excel files, and they’re usually more complex than simple CSV exports:

# Basic Excel reading
df = pd.read_excel('quarterly_report.xlsx')

# Multiple sheets and complex formatting
df = pd.read_excel('complex_report.xlsx',
                   sheet_name='Sales Data',     # Specific sheet
                   header=2,                    # Column names are on row 3
                   skipfooter=5,                # Ignore last 5 rows
                   usecols='A:F')               # Only columns A through F

# Read multiple sheets at once (returns a dict keyed by sheet name)
excel_data = pd.read_excel('annual_report.xlsx', sheet_name=None)
sales_df = excel_data['Sales']
inventory_df = excel_data['Inventory']

Excel files often have formatting, multiple sheets, and metadata that makes them tricky. The sheet_name, header, and skipfooter parameters help you extract just the data you need.

Working with APIs: Real-Time Data Sources

APIs are how systems talk to each other in real-time. As a data engineer, you’ll often need to pull data from APIs and convert it into DataFrames for processing:

import requests
import pandas as pd

def fetch_api_data(endpoint, params=None):
    """Fetch data from an API and convert to DataFrame"""
    try:
        # Without a timeout, a stalled server can hang the pipeline forever
        response = requests.get(endpoint, params=params, timeout=30)
        response.raise_for_status()  # Raise an error for bad status codes

        data = response.json()
        return pd.json_normalize(data['results'])

    except (requests.exceptions.RequestException, ValueError, KeyError) as e:
        # ValueError: body was not valid JSON; KeyError: no 'results' field
        print(f"API request failed: {e}")
        return pd.DataFrame()  # Return empty DataFrame on error

# Fetch customer data from an API
customers_df = fetch_api_data('https://api.company.com/customers',
                              params={'limit': 1000, 'active': True})

The pattern here is: make the HTTP request, parse the JSON response, and use json_normalize() to flatten it into a DataFrame. Always include error handling because APIs fail, rate limits kick in, and networks are unreliable.

Handling Paginated APIs

Most APIs limit how much data they return at once, so you need to paginate through results:

import time

import pandas as pd
import requests

def fetch_all_pages(base_url, params=None):
    """Fetch all pages from a paginated API"""
    all_data = []
    page = 1

    while True:
        current_params = params.copy() if params else {}
        current_params['page'] = page

        response = requests.get(base_url, params=current_params, timeout=30)
        response.raise_for_status()  # Stop on HTTP errors instead of parsing an error body
        data = response.json()

        if not data.get('results'):
            break  # No more data

        all_data.extend(data['results'])
        page += 1

        # Be nice to the API
        time.sleep(0.1)

    return pd.json_normalize(all_data)

# Get all customer records, not just the first page
all_customers = fetch_all_pages('https://api.company.com/customers')

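This pattern handles pagination by looping through pages until a page comes back empty. It assumes the API takes a page parameter and returns records under a results key; many APIs use cursors or offsets instead, so check the documentation. The time.sleep() keeps you from overwhelming the API server.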
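
One risk with an open-ended while loop is an API that never returns an empty page (for example, one that ignores the page parameter). Here is a minimal sketch of the same loop with a page cap. The fetch_page callable is a hypothetical stand-in for the requests.get call, so the logic can be tried without a network:

```python
import pandas as pd

def fetch_all_pages_capped(fetch_page, max_pages=100):
    """Collect results page by page, stopping at an empty page or the cap."""
    all_data = []
    for page in range(1, max_pages + 1):
        results = fetch_page(page)  # expected to return a list of dicts
        if not results:
            break                   # empty page: no more data
        all_data.extend(results)
    return pd.json_normalize(all_data)

# Stand-in for a real API: three pages of fake records, then nothing
fake_pages = {
    1: [{'id': 1}, {'id': 2}],
    2: [{'id': 3}, {'id': 4}],
    3: [{'id': 5}],
}
df = fetch_all_pages_capped(lambda page: fake_pages.get(page, []))
print(len(df))
```

Passing the fetch step in as a function also makes the pagination logic easy to unit test.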

Database Connections: Where the Real Data Lives

Databases are where companies store their important data. With SQLAlchemy handling the connection, pandas can query most relational databases:

import sqlalchemy

# Create database connection
engine = sqlalchemy.create_engine('sqlite:///company_data.db')

# Read data with SQL
customers_df = pd.read_sql('SELECT * FROM customers WHERE active = 1', engine)

# More complex queries (the DATE() calls here are SQLite syntax)
sales_summary = pd.read_sql("""
    SELECT 
        DATE(order_date) as date,
        COUNT(*) as order_count,
        SUM(amount) as total_revenue
    FROM orders 
    WHERE order_date >= DATE('now', '-30 days')
    GROUP BY DATE(order_date)
    ORDER BY date
""", engine)

pd.read_sql() lets you run SQL queries and get DataFrames back. This is incredibly powerful because you can do filtering and aggregation in the database before bringing data into Python.
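
When a query depends on a value from your code, pass it through params rather than formatting it into the SQL string. A minimal sketch using an in-memory SQLite database from the standard library; the table and its rows are made up for illustration:

```python
import sqlite3

import pandas as pd

# In-memory SQLite database with a made-up customers table
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE customers (id INTEGER, name TEXT, active INTEGER)')
conn.executemany('INSERT INTO customers VALUES (?, ?, ?)',
                 [(1, 'Ada', 1), (2, 'Bob', 0), (3, 'Cy', 1)])
conn.commit()

# The ? placeholder is filled in by the driver, not by string formatting
active = pd.read_sql(
    'SELECT id, name FROM customers WHERE active = ? ORDER BY id',
    conn, params=(1,))

print(active)
conn.close()
```

The placeholder style depends on the database driver (SQLite uses ?, others use named or %s styles), so treat the exact syntax here as SQLite-specific.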

Writing Data Back to Databases

Sometimes you need to save your processed data back to a database:

# Save DataFrame to database table
processed_data.to_sql('processed_sales',
                      engine,
                      if_exists='replace',  # Replace existing table
                      index=False)

# Append to existing table
new_records.to_sql('daily_sales',
                   engine,
                   if_exists='append',
                   index=False)

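to_sql() is the mirror of read_sql(). Use if_exists='replace' to drop and recreate the table with the new data, or if_exists='append' to add rows to an existing table. The default, if_exists='fail', raises an error if the table already exists.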
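
A quick round trip shows how the two modes behave. This sketch uses an in-memory SQLite connection and invented sample rows:

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(':memory:')

day1 = pd.DataFrame({'order_id': [1, 2], 'amount': [10.0, 20.0]})
day2 = pd.DataFrame({'order_id': [3], 'amount': [5.5]})

# First load creates (or replaces) the table
day1.to_sql('daily_sales', conn, if_exists='replace', index=False)
# Later loads add rows to it
day2.to_sql('daily_sales', conn, if_exists='append', index=False)

stored = pd.read_sql('SELECT * FROM daily_sales ORDER BY order_id', conn)
print(stored)
conn.close()
```

Keep in mind that append does not check for duplicates: running the same append twice stores the same rows twice.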

Combining Data from Multiple Sources

Real data engineering often means combining data from different sources. Here’s how to merge data from files, APIs, and databases:

def build_customer_360_view():
    """Combine customer data from multiple sources"""

    # Load base customer data from database
    engine = sqlalchemy.create_engine('sqlite:///crm.db')
    customers = pd.read_sql('SELECT * FROM customers', engine)

    # Get transaction history from CSV files
    transactions = pd.read_csv('transaction_export.csv',
                               parse_dates=['transaction_date'])

    # Fetch recent activity from API
    api_data = fetch_api_data('https://api.analytics.com/customer-activity')

    # Merge everything together
    # Start with customers as the base
    result = customers.copy()

    # Add transaction summaries; named aggregation gives flat column names directly
    transaction_summary = (
        transactions.groupby('customer_id')
        .agg(total_spent=('amount', 'sum'),
             transaction_count=('amount', 'count'),
             avg_transaction=('amount', 'mean'),
             last_transaction=('transaction_date', 'max'))
        .round(2)
        .reset_index()
    )

    result = result.merge(transaction_summary, on='customer_id', how='left')

    # Add API activity data
    if not api_data.empty:
        result = result.merge(api_data[['customer_id', 'last_login', 'page_views']],
                              on='customer_id', how='left')

    return result

# Create comprehensive customer view
customer_360 = build_customer_360_view()

This pattern is common: load data from different sources, aggregate or summarize as needed, then merge everything together using customer ID or another key field.
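
Merges are also where silent data problems creep in, such as duplicate keys that multiply rows or customers with no match. A small sketch of two merge options that help catch them, using invented sample frames:

```python
import pandas as pd

customers = pd.DataFrame({'customer_id': ['C1', 'C2', 'C3'],
                          'name': ['Ada', 'Bob', 'Cy']})
summary = pd.DataFrame({'customer_id': ['C1', 'C2'],
                        'total_spent': [120.0, 35.5]})

# validate raises MergeError if either side has duplicate keys;
# indicator adds a _merge column showing where each row came from
merged = customers.merge(summary, on='customer_id', how='left',
                         validate='one_to_one', indicator=True)

# Rows that found no partner in the summary table
unmatched = merged[merged['_merge'] == 'left_only']
print(len(merged), len(unmatched))
```

With a left join the row count should equal the number of customers; if it grows, one of the inputs has duplicate keys.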

Handling Different File Formats in Batch

Sometimes you need to process dozens or hundreds of files at once. Here’s how to handle that systematically:

import glob
from pathlib import Path

def process_multiple_files(pattern, processor_func):
    """Process multiple files matching a pattern"""
    files = sorted(glob.glob(pattern))  # Sorted for a predictable order
    all_data = []

    for file_path in files:
        try:
            # Extract metadata from filename
            filename = Path(file_path).stem
            date_str = filename.split('_')[-1]  # Assume date at end

            # Process the file
            df = processor_func(file_path)
            df['source_file'] = filename
            df['file_date'] = pd.to_datetime(date_str, errors='coerce')

            all_data.append(df)
            print(f"Processed {file_path}: {len(df)} records")

        except Exception as e:
            print(f"Failed to process {file_path}: {e}")

    if not all_data:
        return pd.DataFrame()  # pd.concat raises on an empty list
    return pd.concat(all_data, ignore_index=True)

def process_sales_file(file_path):
    """Process a single sales file"""
    return pd.read_csv(file_path,
                       parse_dates=['order_date'],
                       dtype={'customer_id': str})

# Process every sales export that matches the pattern
all_sales = process_multiple_files('data/sales_export_*.csv', process_sales_file)

This pattern uses glob to find files matching a pattern, processes each one with a custom function, adds metadata about the source file, then concatenates everything into one big DataFrame.
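
To watch the filename-to-date step work end to end, here is a condensed sketch that writes two throwaway files to a temporary directory and stacks them. The file names and contents are invented, and the date is assumed to be the last underscore-separated part of the name:

```python
import tempfile
from pathlib import Path

import pandas as pd

with tempfile.TemporaryDirectory() as tmp:
    # Two made-up export files with the date at the end of the name
    for day, amount in [('2024-01-01', 10), ('2024-01-02', 20)]:
        path = Path(tmp) / f'sales_export_{day}.csv'
        path.write_text(f'order_id,amount\n1,{amount}\n')

    frames = []
    for path in sorted(Path(tmp).glob('sales_export_*.csv')):
        df = pd.read_csv(path)
        df['source_file'] = path.stem
        # errors='coerce' turns an unparseable name into NaT instead of failing
        df['file_date'] = pd.to_datetime(path.stem.split('_')[-1], errors='coerce')
        frames.append(df)

    combined = pd.concat(frames, ignore_index=True)

print(combined)
```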

Streaming Data: Processing Data as It Arrives

Sometimes you need to process data continuously as it arrives, rather than in batches:

import time
from pathlib import Path

def monitor_and_process_files(watch_directory, output_directory, sleep_interval=60):
    """Monitor a directory and process new files as they appear"""
    processed_files = set()

    while True:
        # Check for new files
        current_files = set(Path(watch_directory).glob('*.csv'))
        new_files = current_files - processed_files

        for file_path in new_files:
            try:
                # Process the new file
                df = pd.read_csv(file_path)

                # Add processing timestamp
                df['processed_at'] = pd.Timestamp.now()

                # Save processed data
                output_path = Path(output_directory) / f"processed_{file_path.name}"
                df.to_csv(output_path, index=False)

                processed_files.add(file_path)
                print(f"Processed {file_path}")

            except Exception as e:
                print(f"Failed to process {file_path}: {e}")

        time.sleep(sleep_interval)

# Monitor for new sales files every minute
# monitor_and_process_files('incoming/', 'processed/', 60)

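This creates a simple file watcher that polls a directory and processes new CSV files as they appear. It's polling rather than true streaming: a new file can wait up to sleep_interval seconds, a file that fails is retried on every pass because it never gets added to processed_files, and the set lives in memory, so a restart reprocesses everything.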

Best Practices for Multi-Source Pipelines

After years of wrestling with data from different sources, here are the patterns that actually work:

1. Standardize Early

def standardize_date_columns(df):
    """Convert date columns to consistent format"""
    date_columns = ['created_date', 'updated_date', 'order_date']
    for col in date_columns:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')
    return df

def standardize_customer_ids(df):
    """Ensure customer IDs are strings and properly formatted"""
    if 'customer_id' in df.columns:
        # 'string' dtype keeps missing IDs as <NA>; astype(str) would turn them into 'nan'
        df['customer_id'] = df['customer_id'].astype('string').str.strip().str.upper()
    return df
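
Why standardize before anything else? Because joins compare keys exactly. In this sketch, two invented sources describe the same customers but format the ID differently; clean_ids is a hypothetical helper that applies the same strip-and-upper-case rule to a copy of each frame:

```python
import pandas as pd

# Same customers, but the two sources format the key differently
crm = pd.DataFrame({'customer_id': ['ab12', 'cd34'], 'name': ['Ada', 'Bob']})
billing = pd.DataFrame({'customer_id': [' AB12 ', 'CD34'], 'balance': [50, 75]})

def clean_ids(df):
    """Return a copy with customer_id stripped and upper-cased."""
    out = df.copy()
    out['customer_id'] = out['customer_id'].astype('string').str.strip().str.upper()
    return out

# Joining the raw keys finds nothing; joining the cleaned keys finds both
raw_join = crm.merge(billing, on='customer_id', how='inner')
clean_join = clean_ids(crm).merge(clean_ids(billing), on='customer_id', how='inner')

print(len(raw_join), len(clean_join))
```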

2. Build Robust Connectors

class DataSourceConnector:
    """Base class for data source connections"""

    def __init__(self, source_config):
        self.config = source_config
        self.last_successful_run = None

    def extract_data(self, **kwargs):
        """Override in subclasses"""
        raise NotImplementedError

    def test_connection(self):
        """Test if the data source is accessible"""
        try:
            self.extract_data(limit=1)  # Any successful read counts as reachable
            return True
        except Exception:
            return False

# Subclasses read their location from self.config so that
# test_connection() can call extract_data() without extra arguments.
# The 'path' and 'endpoint' config keys are assumptions for this example.
class CSVConnector(DataSourceConnector):
    def extract_data(self, limit=None, **kwargs):
        return pd.read_csv(self.config['path'], nrows=limit, **kwargs)

class APIConnector(DataSourceConnector):
    def extract_data(self, **params):
        response = requests.get(self.config['endpoint'], params=params, timeout=30)
        response.raise_for_status()
        return pd.json_normalize(response.json()['data'])

3. Handle Errors Gracefully

def safe_data_extraction(source_func, fallback_value=None, max_retries=3):
    """Wrapper to handle data extraction errors"""
    for attempt in range(max_retries):
        try:
            return source_func()
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                print("All retries exhausted, using fallback")
                return fallback_value if fallback_value is not None else pd.DataFrame()
            time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...

The Big Picture: Data Source Strategy

Here’s what I’ve learned about working with multiple data sources: the technical part (reading files, calling APIs) is usually the easy part. The hard part is understanding what the data means, how fresh it is, and how reliable it is.

Always ask these questions:

  • How often does this data update? Real-time, hourly, daily?
  • What happens when the source is unavailable? Do you have fallbacks?
  • How do you know if the data quality is good? Are there validation checks?
  • Who owns this data source? Who do you call when it breaks?
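
The freshness and quality questions above can be turned into code. Here is one possible sketch: check_quality is a hypothetical helper, and the thresholds and column names are assumptions you would replace with your own rules:

```python
import pandas as pd

def check_quality(df, required_columns, date_column, max_age_days, now=None):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    if now is None:
        now = pd.Timestamp.now()

    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        problems.append(f"missing columns: {missing}")
        return problems
    if df.empty:
        problems.append("no rows")
        return problems

    # Validation: required fields should not contain nulls
    null_counts = df[required_columns].isna().sum()
    for col, n in null_counts[null_counts > 0].items():
        problems.append(f"{col}: {n} null values")

    # Freshness: how old is the newest record?
    age = now - pd.to_datetime(df[date_column]).max()
    if age > pd.Timedelta(days=max_age_days):
        problems.append(f"stale data: newest record is {age.days} days old")

    return problems

orders = pd.DataFrame({
    'customer_id': ['C1', None],
    'order_date': ['2024-01-01', '2024-01-03'],
})
issues = check_quality(orders, ['customer_id', 'order_date'],
                       date_column='order_date', max_age_days=7,
                       now=pd.Timestamp('2024-02-01'))
print(issues)
```

Returning a list of problems, rather than raising on the first one, lets you log everything that is wrong with a source in a single run.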

Building Your Data Integration Toolkit

As you work with more data sources, you’ll develop reusable patterns:

# Configuration-driven data loading
# The credentials below are placeholders; in a real pipeline, read them
# from environment variables or a secrets manager, not from source code.
DATA_SOURCES = {
    'customers': {
        'type': 'database',
        'connection': 'postgresql://user:pass@localhost/crm',
        'query': 'SELECT * FROM customers WHERE active = true'
    },
    'transactions': {
        'type': 'csv',
        'path': 'data/transactions/*.csv',
        'parse_dates': ['transaction_date']
    },
    'web_analytics': {
        'type': 'api',
        'endpoint': 'https://api.analytics.com/events',
        'auth_token': 'your_token_here'
    }
}

def load_all_sources():
    """Load data from all configured sources"""
    data = {}
    for name, config in DATA_SOURCES.items():
        try:
            # load_data_source(config) is expected to return a DataFrame
            # for one source, dispatching on config['type']
            data[name] = load_data_source(config)
            print(f"Loaded {name}: {len(data[name])} records")
        except Exception as e:
            print(f"Failed to load {name}: {e}")
            data[name] = pd.DataFrame()
    return data

The key is building systems that are configurable, testable, and resilient to failure.
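
The load_data_source() function isn't shown above, so here is one possible sketch of it, assuming it dispatches on the 'type' key. The loader function names and the LOADERS table are hypothetical, and the 'api' type is left out to keep the example short:

```python
import glob
import tempfile
from pathlib import Path

import pandas as pd

def load_csv_source(config):
    """Read every file matching config['path'] and stack them."""
    # Everything except 'type' and 'path' is passed through to read_csv
    options = {k: v for k, v in config.items() if k not in ('type', 'path')}
    files = sorted(glob.glob(config['path']))
    if not files:
        return pd.DataFrame()
    return pd.concat((pd.read_csv(f, **options) for f in files),
                     ignore_index=True)

def load_database_source(config):
    """Run config['query'] against config['connection'] (needs SQLAlchemy)."""
    import sqlalchemy  # imported lazily so CSV-only use has no extra dependency
    engine = sqlalchemy.create_engine(config['connection'])
    return pd.read_sql(config['query'], engine)

# Map each 'type' value to the function that knows how to load it
LOADERS = {'csv': load_csv_source, 'database': load_database_source}

def load_data_source(config):
    """Dispatch to a loader based on config['type']."""
    try:
        loader = LOADERS[config['type']]
    except KeyError:
        raise ValueError(f"Unknown source type: {config.get('type')!r}")
    return loader(config)

# Try it on a throwaway CSV file
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / 'jan.csv').write_text('transaction_date,amount\n2024-01-05,10\n')
    demo = load_data_source({'type': 'csv',
                             'path': f'{tmp}/*.csv',
                             'parse_dates': ['transaction_date']})

print(demo.dtypes)
```

Adding a new kind of source then means writing one loader function and registering it in the table, without touching load_all_sources().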

Your Next Steps

Don’t try to master every data source at once. Start with the sources you encounter most often - probably CSV files and maybe a simple API. Get comfortable with the pandas functions that read and write data. Then gradually expand to databases, more complex APIs, and real-time streams.

Remember, every data source is just another input to your pipeline. The pandas operations you use to clean, transform, and combine the data are the same regardless of where it came from. Focus on mastering those fundamentals, and you’ll be ready for whatever data sources come your way.