
Real-Time Streaming & Conversation Persistence

Dual-mode streaming with SSE (for local development) and polling (for Databricks Apps), plus database-backed message persistence for conversation restoration.


Overview

The streaming system provides:

  • Dual-mode delivery – SSE for local dev, polling for Databricks Apps (60s proxy timeout)
  • Real-time updates – Tool calls, responses, and results appear as they happen
  • Message persistence – All conversation messages stored in database for history
  • Session restoration – Chat history rehydrated from database when resuming sessions
  • Navigation lock – UI prevents navigation during active generation

Architecture

SSE Mode (Local Development)

Frontend                                   Backend SSE Endpoint                Agent Thread
    │                                          │                                   │
    │  POST /api/chat/stream                   │                                   │
    │─────────────────────────────────────────►│                                   │
    │  [Navigation disabled]                   │  Start agent in thread            │
    │                                          │──────────────────────────────────►│
    │                                          │                                   │
    │  event: assistant                        │◄── StreamingCallbackHandler ──────│
    │◄─────────────────────────────────────────│      [Persist to DB]              │
    │  event: tool_call                        │◄──────────────────────────────────│
    │◄─────────────────────────────────────────│                                   │
    │  event: tool_result                      │◄──────────────────────────────────│
    │◄─────────────────────────────────────────│                                   │
    │  event: complete                         │◄── agent finished ────────────────│
    │◄─────────────────────────────────────────│                                   │
    │  [Navigation re-enabled]                 │                                   │

Polling Mode (Databricks Apps)

Databricks Apps runs behind a reverse proxy with a 60-second connection timeout, which breaks SSE for long-running agent requests. Polling mode works around this:

Frontend                           Backend
    │                                 │
    │  POST /chat/async               │
    │────────────────────────────────>│  -> Check session lock
    │  { request_id }                 │  -> Create ChatRequest in DB
    │<────────────────────────────────│  -> Queue job (in-memory)
    │                                 │  -> Return immediately
    │                                 │
    │  GET /poll/{request_id}         │  Worker processes job:
    │────────────────────────────────>│    - StreamingCallbackHandler
    │  { events, status }             │      persists messages with request_id
    │<────────────────────────────────│
    │  (repeat every 2s)              │
    │                                 │
    │  status: complete               │
    │<────────────────────────────────│

Key Components:

  • ChatRequest – Database model tracking request status (pending/running/completed/error)
  • Job Queue – In-memory asyncio queue with background worker (src/api/services/job_queue.py); see the sketch after this list
  • request_id – Links messages to specific chat requests for efficient polling
  • Auto-creation – Sessions are auto-created on first async request if they don't exist
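
A minimal sketch of the queue/worker pattern described above, assuming the names ChatJob and process_job for illustration (the actual implementation lives in src/api/services/job_queue.py and may differ):

import asyncio
from dataclasses import dataclass


# Hypothetical job payload; field names are illustrative, not the real schema.
@dataclass
class ChatJob:
    request_id: str
    session_id: str
    message: str


job_queue: "asyncio.Queue[ChatJob]" = asyncio.Queue()


async def worker(process_job) -> None:
    """Background task: pull jobs off the in-memory queue and run them one at a time."""
    while True:
        job = await job_queue.get()
        try:
            # process_job runs the agent with a StreamingCallbackHandler, which
            # persists events/messages tagged with job.request_id for polling.
            await process_job(job)
        finally:
            job_queue.task_done()


async def submit(job: ChatJob) -> None:
    """Called by POST /api/chat/async after creating the ChatRequest row."""
    await job_queue.put(job)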

SSE Event Types

Defined in src/api/schemas/streaming.py:

Event Type       | Purpose                               | Payload Fields
-----------------|---------------------------------------|--------------------------------------------------------------
assistant        | LLM reasoning/response                | content, message_id
tool_call        | Tool invocation started               | tool_name, tool_input, message_id
tool_result      | Tool returned result                  | tool_name, tool_output, message_id
error            | Error occurred                        | error, tool_name?
complete         | Generation finished                   | slides, raw_html, replacement_info, metadata, experiment_url
session_title    | Auto-generated session title          | session_title
session_created  | New session created on first message  | session_id

from typing import Any, Dict, Optional

from pydantic import BaseModel

# StreamEventType is the enum of event types listed above, defined in the same module.


class StreamEvent(BaseModel):
    type: StreamEventType
    content: Optional[str] = None
    tool_name: Optional[str] = None
    tool_input: Optional[Dict[str, Any]] = None
    tool_output: Optional[str] = None
    slides: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    message_id: Optional[int] = None
    raw_html: Optional[str] = None
    replacement_info: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    experiment_url: Optional[str] = None   # MLflow experiment URL
    session_title: Optional[str] = None    # Auto-generated session title
    session_id: Optional[str] = None       # Session ID (for session_created events)

    def to_sse(self) -> str:
        return f"event: {self.type.value}\ndata: {self.model_dump_json()}\n\n"
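
For illustration, serializing a hypothetical event shows the wire format the frontend parses (this assumes an ASSISTANT member on the StreamEventType enum, matching the assistant row in the table above):

event = StreamEvent(type=StreamEventType.ASSISTANT, content="Analyzing the data...", message_id=42)
print(event.to_sse())
# event: assistant
# data: {"type": "assistant", "content": "Analyzing the data...", "message_id": 42, ...}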

Backend Components

StreamingCallbackHandler (src/services/streaming_callback.py)

LangChain callback that intercepts agent events and:

  1. Emits SSE events to a queue for real-time streaming
  2. Persists messages to database for history
Callback Method  | Event Emitted          | Persisted As
-----------------|------------------------|-----------------------------------------
on_agent_action  | assistant (reasoning)  | message_type="reasoning"
on_tool_start    | tool_call              | message_type="tool_call"
on_tool_end      | tool_result            | message_type="tool_result"
on_chain_error   | error                  | Not persisted
emit_complete    | complete               | Not persisted (slides saved separately)

class StreamingCallbackHandler(BaseCallbackHandler):
    def __init__(self, event_queue: queue.Queue, session_id: str, request_id: str = None):
        self.event_queue = event_queue
        self.session_id = session_id
        self.request_id = request_id  # Links messages to async requests

    def on_agent_action(self, action: AgentAction, **kwargs):
        # Extract LLM reasoning before tool call
        reasoning = action.log.split("Invoking:")[0].strip()
        if reasoning:
            self.session_manager.add_message(..., request_id=self.request_id)
            self.event_queue.put(StreamEvent(type=ASSISTANT, content=reasoning))

    def on_tool_start(self, serialized, input_str, **kwargs):
        # Parse tool input (handles JSON and Python dict strings)
        tool_input = self._parse_tool_input(input_str)
        self.session_manager.add_message(..., request_id=self.request_id)
        self.event_queue.put(StreamEvent(type=TOOL_CALL, ...))

Streaming Agent Method (src/services/agent.py)

generate_slides_streaming() accepts a callback handler and passes it via the invoke() config:

def generate_slides_streaming(self, question, session_id, callback_handler, slide_context=None):
    tools = self._create_tools_for_session(session_id)
    agent_executor = self._create_agent_executor_with_callbacks(tools, [callback_handler])

    result = agent_executor.invoke(
        agent_input,
        config={"callbacks": [callback_handler]},  # Required for real-time events
    )

Streaming Chat Service (src/api/services/chat_service.py)

send_message_streaming() is a generator that:

  1. Persists user message to database first
  2. Hydrates agent chat history from database
  3. Yields SSE events as they arrive from the callback queue
  4. Processes final result and yields complete event
def send_message_streaming(
    self,
    session_id,
    message,
    slide_context=None,
    request_id=None,
    image_ids=None,
    is_first_message_override=None,
):
    # Get or create session in database (inline — no separate method)
    try:
        db_session = session_manager.get_session(session_id)
    except SessionNotFoundError:
        db_session = session_manager.create_session(session_id=session_id)

    # Detect first message (async path uses is_first_message_override
    # because the user message is already persisted before the job runs)
    if is_first_message_override is not None:
        is_first_message = is_first_message_override
    else:
        is_first_message = db_session.get("message_count", 0) == 0

    # Persist user message FIRST (skip if async path already did it)
    if not request_id:
        session_manager.add_message(session_id, role="user", content=message)

    # Build agent and hydrate chat history from DB
    agent, session_data, experiment_url = self._build_agent_for_session(
        session_id, db_session
    )

    # Create callback handler with queue
    event_queue = queue.Queue()
    callback_handler = StreamingCallbackHandler(event_queue, session_id)

    # Run agent in thread, yield events as they arrive
    def run_agent():
        result = self.agent.generate_slides_streaming(...)
        event_queue.put(None)  # Signal completion

    thread = threading.Thread(target=run_agent)
    thread.start()

    while True:
        event = event_queue.get()
        if event is None:
            break
        yield event

    # Yield final complete event with slides
    yield StreamEvent(type=COMPLETE, slides=slide_deck_dict, ...)

Key parameters:

  • request_id – Links messages to an async chat request. When set, user message persistence is skipped (already done by the async endpoint).
  • is_first_message_override – Overrides DB-based first-message detection. The async path pre-persists the user message before the job runs, so the DB count is already incremented; the override preserves the correct value.
  • image_ids – Optional list of image IDs; when present, image context is injected into the message before agent execution.
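
As a rough sketch of how the endpoint layer might consume this generator (assuming FastAPI, which the route shapes suggest but the doc does not state; chat_service and ChatStreamRequest are illustrative names):

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

router = APIRouter()


@router.post("/api/chat/stream")
def chat_stream(payload: ChatStreamRequest):  # ChatStreamRequest: hypothetical request schema
    def event_source():
        # Each StreamEvent is converted to SSE wire format via to_sse()
        for event in chat_service.send_message_streaming(
            session_id=payload.session_id,
            message=payload.message,
            slide_context=payload.slide_context,
        ):
            yield event.to_sse()

    return StreamingResponse(
        event_source(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable proxy buffering so events flush immediately
        },
    )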

Chat History Hydration

When restoring a session, _hydrate_chat_history() loads messages from database into the agent's ChatMessageHistory:

def _hydrate_chat_history(self, session_id, chat_history):
    db_messages = session_manager.get_messages(session_id)

    for msg in db_messages:
        if msg["role"] == "user":
            chat_history.add_message(HumanMessage(content=msg["content"]))
        elif msg["role"] == "assistant":
            chat_history.add_message(AIMessage(content=msg["content"]))

    return len(db_messages)
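
A small usage sketch, assuming LangChain's in-memory ChatMessageHistory (the import path varies by LangChain version; chat_service is an illustrative name):

from langchain_community.chat_message_histories import ChatMessageHistory

chat_history = ChatMessageHistory()
restored_count = chat_service._hydrate_chat_history(session_id, chat_history)
# chat_history.messages now holds HumanMessage/AIMessage objects in stored order,
# ready to attach to the agent's memory before the next turn.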

API Endpoints

Streaming Endpoint (SSE)

Method | Path              | Purpose
-------|-------------------|--------------------
POST   | /api/chat/stream  | SSE streaming chat

Request: Same as /api/chat

{
  "session_id": "abc123",
  "message": "Create slides about...",
  "slide_context": { ... }
}

Response: text/event-stream with headers:

Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no
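
For example, a minimal Python client for the stream (a sketch using the requests library; the base URL and port are assumptions, and the real frontend uses fetch/ReadableStream as shown under Frontend Components):

import json
import requests

payload = {"session_id": "abc123", "message": "Create slides about...", "slide_context": None}

with requests.post("http://localhost:8000/api/chat/stream", json=payload, stream=True) as resp:
    event_type = None
    for line in resp.iter_lines(decode_unicode=True):
        if line.startswith("event:"):
            event_type = line.split(":", 1)[1].strip()
        elif line.startswith("data:"):
            data = json.loads(line.split(":", 1)[1].strip())
            print(event_type, data.get("content") or data.get("tool_name"))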

Polling Endpoints

Method | Path                         | Purpose
-------|------------------------------|------------------------------
POST   | /api/chat/async              | Submit for async processing
GET    | /api/chat/poll/{request_id}  | Poll for status and events

Submit Request:

// POST /api/chat/async
{
  "session_id": "abc123",
  "message": "Create slides about...",
  "slide_context": { ... }
}

// Response
{
  "request_id": "xYz123...",
  "status": "pending"
}

Poll Response:

// GET /api/chat/poll/{request_id}?after_message_id=0
{
  "status": "running",        // pending | running | completed | error
  "events": [
    { "type": "assistant", "content": "I'll analyze...", "message_id": 42 },
    { "type": "tool_call", "tool_name": "query_genie", "tool_input": {...} }
  ],
  "last_message_id": 45,
  "result": null,             // Populated when status=completed
  "error": null               // Populated when status=error
}
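
A rough polling client, sketched in Python for clarity (the real frontend loop is the TypeScript startPolling() shown under Frontend Components; the base URL is an assumption):

import time
import requests

BASE = "http://localhost:8000/api/chat"

submit = requests.post(f"{BASE}/async", json={"session_id": "abc123", "message": "Create slides about..."}).json()
request_id = submit["request_id"]

last_message_id = 0
while True:
    poll = requests.get(f"{BASE}/poll/{request_id}", params={"after_message_id": last_message_id}).json()
    for event in poll["events"]:
        print(event["type"], event.get("content") or event.get("tool_name"))
    last_message_id = poll["last_message_id"]
    if poll["status"] in ("completed", "error"):
        break
    time.sleep(2)  # Matches the frontend's 2-second poll interval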

Updated Session Endpoint

GET /api/sessions/{id} now returns messages and slide deck for restoration:

{
  "session_id": "abc123",
  "title": "...",
  "messages": [
    { "id": 1, "role": "user", "content": "...", "created_at": "..." },
    { "id": 2, "role": "assistant", "content": "...", "message_type": "reasoning" }
  ],
  "slide_deck": { ... }
}

Frontend Components

GenerationContext (src/contexts/GenerationContext.tsx)

App-level state for tracking generation status:

interface GenerationContextType {
  isGenerating: boolean;
  setIsGenerating: (value: boolean) => void;
}

Used by ChatPanel to set state, consumed by AppLayout for navigation locking.

Unified Chat API (src/services/api.ts)

The frontend automatically detects the environment and uses the appropriate method:

// Auto-detect: uses SSE locally, polling on Databricks Apps
sendChatMessage(
  sessionId: string,
  message: string,
  slideContext: SlideContext | undefined,
  onEvent: (event: StreamEvent) => void,
  onError: (error: Error) => void,
): () => void  // Returns cancel function

Environment Detection:

const isPollingMode = (): boolean => {
  // Explicit override via env var
  if (import.meta.env.VITE_USE_POLLING === 'true') return true;

  // Production mode always uses polling (Databricks Apps has proxy timeouts)
  if (import.meta.env.MODE === 'production') return true;

  // Auto-detect Databricks Apps (for dev builds deployed to Databricks)
  const hostname = window.location.hostname;
  return hostname.includes('.databricks.com') ||
         hostname.includes('.azuredatabricks.net');
};

Key behavior: Production builds always use polling to avoid SSE timeout issues.

SSE Mode – Uses streamChat() with ReadableStream:

const reader = response.body.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // Parse SSE lines from the decoded chunk, extract event type and JSON `data` (parsing elided)
  const event = JSON.parse(data) as StreamEvent;
  onEvent(event);
}

Polling Mode – Uses startPolling() with setInterval:

const { request_id } = await api.submitChatAsync(sessionId, message, slideContext);
let lastMessageId = 0;

const pollInterval = setInterval(async () => {
  const response = await api.pollChat(request_id, lastMessageId);
  for (const event of response.events) onEvent(event);
  lastMessageId = response.last_message_id;

  if (response.status === 'completed' || response.status === 'error') {
    clearInterval(pollInterval);
    // Emit final complete/error event
  }
}, 2000);

ChatPanel Event Handling

const handleStreamEvent = (event: StreamEvent) => {
  switch (event.type) {
    case 'assistant':
      setMessages(prev => [...prev, { role: 'assistant', content: event.content }]);
      break;
    case 'tool_call':
      setMessages(prev => [...prev, {
        role: 'assistant',
        tool_call: { name: event.tool_name, arguments: event.tool_input }
      }]);
      break;
    case 'tool_result':
      setMessages(prev => [...prev, { role: 'tool', content: event.tool_output }]);
      break;
    case 'complete':
      setIsGenerating(false);
      if (event.slides) onSlidesGenerated(event.slides, event.raw_html);
      break;
  }
};

Session Message Loading

On session change, ChatPanel loads persisted messages:

useEffect(() => {
  if (!sessionId) return;
  api.getSession(sessionId).then(session => {
    if (session.messages?.length > 0) {
      setMessages(session.messages.map(msg => ({
        role: msg.role,
        content: msg.content,
        tool_call: msg.metadata?.tool_name ? { ... } : undefined,
      })));
    }
  }).catch(err => {
    // 404 expected for new sessions - silently ignore
  });
}, [sessionId]);

AppLayout disables navigation during generation:

const { isGenerating } = useGeneration();

<button
  disabled={isGenerating}
  className={isGenerating ? 'opacity-50 cursor-not-allowed' : ''}
>
  History
</button>

{isGenerating && <span className="animate-pulse">Generating...</span>}

Disabled elements: History, Settings, Help, Save As, New, Profile Selector.


Message Display

Message Component (src/components/ChatPanel/Message.tsx)

Message Type         | Display Style
---------------------|------------------------------------------
User message         | Blue background, right-aligned
Assistant reasoning  | White background, normal text
Tool call            | Collapsed accordion with query preview
Tool result          | Collapsed accordion with output preview
HTML output          | Collapsed accordion labeled "(HTML)"

Tool calls show the query directly when expanded:

if (message.tool_call) {
  return renderCollapsibleContent(
    `Tool call: ${message.tool_call.name}`,
    queryPreview,
    <div>Query: {toolArgs.query}</div>
  );
}

Database Schema

session_messages Table

Column         | Type         | Purpose
---------------|--------------|----------------------------------------------------------------------------------------------------
id             | INTEGER      | Primary key
session_id     | INTEGER      | FK to user_sessions
role           | VARCHAR      | user, assistant, system
content        | TEXT         | Message content
message_type   | VARCHAR      | user_input, user_query, reasoning, tool_call, tool_result, llm_response, clarification, info, error
metadata_json  | TEXT         | JSON with tool_name, tool_input
request_id     | VARCHAR(64)  | Links to async chat request (for polling)
created_at     | TIMESTAMP    | Message timestamp

chat_requests Table (Polling Support)

Column         | Type         | Purpose
---------------|--------------|------------------------------------------------
id             | INTEGER      | Primary key
request_id     | VARCHAR(64)  | Unique request identifier
session_id     | INTEGER      | FK to user_sessions
status         | VARCHAR(20)  | pending, running, completed, error
error_message  | TEXT         | Error details if status=error
result_json    | TEXT         | JSON with slides, raw_html, replacement_info
created_at     | TIMESTAMP    | Request creation time
completed_at   | TIMESTAMP    | Request completion time

Migration SQL: See scripts/migrate_polling_support.sql
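
For orientation, a rough ORM sketch of the chat_requests table, assuming a SQLAlchemy declarative model (the project's actual models and the migration script above are authoritative):

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class ChatRequest(Base):
    """Tracks one async chat request for the polling flow."""
    __tablename__ = "chat_requests"

    id = Column(Integer, primary_key=True)
    request_id = Column(String(64), unique=True, nullable=False)
    session_id = Column(Integer, ForeignKey("user_sessions.id"))
    status = Column(String(20), default="pending")  # pending | running | completed | error
    error_message = Column(Text)
    result_json = Column(Text)  # Slides, raw_html, replacement_info as JSON
    created_at = Column(DateTime)
    completed_at = Column(DateTime)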


Error Handling

Scenario               | Behavior
-----------------------|------------------------------------------
Stream error           | error event emitted, isGenerating reset
Session lock conflict  | 409 returned, stream not started
Network disconnect     | AbortController cancels stream
Tool error             | error event with tool name

Testing Checklist

Both Modes

  1. Navigation lock – Start generation, verify nav buttons disabled
  2. Real-time events – Tool calls appear before results
  3. Message persistence – Messages survive page reload
  4. Session restore – Load session from History, verify messages + slides
  5. Continue conversation – After restore, agent has context from history
  6. Error recovery – Errors re-enable navigation

Polling Mode (Databricks Apps)

  1. Async submission – POST /api/chat/async returns request_id
  2. Poll updates – Events arrive via polling every 2 seconds
  3. Completion detection – Polling stops when status=completed
  4. Error handling – Errors propagate correctly
  5. Environment detection – Polling used automatically on *.databricks.com

To force polling mode locally for testing, set VITE_USE_POLLING=true in the frontend environment.


Cross-References