# File Operations
Rick provides file handling utilities built on top of the stream processing module. The file module offers enhanced file reading capabilities with metadata support, chunked reading, and multipart file handling.
## Overview

The file module includes:

- `FilePart` - Alias for `FileSlice`, representing a file part
- `FileReader` - Enhanced multipart file reader with metadata and chunked reading
### Why FileReader?

FileReader extends MultiPartReader with file-specific features:

- Store file metadata (name, content type, custom attributes)
- Read files in fixed-size chunks
- Handle multipart files with metadata
- Process uploaded files or split archives
- Manage file streams with additional context
## FilePart

`FilePart` is an alias for `FileSlice` from the stream module, providing semantic clarity when working with file parts.

```python
from rick.resource.file import FilePart

# Create a file part
part = FilePart('/path/to/file.dat')

# FilePart is identical to FileSlice
print(f"Part size: {part.size} bytes")
```

See the FileSlice documentation for detailed usage.
## FileReader

Enhanced file reader with metadata support and chunked reading capabilities.

### Basic Usage

```python
from rick.resource.file import FileReader, FilePart

# Create parts
parts = [
    FilePart('/tmp/file1.dat'),
    FilePart('/tmp/file2.dat'),
]

# Create file reader with metadata
reader = FileReader(
    parts=parts,
    name='document.pdf',
    content_type='application/pdf'
)

# Access metadata
print(f"Filename: {reader.name}")
print(f"Content-Type: {reader.content_type}")
print(f"Total size: {reader.size} bytes")

# Read entire file as BytesIO
buffer = reader.read_block()
print(f"Read {buffer.getbuffer().nbytes} bytes")

# Rewind after read_block() so read_chunked() starts from the beginning
reader.seek(0)

# Read in fixed-size chunks
for chunk in reader.read_chunked(1024 * 1024):  # 1MB chunks
    process_chunk(chunk)
```
### Constructor

```python
FileReader(parts, name="", content_type="application/octet-stream", **kwargs)
```

Creates a new FileReader instance with metadata.

Parameters:

- `parts` (list): List of FilePart or SliceReader objects
- `name` (str): Filename (default: `""`)
- `content_type` (str): MIME type (default: `"application/octet-stream"`)
- `**kwargs`: Custom properties to attach to the reader

Raises:

- `ValueError`: If a custom property name conflicts with existing attributes

Attributes:

- `name`: Filename
- `content_type`: MIME type
- `parts`: List of file parts
- `size`: Total size of all parts
- `offset`: Current stream position
- Custom attributes from kwargs
Example:

```python
from rick.resource.file import FileReader, FilePart

parts = [FilePart('/data/video.mp4')]

reader = FileReader(
    parts=parts,
    name='vacation.mp4',
    content_type='video/mp4',
    upload_id='abc123',
    user_id=42
)

print(f"File: {reader.name}")
print(f"Type: {reader.content_type}")
print(f"Upload ID: {reader.upload_id}")
print(f"User ID: {reader.user_id}")
```
### Methods

#### `read_block(offset=0, limit=-1) -> BytesIO`

Read data as a single BytesIO buffer.

Parameters:

- `offset` (int): Starting byte position (default: 0)
- `limit` (int): Number of bytes to read (default: -1 for all)

Returns:

- `BytesIO`: Buffer containing the read data (position is at the end; call `seek(0)` before reading)
Example:

```python
from rick.resource.file import FileReader, FilePart

reader = FileReader(parts=[FilePart('/data/file.bin')])

# Read entire file
buffer = reader.read_block()
buffer.seek(0)  # Reset position to beginning
data = buffer.read()

# Read first 1KB
buffer = reader.read_block(offset=0, limit=1024)
buffer.seek(0)
header = buffer.read()

# Read 1KB starting at position 1000
buffer = reader.read_block(offset=1000, limit=1024)
buffer.seek(0)
chunk = buffer.read()
```
#### `read_chunked(block_size: int) -> Iterator[BytesIO]`

Read file in fixed-size chunks as BytesIO buffers.

Parameters:

- `block_size` (int): Size of each chunk in bytes

Returns:

- `Iterator[BytesIO]`: Buffers of up to `block_size` bytes (the last chunk may be smaller)
Example:

```python
from rick.resource.file import FileReader, FilePart

reader = FileReader(parts=[FilePart('/data/largefile.bin')])

# Process file in 1MB chunks
chunk_size = 1024 * 1024
for chunk in reader.read_chunked(chunk_size):
    data = chunk.read()
    process_data(data)
    print(f"Processed {len(data)} bytes")
```
### Inherited Methods

FileReader inherits all methods from MultiPartReader:

- `read(offset, length)` - Read as iterator of bytes
- `seek(offset, whence)` - Seek to position
- `seekable()` - Check if seekable (always True)

See the MultiPartReader documentation for details.
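A minimal sketch of the inherited interface, assuming the signatures listed above (`read()` yielding raw bytes blocks, and `seek()` callable with a single offset as in the Error Handling section below):

```python
from rick.resource.file import FileReader, FilePart

reader = FileReader(parts=[FilePart('/data/file.bin')])

# read() is inherited from MultiPartReader and yields raw bytes blocks
first_kb = b''.join(reader.read(0, 1024))  # first 1KB

# seek() repositions the stream; seekable() is always True
reader.seek(0)
print(reader.seekable())  # True
```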
## Examples

### Simple File Reading

```python
from rick.resource.file import FileReader, FilePart

# Read a single file
parts = [FilePart('/tmp/document.pdf')]
reader = FileReader(parts=parts, name='document.pdf', content_type='application/pdf')

# Write the entire file to disk
with open('/tmp/copy.pdf', 'wb') as output:
    buffer = reader.read_block()
    buffer.seek(0)
    output.write(buffer.read())

print(f"Copied {reader.size} bytes")
```
### Multipart File Reading

```python
from rick.resource.file import FileReader, FilePart

# Read a file split into multiple parts
parts = [
    FilePart('/downloads/video.mp4.part1'),
    FilePart('/downloads/video.mp4.part2'),
    FilePart('/downloads/video.mp4.part3'),
]

reader = FileReader(
    parts=parts,
    name='video.mp4',
    content_type='video/mp4'
)

# Combine parts into a single file
with open('/videos/video.mp4', 'wb') as output:
    for chunk in reader.read_chunked(1024 * 1024):  # 1MB chunks
        output.write(chunk.read())

print(f"Reassembled {reader.name}: {reader.size} bytes")
```
### Custom Metadata

```python
from rick.resource.file import FileReader, FilePart

parts = [FilePart('/uploads/photo.jpg')]

reader = FileReader(
    parts=parts,
    name='vacation.jpg',
    content_type='image/jpeg',
    # Custom attributes
    upload_timestamp='2024-01-15 10:30:00',
    user_id=123,
    tags=['vacation', 'beach', 'sunset'],
    metadata={'camera': 'Canon EOS R5', 'iso': 400}
)

# Access custom attributes
print(f"Uploaded: {reader.upload_timestamp}")
print(f"User: {reader.user_id}")
print(f"Tags: {', '.join(reader.tags)}")
print(f"Camera: {reader.metadata['camera']}")
```
### File Upload Processing

```python
import hashlib

from rick.resource.file import FileReader, FilePart

def process_upload(file_parts, filename, content_type):
    """Process uploaded file"""
    reader = FileReader(
        parts=file_parts,
        name=filename,
        content_type=content_type
    )

    # Validate file size
    max_size = 10 * 1024 * 1024  # 10MB
    if reader.size > max_size:
        raise ValueError(f"File too large: {reader.size} bytes")

    # Calculate hash while saving
    hasher = hashlib.sha256()
    output_path = f'/uploads/{filename}'

    with open(output_path, 'wb') as output:
        for chunk in reader.read_chunked(64 * 1024):  # 64KB chunks
            data = chunk.read()
            hasher.update(data)
            output.write(data)

    return {
        'filename': reader.name,
        'size': reader.size,
        'content_type': reader.content_type,
        'sha256': hasher.hexdigest(),
        'path': output_path
    }

# Example usage
parts = [FilePart('/tmp/uploaded_file.dat')]
result = process_upload(parts, 'document.pdf', 'application/pdf')
print(f"Saved: {result['filename']} ({result['size']} bytes)")
print(f"SHA256: {result['sha256']}")
```
### Chunked File Transfer

```python
from rick.resource.file import FileReader, FilePart

def transfer_file(source_path, destination, chunk_size=1024 * 1024):
    """Transfer file in chunks with progress"""
    parts = [FilePart(source_path)]
    reader = FileReader(parts=parts)

    total_bytes = reader.size
    transferred = 0

    with open(destination, 'wb') as output:
        for chunk in reader.read_chunked(chunk_size):
            data = chunk.read()
            output.write(data)
            transferred += len(data)

            # Progress reporting
            progress = (transferred / total_bytes) * 100
            print(f"\rProgress: {progress:.1f}%", end='')

    print(f"\nTransferred {transferred} bytes")

# Transfer with 2MB chunks
transfer_file('/data/largefile.bin', '/backup/largefile.bin', 2 * 1024 * 1024)
```
### File Validation and Processing

```python
from rick.resource.file import FileReader, FilePart
from rick.crypto import sha256_hash

def validate_and_process(file_path, expected_hash):
    """Validate file hash and process if valid"""
    parts = [FilePart(file_path)]
    reader = FileReader(
        parts=parts,
        name=file_path.split('/')[-1]
    )

    # Read entire file and verify hash
    buffer = reader.read_block()
    buffer.seek(0)
    actual_hash = sha256_hash(buffer)

    if actual_hash != expected_hash:
        raise ValueError(
            f"Hash mismatch: expected {expected_hash}, got {actual_hash}"
        )

    # Process file in chunks
    reader.seek(0)  # Reset reader position after read_block()
    results = []
    for chunk in reader.read_chunked(1024 * 1024):
        result = process_chunk(chunk)
        results.append(result)

    return results

def process_chunk(chunk):
    """Process a chunk of data"""
    data = chunk.read()
    # Your processing logic here
    return len(data)

# Validate and process
results = validate_and_process(
    '/downloads/data.bin',
    'a1b2c3d4e5f6...'  # Expected SHA-256 hash
)
print(f"Processed {len(results)} chunks")
```
### Combining Multiple Files

```python
from pathlib import Path

from rick.resource.file import FileReader, FilePart

def combine_logs(log_directory, output_file):
    """Combine multiple log files into one"""
    # Find all log files
    log_dir = Path(log_directory)
    log_files = sorted(log_dir.glob('*.log'))

    if not log_files:
        raise ValueError("No log files found")

    # Create parts
    parts = [FilePart(str(f)) for f in log_files]

    # Create reader
    reader = FileReader(
        parts=parts,
        name=Path(output_file).name,
        content_type='text/plain',
        source_files=[str(f) for f in log_files],
        file_count=len(log_files)
    )

    # Write combined file
    with open(output_file, 'wb') as output:
        for chunk in reader.read_chunked(1024 * 1024):
            output.write(chunk.read())

    print(f"Combined {reader.file_count} files:")
    for f in reader.source_files:
        print(f"  - {f}")
    print(f"Total size: {reader.size} bytes")

# Combine all logs
combine_logs('/var/log/myapp', '/tmp/combined.log')
```
### Archive Extraction Simulation

```python
from rick.resource.file import FileReader, FilePart

def process_split_archive(archive_parts, extract_to):
    """Process split archive files"""
    # Archive split into multiple files
    parts = [FilePart(part) for part in archive_parts]

    reader = FileReader(
        parts=parts,
        name='archive.tar.gz',
        content_type='application/x-tar',
        part_count=len(parts)
    )

    print(f"Processing {reader.part_count}-part archive")
    print(f"Total size: {reader.size} bytes")

    # Save reassembled archive
    archive_path = f"{extract_to}/{reader.name}"
    with open(archive_path, 'wb') as output:
        bytes_written = 0
        for chunk in reader.read_chunked(512 * 1024):  # 512KB chunks
            data = chunk.read()
            output.write(data)
            bytes_written += len(data)

    print(f"Reassembled archive: {archive_path}")
    return archive_path

# Process split archive
parts = [
    '/downloads/archive.tar.gz.001',
    '/downloads/archive.tar.gz.002',
    '/downloads/archive.tar.gz.003',
]
archive = process_split_archive(parts, '/tmp')
```
### Streaming HTTP Upload

```python
from types import SimpleNamespace

from rick.resource.file import FileReader, FilePart

def upload_to_server(file_path, url, chunk_size=1024 * 1024):
    """Upload file to server in chunks"""
    parts = [FilePart(file_path)]
    reader = FileReader(
        parts=parts,
        name=file_path.split('/')[-1],
        content_type='application/octet-stream'
    )

    print(f"Uploading {reader.name} ({reader.size} bytes)")

    uploaded = 0
    for chunk in reader.read_chunked(chunk_size):
        data = chunk.read()

        # Send chunk to server
        response = send_chunk_to_server(url, data)
        if response.status_code != 200:
            raise RuntimeError(f"Upload failed: {response.status_code}")

        uploaded += len(data)
        progress = (uploaded / reader.size) * 100
        print(f"\rProgress: {progress:.1f}%", end='')

    print("\nUpload complete")

def send_chunk_to_server(url, data):
    """Stub for sending data to server"""
    # Your HTTP upload logic here; this stub always reports success
    return SimpleNamespace(status_code=200)

# Upload file
upload_to_server('/data/video.mp4', 'https://example.com/upload')
```
### Custom Properties for Processing Context

```python
from datetime import datetime

from rick.resource.file import FileReader, FilePart

def process_with_context(file_path, **context):
    """Process file with additional context"""
    parts = [FilePart(file_path)]
    reader = FileReader(
        parts=parts,
        name=file_path.split('/')[-1],
        content_type='application/octet-stream',
        # Add context as custom properties
        **context
    )

    print(f"Processing: {reader.name}")
    print(f"User: {reader.user_name}")
    print(f"Department: {reader.department}")
    print(f"Priority: {reader.priority}")

    # Process based on context
    if reader.priority == 'high':
        chunk_size = 2 * 1024 * 1024  # 2MB for high priority
    else:
        chunk_size = 512 * 1024  # 512KB for normal

    for chunk in reader.read_chunked(chunk_size):
        process_chunk_with_context(chunk, reader)

def process_chunk_with_context(chunk, reader):
    """Process chunk with reader context"""
    data = chunk.read()
    # Use reader.user_name, reader.department, etc.
    print(f"  Processed {len(data)} bytes for {reader.user_name}")

# Process with context
process_with_context(
    '/uploads/report.pdf',
    user_name='Alice',
    department='Finance',
    priority='high',
    timestamp=datetime.now().isoformat()
)
```
## Performance Considerations

### Memory Efficiency

FileReader uses MultiPartReader internally, providing efficient streaming:

```python
from rick.resource.file import FileReader, FilePart

# Even with a 10GB file, memory usage stays low
parts = [FilePart('/data/huge_10gb_file.bin')]
reader = FileReader(parts=parts)

# Process in 1MB chunks - only 1MB in memory at a time
with open('/backup/huge_10gb_file.bin', 'wb') as output:
    for chunk in reader.read_chunked(1024 * 1024):
        output.write(chunk.read())

# Memory usage remains constant
```
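To check this claim on your own data, here is a quick sketch using the standard library's tracemalloc (the file path and chunk size are placeholders):

```python
import tracemalloc

from rick.resource.file import FileReader, FilePart

tracemalloc.start()

reader = FileReader(parts=[FilePart('/data/huge_10gb_file.bin')])
for chunk in reader.read_chunked(1024 * 1024):
    chunk.read()  # discard the data; we only care about memory behaviour

current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Peak Python allocation: {peak / (1024 * 1024):.1f} MB")  # should stay near the chunk size
```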
### Chunk Size Selection

Choose appropriate chunk sizes based on your use case:

```python
from rick.resource.file import FileReader, FilePart

parts = [FilePart('/data/file.bin')]
reader = FileReader(parts=parts)

# Small chunks (64KB) - low memory, more overhead
for chunk in reader.read_chunked(64 * 1024):
    process(chunk)

# Medium chunks (1MB) - balanced
reader.seek(0)  # rewind between passes
for chunk in reader.read_chunked(1024 * 1024):
    process(chunk)

# Large chunks (10MB) - higher memory, less overhead
reader.seek(0)
for chunk in reader.read_chunked(10 * 1024 * 1024):
    process(chunk)
```
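When in doubt, measure. A simple timing loop (the path and sizes are illustrative) shows the trade-off on your storage:

```python
import time

from rick.resource.file import FileReader, FilePart

# Compare throughput at several chunk sizes
for block_size in (64 * 1024, 1024 * 1024, 10 * 1024 * 1024):
    reader = FileReader(parts=[FilePart('/data/file.bin')])
    start = time.perf_counter()
    total = sum(len(chunk.read()) for chunk in reader.read_chunked(block_size))
    elapsed = time.perf_counter() - start
    print(f"{block_size // 1024}KB chunks: {total} bytes in {elapsed:.3f}s")
```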
### `read_block()` vs `read_chunked()`

Use `read_block()` for small files, `read_chunked()` for large files:

```python
from rick.resource.file import FileReader, FilePart

parts = [FilePart('/data/file.bin')]
reader = FileReader(parts=parts)

# Small file - read_block() is fine
if reader.size < 10 * 1024 * 1024:  # < 10MB
    buffer = reader.read_block()
    buffer.seek(0)
    process_all(buffer.read())

# Large file - use read_chunked()
else:
    for chunk in reader.read_chunked(1024 * 1024):
        process_chunk(chunk.read())
```
## Error Handling

```python
from rick.resource.file import FileReader, FilePart

try:
    # FilePart validates file exists
    parts = [FilePart('/path/to/nonexistent.txt')]
except ValueError as e:
    print(f"File error: {e}")

try:
    parts = [FilePart('/tmp/file.txt')]
    # Reserved names raise ValueError
    reader = FileReader(
        parts=parts,
        name='test.txt',
        size=100  # 'size' is reserved
    )
except ValueError as e:
    print(f"Property error: {e}")

try:
    parts = [FilePart('/tmp/file.txt')]
    reader = FileReader(parts=parts)
    # Seek/read errors from MultiPartReader
    reader.seek(-100)  # Negative offset
except ValueError as e:
    print(f"Seek error: {e}")
```
## API Reference

### FilePart

Alias for `FileSlice`. See the FileSlice API.

### FileReader

Inherits from MultiPartReader with additional features.

#### Constructor

| Parameter | Type | Default | Description |
|---|---|---|---|
| `parts` | list | required | List of FilePart/SliceReader objects |
| `name` | str | `""` | Filename |
| `content_type` | str | `"application/octet-stream"` | MIME type |
| `**kwargs` | any | - | Custom properties to attach |
#### Methods

| Method | Returns | Description |
|---|---|---|
| `read_block(offset, limit)` | BytesIO | Read as single buffer |
| `read_chunked(block_size)` | Iterator[BytesIO] | Read in fixed-size chunks |
| `read(offset, length)` | Iterator[bytes] | Read as iterator (inherited) |
| `seek(offset, whence)` | int | Seek to position (inherited) |
| `seekable()` | bool | Check if seekable (inherited) |
#### Attributes

| Attribute | Type | Description |
|---|---|---|
| `name` | str | Filename |
| `content_type` | str | MIME type |
| `parts` | list | List of file parts |
| `size` | int | Total size in bytes |
| `offset` | int | Current stream position |
| `opened` | bool | Whether stream has been read |
| Custom attrs | any | User-defined properties from kwargs |
## See Also
- Stream Processing - Underlying multipart stream functionality
- Resources Overview - Overview of all resource utilities
- Redis Cache - Redis caching with serialization
- Configuration - Configuration management