
Multi-User Concurrency

How the backend handles concurrent requests from multiple users without blocking or race conditions.


Overview

The AI Slide Generator supports multiple simultaneous users through:

  • Session-scoped state – each user operates on their own session with isolated slide decks
  • Async endpoint handlers – FastAPI endpoints use asyncio.to_thread() for blocking LLM calls
  • Database-based locking – prevents concurrent mutations to the same session
  • Per-request tool binding – eliminates shared mutable state in the LangChain agent
  • Per-user session isolation – sessions are scoped to the authenticated user via the created_by column

Per-User Session Isolation

Sessions are isolated per user using the created_by column. Authentication middleware extracts the username from the Databricks token in production (or from DEV_USER_ID in local development) and stores it in a ContextVar. The GET /api/sessions endpoint automatically filters by created_by = current_user, so each user sees only their own sessions. The POST /api/sessions endpoint sets created_by server-side from the authenticated user identity; clients do not pass or override this value.
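The isolation pattern can be sketched as follows. This is an illustrative stand-in, not the project's actual middleware: the ContextVar name, the header lookup, and the in-memory filtering (which stands in for the SQL `created_by = current_user` filter) are all assumptions.

```python
# Hypothetical sketch of ContextVar-based user scoping.
from contextvars import ContextVar

current_user: ContextVar[str] = ContextVar("current_user", default="anonymous")

def set_user_from_request(headers: dict) -> None:
    # In production the username would come from the Databricks token;
    # locally it falls back to a dev identity. The header name is an assumption.
    user = headers.get("x-forwarded-user") or "dev-user"
    current_user.set(user)

def list_sessions(all_sessions: list) -> list:
    # Equivalent of: SELECT * FROM user_sessions WHERE created_by = :current_user
    user = current_user.get()
    return [s for s in all_sessions if s["created_by"] == user]
```

Because each request sets the ContextVar before any query runs, downstream code never needs the username passed explicitly.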


Architecture

┌──────────────────────────────────────────────────────────────┐
│ uvicorn (4 workers)                                          │
├──────────────────────────────────────────────────────────────┤
│ FastAPI async handlers                                       │
│       │                                                      │
│       ▼                                                      │
│ asyncio.to_thread() ──► ChatService (singleton per worker)   │
│       │                                                      │
│       ├── _cache_lock (threading.Lock)                       │
│       └── _deck_cache: Dict[session_id, SlideDeck]           │
│       │                                                      │
│       ▼                                                      │
│ SlideGeneratorAgent                                          │
│   └── _create_tools_for_session(session_id)                  │
│         └── Closure-bound genie wrapper                      │
├──────────────────────────────────────────────────────────────┤
│ PostgreSQL / Lakebase                                        │
│   └── user_sessions (is_processing, processing_started_at)   │
└──────────────────────────────────────────────────────────────┘

Key Mechanisms

1. Async Request Handling

All FastAPI endpoints are async def. Blocking operations (LLM calls, database queries) are wrapped with asyncio.to_thread() so the event loop remains responsive:

# src/api/routes/chat.py
@router.post("/chat")
async def send_message(request: ChatRequest):
    result = await asyncio.to_thread(
        chat_service.send_message,
        request.session_id,
        request.message,
        ...
    )

This allows one worker to handle multiple in-flight requests concurrently.
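A minimal demonstration of why this matters: three blocking calls wrapped in asyncio.to_thread() run on the default thread pool concurrently, so total wall time stays close to one call's duration. The 0.2 s sleep stands in for a blocking LLM or database call.

```python
import asyncio
import time

def blocking_call(n: int) -> int:
    time.sleep(0.2)  # stands in for a blocking LLM or DB call
    return n * 2

async def main():
    start = time.monotonic()
    # Each call runs on a worker thread; the event loop stays free to
    # schedule the others (and any unrelated requests) meanwhile.
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_call, i) for i in range(3))
    )
    return list(results), time.monotonic() - start

results, elapsed = asyncio.run(main())
# elapsed is roughly one sleep (~0.2 s), not three sequential sleeps (0.6 s)
```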

2. Database-Based Session Locking

Mutation endpoints (chat, reorder, update, duplicate, delete) acquire a session lock before proceeding:

# src/api/services/session_manager.py
def acquire_session_lock(self, session_id: str, timeout_seconds: int = 300) -> bool:
    with get_db_session() as db:
        # SELECT ... FOR UPDATE — blocks concurrent callers until this
        # transaction commits, providing row-level locking in PostgreSQL.
        session = (
            db.query(UserSession)
            .filter(UserSession.session_id == session_id)
            .with_for_update(nowait=False)
            .first()
        )

        if not session:
            return True  # Auto-creation path

        if session.is_processing:
            if session.processing_started_at:
                age = (datetime.utcnow() - session.processing_started_at).total_seconds()
                if age < timeout_seconds:
                    return False  # Busy
            # Stale lock — proceed to acquire

        session.is_processing = True
        session.processing_started_at = datetime.utcnow()
        db.commit()
        return True

Note: The .with_for_update(nowait=False) call issues a PostgreSQL SELECT ... FOR UPDATE, which takes a row-level lock for the duration of the transaction. This prevents two workers from reading the same row simultaneously and both concluding the session is free. If the database does not support FOR UPDATE (e.g., during tests), the code falls back to a plain query.

Lock lifecycle:

  1. acquire_session_lock() called at endpoint start
  2. If locked → return HTTP 409 Conflict
  3. If available → set is_processing=True, proceed
  4. release_session_lock() called in finally block
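The lifecycle above can be sketched as a small helper. SessionBusyError and the acquire/release callables here are illustrative stand-ins for the real SessionManager methods and for FastAPI raising HTTPException(status_code=409); the try/finally is the part that matters.

```python
class SessionBusyError(Exception):
    """Maps to HTTP 409 Conflict in the real endpoint."""

def with_session_lock(session_id, acquire, release, work):
    # Steps 1-2: try to acquire; a held lock means the caller gets a 409.
    if not acquire(session_id):
        raise SessionBusyError(session_id)
    try:
        # Step 3: lock held, do the mutation.
        return work()
    finally:
        # Step 4: always release, even if work() raises.
        release(session_id)
```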

Columns added to user_sessions:

Column                  Type                             Purpose
is_processing           BOOLEAN NOT NULL DEFAULT FALSE   Lock flag
processing_started_at   TIMESTAMP                        Stale lock detection (>5 min)

3. Optimistic Locking (Version Conflict Detection)

In addition to the pessimistic session lock above, ChatService uses optimistic locking when saving slide decks. Before invoking the LLM, the service snapshots the current deck version. After the LLM completes, it passes expected_version to the session manager's save method. If another request modified the deck in the meantime, VersionConflictError is raised and the chat result is discarded rather than silently overwriting the concurrent edit:

# src/api/services/chat_service.py  (simplified)
_deck_version_before_llm = self._get_deck_version(session_id)

try:
    session_manager.save_slide_deck(..., expected_version=_deck_version_before_llm)
except VersionConflictError:
    logger.warning("Chat save rejected: deck was edited during LLM call, reloading")
    self._invalidate_deck_cache(session_id)

VersionConflictError is defined in src/api/services/session_manager.py and carries current_version and expected_version for diagnostics.
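A minimal sketch of the version check itself, assuming a deck record that carries an integer version column. The class below mirrors the documented current_version / expected_version fields but is not the project's actual definition, and the dict-backed store stands in for the database row.

```python
class VersionConflictError(Exception):
    """Illustrative mirror of the real exception's diagnostic fields."""
    def __init__(self, current_version: int, expected_version: int):
        super().__init__(f"expected v{expected_version}, found v{current_version}")
        self.current_version = current_version
        self.expected_version = expected_version

def save_slide_deck(store: dict, session_id: str, deck, expected_version: int) -> int:
    record = store[session_id]
    if record["version"] != expected_version:
        # Another request saved between snapshot and save:
        # reject rather than silently overwrite the concurrent edit.
        raise VersionConflictError(record["version"], expected_version)
    record["deck"] = deck
    record["version"] += 1
    return record["version"]
```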

4. Async Chat Requests (Polling Mode)

For polling-based streaming, async requests are tracked in the chat_requests table:

Column         Type          Purpose
request_id     VARCHAR(64)   Unique request identifier
session_id     INTEGER FK    Links to user_sessions
status         VARCHAR(20)   pending / running / completed / error
result_json    TEXT          Final result (slides, raw_html)
created_at     TIMESTAMP     Request creation time
completed_at   TIMESTAMP     Request completion time

Messages are linked to requests via session_messages.request_id for efficient polling.
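A client-side polling loop against this status model might look like the following. fetch_status is a hypothetical stand-in for whatever endpoint returns a chat_requests row — this document does not specify the polling URL, so no path is assumed here.

```python
import time

def poll_until_done(fetch_status, request_id: str,
                    interval: float = 0.5, max_polls: int = 600):
    """Poll until status reaches a terminal state (completed/error)."""
    for _ in range(max_polls):
        req = fetch_status(request_id)
        if req["status"] in ("completed", "error"):
            return req
        time.sleep(interval)  # pending/running: wait, then poll again
    raise TimeoutError(request_id)
```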

5. Per-Request Tool Binding

The LangChain agent previously used a shared current_session_id instance variable—a race condition when multiple requests ran in parallel. Now tools are created per-request with the session ID bound via closure:

# src/services/agent.py
def generate_slides(self, question: str, session_id: str, ...):
# Create tools with session_id bound via closure
tools = self._create_tools_for_session(session_id)
agent_executor = self._create_agent_executor(tools)

result = agent_executor.invoke(agent_input)

The Genie wrapper inside _create_tools_for_session() captures the session dict reference at creation time, eliminating any shared mutable state.
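The essence of the closure-binding pattern, reduced to a sketch (the tool body is illustrative):

```python
def create_tools_for_session(session_id: str):
    def query_genie(question: str) -> str:
        # session_id is captured by the closure at creation time, so two
        # concurrent requests each hold their own binding — no shared
        # instance attribute to race on.
        return f"[{session_id}] result for: {question}"
    return [query_genie]

# Each request builds its own tool list; neither can see the other's session.
tools_a = create_tools_for_session("session-a")
tools_b = create_tools_for_session("session-b")
```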

6. Thread-Safe Deck Cache

ChatService maintains an in-memory slide deck cache keyed by session ID. All cache access is protected by _cache_lock:

# src/api/services/chat_service.py
class ChatService:
    def __init__(self):
        self._cache_lock = threading.Lock()
        self._deck_cache: Dict[str, SlideDeck] = {}

    def _get_or_load_deck(self, session_id: str) -> Optional[SlideDeck]:
        with self._cache_lock:
            if session_id in self._deck_cache:
                return self._deck_cache[session_id]
        # Load from database (outside lock)
        ...
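Completed as a self-contained sketch, the pattern looks roughly like this: the slow load happens outside the lock, and setdefault() resolves the race where two threads miss the cache and both load the same deck. load_fn stands in for the database read; this is an assumed completion, not the project's actual code.

```python
import threading
from typing import Callable, Dict, Optional

class DeckCache:
    """Sketch of a lock-around-cache with the slow load kept outside the lock."""

    def __init__(self, load_fn: Callable[[str], Optional[object]]):
        self._lock = threading.Lock()
        self._cache: Dict[str, object] = {}
        self._load_fn = load_fn

    def get_or_load(self, session_id: str):
        with self._lock:
            if session_id in self._cache:
                return self._cache[session_id]
        deck = self._load_fn(session_id)  # slow DB read, lock not held
        if deck is None:
            return None
        with self._lock:
            # Another thread may have loaded meanwhile; keep the first copy.
            self._cache.setdefault(session_id, deck)
            return self._cache[session_id]

    def invalidate(self, session_id: str) -> None:
        with self._lock:
            self._cache.pop(session_id, None)
```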

Endpoints Requiring Session ID

All slide manipulation and chat endpoints require session_id:

Endpoint                        Method   Session ID Location
/api/chat                       POST     Request body (session_id)
/api/slides                     GET      Query param (?session_id=...)
/api/slides/reorder             PUT      Request body (session_id)
/api/slides/{index}             PATCH    Request body (session_id)
/api/slides/{index}/duplicate   POST     Request body (session_id)
/api/slides/{index}             DELETE   Query param (?session_id=...)

Session endpoints (/api/sessions/*) manage session lifecycle and do not require locking.


Error Responses

Status          Meaning                 When
409 Conflict    Session is busy         Another request is processing the same session
404 Not Found   Session doesn't exist   Invalid session ID (for operations that require an existing session)

The frontend should handle a 409 by showing a "please wait" message or retrying after a short delay.
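One way a client could implement that, sketched with a generic send callable standing in for the actual HTTP request; the retry count and linear backoff are illustrative choices, not prescribed by the backend.

```python
import time

def post_with_retry(send, max_retries: int = 3, base_delay: float = 1.0):
    """Retry on 409 Conflict; any other status is returned immediately.

    `send` performs the HTTP call and returns (status_code, body).
    """
    for attempt in range(max_retries + 1):
        status, body = send()
        if status != 409:
            return status, body
        if attempt < max_retries:
            # Session busy: back off a little longer on each attempt.
            time.sleep(base_delay * (attempt + 1))
    return status, body
```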


Production Deployment

Multiple Workers

run.py starts uvicorn with a configurable worker count (default 4):

# packages/databricks-tellr-app/databricks_tellr_app/run.py
workers = int(os.getenv("UVICORN_WORKERS", "4"))
uvicorn.run("src.api.main:app", host=host, port=port, workers=workers)

Set the UVICORN_WORKERS environment variable to override the default. Each worker has its own ChatService singleton and deck cache. The database-based session locking ensures correctness across workers.

Scaling Considerations

  • Worker count: Defaults to 4 (configurable via UVICORN_WORKERS env var). Increase for higher concurrency; each worker can handle multiple async requests.
  • Lock timeout: 5 minutes covers long LLM generations. Stale locks are automatically overridden.
  • Cache coherence: Each worker maintains its own cache. Decks are loaded from database on cache miss, ensuring consistency after cross-worker updates.

Database Migration

Existing databases need the session locking columns:

ALTER TABLE user_sessions ADD COLUMN is_processing BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE user_sessions ADD COLUMN processing_started_at TIMESTAMP;

For polling support, run scripts/migrate_polling_support.sql:

CREATE TABLE IF NOT EXISTS chat_requests (
    id SERIAL PRIMARY KEY,
    request_id VARCHAR(64) UNIQUE NOT NULL,
    session_id INTEGER NOT NULL REFERENCES user_sessions(id) ON DELETE CASCADE,
    status VARCHAR(20) DEFAULT 'pending',
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP,
    result_json TEXT
);
CREATE INDEX IF NOT EXISTS ix_chat_requests_request_id ON chat_requests(request_id);
CREATE INDEX IF NOT EXISTS ix_chat_requests_session_id ON chat_requests(session_id);

ALTER TABLE session_messages ADD COLUMN IF NOT EXISTS request_id VARCHAR(64);
CREATE INDEX IF NOT EXISTS ix_session_messages_request_id ON session_messages(request_id);

Fresh deployments create these tables automatically via SQLAlchemy model definitions.


Cross-References