README - maintainSource Edge Function

Documentation for the maintainSource Edge Function REST API for document management with queue-based processing. Modified: 2026-Jan-19 20:29:29 UTC

maintainSource Edge Function

Purpose: Document management (add, update, delete) with queue-based processing, RLS, and transaction logging
Status: Production
Last Updated: 2026-01-01


Change Log

| Date | Change | By |
|------|--------|----|
| 2026-01-01 | skipParentStorage: Added flag to bypass parent document storage for faster processing of large files | Claude |
| 2026-01-01 | User filtering: Added user_id=mine filter to manageQueue for viewing own submissions | Claude |
| 2025-12-31 | Fire-and-forget mode: Added async parameter for immediate return with queueId (HTTP 202) | Claude |
| 2025-12-31 | Enhanced edge_function_response: Added processingStats with parent/vector doc counts, file type, and load mode | Claude |
| 2025-11-18 | Queue Everything Pattern: All submissions logged for audit trail and retry capability | Claude |
| 2025-11-14 | Phase 3 Integration: Account balance + platform pricing | Claude |
| 2025-11-06 | Initial implementation with RLS and transaction logging | Claude |

Overview

This Edge Function wraps the maintainSource service from _langchain/services/documents-manager.ts. It provides:

  1. JWT validation and user claim extraction
  2. Content moderation via OpenAI Moderation API (optional)
  3. Queue-based processing for audit trail and retry capability
  4. Transaction logging for business model tracking
  5. RLS enforcement via hybrid client approach

API Reference

Endpoint

POST /maintainSource
Authorization: Bearer <jwt_token>
Content-Type: application/json

Request Body

{
  // Required
  sourceUrl: string;        // URL of document to process
  sourceDate: string;       // ISO date string (e.g., "2025-12-31")

  // Actions (at least one required)
  delDocs?: boolean;        // Delete existing documents for this source
  addDocs?: boolean;        // Add new documents (fetch, chunk, embed)
  updDocs?: boolean;        // Update metadata only (no re-embedding)

  // Optional metadata
  sourceTitle?: string;     // Document title
  iprOwner?: string;        // IPR owner identifier (UUID)
  isIprOwner?: boolean;     // User is the rights holder
  dcCreator?: string;       // Dublin Core: Creator
  dcPublisher?: string;     // Dublin Core: Publisher
  dcRights?: string;        // Dublin Core: Rights statement
  dcIdentifier?: string;    // Dublin Core: Alternative ID (DOI, ISBN)
  dcSource?: string;        // Dublin Core: Source reference
  metadata?: object;        // Additional custom metadata

  // Queue options
  priority?: number;        // Queue priority 0-100 (higher = first)

  // Admin-only options (requires accessLevel >= 9)
  active?: boolean;         // Set document active status
  access_level?: number;    // Set document access level (0-10)
  onBehalfOfUserId?: string; // Transfer ownership to another user
  adminOverride?: boolean;  // Bypass ownership check for replace/delete

  // Processing options
  verbose?: boolean;        // Enable detailed logging
  skipModeration?: boolean; // Skip content moderation check
  content?: string;         // Pre-fetched content (skips fetch for moderation)
  queueId?: string;         // Skip queue creation (already queued)

  // Async mode (fire-and-forget)
  async?: boolean;          // Return immediately with queueId (HTTP 202)

  // Performance optimization
  skipParentStorage?: boolean; // Skip parent doc storage (faster for large files)
}
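
The constraints noted in the comments above (required fields, at least one action, priority range) can be checked on the client before submitting. The following is a minimal sketch only: validateMaintainSourceRequest is a hypothetical helper, not part of the Edge Function, and the server remains the authority on what is accepted.

```typescript
// Subset of the request body fields documented above.
interface MaintainSourceRequest {
  sourceUrl: string;
  sourceDate: string;
  delDocs?: boolean;
  addDocs?: boolean;
  updDocs?: boolean;
  priority?: number;
}

// Hypothetical client-side check; returns a list of problems (empty = OK).
function validateMaintainSourceRequest(
  req: Partial<MaintainSourceRequest>,
): string[] {
  const problems: string[] = [];
  if (!req.sourceUrl) problems.push("sourceUrl is required");
  // Date.parse is lenient; it only rejects strings that are not dates at all.
  if (!req.sourceDate || Number.isNaN(Date.parse(req.sourceDate))) {
    problems.push("sourceDate must be a parseable date string");
  }
  if (!req.delDocs && !req.addDocs && !req.updDocs) {
    problems.push("at least one of delDocs, addDocs, updDocs is required");
  }
  if (req.priority !== undefined && (req.priority < 0 || req.priority > 100)) {
    problems.push("priority must be between 0 and 100");
  }
  return problems;
}
```

Calling it with only sourceUrl and sourceDate set returns one problem (no action selected), which can be shown to the user before any request is sent.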

Response

Standard Response (sync mode)

{
  success: boolean;
  statusCode: number;        // 200 on success
  message: string;

  // On success
  transactionId?: string;     // Transaction ID for tracking
  queueId?: string;           // Queue item ID
  transactionResult?: {       // Business model details
    success: boolean;
    platformFee: number;
    transactionType: string;
  };

  // Dispute support
  disputeMailto?: string;     // Pre-filled mailto link for disputes

  // Processing details (in queue via edge_function_response)
  processingStats?: {
    parentDocs: { deleted: number; added: number };
    vectorDocs: { deleted: number; added: number };
    sourceDocsLoaded?: number;
    fileType?: string;        // "pdf", "html", "txt", "vtt"
    loadMode?: string;        // "local", "remote", "auto"
  };
  duration?: number;          // Document load duration (ms)
  storageDuration?: number;   // Storage/embedding duration (ms)
  netlifyDuration?: number;   // Netlify processing duration (ms)
}

Async Response (fire-and-forget mode)

When async: true is set, the function returns immediately with HTTP 202:

{
  success: true;
  statusCode: 202;
  message: "Document queued for processing";
  queueId: string; // Use this to poll for status
  userId: string;
  orgId: string;
  sourceUrl: string;
}
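
Because the same endpoint can answer with either shape, a caller may want to branch on the status code before reading the result. This is a sketch under the assumption that only the fields documented above are needed; isQueuedAsync and describeResponse are illustrative names.

```typescript
// Shapes reduced to the fields documented above.
interface SyncResult {
  success: boolean;
  statusCode: number;
  message: string;
  transactionId?: string;
  queueId?: string;
}

interface AsyncAccepted {
  success: true;
  statusCode: 202;
  message: string;
  queueId: string;
}

type MaintainSourceResponse = SyncResult | AsyncAccepted;

// True when the function only queued the document (fire-and-forget mode).
function isQueuedAsync(body: MaintainSourceResponse): body is AsyncAccepted {
  return body.statusCode === 202 && typeof body.queueId === "string";
}

// Illustrative summary of what the caller should do next.
function describeResponse(body: MaintainSourceResponse): string {
  if (isQueuedAsync(body)) {
    return `queued: poll manageQueue with queueId ${body.queueId}`;
  }
  return body.success
    ? `done: ${body.message}`
    : `failed (${body.statusCode}): ${body.message}`;
}
```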

Fire-and-Forget Mode

For long-running document processing, use async mode to avoid client timeouts:

1. Submit with async mode

const response = await fetch(`${SUPABASE_URL}/functions/v1/maintainSource`, {
  method: "POST",
  headers: {
    Authorization: `Bearer ${jwt}`,
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    sourceUrl: "https://example.com/large-document.pdf",
    sourceDate: new Date().toISOString(),
    addDocs: true,
    async: true, // Fire-and-forget mode
  }),
});

const { queueId } = await response.json();
// Returns immediately with HTTP 202

2. Check status (choose one approach)

Option A: Manual refresh

Use the user_id=mine filter to list the user's own submissions. The user clicks refresh to see updates.

// List user's queue items with current status
const response = await fetch(
  `${SUPABASE_URL}/functions/v1/manageQueue/items?user_id=mine`,
  { headers: { Authorization: `Bearer ${jwt}` } }
);

const { items } = await response.json();
// items contains all user's submissions with their current status
// completed items have edge_function_response with results

Option B: Active polling (optional)

For real-time updates (toast notifications, progress indicators):

async function pollQueueStatus(queueId, jwt, maxWaitMs = 600000) {
  let delayMs = 1000; // Start with 1s
  const maxDelayMs = 30000; // Max 30s between polls
  const startTime = Date.now();

  while (Date.now() - startTime < maxWaitMs) {
    const response = await fetch(
      `${SUPABASE_URL}/functions/v1/manageQueue/items/${queueId}`,
      { headers: { Authorization: `Bearer ${jwt}` } }
    );

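    // Fail fast on HTTP errors (e.g. expired JWT) instead of polling until timeout
    if (!response.ok) {
      throw new Error(`Queue status request failed: HTTP ${response.status}`);
    }
    const item = await response.json();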

    if (item.status === "completed") {
      return { success: true, result: item.edge_function_response };
    }
    if (item.status === "failed") {
      return { success: false, error: item.error_message };
    }

    // Still processing - wait and retry
    await new Promise((r) => setTimeout(r, delayMs));
    delayMs = Math.min(delayMs * 1.5, maxDelayMs); // Exponential backoff
  }

  throw new Error("Queue processing timeout");
}

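// Usage: poll in background after submission
// (submitAsync is assumed to wrap the step 1 request and return its JSON body)
const { queueId } = await submitAsync(sourceUrl);
pollQueueStatus(queueId, jwt)
  .then((result) => {
    if (result.success) {
      showToast("Document processed successfully");
    } else {
      showToast(`Processing failed: ${result.error}`);
    }
    refreshQueueList();
  })
  .catch((err) => showToast(err.message)); // e.g. "Queue processing timeout"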
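
To see how often the polling loop above actually calls the queue API, the delay sequence can be computed on its own. This sketch mirrors the constants used in pollQueueStatus (1s start, factor 1.5, 30s cap); backoffSchedule is an illustrative helper, and it ignores the time spent in each request.

```typescript
// Returns the waits (in ms) that a poller with capped exponential
// backoff would perform before maxWaitMs of waiting has accumulated.
function backoffSchedule(
  maxWaitMs: number,
  startMs = 1000,
  factor = 1.5,
  maxDelayMs = 30000,
): number[] {
  const delays: number[] = [];
  let delayMs = startMs;
  let elapsedMs = 0;
  while (elapsedMs < maxWaitMs) {
    delays.push(delayMs);
    elapsedMs += delayMs;
    delayMs = Math.min(delayMs * factor, maxDelayMs); // grow, but never past the cap
  }
  return delays;
}

// First waits for a 10-second budget: 1000, 1500, 2250, 3375, 5062.5
const firstWaits = backoffSchedule(10000);
```

With the default 10-minute budget the delay reaches the 30s cap after nine increases, so most of a long wait is spent polling at the capped interval.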

3. Trigger processing

Processing is triggered via the admin UI "Process Queue" button, which:

  1. Calls get_next_upload_queue_item() RPC
  2. Processes each item via maintainSource
  3. Updates status via mark_upload_queue_completed or mark_upload_queue_failed
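
The three steps above amount to a drain loop over the queue. The sketch below shows that control flow only. QueueRpc is a hypothetical abstraction over the three RPCs; their real parameters and return values are not documented here, so the method signatures are assumptions, as is the handle callback standing in for the maintainSource call.

```typescript
interface QueueItem {
  id: string;
  sourceUrl: string;
}

// Hypothetical wrapper around the RPCs named above (signatures assumed).
interface QueueRpc {
  getNext(): Promise<QueueItem | null>; // get_next_upload_queue_item()
  markCompleted(id: string, response: unknown): Promise<void>; // mark_upload_queue_completed
  markFailed(id: string, error: string): Promise<void>; // mark_upload_queue_failed
}

// Processes items one at a time until the queue reports nothing left.
async function processQueue(
  rpc: QueueRpc,
  handle: (item: QueueItem) => Promise<unknown>,
): Promise<{ completed: number; failed: number }> {
  const counts = { completed: 0, failed: 0 };
  for (let item = await rpc.getNext(); item !== null; item = await rpc.getNext()) {
    try {
      const response = await handle(item);
      await rpc.markCompleted(item.id, response);
      counts.completed++;
    } catch (err) {
      // A failure is recorded on the item so it can be retried later.
      await rpc.markFailed(item.id, err instanceof Error ? err.message : String(err));
      counts.failed++;
    }
  }
  return counts;
}
```

Keeping the RPC calls behind an interface makes the loop easy to exercise with an in-memory fake before wiring it to a real client.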

Queue Integration

All submissions are logged to the document_upload_queue table for:

  1. Audit trail - Complete record of all document operations
  2. Retry capability - Failed items can be retried
  3. Admin visibility - Queue management UI shows all operations

Queue Status Flow

pending → processing → completed/failed
                    ↓
                  expired (if JWT expires during retry)
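
The flow above can be written down as a transition table, which is handy for UI code that decides whether to keep polling. This is one reading of the diagram, not the database definition: it assumes "expired" is entered from "processing" and does not model how a retry moves a failed item back into the flow.

```typescript
type QueueStatus = "pending" | "processing" | "completed" | "failed" | "expired";

// Transitions as read from the diagram above (assumptions noted in the text).
const NEXT_STATUS: Record<QueueStatus, readonly QueueStatus[]> = {
  pending: ["processing"],
  processing: ["completed", "failed", "expired"],
  completed: [],
  failed: [],
  expired: [],
};

function canTransition(from: QueueStatus, to: QueueStatus): boolean {
  return NEXT_STATUS[from].includes(to);
}

// True when this sketch has no further transition out of the status,
// i.e. a poller has nothing more to wait for.
function isSettled(status: QueueStatus): boolean {
  return NEXT_STATUS[status].length === 0;
}
```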

Viewing Queue Details

The edge_function_response JSONB column stores processing results:

{
  "success": true,
  "statusCode": 200,
  "message": "Successfully processed https://example.com/doc.pdf. Before: 0, After: 150",
  "duration": 12500,
  "storageDuration": 8200,
  "netlifyDuration": 4300,
  "processingStats": {
    "parentDocs": { "deleted": 0, "added": 25 },
    "vectorDocs": { "deleted": 0, "added": 150 },
    "sourceDocsLoaded": 45,
    "fileType": "pdf",
    "loadMode": "remote"
  }
}

Supported File Types

| Extension | Handler | Notes |
|-----------|---------|-------|
| .pdf | maintainParentDocumentsText | Uses local/remote fallback (Netlify for large files) |
| .html, .md | maintainParentDocumentsHTML | CSS selector support for content extraction |
| .txt | maintainParentDocumentsText | Plain text processing |
| .vtt | maintainParentDocumentsText | WebVTT captions (pre-chunked by dialogue) |
| No extension | maintainParentDocumentsHTML | Assumes HTML (e.g., 11ty pages) |
| Google Docs | maintainParentDocumentsHTML | Auto-converts to export/view URLs |
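
The table can be read as a lookup from URL to handler name. The sketch below is an illustration of that mapping rather than the actual dispatch code: pickHandler is a hypothetical name, recognising Google Docs by the docs.google.com hostname is an assumption, and so is throwing on an unlisted extension.

```typescript
type Handler = "maintainParentDocumentsText" | "maintainParentDocumentsHTML";

function pickHandler(sourceUrl: string): Handler {
  const url = new URL(sourceUrl);
  // Assumption: Google Docs links are recognised by hostname.
  if (url.hostname === "docs.google.com") return "maintainParentDocumentsHTML";

  // Extension of the last path segment; query string and hash are ignored.
  const lastSegment = url.pathname.split("/").pop() ?? "";
  const dot = lastSegment.lastIndexOf(".");
  const ext = dot >= 0 ? lastSegment.slice(dot).toLowerCase() : "";

  switch (ext) {
    case ".pdf":
    case ".txt":
    case ".vtt":
      return "maintainParentDocumentsText";
    case ".html":
    case ".md":
    case "": // no extension: assume HTML
      return "maintainParentDocumentsHTML";
    default:
      // Assumption: behaviour for other extensions is not documented.
      throw new Error(`Unsupported file type: ${ext}`);
  }
}
```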

Content Moderation

Optional content moderation via OpenAI Moderation API:

// Skip moderation
{
  skipModeration: true
}

// Pre-fetch content for moderation
{
  content: "Pre-fetched document text..."
}

If content is flagged, the request is rejected with category details.
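
For reference, each entry in the results array of an OpenAI Moderation API response carries a flagged boolean and a categories map of booleans. How this Edge Function formats its rejection is not documented here, so the sketch below only shows how category names could be extracted from such a result; flaggedCategories is an illustrative helper.

```typescript
// One entry of the "results" array in an OpenAI Moderation API response
// (reduced to the two fields used here).
interface ModerationResult {
  flagged: boolean;
  categories: Record<string, boolean>;
}

// Names of the categories that triggered the flag; empty when not flagged.
function flaggedCategories(result: ModerationResult): string[] {
  if (!result.flagged) return [];
  return Object.entries(result.categories)
    .filter(([, hit]) => hit)
    .map(([name]) => name);
}
```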


Admin Operations

Users with accessLevel >= 9 can:

  1. Override ownership - Delete/replace documents they don't own
  2. Transfer ownership - Assign documents to other users
  3. Set access level - Control document visibility
  4. Set active status - Activate/deactivate documents

{
  adminOverride: true,           // Bypass ownership check
  onBehalfOfUserId: "uuid...",   // Transfer to this user
  access_level: 5,               // Set access level
  active: false                  // Deactivate document
}
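
A client can avoid sending admin-only fields for users who cannot use them. This is a sketch of such a pre-check, based on the field list and the accessLevel >= 9 threshold documented above. disallowedAdminFields is a hypothetical helper, and what the server does when a non-admin sends these fields is not specified here.

```typescript
const ADMIN_ONLY_FIELDS = [
  "active",
  "access_level",
  "onBehalfOfUserId",
  "adminOverride",
] as const;
const ADMIN_MIN_ACCESS_LEVEL = 9;

// Returns the admin-only fields present in the body that this user
// is not entitled to set (empty for admins).
function disallowedAdminFields(
  body: Record<string, unknown>,
  accessLevel: number,
): string[] {
  if (accessLevel >= ADMIN_MIN_ACCESS_LEVEL) return [];
  return ADMIN_ONLY_FIELDS.filter((field) => body[field] !== undefined);
}
```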

Timeout Constraints

Document processing involves multiple components with different timeout limits:

Architecture

Client → maintainSource (Edge Function) → document-loader → Netlify Background Function
                ↓                                                    ↓
         Polls database ←←←←←←←← writes results to ←←←←←←←←←←←←←←←←←┘
         (document_loading_jobs table)

Timeout Limits

| Component | Limit | Notes |
|-----------|-------|-------|
| Supabase Edge Function | 400 seconds max | Hard limit, cannot be extended |
| Polling timeout | 390 seconds default | Just under Edge Function limit |
| Netlify Background Function | 15 minutes | Runs independently in Node.js |

How It Works

  1. Edge Function calls Netlify with a job_id
  2. Netlify returns 202 Accepted immediately (background execution)
  3. Edge Function polls document_loading_jobs table for completion
  4. Netlify writes results to database when done

The Bottleneck

Even though Netlify can run for 15 minutes, the Edge Function will stop waiting after its timeout. If the Edge Function times out:

  • The Netlify function continues processing
  • Results are written to document_loading_jobs table
  • But the original HTTP request returns a timeout error
  • The queue item is marked as failed (can be retried)
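
The gap between the two limits can be made concrete with a little arithmetic: a job that takes between 390 and 900 seconds finishes on Netlify after the Edge Function has stopped waiting. The sketch below classifies a job by duration using only the limits listed above. It is a simplification that ignores time the Edge Function spends outside of polling, and classifyJob is an illustrative name.

```typescript
const POLLING_TIMEOUT_S = 390; // Edge Function stops polling
const NETLIFY_LIMIT_S = 15 * 60; // Netlify Background Function limit (900s)

type JobOutcome =
  | "returned-in-request" // result delivered in the original HTTP response
  | "finished-after-timeout" // result only in document_loading_jobs; queue item failed
  | "exceeded-netlify-limit"; // job cannot finish within Netlify's limit

function classifyJob(jobDurationS: number): JobOutcome {
  if (jobDurationS <= POLLING_TIMEOUT_S) return "returned-in-request";
  if (jobDurationS <= NETLIFY_LIMIT_S) return "finished-after-timeout";
  return "exceeded-netlify-limit";
}
```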

Recommendations for Large Files

The default polling timeout is now 390 seconds (6.5 minutes), which should handle most documents. For files that may still time out:

  1. Use skipParentStorage: true - Bypass parent document storage (significant speedup for large files)
  2. Pre-split very large PDFs - Break into smaller files before upload (recommended for > 50MB)
  3. Use queue retry - Failed items can be retried automatically
  4. Monitor document_loading_jobs - Check for completed jobs that timed out client-side

Performance Optimization: skipParentStorage

The skipParentStorage flag bypasses the parent document retriever pattern, storing vector chunks directly in the vectorstore without saving parent documents to Supabase Storage.

When to Use

  • Large documents (> 10MB) - Eliminates Storage bucket overhead
  • High-volume processing - Faster chunk embedding
  • When parent document retrieval isn't needed - RAG queries that work fine with vector chunks
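
The size guidance in this README (skip parent storage above 10MB, pre-split above 50MB) can be folded into a small decision helper. This is a sketch under the assumption that file size is known before upload; adviseForSize is a hypothetical name and the thresholds are the rough guidelines quoted in this document, not enforced limits.

```typescript
const MB = 1024 * 1024;

interface LargeFileAdvice {
  skipParentStorage: boolean; // send skipParentStorage: true with the request
  preSplit: boolean; // break the file into smaller files first
}

function adviseForSize(
  bytes: number,
  needsParentRetrieval: boolean,
): LargeFileAdvice {
  return {
    // Only skip parent storage when parent document retrieval is not needed.
    skipParentStorage: bytes > 10 * MB && !needsParentRetrieval,
    preSplit: bytes > 50 * MB,
  };
}
```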

Trade-offs

| Feature | skipParentStorage: false (default) | skipParentStorage: true |
|---------|------------------------------------|-------------------------|
| Storage | Parent docs in Storage bucket + vector chunks | Vector chunks only |
| Speed | Slower (Storage operations) | Faster (direct embedding) |
| Parent Doc Retrieval | ✅ Supported | ❌ Not available |
| Context window | Larger chunks available | Standard 400-char chunks |

Example

// Fast processing for large file (no parent doc storage)
{
  sourceUrl: "https://example.com/large-document.pdf",
  sourceDate: "2026-01-01",
  addDocs: true,
  skipParentStorage: true  // Skip parent doc storage for speed
}

Stats Difference

When skipParentStorage: true, the processingStats will show:

{
  "parentDocs": { "deleted": 0, "added": 0 }, // Always 0
  "vectorDocs": { "deleted": 0, "added": 250 }
}

Environment Variables

# Document loader mode: "local" | "remote" | "auto" (default)
DOCUMENT_LOADER_MODE=auto

# Netlify background function URL
NETLIFY_DOCUMENT_LOADER_URL=https://your-site.netlify.app/.netlify/functions/document-loader-background

# API key for Netlify function
NETLIFY_BACKGROUND_API_KEY=your-api-key
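
A sketch of how these variables might be read and validated at startup. It takes a plain object so it can be tested without a runtime; resolveLoaderConfig is a hypothetical helper, and the rule that remote mode requires both Netlify variables is an assumption rather than documented behaviour.

```typescript
type LoaderMode = "local" | "remote" | "auto";

interface LoaderConfig {
  mode: LoaderMode;
  netlifyUrl?: string;
  apiKey?: string;
}

function isLoaderMode(value: string): value is LoaderMode {
  return value === "local" || value === "remote" || value === "auto";
}

function resolveLoaderConfig(
  env: Record<string, string | undefined>,
): LoaderConfig {
  const mode = env.DOCUMENT_LOADER_MODE ?? "auto"; // "auto" is the documented default
  if (!isLoaderMode(mode)) {
    throw new Error(`Invalid DOCUMENT_LOADER_MODE: ${mode}`);
  }
  const config: LoaderConfig = {
    mode,
    netlifyUrl: env.NETLIFY_DOCUMENT_LOADER_URL,
    apiKey: env.NETLIFY_BACKGROUND_API_KEY,
  };
  // Assumption: remote loading cannot work without the URL and the key.
  if (mode === "remote" && (!config.netlifyUrl || !config.apiKey)) {
    throw new Error(
      "remote mode requires NETLIFY_DOCUMENT_LOADER_URL and NETLIFY_BACKGROUND_API_KEY",
    );
  }
  return config;
}
```

In a Deno-based Edge Function this could be called as resolveLoaderConfig(Deno.env.toObject()).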

Related Files

| File | Purpose |
|------|---------|
| manageQueue/README.md | Queue API documentation (for polling status) |
| documents-manager.ts | Core processing logic |
| document-loader.ts | Local/remote loader with Netlify fallback |
| QUEUE_UI_DESIGN.md | Queue management UI design |
| upload-transaction-service.ts | Transaction logging |
| 20251118000002_simplified_queue.sql | Queue table migration |

Testing

# Local testing
curl -X POST http://localhost:54321/functions/v1/maintainSource \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "sourceUrl": "https://example.com/doc.pdf",
    "sourceDate": "2025-12-31",
    "addDocs": true,
    "verbose": true
  }'

Deployment

# Deploy to Supabase
deno task deploy:maintainSource

# Or via supabase CLI
supabase functions deploy maintainSource