Commit 836abb4d authored by takatost's avatar takatost

fix workflow api return

parent 0da7ee81
import json import json
import logging
from typing import Generator
from flask import Response, stream_with_context
from flask_restful import Resource, marshal_with, reqparse from flask_restful import Resource, marshal_with, reqparse
from werkzeug.exceptions import NotFound, InternalServerError
import services
from controllers.console import api from controllers.console import api
from controllers.console.app.error import DraftWorkflowNotExist from controllers.console.app.error import DraftWorkflowNotExist, ConversationCompletedError
from controllers.console.app.wraps import get_app_model from controllers.console.app.wraps import get_app_model
from controllers.console.setup import setup_required from controllers.console.setup import setup_required
from controllers.console.wraps import account_initialization_required from controllers.console.wraps import account_initialization_required
from core.app.entities.app_invoke_entities import InvokeFrom
from fields.workflow_fields import workflow_fields from fields.workflow_fields import workflow_fields
from libs.helper import uuid_value
from libs.login import current_user, login_required from libs.login import current_user, login_required
from models.model import App, AppMode from models.model import App, AppMode
from services.workflow_service import WorkflowService from services.workflow_service import WorkflowService
logger = logging.getLogger(__name__)
class DraftWorkflowApi(Resource): class DraftWorkflowApi(Resource):
@setup_required @setup_required
@login_required @login_required
...@@ -59,23 +69,80 @@ class DraftWorkflowApi(Resource): ...@@ -59,23 +69,80 @@ class DraftWorkflowApi(Resource):
} }
class AdvancedChatDraftWorkflowRunApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
def post(self, app_model: App):
"""
Run draft workflow
"""
parser = reqparse.RequestParser()
parser.add_argument('inputs', type=dict, required=True, location='json')
parser.add_argument('query', type=str, location='json', default='')
parser.add_argument('files', type=list, required=False, location='json')
parser.add_argument('conversation_id', type=uuid_value, location='json')
args = parser.parse_args()
workflow_service = WorkflowService()
try:
response = workflow_service.run_advanced_chat_draft_workflow(
app_model=app_model,
user=current_user,
args=args,
invoke_from=InvokeFrom.DEBUGGER
)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:
raise ConversationCompletedError()
except ValueError as e:
raise e
except Exception as e:
logging.exception("internal server error.")
raise InternalServerError()
def generate() -> Generator:
yield from response
return Response(stream_with_context(generate()), status=200,
mimetype='text/event-stream')
class DraftWorkflowRunApi(Resource): class DraftWorkflowRunApi(Resource):
@setup_required @setup_required
@login_required @login_required
@account_initialization_required @account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) @get_app_model(mode=[AppMode.WORKFLOW])
def post(self, app_model: App): def post(self, app_model: App):
""" """
Run draft workflow Run draft workflow
""" """
# TODO parser = reqparse.RequestParser()
parser.add_argument('inputs', type=dict, required=True, location='json')
args = parser.parse_args()
workflow_service = WorkflowService() workflow_service = WorkflowService()
workflow_service.run_draft_workflow(app_model=app_model, account=current_user)
# TODO try:
return { response = workflow_service.run_draft_workflow(
"result": "success" app_model=app_model,
} user=current_user,
args=args,
invoke_from=InvokeFrom.DEBUGGER
)
except ValueError as e:
raise e
except Exception as e:
logging.exception("internal server error.")
raise InternalServerError()
def generate() -> Generator:
yield from response
return Response(stream_with_context(generate()), status=200,
mimetype='text/event-stream')
class WorkflowTaskStopApi(Resource): class WorkflowTaskStopApi(Resource):
...@@ -214,10 +281,12 @@ class ConvertToWorkflowApi(Resource): ...@@ -214,10 +281,12 @@ class ConvertToWorkflowApi(Resource):
api.add_resource(DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft') api.add_resource(DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft')
api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run')
api.add_resource(DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run') api.add_resource(DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run')
api.add_resource(WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflows/tasks/<string:task_id>/stop') api.add_resource(WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflows/tasks/<string:task_id>/stop')
api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<uuid:node_id>/run') api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<uuid:node_id>/run')
api.add_resource(PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/published') api.add_resource(PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/published')
api.add_resource(DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs') api.add_resource(DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs')
api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs/:block_type') api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs'
'/<string:block_type>')
api.add_resource(ConvertToWorkflowApi, '/apps/<uuid:app_id>/convert-to-workflow') api.add_resource(ConvertToWorkflowApi, '/apps/<uuid:app_id>/convert-to-workflow')
...@@ -16,18 +16,19 @@ from core.app.apps.message_based_app_generator import MessageBasedAppGenerator ...@@ -16,18 +16,19 @@ from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
from core.file.message_file_parser import MessageFileParser from core.file.message_file_parser import MessageFileParser
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db from extensions.ext_database import db
from models.account import Account from models.account import Account
from models.model import App, Conversation, EndUser, Message from models.model import App, Conversation, EndUser, Message
from models.workflow import Workflow
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class AdvancedChatAppGenerator(MessageBasedAppGenerator): class AdvancedChatAppGenerator(MessageBasedAppGenerator):
def generate(self, app_model: App, def generate(self, app_model: App,
workflow: Workflow,
user: Union[Account, EndUser], user: Union[Account, EndUser],
args: Any, args: dict,
invoke_from: InvokeFrom, invoke_from: InvokeFrom,
stream: bool = True) \ stream: bool = True) \
-> Union[dict, Generator]: -> Union[dict, Generator]:
...@@ -35,6 +36,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): ...@@ -35,6 +36,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
Generate App response. Generate App response.
:param app_model: App :param app_model: App
:param workflow: Workflow
:param user: account or end user :param user: account or end user
:param args: request args :param args: request args
:param invoke_from: invoke from source :param invoke_from: invoke from source
...@@ -59,16 +61,6 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): ...@@ -59,16 +61,6 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
if args.get('conversation_id'): if args.get('conversation_id'):
conversation = self._get_conversation_by_user(app_model, args.get('conversation_id'), user) conversation = self._get_conversation_by_user(app_model, args.get('conversation_id'), user)
# get workflow
workflow_engine_manager = WorkflowEngineManager()
if invoke_from == InvokeFrom.DEBUGGER:
workflow = workflow_engine_manager.get_draft_workflow(app_model=app_model)
else:
workflow = workflow_engine_manager.get_published_workflow(app_model=app_model)
if not workflow:
raise ValueError('Workflow not initialized')
# parse files # parse files
files = args['files'] if 'files' in args and args['files'] else [] files = args['files'] if 'files' in args and args['files'] else []
message_file_parser = MessageFileParser(tenant_id=app_model.tenant_id, app_id=app_model.id) message_file_parser = MessageFileParser(tenant_id=app_model.tenant_id, app_id=app_model.id)
......
import logging import logging
import time
from typing import cast from typing import cast
from core.app.app_queue_manager import AppQueueManager, PublishFrom from core.app.app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig
from core.app.apps.base_app_runner import AppRunner from core.app.apps.base_app_runner import AppRunner
from core.app.entities.app_invoke_entities import ( from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity, AdvancedChatAppGenerateEntity, InvokeFrom,
) )
from core.app.entities.queue_entities import QueueStopEvent
from core.moderation.base import ModerationException from core.moderation.base import ModerationException
from core.workflow.entities.node_entities import SystemVariable
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db from extensions.ext_database import db
from models.model import App, Conversation, Message from models.account import Account
from models.model import App, Conversation, Message, EndUser
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -38,66 +43,151 @@ class AdvancedChatAppRunner(AppRunner): ...@@ -38,66 +43,151 @@ class AdvancedChatAppRunner(AppRunner):
if not app_record: if not app_record:
raise ValueError("App not found") raise ValueError("App not found")
workflow = WorkflowEngineManager().get_workflow(app_model=app_record, workflow_id=app_config.workflow_id)
if not workflow:
raise ValueError("Workflow not initialized")
inputs = application_generate_entity.inputs inputs = application_generate_entity.inputs
query = application_generate_entity.query query = application_generate_entity.query
files = application_generate_entity.files files = application_generate_entity.files
# moderation # moderation
if self.handle_input_moderation(
queue_manager=queue_manager,
app_record=app_record,
app_generate_entity=application_generate_entity,
inputs=inputs,
query=query
):
return
# annotation reply
if self.handle_annotation_reply(
app_record=app_record,
message=message,
query=query,
queue_manager=queue_manager,
app_generate_entity=application_generate_entity
):
return
# fetch user
if application_generate_entity.invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE]:
user = db.session.query(Account).filter(Account.id == application_generate_entity.user_id).first()
else:
user = db.session.query(EndUser).filter(EndUser.id == application_generate_entity.user_id).first()
# RUN WORKFLOW
workflow_engine_manager = WorkflowEngineManager()
result_generator = workflow_engine_manager.run_workflow(
app_model=app_record,
workflow=workflow,
user=user,
user_inputs=inputs,
system_inputs={
SystemVariable.QUERY: query,
SystemVariable.FILES: files,
SystemVariable.CONVERSATION: conversation.id,
}
)
for result in result_generator:
# todo handle workflow and node event
pass
def handle_input_moderation(self, queue_manager: AppQueueManager,
app_record: App,
app_generate_entity: AdvancedChatAppGenerateEntity,
inputs: dict,
query: str) -> bool:
"""
Handle input moderation
:param queue_manager: application queue manager
:param app_record: app record
:param app_generate_entity: application generate entity
:param inputs: inputs
:param query: query
:return:
"""
try: try:
# process sensitive_word_avoidance # process sensitive_word_avoidance
_, inputs, query = self.moderation_for_inputs( _, inputs, query = self.moderation_for_inputs(
app_id=app_record.id, app_id=app_record.id,
tenant_id=app_config.tenant_id, tenant_id=app_generate_entity.app_config.tenant_id,
app_generate_entity=application_generate_entity, app_generate_entity=app_generate_entity,
inputs=inputs, inputs=inputs,
query=query, query=query,
) )
except ModerationException as e: except ModerationException as e:
# TODO self._stream_output(
self.direct_output(
queue_manager=queue_manager, queue_manager=queue_manager,
app_generate_entity=application_generate_entity,
prompt_messages=prompt_messages,
text=str(e), text=str(e),
stream=application_generate_entity.stream stream=app_generate_entity.stream,
stopped_by=QueueStopEvent.StopBy.INPUT_MODERATION
) )
return return True
if query: return False
# annotation reply
annotation_reply = self.query_app_annotations_to_reply(
app_record=app_record,
message=message,
query=query,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from
)
if annotation_reply: def handle_annotation_reply(self, app_record: App,
queue_manager.publish_annotation_reply( message: Message,
message_annotation_id=annotation_reply.id, query: str,
pub_from=PublishFrom.APPLICATION_MANAGER queue_manager: AppQueueManager,
) app_generate_entity: AdvancedChatAppGenerateEntity) -> bool:
"""
# TODO Handle annotation reply
self.direct_output( :param app_record: app record
queue_manager=queue_manager, :param message: message
app_generate_entity=application_generate_entity, :param query: query
prompt_messages=prompt_messages, :param queue_manager: application queue manager
text=annotation_reply.content, :param app_generate_entity: application generate entity
stream=application_generate_entity.stream """
) # annotation reply
return annotation_reply = self.query_app_annotations_to_reply(
app_record=app_record,
# check hosting moderation message=message,
# TODO query=query,
hosting_moderation_result = self.check_hosting_moderation( user_id=app_generate_entity.user_id,
application_generate_entity=application_generate_entity, invoke_from=app_generate_entity.invoke_from
queue_manager=queue_manager,
prompt_messages=prompt_messages
) )
if hosting_moderation_result: if annotation_reply:
return queue_manager.publish_annotation_reply(
message_annotation_id=annotation_reply.id,
pub_from=PublishFrom.APPLICATION_MANAGER
)
self._stream_output(
queue_manager=queue_manager,
text=annotation_reply.content,
stream=app_generate_entity.stream,
stopped_by=QueueStopEvent.StopBy.ANNOTATION_REPLY
)
return True
return False
# todo RUN WORKFLOW def _stream_output(self, queue_manager: AppQueueManager,
\ No newline at end of file text: str,
stream: bool,
stopped_by: QueueStopEvent.StopBy) -> None:
"""
Direct output
:param queue_manager: application queue manager
:param text: text
:param stream: stream
:return:
"""
if stream:
index = 0
for token in text:
queue_manager.publish_text_chunk(token, PublishFrom.APPLICATION_MANAGER)
index += 1
time.sleep(0.01)
queue_manager.publish(
QueueStopEvent(stopped_by=stopped_by),
PublishFrom.APPLICATION_MANAGER
)
queue_manager.stop_listen()
...@@ -165,6 +165,7 @@ class QueueStopEvent(AppQueueEvent): ...@@ -165,6 +165,7 @@ class QueueStopEvent(AppQueueEvent):
USER_MANUAL = "user-manual" USER_MANUAL = "user-manual"
ANNOTATION_REPLY = "annotation-reply" ANNOTATION_REPLY = "annotation-reply"
OUTPUT_MODERATION = "output-moderation" OUTPUT_MODERATION = "output-moderation"
INPUT_MODERATION = "input-moderation"
event = QueueEvent.STOP event = QueueEvent.STOP
stopped_by: StopBy stopped_by: StopBy
......
...@@ -30,3 +30,12 @@ class NodeType(Enum): ...@@ -30,3 +30,12 @@ class NodeType(Enum):
if node_type.value == value: if node_type.value == value:
return node_type return node_type
raise ValueError(f'invalid node type value {value}') raise ValueError(f'invalid node type value {value}')
class SystemVariable(Enum):
"""
System Variables.
"""
QUERY = 'query'
FILES = 'files'
CONVERSATION = 'conversation'
from enum import Enum
from typing import Optional, Union, Any
from core.workflow.entities.node_entities import SystemVariable
VariableValue = Union[str, int, float, dict, list]
class ValueType(Enum):
"""
Value Type Enum
"""
STRING = "string"
NUMBER = "number"
OBJECT = "object"
ARRAY = "array"
FILE = "file"
class VariablePool:
variables_mapping = {}
def __init__(self, system_variables: dict[SystemVariable, Any]) -> None:
# system variables
# for example:
# {
# 'query': 'abc',
# 'files': []
# }
for system_variable, value in system_variables.items():
self.append_variable('sys', [system_variable.value], value)
def append_variable(self, node_id: str, variable_key_list: list[str], value: VariableValue) -> None:
"""
Append variable
:param node_id: node id
:param variable_key_list: variable key list, like: ['result', 'text']
:param value: value
:return:
"""
if node_id not in self.variables_mapping:
self.variables_mapping[node_id] = {}
variable_key_list_hash = hash(tuple(variable_key_list))
self.variables_mapping[node_id][variable_key_list_hash] = value
def get_variable_value(self, variable_selector: list[str],
target_value_type: Optional[ValueType] = None) -> Optional[VariableValue]:
"""
Get variable
:param variable_selector: include node_id and variables
:param target_value_type: target value type
:return:
"""
if len(variable_selector) < 2:
raise ValueError('Invalid value selector')
node_id = variable_selector[0]
if node_id not in self.variables_mapping:
return None
# fetch variable keys, pop node_id
variable_key_list = variable_selector[1:]
variable_key_list_hash = hash(tuple(variable_key_list))
value = self.variables_mapping[node_id].get(variable_key_list_hash)
if target_value_type:
if target_value_type == ValueType.STRING:
return str(value)
elif target_value_type == ValueType.NUMBER:
return int(value)
elif target_value_type == ValueType.OBJECT:
if not isinstance(value, dict):
raise ValueError('Invalid value type: object')
elif target_value_type == ValueType.ARRAY:
if not isinstance(value, list):
raise ValueError('Invalid value type: array')
return value
from abc import abstractmethod
from typing import Optional from typing import Optional
from core.workflow.entities.node_entities import NodeType
from core.workflow.entities.variable_pool import VariablePool
class BaseNode: class BaseNode:
_node_type: NodeType
def __int__(self, node_config: dict) -> None:
self._node_config = node_config
@abstractmethod
def run(self, variable_pool: Optional[VariablePool] = None,
run_args: Optional[dict] = None) -> dict:
"""
Run node
:param variable_pool: variable pool
:param run_args: run args
:return:
"""
if variable_pool is None and run_args is None:
raise ValueError("At least one of `variable_pool` or `run_args` must be provided.")
return self._run(
variable_pool=variable_pool,
run_args=run_args
)
@abstractmethod
def _run(self, variable_pool: Optional[VariablePool] = None,
run_args: Optional[dict] = None) -> dict:
"""
Run node
:param variable_pool: variable pool
:param run_args: run args
:return:
"""
raise NotImplementedError
@classmethod @classmethod
def get_default_config(cls, filters: Optional[dict] = None) -> dict: def get_default_config(cls, filters: Optional[dict] = None) -> dict:
""" """
......
from typing import Optional from typing import Optional, Union, Generator
from core.memory.token_buffer_memory import TokenBufferMemory
from core.workflow.entities.node_entities import NodeType from core.workflow.entities.node_entities import NodeType
from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode
...@@ -14,7 +15,8 @@ from core.workflow.nodes.template_transform.template_transform_node import Templ ...@@ -14,7 +15,8 @@ from core.workflow.nodes.template_transform.template_transform_node import Templ
from core.workflow.nodes.tool.tool_node import ToolNode from core.workflow.nodes.tool.tool_node import ToolNode
from core.workflow.nodes.variable_assigner.variable_assigner_node import VariableAssignerNode from core.workflow.nodes.variable_assigner.variable_assigner_node import VariableAssignerNode
from extensions.ext_database import db from extensions.ext_database import db
from models.model import App from models.account import Account
from models.model import App, EndUser, Conversation
from models.workflow import Workflow from models.workflow import Workflow
node_classes = { node_classes = {
...@@ -56,13 +58,20 @@ class WorkflowEngineManager: ...@@ -56,13 +58,20 @@ class WorkflowEngineManager:
return None return None
# fetch published workflow by workflow_id # fetch published workflow by workflow_id
return self.get_workflow(app_model, app_model.workflow_id)
def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
"""
Get workflow
"""
# fetch workflow by workflow_id
workflow = db.session.query(Workflow).filter( workflow = db.session.query(Workflow).filter(
Workflow.tenant_id == app_model.tenant_id, Workflow.tenant_id == app_model.tenant_id,
Workflow.app_id == app_model.id, Workflow.app_id == app_model.id,
Workflow.id == app_model.workflow_id Workflow.id == workflow_id
).first() ).first()
# return published workflow # return workflow
return workflow return workflow
def get_default_configs(self) -> list[dict]: def get_default_configs(self) -> list[dict]:
...@@ -96,3 +105,20 @@ class WorkflowEngineManager: ...@@ -96,3 +105,20 @@ class WorkflowEngineManager:
return None return None
return default_config return default_config
def run_workflow(self, app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user_inputs: dict,
system_inputs: Optional[dict] = None) -> Generator:
"""
Run workflow
:param app_model: App instance
:param workflow: Workflow instance
:param user: account or end user
:param user_inputs: user variables inputs
:param system_inputs: system inputs, like: query, files
:return:
"""
# TODO
pass
...@@ -5,8 +5,8 @@ from libs.helper import TimestampField ...@@ -5,8 +5,8 @@ from libs.helper import TimestampField
workflow_fields = { workflow_fields = {
'id': fields.String, 'id': fields.String,
'graph': fields.Nested(simple_account_fields, attribute='graph_dict'), 'graph': fields.Raw(attribute='graph_dict'),
'features': fields.Nested(simple_account_fields, attribute='features_dict'), 'features': fields.Raw(attribute='features_dict'),
'created_by': fields.Nested(simple_account_fields, attribute='created_by_account'), 'created_by': fields.Nested(simple_account_fields, attribute='created_by_account'),
'created_at': TimestampField, 'created_at': TimestampField,
'updated_by': fields.Nested(simple_account_fields, attribute='updated_by_account', allow_null=True), 'updated_by': fields.Nested(simple_account_fields, attribute='updated_by_account', allow_null=True),
......
...@@ -22,10 +22,10 @@ workflow_run_for_list_fields = { ...@@ -22,10 +22,10 @@ workflow_run_for_list_fields = {
"id": fields.String, "id": fields.String,
"sequence_number": fields.Integer, "sequence_number": fields.Integer,
"version": fields.String, "version": fields.String,
"graph": fields.String, "graph": fields.Raw(attribute='graph_dict'),
"inputs": fields.String, "inputs": fields.Raw(attribute='inputs_dict'),
"status": fields.String, "status": fields.String,
"outputs": fields.String, "outputs": fields.Raw(attribute='outputs_dict'),
"error": fields.String, "error": fields.String,
"elapsed_time": fields.Float, "elapsed_time": fields.Float,
"total_tokens": fields.Integer, "total_tokens": fields.Integer,
...@@ -49,10 +49,10 @@ workflow_run_detail_fields = { ...@@ -49,10 +49,10 @@ workflow_run_detail_fields = {
"id": fields.String, "id": fields.String,
"sequence_number": fields.Integer, "sequence_number": fields.Integer,
"version": fields.String, "version": fields.String,
"graph": fields.String, "graph": fields.Raw(attribute='graph_dict'),
"inputs": fields.String, "inputs": fields.Raw(attribute='inputs_dict'),
"status": fields.String, "status": fields.String,
"outputs": fields.String, "outputs": fields.Raw(attribute='outputs_dict'),
"error": fields.String, "error": fields.String,
"elapsed_time": fields.Float, "elapsed_time": fields.Float,
"total_tokens": fields.Integer, "total_tokens": fields.Integer,
...@@ -73,13 +73,13 @@ workflow_run_node_execution_fields = { ...@@ -73,13 +73,13 @@ workflow_run_node_execution_fields = {
"node_id": fields.String, "node_id": fields.String,
"node_type": fields.String, "node_type": fields.String,
"title": fields.String, "title": fields.String,
"inputs": fields.String, "inputs": fields.Raw(attribute='inputs_dict'),
"process_data": fields.String, "process_data": fields.Raw(attribute='process_data_dict'),
"outputs": fields.String, "outputs": fields.Raw(attribute='outputs_dict'),
"status": fields.String, "status": fields.String,
"error": fields.String, "error": fields.String,
"elapsed_time": fields.Float, "elapsed_time": fields.Float,
"execution_metadata": fields.String, "execution_metadata": fields.Raw(attribute='execution_metadata_dict'),
"created_at": TimestampField, "created_at": TimestampField,
"created_by_role": fields.String, "created_by_role": fields.String,
"created_by_account": fields.Nested(simple_account_fields, attribute='created_by_account', allow_null=True), "created_by_account": fields.Nested(simple_account_fields, attribute='created_by_account', allow_null=True),
......
...@@ -272,6 +272,14 @@ class WorkflowRun(db.Model): ...@@ -272,6 +272,14 @@ class WorkflowRun(db.Model):
return EndUser.query.get(self.created_by) \ return EndUser.query.get(self.created_by) \
if created_by_role == CreatedByRole.END_USER else None if created_by_role == CreatedByRole.END_USER else None
@property
def graph_dict(self):
return self.graph if not self.graph else json.loads(self.graph)
@property
def inputs_dict(self):
return self.inputs if not self.inputs else json.loads(self.inputs)
@property @property
def outputs_dict(self): def outputs_dict(self):
return self.outputs if not self.outputs else json.loads(self.outputs) return self.outputs if not self.outputs else json.loads(self.outputs)
......
import json import json
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Optional, Union, Any, Generator
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import NodeType from core.workflow.entities.node_entities import NodeType
from core.workflow.workflow_engine_manager import WorkflowEngineManager from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db from extensions.ext_database import db
from models.account import Account from models.account import Account
from models.model import App, AppMode from models.model import App, AppMode, EndUser
from models.workflow import Workflow, WorkflowType from models.workflow import Workflow, WorkflowType
from services.workflow.workflow_converter import WorkflowConverter from services.workflow.workflow_converter import WorkflowConverter
...@@ -142,6 +144,39 @@ class WorkflowService: ...@@ -142,6 +144,39 @@ class WorkflowService:
workflow_engine_manager = WorkflowEngineManager() workflow_engine_manager = WorkflowEngineManager()
return workflow_engine_manager.get_default_config(node_type, filters) return workflow_engine_manager.get_default_config(node_type, filters)
def run_advanced_chat_draft_workflow(self, app_model: App,
user: Union[Account, EndUser],
args: dict,
invoke_from: InvokeFrom) -> Union[dict, Generator]:
"""
Run advanced chatbot draft workflow
"""
# fetch draft workflow by app_model
draft_workflow = self.get_draft_workflow(app_model=app_model)
if not draft_workflow:
raise ValueError('Workflow not initialized')
# run draft workflow
app_generator = AdvancedChatAppGenerator()
response = app_generator.generate(
app_model=app_model,
workflow=draft_workflow,
user=user,
args=args,
invoke_from=invoke_from,
stream=True
)
return response
def run_draft_workflow(self, app_model: App,
user: Union[Account, EndUser],
args: dict,
invoke_from: InvokeFrom) -> Union[dict, Generator]:
# TODO
pass
def convert_to_workflow(self, app_model: App, account: Account) -> App: def convert_to_workflow(self, app_model: App, account: Account) -> App:
""" """
Basic mode of chatbot app(expert mode) to workflow Basic mode of chatbot app(expert mode) to workflow
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment