Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,27 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workspaceId: workflow.workspaceId,
})

let cachedWorkflowData: {
blocks: Record<string, any>
edges: any[]
loops: Record<string, any>
parallels: Record<string, any>
} | null = null

let processedInput = input
try {
const workflowData = shouldUseDraftState
? await loadWorkflowFromNormalizedTables(workflowId)
: await loadDeployedWorkflowState(workflowId)

if (workflowData) {
cachedWorkflowData = {
blocks: workflowData.blocks,
edges: workflowData.edges,
loops: workflowData.loops || {},
parallels: workflowData.parallels || {},
}

const serializedWorkflow = new Serializer().serializeWorkflow(
workflowData.blocks,
workflowData.edges,
Expand Down Expand Up @@ -402,6 +416,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
)
}

const effectiveWorkflowStateOverride = workflowStateOverride || cachedWorkflowData || undefined

if (!enableSSE) {
logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
try {
Expand All @@ -414,7 +430,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
triggerType,
useDraftState: shouldUseDraftState,
startTime: new Date().toISOString(),
workflowStateOverride,
workflowStateOverride: effectiveWorkflowStateOverride,
}

const snapshot = new ExecutionSnapshot(
Expand Down Expand Up @@ -479,8 +495,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
logger.info(`[${requestId}] Using SSE console log streaming (manual execution)`)
} else {
logger.info(`[${requestId}] Using streaming API response`)
const deployedData = await loadDeployedWorkflowState(workflowId)
const resolvedSelectedOutputs = resolveOutputIds(selectedOutputs, deployedData?.blocks || {})

const resolvedSelectedOutputs = resolveOutputIds(
selectedOutputs,
cachedWorkflowData?.blocks || {}
)
const stream = await createStreamingResponse({
requestId,
workflow: {
Expand Down Expand Up @@ -677,7 +696,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
triggerType,
useDraftState: shouldUseDraftState,
startTime: new Date().toISOString(),
workflowStateOverride,
workflowStateOverride: effectiveWorkflowStateOverride,
}

const snapshot = new ExecutionSnapshot(
Expand Down
22 changes: 12 additions & 10 deletions apps/sim/lib/environment/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,18 @@ export async function getPersonalAndWorkspaceEnv(
const workspaceEncrypted: Record<string, string> = (workspaceRows[0]?.variables as any) || {}

const decryptAll = async (src: Record<string, string>) => {
const out: Record<string, string> = {}
for (const [k, v] of Object.entries(src)) {
try {
const { decrypted } = await decryptSecret(v)
out[k] = decrypted
} catch {
out[k] = ''
}
}
return out
const entries = Object.entries(src)
const results = await Promise.all(
entries.map(async ([k, v]) => {
try {
const { decrypted } = await decryptSecret(v)
return [k, decrypted] as const
} catch {
return [k, ''] as const
}
})
)
return Object.fromEntries(results)
}

const [personalDecrypted, workspaceDecrypted] = await Promise.all([
Expand Down
43 changes: 17 additions & 26 deletions apps/sim/lib/workflows/executor/execution-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { createLogger } from '@/lib/logs/console/logger'
import type { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { decryptSecret } from '@/lib/utils'
import {
loadDeployedWorkflowState,
loadWorkflowFromNormalizedTables,
Expand Down Expand Up @@ -153,38 +152,37 @@ export async function executeWorkflowCore(
// Merge block states
const mergedStates = mergeSubblockState(blocks)

// Get and decrypt environment variables
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
userId,
providedWorkspaceId
)
const { personalEncrypted, workspaceEncrypted, personalDecrypted, workspaceDecrypted } =
await getPersonalAndWorkspaceEnv(userId, providedWorkspaceId)

// Use encrypted values for logging (don't log decrypted secrets)
const variables = EnvVarsSchema.parse({ ...personalEncrypted, ...workspaceEncrypted })

// Use already-decrypted values for execution (no redundant decryption)
const decryptedEnvVars: Record<string, string> = { ...personalDecrypted, ...workspaceDecrypted }

await loggingSession.safeStart({
userId,
workspaceId: providedWorkspaceId,
variables,
skipLogCreation, // Skip if resuming an existing execution
})

// Process block states with env var substitution
const currentBlockStates = await Object.entries(mergedStates).reduce(
async (accPromise, [id, block]) => {
const acc = await accPromise
acc[id] = await Object.entries(block.subBlocks).reduce(
async (subAccPromise, [key, subBlock]) => {
const subAcc = await subAccPromise
// Process block states with env var substitution using pre-decrypted values
const currentBlockStates = Object.entries(mergedStates).reduce(
(acc, [id, block]) => {
acc[id] = Object.entries(block.subBlocks).reduce(
(subAcc, [key, subBlock]) => {
let value = subBlock.value

if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) {
const matches = value.match(/{{([^}]+)}}/g)
if (matches) {
for (const match of matches) {
const varName = match.slice(2, -2)
const encryptedValue = variables[varName]
if (encryptedValue) {
const { decrypted } = await decryptSecret(encryptedValue)
value = (value as string).replace(match, decrypted)
const decryptedValue = decryptedEnvVars[varName]
if (decryptedValue !== undefined) {
value = (value as string).replace(match, decryptedValue)
}
}
}
Expand All @@ -193,20 +191,13 @@ export async function executeWorkflowCore(
subAcc[key] = value
return subAcc
},
Promise.resolve({} as Record<string, any>)
{} as Record<string, any>
)
return acc
},
Promise.resolve({} as Record<string, Record<string, any>>)
{} as Record<string, Record<string, any>>
)

// Decrypt all env vars
const decryptedEnvVars: Record<string, string> = {}
for (const [key, encryptedValue] of Object.entries(variables)) {
const { decrypted } = await decryptSecret(encryptedValue)
decryptedEnvVars[key] = decrypted
}

// Process response format
const processedBlockStates = Object.entries(currentBlockStates).reduce(
(acc, [blockId, blockState]) => {
Expand Down