File I/O Operations: Reading and Writing Files in Python
File input/output operations are fundamental to most real-world Python applications. Python provides powerful, flexible tools for reading, writing, and manipulating files of various formats while ensuring proper resource management.
Key Concept: Always use context managers (with statements) for file operations to ensure proper resource cleanup and exception handling.
Basic File Operations
Opening and Reading Files
# Basic file reading
with open('example.txt', 'r') as file:
    content = file.read()
    print(content)

# Reading line by line
with open('data.txt', 'r') as file:
    for line in file:
        print(line.strip())  # strip() removes newline characters

# Reading all lines into a list
with open('data.txt', 'r') as file:
    lines = file.readlines()
    print(f"File has {len(lines)} lines")

# Reading a specific number of characters
with open('data.txt', 'r') as file:
    first_100_chars = file.read(100)
    print(first_100_chars)

What to Notice:
- with statement ensures the file is closed automatically
- read() reads the entire file content
- readline() reads one line at a time (see the sketch below)
- readlines() returns a list of all lines
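readline() is listed above but not shown in the code. Here is a minimal sketch of a readline() loop, reusing the placeholder file data.txt: each call returns one line including its trailing newline, and an empty string once the end of the file is reached.

# Reading one line at a time with readline()
with open('data.txt', 'r') as file:
    line = file.readline()
    while line:                  # readline() returns '' at end of file
        print(line.strip())
        line = file.readline()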
Writing to Files
# Writing text to a file
with open('output.txt', 'w') as file:
    file.write("Hello, World!\n")
    file.write("This is a new line.\n")

# Writing multiple lines
lines = ["First line\n", "Second line\n", "Third line\n"]
with open('output.txt', 'w') as file:
    file.writelines(lines)

# Appending to existing file
with open('log.txt', 'a') as file:
    file.write("New log entry\n")

# Using print() to write to file
with open('output.txt', 'w') as file:
    print("Hello, World!", file=file)
    print("Another line", file=file)

What to Notice:
- 'w' mode overwrites existing files
- 'a' mode appends to existing files
- writelines() expects an iterable of strings
- print() can write directly to file objects
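One mode worth knowing in addition to 'w' and 'a' is 'x' (exclusive creation), which only succeeds if the file does not already exist. A minimal sketch, with new_file.txt as a placeholder name:

# 'x' mode creates the file, raising FileExistsError if it already exists
try:
    with open('new_file.txt', 'x') as file:
        file.write("Created fresh\n")
except FileExistsError:
    print("new_file.txt already exists; not overwriting")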
File Modes and Encoding
# Different file modes
modes = {
    'r': 'Read only (default)',
    'w': 'Write only (overwrites)',
    'a': 'Append only',
    'r+': 'Read and write',
    'w+': 'Write and read (overwrites)',
    'a+': 'Append and read',
    'rb': 'Read binary',
    'wb': 'Write binary',
    'ab': 'Append binary'
}

# Specifying encoding
with open('unicode_file.txt', 'w', encoding='utf-8') as file:
    file.write("Hello, 世界! 🌍\n")

with open('unicode_file.txt', 'r', encoding='utf-8') as file:
    content = file.read()
    print(content)

# Handling encoding errors
with open('problematic_file.txt', 'r', encoding='utf-8', errors='ignore') as file:
    content = file.read()  # Ignores characters that can't be decoded

# Common encoding options
encodings = ['utf-8', 'ascii', 'latin-1', 'cp1252']

What to Notice:
- Always specify encoding for text files
- UTF-8 is recommended for most applications
- Error handling options: 'strict' (the default), 'ignore', 'replace' (compared in the sketch below)
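A small sketch contrasting the error handlers on an in-memory byte string (no file needed); the bytes are assumed to be Latin-1 data that is invalid as UTF-8:

raw = b"caf\xe9"  # Latin-1 bytes for 'café'; \xe9 is not valid UTF-8

print(raw.decode('utf-8', errors='ignore'))   # 'caf'  (bad byte dropped)
print(raw.decode('utf-8', errors='replace'))  # 'caf�' (bad byte becomes U+FFFD)
# errors='strict' (the default) would raise UnicodeDecodeError here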
Advanced File Reading Techniques
Reading Large Files Efficiently
def read_large_file_chunks(filename, chunk_size=1024):
    """Read large file in chunks to manage memory"""
    with open(filename, 'r') as file:
        while True:
            chunk = file.read(chunk_size)
            if not chunk:
                break
            yield chunk

def process_large_file(filename):
    """Process large file without loading entire content"""
    line_count = 0
    word_count = 0

    with open(filename, 'r') as file:
        for line in file:  # File objects are iterators
            line_count += 1
            words = line.split()
            word_count += len(words)

    return line_count, word_count

# Memory efficient file processing
def find_pattern_in_large_file(filename, pattern):
    """Find pattern in large file without loading all into memory"""
    matches = []
    with open(filename, 'r') as file:
        for line_num, line in enumerate(file, 1):
            if pattern in line:
                matches.append((line_num, line.strip()))
    return matches

# Example usage
# lines, words = process_large_file('very_large_file.txt')
# print(f"Lines: {lines}, Words: {words}")

What to Notice:
- File objects are iterators, so looping over them reads one line at a time and is memory efficient
- Use generators (like read_large_file_chunks above) for processing large files
- Line-by-line processing prevents memory overload; a short islice sketch follows below
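Because file objects are iterators, they also compose with itertools. A minimal sketch that previews only the first few lines of a large file, with big.log as a placeholder filename:

from itertools import islice

# Print the first 5 lines without reading the rest of the file
with open('big.log', 'r') as file:
    for line in islice(file, 5):
        print(line.rstrip())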
Reading Different File Formats
import json
import csv
from pathlib import Path

# JSON file handling
def read_json_file(filename):
    """Read and parse JSON file"""
    with open(filename, 'r') as file:
        data = json.load(file)
    return data

def write_json_file(filename, data):
    """Write data to JSON file with pretty formatting"""
    with open(filename, 'w') as file:
        json.dump(data, file, indent=2, ensure_ascii=False)

# Example JSON operations
sample_data = {
    "users": [
        {"id": 1, "name": "Alice", "active": True},
        {"id": 2, "name": "Bob", "active": False}
    ],
    "metadata": {"version": "1.0", "created": "2024-01-01"}
}

# write_json_file('users.json', sample_data)
# loaded_data = read_json_file('users.json')

# CSV file handling
def read_csv_file(filename):
    """Read CSV file with proper handling"""
    rows = []
    with open(filename, 'r', newline='') as file:
        csv_reader = csv.reader(file)
        headers = next(csv_reader)  # First row as headers
        for row in csv_reader:
            row_dict = dict(zip(headers, row))
            rows.append(row_dict)
    return rows
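# The same result can be had with csv.DictReader, which maps each row to a
# dict using the header row automatically (read_csv_as_dicts is an
# illustrative helper name, not part of the examples above)
def read_csv_as_dicts(filename):
    """Read CSV rows as dictionaries keyed by the header row"""
    with open(filename, 'r', newline='') as file:
        return list(csv.DictReader(file))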
def write_csv_file(filename, data, headers):
    """Write data to CSV file"""
    with open(filename, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(headers)  # Write headers
        for row in data:
            writer.writerow(row)

# CSV with DictWriter (recommended for structured data)
def write_csv_with_dictwriter(filename, data):
    """Write list of dictionaries to CSV"""
    if not data:
        return

    with open(filename, 'w', newline='') as file:
        headers = data[0].keys()
        writer = csv.DictWriter(file, fieldnames=headers)
        writer.writeheader()
        writer.writerows(data)

Binary File Operations
def read_binary_file(filename):
    """Read binary file (images, executables, etc.)"""
    with open(filename, 'rb') as file:
        data = file.read()
    return data

def copy_binary_file(source, destination):
    """Copy binary file efficiently"""
    with open(source, 'rb') as src, open(destination, 'wb') as dst:
        # Copy in chunks for large files
        while True:
            chunk = src.read(4096)  # 4KB chunks
            if not chunk:
                break
            dst.write(chunk)
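# An equivalent chunked copy using the standard library: shutil.copyfileobj
# performs the same read/write loop (copy_binary_file_stdlib is an
# illustrative name for this sketch)
import shutil

def copy_binary_file_stdlib(source, destination):
    """Copy a binary file with shutil.copyfileobj"""
    with open(source, 'rb') as src, open(destination, 'wb') as dst:
        shutil.copyfileobj(src, dst, length=64 * 1024)  # 64 KB buffer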
def analyze_file_header(filename):
    """Analyze file type by reading header bytes"""
    file_signatures = {
        b'\x89PNG\r\n\x1a\n': 'PNG Image',
        b'\xff\xd8\xff': 'JPEG Image',
        b'GIF87a': 'GIF Image',
        b'GIF89a': 'GIF Image',
        b'%PDF': 'PDF Document',
        b'PK': 'ZIP Archive'
    }

    with open(filename, 'rb') as file:
        header = file.read(10)  # Read first 10 bytes

    for signature, file_type in file_signatures.items():
        if header.startswith(signature):
            return file_type

    return 'Unknown file type'

# Working with file metadata
import os
import time

def get_file_info(filename):
    """Get comprehensive file information"""
    try:
        stat = os.stat(filename)
        return {
            'size': stat.st_size,
            'created': time.ctime(stat.st_ctime),  # creation time on Windows, metadata change time on Unix
            'modified': time.ctime(stat.st_mtime),
            'accessed': time.ctime(stat.st_atime),
            'is_file': os.path.isfile(filename),
            'is_directory': os.path.isdir(filename),
            'exists': os.path.exists(filename)
        }
    except OSError as e:
        return {'error': str(e)}

Working with Paths and Directories
Using pathlib (Modern Approach)
from pathlib import Path
import os

# Modern path handling with pathlib
def explore_directory_structure(directory_path):
    """Explore directory using pathlib"""
    path = Path(directory_path)

    if not path.exists():
        return f"Directory {directory_path} does not exist"

    structure = {
        'path': str(path.absolute()),
        'is_directory': path.is_dir(),
        'files': [],
        'directories': [],
        'total_size': 0
    }

    if path.is_dir():
        for item in path.iterdir():
            if item.is_file():
                structure['files'].append({
                    'name': item.name,
                    'size': item.stat().st_size,
                    'extension': item.suffix
                })
                structure['total_size'] += item.stat().st_size
            elif item.is_dir():
                structure['directories'].append(item.name)

    return structure

def find_files_by_extension(directory, extension):
    """Find all files with specific extension"""
    path = Path(directory)
    return list(path.glob(f"**/*.{extension}"))  # Recursive search
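# Path.rglob('*.py') is equivalent to glob('**/*.py'); illustrative usage,
# with 'src' and 'py' as placeholder values:
# python_files = find_files_by_extension('src', 'py')
# same_files = list(Path('src').rglob('*.py'))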
def safe_file_operations(source_path, destination_path):
    """Demonstrate safe file operations with pathlib"""
    source = Path(source_path)
    destination = Path(destination_path)

    # Check if source exists
    if not source.exists():
        return f"Source {source_path} does not exist"

    # Create destination directory if it doesn't exist
    destination.parent.mkdir(parents=True, exist_ok=True)

    # Copy file with conflict resolution
    if destination.exists():
        backup_name = destination.with_suffix(f'.backup{destination.suffix}')
        destination.rename(backup_name)

    # Perform the copy
    with source.open('rb') as src, destination.open('wb') as dst:
        dst.write(src.read())

    return f"Successfully copied {source_path} to {destination_path}"

# Working with file names and extensions
def process_filename(filepath):
    """Extract components from file path"""
    path = Path(filepath)
    return {
        'full_path': str(path.absolute()),
        'directory': str(path.parent),
        'filename': path.name,
        'stem': path.stem,  # filename without extension
        'extension': path.suffix,
        'all_extensions': path.suffixes  # for files like .tar.gz
    }

# Example usage
# info = process_filename('/path/to/document.backup.pdf')
# print(info)
# {
#     'full_path': '/absolute/path/to/document.backup.pdf',
#     'directory': '/path/to',
#     'filename': 'document.backup.pdf',
#     'stem': 'document.backup',
#     'extension': '.pdf',
#     'all_extensions': ['.backup', '.pdf']
# }

Directory Operations
import shutil
import tempfile
from pathlib import Path

def create_directory_structure(base_path, structure):
    """Create nested directory structure from dictionary"""
    base = Path(base_path)
    base.mkdir(parents=True, exist_ok=True)

    for name, content in structure.items():
        path = base / name
        if isinstance(content, dict):
            # It's a directory
            create_directory_structure(path, content)
        else:
            # It's a file
            path.write_text(content)

# Example directory structure
project_structure = {
    'src': {
        'main.py': 'print("Hello, World!")',
        'utils': {
            '__init__.py': '',
            'helpers.py': 'def helper_function(): pass'
        }
    },
    'tests': {
        'test_main.py': 'import unittest'
    },
    'README.md': '# My Project\n\nThis is a sample project.'
}

def backup_directory(source_dir, backup_dir):
    """Create backup of entire directory"""
    source = Path(source_dir)
    backup = Path(backup_dir)

    if source.exists() and source.is_dir():
        if backup.exists():
            shutil.rmtree(backup)  # Remove existing backup
        shutil.copytree(source, backup)
        return f"Backup created: {backup}"
    else:
        return f"Source directory {source_dir} does not exist"

def cleanup_temporary_files(directory, pattern="*.tmp"):
    """Clean up temporary files matching pattern"""
    path = Path(directory)
    deleted_files = []

    for temp_file in path.glob(pattern):
        if temp_file.is_file():
            temp_file.unlink()  # Delete file
            deleted_files.append(str(temp_file))

    return deleted_files

# Working with temporary files
def work_with_temporary_files():
    """Demonstrate temporary file operations"""
    # Create temporary file
    with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt') as temp_file:
        temp_file.write("This is temporary data\n")
        temp_filename = temp_file.name

    # Read from temporary file
    with open(temp_filename, 'r') as file:
        content = file.read()
        print(f"Temporary file content: {content}")

    # Clean up
    Path(temp_filename).unlink()

    # Create temporary directory
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = Path(temp_dir)

        # Create files in temporary directory
        (temp_path / 'file1.txt').write_text('Content 1')
        (temp_path / 'file2.txt').write_text('Content 2')

        # List files in temporary directory
        files = list(temp_path.glob('*.txt'))
        print(f"Files in temp directory: {[f.name for f in files]}")

        # Directory is automatically cleaned up when the with block exits

Error Handling and Resource Management
Comprehensive Error Handling
import errno
import logging
import os
import shutil
from pathlib import Path

def robust_file_reader(filename):
    """Read file with comprehensive error handling"""
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            return file.read()

    except FileNotFoundError:
        logging.error(f"File not found: {filename}")
        return None

    except PermissionError:
        logging.error(f"Permission denied: {filename}")
        return None

    except UnicodeDecodeError as e:
        logging.error(f"Encoding error in {filename}: {e}")
        # Try with different encoding
        try:
            with open(filename, 'r', encoding='latin-1') as file:
                return file.read()
        except Exception:
            return None

    except OSError as e:
        if e.errno == errno.ENOSPC:
            logging.error("No space left on device")
        elif e.errno == errno.EACCES:
            logging.error("Access denied")
        else:
            logging.error(f"OS error: {e}")
        return None

    except Exception as e:
        logging.error(f"Unexpected error reading {filename}: {e}")
        return None

def safe_file_writer(filename, content, backup=True):
    """Write file safely with optional backup"""
    filepath = Path(filename)

    # Create backup if file exists
    if backup and filepath.exists():
        backup_path = filepath.with_suffix(f'.backup{filepath.suffix}')
        shutil.copy2(filepath, backup_path)

    # Write to temporary file first
    temp_path = filepath.with_suffix('.tmp')

    try:
        with open(temp_path, 'w', encoding='utf-8') as file:
            file.write(content)
            file.flush()  # Ensure data is written
            os.fsync(file.fileno())  # Force OS to write to disk

        # Atomic replace (Path.replace overwrites the target on both POSIX and Windows)
        temp_path.replace(filepath)
        return True

    except Exception as e:
        # Clean up temporary file on error
        if temp_path.exists():
            temp_path.unlink()
        logging.error(f"Failed to write {filename}: {e}")
        return False

class FileManager:
    """Context manager for handling multiple files"""

    def __init__(self, *filenames_and_modes):
        self.files = []
        self.filenames_and_modes = filenames_and_modes

    def __enter__(self):
        try:
            for filename, mode in self.filenames_and_modes:
                file_obj = open(filename, mode)
                self.files.append(file_obj)
            return self.files
        except Exception:
            # Close any files that were opened before the error
            self.__exit__(None, None, None)
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        for file_obj in self.files:
            try:
                file_obj.close()
            except Exception:
                pass  # Don't let cleanup errors mask original exception

# Usage example
# with FileManager(('input.txt', 'r'), ('output.txt', 'w')) as (input_file, output_file):
#     data = input_file.read()
#     output_file.write(data.upper())

File Locking and Concurrent Access
import fcntl  # Unix/Linux only
import time
from contextlib import contextmanager

@contextmanager
def file_lock(filename, mode='r'):
    """Context manager for file locking (Unix/Linux)"""
    file_obj = open(filename, mode)
    try:
        fcntl.flock(file_obj.fileno(), fcntl.LOCK_EX)  # Exclusive lock
        yield file_obj
    finally:
        fcntl.flock(file_obj.fileno(), fcntl.LOCK_UN)  # Unlock
        file_obj.close()

def safe_append_to_log(filename, message):
    """Safely append to log file with locking"""
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[{timestamp}] {message}\n"

    try:
        with file_lock(filename, 'a') as file:
            file.write(log_entry)
        return True
    except Exception as e:
        print(f"Failed to write to log: {e}")
        return False

# Cross-platform file locking alternative
import portalocker  # Third-party library: pip install portalocker

def cross_platform_file_lock(filename, content):
    """Cross-platform file locking"""
    try:
        with open(filename, 'a') as file:
            portalocker.lock(file, portalocker.LOCK_EX)
            file.write(content)
            portalocker.unlock(file)
        return True
    except Exception as e:
        print(f"Failed to write with lock: {e}")
        return False

Performance Optimization
Efficient File Processing Patterns
import mmap
import os

def memory_mapped_file_search(filename, search_term):
    """Use memory mapping for efficient large file searching"""
    with open(filename, 'rb') as file:
        with mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:
            # Memory mapped file can be searched like bytes
            index = mmapped_file.find(search_term.encode())
            if index != -1:
                # Find line containing the term
                line_start = mmapped_file.rfind(b'\n', 0, index) + 1
                line_end = mmapped_file.find(b'\n', index)
                if line_end == -1:
                    line_end = len(mmapped_file)

                line = mmapped_file[line_start:line_end].decode('utf-8', errors='ignore')
                return index, line

    return None, None

def buffered_file_copy(source, destination, buffer_size=64*1024):
    """Efficient file copying with custom buffer size"""
    with open(source, 'rb') as src, open(destination, 'wb') as dst:
        while True:
            buffer = src.read(buffer_size)
            if not buffer:
                break
            dst.write(buffer)

def batch_file_processor(filenames, process_function, batch_size=10):
    """Process multiple files in batches"""
    results = []

    for i in range(0, len(filenames), batch_size):
        batch = filenames[i:i + batch_size]
        batch_results = []

        for filename in batch:
            try:
                result = process_function(filename)
                batch_results.append((filename, result))
            except Exception as e:
                batch_results.append((filename, f"Error: {e}"))

        results.extend(batch_results)

        # Optional: yield results for streaming processing
        yield batch_results

# Example: Count lines in multiple files
def count_lines(filename):
    """Count lines in a file"""
    with open(filename, 'r') as file:
        return sum(1 for line in file)

# filenames = ['file1.txt', 'file2.txt', 'file3.txt']
# for batch_results in batch_file_processor(filenames, count_lines):
#     for filename, line_count in batch_results:
#         print(f"{filename}: {line_count} lines")

Monitoring File Operations
import time
from functools import wraps

def monitor_file_operation(func):
    """Decorator to monitor file operation performance"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        start_memory = get_memory_usage()

        try:
            result = func(*args, **kwargs)
            success = True
        except Exception as e:
            result = f"Error: {e}"
            success = False

        end_time = time.time()
        end_memory = get_memory_usage()

        print(f"Operation: {func.__name__}")
        print(f"Duration: {end_time - start_time:.4f} seconds")
        print(f"Memory change: {end_memory - start_memory:.2f} MB")
        print(f"Success: {success}")
        print("-" * 40)

        return result

    return wrapper

def get_memory_usage():
    """Get current memory usage (simplified)"""
    try:
        import psutil
        process = psutil.Process()
        return process.memory_info().rss / 1024 / 1024  # MB
    except ImportError:
        return 0  # psutil not available

@monitor_file_operation
def process_large_file(filename):
    """Example function with monitoring"""
    line_count = 0
    with open(filename, 'r') as file:
        for line in file:
            line_count += 1
    return line_count

Summary
File I/O operations in Python provide powerful capabilities for:
- Reading and writing various file formats (text, binary, JSON, CSV)
- Efficient processing of large files using generators and streaming
- Safe resource management with context managers and proper error handling
- Path manipulation using the modern pathlib approach
- Performance optimization through memory mapping and buffering
Key Takeaways
- Always use context managers (with statements) for file operations
- Specify encoding explicitly for text files (UTF-8 recommended)
- Handle errors gracefully with appropriate exception handling
- Use pathlib for modern path manipulation
- Process large files efficiently with generators and line-by-line reading
- Consider performance for large-scale file operations
Best Practices
- Use pathlib.Path for path operations
- Always specify file encoding
- Implement proper error handling
- Use appropriate file modes
- Consider memory usage for large files
- Implement atomic write operations for critical data
- Use file locking for concurrent access scenarios