Commit 5ea7d4cb authored by takatost

Merge branch 'feat/workflow-backend' into deploy/dev

parents ef861e07 19c9091d
......@@ -8,7 +8,7 @@ api = ExternalApi(bp)
from . import admin, apikey, extension, feature, setup, version, ping
# Import app controllers
from .app import (advanced_prompt_template, annotation, app, audio, completion, conversation, generator, message,
model_config, site, statistic, workflow, workflow_app_log)
model_config, site, statistic, workflow, workflow_run, workflow_app_log)
# Import auth controllers
from .auth import activate, data_source_oauth, login, oauth
# Import billing controllers
......
......@@ -15,6 +15,7 @@ from controllers.console.setup import setup_required
from controllers.console.wraps import account_initialization_required
from core.app.entities.app_invoke_entities import InvokeFrom
from fields.workflow_fields import workflow_fields
from fields.workflow_run_fields import workflow_run_node_execution_fields
from libs.helper import TimestampField, uuid_value
from libs.login import current_user, login_required
from models.model import App, AppMode
......@@ -164,18 +165,24 @@ class DraftWorkflowNodeRunApi(Resource):
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@marshal_with(workflow_run_node_execution_fields)
def post(self, app_model: App, node_id: str):
"""
Run draft workflow node
"""
# TODO
parser = reqparse.RequestParser()
parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
args = parser.parse_args()
workflow_service = WorkflowService()
workflow_service.run_draft_workflow_node(app_model=app_model, node_id=node_id, account=current_user)
workflow_node_execution = workflow_service.run_draft_workflow_node(
app_model=app_model,
node_id=node_id,
user_inputs=args.get('inputs'),
account=current_user
)
# TODO
return {
"result": "success"
}
return workflow_node_execution
class PublishedWorkflowApi(Resource):
......@@ -291,7 +298,7 @@ api.add_resource(DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft')
api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run')
api.add_resource(DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run')
api.add_resource(WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflows/tasks/<string:task_id>/stop')
api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<uuid:node_id>/run')
api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/run')
api.add_resource(PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/published')
api.add_resource(DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs')
api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs'
......
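With the route now accepting any string node id and the handler marshalled with workflow_run_node_execution_fields, a client call might look like this hedged sketch (host, API prefix, token, app id and node id are placeholders, not values from this commit):

import requests

# Hypothetical client-side call; the console API prefix and credentials are assumptions.
resp = requests.post(
    'https://<console-host>/console/api/apps/<app_id>/workflows/draft/nodes/llm-1/run',
    headers={'Authorization': 'Bearer <console-session-token>'},
    json={'inputs': {'query': 'Hello'}},  # the handler requires an 'inputs' dict in the JSON body
)
resp.raise_for_status()
print(resp.json())  # the marshalled WorkflowNodeExecution record, not a bare {"result": "success"}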
from core.workflow.entities.node_entities import NodeType
class WorkflowNodeRunFailedError(Exception):
def __init__(self, node_id: str, node_type: NodeType, node_title: str, error: str):
self.node_id = node_id
self.node_type = node_type
self.node_title = node_title
self.error = error
super().__init__(f"Node {node_title} run failed: {error}")
......@@ -108,7 +108,7 @@ class BaseNode(ABC):
)
@classmethod
def extract_variable_selector_to_variable_mapping(cls, config: dict) -> dict:
def extract_variable_selector_to_variable_mapping(cls, config: dict) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param config: node config
......@@ -119,7 +119,7 @@ class BaseNode(ABC):
@classmethod
@abstractmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
......
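The annotation change from dict[list[str], str] to dict[str, list[str]] is more than cosmetic: a list is unhashable and could never be a dict key, and the corrected shape maps each variable name to the value-selector path used to resolve it. A minimal sketch of the intended shape, with hypothetical node and variable names:

# Hypothetical mapping returned by extract_variable_selector_to_variable_mapping:
# variable name -> [source_node_id, *keys] selector path into the variable pool.
variable_mapping: dict[str, list[str]] = {
    'query': ['start', 'query'],
    'api_body': ['http_request_1', 'body'],
}

# Single-step execution can then verify that the caller supplied every mapped
# variable before seeding the variable pool (mirrors the check further below).
user_inputs = {'query': 'Is it raining?', 'api_body': '{"ok": true}'}
for variable_key, value_selector in variable_mapping.items():
    if variable_key not in user_inputs:
        raise ValueError(f'Variable key {variable_key} not found in user inputs.')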
......@@ -153,11 +153,13 @@ class CodeNode(BaseNode):
raise ValueError(f'{variable} in input form is out of range.')
if isinstance(value, float):
value = round(value, MAX_PRECISION)
# raise error if precision is too high
if len(str(value).split('.')[1]) > MAX_PRECISION:
raise ValueError(f'{variable} in output form has too high precision.')
return value
def _transform_result(self, result: dict, output_schema: dict[str, CodeNodeData.Output],
def _transform_result(self, result: dict, output_schema: Optional[dict[str, CodeNodeData.Output]],
prefix: str = '',
depth: int = 1) -> dict:
"""
......@@ -170,6 +172,47 @@ class CodeNode(BaseNode):
raise ValueError("Depth limit reached, object too deep.")
transformed_result = {}
if output_schema is None:
# validate output through instance type
for output_name, output_value in result.items():
if isinstance(output_value, dict):
self._transform_result(
result=output_value,
output_schema=None,
prefix=f'{prefix}.{output_name}' if prefix else output_name,
depth=depth + 1
)
elif isinstance(output_value, int | float):
self._check_number(
value=output_value,
variable=f'{prefix}.{output_name}' if prefix else output_name
)
elif isinstance(output_value, str):
self._check_string(
value=output_value,
variable=f'{prefix}.{output_name}' if prefix else output_name
)
elif isinstance(output_value, list):
if all(isinstance(value, int | float) for value in output_value):
for value in output_value:
self._check_number(
value=value,
variable=f'{prefix}.{output_name}' if prefix else output_name
)
elif all(isinstance(value, str) for value in output_value):
for value in output_value:
self._check_string(
value=value,
variable=f'{prefix}.{output_name}' if prefix else output_name
)
else:
raise ValueError(f'Output {prefix}.{output_name} is not a valid array. Make sure all elements are of the same type.')
else:
raise ValueError(f'Output {prefix}.{output_name} is not a valid type.')
return result
parameters_validated = {}
for output_name, output_config in output_schema.items():
if output_config.type == 'object':
# check if output is object
......@@ -237,10 +280,16 @@ class CodeNode(BaseNode):
else:
raise ValueError(f'Output type {output_config.type} is not supported.')
parameters_validated[output_name] = True
# check if all output parameters are validated
if len(parameters_validated) != len(result):
raise ValueError('Not all output parameters are validated.')
return transformed_result
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: CodeNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: CodeNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
......@@ -248,5 +297,5 @@ class CodeNode(BaseNode):
"""
return {
variable_selector.value_selector: variable_selector.variable for variable_selector in node_data.variables
variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables
}
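To make the schema-less branch of _transform_result above concrete, here is a hedged sketch of outputs that would pass or fail its checks (values are illustrative only):

# Passes: nested dicts are recursed into, numbers and strings are checked
# individually, and arrays must be homogeneous (all numbers or all strings).
ok_result = {
    'score': 0.95,
    'labels': ['spam', 'ham'],
    'detail': {'token_counts': [3, 5, 8]},
}

# Fails: a mixed-type array raises
#   ValueError("Output detail.mixed is not a valid array. Make sure all elements are of the same type.")
bad_result = {
    'detail': {'mixed': ['spam', 3]},
}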
from typing import Literal, Union
from typing import Literal, Optional
from pydantic import BaseModel
......@@ -12,7 +12,7 @@ class CodeNodeData(BaseNodeData):
"""
class Output(BaseModel):
type: Literal['string', 'number', 'object', 'array[string]', 'array[number]']
children: Union[None, dict[str, 'Output']]
children: Optional[dict[str, 'CodeNodeData.Output']]
variables: list[VariableSelector]
answer: str
......
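Switching children from Union[None, ...] to Optional[...] with a fully qualified forward reference keeps the self-referential output schema resolvable. A standalone sketch of the recursive shape, assuming pydantic v1-style forward references and a simplified class name:

from typing import Literal, Optional
from pydantic import BaseModel

class Output(BaseModel):
    # children is None for scalar and array outputs, populated for nested 'object' outputs
    type: Literal['string', 'number', 'object', 'array[string]', 'array[number]']
    children: Optional[dict[str, 'Output']] = None

Output.update_forward_refs()  # resolve the self-reference (pydantic v1 idiom)

schema = Output(
    type='object',
    children={
        'name': Output(type='string'),
        'score': Output(type='number'),
    },
)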
......@@ -50,10 +50,16 @@ class DirectAnswerNode(BaseNode):
)
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
:return:
"""
return {}
node_data = cast(cls._node_data_cls, node_data)
variable_mapping = {}
for variable_selector in node_data.variables:
variable_mapping[variable_selector.variable] = variable_selector.value_selector
return variable_mapping
......@@ -56,7 +56,7 @@ class EndNode(BaseNode):
)
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
......
......@@ -48,12 +48,12 @@ class HttpRequestNode(BaseNode):
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: HttpRequestNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: HttpRequestNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
:return:
"""
return {
variable_selector.value_selector: variable_selector.variable for variable_selector in node_data.variables
variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables
}
......@@ -23,7 +23,7 @@ class LLMNode(BaseNode):
pass
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
......
......@@ -69,7 +69,7 @@ class StartNode(BaseNode):
return filtered_inputs
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
......
......@@ -72,12 +72,12 @@ class TemplateTransformNode(BaseNode):
)
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: TemplateTransformNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: TemplateTransformNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
:return:
"""
return {
variable_selector.value_selector: variable_selector.variable for variable_selector in node_data.variables
variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables
}
\ No newline at end of file
......@@ -42,7 +42,7 @@ class ToolNode(BaseNode):
try:
# TODO: user_id
messages = tool_runtime.invoke(None, parameters)
messages = tool_runtime.invoke(self.user_id, parameters)
except Exception as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
......@@ -133,8 +133,12 @@ class ToolNode(BaseNode):
@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
"""
pass
\ No newline at end of file
return {
k.variable: k.value_selector
for k in cast(ToolNodeData, node_data).tool_parameters
if k.variable_type == 'selector'
}
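For illustration, a hedged sketch of the filter above; the parameter entries and the non-selector variable_type value are assumptions, only the 'selector' check comes from this change:

from types import SimpleNamespace

# Hypothetical tool parameters mirroring the fields used above.
tool_parameters = [
    SimpleNamespace(variable='city', variable_type='selector', value_selector=['start', 'city']),
    SimpleNamespace(variable='units', variable_type='static', value_selector=None),
]

# Only parameters bound to another node's output contribute to the mapping.
mapping = {p.variable: p.value_selector for p in tool_parameters if p.variable_type == 'selector'}
# -> {'city': ['start', 'city']}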
......@@ -6,6 +6,7 @@ from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType
from core.workflow.entities.variable_pool import VariablePool, VariableValue
from core.workflow.entities.workflow_entities import WorkflowNodeAndResult, WorkflowRunState
from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.nodes.base_node import BaseNode, UserFrom
from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode
......@@ -180,6 +181,93 @@ class WorkflowEngineManager:
callbacks=callbacks
)
def single_step_run_workflow_node(self, workflow: Workflow,
node_id: str,
user_id: str,
user_inputs: dict) -> tuple[BaseNode, NodeRunResult]:
"""
Single step run workflow node
:param workflow: Workflow instance
:param node_id: node id
:param user_id: user id
:param user_inputs: user inputs
:return:
"""
# fetch node info from workflow graph
graph = workflow.graph_dict
if not graph:
raise ValueError('workflow graph not found')
nodes = graph.get('nodes')
if not nodes:
raise ValueError('nodes not found in workflow graph')
# fetch node config from node id
node_config = None
for node in nodes:
if node.get('id') == node_id:
node_config = node
break
if not node_config:
raise ValueError('node id not found in workflow graph')
# Get node class
node_cls = node_classes.get(NodeType.value_of(node_config.get('data', {}).get('type')))
# init workflow run state
node_instance = node_cls(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
workflow_id=workflow.id,
user_id=user_id,
user_from=UserFrom.ACCOUNT,
config=node_config
)
try:
# init variable pool
variable_pool = VariablePool(
system_variables={},
user_inputs={}
)
# variable selector to variable mapping
try:
variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(node_config)
except NotImplementedError:
variable_mapping = {}
for variable_key, variable_selector in variable_mapping.items():
if variable_key not in user_inputs:
raise ValueError(f'Variable key {variable_key} not found in user inputs.')
# fetch variable node id from variable selector
variable_node_id = variable_selector[0]
variable_key_list = variable_selector[1:]
# append variable and value to variable pool
variable_pool.append_variable(
node_id=variable_node_id,
variable_key_list=variable_key_list,
value=user_inputs.get(variable_key)
)
# run node
node_run_result = node_instance.run(
variable_pool=variable_pool
)
except Exception as e:
raise WorkflowNodeRunFailedError(
node_id=node_instance.node_id,
node_type=node_instance.node_type,
node_title=node_instance.node_data.title,
error=str(e)
)
return node_instance, node_run_result
def _workflow_run_success(self, callbacks: list[BaseWorkflowCallback] = None) -> None:
"""
Workflow run success
......
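A hedged usage sketch of the new single_step_run_workflow_node helper together with WorkflowNodeRunFailedError; draft_workflow, the node id, and the inputs are placeholders:

manager = WorkflowEngineManager()
try:
    node_instance, node_run_result = manager.single_step_run_workflow_node(
        workflow=draft_workflow,        # a Workflow instance fetched elsewhere
        node_id='code-1',               # hypothetical node id from the draft graph
        user_id=account.id,
        user_inputs={'query': 'ping'},  # keys must cover the node's variable mapping
    )
except WorkflowNodeRunFailedError as e:
    print(f'{e.node_title} ({e.node_type.value}) failed: {e.error}')
else:
    print(node_run_result.outputs)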
......@@ -34,11 +34,9 @@ workflow_run_for_list_fields = {
}
workflow_run_pagination_fields = {
'page': fields.Integer,
'limit': fields.Integer(attribute='per_page'),
'total': fields.Integer,
'has_more': fields.Boolean(attribute='has_next'),
'data': fields.List(fields.Nested(workflow_run_for_list_fields), attribute='items')
'limit': fields.Integer(attribute='limit'),
'has_more': fields.Boolean(attribute='has_more'),
'data': fields.List(fields.Nested(workflow_run_for_list_fields), attribute='data')
}
workflow_run_detail_fields = {
......
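These attribute renames line the marshal fields up with the custom InfiniteScrollPagination object returned by WorkflowRunService (changed in the next hunk) instead of a Flask-SQLAlchemy Pagination (per_page/has_next/items). A hedged sketch of the hand-off, where workflow_runs is a placeholder list:

from flask_restful import marshal

pagination = InfiniteScrollPagination(data=workflow_runs, limit=20, has_more=True)
payload = marshal(pagination, workflow_run_pagination_fields)
# -> {'limit': 20, 'has_more': True, 'data': [...]}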
......@@ -34,26 +34,26 @@ class WorkflowRunService:
if not last_workflow_run:
raise ValueError('Last workflow run not exists')
conversations = base_query.filter(
workflow_runs = base_query.filter(
WorkflowRun.created_at < last_workflow_run.created_at,
WorkflowRun.id != last_workflow_run.id
).order_by(WorkflowRun.created_at.desc()).limit(limit).all()
else:
conversations = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all()
workflow_runs = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all()
has_more = False
if len(conversations) == limit:
current_page_first_conversation = conversations[-1]
if len(workflow_runs) == limit:
current_page_first_workflow_run = workflow_runs[-1]
rest_count = base_query.filter(
WorkflowRun.created_at < current_page_first_conversation.created_at,
WorkflowRun.id != current_page_first_conversation.id
WorkflowRun.created_at < current_page_first_workflow_run.created_at,
WorkflowRun.id != current_page_first_workflow_run.id
).count()
if rest_count > 0:
has_more = True
return InfiniteScrollPagination(
data=conversations,
data=workflow_runs,
limit=limit,
has_more=has_more
)
......
import json
import time
from collections.abc import Generator
from datetime import datetime
from typing import Optional, Union
......@@ -9,12 +10,21 @@ from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
from core.model_runtime.utils.encoders import jsonable_encoder
from core.workflow.entities.node_entities import NodeType
from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
from models.account import Account
from models.model import App, AppMode, EndUser
from models.workflow import Workflow, WorkflowType
from models.workflow import (
CreatedByRole,
Workflow,
WorkflowNodeExecution,
WorkflowNodeExecutionStatus,
WorkflowNodeExecutionTriggeredFrom,
WorkflowType,
)
from services.workflow.workflow_converter import WorkflowConverter
......@@ -214,6 +224,80 @@ class WorkflowService:
"""
AppQueueManager.set_stop_flag(task_id, invoke_from, user.id)
def run_draft_workflow_node(self, app_model: App,
node_id: str,
user_inputs: dict,
account: Account) -> WorkflowNodeExecution:
"""
Run draft workflow node
"""
# fetch draft workflow by app_model
draft_workflow = self.get_draft_workflow(app_model=app_model)
if not draft_workflow:
raise ValueError('Workflow not initialized')
# run draft workflow node
workflow_engine_manager = WorkflowEngineManager()
start_at = time.perf_counter()
try:
node_instance, node_run_result = workflow_engine_manager.single_step_run_workflow_node(
workflow=draft_workflow,
node_id=node_id,
user_inputs=user_inputs,
user_id=account.id,
)
except WorkflowNodeRunFailedError as e:
workflow_node_execution = WorkflowNodeExecution(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
workflow_id=draft_workflow.id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value,
index=1,
node_id=e.node_id,
node_type=e.node_type.value,
title=e.node_title,
status=WorkflowNodeExecutionStatus.FAILED.value,
error=e.error,
elapsed_time=time.perf_counter() - start_at,
created_by_role=CreatedByRole.ACCOUNT.value,
created_by=account.id,
created_at=datetime.utcnow(),
finished_at=datetime.utcnow()
)
db.session.add(workflow_node_execution)
db.session.commit()
return workflow_node_execution
# create workflow node execution
workflow_node_execution = WorkflowNodeExecution(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
workflow_id=draft_workflow.id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value,
index=1,
node_id=node_id,
node_type=node_instance.node_type.value,
title=node_instance.node_data.title,
inputs=json.dumps(node_run_result.inputs) if node_run_result.inputs else None,
process_data=json.dumps(node_run_result.process_data) if node_run_result.process_data else None,
outputs=json.dumps(node_run_result.outputs) if node_run_result.outputs else None,
execution_metadata=(json.dumps(jsonable_encoder(node_run_result.metadata))
if node_run_result.metadata else None),
status=WorkflowNodeExecutionStatus.SUCCEEDED.value,
elapsed_time=time.perf_counter() - start_at,
created_by_role=CreatedByRole.ACCOUNT.value,
created_by=account.id,
created_at=datetime.utcnow(),
finished_at=datetime.utcnow()
)
db.session.add(workflow_node_execution)
db.session.commit()
return workflow_node_execution
def convert_to_workflow(self, app_model: App, account: Account) -> App:
"""
Basic mode of chatbot app(expert mode) to workflow
......
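For completeness, a hedged read-back sketch of the execution record persisted above; the query itself is illustrative and not part of this commit:

import json

execution = db.session.query(WorkflowNodeExecution).filter_by(
    workflow_id=draft_workflow.id,
    triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value,
).order_by(WorkflowNodeExecution.created_at.desc()).first()

outputs = json.loads(execution.outputs) if execution and execution.outputs else {}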
import pytest
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.nodes.code.code_node import CodeNode
from models.workflow import WorkflowNodeExecutionStatus, WorkflowRunStatus
from models.workflow import WorkflowNodeExecutionStatus
from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock
@pytest.mark.parametrize('setup_code_executor_mock', [['none']], indirect=True)
......@@ -15,7 +16,13 @@ def test_execute_code(setup_code_executor_mock):
'''
# trim first 4 spaces at the beginning of each line
code = '\n'.join([line[4:] for line in code.split('\n')])
node = CodeNode(config={
node = CodeNode(
tenant_id='1',
app_id='1',
workflow_id='1',
user_id='1',
user_from=InvokeFrom.WEB_APP,
config={
'id': '1',
'data': {
'outputs': {
......@@ -38,7 +45,8 @@ def test_execute_code(setup_code_executor_mock):
'code_language': 'python3',
'code': code
}
})
}
)
# construct variable pool
pool = VariablePool(system_variables={}, user_inputs={})
......@@ -61,7 +69,13 @@ def test_execute_code_output_validator(setup_code_executor_mock):
'''
# trim first 4 spaces at the beginning of each line
code = '\n'.join([line[4:] for line in code.split('\n')])
node = CodeNode(config={
node = CodeNode(
tenant_id='1',
app_id='1',
workflow_id='1',
user_id='1',
user_from=InvokeFrom.WEB_APP,
config={
'id': '1',
'data': {
"outputs": {
......@@ -84,7 +98,8 @@ def test_execute_code_output_validator(setup_code_executor_mock):
'code_language': 'python3',
'code': code
}
})
}
)
# construct variable pool
pool = VariablePool(system_variables={}, user_inputs={})
......@@ -108,7 +123,13 @@ def test_execute_code_output_validator_depth():
'''
# trim first 4 spaces at the beginning of each line
code = '\n'.join([line[4:] for line in code.split('\n')])
node = CodeNode(config={
node = CodeNode(
tenant_id='1',
app_id='1',
workflow_id='1',
user_id='1',
user_from=InvokeFrom.WEB_APP,
config={
'id': '1',
'data': {
"outputs": {
......@@ -161,7 +182,8 @@ def test_execute_code_output_validator_depth():
'code_language': 'python3',
'code': code
}
})
}
)
# construct result
result = {
......