From 8b245693e21a2f9ecff6e5a4f7ae78193ecb5488 Mon Sep 17 00:00:00 2001 From: Waleed Date: Wed, 18 Mar 2026 10:34:49 -0700 Subject: [PATCH 1/3] fix(hubspot): add missing tickets and oauth scopes to OAuth config (#3653) --- apps/sim/lib/oauth/oauth.ts | 3 +++ apps/sim/lib/oauth/utils.ts | 3 +++ 2 files changed, 6 insertions(+) diff --git a/apps/sim/lib/oauth/oauth.ts b/apps/sim/lib/oauth/oauth.ts index 38c6472cfd0..84430b805d3 100644 --- a/apps/sim/lib/oauth/oauth.ts +++ b/apps/sim/lib/oauth/oauth.ts @@ -836,6 +836,9 @@ export const OAUTH_PROVIDERS: Record = { 'crm.lists.read', 'crm.lists.write', 'crm.objects.tickets.read', + 'crm.objects.tickets.write', + 'tickets', + 'oauth', ], }, }, diff --git a/apps/sim/lib/oauth/utils.ts b/apps/sim/lib/oauth/utils.ts index 5ca42639195..2db01dc12c1 100644 --- a/apps/sim/lib/oauth/utils.ts +++ b/apps/sim/lib/oauth/utils.ts @@ -308,6 +308,9 @@ export const SCOPE_DESCRIPTIONS: Record = { 'crm.lists.read': 'Read HubSpot lists', 'crm.lists.write': 'Create and update HubSpot lists', 'crm.objects.tickets.read': 'Read HubSpot tickets', + 'crm.objects.tickets.write': 'Create and update HubSpot tickets', + tickets: 'Access HubSpot tickets', + oauth: 'Authenticate with HubSpot OAuth', // Salesforce scopes api: 'Access Salesforce API', From ff5d90e0c0f235169da57837eb10d205aa0974c3 Mon Sep 17 00:00:00 2001 From: Waleed Date: Wed, 18 Mar 2026 10:49:18 -0700 Subject: [PATCH 2/3] fix(knowledge): infer MIME type from file extension in create/upsert tools (#3651) * fix(knowledge): infer MIME type from file extension in create/upsert tools Both create_document and upsert_document forced .txt extension and text/plain MIME type regardless of the document name. Now the tools infer the correct MIME type from the file extension (html, md, csv, json, yaml, xml) and only default to .txt when no extension is given. Co-Authored-By: Claude Opus 4.6 * refactor(knowledge): reuse existing getMimeTypeFromExtension from uploads Replace duplicate EXTENSION_MIME_MAP and getMimeTypeFromExtension with the existing, more comprehensive version from lib/uploads/utils/file-utils. Co-Authored-By: Claude Opus 4.6 * fix(knowledge): fix btoa stack overflow and duplicate encoding in create_document Same fixes as upsert_document: use loop-based String.fromCharCode instead of spread, consolidate duplicate TextEncoder calls, and check byte length instead of character length for 1MB limit. Co-Authored-By: Claude Opus 4.6 * fix(knowledge): allowlist text-compatible MIME types in inferDocumentFileInfo Use an explicit allowlist instead of only checking for octet-stream, preventing binary MIME types (image/jpeg, audio/mpeg, etc.) from leaking through when a user names a document with a binary extension. Co-Authored-By: Claude Opus 4.6 * fix(knowledge): remove pdf/rtf from allowlist, normalize unrecognized extensions - Remove application/pdf and application/rtf from TEXT_COMPATIBLE_MIME_TYPES since these tools pass plain text content, not binary - Normalize unrecognized extensions (e.g. report.v2) to .txt instead of preserving the original extension with text/plain MIME type Co-Authored-By: Claude Opus 4.6 * fix(knowledge): handle dotfile names to avoid empty base in filename Dotfiles like .env would produce an empty base, resulting in '.txt'. Now falls back to the original name so .env becomes .env.txt. Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: Claude Opus 4.6 --- apps/sim/tools/knowledge/create_document.ts | 35 +++++++++++++-------- apps/sim/tools/knowledge/types.ts | 35 +++++++++++++++++++++ apps/sim/tools/knowledge/upsert_document.ts | 14 ++++----- 3 files changed, 64 insertions(+), 20 deletions(-) diff --git a/apps/sim/tools/knowledge/create_document.ts b/apps/sim/tools/knowledge/create_document.ts index 5b82d504a7c..e209a0e9bd2 100644 --- a/apps/sim/tools/knowledge/create_document.ts +++ b/apps/sim/tools/knowledge/create_document.ts @@ -1,4 +1,7 @@ -import type { KnowledgeCreateDocumentResponse } from '@/tools/knowledge/types' +import { + inferDocumentFileInfo, + type KnowledgeCreateDocumentResponse, +} from '@/tools/knowledge/types' import { enrichKBTagsSchema } from '@/tools/schema-enrichers' import { formatDocumentTagsForAPI, parseDocumentTags } from '@/tools/shared/tags' import type { ToolConfig } from '@/tools/types' @@ -63,30 +66,36 @@ export const knowledgeCreateDocumentTool: ToolConfig 1000000) { + const utf8Bytes = new TextEncoder().encode(textContent) + const contentBytes = utf8Bytes.length + + if (contentBytes > 1_000_000) { throw new Error('Document content exceeds maximum size of 1MB') } - const contentBytes = new TextEncoder().encode(textContent).length - - const utf8Bytes = new TextEncoder().encode(textContent) - const base64Content = - typeof Buffer !== 'undefined' - ? Buffer.from(textContent, 'utf8').toString('base64') - : btoa(String.fromCharCode(...utf8Bytes)) + let base64Content: string + if (typeof Buffer !== 'undefined') { + base64Content = Buffer.from(textContent, 'utf8').toString('base64') + } else { + let binary = '' + for (let i = 0; i < utf8Bytes.length; i++) { + binary += String.fromCharCode(utf8Bytes[i]) + } + base64Content = btoa(binary) + } - const dataUri = `data:text/plain;base64,${base64Content}` + const { filename, mimeType } = inferDocumentFileInfo(documentName) + const dataUri = `data:${mimeType};base64,${base64Content}` - // Parse document tags from various formats (object, array, JSON string) const parsedTags = parseDocumentTags(params.documentTags) const tagData = formatDocumentTagsForAPI(parsedTags) const documents = [ { - filename: documentName.endsWith('.txt') ? documentName : `${documentName}.txt`, + filename, fileUrl: dataUri, fileSize: contentBytes, - mimeType: 'text/plain', + mimeType, ...tagData, }, ] diff --git a/apps/sim/tools/knowledge/types.ts b/apps/sim/tools/knowledge/types.ts index 49fb6d8c338..3fa87ccaad7 100644 --- a/apps/sim/tools/knowledge/types.ts +++ b/apps/sim/tools/knowledge/types.ts @@ -1,3 +1,38 @@ +import { + getFileExtension, + getMimeTypeFromExtension as getUploadMimeType, +} from '@/lib/uploads/utils/file-utils' + +const TEXT_COMPATIBLE_MIME_TYPES = new Set([ + 'text/plain', + 'text/html', + 'text/markdown', + 'text/csv', + 'application/json', + 'application/xml', + 'application/x-yaml', +]) + +/** + * Extracts extension from a filename and returns the normalized filename and MIME type. + * If the extension maps to a recognized text-compatible MIME type, it is preserved. + * Otherwise, the filename is normalized to `.txt` with `text/plain`. + */ +export function inferDocumentFileInfo(documentName: string): { + filename: string + mimeType: string +} { + const ext = getFileExtension(documentName) + if (ext) { + const mimeType = getUploadMimeType(ext) + if (TEXT_COMPATIBLE_MIME_TYPES.has(mimeType)) { + return { filename: documentName, mimeType } + } + } + const base = ext ? documentName.slice(0, documentName.lastIndexOf('.')) : documentName + return { filename: `${base || documentName}.txt`, mimeType: 'text/plain' } +} + export interface KnowledgeSearchResult { documentId: string documentName: string diff --git a/apps/sim/tools/knowledge/upsert_document.ts b/apps/sim/tools/knowledge/upsert_document.ts index 0314350a0db..5d1155f78aa 100644 --- a/apps/sim/tools/knowledge/upsert_document.ts +++ b/apps/sim/tools/knowledge/upsert_document.ts @@ -1,6 +1,7 @@ -import type { - KnowledgeUpsertDocumentParams, - KnowledgeUpsertDocumentResponse, +import { + inferDocumentFileInfo, + type KnowledgeUpsertDocumentParams, + type KnowledgeUpsertDocumentResponse, } from '@/tools/knowledge/types' import { enrichKBTagsSchema } from '@/tools/schema-enrichers' import { formatDocumentTagsForAPI, parseDocumentTags } from '@/tools/shared/tags' @@ -94,18 +95,17 @@ export const knowledgeUpsertDocumentTool: ToolConfig< base64Content = btoa(binary) } - const dataUri = `data:text/plain;base64,${base64Content}` + const { filename, mimeType } = inferDocumentFileInfo(documentName) + const dataUri = `data:${mimeType};base64,${base64Content}` const parsedTags = parseDocumentTags(params.documentTags) const tagData = formatDocumentTagsForAPI(parsedTags) - const filename = documentName.endsWith('.txt') ? documentName : `${documentName}.txt` - const requestBody: Record = { filename, fileUrl: dataUri, fileSize: contentBytes, - mimeType: 'text/plain', + mimeType, ...tagData, processingOptions: { chunkSize: 1024, From 5332614a19a5828a4d409d4a46ad0573fc922642 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 18 Mar 2026 10:55:59 -0700 Subject: [PATCH 3/3] fix(mothership): mothership-ran workflows show workflow validation errors (#3634) * fix(mothership): mothership-ran workflows show workflow validation errors * Distinguish errors from 5xxs * Unify workflow event handling * Fix run from block * Fix lint --------- Co-authored-by: Theodore Li --- .../hooks/use-workflow-execution.ts | 177 +++------- .../utils/workflow-execution-utils.ts | 315 +++++++++++++----- apps/sim/hooks/use-execution-stream.ts | 9 +- 3 files changed, 286 insertions(+), 215 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 0943d3e04b4..e29c3849038 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -16,8 +16,12 @@ import { } from '@/lib/workflows/triggers/triggers' import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow' import { + addHttpErrorConsoleEntry, type BlockEventHandlerConfig, createBlockEventHandlers, + addExecutionErrorConsoleEntry as sharedAddExecutionErrorConsoleEntry, + handleExecutionCancelledConsole as sharedHandleExecutionCancelledConsole, + handleExecutionErrorConsole as sharedHandleExecutionErrorConsole, } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils' import { getBlock } from '@/blocks' import type { SerializableExecutionState } from '@/executor/execution/types' @@ -159,22 +163,7 @@ export function useWorkflowExecution() { setActiveBlocks, ]) - /** - * Builds timing fields for execution-level console entries. - */ - const buildExecutionTiming = useCallback((durationMs?: number) => { - const normalizedDuration = durationMs || 0 - return { - durationMs: normalizedDuration, - startedAt: new Date(Date.now() - normalizedDuration).toISOString(), - endedAt: new Date().toISOString(), - } - }, []) - - /** - * Adds an execution-level error entry to the console when appropriate. - */ - const addExecutionErrorConsoleEntry = useCallback( + const handleExecutionErrorConsole = useCallback( (params: { workflowId?: string executionId?: string @@ -184,102 +173,23 @@ export function useWorkflowExecution() { isPreExecutionError?: boolean }) => { if (!params.workflowId) return - - const hasBlockError = params.blockLogs.some((log) => log.error) - const isPreExecutionError = params.isPreExecutionError ?? false - if (!isPreExecutionError && hasBlockError) { - return - } - - const errorMessage = params.error || 'Execution failed' - const isTimeout = errorMessage.toLowerCase().includes('timed out') - const timing = buildExecutionTiming(params.durationMs) - - addConsole({ - input: {}, - output: {}, - success: false, - error: errorMessage, - durationMs: timing.durationMs, - startedAt: timing.startedAt, - executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER, - endedAt: timing.endedAt, + sharedHandleExecutionErrorConsole(addConsole, cancelRunningEntries, { + ...params, workflowId: params.workflowId, - blockId: isPreExecutionError - ? 'validation' - : isTimeout - ? 'timeout-error' - : 'execution-error', - executionId: params.executionId, - blockName: isPreExecutionError - ? 'Workflow Validation' - : isTimeout - ? 'Timeout Error' - : 'Execution Error', - blockType: isPreExecutionError ? 'validation' : 'error', }) }, - [addConsole, buildExecutionTiming] + [addConsole, cancelRunningEntries] ) - /** - * Adds an execution-level cancellation entry to the console. - */ - const addExecutionCancelledConsoleEntry = useCallback( + const handleExecutionCancelledConsole = useCallback( (params: { workflowId?: string; executionId?: string; durationMs?: number }) => { if (!params.workflowId) return - - const timing = buildExecutionTiming(params.durationMs) - addConsole({ - input: {}, - output: {}, - success: false, - error: 'Execution was cancelled', - durationMs: timing.durationMs, - startedAt: timing.startedAt, - executionOrder: Number.MAX_SAFE_INTEGER, - endedAt: timing.endedAt, + sharedHandleExecutionCancelledConsole(addConsole, cancelRunningEntries, { + ...params, workflowId: params.workflowId, - blockId: 'cancelled', - executionId: params.executionId, - blockName: 'Execution Cancelled', - blockType: 'cancelled', }) }, - [addConsole, buildExecutionTiming] - ) - - /** - * Handles workflow-level execution errors for console output. - */ - const handleExecutionErrorConsole = useCallback( - (params: { - workflowId?: string - executionId?: string - error?: string - durationMs?: number - blockLogs: BlockLog[] - isPreExecutionError?: boolean - }) => { - if (params.workflowId) { - cancelRunningEntries(params.workflowId) - } - addExecutionErrorConsoleEntry(params) - }, - [addExecutionErrorConsoleEntry, cancelRunningEntries] - ) - - /** - * Handles workflow-level execution cancellations for console output. - */ - const handleExecutionCancelledConsole = useCallback( - (params: { workflowId?: string; executionId?: string; durationMs?: number }) => { - if (params.workflowId) { - cancelRunningEntries(params.workflowId) - } - addExecutionCancelledConsoleEntry(params) - }, - [addExecutionCancelledConsoleEntry, cancelRunningEntries] + [addConsole, cancelRunningEntries] ) const buildBlockEventHandlers = useCallback( @@ -1319,31 +1229,42 @@ export function useWorkflowExecution() { } else { if (!executor) { try { - let blockId = 'serialization' - let blockName = 'Workflow' - let blockType = 'serializer' - if (error instanceof WorkflowValidationError) { - blockId = error.blockId || blockId - blockName = error.blockName || blockName - blockType = error.blockType || blockType + const httpStatus = + isRecord(error) && typeof error.httpStatus === 'number' ? error.httpStatus : undefined + const storeAddConsole = useTerminalConsoleStore.getState().addConsole + + if (httpStatus && activeWorkflowId) { + addHttpErrorConsoleEntry(storeAddConsole, { + workflowId: activeWorkflowId, + executionId: options?.executionId, + error: normalizedMessage, + httpStatus, + }) + } else if (error instanceof WorkflowValidationError) { + storeAddConsole({ + input: {}, + output: {}, + success: false, + error: normalizedMessage, + durationMs: 0, + startedAt: new Date().toISOString(), + executionOrder: Number.MAX_SAFE_INTEGER, + endedAt: new Date().toISOString(), + workflowId: activeWorkflowId || '', + blockId: error.blockId || 'serialization', + executionId: options?.executionId, + blockName: error.blockName || 'Workflow', + blockType: error.blockType || 'serializer', + }) + } else { + sharedAddExecutionErrorConsoleEntry(storeAddConsole, { + workflowId: activeWorkflowId || '', + executionId: options?.executionId, + error: normalizedMessage, + blockLogs: [], + isPreExecutionError: true, + }) } - - // Use MAX_SAFE_INTEGER so execution errors appear at the end of the log - useTerminalConsoleStore.getState().addConsole({ - input: {}, - output: {}, - success: false, - error: normalizedMessage, - durationMs: 0, - startedAt: new Date().toISOString(), - executionOrder: Number.MAX_SAFE_INTEGER, - endedAt: new Date().toISOString(), - workflowId: activeWorkflowId || '', - blockId, - executionId: options?.executionId, - blockName, - blockType, - }) } catch {} } @@ -1681,8 +1602,8 @@ export function useWorkflowExecution() { accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, - consoleMode: 'add', - includeStartConsoleEntry: false, + consoleMode: 'update', + includeStartConsoleEntry: true, }) await executionStream.executeFromBlock({ diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts index c0ca16cc9af..32d286ddf56 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts @@ -13,6 +13,7 @@ import type { StreamingExecution, } from '@/executor/types' import { stripCloneSuffixes } from '@/executor/utils/subflow-utils' +import { processSSEStream } from '@/hooks/use-execution-stream' const logger = createLogger('workflow-execution-utils') @@ -406,6 +407,161 @@ export function createBlockEventHandlers( return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted } } +type AddConsoleFn = (entry: Omit) => ConsoleEntry +type CancelRunningEntriesFn = (workflowId: string) => void + +export interface ExecutionTimingFields { + durationMs: number + startedAt: string + endedAt: string +} + +/** + * Builds timing fields for an execution-level console entry. + */ +export function buildExecutionTiming(durationMs?: number): ExecutionTimingFields { + const normalizedDuration = durationMs || 0 + return { + durationMs: normalizedDuration, + startedAt: new Date(Date.now() - normalizedDuration).toISOString(), + endedAt: new Date().toISOString(), + } +} + +export interface ExecutionErrorConsoleParams { + workflowId: string + executionId?: string + error?: string + durationMs?: number + blockLogs: BlockLog[] + isPreExecutionError?: boolean +} + +/** + * Adds an execution-level error entry to the console when no block-level error already covers it. + * Shared between direct user execution and mothership-initiated execution. + */ +export function addExecutionErrorConsoleEntry( + addConsole: AddConsoleFn, + params: ExecutionErrorConsoleParams +): void { + const hasBlockError = params.blockLogs.some((log) => log.error) + const isPreExecutionError = params.isPreExecutionError ?? false + if (!isPreExecutionError && hasBlockError) return + + const errorMessage = params.error || 'Execution failed' + const isTimeout = errorMessage.toLowerCase().includes('timed out') + const timing = buildExecutionTiming(params.durationMs) + + addConsole({ + input: {}, + output: {}, + success: false, + error: errorMessage, + durationMs: timing.durationMs, + startedAt: timing.startedAt, + executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER, + endedAt: timing.endedAt, + workflowId: params.workflowId, + blockId: isPreExecutionError ? 'validation' : isTimeout ? 'timeout-error' : 'execution-error', + executionId: params.executionId, + blockName: isPreExecutionError + ? 'Workflow Validation' + : isTimeout + ? 'Timeout Error' + : 'Execution Error', + blockType: isPreExecutionError ? 'validation' : 'error', + }) +} + +/** + * Cancels running entries and adds an execution-level error console entry. + */ +export function handleExecutionErrorConsole( + addConsole: AddConsoleFn, + cancelRunningEntries: CancelRunningEntriesFn, + params: ExecutionErrorConsoleParams +): void { + cancelRunningEntries(params.workflowId) + addExecutionErrorConsoleEntry(addConsole, params) +} + +export interface HttpErrorConsoleParams { + workflowId: string + executionId?: string + error: string + httpStatus: number +} + +/** + * Adds a console entry for HTTP-level execution errors (non-OK response before SSE streaming). + */ +export function addHttpErrorConsoleEntry( + addConsole: AddConsoleFn, + params: HttpErrorConsoleParams +): void { + const isValidationError = params.httpStatus >= 400 && params.httpStatus < 500 + const now = new Date().toISOString() + addConsole({ + input: {}, + output: {}, + success: false, + error: params.error, + durationMs: 0, + startedAt: now, + executionOrder: 0, + endedAt: now, + workflowId: params.workflowId, + blockId: isValidationError ? 'validation' : 'execution-error', + executionId: params.executionId, + blockName: isValidationError ? 'Workflow Validation' : 'Execution Error', + blockType: isValidationError ? 'validation' : 'error', + }) +} + +export interface CancelledConsoleParams { + workflowId: string + executionId?: string + durationMs?: number +} + +/** + * Adds a console entry for execution cancellation. + */ +export function addCancelledConsoleEntry( + addConsole: AddConsoleFn, + params: CancelledConsoleParams +): void { + const timing = buildExecutionTiming(params.durationMs) + addConsole({ + input: {}, + output: {}, + success: false, + error: 'Execution was cancelled', + durationMs: timing.durationMs, + startedAt: timing.startedAt, + executionOrder: Number.MAX_SAFE_INTEGER, + endedAt: timing.endedAt, + workflowId: params.workflowId, + blockId: 'cancelled', + executionId: params.executionId, + blockName: 'Execution Cancelled', + blockType: 'cancelled', + }) +} + +/** + * Cancels running entries and adds a cancelled console entry. + */ +export function handleExecutionCancelledConsole( + addConsole: AddConsoleFn, + cancelRunningEntries: CancelRunningEntriesFn, + params: CancelledConsoleParams +): void { + cancelRunningEntries(params.workflowId) + addCancelledConsoleEntry(addConsole, params) +} + export interface WorkflowExecutionOptions { workflowId?: string workflowInput?: any @@ -436,7 +592,7 @@ export async function executeWorkflowWithFullLogging( } const executionId = options.executionId || uuidv4() - const { addConsole, updateConsole } = useTerminalConsoleStore.getState() + const { addConsole, updateConsole, cancelRunningEntries } = useTerminalConsoleStore.getState() const { setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, setCurrentExecutionId } = useExecutionStore.getState() const wfId = targetWorkflowId @@ -445,6 +601,7 @@ export async function executeWorkflowWithFullLogging( const activeBlocksSet = new Set() const activeBlockRefCounts = new Map() const executionIdRef = { current: executionId } + const accumulatedBlockLogs: BlockLog[] = [] const blockHandlers = createBlockEventHandlers( { @@ -453,7 +610,7 @@ export async function executeWorkflowWithFullLogging( workflowEdges, activeBlocksSet, activeBlockRefCounts, - accumulatedBlockLogs: [], + accumulatedBlockLogs, accumulatedBlockStates: new Map(), executedBlockIds: new Set(), consoleMode: 'update', @@ -490,16 +647,26 @@ export async function executeWorkflowWithFullLogging( if (!response.ok) { const error = await response.json() - throw new Error(error.error || 'Workflow execution failed') + const errorMessage = error.error || 'Workflow execution failed' + addHttpErrorConsoleEntry(addConsole, { + workflowId: wfId, + executionId, + error: errorMessage, + httpStatus: response.status, + }) + throw new Error(errorMessage) } if (!response.body) { throw new Error('No response body') } - const reader = response.body.getReader() - const decoder = new TextDecoder() - let buffer = '' + const serverExecutionId = response.headers.get('X-Execution-Id') + if (serverExecutionId) { + executionIdRef.current = serverExecutionId + setCurrentExecutionId(wfId, serverExecutionId) + } + let executionResult: ExecutionResult = { success: false, output: {}, @@ -507,89 +674,67 @@ export async function executeWorkflowWithFullLogging( } try { - while (true) { - const { done, value } = await reader.read() - if (done) break - - buffer += decoder.decode(value, { stream: true }) - const lines = buffer.split('\n\n') - buffer = lines.pop() || '' - - for (const line of lines) { - if (!line.trim() || !line.startsWith('data: ')) continue - - const data = line.substring(6).trim() - if (data === '[DONE]') continue - - let event: any - try { - event = JSON.parse(data) - } catch { - continue - } - - switch (event.type) { - case 'execution:started': { - setCurrentExecutionId(wfId, event.executionId) - executionIdRef.current = event.executionId || executionId - break + await processSSEStream( + response.body.getReader(), + { + onExecutionStarted: (data) => { + logger.info('Execution started', { startTime: data.startTime }) + }, + + onBlockStarted: blockHandlers.onBlockStarted, + onBlockCompleted: blockHandlers.onBlockCompleted, + onBlockError: blockHandlers.onBlockError, + onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted, + + onExecutionCompleted: (data) => { + setCurrentExecutionId(wfId, null) + executionResult = { + success: data.success, + output: data.output, + logs: accumulatedBlockLogs, + metadata: { + duration: data.duration, + startTime: data.startTime, + endTime: data.endTime, + }, + } + }, + + onExecutionCancelled: () => { + setCurrentExecutionId(wfId, null) + executionResult = { + success: false, + output: {}, + error: 'Execution was cancelled', + logs: accumulatedBlockLogs, + } + }, + + onExecutionError: (data) => { + setCurrentExecutionId(wfId, null) + const errorMessage = data.error || 'Execution failed' + executionResult = { + success: false, + output: {}, + error: errorMessage, + logs: accumulatedBlockLogs, + metadata: { duration: data.duration }, } - case 'block:started': - blockHandlers.onBlockStarted(event.data) - break - - case 'block:completed': - blockHandlers.onBlockCompleted(event.data) - break - - case 'block:error': - blockHandlers.onBlockError(event.data) - break - - case 'block:childWorkflowStarted': - blockHandlers.onBlockChildWorkflowStarted(event.data) - break - - case 'execution:completed': - setCurrentExecutionId(wfId, null) - executionResult = { - success: event.data.success, - output: event.data.output, - logs: [], - metadata: { - duration: event.data.duration, - startTime: event.data.startTime, - endTime: event.data.endTime, - }, - } - break - - case 'execution:cancelled': - setCurrentExecutionId(wfId, null) - executionResult = { - success: false, - output: {}, - error: 'Execution was cancelled', - logs: [], - } - break - - case 'execution:error': - setCurrentExecutionId(wfId, null) - executionResult = { - success: false, - output: {}, - error: event.data.error || 'Execution failed', - logs: [], - } - break - } - } - } + handleExecutionErrorConsole(addConsole, cancelRunningEntries, { + workflowId: wfId, + executionId: executionIdRef.current, + error: errorMessage, + durationMs: data.duration || 0, + blockLogs: accumulatedBlockLogs, + isPreExecutionError: accumulatedBlockLogs.length === 0, + }) + }, + }, + 'CopilotExecution' + ) } finally { setCurrentExecutionId(wfId, null) - reader.releaseLock() setActiveBlocks(wfId, new Set()) } diff --git a/apps/sim/hooks/use-execution-stream.ts b/apps/sim/hooks/use-execution-stream.ts index 12a7dc8cabf..36fd801db63 100644 --- a/apps/sim/hooks/use-execution-stream.ts +++ b/apps/sim/hooks/use-execution-stream.ts @@ -31,8 +31,9 @@ function isClientDisconnectError(error: any): boolean { /** * Processes SSE events from a response body and invokes appropriate callbacks. + * Exported for use by standalone (non-hook) execution paths like executeWorkflowWithFullLogging. */ -async function processSSEStream( +export async function processSSEStream( reader: ReadableStreamDefaultReader, callbacks: ExecutionStreamCallbacks, logPrefix: string @@ -198,6 +199,7 @@ export function useExecutionStream() { if (errorResponse && typeof errorResponse === 'object') { Object.assign(error, { executionResult: errorResponse }) } + Object.assign(error, { httpStatus: response.status }) throw error } @@ -267,12 +269,15 @@ export function useExecutionStream() { try { errorResponse = await response.json() } catch { - throw new Error(`Server error (${response.status}): ${response.statusText}`) + const error = new Error(`Server error (${response.status}): ${response.statusText}`) + Object.assign(error, { httpStatus: response.status }) + throw error } const error = new Error(errorResponse.error || 'Failed to start execution') if (errorResponse && typeof errorResponse === 'object') { Object.assign(error, { executionResult: errorResponse }) } + Object.assign(error, { httpStatus: response.status }) throw error }