Spaces:
Build error
Build error
import { useCallback } from 'react' | |
import { | |
getIncomers, | |
useReactFlow, | |
useStoreApi, | |
} from 'reactflow' | |
import produce from 'immer' | |
import { v4 as uuidV4 } from 'uuid' | |
import { usePathname } from 'next/navigation' | |
import { useWorkflowStore } from '../store' | |
import { useNodesSyncDraft } from '../hooks' | |
import type { Node } from '../types' | |
import { | |
NodeRunningStatus, | |
WorkflowRunningStatus, | |
} from '../types' | |
import { DEFAULT_ITER_TIMES } from '../constants' | |
import { useWorkflowUpdate } from './use-workflow-interactions' | |
import { useStore as useAppStore } from '@/app/components/app/store' | |
import type { IOtherOptions } from '@/service/base' | |
import { ssePost } from '@/service/base' | |
import { | |
fetchPublishedWorkflow, | |
stopWorkflowRun, | |
} from '@/service/workflow' | |
import { useFeaturesStore } from '@/app/components/base/features/hooks' | |
import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player.manager' | |
import { | |
getProcessedFilesFromResponse, | |
} from '@/app/components/base/file-uploader/utils' | |
export const useWorkflowRun = () => { | |
const store = useStoreApi() | |
const workflowStore = useWorkflowStore() | |
const reactflow = useReactFlow() | |
const featuresStore = useFeaturesStore() | |
const { doSyncWorkflowDraft } = useNodesSyncDraft() | |
const { handleUpdateWorkflowCanvas } = useWorkflowUpdate() | |
const pathname = usePathname() | |
const handleBackupDraft = useCallback(() => { | |
const { | |
getNodes, | |
edges, | |
} = store.getState() | |
const { getViewport } = reactflow | |
const { | |
backupDraft, | |
setBackupDraft, | |
environmentVariables, | |
} = workflowStore.getState() | |
const { features } = featuresStore!.getState() | |
if (!backupDraft) { | |
setBackupDraft({ | |
nodes: getNodes(), | |
edges, | |
viewport: getViewport(), | |
features, | |
environmentVariables, | |
}) | |
doSyncWorkflowDraft() | |
} | |
}, [reactflow, workflowStore, store, featuresStore, doSyncWorkflowDraft]) | |
const handleLoadBackupDraft = useCallback(() => { | |
const { | |
backupDraft, | |
setBackupDraft, | |
setEnvironmentVariables, | |
} = workflowStore.getState() | |
if (backupDraft) { | |
const { | |
nodes, | |
edges, | |
viewport, | |
features, | |
environmentVariables, | |
} = backupDraft | |
handleUpdateWorkflowCanvas({ | |
nodes, | |
edges, | |
viewport, | |
}) | |
setEnvironmentVariables(environmentVariables) | |
featuresStore!.setState({ features }) | |
setBackupDraft(undefined) | |
} | |
}, [handleUpdateWorkflowCanvas, workflowStore, featuresStore]) | |
const handleRun = useCallback(async ( | |
params: any, | |
callback?: IOtherOptions, | |
) => { | |
const { | |
getNodes, | |
setNodes, | |
} = store.getState() | |
const newNodes = produce(getNodes(), (draft) => { | |
draft.forEach((node) => { | |
node.data.selected = false | |
node.data._runningStatus = undefined | |
}) | |
}) | |
setNodes(newNodes) | |
await doSyncWorkflowDraft() | |
const { | |
onWorkflowStarted, | |
onWorkflowFinished, | |
onNodeStarted, | |
onNodeFinished, | |
onIterationStart, | |
onIterationNext, | |
onIterationFinish, | |
onError, | |
...restCallback | |
} = callback || {} | |
workflowStore.setState({ historyWorkflowData: undefined }) | |
const appDetail = useAppStore.getState().appDetail | |
const workflowContainer = document.getElementById('workflow-container') | |
const { | |
clientWidth, | |
clientHeight, | |
} = workflowContainer! | |
let url = '' | |
if (appDetail?.mode === 'advanced-chat') | |
url = `/apps/${appDetail.id}/advanced-chat/workflows/draft/run` | |
if (appDetail?.mode === 'workflow') | |
url = `/apps/${appDetail.id}/workflows/draft/run` | |
let prevNodeId = '' | |
const { | |
setWorkflowRunningData, | |
} = workflowStore.getState() | |
setWorkflowRunningData({ | |
result: { | |
status: WorkflowRunningStatus.Running, | |
}, | |
tracing: [], | |
resultText: '', | |
}) | |
let ttsUrl = '' | |
let ttsIsPublic = false | |
if (params.token) { | |
ttsUrl = '/text-to-audio' | |
ttsIsPublic = true | |
} | |
else if (params.appId) { | |
if (pathname.search('explore/installed') > -1) | |
ttsUrl = `/installed-apps/${params.appId}/text-to-audio` | |
else | |
ttsUrl = `/apps/${params.appId}/text-to-audio` | |
} | |
const player = AudioPlayerManager.getInstance().getAudioPlayer(ttsUrl, ttsIsPublic, uuidV4(), 'none', 'none', (_: any): any => {}) | |
ssePost( | |
url, | |
{ | |
body: params, | |
}, | |
{ | |
onWorkflowStarted: (params) => { | |
const { task_id, data } = params | |
const { | |
workflowRunningData, | |
setWorkflowRunningData, | |
setIterParallelLogMap, | |
} = workflowStore.getState() | |
const { | |
edges, | |
setEdges, | |
} = store.getState() | |
setIterParallelLogMap(new Map()) | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
draft.task_id = task_id | |
draft.result = { | |
...draft?.result, | |
...data, | |
status: WorkflowRunningStatus.Running, | |
} | |
})) | |
const newEdges = produce(edges, (draft) => { | |
draft.forEach((edge) => { | |
edge.data = { | |
...edge.data, | |
_run: false, | |
} | |
}) | |
}) | |
setEdges(newEdges) | |
if (onWorkflowStarted) | |
onWorkflowStarted(params) | |
}, | |
onWorkflowFinished: (params) => { | |
const { data } = params | |
const { | |
workflowRunningData, | |
setWorkflowRunningData, | |
} = workflowStore.getState() | |
const isStringOutput = data.outputs && Object.keys(data.outputs).length === 1 && typeof data.outputs[Object.keys(data.outputs)[0]] === 'string' | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
draft.result = { | |
...draft.result, | |
...data, | |
files: getProcessedFilesFromResponse(data.files || []), | |
} as any | |
if (isStringOutput) { | |
draft.resultTabActive = true | |
draft.resultText = data.outputs[Object.keys(data.outputs)[0]] | |
} | |
})) | |
prevNodeId = '' | |
if (onWorkflowFinished) | |
onWorkflowFinished(params) | |
}, | |
onError: (params) => { | |
const { | |
workflowRunningData, | |
setWorkflowRunningData, | |
} = workflowStore.getState() | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
draft.result = { | |
...draft.result, | |
status: WorkflowRunningStatus.Failed, | |
} | |
})) | |
if (onError) | |
onError(params) | |
}, | |
onNodeStarted: (params) => { | |
const { data } = params | |
const { | |
workflowRunningData, | |
setWorkflowRunningData, | |
iterParallelLogMap, | |
setIterParallelLogMap, | |
} = workflowStore.getState() | |
const { | |
getNodes, | |
setNodes, | |
edges, | |
setEdges, | |
transform, | |
} = store.getState() | |
const nodes = getNodes() | |
const node = nodes.find(node => node.id === data.node_id) | |
if (node?.parentId) { | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
const tracing = draft.tracing! | |
const iterations = tracing.find(trace => trace.node_id === node?.parentId) | |
const currIteration = iterations?.details![node.data.iteration_index] || iterations?.details![iterations.details!.length - 1] | |
if (!data.parallel_run_id) { | |
currIteration?.push({ | |
...data, | |
status: NodeRunningStatus.Running, | |
} as any) | |
} | |
else { | |
if (!iterParallelLogMap.has(data.parallel_run_id)) | |
iterParallelLogMap.set(data.parallel_run_id, [{ ...data, status: NodeRunningStatus.Running } as any]) | |
else | |
iterParallelLogMap.get(data.parallel_run_id)!.push({ ...data, status: NodeRunningStatus.Running } as any) | |
setIterParallelLogMap(iterParallelLogMap) | |
if (iterations) | |
iterations.details = Array.from(iterParallelLogMap.values()) | |
} | |
})) | |
} | |
else { | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
draft.tracing!.push({ | |
...data, | |
status: NodeRunningStatus.Running, | |
} as any) | |
})) | |
const { | |
setViewport, | |
} = reactflow | |
const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id) | |
const currentNode = nodes[currentNodeIndex] | |
const position = currentNode.position | |
const zoom = transform[2] | |
if (!currentNode.parentId) { | |
setViewport({ | |
x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom, | |
y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom, | |
zoom: transform[2], | |
}) | |
} | |
const newNodes = produce(nodes, (draft) => { | |
draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running | |
}) | |
setNodes(newNodes) | |
const incomeNodesId = getIncomers({ id: data.node_id } as Node, newNodes, edges).filter(node => node.data._runningStatus === NodeRunningStatus.Succeeded).map(node => node.id) | |
const newEdges = produce(edges, (draft) => { | |
draft.forEach((edge) => { | |
if (edge.target === data.node_id && incomeNodesId.includes(edge.source)) | |
edge.data = { ...edge.data, _run: true } as any | |
}) | |
}) | |
setEdges(newEdges) | |
} | |
if (onNodeStarted) | |
onNodeStarted(params) | |
}, | |
onNodeFinished: (params) => { | |
const { data } = params | |
const { | |
workflowRunningData, | |
setWorkflowRunningData, | |
iterParallelLogMap, | |
setIterParallelLogMap, | |
} = workflowStore.getState() | |
const { | |
getNodes, | |
setNodes, | |
} = store.getState() | |
const nodes = getNodes() | |
const nodeParentId = nodes.find(node => node.id === data.node_id)!.parentId | |
if (nodeParentId) { | |
if (!data.execution_metadata.parallel_mode_run_id) { | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
const tracing = draft.tracing! | |
const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node | |
if (iterations && iterations.details) { | |
const iterationIndex = data.execution_metadata?.iteration_index || 0 | |
if (!iterations.details[iterationIndex]) | |
iterations.details[iterationIndex] = [] | |
const currIteration = iterations.details[iterationIndex] | |
const nodeIndex = currIteration.findIndex(node => | |
node.node_id === data.node_id && ( | |
node.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || node.parallel_id === data.execution_metadata?.parallel_id), | |
) | |
if (nodeIndex !== -1) { | |
currIteration[nodeIndex] = { | |
...currIteration[nodeIndex], | |
...data, | |
} as any | |
} | |
else { | |
currIteration.push({ | |
...data, | |
} as any) | |
} | |
} | |
})) | |
} | |
else { | |
// open parallel mode | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
const tracing = draft.tracing! | |
const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node | |
if (iterations && iterations.details) { | |
const iterRunID = data.execution_metadata?.parallel_mode_run_id | |
const currIteration = iterParallelLogMap.get(iterRunID) | |
const nodeIndex = currIteration?.findIndex(node => | |
node.node_id === data.node_id && ( | |
node?.parallel_run_id === data.execution_metadata?.parallel_mode_run_id), | |
) | |
if (currIteration) { | |
if (nodeIndex !== undefined && nodeIndex !== -1) { | |
currIteration[nodeIndex] = { | |
...currIteration[nodeIndex], | |
...data, | |
} as any | |
} | |
else { | |
currIteration.push({ | |
...data, | |
} as any) | |
} | |
} | |
setIterParallelLogMap(iterParallelLogMap) | |
iterations.details = Array.from(iterParallelLogMap.values()) | |
} | |
})) | |
} | |
} | |
else { | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
const currentIndex = draft.tracing!.findIndex((trace) => { | |
if (!trace.execution_metadata?.parallel_id) | |
return trace.node_id === data.node_id | |
return trace.node_id === data.node_id && trace.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id | |
}) | |
if (currentIndex > -1 && draft.tracing) { | |
draft.tracing[currentIndex] = { | |
...(draft.tracing[currentIndex].extras | |
? { extras: draft.tracing[currentIndex].extras } | |
: {}), | |
...data, | |
} as any | |
} | |
})) | |
const newNodes = produce(nodes, (draft) => { | |
const currentNode = draft.find(node => node.id === data.node_id)! | |
currentNode.data._runningStatus = data.status as any | |
}) | |
setNodes(newNodes) | |
prevNodeId = data.node_id | |
} | |
if (onNodeFinished) | |
onNodeFinished(params) | |
}, | |
onIterationStart: (params) => { | |
const { data } = params | |
const { | |
workflowRunningData, | |
setWorkflowRunningData, | |
setIterTimes, | |
} = workflowStore.getState() | |
const { | |
getNodes, | |
setNodes, | |
edges, | |
setEdges, | |
transform, | |
} = store.getState() | |
const nodes = getNodes() | |
setIterTimes(DEFAULT_ITER_TIMES) | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
draft.tracing!.push({ | |
...data, | |
status: NodeRunningStatus.Running, | |
details: [], | |
} as any) | |
})) | |
const { | |
setViewport, | |
} = reactflow | |
const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id) | |
const currentNode = nodes[currentNodeIndex] | |
const position = currentNode.position | |
const zoom = transform[2] | |
if (!currentNode.parentId) { | |
setViewport({ | |
x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom, | |
y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom, | |
zoom: transform[2], | |
}) | |
} | |
const newNodes = produce(nodes, (draft) => { | |
draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running | |
draft[currentNodeIndex].data._iterationLength = data.metadata.iterator_length | |
}) | |
setNodes(newNodes) | |
const newEdges = produce(edges, (draft) => { | |
const edge = draft.find(edge => edge.target === data.node_id && edge.source === prevNodeId) | |
if (edge) | |
edge.data = { ...edge.data, _run: true } as any | |
}) | |
setEdges(newEdges) | |
if (onIterationStart) | |
onIterationStart(params) | |
}, | |
onIterationNext: (params) => { | |
const { | |
workflowRunningData, | |
setWorkflowRunningData, | |
iterTimes, | |
setIterTimes, | |
} = workflowStore.getState() | |
const { data } = params | |
const { | |
getNodes, | |
setNodes, | |
} = store.getState() | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
const iteration = draft.tracing!.find(trace => trace.node_id === data.node_id) | |
if (iteration) { | |
if (iteration.details!.length >= iteration.metadata.iterator_length!) | |
return | |
} | |
if (!data.parallel_mode_run_id) | |
iteration?.details!.push([]) | |
})) | |
const nodes = getNodes() | |
const newNodes = produce(nodes, (draft) => { | |
const currentNode = draft.find(node => node.id === data.node_id)! | |
currentNode.data._iterationIndex = iterTimes | |
setIterTimes(iterTimes + 1) | |
}) | |
setNodes(newNodes) | |
if (onIterationNext) | |
onIterationNext(params) | |
}, | |
onIterationFinish: (params) => { | |
const { data } = params | |
const { | |
workflowRunningData, | |
setWorkflowRunningData, | |
setIterTimes, | |
} = workflowStore.getState() | |
const { | |
getNodes, | |
setNodes, | |
} = store.getState() | |
const nodes = getNodes() | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
const tracing = draft.tracing! | |
const currIterationNode = tracing.find(trace => trace.node_id === data.node_id) | |
if (currIterationNode) { | |
Object.assign(currIterationNode, { | |
...data, | |
status: NodeRunningStatus.Succeeded, | |
}) | |
} | |
})) | |
setIterTimes(DEFAULT_ITER_TIMES) | |
const newNodes = produce(nodes, (draft) => { | |
const currentNode = draft.find(node => node.id === data.node_id)! | |
currentNode.data._runningStatus = data.status | |
}) | |
setNodes(newNodes) | |
prevNodeId = data.node_id | |
if (onIterationFinish) | |
onIterationFinish(params) | |
}, | |
onParallelBranchStarted: (params) => { | |
// console.log(params, 'parallel start') | |
}, | |
onParallelBranchFinished: (params) => { | |
// console.log(params, 'finished') | |
}, | |
onTextChunk: (params) => { | |
const { data: { text } } = params | |
const { | |
workflowRunningData, | |
setWorkflowRunningData, | |
} = workflowStore.getState() | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
draft.resultTabActive = true | |
draft.resultText += text | |
})) | |
}, | |
onTextReplace: (params) => { | |
const { data: { text } } = params | |
const { | |
workflowRunningData, | |
setWorkflowRunningData, | |
} = workflowStore.getState() | |
setWorkflowRunningData(produce(workflowRunningData!, (draft) => { | |
draft.resultText = text | |
})) | |
}, | |
onTTSChunk: (messageId: string, audio: string, audioType?: string) => { | |
if (!audio || audio === '') | |
return | |
player.playAudioWithAudio(audio, true) | |
AudioPlayerManager.getInstance().resetMsgId(messageId) | |
}, | |
onTTSEnd: (messageId: string, audio: string, audioType?: string) => { | |
player.playAudioWithAudio(audio, false) | |
}, | |
...restCallback, | |
}, | |
) | |
}, [store, reactflow, workflowStore, doSyncWorkflowDraft]) | |
const handleStopRun = useCallback((taskId: string) => { | |
const appId = useAppStore.getState().appDetail?.id | |
stopWorkflowRun(`/apps/${appId}/workflow-runs/tasks/${taskId}/stop`) | |
}, []) | |
const handleRestoreFromPublishedWorkflow = useCallback(async () => { | |
const appDetail = useAppStore.getState().appDetail | |
const publishedWorkflow = await fetchPublishedWorkflow(`/apps/${appDetail?.id}/workflows/publish`) | |
if (publishedWorkflow) { | |
const nodes = publishedWorkflow.graph.nodes | |
const edges = publishedWorkflow.graph.edges | |
const viewport = publishedWorkflow.graph.viewport! | |
handleUpdateWorkflowCanvas({ | |
nodes, | |
edges, | |
viewport, | |
}) | |
featuresStore?.setState({ features: publishedWorkflow.features }) | |
workflowStore.getState().setPublishedAt(publishedWorkflow.created_at) | |
workflowStore.getState().setEnvironmentVariables(publishedWorkflow.environment_variables || []) | |
} | |
}, [featuresStore, handleUpdateWorkflowCanvas, workflowStore]) | |
return { | |
handleBackupDraft, | |
handleLoadBackupDraft, | |
handleRun, | |
handleStopRun, | |
handleRestoreFromPublishedWorkflow, | |
} | |
} | |