v0.71.1 - make SSE PEP compliant and improve implementation
What's Changed
- fix: make SSE PEP compliant and improve implementation by @sansyrox in https://github.com/sparckles/Robyn/pull/1202
"""
Server-Sent Events (SSE) Example for Robyn
This example demonstrates how to implement Server-Sent Events using Robyn. SSE allows real-time server-to-client
communication over a single HTTP connection.
"""
import asyncio
import time
from robyn import Robyn, SSEMessage, SSEResponse
# Application instance; the @app.get(...) decorators below register their handlers on it.
app = Robyn(__file__)
@app.get("/events")
def stream_events(request):
    """Stream ten timestamped messages, then a closing "end" event.

    The ``request`` argument is accepted but not read.
    """

    def _timed_messages():
        """Yield one SSEMessage per counter value, sleeping 1 s after each."""
        seq = 0
        while seq < 10:
            stamp = time.strftime("%H:%M:%S")
            yield SSEMessage(f"Message {seq} - {stamp}", id=str(seq))
            time.sleep(1)
            seq += 1
        # Closing message, emitted once the numbered messages are done.
        yield SSEMessage("Stream ended", event="end")

    messages = _timed_messages()
    return SSEResponse(messages)
@app.get("/events/json")
def stream_json_events(request):
    """Stream five JSON-encoded "notification" events.

    The ``request`` argument is accepted but not read.
    """
    import json

    def _payload(seq):
        """Build the dict that is serialized as the event data for ``seq``."""
        return {
            "id": seq,
            "message": f"JSON message {seq}",
            "timestamp": time.time(),
            "type": "notification",
        }

    def _notifications():
        """Yield each payload as a JSON string, sleeping 2 s after each."""
        seq = 0
        while seq < 5:
            body = json.dumps(_payload(seq))
            yield SSEMessage(body, event="notification", id=str(seq))
            time.sleep(2)
            seq += 1

    return SSEResponse(_notifications())
@app.get("/events/named")
def stream_named_events(request):
    """Stream a fixed script of chat events, each tagged with its event name.

    The ``request`` argument is accepted but not read.
    """
    # (event name, message text) pairs, sent in this order.
    script = (
        ("user_joined", "Alice joined the chat"),
        ("message", "Hello everyone!"),
        ("user_left", "Bob left the chat"),
        ("message", "How is everyone doing?"),
        ("user_joined", "Charlie joined the chat"),
    )

    def _replay():
        """Yield one SSEMessage per script entry, sleeping 1.5 s after each."""
        position = 0
        for kind, text in script:
            yield SSEMessage(text, event=kind, id=str(position))
            time.sleep(1.5)
            position += 1

    return SSEResponse(_replay())
@app.get("/events/async")
async def stream_async_events(request):
    """Stream eight "async" events produced by an async generator.

    The ``request`` argument is accepted but not read.
    """

    async def _produce():
        """Await 0.5 s before each message, then yield it with a timestamp."""
        seq = 0
        while seq < 8:
            # Stand-in for real asynchronous work.
            await asyncio.sleep(0.5)
            stamp = time.strftime("%H:%M:%S")
            yield SSEMessage(f"Async message {seq} - {stamp}", event="async", id=str(seq))
            seq += 1

    return SSEResponse(_produce())
@app.get("/events/heartbeat")
def stream_heartbeat(request):
    """SSE endpoint that sends 20 heartbeat events followed by an "end" event.

    Args:
        request: The incoming request; not read by this handler.

    Returns:
        An SSEResponse wrapping the heartbeat generator.
    """

    def heartbeat_generator():
        """Yield 20 heartbeat pings (0.5 s pause after each), then a closing event."""
        for counter in range(20):  # Send 20 heartbeats
            # The class is SSEMessage (as imported from robyn); the previous
            # spelling "SSE_Message" was undefined and raised NameError on the
            # first iteration.
            yield SSEMessage(f"heartbeat {counter}", event="heartbeat", id=str(counter))
            time.sleep(0.5)
        yield SSEMessage("heartbeat ended", event="end")

    return SSEResponse(heartbeat_generator())
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.71.0...v0.71.1