Redis Streams

Streams are an append-only log data structure introduced in Redis 5.0. They're designed for event sourcing, message queues with consumer groups, and real-time data ingestion, covering much of the same ground as Apache Kafka with a simpler operational model.

Key Characteristics

• Append-only (immutable entries)
• Automatic ID generation with timestamps
• Consumer groups for distributed processing
• Message acknowledgment and pending entry list (PEL)
• Range queries and stream trimming

Adding Entries

# Add entry with auto-generated ID
XADD events * type "click" user_id "123" page "/home"
# Returns: "1673847321145-0" (timestamp-sequence)

# Add multiple fields
XADD orders *
    order_id "abc123"
    user_id "456"
    total "99.99"
    status "pending"

# Add with an explicit ID (usually not recommended; IDs must be strictly increasing)
XADD events 1673847321145-0 type "manual"

# Limit stream size while adding
XADD events MAXLEN 1000 * type "click"      # Exact limit
XADD events MAXLEN ~ 1000 * type "click"    # Approximate (faster)
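
In application code, the same commands go through a client library. A minimal redis-py sketch (the client setup here is an assumption, not part of the CLI examples above):

import redis

r = redis.Redis(decode_responses=True)

# Auto-generated ID ("*" is redis-py's default)
entry_id = r.xadd("events", {"type": "click", "user_id": "123", "page": "/home"})

# Cap the stream length while adding (approximate trim, like MAXLEN ~)
r.xadd("events", {"type": "click"}, maxlen=1000, approximate=True)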

Reading Entries

# Read all entries (oldest to newest)
XRANGE events - +

# Read with count limit
XRANGE events - + COUNT 10

# Read from specific ID
XRANGE events 1673847321145-0 +

# Read in reverse (newest to oldest)
XREVRANGE events + - COUNT 10

# Get stream length
XLEN events

# Get stream info
XINFO STREAM events
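
The same reads in redis-py (a sketch, reusing the `r` client from above):

# Range calls return a list of (entry_id, fields) pairs
entries = r.xrange("events", "-", "+", count=10)
latest = r.xrevrange("events", "+", "-", count=10)

length = r.xlen("events")
info = r.xinfo_stream("events")  # dict of stream metadata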

Blocking Reads

Block until new entries arrive, which makes XREAD ideal for real-time consumers.

# Block for new entries (from now)
XREAD BLOCK 5000 STREAMS events $

# Block forever
XREAD BLOCK 0 STREAMS events $

# Read from beginning, then block
XREAD BLOCK 5000 STREAMS events 0

# Multiple streams
XREAD BLOCK 5000 STREAMS stream1 stream2 $ $

# With count limit
XREAD COUNT 10 BLOCK 5000 STREAMS events $
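
A minimal Python tail loop over XREAD (a sketch; handle_entry is a hypothetical callback):

def tail(stream, handle_entry):
    last_id = "$"  # only entries added after we start
    while True:
        # Block up to 5 seconds per call; an empty result means timeout
        for _name, entries in r.xread({stream: last_id}, block=5000):
            for entry_id, fields in entries:
                handle_entry(entry_id, fields)
                last_id = entry_id  # resume after the last entry seen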

Consumer Groups

Distribute stream processing across multiple consumers with automatic load balancing.

# Create consumer group (MKSTREAM creates the stream if it doesn't exist)
XGROUP CREATE events mygroup $ MKSTREAM
# $ = start from new entries only
# 0 = start from the beginning

# Create group starting from specific ID
XGROUP CREATE events mygroup 1673847321145-0

# Read as a consumer in the group
XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS events >
# > = only messages never delivered to any consumer in this group

# Read this consumer's own pending messages (for reprocessing)
XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS events 0
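
The group workflow in redis-py (a sketch, reusing `r`; the group and consumer names are taken from the CLI examples):

# Create the group, tolerating "already exists" on restart
try:
    r.xgroup_create("events", "mygroup", id="$", mkstream=True)
except redis.ResponseError:
    pass  # BUSYGROUP: the group already exists

# ">" requests messages never delivered to this group
batches = r.xreadgroup("mygroup", "consumer1", {"events": ">"}, count=10)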

Message Acknowledgment

Mark messages as processed to prevent redelivery.

# Acknowledge single message
XACK events mygroup 1673847321145-0

# Acknowledge multiple messages
XACK events mygroup 1673847321145-0 1673847321146-0 1673847321147-0

# Check pending messages
XPENDING events mygroup                 # Summary
XPENDING events mygroup - + 10          # Detailed list
XPENDING events mygroup - + 10 consumer1  # Per consumer
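
Acknowledging and inspecting the pending entry list in redis-py (a sketch):

r.xack("events", "mygroup", "1673847321145-0")

summary = r.xpending("events", "mygroup")  # totals, min/max IDs, per-consumer counts
details = r.xpending_range("events", "mygroup", min="-", max="+", count=10)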

Claiming Stale Messages

Reclaim messages from failed or slow consumers.

# Auto-claim messages idle for more than 60 seconds
XAUTOCLAIM events mygroup consumer2 60000 0-0 COUNT 10

# Manually claim specific messages (idle for at least 60 seconds)
XCLAIM events mygroup consumer2 60000 1673847321145-0

# Force claim: min idle 0 ignores idle time; FORCE adds the entry to the
# PEL even if no consumer currently holds it
XCLAIM events mygroup consumer2 0 1673847321145-0 FORCE
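
The same claims in redis-py (a sketch; the exact return shape of xautoclaim varies slightly across Redis versions):

# Claim entries idle for more than 60 seconds, scanning from the start
claimed = r.xautoclaim("events", "mygroup", "consumer2",
                       min_idle_time=60000, start_id="0-0", count=10)

# Claim a specific entry regardless of idle time
r.xclaim("events", "mygroup", "consumer2", min_idle_time=0,
         message_ids=["1673847321145-0"], force=True)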

Stream Trimming

# Trim to exact length
XTRIM events MAXLEN 1000

# Trim approximately (faster)
XTRIM events MAXLEN ~ 1000

# Trim by minimum ID (removes entries with IDs below the given one; Redis 6.2+)
XTRIM events MINID 1673847321145-0

# Delete specific entries
XDEL events 1673847321145-0 1673847321146-0
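
Trimming in redis-py (a sketch; note that redis-py trims approximately unless told otherwise):

r.xtrim("events", maxlen=1000, approximate=False)  # exact
r.xtrim("events", maxlen=1000, approximate=True)   # approximate (faster)
r.xtrim("events", minid="1673847321145-0")         # drop entries below this ID
r.xdel("events", "1673847321145-0")                # delete specific entries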

Use Case: Event Sourcing

These sketches use the redis-py client `r` defined in the examples above; apply_event, process_job, and log_error stand in for application code.

def record_event(stream, event_type, data):
    entry = {"type": event_type, **data}
    return r.xadd(stream, entry)  # auto-generated "*" ID is the default

# Record order lifecycle
record_event("orders:events", "created", {
    "order_id": "123",
    "user": "alice",
    "total": "99.99"
})
record_event("orders:events", "paid", {
    "order_id": "123",
    "method": "card"
})
record_event("orders:events", "shipped", {
    "order_id": "123",
    "tracking": "XYZ123"
})

# Rebuild state from events
def get_order_state(order_id):
    state = {}
    # Replay every event for this order, oldest first
    for entry_id, fields in r.xrange("orders:events", "-", "+"):
        if fields.get("order_id") == order_id:
            apply_event(state, fields)
    return state

Use Case: Distributed Worker Queue

# Setup: create the group once (MKSTREAM creates the stream if missing)
r.xgroup_create("jobs:queue", "workers", id="$", mkstream=True)

# Producer: add jobs
def submit_job(job_data):
    return r.xadd("jobs:queue", job_data)

# Worker: process jobs
def worker(worker_id):
    while True:
        # Read the next undelivered job for this group
        result = r.xreadgroup("workers", worker_id,
                              {"jobs:queue": ">"},
                              count=1, block=5000)

        if result:
            # result = [[stream_name, [(entry_id, fields), ...]]]
            entry_id, fields = result[0][1][0]
            try:
                process_job(fields)
                r.xack("jobs:queue", "workers", entry_id)
            except Exception as e:
                # Left unacked: the job stays pending and can be reclaimed
                log_error(e)

# Supervisor: reclaim stuck jobs
def reclaim_stuck_jobs():
    r.xautoclaim("jobs:queue", "workers", "supervisor",
                 min_idle_time=300000,  # 5 minutes idle
                 start_id="0-0", count=100)

Use Case: Real-time Activity Feed

import time

# Add activity, capping each feed at roughly 1000 entries
def add_activity(user_id, activity_type, metadata):
    stream = f"activity:{user_id}"
    r.xadd(stream, {
        "type": activity_type,
        **metadata,
        "timestamp": str(time.time()),
    }, maxlen=1000, approximate=True)

# Get recent activity, newest first
def get_activity(user_id, count=50):
    stream = f"activity:{user_id}"
    return r.xrevrange(stream, "+", "-", count=count)

# Real-time listener
def listen_for_activity(user_id):
    stream = f"activity:{user_id}"
    last_id = "$"  # start from now

    while True:
        result = r.xread({stream: last_id}, block=0)
        for _name, entries in result:
            for entry_id, fields in entries:
                yield fields
                last_id = entry_id