Commit 0c6c33f1 authored by takatost's avatar takatost

Merge branch 'feat/workflow-backend' into deploy/dev

parents 91e56dda d3660f90
...@@ -15,7 +15,7 @@ from libs.rsa import generate_key_pair ...@@ -15,7 +15,7 @@ from libs.rsa import generate_key_pair
from models.account import Tenant from models.account import Tenant
from models.dataset import Dataset, DatasetCollectionBinding, DocumentSegment from models.dataset import Dataset, DatasetCollectionBinding, DocumentSegment
from models.dataset import Document as DatasetDocument from models.dataset import Document as DatasetDocument
from models.model import Account, App, AppMode, AppModelConfig, AppAnnotationSetting, Conversation, MessageAnnotation from models.model import Account, App, AppAnnotationSetting, AppMode, Conversation, MessageAnnotation
from models.provider import Provider, ProviderModel from models.provider import Provider, ProviderModel
......
...@@ -14,7 +14,7 @@ from controllers.console.setup import setup_required ...@@ -14,7 +14,7 @@ from controllers.console.setup import setup_required
from controllers.console.wraps import account_initialization_required from controllers.console.wraps import account_initialization_required
from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from fields.workflow_fields import workflow_fields from fields.workflow_fields import workflow_fields
from libs.helper import uuid_value from libs.helper import TimestampField, uuid_value
from libs.login import current_user, login_required from libs.login import current_user, login_required
from models.model import App, AppMode from models.model import App, AppMode
from services.workflow_service import WorkflowService from services.workflow_service import WorkflowService
...@@ -56,7 +56,7 @@ class DraftWorkflowApi(Resource): ...@@ -56,7 +56,7 @@ class DraftWorkflowApi(Resource):
args = parser.parse_args() args = parser.parse_args()
workflow_service = WorkflowService() workflow_service = WorkflowService()
workflow_service.sync_draft_workflow( workflow = workflow_service.sync_draft_workflow(
app_model=app_model, app_model=app_model,
graph=args.get('graph'), graph=args.get('graph'),
features=args.get('features'), features=args.get('features'),
...@@ -64,7 +64,8 @@ class DraftWorkflowApi(Resource): ...@@ -64,7 +64,8 @@ class DraftWorkflowApi(Resource):
) )
return { return {
"result": "success" "result": "success",
"updated_at": TimestampField().format(workflow.updated_at)
} }
......
...@@ -112,6 +112,7 @@ class VariableEntity(BaseModel): ...@@ -112,6 +112,7 @@ class VariableEntity(BaseModel):
max_length: Optional[int] = None max_length: Optional[int] = None
options: Optional[list[str]] = None options: Optional[list[str]] = None
default: Optional[str] = None default: Optional[str] = None
hint: Optional[str] = None
class ExternalDataVariableEntity(BaseModel): class ExternalDataVariableEntity(BaseModel):
......
...@@ -10,12 +10,14 @@ from core.app.entities.app_invoke_entities import ( ...@@ -10,12 +10,14 @@ from core.app.entities.app_invoke_entities import (
InvokeFrom, InvokeFrom,
) )
from core.app.entities.queue_entities import QueueStopEvent from core.app.entities.queue_entities import QueueStopEvent
from core.callback_handler.workflow_event_trigger_callback import WorkflowEventTriggerCallback
from core.moderation.base import ModerationException from core.moderation.base import ModerationException
from core.workflow.entities.node_entities import SystemVariable from core.workflow.entities.node_entities import SystemVariable
from core.workflow.workflow_engine_manager import WorkflowEngineManager from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db from extensions.ext_database import db
from models.account import Account from models.account import Account
from models.model import App, Conversation, EndUser, Message from models.model import App, Conversation, EndUser, Message
from models.workflow import WorkflowRunTriggeredFrom
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -80,23 +82,21 @@ class AdvancedChatAppRunner(AppRunner): ...@@ -80,23 +82,21 @@ class AdvancedChatAppRunner(AppRunner):
# RUN WORKFLOW # RUN WORKFLOW
workflow_engine_manager = WorkflowEngineManager() workflow_engine_manager = WorkflowEngineManager()
result_generator = workflow_engine_manager.run_workflow( workflow_engine_manager.run_workflow(
app_model=app_record, app_model=app_record,
workflow=workflow, workflow=workflow,
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING
if application_generate_entity.invoke_from == InvokeFrom.DEBUGGER else WorkflowRunTriggeredFrom.APP_RUN,
user=user, user=user,
user_inputs=inputs, user_inputs=inputs,
system_inputs={ system_inputs={
SystemVariable.QUERY: query, SystemVariable.QUERY: query,
SystemVariable.FILES: files, SystemVariable.FILES: files,
SystemVariable.CONVERSATION: conversation.id, SystemVariable.CONVERSATION: conversation.id,
} },
callbacks=[WorkflowEventTriggerCallback(queue_manager=queue_manager)]
) )
for result in result_generator:
# todo handle workflow and node event
pass
def handle_input_moderation(self, queue_manager: AppQueueManager, def handle_input_moderation(self, queue_manager: AppQueueManager,
app_record: App, app_record: App,
app_generate_entity: AdvancedChatAppGenerateEntity, app_generate_entity: AdvancedChatAppGenerateEntity,
......
import os
import sys
from typing import Any, Optional, Union
from langchain.callbacks.base import BaseCallbackHandler
from langchain.input import print_text
from langchain.schema import AgentAction, AgentFinish, BaseMessage, LLMResult
class DifyStdOutCallbackHandler(BaseCallbackHandler):
"""Callback Handler that prints to std out."""
def __init__(self, color: Optional[str] = None) -> None:
"""Initialize callback handler."""
self.color = color
def on_chat_model_start(
self,
serialized: dict[str, Any],
messages: list[list[BaseMessage]],
**kwargs: Any
) -> Any:
print_text("\n[on_chat_model_start]\n", color='blue')
for sub_messages in messages:
for sub_message in sub_messages:
print_text(str(sub_message) + "\n", color='blue')
def on_llm_start(
self, serialized: dict[str, Any], prompts: list[str], **kwargs: Any
) -> None:
"""Print out the prompts."""
print_text("\n[on_llm_start]\n", color='blue')
print_text(prompts[0] + "\n", color='blue')
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""Do nothing."""
print_text("\n[on_llm_end]\nOutput: " + str(response.generations[0][0].text) + "\nllm_output: " + str(
response.llm_output) + "\n", color='blue')
def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
"""Do nothing."""
pass
def on_llm_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> None:
"""Do nothing."""
print_text("\n[on_llm_error]\nError: " + str(error) + "\n", color='blue')
def on_chain_start(
self, serialized: dict[str, Any], inputs: dict[str, Any], **kwargs: Any
) -> None:
"""Print out that we are entering a chain."""
chain_type = serialized['id'][-1]
print_text("\n[on_chain_start]\nChain: " + chain_type + "\nInputs: " + str(inputs) + "\n", color='pink')
def on_chain_end(self, outputs: dict[str, Any], **kwargs: Any) -> None:
"""Print out that we finished a chain."""
print_text("\n[on_chain_end]\nOutputs: " + str(outputs) + "\n", color='pink')
def on_chain_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> None:
"""Do nothing."""
print_text("\n[on_chain_error]\nError: " + str(error) + "\n", color='pink')
def on_tool_start(
self,
serialized: dict[str, Any],
input_str: str,
**kwargs: Any,
) -> None:
"""Do nothing."""
print_text("\n[on_tool_start] " + str(serialized), color='yellow')
def on_agent_action(
self, action: AgentAction, color: Optional[str] = None, **kwargs: Any
) -> Any:
"""Run on agent action."""
tool = action.tool
tool_input = action.tool_input
try:
action_name_position = action.log.index("\nAction:") + 1 if action.log else -1
thought = action.log[:action_name_position].strip() if action.log else ''
except ValueError:
thought = ''
log = f"Thought: {thought}\nTool: {tool}\nTool Input: {tool_input}"
print_text("\n[on_agent_action]\n" + log + "\n", color='green')
def on_tool_end(
self,
output: str,
color: Optional[str] = None,
observation_prefix: Optional[str] = None,
llm_prefix: Optional[str] = None,
**kwargs: Any,
) -> None:
"""If not the final action, print out observation."""
print_text("\n[on_tool_end]\n", color='yellow')
if observation_prefix:
print_text(f"\n{observation_prefix}")
print_text(output, color='yellow')
if llm_prefix:
print_text(f"\n{llm_prefix}")
print_text("\n")
def on_tool_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> None:
"""Do nothing."""
print_text("\n[on_tool_error] Error: " + str(error) + "\n", color='yellow')
def on_text(
self,
text: str,
color: Optional[str] = None,
end: str = "",
**kwargs: Optional[str],
) -> None:
"""Run when agent ends."""
print_text("\n[on_text] " + text + "\n", color=color if color else self.color, end=end)
def on_agent_finish(
self, finish: AgentFinish, color: Optional[str] = None, **kwargs: Any
) -> None:
"""Run on agent end."""
print_text("[on_agent_finish] " + finish.return_values['output'] + "\n", color='green', end="\n")
@property
def ignore_llm(self) -> bool:
"""Whether to ignore LLM callbacks."""
return not os.environ.get("DEBUG") or os.environ.get("DEBUG").lower() != 'true'
@property
def ignore_chain(self) -> bool:
"""Whether to ignore chain callbacks."""
return not os.environ.get("DEBUG") or os.environ.get("DEBUG").lower() != 'true'
@property
def ignore_agent(self) -> bool:
"""Whether to ignore agent callbacks."""
return not os.environ.get("DEBUG") or os.environ.get("DEBUG").lower() != 'true'
@property
def ignore_chat_model(self) -> bool:
"""Whether to ignore chat model callbacks."""
return not os.environ.get("DEBUG") or os.environ.get("DEBUG").lower() != 'true'
class DifyStreamingStdOutCallbackHandler(DifyStdOutCallbackHandler):
"""Callback handler for streaming. Only works with LLMs that support streaming."""
def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
"""Run on new LLM token. Only available when streaming is enabled."""
sys.stdout.write(token)
sys.stdout.flush()
from core.app.app_queue_manager import AppQueueManager, PublishFrom
from core.workflow.callbacks.base_callback import BaseWorkflowCallback
from models.workflow import WorkflowNodeExecution, WorkflowRun
class WorkflowEventTriggerCallback(BaseWorkflowCallback):
def __init__(self, queue_manager: AppQueueManager):
self._queue_manager = queue_manager
def on_workflow_run_started(self, workflow_run: WorkflowRun) -> None:
"""
Workflow run started
"""
self._queue_manager.publish_workflow_started(
workflow_run_id=workflow_run.id,
pub_from=PublishFrom.TASK_PIPELINE
)
def on_workflow_run_finished(self, workflow_run: WorkflowRun) -> None:
"""
Workflow run finished
"""
self._queue_manager.publish_workflow_finished(
workflow_run_id=workflow_run.id,
pub_from=PublishFrom.TASK_PIPELINE
)
def on_workflow_node_execute_started(self, workflow_node_execution: WorkflowNodeExecution) -> None:
"""
Workflow node execute started
"""
self._queue_manager.publish_node_started(
workflow_node_execution_id=workflow_node_execution.id,
pub_from=PublishFrom.TASK_PIPELINE
)
def on_workflow_node_execute_finished(self, workflow_node_execution: WorkflowNodeExecution) -> None:
"""
Workflow node execute finished
"""
self._queue_manager.publish_node_finished(
workflow_node_execution_id=workflow_node_execution.id,
pub_from=PublishFrom.TASK_PIPELINE
)
from abc import abstractmethod
from models.workflow import WorkflowNodeExecution, WorkflowRun
class BaseWorkflowCallback:
@abstractmethod
def on_workflow_run_started(self, workflow_run: WorkflowRun) -> None:
"""
Workflow run started
"""
raise NotImplementedError
@abstractmethod
def on_workflow_run_finished(self, workflow_run: WorkflowRun) -> None:
"""
Workflow run finished
"""
raise NotImplementedError
@abstractmethod
def on_workflow_node_execute_started(self, workflow_node_execution: WorkflowNodeExecution) -> None:
"""
Workflow node execute started
"""
raise NotImplementedError
@abstractmethod
def on_workflow_node_execute_finished(self, workflow_node_execution: WorkflowNodeExecution) -> None:
"""
Workflow node execute finished
"""
raise NotImplementedError
from abc import ABC
from typing import Optional
from pydantic import BaseModel
class BaseNodeData(ABC, BaseModel):
type: str
title: str
desc: Optional[str] = None
from decimal import Decimal
from core.workflow.entities.variable_pool import VariablePool
from models.workflow import WorkflowNodeExecution, WorkflowRun
class WorkflowRunState:
workflow_run: WorkflowRun
start_at: float
variable_pool: VariablePool
total_tokens: int = 0
total_price: Decimal = Decimal(0)
currency: str = "USD"
workflow_node_executions: list[WorkflowNodeExecution] = []
from abc import abstractmethod from abc import abstractmethod
from typing import Optional from typing import Optional
from core.workflow.callbacks.base_callback import BaseWorkflowCallback
from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeType from core.workflow.entities.node_entities import NodeType
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
class BaseNode: class BaseNode:
_node_data_cls: type[BaseNodeData]
_node_type: NodeType _node_type: NodeType
def __int__(self, node_config: dict) -> None: node_id: str
self._node_config = node_config node_data: BaseNodeData
def __init__(self, config: dict) -> None:
self.node_id = config.get("id")
if not self.node_id:
raise ValueError("Node ID is required.")
self.node_data = self._node_data_cls(**config.get("data", {}))
@abstractmethod @abstractmethod
def run(self, variable_pool: Optional[VariablePool] = None, def _run(self, variable_pool: Optional[VariablePool] = None,
run_args: Optional[dict] = None) -> dict: run_args: Optional[dict] = None) -> dict:
""" """
Run node Run node
:param variable_pool: variable pool :param variable_pool: variable pool
:param run_args: run args :param run_args: run args
:return: :return:
""" """
raise NotImplementedError
def run(self, variable_pool: Optional[VariablePool] = None,
run_args: Optional[dict] = None,
callbacks: list[BaseWorkflowCallback] = None) -> dict:
"""
Run node entry
:param variable_pool: variable pool
:param run_args: run args
:param callbacks: callbacks
:return:
"""
if variable_pool is None and run_args is None: if variable_pool is None and run_args is None:
raise ValueError("At least one of `variable_pool` or `run_args` must be provided.") raise ValueError("At least one of `variable_pool` or `run_args` must be provided.")
...@@ -28,17 +50,6 @@ class BaseNode: ...@@ -28,17 +50,6 @@ class BaseNode:
run_args=run_args run_args=run_args
) )
@abstractmethod
def _run(self, variable_pool: Optional[VariablePool] = None,
run_args: Optional[dict] = None) -> dict:
"""
Run node
:param variable_pool: variable pool
:param run_args: run args
:return:
"""
raise NotImplementedError
@classmethod @classmethod
def get_default_config(cls, filters: Optional[dict] = None) -> dict: def get_default_config(cls, filters: Optional[dict] = None) -> dict:
""" """
...@@ -47,3 +58,11 @@ class BaseNode: ...@@ -47,3 +58,11 @@ class BaseNode:
:return: :return:
""" """
return {} return {}
@property
def node_type(self) -> NodeType:
"""
Get node type
:return:
"""
return self._node_type
from core.app.app_config.entities import VariableEntity
from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeType
class StartNodeData(BaseNodeData):
"""
- title (string) 节点标题
- desc (string) optional 节点描述
- type (string) 节点类型,固定为 start
- variables (array[object]) 表单变量列表
- type (string) 表单变量类型,text-input, paragraph, select, number, files(文件暂不支持自定义)
- label (string) 控件展示标签名
- variable (string) 变量 key
- max_length (int) 最大长度,适用于 text-input 和 paragraph
- default (string) optional 默认值
- required (bool) optional是否必填,默认 false
- hint (string) optional 提示信息
- options (array[string]) 选项值(仅 select 可用)
"""
type: str = NodeType.START.value
variables: list[VariableEntity] = []
from typing import Optional
from core.workflow.entities.node_entities import NodeType
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.nodes.base_node import BaseNode from core.workflow.nodes.base_node import BaseNode
from core.workflow.nodes.start.entities import StartNodeData
class StartNode(BaseNode): class StartNode(BaseNode):
pass _node_data_cls = StartNodeData
node_type = NodeType.START
def _run(self, variable_pool: Optional[VariablePool] = None,
run_args: Optional[dict] = None) -> dict:
"""
Run node
:param variable_pool: variable pool
:param run_args: run args
:return:
"""
pass
from collections.abc import Generator import json
import time
from typing import Optional, Union from typing import Optional, Union
from core.workflow.callbacks.base_callback import BaseWorkflowCallback
from core.workflow.entities.node_entities import NodeType from core.workflow.entities.node_entities import NodeType
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_entities import WorkflowRunState
from core.workflow.nodes.base_node import BaseNode
from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode
from core.workflow.nodes.end.end_node import EndNode from core.workflow.nodes.end.end_node import EndNode
...@@ -17,7 +22,16 @@ from core.workflow.nodes.variable_assigner.variable_assigner_node import Variabl ...@@ -17,7 +22,16 @@ from core.workflow.nodes.variable_assigner.variable_assigner_node import Variabl
from extensions.ext_database import db from extensions.ext_database import db
from models.account import Account from models.account import Account
from models.model import App, EndUser from models.model import App, EndUser
from models.workflow import Workflow from models.workflow import (
CreatedByRole,
Workflow,
WorkflowNodeExecution,
WorkflowNodeExecutionStatus,
WorkflowNodeExecutionTriggeredFrom,
WorkflowRun,
WorkflowRunStatus,
WorkflowRunTriggeredFrom,
)
node_classes = { node_classes = {
NodeType.START: StartNode, NodeType.START: StartNode,
...@@ -108,17 +122,265 @@ class WorkflowEngineManager: ...@@ -108,17 +122,265 @@ class WorkflowEngineManager:
def run_workflow(self, app_model: App, def run_workflow(self, app_model: App,
workflow: Workflow, workflow: Workflow,
triggered_from: WorkflowRunTriggeredFrom,
user: Union[Account, EndUser], user: Union[Account, EndUser],
user_inputs: dict, user_inputs: dict,
system_inputs: Optional[dict] = None) -> Generator: system_inputs: Optional[dict] = None,
callbacks: list[BaseWorkflowCallback] = None) -> None:
""" """
Run workflow Run workflow
:param app_model: App instance :param app_model: App instance
:param workflow: Workflow instance :param workflow: Workflow instance
:param triggered_from: triggered from
:param user: account or end user :param user: account or end user
:param user_inputs: user variables inputs :param user_inputs: user variables inputs
:param system_inputs: system inputs, like: query, files :param system_inputs: system inputs, like: query, files
:param callbacks: workflow callbacks
:return: :return:
""" """
# TODO # fetch workflow graph
pass graph = workflow.graph_dict
if not graph:
raise ValueError('workflow graph not found')
# init workflow run
workflow_run = self._init_workflow_run(
workflow=workflow,
triggered_from=triggered_from,
user=user,
user_inputs=user_inputs,
system_inputs=system_inputs
)
# init workflow run state
workflow_run_state = WorkflowRunState(
workflow_run=workflow_run,
start_at=time.perf_counter(),
variable_pool=VariablePool(
system_variables=system_inputs,
)
)
if callbacks:
for callback in callbacks:
callback.on_workflow_run_started(workflow_run)
# fetch start node
start_node = self._get_entry_node(graph)
if not start_node:
self._workflow_run_failed(
workflow_run_state=workflow_run_state,
error='Start node not found in workflow graph',
callbacks=callbacks
)
return
try:
predecessor_node = None
current_node = start_node
while True:
# run workflow
self._run_workflow_node(
workflow_run_state=workflow_run_state,
node=current_node,
predecessor_node=predecessor_node,
callbacks=callbacks
)
if current_node.node_type == NodeType.END:
break
# todo fetch next node until end node finished or no next node
current_node = None
if not current_node:
break
predecessor_node = current_node
# or max steps 30 reached
# or max execution time 10min reached
except Exception as e:
self._workflow_run_failed(
workflow_run_state=workflow_run_state,
error=str(e),
callbacks=callbacks
)
return
# workflow run success
self._workflow_run_success(
workflow_run_state=workflow_run_state,
callbacks=callbacks
)
def _init_workflow_run(self, workflow: Workflow,
triggered_from: WorkflowRunTriggeredFrom,
user: Union[Account, EndUser],
user_inputs: dict,
system_inputs: Optional[dict] = None) -> WorkflowRun:
"""
Init workflow run
:param workflow: Workflow instance
:param triggered_from: triggered from
:param user: account or end user
:param user_inputs: user variables inputs
:param system_inputs: system inputs, like: query, files
:return:
"""
try:
db.session.begin()
max_sequence = db.session.query(db.func.max(WorkflowRun.sequence_number)) \
.filter(WorkflowRun.tenant_id == workflow.tenant_id) \
.filter(WorkflowRun.app_id == workflow.app_id) \
.for_update() \
.scalar() or 0
new_sequence_number = max_sequence + 1
# init workflow run
workflow_run = WorkflowRun(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
sequence_number=new_sequence_number,
workflow_id=workflow.id,
type=workflow.type,
triggered_from=triggered_from.value,
version=workflow.version,
graph=workflow.graph,
inputs=json.dumps({**user_inputs, **system_inputs}),
status=WorkflowRunStatus.RUNNING.value,
created_by_role=(CreatedByRole.ACCOUNT.value
if isinstance(user, Account) else CreatedByRole.END_USER.value),
created_by=user.id
)
db.session.add(workflow_run)
db.session.commit()
except:
db.session.rollback()
raise
return workflow_run
def _workflow_run_failed(self, workflow_run_state: WorkflowRunState,
error: str,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun:
"""
Workflow run failed
:param workflow_run_state: workflow run state
:param error: error message
:param callbacks: workflow callbacks
:return:
"""
workflow_run = workflow_run_state.workflow_run
workflow_run.status = WorkflowRunStatus.FAILED.value
workflow_run.error = error
workflow_run.elapsed_time = time.perf_counter() - workflow_run_state.start_at
workflow_run.total_tokens = workflow_run_state.total_tokens
workflow_run.total_price = workflow_run_state.total_price
workflow_run.currency = workflow_run_state.currency
workflow_run.total_steps = len(workflow_run_state.workflow_node_executions)
db.session.commit()
if callbacks:
for callback in callbacks:
callback.on_workflow_run_finished(workflow_run)
return workflow_run
def _get_entry_node(self, graph: dict) -> Optional[StartNode]:
"""
Get entry node
:param graph: workflow graph
:return:
"""
nodes = graph.get('nodes')
if not nodes:
return None
for node_config in nodes.items():
if node_config.get('type') == NodeType.START.value:
return StartNode(config=node_config)
return None
def _run_workflow_node(self, workflow_run_state: WorkflowRunState,
node: BaseNode,
predecessor_node: Optional[BaseNode] = None,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
# init workflow node execution
start_at = time.perf_counter()
workflow_node_execution = self._init_node_execution_from_workflow_run(
workflow_run_state=workflow_run_state,
node=node,
predecessor_node=predecessor_node,
)
# add to workflow node executions
workflow_run_state.workflow_node_executions.append(workflow_node_execution)
try:
# run node, result must have inputs, process_data, outputs, execution_metadata
node_run_result = node.run(
variable_pool=workflow_run_state.variable_pool,
callbacks=callbacks
)
except Exception as e:
# node run failed
self._workflow_node_execution_failed(
workflow_node_execution=workflow_node_execution,
error=str(e),
callbacks=callbacks
)
raise
# node run success
self._workflow_node_execution_success(
workflow_node_execution=workflow_node_execution,
result=node_run_result,
callbacks=callbacks
)
return workflow_node_execution
def _init_node_execution_from_workflow_run(self, workflow_run_state: WorkflowRunState,
node: BaseNode,
predecessor_node: Optional[BaseNode] = None,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
"""
Init workflow node execution from workflow run
:param workflow_run_state: workflow run state
:param node: current node
:param predecessor_node: predecessor node if exists
:param callbacks: workflow callbacks
:return:
"""
workflow_run = workflow_run_state.workflow_run
# init workflow node execution
workflow_node_execution = WorkflowNodeExecution(
tenant_id=workflow_run.tenant_id,
app_id=workflow_run.app_id,
workflow_id=workflow_run.workflow_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
workflow_run_id=workflow_run.id,
predecessor_node_id=predecessor_node.node_id if predecessor_node else None,
index=len(workflow_run_state.workflow_node_executions) + 1,
node_id=node.node_id,
node_type=node.node_type.value,
title=node.node_data.title,
type=node.node_type.value,
status=WorkflowNodeExecutionStatus.RUNNING.value,
created_by_role=workflow_run.created_by_role,
created_by=workflow_run.created_by
)
db.session.add(workflow_node_execution)
db.session.commit()
if callbacks:
for callback in callbacks:
callback.on_workflow_node_execute_started(workflow_node_execution)
return workflow_node_execution
...@@ -15,7 +15,7 @@ def run(script): ...@@ -15,7 +15,7 @@ def run(script):
class TimestampField(fields.Raw): class TimestampField(fields.Raw):
def format(self, value): def format(self, value) -> int:
return int(value.timestamp()) return int(value.timestamp())
......
...@@ -24,7 +24,23 @@ done ...@@ -24,7 +24,23 @@ done
if $api_modified; then if $api_modified; then
echo "Running Ruff linter on api module" echo "Running Ruff linter on api module"
./dev/reformat
# python style checks rely on `ruff` in path
if ! command -v ruff &> /dev/null; then
echo "Installing Ruff ..."
pip install ruff
fi
ruff check ./api || status=$?
status=${status:-0}
if [ $status -ne 0 ]; then
echo "Ruff linter on api module error, exit code: $status"
echo "Please run 'dev/reformat' to fix the fixable linting errors."
exit 1
fi
fi fi
if $web_modified; then if $web_modified; then
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment