Redis Cache
Rick provides Redis-based caching implementations with support for custom serialization, encryption, and full Redis
client access. The module offers two cache classes: RedisCache for standard caching and CryptRedisCache for
encrypted caching of sensitive data.
Location: rick.resource.redis
Overview
Redis caching in Rick provides:
- Standardized Interface - Implements CacheInterface for consistency
- Flexible Serialization - Support for pickle, JSON, MessagePack, or custom serializers
- Encryption Support - Built-in encrypted caching with Fernet256
- Key Prefixing - Namespace isolation for multiple applications or environments
- TTL Support - Automatic expiration of cached data
- Full Client Access - Direct access to underlying Redis client for advanced operations
- Backend Wrapping - Wrap existing Redis clients with cache interface
RedisCache
Basic Redis cache implementation with pickle serialization by default.
Location: rick.resource.redis.RedisCache
Constructor Parameters
Redis Connection Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | str | 'localhost' | Redis server hostname |
| port | int | 6379 | Redis server port |
| db | int | 0 | Redis database number |
| password | str | None | Redis password |
| socket_timeout | float | None | Socket timeout in seconds |
| socket_connect_timeout | float | None | Connection timeout in seconds |
| socket_keepalive | bool | None | Enable TCP keepalive |
| socket_keepalive_options | dict | None | TCP keepalive options |
| connection_pool | ConnectionPool | None | Custom connection pool |
| unix_socket_path | str | None | Unix socket path |
| encoding | str | 'utf-8' | String encoding |
| encoding_errors | str | 'strict' | Encoding error handling |
| decode_responses | bool | False | Decode responses to strings |
| retry_on_timeout | bool | False | Retry on timeout |
| ssl | bool | False | Enable SSL |
| ssl_keyfile | str | None | SSL key file path |
| ssl_certfile | str | None | SSL certificate file path |
| ssl_cert_reqs | str | 'required' | SSL certificate requirements |
| ssl_ca_certs | str | None | SSL CA certificates path |
| max_connections | int | None | Maximum connections in pool |
| single_connection_client | bool | False | Use single connection |
| health_check_interval | int | 0 | Health check interval in seconds |
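These options mirror the standard redis-py connection settings. As an illustrative sketch (hostname, credentials, and certificate path are placeholders), a TLS-enabled connection with explicit timeouts could look like this:
from rick.resource.redis import RedisCache
# Hypothetical TLS setup - adjust values to your deployment
cache = RedisCache(
    host='redis.example.com',
    port=6380,
    password='change-me',
    ssl=True,
    ssl_ca_certs='/etc/ssl/certs/ca-bundle.crt',
    socket_timeout=2.0,
    socket_connect_timeout=2.0,
    retry_on_timeout=True
)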
Cache-Specific Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| backend | redis.Redis | None | Wrap existing Redis client |
| serializer | callable | pickle.dumps | Function to serialize values |
| deserializer | callable | pickle.loads | Function to deserialize values |
| prefix | str | "" | Key prefix for namespacing |
Methods
get(key)
Retrieve a value from the cache.
Parameters:
key (str) - Cache key
Returns: Cached value or None if not found
set(key, value, ttl=None)
Store a value in the cache.
Parameters:
key (str) - Cache key
value (any) - Value to cache (must be serializable)
ttl (int, optional) - Time-to-live in seconds
Returns: True if successful
has(key)
Check if a key exists in the cache.
Parameters:
key (str) - Cache key
Returns: True if key exists, False otherwise
remove(key)
Remove a key from the cache.
Parameters:
key (str) - Cache key
Returns: True if key was removed, False if key didn't exist
purge()
Clear all keys from the current database.
Warning: This flushes the entire Redis database. Use with caution.
Returns: True if successful
client()
Get the underlying Redis client for advanced operations.
Returns: redis.Redis instance
close()
Close the Redis connection.
set_prefix(prefix)
Change the key prefix.
Parameters:
prefix (str) - New key prefix
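A brief sketch of set_prefix() and purge() (assuming the prefix is prepended verbatim to every key):
from rick.resource.redis import RedisCache
cache = RedisCache(host='localhost', prefix='tenant1:')
cache.set('settings', {'theme': 'dark'})
# Switch namespace at runtime
cache.set_prefix('tenant2:')
print(cache.get('settings'))  # None - the value lives under the 'tenant1:' prefix
# purge() flushes the entire database, regardless of prefix
cache.purge()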
CryptRedisCache
Encrypted Redis cache implementation that extends RedisCache with automatic encryption/decryption using Fernet256.
Location: rick.resource.redis.CryptRedisCache
Constructor Parameters
Required Parameter:
| Parameter | Type | Description |
|---|---|---|
| key | str | 64-character encryption key (base64-encoded internally) |
Additional Parameters: All parameters from RedisCache are supported.
Important: The encryption key must be exactly 64 characters long. The key is base64-encoded internally to create a Fernet256 key.
Generating Encryption Keys
import secrets
# Generate a secure 64-character key
encryption_key = secrets.token_hex(32) # Generates 64 hex characters
print(f"Key: {encryption_key}")
Methods
CryptRedisCache inherits all methods from RedisCache. Values are automatically encrypted on set() and decrypted on
get().
Basic Usage Examples
Simple Caching
from rick.resource.redis import RedisCache
# Create cache instance
cache = RedisCache(host='localhost', port=6379, db=0)
# Store value
cache.set('greeting', 'Hello, World!')
# Retrieve value
message = cache.get('greeting')
print(message) # Output: Hello, World!
# Check existence
if cache.has('greeting'):
print("Key exists")
# Remove key
cache.remove('greeting')
# Close connection
cache.close()
Caching with TTL
from rick.resource.redis import RedisCache
from time import sleep
cache = RedisCache(host='localhost')
# Cache with 60-second expiration
cache.set('temp_data', {'status': 'processing'}, ttl=60)
# Data is available
print(cache.has('temp_data')) # True
# Wait for expiration
sleep(61)
# Data is gone
print(cache.has('temp_data')) # False
Caching Complex Objects
from rick.resource.redis import RedisCache
from datetime import datetime
from decimal import Decimal
cache = RedisCache(host='localhost')
# Cache complex data structures
user_data = {
'id': 123,
'name': 'John Doe',
'balance': Decimal('1234.56'),
'created_at': datetime.now(),
'preferences': {
'theme': 'dark',
'notifications': True
}
}
cache.set('user:123', user_data)
retrieved = cache.get('user:123')
# All types are preserved with pickle
assert isinstance(retrieved['balance'], Decimal)
assert isinstance(retrieved['created_at'], datetime)
Using Key Prefixes
from rick.resource.redis import RedisCache
# Create caches with different prefixes
dev_cache = RedisCache(host='localhost', prefix='dev:')
prod_cache = RedisCache(host='localhost', prefix='prod:')
# Same key, different namespaces
dev_cache.set('config', {'debug': True})
prod_cache.set('config', {'debug': False})
# Keys don't conflict
print(dev_cache.get('config')) # {'debug': True}
print(prod_cache.get('config')) # {'debug': False}
Encrypted Caching
Basic Encrypted Cache
from rick.resource.redis import CryptRedisCache
# Create encrypted cache (key must be 64 characters)
cache = CryptRedisCache(
key='0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef',
host='localhost',
port=6379
)
# Store sensitive data (automatically encrypted)
cache.set('api_token', {'token': 'secret123', 'expires': '2025-12-31'})
# Retrieve (automatically decrypted)
token = cache.get('api_token')
print(token) # {'token': 'secret123', 'expires': '2025-12-31'}
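# Optional sanity check: with the default empty prefix the stored key is 'api_token',
# and the raw bytes held in Redis are Fernet ciphertext rather than readable plaintext
raw = cache.client().get('api_token')
print(raw[:20])  # opaque encrypted payload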
cache.close()
Encrypted Session Storage
from rick.resource.redis import CryptRedisCache
from datetime import datetime
import secrets
# Generate encryption key
ENCRYPTION_KEY = secrets.token_hex(32)
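# In production, load a persisted key from configuration - a freshly generated
# key cannot decrypt data that was stored under a previous key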
# Create encrypted cache for sessions
session_cache = CryptRedisCache(
key=ENCRYPTION_KEY,
host='localhost',
prefix='session:'
)
# Store session data securely
session_id = secrets.token_urlsafe(32)
session_data = {
'user_id': 123,
'username': 'jdoe',
'roles': ['user', 'admin'],
'created_at': datetime.now(),
'csrf_token': secrets.token_hex(32)
}
# Store with 1-hour expiration
session_cache.set(session_id, session_data, ttl=3600)
# Retrieve session
session = session_cache.get(session_id)
# Data is decrypted automatically
print(session['username']) # 'jdoe'
Encrypted API Key Storage
from rick.resource.redis import CryptRedisCache
# Cache for encrypted API keys
api_cache = CryptRedisCache(
key='0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef',  # must be exactly 64 characters
host='localhost',
db=1,
prefix='api:keys:'
)
# Store API key securely
api_cache.set('service:github', {
'api_key': 'ghp_xxxxxxxxxxxx',
'api_secret': 'secret_value',
'rate_limit': 5000
})
# Retrieve when needed
github_creds = api_cache.get('service:github')
Custom Serialization
Using MessagePack
from rick.resource.redis import RedisCache
from rick.serializer.msgpack import msgpack
from datetime import datetime
from decimal import Decimal
# Create cache with MessagePack serialization
cache = RedisCache(
host='localhost',
serializer=msgpack.packb,
deserializer=msgpack.unpackb
)
# MessagePack efficiently handles complex types
data = {
'timestamp': datetime.now(),
'amount': Decimal('999.99'),
'status': 'active'
}
cache.set('transaction:001', data)
retrieved = cache.get('transaction:001')
# Types are preserved
assert isinstance(retrieved['timestamp'], datetime)
assert isinstance(retrieved['amount'], Decimal)
Using JSON
from rick.resource.redis import RedisCache
from rick.serializer.json.json import ExtendedJsonEncoder
import json
from datetime import datetime
# JSON serializer for interoperability
def json_serializer(obj):
return json.dumps(obj, cls=ExtendedJsonEncoder).encode('utf-8')
def json_deserializer(data):
# Note: JSON deserialization doesn't restore types
return json.loads(data.decode('utf-8'))
cache = RedisCache(
host='localhost',
serializer=json_serializer,
deserializer=json_deserializer
)
# Store data (types will be converted to JSON-compatible formats)
cache.set('data', {
'timestamp': datetime.now(), # Becomes ISO string
'count': 42
})
retrieved = cache.get('data')
# Note: timestamp is now a string, not datetime
Custom Serializer
from rick.resource.redis import RedisCache
import json
class CustomSerializer:
@staticmethod
def serialize(obj):
# Add metadata
wrapped = {
'_type': type(obj).__name__,
'_data': obj
}
return json.dumps(wrapped).encode('utf-8')
@staticmethod
def deserialize(data):
wrapped = json.loads(data.decode('utf-8'))
return wrapped['_data']
cache = RedisCache(
host='localhost',
serializer=CustomSerializer.serialize,
deserializer=CustomSerializer.deserialize
)
cache.set('custom', {'value': 42})
Wrapping Existing Redis Client
Basic Wrapping
import redis
from rick.resource.redis import RedisCache
# Create your own Redis client with custom configuration
my_redis = redis.Redis(
host='localhost',
port=6379,
db=0,
max_connections=50,
socket_keepalive=True
)
# Wrap with RedisCache interface
cache = RedisCache(backend=my_redis)
# Use cache interface
cache.set('key', 'value')
# Still have access to raw client
cache.client().info()
With Connection Pool
import redis
from rick.resource.redis import RedisCache
# Create custom connection pool
pool = redis.ConnectionPool(
host='localhost',
port=6379,
max_connections=100,
socket_keepalive=True,
socket_connect_timeout=5
)
# Create Redis client with pool
redis_client = redis.Redis(connection_pool=pool)
# Wrap with cache interface
cache = RedisCache(backend=redis_client)
Sentinel Configuration
import redis
from redis.sentinel import Sentinel
from rick.resource.redis import RedisCache
# Setup Redis Sentinel
sentinel = Sentinel([
('sentinel1', 26379),
('sentinel2', 26379),
('sentinel3', 26379)
], socket_timeout=0.5)
# Get master client
master = sentinel.master_for('mymaster', socket_timeout=0.5)
# Wrap with RedisCache
cache = RedisCache(backend=master)
Advanced Usage
Pipeline Operations
from rick.resource.redis import RedisCache
cache = RedisCache(host='localhost')
# Use pipeline for bulk operations
client = cache.client()
pipe = client.pipeline()
# Queue multiple operations
for i in range(100):
pipe.set(f'key:{i}', f'value:{i}')
# Execute all at once
pipe.execute()
Pub/Sub with Cache
from rick.resource.redis import RedisCache
cache = RedisCache(host='localhost')
pubsub = cache.client().pubsub()
# Subscribe to channel
pubsub.subscribe('notifications')
# Publish message
cache.client().publish('notifications', 'New update available')
# Listen for messages
for message in pubsub.listen():
if message['type'] == 'message':
print(f"Received: {message['data']}")
Cache Statistics
from rick.resource.redis import RedisCache
cache = RedisCache(host='localhost', prefix='myapp:')
# Get Redis info
info = cache.client().info()
print(f"Used memory: {info['used_memory_human']}")
print(f"Connected clients: {info['connected_clients']}")
# Get key count for namespace
keys = cache.client().keys('myapp:*')
print(f"Keys in namespace: {len(keys)}")
Cache-Aside Pattern
from rick.resource.redis import RedisCache
cache = RedisCache(host='localhost')
def get_user(user_id):
"""Cache-aside pattern implementation"""
cache_key = f'user:{user_id}'
# Try cache first
user = cache.get(cache_key)
if user is None:
# Cache miss - load from database
user = database.load_user(user_id)
if user is not None:
# Store in cache for 1 hour
cache.set(cache_key, user, ttl=3600)
return user
def update_user(user_id, user_data):
"""Update user and invalidate cache"""
# Update database
database.update_user(user_id, user_data)
# Invalidate cache
cache.remove(f'user:{user_id}')
Write-Through Cache
from rick.resource.redis import RedisCache
cache = RedisCache(host='localhost')
def save_user(user):
"""Write-through cache pattern"""
# Write to database
database.save_user(user)
# Write to cache
cache.set(f"user:{user['id']}", user, ttl=3600)
def get_user(user_id):
"""Always try cache first"""
return cache.get(f'user:{user_id}')
Rate Limiting
from rick.resource.redis import RedisCache
from time import time
cache = RedisCache(host='localhost')
def rate_limit(user_id, max_requests=100, window=60):
    """Simple fixed-window rate limiting with Redis"""
    # Use the raw client: INCR requires a plain integer value, so the counter
    # must bypass the cache serializer (and its key prefix)
    client = cache.client()
    key = f'rate_limit:{user_id}:{int(time() // window)}'
    # INCR creates the key with value 1 if it does not exist yet
    current = client.incr(key)
    if current == 1:
        # First request in this window - start the expiration clock
        client.expire(key, window)
    return current <= max_requests
# Usage
if rate_limit('user:123', max_requests=10, window=60):
process_request()
else:
raise RateLimitExceeded()
Error Handling
Connection Errors
from rick.resource.redis import RedisCache
import redis
cache = RedisCache(host='localhost', port=6379)
try:
cache.set('key', 'value')
except redis.ConnectionError as e:
print(f"Cannot connect to Redis: {e}")
# Fallback to alternative storage or raise
except redis.TimeoutError as e:
print(f"Redis operation timed out: {e}")
# Retry or use fallback
Serialization Errors
from rick.resource.redis import RedisCache
import pickle
cache = RedisCache(host='localhost')
class UnserializableObject:
def __init__(self):
self.file_handle = open('/tmp/test', 'w') # Cannot pickle file handles
try:
cache.set('bad_data', UnserializableObject())
except (pickle.PicklingError, TypeError) as e:
print(f"Cannot serialize object: {e}")
Graceful Degradation
from rick.resource.redis import RedisCache
from rick.resource import CacheNull
import redis
def create_cache():
"""Create cache with fallback"""
try:
cache = RedisCache(host='localhost', port=6379, socket_connect_timeout=2)
# Test connection
cache.client().ping()
return cache
except (redis.ConnectionError, redis.TimeoutError):
# Fallback to null cache (no-op)
print("Warning: Redis unavailable, using null cache")
return CacheNull()
# Use cache (works even if Redis is down)
cache = create_cache()
cache.set('key', 'value') # No-op if Redis unavailable
Troubleshooting
Connection Issues
# Test Redis connection
import redis
try:
client = redis.Redis(host='localhost', port=6379, socket_connect_timeout=5)
client.ping()
print("Connection successful")
except redis.ConnectionError:
print("Cannot connect to Redis")
Memory Issues
# Check memory usage
from rick.resource.redis import RedisCache
cache = RedisCache(host='localhost')
info = cache.client().info('memory')
print(f"Used memory: {info['used_memory_human']}")
print(f"Peak memory: {info['used_memory_peak_human']}")
# Set maxmemory policy in redis.conf:
# maxmemory 256mb
# maxmemory-policy allkeys-lru
Key Expiration Issues
# Check TTL
from rick.resource.redis import RedisCache
cache = RedisCache(host='localhost')
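# Note: the raw client does not apply the cache prefix - include it in the key name if a prefix is set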
ttl = cache.client().ttl('mykey')
if ttl == -1:
print("Key exists but has no expiration")
elif ttl == -2:
print("Key does not exist")
else:
print(f"Key expires in {ttl} seconds")
Related Topics
- Serializers - Efficient data serialization options
- Crypto - Encryption utilities used by CryptRedisCache
- Configuration - Loading Redis configuration from files