Commit d3660f90 authored by takatost's avatar takatost

add updated_at to sync workflow api

parent 33e0499b
...@@ -14,7 +14,7 @@ from controllers.console.setup import setup_required ...@@ -14,7 +14,7 @@ from controllers.console.setup import setup_required
from controllers.console.wraps import account_initialization_required from controllers.console.wraps import account_initialization_required
from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from fields.workflow_fields import workflow_fields from fields.workflow_fields import workflow_fields
from libs.helper import uuid_value from libs.helper import TimestampField, uuid_value
from libs.login import current_user, login_required from libs.login import current_user, login_required
from models.model import App, AppMode from models.model import App, AppMode
from services.workflow_service import WorkflowService from services.workflow_service import WorkflowService
...@@ -56,7 +56,7 @@ class DraftWorkflowApi(Resource): ...@@ -56,7 +56,7 @@ class DraftWorkflowApi(Resource):
args = parser.parse_args() args = parser.parse_args()
workflow_service = WorkflowService() workflow_service = WorkflowService()
workflow_service.sync_draft_workflow( workflow = workflow_service.sync_draft_workflow(
app_model=app_model, app_model=app_model,
graph=args.get('graph'), graph=args.get('graph'),
features=args.get('features'), features=args.get('features'),
...@@ -64,7 +64,8 @@ class DraftWorkflowApi(Resource): ...@@ -64,7 +64,8 @@ class DraftWorkflowApi(Resource):
) )
return { return {
"result": "success" "result": "success",
"updated_at": TimestampField().format(workflow.updated_at)
} }
......
...@@ -82,7 +82,7 @@ class AdvancedChatAppRunner(AppRunner): ...@@ -82,7 +82,7 @@ class AdvancedChatAppRunner(AppRunner):
# RUN WORKFLOW # RUN WORKFLOW
workflow_engine_manager = WorkflowEngineManager() workflow_engine_manager = WorkflowEngineManager()
result_generator = workflow_engine_manager.run_workflow( workflow_engine_manager.run_workflow(
app_model=app_record, app_model=app_record,
workflow=workflow, workflow=workflow,
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING triggered_from=WorkflowRunTriggeredFrom.DEBUGGING
...@@ -97,11 +97,6 @@ class AdvancedChatAppRunner(AppRunner): ...@@ -97,11 +97,6 @@ class AdvancedChatAppRunner(AppRunner):
callbacks=[WorkflowEventTriggerCallback(queue_manager=queue_manager)] callbacks=[WorkflowEventTriggerCallback(queue_manager=queue_manager)]
) )
for result in result_generator:
# todo handle workflow and node event
pass
def handle_input_moderation(self, queue_manager: AppQueueManager, def handle_input_moderation(self, queue_manager: AppQueueManager,
app_record: App, app_record: App,
app_generate_entity: AdvancedChatAppGenerateEntity, app_generate_entity: AdvancedChatAppGenerateEntity,
......
from abc import ABC from abc import ABC
from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
class BaseNodeData(ABC, BaseModel): class BaseNodeData(ABC, BaseModel):
pass type: str
title: str
desc: Optional[str] = None
from decimal import Decimal
from core.workflow.entities.variable_pool import VariablePool
from models.workflow import WorkflowNodeExecution, WorkflowRun
class WorkflowRunState:
workflow_run: WorkflowRun
start_at: float
variable_pool: VariablePool
total_tokens: int = 0
total_price: Decimal = Decimal(0)
currency: str = "USD"
workflow_node_executions: list[WorkflowNodeExecution] = []
from abc import abstractmethod from abc import abstractmethod
from typing import Optional from typing import Optional
from core.workflow.callbacks.base_callback import BaseWorkflowCallback
from core.workflow.entities.base_node_data_entities import BaseNodeData from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeType from core.workflow.entities.node_entities import NodeType
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
class BaseNode: class BaseNode:
_node_type: NodeType
_node_data_cls: type[BaseNodeData] _node_data_cls: type[BaseNodeData]
_node_type: NodeType
node_id: str
node_data: BaseNodeData
def __init__(self, config: dict) -> None: def __init__(self, config: dict) -> None:
self._node_id = config.get("id") self.node_id = config.get("id")
if not self._node_id: if not self.node_id:
raise ValueError("Node ID is required.") raise ValueError("Node ID is required.")
self._node_data = self._node_data_cls(**config.get("data", {})) self.node_data = self._node_data_cls(**config.get("data", {}))
@abstractmethod @abstractmethod
def _run(self, variable_pool: Optional[VariablePool] = None, def _run(self, variable_pool: Optional[VariablePool] = None,
...@@ -29,11 +33,13 @@ class BaseNode: ...@@ -29,11 +33,13 @@ class BaseNode:
raise NotImplementedError raise NotImplementedError
def run(self, variable_pool: Optional[VariablePool] = None, def run(self, variable_pool: Optional[VariablePool] = None,
run_args: Optional[dict] = None) -> dict: run_args: Optional[dict] = None,
callbacks: list[BaseWorkflowCallback] = None) -> dict:
""" """
Run node entry Run node entry
:param variable_pool: variable pool :param variable_pool: variable pool
:param run_args: run args :param run_args: run args
:param callbacks: callbacks
:return: :return:
""" """
if variable_pool is None and run_args is None: if variable_pool is None and run_args is None:
...@@ -52,3 +58,11 @@ class BaseNode: ...@@ -52,3 +58,11 @@ class BaseNode:
:return: :return:
""" """
return {} return {}
@property
def node_type(self) -> NodeType:
"""
Get node type
:return:
"""
return self._node_type
from typing import Optional
from core.app.app_config.entities import VariableEntity from core.app.app_config.entities import VariableEntity
from core.workflow.entities.base_node_data_entities import BaseNodeData from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeType from core.workflow.entities.node_entities import NodeType
...@@ -22,6 +20,4 @@ class StartNodeData(BaseNodeData): ...@@ -22,6 +20,4 @@ class StartNodeData(BaseNodeData):
""" """
type: str = NodeType.START.value type: str = NodeType.START.value
title: str
desc: Optional[str] = None
variables: list[VariableEntity] = [] variables: list[VariableEntity] = []
...@@ -7,8 +7,8 @@ from core.workflow.nodes.start.entities import StartNodeData ...@@ -7,8 +7,8 @@ from core.workflow.nodes.start.entities import StartNodeData
class StartNode(BaseNode): class StartNode(BaseNode):
_node_type = NodeType.START
_node_data_cls = StartNodeData _node_data_cls = StartNodeData
node_type = NodeType.START
def _run(self, variable_pool: Optional[VariablePool] = None, def _run(self, variable_pool: Optional[VariablePool] = None,
run_args: Optional[dict] = None) -> dict: run_args: Optional[dict] = None) -> dict:
......
import json import json
from collections.abc import Generator import time
from typing import Optional, Union from typing import Optional, Union
from core.workflow.callbacks.base_callback import BaseWorkflowCallback from core.workflow.callbacks.base_callback import BaseWorkflowCallback
from core.workflow.entities.node_entities import NodeType from core.workflow.entities.node_entities import NodeType
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_entities import WorkflowRunState
from core.workflow.nodes.base_node import BaseNode
from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode
from core.workflow.nodes.end.end_node import EndNode from core.workflow.nodes.end.end_node import EndNode
...@@ -19,7 +22,16 @@ from core.workflow.nodes.variable_assigner.variable_assigner_node import Variabl ...@@ -19,7 +22,16 @@ from core.workflow.nodes.variable_assigner.variable_assigner_node import Variabl
from extensions.ext_database import db from extensions.ext_database import db
from models.account import Account from models.account import Account
from models.model import App, EndUser from models.model import App, EndUser
from models.workflow import CreatedByRole, Workflow, WorkflowRun, WorkflowRunStatus, WorkflowRunTriggeredFrom from models.workflow import (
CreatedByRole,
Workflow,
WorkflowNodeExecution,
WorkflowNodeExecutionStatus,
WorkflowNodeExecutionTriggeredFrom,
WorkflowRun,
WorkflowRunStatus,
WorkflowRunTriggeredFrom,
)
node_classes = { node_classes = {
NodeType.START: StartNode, NodeType.START: StartNode,
...@@ -114,7 +126,7 @@ class WorkflowEngineManager: ...@@ -114,7 +126,7 @@ class WorkflowEngineManager:
user: Union[Account, EndUser], user: Union[Account, EndUser],
user_inputs: dict, user_inputs: dict,
system_inputs: Optional[dict] = None, system_inputs: Optional[dict] = None,
callbacks: list[BaseWorkflowCallback] = None) -> Generator: callbacks: list[BaseWorkflowCallback] = None) -> None:
""" """
Run workflow Run workflow
:param app_model: App instance :param app_model: App instance
...@@ -140,11 +152,66 @@ class WorkflowEngineManager: ...@@ -140,11 +152,66 @@ class WorkflowEngineManager:
system_inputs=system_inputs system_inputs=system_inputs
) )
# init workflow run state
workflow_run_state = WorkflowRunState(
workflow_run=workflow_run,
start_at=time.perf_counter(),
variable_pool=VariablePool(
system_variables=system_inputs,
)
)
if callbacks: if callbacks:
for callback in callbacks: for callback in callbacks:
callback.on_workflow_run_started(workflow_run) callback.on_workflow_run_started(workflow_run)
pass # fetch start node
start_node = self._get_entry_node(graph)
if not start_node:
self._workflow_run_failed(
workflow_run_state=workflow_run_state,
error='Start node not found in workflow graph',
callbacks=callbacks
)
return
try:
predecessor_node = None
current_node = start_node
while True:
# run workflow
self._run_workflow_node(
workflow_run_state=workflow_run_state,
node=current_node,
predecessor_node=predecessor_node,
callbacks=callbacks
)
if current_node.node_type == NodeType.END:
break
# todo fetch next node until end node finished or no next node
current_node = None
if not current_node:
break
predecessor_node = current_node
# or max steps 30 reached
# or max execution time 10min reached
except Exception as e:
self._workflow_run_failed(
workflow_run_state=workflow_run_state,
error=str(e),
callbacks=callbacks
)
return
# workflow run success
self._workflow_run_success(
workflow_run_state=workflow_run_state,
callbacks=callbacks
)
def _init_workflow_run(self, workflow: Workflow, def _init_workflow_run(self, workflow: Workflow,
triggered_from: WorkflowRunTriggeredFrom, triggered_from: WorkflowRunTriggeredFrom,
...@@ -184,7 +251,7 @@ class WorkflowEngineManager: ...@@ -184,7 +251,7 @@ class WorkflowEngineManager:
status=WorkflowRunStatus.RUNNING.value, status=WorkflowRunStatus.RUNNING.value,
created_by_role=(CreatedByRole.ACCOUNT.value created_by_role=(CreatedByRole.ACCOUNT.value
if isinstance(user, Account) else CreatedByRole.END_USER.value), if isinstance(user, Account) else CreatedByRole.END_USER.value),
created_by_id=user.id created_by=user.id
) )
db.session.add(workflow_run) db.session.add(workflow_run)
...@@ -195,6 +262,33 @@ class WorkflowEngineManager: ...@@ -195,6 +262,33 @@ class WorkflowEngineManager:
return workflow_run return workflow_run
def _workflow_run_failed(self, workflow_run_state: WorkflowRunState,
error: str,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun:
"""
Workflow run failed
:param workflow_run_state: workflow run state
:param error: error message
:param callbacks: workflow callbacks
:return:
"""
workflow_run = workflow_run_state.workflow_run
workflow_run.status = WorkflowRunStatus.FAILED.value
workflow_run.error = error
workflow_run.elapsed_time = time.perf_counter() - workflow_run_state.start_at
workflow_run.total_tokens = workflow_run_state.total_tokens
workflow_run.total_price = workflow_run_state.total_price
workflow_run.currency = workflow_run_state.currency
workflow_run.total_steps = len(workflow_run_state.workflow_node_executions)
db.session.commit()
if callbacks:
for callback in callbacks:
callback.on_workflow_run_finished(workflow_run)
return workflow_run
def _get_entry_node(self, graph: dict) -> Optional[StartNode]: def _get_entry_node(self, graph: dict) -> Optional[StartNode]:
""" """
Get entry node Get entry node
...@@ -210,3 +304,83 @@ class WorkflowEngineManager: ...@@ -210,3 +304,83 @@ class WorkflowEngineManager:
return StartNode(config=node_config) return StartNode(config=node_config)
return None return None
def _run_workflow_node(self, workflow_run_state: WorkflowRunState,
node: BaseNode,
predecessor_node: Optional[BaseNode] = None,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
# init workflow node execution
start_at = time.perf_counter()
workflow_node_execution = self._init_node_execution_from_workflow_run(
workflow_run_state=workflow_run_state,
node=node,
predecessor_node=predecessor_node,
)
# add to workflow node executions
workflow_run_state.workflow_node_executions.append(workflow_node_execution)
try:
# run node, result must have inputs, process_data, outputs, execution_metadata
node_run_result = node.run(
variable_pool=workflow_run_state.variable_pool,
callbacks=callbacks
)
except Exception as e:
# node run failed
self._workflow_node_execution_failed(
workflow_node_execution=workflow_node_execution,
error=str(e),
callbacks=callbacks
)
raise
# node run success
self._workflow_node_execution_success(
workflow_node_execution=workflow_node_execution,
result=node_run_result,
callbacks=callbacks
)
return workflow_node_execution
def _init_node_execution_from_workflow_run(self, workflow_run_state: WorkflowRunState,
node: BaseNode,
predecessor_node: Optional[BaseNode] = None,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
"""
Init workflow node execution from workflow run
:param workflow_run_state: workflow run state
:param node: current node
:param predecessor_node: predecessor node if exists
:param callbacks: workflow callbacks
:return:
"""
workflow_run = workflow_run_state.workflow_run
# init workflow node execution
workflow_node_execution = WorkflowNodeExecution(
tenant_id=workflow_run.tenant_id,
app_id=workflow_run.app_id,
workflow_id=workflow_run.workflow_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
workflow_run_id=workflow_run.id,
predecessor_node_id=predecessor_node.node_id if predecessor_node else None,
index=len(workflow_run_state.workflow_node_executions) + 1,
node_id=node.node_id,
node_type=node.node_type.value,
title=node.node_data.title,
type=node.node_type.value,
status=WorkflowNodeExecutionStatus.RUNNING.value,
created_by_role=workflow_run.created_by_role,
created_by=workflow_run.created_by
)
db.session.add(workflow_node_execution)
db.session.commit()
if callbacks:
for callback in callbacks:
callback.on_workflow_node_execute_started(workflow_node_execution)
return workflow_node_execution
...@@ -15,7 +15,7 @@ def run(script): ...@@ -15,7 +15,7 @@ def run(script):
class TimestampField(fields.Raw): class TimestampField(fields.Raw):
def format(self, value): def format(self, value) -> int:
return int(value.timestamp()) return int(value.timestamp())
......
...@@ -31,14 +31,16 @@ if $api_modified; then ...@@ -31,14 +31,16 @@ if $api_modified; then
pip install ruff pip install ruff
fi fi
ruff check ./api ruff check ./api || status=$?
result=$?
if [ $result -ne 0 ]; then status=${status:-0}
if [ $status -ne 0 ]; then
echo "Ruff linter on api module error, exit code: $status"
echo "Please run 'dev/reformat' to fix the fixable linting errors." echo "Please run 'dev/reformat' to fix the fixable linting errors."
exit 1
fi fi
exit $result
fi fi
if $web_modified; then if $web_modified; then
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment