Redis Streams
Streams are an append-only log data structure introduced in Redis 5.0. They're designed for event sourcing, message queues with consumer groups, and real-time data ingestion—similar to Apache Kafka but simpler.
Key Characteristics
• Append-only (immutable entries)
• Automatic ID generation with timestamps
• Consumer groups for distributed processing
• Message acknowledgment and pending entry list
• Range queries and stream trimming
Adding Entries
# Add entry with auto-generated ID
XADD events * type "click" user_id "123" page "/home"
# Returns: "1673847321145-0" (timestamp-sequence)
# Add multiple fields
XADD orders * order_id "abc123" user_id "456" total "99.99" status "pending"
# Add with an explicit ID (usually not recommended; must be greater than the last ID)
XADD events 1673847321145-0 type "manual"
# Limit stream size while adding
XADD events MAXLEN 1000 * type "click" # Exact limit
XADD events MAXLEN ~ 1000 * type "click" # Approximate (faster)
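From application code these commands are one call each. A minimal sketch using the redis-py client (the client library is an assumption; any client with stream support works). Later snippets reuse this r client.
import redis

r = redis.Redis(decode_responses=True)  # assumes a local Redis 5.0+

# Auto-generated ID; fields are passed as a dict
entry_id = r.xadd("events", {"type": "click", "user_id": "123", "page": "/home"})

# MAXLEN ~ 1000 maps to maxlen= plus approximate=
r.xadd("events", {"type": "click"}, maxlen=1000, approximate=True)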
Reading Entries
# Read all entries (oldest to newest)
XRANGE events - +
# Read with count limit
XRANGE events - + COUNT 10
# Read from specific ID
XRANGE events 1673847321145-0 +
# Read in reverse (newest to oldest)
XREVRANGE events + - COUNT 10
# Get stream length
XLEN events
# Get stream info
XINFO STREAM events
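The same range queries from redis-py, reusing r from the sketch above:
# Oldest to newest, at most 10 entries
for entry_id, fields in r.xrange("events", "-", "+", count=10):
    print(entry_id, fields)

# Newest to oldest
latest = r.xrevrange("events", "+", "-", count=10)

length = r.xlen("events")
info = r.xinfo_stream("events")  # dict of stream metadata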
Blocking Reads
Block until new entries arrive; ideal for real-time consumers.
# Block for new entries (from now)
XREAD BLOCK 5000 STREAMS events $
# Block forever
XREAD BLOCK 0 STREAMS events $
# Read from beginning, then block
XREAD BLOCK 5000 STREAMS events 0
# Multiple streams
XREAD BLOCK 5000 STREAMS stream1 stream2 $ $
# With count limit
XREAD COUNT 10 BLOCK 5000 STREAMS events $
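A sketch of a tail loop in redis-py: BLOCK becomes the block argument in milliseconds, and handle_entry is a hypothetical callback.
last_id = "$"  # Only entries added after we start
while True:
    # Wait up to 5 seconds; empty result on timeout
    result = r.xread({"events": last_id}, block=5000)
    if not result:
        continue
    for stream_name, entries in result:
        for entry_id, fields in entries:
            handle_entry(fields)  # hypothetical callback
            last_id = entry_id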
Consumer Groups
Distribute stream processing across multiple consumers with automatic load balancing.
# Create consumer group
XGROUP CREATE events mygroup $ MKSTREAM
# $ = start from new entries only
# 0 = start from beginning
# Create group starting from specific ID
XGROUP CREATE events mygroup 1673847321145-0
# Read as consumer in group
XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS events >
# > = only undelivered messages
# Read pending messages (for reprocessing)
XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS events 0
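The same flow in redis-py (a sketch; creating a group that already exists raises a BUSYGROUP error, so the call is guarded here):
try:
    r.xgroup_create("events", "mygroup", id="$", mkstream=True)
except redis.ResponseError:
    pass  # group already exists

# Up to 10 undelivered entries for this consumer
batch = r.xreadgroup("mygroup", "consumer1", {"events": ">"}, count=10)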
Message Acknowledgment
Mark messages as processed to prevent redelivery.
# Acknowledge single message
XACK events mygroup 1673847321145-0
# Acknowledge multiple messages
XACK events mygroup 1673847321145-0 1673847321146-0 1673847321147-0
# Check pending messages
XPENDING events mygroup # Summary
XPENDING events mygroup - + 10 # Detailed list
XPENDING events mygroup - + 10 consumer1 # Per consumer
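Acknowledgment and pending inspection in redis-py (a sketch; the entry ID would normally come from an XREADGROUP result):
# Acknowledge after successful processing
r.xack("events", "mygroup", "1673847321145-0")

# Pending entry list: summary, then a detailed page
summary = r.xpending("events", "mygroup")
details = r.xpending_range("events", "mygroup",
                           min="-", max="+", count=10)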
Claiming Stale Messages
Reclaim messages from failed or slow consumers.
# Auto-claim messages idle for more than 60 seconds
XAUTOCLAIM events mygroup consumer2 60000 0-0 COUNT 10
# Manual claim specific messages
XCLAIM events mygroup consumer2 60000 1673847321145-0
# Claim regardless of idle time; FORCE also claims IDs not in any consumer's PEL
XCLAIM events mygroup consumer2 0 1673847321145-0 FORCE
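The reclaim calls in redis-py (a sketch; on Redis 7+ XAUTOCLAIM also returns a list of deleted IDs as a third element, so the result is indexed rather than unpacked):
# Claim entries idle for over 60 seconds, scanning from the start
result = r.xautoclaim("events", "mygroup", "consumer2",
                      min_idle_time=60000, start_id="0-0", count=10)
next_cursor, claimed = result[0], result[1]

# Manually claim specific pending entries
r.xclaim("events", "mygroup", "consumer2",
         min_idle_time=60000, message_ids=["1673847321145-0"])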
Stream Trimming
# Trim to exact length
XTRIM events MAXLEN 1000
# Trim approximately (faster)
XTRIM events MAXLEN ~ 1000
# Trim by minimum ID
XTRIM events MINID 1673847321145-0
# Delete specific entries
XDEL events 1673847321145-0 1673847321146-0
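Trimming from redis-py (a sketch; note that redis-py trims approximately by default, so an exact trim passes approximate=False):
r.xtrim("events", maxlen=1000, approximate=False)  # exact
r.xtrim("events", maxlen=1000, approximate=True)   # approximate, faster
r.xtrim("events", minid="1673847321145-0")         # drop entries below an ID
r.xdel("events", "1673847321145-0")                # delete specific entries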
Use Case: Event Sourcing
import redis

r = redis.Redis(decode_responses=True)

def record_event(stream, event_type, data):
    # Append one immutable event; Redis assigns the ID
    entry = {"type": event_type, **data}
    return r.xadd(stream, entry)

# Record order lifecycle
record_event("orders:events", "created", {
    "order_id": "123",
    "user": "alice",
    "total": "99.99"
})
record_event("orders:events", "paid", {
    "order_id": "123",
    "method": "card"
})
record_event("orders:events", "shipped", {
    "order_id": "123",
    "tracking": "XYZ123"
})

# Rebuild state from events
def get_order_state(order_id):
    # Scans the whole stream; a common alternative is one stream per aggregate
    events = r.xrange("orders:events", "-", "+")
    state = {}
    for entry_id, fields in events:
        if fields.get("order_id") == order_id:
            apply_event(state, fields)  # application-defined reducer
    return state
Use Case: Distributed Worker Queue
import redis

r = redis.Redis(decode_responses=True)

# Setup: create the group once (raises ResponseError if it already exists)
r.xgroup_create("jobs:queue", "workers", id="$", mkstream=True)

# Producer: add jobs
def submit_job(job_data):
    return r.xadd("jobs:queue", job_data)

# Worker: process jobs
def worker(worker_id):
    while True:
        # Read the next undelivered job; block up to 5 seconds
        result = r.xreadgroup("workers", worker_id,
                              {"jobs:queue": ">"}, count=1, block=5000)
        if result:
            # result is [[stream_name, [(entry_id, fields), ...]]]
            entry_id, fields = result[0][1][0]
            try:
                process_job(fields)
                r.xack("jobs:queue", "workers", entry_id)
            except Exception as e:
                # Not acknowledged: the job stays pending and can be reclaimed
                log_error(e)

# Supervisor: reclaim jobs idle for more than 5 minutes
def reclaim_stuck_jobs():
    return r.xautoclaim("jobs:queue", "workers", "supervisor",
                        min_idle_time=300000, start_id="0-0", count=100)
Use Case: Real-time Activity Feed
import time

import redis

r = redis.Redis(decode_responses=True)

# Add activity, keeping roughly the latest 1000 entries per user
def add_activity(user_id, activity_type, metadata):
    stream = f"activity:{user_id}"
    r.xadd(stream, {
        "type": activity_type,
        **metadata,
        "timestamp": str(time.time())
    }, maxlen=1000, approximate=True)

# Get recent activity (newest first)
def get_activity(user_id, count=50):
    stream = f"activity:{user_id}"
    return r.xrevrange(stream, "+", "-", count=count)

# Real-time listener
def listen_for_activity(user_id):
    stream = f"activity:{user_id}"
    last_id = "$"  # Start from now
    while True:
        result = r.xread({stream: last_id}, block=0)
        if result:
            # result is [[stream_name, [(entry_id, fields), ...]]]
            for entry_id, fields in result[0][1]:
                yield fields
                last_id = entry_id