Commit 3fc932b0 authored by takatost's avatar takatost

add updated_at to sync workflow api

parent 97cdc96f
......@@ -14,7 +14,7 @@ from controllers.console.setup import setup_required
from controllers.console.wraps import account_initialization_required
from core.app.entities.app_invoke_entities import InvokeFrom
from fields.workflow_fields import workflow_fields
from libs.helper import uuid_value
from libs.helper import TimestampField, uuid_value
from libs.login import current_user, login_required
from models.model import App, AppMode
from services.workflow_service import WorkflowService
......@@ -56,7 +56,7 @@ class DraftWorkflowApi(Resource):
args = parser.parse_args()
workflow_service = WorkflowService()
workflow_service.sync_draft_workflow(
workflow = workflow_service.sync_draft_workflow(
app_model=app_model,
graph=args.get('graph'),
features=args.get('features'),
......@@ -64,7 +64,8 @@ class DraftWorkflowApi(Resource):
)
return {
"result": "success"
"result": "success",
"updated_at": TimestampField().format(workflow.updated_at)
}
......
......@@ -82,7 +82,7 @@ class AdvancedChatAppRunner(AppRunner):
# RUN WORKFLOW
workflow_engine_manager = WorkflowEngineManager()
result_generator = workflow_engine_manager.run_workflow(
workflow_engine_manager.run_workflow(
app_model=app_record,
workflow=workflow,
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING
......@@ -97,11 +97,6 @@ class AdvancedChatAppRunner(AppRunner):
callbacks=[WorkflowEventTriggerCallback(queue_manager=queue_manager)]
)
for result in result_generator:
# todo handle workflow and node event
pass
def handle_input_moderation(self, queue_manager: AppQueueManager,
app_record: App,
app_generate_entity: AdvancedChatAppGenerateEntity,
......
from abc import ABC
from typing import Optional
from pydantic import BaseModel
class BaseNodeData(ABC, BaseModel):
pass
type: str
title: str
desc: Optional[str] = None
from decimal import Decimal
from core.workflow.entities.variable_pool import VariablePool
from models.workflow import WorkflowNodeExecution, WorkflowRun
class WorkflowRunState:
workflow_run: WorkflowRun
start_at: float
variable_pool: VariablePool
total_tokens: int = 0
total_price: Decimal = Decimal(0)
currency: str = "USD"
workflow_node_executions: list[WorkflowNodeExecution] = []
from abc import abstractmethod
from typing import Optional
from core.workflow.callbacks.base_callback import BaseWorkflowCallback
from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeType
from core.workflow.entities.variable_pool import VariablePool
class BaseNode:
_node_type: NodeType
_node_data_cls: type[BaseNodeData]
_node_type: NodeType
node_id: str
node_data: BaseNodeData
def __init__(self, config: dict) -> None:
self._node_id = config.get("id")
if not self._node_id:
self.node_id = config.get("id")
if not self.node_id:
raise ValueError("Node ID is required.")
self._node_data = self._node_data_cls(**config.get("data", {}))
self.node_data = self._node_data_cls(**config.get("data", {}))
@abstractmethod
def _run(self, variable_pool: Optional[VariablePool] = None,
......@@ -29,11 +33,13 @@ class BaseNode:
raise NotImplementedError
def run(self, variable_pool: Optional[VariablePool] = None,
run_args: Optional[dict] = None) -> dict:
run_args: Optional[dict] = None,
callbacks: list[BaseWorkflowCallback] = None) -> dict:
"""
Run node entry
:param variable_pool: variable pool
:param run_args: run args
:param callbacks: callbacks
:return:
"""
if variable_pool is None and run_args is None:
......@@ -52,3 +58,11 @@ class BaseNode:
:return:
"""
return {}
@property
def node_type(self) -> NodeType:
"""
Get node type
:return:
"""
return self._node_type
from typing import Optional
from core.app.app_config.entities import VariableEntity
from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeType
......@@ -22,6 +20,4 @@ class StartNodeData(BaseNodeData):
"""
type: str = NodeType.START.value
title: str
desc: Optional[str] = None
variables: list[VariableEntity] = []
......@@ -7,8 +7,8 @@ from core.workflow.nodes.start.entities import StartNodeData
class StartNode(BaseNode):
_node_type = NodeType.START
_node_data_cls = StartNodeData
node_type = NodeType.START
def _run(self, variable_pool: Optional[VariablePool] = None,
run_args: Optional[dict] = None) -> dict:
......
import json
from collections.abc import Generator
import time
from typing import Optional, Union
from core.workflow.callbacks.base_callback import BaseWorkflowCallback
from core.workflow.entities.node_entities import NodeType
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_entities import WorkflowRunState
from core.workflow.nodes.base_node import BaseNode
from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode
from core.workflow.nodes.end.end_node import EndNode
......@@ -19,7 +22,16 @@ from core.workflow.nodes.variable_assigner.variable_assigner_node import Variabl
from extensions.ext_database import db
from models.account import Account
from models.model import App, EndUser
from models.workflow import CreatedByRole, Workflow, WorkflowRun, WorkflowRunStatus, WorkflowRunTriggeredFrom
from models.workflow import (
CreatedByRole,
Workflow,
WorkflowNodeExecution,
WorkflowNodeExecutionStatus,
WorkflowNodeExecutionTriggeredFrom,
WorkflowRun,
WorkflowRunStatus,
WorkflowRunTriggeredFrom,
)
node_classes = {
NodeType.START: StartNode,
......@@ -114,7 +126,7 @@ class WorkflowEngineManager:
user: Union[Account, EndUser],
user_inputs: dict,
system_inputs: Optional[dict] = None,
callbacks: list[BaseWorkflowCallback] = None) -> Generator:
callbacks: list[BaseWorkflowCallback] = None) -> None:
"""
Run workflow
:param app_model: App instance
......@@ -140,11 +152,66 @@ class WorkflowEngineManager:
system_inputs=system_inputs
)
# init workflow run state
workflow_run_state = WorkflowRunState(
workflow_run=workflow_run,
start_at=time.perf_counter(),
variable_pool=VariablePool(
system_variables=system_inputs,
)
)
if callbacks:
for callback in callbacks:
callback.on_workflow_run_started(workflow_run)
pass
# fetch start node
start_node = self._get_entry_node(graph)
if not start_node:
self._workflow_run_failed(
workflow_run_state=workflow_run_state,
error='Start node not found in workflow graph',
callbacks=callbacks
)
return
try:
predecessor_node = None
current_node = start_node
while True:
# run workflow
self._run_workflow_node(
workflow_run_state=workflow_run_state,
node=current_node,
predecessor_node=predecessor_node,
callbacks=callbacks
)
if current_node.node_type == NodeType.END:
break
# todo fetch next node until end node finished or no next node
current_node = None
if not current_node:
break
predecessor_node = current_node
# or max steps 30 reached
# or max execution time 10min reached
except Exception as e:
self._workflow_run_failed(
workflow_run_state=workflow_run_state,
error=str(e),
callbacks=callbacks
)
return
# workflow run success
self._workflow_run_success(
workflow_run_state=workflow_run_state,
callbacks=callbacks
)
def _init_workflow_run(self, workflow: Workflow,
triggered_from: WorkflowRunTriggeredFrom,
......@@ -184,7 +251,7 @@ class WorkflowEngineManager:
status=WorkflowRunStatus.RUNNING.value,
created_by_role=(CreatedByRole.ACCOUNT.value
if isinstance(user, Account) else CreatedByRole.END_USER.value),
created_by_id=user.id
created_by=user.id
)
db.session.add(workflow_run)
......@@ -195,6 +262,33 @@ class WorkflowEngineManager:
return workflow_run
def _workflow_run_failed(self, workflow_run_state: WorkflowRunState,
error: str,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun:
"""
Workflow run failed
:param workflow_run_state: workflow run state
:param error: error message
:param callbacks: workflow callbacks
:return:
"""
workflow_run = workflow_run_state.workflow_run
workflow_run.status = WorkflowRunStatus.FAILED.value
workflow_run.error = error
workflow_run.elapsed_time = time.perf_counter() - workflow_run_state.start_at
workflow_run.total_tokens = workflow_run_state.total_tokens
workflow_run.total_price = workflow_run_state.total_price
workflow_run.currency = workflow_run_state.currency
workflow_run.total_steps = len(workflow_run_state.workflow_node_executions)
db.session.commit()
if callbacks:
for callback in callbacks:
callback.on_workflow_run_finished(workflow_run)
return workflow_run
def _get_entry_node(self, graph: dict) -> Optional[StartNode]:
"""
Get entry node
......@@ -210,3 +304,83 @@ class WorkflowEngineManager:
return StartNode(config=node_config)
return None
def _run_workflow_node(self, workflow_run_state: WorkflowRunState,
node: BaseNode,
predecessor_node: Optional[BaseNode] = None,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
# init workflow node execution
start_at = time.perf_counter()
workflow_node_execution = self._init_node_execution_from_workflow_run(
workflow_run_state=workflow_run_state,
node=node,
predecessor_node=predecessor_node,
)
# add to workflow node executions
workflow_run_state.workflow_node_executions.append(workflow_node_execution)
try:
# run node, result must have inputs, process_data, outputs, execution_metadata
node_run_result = node.run(
variable_pool=workflow_run_state.variable_pool,
callbacks=callbacks
)
except Exception as e:
# node run failed
self._workflow_node_execution_failed(
workflow_node_execution=workflow_node_execution,
error=str(e),
callbacks=callbacks
)
raise
# node run success
self._workflow_node_execution_success(
workflow_node_execution=workflow_node_execution,
result=node_run_result,
callbacks=callbacks
)
return workflow_node_execution
def _init_node_execution_from_workflow_run(self, workflow_run_state: WorkflowRunState,
node: BaseNode,
predecessor_node: Optional[BaseNode] = None,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
"""
Init workflow node execution from workflow run
:param workflow_run_state: workflow run state
:param node: current node
:param predecessor_node: predecessor node if exists
:param callbacks: workflow callbacks
:return:
"""
workflow_run = workflow_run_state.workflow_run
# init workflow node execution
workflow_node_execution = WorkflowNodeExecution(
tenant_id=workflow_run.tenant_id,
app_id=workflow_run.app_id,
workflow_id=workflow_run.workflow_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
workflow_run_id=workflow_run.id,
predecessor_node_id=predecessor_node.node_id if predecessor_node else None,
index=len(workflow_run_state.workflow_node_executions) + 1,
node_id=node.node_id,
node_type=node.node_type.value,
title=node.node_data.title,
type=node.node_type.value,
status=WorkflowNodeExecutionStatus.RUNNING.value,
created_by_role=workflow_run.created_by_role,
created_by=workflow_run.created_by
)
db.session.add(workflow_node_execution)
db.session.commit()
if callbacks:
for callback in callbacks:
callback.on_workflow_node_execute_started(workflow_node_execution)
return workflow_node_execution
......@@ -15,7 +15,7 @@ def run(script):
class TimestampField(fields.Raw):
def format(self, value):
def format(self, value) -> int:
return int(value.timestamp())
......
......@@ -31,14 +31,16 @@ if $api_modified; then
pip install ruff
fi
ruff check ./api
result=$?
ruff check ./api || status=$?
if [ $result -ne 0 ]; then
status=${status:-0}
if [ $status -ne 0 ]; then
echo "Ruff linter on api module error, exit code: $status"
echo "Please run 'dev/reformat' to fix the fixable linting errors."
exit 1
fi
exit $result
fi
if $web_modified; then
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment