Commit 19c9091d authored by takatost's avatar takatost

add single step run

parent 91a35ded
......@@ -8,7 +8,7 @@ api = ExternalApi(bp)
from . import admin, apikey, extension, feature, setup, version, ping
# Import app controllers
from .app import (advanced_prompt_template, annotation, app, audio, completion, conversation, generator, message,
model_config, site, statistic, workflow, workflow_app_log)
model_config, site, statistic, workflow, workflow_run, workflow_app_log)
# Import auth controllers
from .auth import activate, data_source_oauth, login, oauth
# Import billing controllers
......
......@@ -15,6 +15,7 @@ from controllers.console.setup import setup_required
from controllers.console.wraps import account_initialization_required
from core.app.entities.app_invoke_entities import InvokeFrom
from fields.workflow_fields import workflow_fields
from fields.workflow_run_fields import workflow_run_node_execution_fields
from libs.helper import TimestampField, uuid_value
from libs.login import current_user, login_required
from models.model import App, AppMode
......@@ -164,18 +165,24 @@ class DraftWorkflowNodeRunApi(Resource):
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@marshal_with(workflow_run_node_execution_fields)
def post(self, app_model: App, node_id: str):
"""
Run draft workflow node
"""
# TODO
parser = reqparse.RequestParser()
parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
args = parser.parse_args()
workflow_service = WorkflowService()
workflow_service.run_draft_workflow_node(app_model=app_model, node_id=node_id, account=current_user)
workflow_node_execution = workflow_service.run_draft_workflow_node(
app_model=app_model,
node_id=node_id,
user_inputs=args.get('inputs'),
account=current_user
)
# TODO
return {
"result": "success"
}
return workflow_node_execution
class PublishedWorkflowApi(Resource):
......@@ -291,7 +298,7 @@ api.add_resource(DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft')
api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run')
api.add_resource(DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run')
api.add_resource(WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflows/tasks/<string:task_id>/stop')
api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<uuid:node_id>/run')
api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/run')
api.add_resource(PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/published')
api.add_resource(DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs')
api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs'
......
from core.workflow.entities.node_entities import NodeType
class WorkflowNodeRunFailedError(Exception):
def __init__(self, node_id: str, node_type: NodeType, node_title: str, error: str):
self.node_id = node_id
self.node_type = node_type
self.node_title = node_title
self.error = error
super().__init__(f"Node {node_title} run failed: {error}")
......@@ -108,7 +108,7 @@ class BaseNode(ABC):
)
@classmethod
def extract_variable_selector_to_variable_mapping(cls, config: dict) -> dict:
def extract_variable_selector_to_variable_mapping(cls, config: dict) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param config: node config
......@@ -119,7 +119,7 @@ class BaseNode(ABC):
@classmethod
@abstractmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
......
......@@ -289,7 +289,7 @@ class CodeNode(BaseNode):
return transformed_result
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: CodeNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: CodeNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
......@@ -297,5 +297,5 @@ class CodeNode(BaseNode):
"""
return {
variable_selector.value_selector: variable_selector.variable for variable_selector in node_data.variables
}
\ No newline at end of file
variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables
}
......@@ -50,10 +50,16 @@ class DirectAnswerNode(BaseNode):
)
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
:return:
"""
return {}
node_data = cast(cls._node_data_cls, node_data)
variable_mapping = {}
for variable_selector in node_data.variables:
variable_mapping[variable_selector.variable] = variable_selector.value_selector
return variable_mapping
......@@ -56,7 +56,7 @@ class EndNode(BaseNode):
)
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
......
......@@ -48,12 +48,12 @@ class HttpRequestNode(BaseNode):
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: HttpRequestNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: HttpRequestNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
:return:
"""
return {
variable_selector.value_selector: variable_selector.variable for variable_selector in node_data.variables
}
\ No newline at end of file
variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables
}
......@@ -23,7 +23,7 @@ class LLMNode(BaseNode):
pass
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
......
......@@ -69,7 +69,7 @@ class StartNode(BaseNode):
return filtered_inputs
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
......
......@@ -72,12 +72,12 @@ class TemplateTransformNode(BaseNode):
)
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: TemplateTransformNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: TemplateTransformNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
:return:
"""
return {
variable_selector.value_selector: variable_selector.variable for variable_selector in node_data.variables
variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables
}
\ No newline at end of file
......@@ -133,12 +133,12 @@ class ToolNode(BaseNode):
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
"""
return {
k.value_selector: k.variable
k.variable: k.value_selector
for k in cast(ToolNodeData, node_data).tool_parameters
if k.variable_type == 'selector'
}
\ No newline at end of file
}
......@@ -6,6 +6,7 @@ from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType
from core.workflow.entities.variable_pool import VariablePool, VariableValue
from core.workflow.entities.workflow_entities import WorkflowNodeAndResult, WorkflowRunState
from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.nodes.base_node import BaseNode, UserFrom
from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode
......@@ -180,6 +181,93 @@ class WorkflowEngineManager:
callbacks=callbacks
)
def single_step_run_workflow_node(self, workflow: Workflow,
node_id: str,
user_id: str,
user_inputs: dict) -> tuple[BaseNode, NodeRunResult]:
"""
Single step run workflow node
:param workflow: Workflow instance
:param node_id: node id
:param user_id: user id
:param user_inputs: user inputs
:return:
"""
# fetch node info from workflow graph
graph = workflow.graph_dict
if not graph:
raise ValueError('workflow graph not found')
nodes = graph.get('nodes')
if not nodes:
raise ValueError('nodes not found in workflow graph')
# fetch node config from node id
node_config = None
for node in nodes:
if node.get('id') == node_id:
node_config = node
break
if not node_config:
raise ValueError('node id not found in workflow graph')
# Get node class
node_cls = node_classes.get(NodeType.value_of(node_config.get('data', {}).get('type')))
# init workflow run state
node_instance = node_cls(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
workflow_id=workflow.id,
user_id=user_id,
user_from=UserFrom.ACCOUNT,
config=node_config
)
try:
# init variable pool
variable_pool = VariablePool(
system_variables={},
user_inputs={}
)
# variable selector to variable mapping
try:
variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(node_config)
except NotImplementedError:
variable_mapping = {}
for variable_key, variable_selector in variable_mapping.items():
if variable_key not in user_inputs:
raise ValueError(f'Variable key {variable_key} not found in user inputs.')
# fetch variable node id from variable selector
variable_node_id = variable_selector[0]
variable_key_list = variable_selector[1:]
# append variable and value to variable pool
variable_pool.append_variable(
node_id=variable_node_id,
variable_key_list=variable_key_list,
value=user_inputs.get(variable_key)
)
# run node
node_run_result = node_instance.run(
variable_pool=variable_pool
)
except Exception as e:
raise WorkflowNodeRunFailedError(
node_id=node_instance.node_id,
node_type=node_instance.node_type,
node_title=node_instance.node_data.title,
error=str(e)
)
return node_instance, node_run_result
def _workflow_run_success(self, callbacks: list[BaseWorkflowCallback] = None) -> None:
"""
Workflow run success
......
......@@ -34,11 +34,9 @@ workflow_run_for_list_fields = {
}
workflow_run_pagination_fields = {
'page': fields.Integer,
'limit': fields.Integer(attribute='per_page'),
'total': fields.Integer,
'has_more': fields.Boolean(attribute='has_next'),
'data': fields.List(fields.Nested(workflow_run_for_list_fields), attribute='items')
'limit': fields.Integer(attribute='limit'),
'has_more': fields.Boolean(attribute='has_more'),
'data': fields.List(fields.Nested(workflow_run_for_list_fields), attribute='data')
}
workflow_run_detail_fields = {
......
......@@ -34,26 +34,26 @@ class WorkflowRunService:
if not last_workflow_run:
raise ValueError('Last workflow run not exists')
conversations = base_query.filter(
workflow_runs = base_query.filter(
WorkflowRun.created_at < last_workflow_run.created_at,
WorkflowRun.id != last_workflow_run.id
).order_by(WorkflowRun.created_at.desc()).limit(limit).all()
else:
conversations = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all()
workflow_runs = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all()
has_more = False
if len(conversations) == limit:
current_page_first_conversation = conversations[-1]
if len(workflow_runs) == limit:
current_page_first_workflow_run = workflow_runs[-1]
rest_count = base_query.filter(
WorkflowRun.created_at < current_page_first_conversation.created_at,
WorkflowRun.id != current_page_first_conversation.id
WorkflowRun.created_at < current_page_first_workflow_run.created_at,
WorkflowRun.id != current_page_first_workflow_run.id
).count()
if rest_count > 0:
has_more = True
return InfiniteScrollPagination(
data=conversations,
data=workflow_runs,
limit=limit,
has_more=has_more
)
......
import json
import time
from collections.abc import Generator
from datetime import datetime
from typing import Optional, Union
......@@ -9,12 +10,21 @@ from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
from core.model_runtime.utils.encoders import jsonable_encoder
from core.workflow.entities.node_entities import NodeType
from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
from models.account import Account
from models.model import App, AppMode, EndUser
from models.workflow import Workflow, WorkflowType
from models.workflow import (
CreatedByRole,
Workflow,
WorkflowNodeExecution,
WorkflowNodeExecutionStatus,
WorkflowNodeExecutionTriggeredFrom,
WorkflowType,
)
from services.workflow.workflow_converter import WorkflowConverter
......@@ -214,6 +224,80 @@ class WorkflowService:
"""
AppQueueManager.set_stop_flag(task_id, invoke_from, user.id)
def run_draft_workflow_node(self, app_model: App,
node_id: str,
user_inputs: dict,
account: Account) -> WorkflowNodeExecution:
"""
Run draft workflow node
"""
# fetch draft workflow by app_model
draft_workflow = self.get_draft_workflow(app_model=app_model)
if not draft_workflow:
raise ValueError('Workflow not initialized')
# run draft workflow node
workflow_engine_manager = WorkflowEngineManager()
start_at = time.perf_counter()
try:
node_instance, node_run_result = workflow_engine_manager.single_step_run_workflow_node(
workflow=draft_workflow,
node_id=node_id,
user_inputs=user_inputs,
user_id=account.id,
)
except WorkflowNodeRunFailedError as e:
workflow_node_execution = WorkflowNodeExecution(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
workflow_id=draft_workflow.id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value,
index=1,
node_id=e.node_id,
node_type=e.node_type.value,
title=e.node_title,
status=WorkflowNodeExecutionStatus.FAILED.value,
error=e.error,
elapsed_time=time.perf_counter() - start_at,
created_by_role=CreatedByRole.ACCOUNT.value,
created_by=account.id,
created_at=datetime.utcnow(),
finished_at=datetime.utcnow()
)
db.session.add(workflow_node_execution)
db.session.commit()
return workflow_node_execution
# create workflow node execution
workflow_node_execution = WorkflowNodeExecution(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
workflow_id=draft_workflow.id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value,
index=1,
node_id=node_id,
node_type=node_instance.node_type.value,
title=node_instance.node_data.title,
inputs=json.dumps(node_run_result.inputs) if node_run_result.inputs else None,
process_data=json.dumps(node_run_result.process_data) if node_run_result.process_data else None,
outputs=json.dumps(node_run_result.outputs) if node_run_result.outputs else None,
execution_metadata=(json.dumps(jsonable_encoder(node_run_result.metadata))
if node_run_result.metadata else None),
status=WorkflowNodeExecutionStatus.SUCCEEDED.value,
elapsed_time=time.perf_counter() - start_at,
created_by_role=CreatedByRole.ACCOUNT.value,
created_by=account.id,
created_at=datetime.utcnow(),
finished_at=datetime.utcnow()
)
db.session.add(workflow_node_execution)
db.session.commit()
return workflow_node_execution
def convert_to_workflow(self, app_model: App, account: Account) -> App:
"""
Basic mode of chatbot app(expert mode) to workflow
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment