diff --git a/apps/sim/app/api/knowledge/[id]/documents/upsert/route.ts b/apps/sim/app/api/knowledge/[id]/documents/upsert/route.ts new file mode 100644 index 00000000000..2499006ed35 --- /dev/null +++ b/apps/sim/app/api/knowledge/[id]/documents/upsert/route.ts @@ -0,0 +1,248 @@ +import { randomUUID } from 'crypto' +import { db } from '@sim/db' +import { document } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq, isNull } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log' +import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' +import { + createDocumentRecords, + deleteDocument, + getProcessingConfig, + processDocumentsWithQueue, +} from '@/lib/knowledge/documents/service' +import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' +import { checkKnowledgeBaseWriteAccess } from '@/app/api/knowledge/utils' + +const logger = createLogger('DocumentUpsertAPI') + +const UpsertDocumentSchema = z.object({ + documentId: z.string().optional(), + filename: z.string().min(1, 'Filename is required'), + fileUrl: z.string().min(1, 'File URL is required'), + fileSize: z.number().min(1, 'File size must be greater than 0'), + mimeType: z.string().min(1, 'MIME type is required'), + documentTagsData: z.string().optional(), + processingOptions: z.object({ + chunkSize: z.number().min(100).max(4000), + minCharactersPerChunk: z.number().min(1).max(2000), + recipe: z.string(), + lang: z.string(), + chunkOverlap: z.number().min(0).max(500), + }), + workflowId: z.string().optional(), +}) + +export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) { + const requestId = randomUUID().slice(0, 8) + const { id: knowledgeBaseId } = await params + + try { + const body = await req.json() + + logger.info(`[${requestId}] Knowledge base document upsert request`, { + knowledgeBaseId, + hasDocumentId: !!body.documentId, + filename: body.filename, + }) + + const auth = await checkSessionOrInternalAuth(req, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + logger.warn(`[${requestId}] Authentication failed: ${auth.error || 'Unauthorized'}`) + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + const userId = auth.userId + + const validatedData = UpsertDocumentSchema.parse(body) + + if (validatedData.workflowId) { + const authorization = await authorizeWorkflowByWorkspacePermission({ + workflowId: validatedData.workflowId, + userId, + action: 'write', + }) + if (!authorization.allowed) { + return NextResponse.json( + { error: authorization.message || 'Access denied' }, + { status: authorization.status } + ) + } + } + + const accessCheck = await checkKnowledgeBaseWriteAccess(knowledgeBaseId, userId) + + if (!accessCheck.hasAccess) { + if ('notFound' in accessCheck && accessCheck.notFound) { + logger.warn(`[${requestId}] Knowledge base not found: ${knowledgeBaseId}`) + return NextResponse.json({ error: 'Knowledge base not found' }, { status: 404 }) + } + logger.warn( + `[${requestId}] User ${userId} attempted to upsert document in unauthorized knowledge base ${knowledgeBaseId}` + ) + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + let existingDocumentId: string | null = null + let isUpdate = false + + if (validatedData.documentId) { + const existingDoc = await db + .select({ id: document.id }) + .from(document) + .where( + and( + eq(document.id, validatedData.documentId), + eq(document.knowledgeBaseId, knowledgeBaseId), + isNull(document.deletedAt) + ) + ) + .limit(1) + + if (existingDoc.length > 0) { + existingDocumentId = existingDoc[0].id + } + } else { + const docsByFilename = await db + .select({ id: document.id }) + .from(document) + .where( + and( + eq(document.filename, validatedData.filename), + eq(document.knowledgeBaseId, knowledgeBaseId), + isNull(document.deletedAt) + ) + ) + .limit(1) + + if (docsByFilename.length > 0) { + existingDocumentId = docsByFilename[0].id + } + } + + if (existingDocumentId) { + isUpdate = true + logger.info( + `[${requestId}] Found existing document ${existingDocumentId}, creating replacement before deleting old` + ) + } + + const createdDocuments = await createDocumentRecords( + [ + { + filename: validatedData.filename, + fileUrl: validatedData.fileUrl, + fileSize: validatedData.fileSize, + mimeType: validatedData.mimeType, + ...(validatedData.documentTagsData && { + documentTagsData: validatedData.documentTagsData, + }), + }, + ], + knowledgeBaseId, + requestId + ) + + const firstDocument = createdDocuments[0] + if (!firstDocument) { + logger.error(`[${requestId}] createDocumentRecords returned empty array unexpectedly`) + return NextResponse.json({ error: 'Failed to create document record' }, { status: 500 }) + } + + if (existingDocumentId) { + try { + await deleteDocument(existingDocumentId, requestId) + } catch (deleteError) { + logger.error( + `[${requestId}] Failed to delete old document ${existingDocumentId}, rolling back new record`, + deleteError + ) + await deleteDocument(firstDocument.documentId, requestId).catch(() => {}) + return NextResponse.json({ error: 'Failed to replace existing document' }, { status: 500 }) + } + } + + processDocumentsWithQueue( + createdDocuments, + knowledgeBaseId, + validatedData.processingOptions, + requestId + ).catch((error: unknown) => { + logger.error(`[${requestId}] Critical error in document processing pipeline:`, error) + }) + + try { + const { PlatformEvents } = await import('@/lib/core/telemetry') + PlatformEvents.knowledgeBaseDocumentsUploaded({ + knowledgeBaseId, + documentsCount: 1, + uploadType: 'single', + chunkSize: validatedData.processingOptions.chunkSize, + recipe: validatedData.processingOptions.recipe, + }) + } catch (_e) { + // Silently fail + } + + recordAudit({ + workspaceId: accessCheck.knowledgeBase?.workspaceId ?? null, + actorId: userId, + actorName: auth.userName, + actorEmail: auth.userEmail, + action: isUpdate ? AuditAction.DOCUMENT_UPDATED : AuditAction.DOCUMENT_UPLOADED, + resourceType: AuditResourceType.DOCUMENT, + resourceId: knowledgeBaseId, + resourceName: validatedData.filename, + description: isUpdate + ? `Upserted (replaced) document "${validatedData.filename}" in knowledge base "${knowledgeBaseId}"` + : `Upserted (created) document "${validatedData.filename}" in knowledge base "${knowledgeBaseId}"`, + metadata: { + fileName: validatedData.filename, + previousDocumentId: existingDocumentId, + isUpdate, + }, + request: req, + }) + + return NextResponse.json({ + success: true, + data: { + documentsCreated: [ + { + documentId: firstDocument.documentId, + filename: firstDocument.filename, + status: 'pending', + }, + ], + isUpdate, + previousDocumentId: existingDocumentId, + processingMethod: 'background', + processingConfig: { + maxConcurrentDocuments: getProcessingConfig().maxConcurrentDocuments, + batchSize: getProcessingConfig().batchSize, + }, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid upsert request data`, { errors: error.errors }) + return NextResponse.json( + { error: 'Invalid request data', details: error.errors }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error upserting document`, error) + + const errorMessage = error instanceof Error ? error.message : 'Failed to upsert document' + const isStorageLimitError = + errorMessage.includes('Storage limit exceeded') || errorMessage.includes('storage limit') + const isMissingKnowledgeBase = errorMessage === 'Knowledge base not found' + + return NextResponse.json( + { error: errorMessage }, + { status: isMissingKnowledgeBase ? 404 : isStorageLimitError ? 413 : 500 } + ) + } +} diff --git a/apps/sim/blocks/blocks/knowledge.ts b/apps/sim/blocks/blocks/knowledge.ts index aaa2e78e565..f488c49db74 100644 --- a/apps/sim/blocks/blocks/knowledge.ts +++ b/apps/sim/blocks/blocks/knowledge.ts @@ -29,6 +29,7 @@ export const KnowledgeBlock: BlockConfig = { { label: 'List Documents', id: 'list_documents' }, { label: 'Get Document', id: 'get_document' }, { label: 'Create Document', id: 'create_document' }, + { label: 'Upsert Document', id: 'upsert_document' }, { label: 'Delete Document', id: 'delete_document' }, { label: 'List Chunks', id: 'list_chunks' }, { label: 'Upload Chunk', id: 'upload_chunk' }, @@ -175,14 +176,14 @@ export const KnowledgeBlock: BlockConfig = { condition: { field: 'operation', value: 'upload_chunk' }, }, - // --- Create Document --- + // --- Create Document / Upsert Document --- { id: 'name', title: 'Document Name', type: 'short-input', placeholder: 'Enter document name', required: true, - condition: { field: 'operation', value: 'create_document' }, + condition: { field: 'operation', value: ['create_document', 'upsert_document'] }, }, { id: 'content', @@ -191,14 +192,21 @@ export const KnowledgeBlock: BlockConfig = { placeholder: 'Enter the document content', rows: 6, required: true, - condition: { field: 'operation', value: 'create_document' }, + condition: { field: 'operation', value: ['create_document', 'upsert_document'] }, + }, + { + id: 'upsertDocumentId', + title: 'Document ID (Optional)', + type: 'short-input', + placeholder: 'Enter existing document ID to update (or leave empty to match by name)', + condition: { field: 'operation', value: 'upsert_document' }, }, { id: 'documentTags', title: 'Document Tags', type: 'document-tag-entry', dependsOn: ['knowledgeBaseSelector'], - condition: { field: 'operation', value: 'create_document' }, + condition: { field: 'operation', value: ['create_document', 'upsert_document'] }, }, // --- Update Chunk / Delete Chunk --- @@ -264,6 +272,7 @@ export const KnowledgeBlock: BlockConfig = { 'knowledge_search', 'knowledge_upload_chunk', 'knowledge_create_document', + 'knowledge_upsert_document', 'knowledge_list_tags', 'knowledge_list_documents', 'knowledge_get_document', @@ -284,6 +293,8 @@ export const KnowledgeBlock: BlockConfig = { return 'knowledge_upload_chunk' case 'create_document': return 'knowledge_create_document' + case 'upsert_document': + return 'knowledge_upsert_document' case 'list_tags': return 'knowledge_list_tags' case 'list_documents': @@ -355,6 +366,11 @@ export const KnowledgeBlock: BlockConfig = { if (params.chunkEnabledFilter) params.enabled = params.chunkEnabledFilter } + // Map upsert sub-block field to tool param + if (params.operation === 'upsert_document' && params.upsertDocumentId) { + params.documentId = String(params.upsertDocumentId).trim() + } + // Convert enabled dropdown string to boolean for update_chunk if (params.operation === 'update_chunk' && typeof params.enabled === 'string') { params.enabled = params.enabled === 'true' @@ -382,6 +398,7 @@ export const KnowledgeBlock: BlockConfig = { documentTags: { type: 'string', description: 'Document tags' }, chunkSearch: { type: 'string', description: 'Search filter for chunks' }, chunkEnabledFilter: { type: 'string', description: 'Filter chunks by enabled status' }, + upsertDocumentId: { type: 'string', description: 'Document ID for upsert operation' }, connectorId: { type: 'string', description: 'Connector identifier' }, }, outputs: { diff --git a/apps/sim/tools/knowledge/index.ts b/apps/sim/tools/knowledge/index.ts index 8fafa6a603c..0c0edd54448 100644 --- a/apps/sim/tools/knowledge/index.ts +++ b/apps/sim/tools/knowledge/index.ts @@ -11,6 +11,7 @@ import { knowledgeSearchTool } from '@/tools/knowledge/search' import { knowledgeTriggerSyncTool } from '@/tools/knowledge/trigger_sync' import { knowledgeUpdateChunkTool } from '@/tools/knowledge/update_chunk' import { knowledgeUploadChunkTool } from '@/tools/knowledge/upload_chunk' +import { knowledgeUpsertDocumentTool } from '@/tools/knowledge/upsert_document' export { knowledgeSearchTool, @@ -26,4 +27,5 @@ export { knowledgeListConnectorsTool, knowledgeGetConnectorTool, knowledgeTriggerSyncTool, + knowledgeUpsertDocumentTool, } diff --git a/apps/sim/tools/knowledge/types.ts b/apps/sim/tools/knowledge/types.ts index 09a4f8695a2..49fb6d8c338 100644 --- a/apps/sim/tools/knowledge/types.ts +++ b/apps/sim/tools/knowledge/types.ts @@ -286,3 +286,33 @@ export interface KnowledgeTriggerSyncResponse { } error?: string } + +export interface KnowledgeUpsertDocumentParams { + knowledgeBaseId: string + name: string + content: string + documentId?: string + documentTags?: Record + _context?: { workflowId?: string } +} + +export interface KnowledgeUpsertDocumentResult { + documentId: string + documentName: string + type: string + enabled: boolean + isUpdate: boolean + previousDocumentId: string | null + createdAt: string + updatedAt: string +} + +export interface KnowledgeUpsertDocumentResponse { + success: boolean + output: { + data: KnowledgeUpsertDocumentResult + message: string + documentId: string + } + error?: string +} diff --git a/apps/sim/tools/knowledge/upsert_document.ts b/apps/sim/tools/knowledge/upsert_document.ts new file mode 100644 index 00000000000..0314350a0db --- /dev/null +++ b/apps/sim/tools/knowledge/upsert_document.ts @@ -0,0 +1,189 @@ +import type { + KnowledgeUpsertDocumentParams, + KnowledgeUpsertDocumentResponse, +} from '@/tools/knowledge/types' +import { enrichKBTagsSchema } from '@/tools/schema-enrichers' +import { formatDocumentTagsForAPI, parseDocumentTags } from '@/tools/shared/tags' +import type { ToolConfig } from '@/tools/types' + +export const knowledgeUpsertDocumentTool: ToolConfig< + KnowledgeUpsertDocumentParams, + KnowledgeUpsertDocumentResponse +> = { + id: 'knowledge_upsert_document', + name: 'Knowledge Upsert Document', + description: + 'Create or update a document in a knowledge base. If a document with the given ID or filename already exists, it will be replaced with the new content.', + version: '1.0.0', + + params: { + knowledgeBaseId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'ID of the knowledge base containing the document', + }, + documentId: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: + 'Optional ID of an existing document to update. If not provided, lookup is done by filename.', + }, + name: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Name of the document', + }, + content: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Content of the document', + }, + documentTags: { + type: 'json', + required: false, + visibility: 'user-or-llm', + description: 'Document tags', + }, + }, + + schemaEnrichment: { + documentTags: { + dependsOn: 'knowledgeBaseId', + enrichSchema: enrichKBTagsSchema, + }, + }, + + request: { + url: (params) => `/api/knowledge/${params.knowledgeBaseId}/documents/upsert`, + method: 'POST', + headers: () => ({ + 'Content-Type': 'application/json', + }), + body: (params) => { + const workflowId = params._context?.workflowId + const textContent = params.content?.trim() + const documentName = params.name?.trim() + + if (!documentName || documentName.length === 0) { + throw new Error('Document name is required') + } + if (documentName.length > 255) { + throw new Error('Document name must be 255 characters or less') + } + if (!textContent || textContent.length < 1) { + throw new Error('Document content cannot be empty') + } + const utf8Bytes = new TextEncoder().encode(textContent) + const contentBytes = utf8Bytes.length + + if (contentBytes > 1_000_000) { + throw new Error('Document content exceeds maximum size of 1MB') + } + let base64Content: string + if (typeof Buffer !== 'undefined') { + base64Content = Buffer.from(textContent, 'utf8').toString('base64') + } else { + let binary = '' + for (let i = 0; i < utf8Bytes.length; i++) { + binary += String.fromCharCode(utf8Bytes[i]) + } + base64Content = btoa(binary) + } + + const dataUri = `data:text/plain;base64,${base64Content}` + + const parsedTags = parseDocumentTags(params.documentTags) + const tagData = formatDocumentTagsForAPI(parsedTags) + + const filename = documentName.endsWith('.txt') ? documentName : `${documentName}.txt` + + const requestBody: Record = { + filename, + fileUrl: dataUri, + fileSize: contentBytes, + mimeType: 'text/plain', + ...tagData, + processingOptions: { + chunkSize: 1024, + minCharactersPerChunk: 1, + chunkOverlap: 200, + recipe: 'default', + lang: 'en', + }, + ...(workflowId && { workflowId }), + } + + if (params.documentId && String(params.documentId).trim().length > 0) { + requestBody.documentId = String(params.documentId).trim() + } + + return requestBody + }, + }, + + transformResponse: async (response): Promise => { + const result = await response.json() + const data = result.data ?? result + const documentsCreated = data.documentsCreated ?? [] + const firstDocument = documentsCreated[0] + const isUpdate = data.isUpdate ?? false + const previousDocumentId = data.previousDocumentId ?? null + const documentId = firstDocument?.documentId ?? firstDocument?.id ?? '' + + return { + success: true, + output: { + message: isUpdate + ? 'Successfully updated document in knowledge base' + : 'Successfully created document in knowledge base', + documentId, + data: { + documentId, + documentName: firstDocument?.filename ?? 'Unknown', + type: 'document', + enabled: true, + isUpdate, + previousDocumentId, + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }, + }, + } + }, + + outputs: { + data: { + type: 'object', + description: 'Information about the upserted document', + properties: { + documentId: { type: 'string', description: 'Document ID' }, + documentName: { type: 'string', description: 'Document name' }, + type: { type: 'string', description: 'Document type' }, + enabled: { type: 'boolean', description: 'Whether the document is enabled' }, + isUpdate: { + type: 'boolean', + description: 'Whether an existing document was replaced', + }, + previousDocumentId: { + type: 'string', + description: 'ID of the document that was replaced, if any', + optional: true, + }, + createdAt: { type: 'string', description: 'Creation timestamp' }, + updatedAt: { type: 'string', description: 'Last update timestamp' }, + }, + }, + message: { + type: 'string', + description: 'Success or error message describing the operation result', + }, + documentId: { + type: 'string', + description: 'ID of the upserted document', + }, + }, +} diff --git a/apps/sim/tools/registry.ts b/apps/sim/tools/registry.ts index f14ff024ebf..4c7f2af33ad 100644 --- a/apps/sim/tools/registry.ts +++ b/apps/sim/tools/registry.ts @@ -1195,6 +1195,7 @@ import { knowledgeTriggerSyncTool, knowledgeUpdateChunkTool, knowledgeUploadChunkTool, + knowledgeUpsertDocumentTool, } from '@/tools/knowledge' import { langsmithCreateRunsBatchTool, langsmithCreateRunTool } from '@/tools/langsmith' import { lemlistGetActivitiesTool, lemlistGetLeadTool, lemlistSendEmailTool } from '@/tools/lemlist' @@ -3703,6 +3704,7 @@ export const tools: Record = { knowledge_list_connectors: knowledgeListConnectorsTool, knowledge_get_connector: knowledgeGetConnectorTool, knowledge_trigger_sync: knowledgeTriggerSyncTool, + knowledge_upsert_document: knowledgeUpsertDocumentTool, search_tool: searchTool, elevenlabs_tts: elevenLabsTtsTool, fathom_list_meetings: fathomListMeetingsTool,