Commit ea0d41c3 authored by takatost's avatar takatost

fix workflow api return

parent 8af0e837
import json
import logging
from typing import Generator
from flask import Response, stream_with_context
from flask_restful import Resource, marshal_with, reqparse
from werkzeug.exceptions import NotFound, InternalServerError
import services
from controllers.console import api
from controllers.console.app.error import DraftWorkflowNotExist
from controllers.console.app.error import DraftWorkflowNotExist, ConversationCompletedError
from controllers.console.app.wraps import get_app_model
from controllers.console.setup import setup_required
from controllers.console.wraps import account_initialization_required
from core.app.entities.app_invoke_entities import InvokeFrom
from fields.workflow_fields import workflow_fields
from libs.helper import uuid_value
from libs.login import current_user, login_required
from models.model import App, AppMode
from services.workflow_service import WorkflowService
logger = logging.getLogger(__name__)
class DraftWorkflowApi(Resource):
@setup_required
@login_required
......@@ -59,23 +69,80 @@ class DraftWorkflowApi(Resource):
}
class AdvancedChatDraftWorkflowRunApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
def post(self, app_model: App):
"""
Run draft workflow
"""
parser = reqparse.RequestParser()
parser.add_argument('inputs', type=dict, required=True, location='json')
parser.add_argument('query', type=str, location='json', default='')
parser.add_argument('files', type=list, required=False, location='json')
parser.add_argument('conversation_id', type=uuid_value, location='json')
args = parser.parse_args()
workflow_service = WorkflowService()
try:
response = workflow_service.run_advanced_chat_draft_workflow(
app_model=app_model,
user=current_user,
args=args,
invoke_from=InvokeFrom.DEBUGGER
)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:
raise ConversationCompletedError()
except ValueError as e:
raise e
except Exception as e:
logging.exception("internal server error.")
raise InternalServerError()
def generate() -> Generator:
yield from response
return Response(stream_with_context(generate()), status=200,
mimetype='text/event-stream')
class DraftWorkflowRunApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.WORKFLOW])
def post(self, app_model: App):
"""
Run draft workflow
"""
# TODO
parser = reqparse.RequestParser()
parser.add_argument('inputs', type=dict, required=True, location='json')
args = parser.parse_args()
workflow_service = WorkflowService()
workflow_service.run_draft_workflow(app_model=app_model, account=current_user)
# TODO
return {
"result": "success"
}
try:
response = workflow_service.run_draft_workflow(
app_model=app_model,
user=current_user,
args=args,
invoke_from=InvokeFrom.DEBUGGER
)
except ValueError as e:
raise e
except Exception as e:
logging.exception("internal server error.")
raise InternalServerError()
def generate() -> Generator:
yield from response
return Response(stream_with_context(generate()), status=200,
mimetype='text/event-stream')
class WorkflowTaskStopApi(Resource):
......@@ -214,10 +281,12 @@ class ConvertToWorkflowApi(Resource):
api.add_resource(DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft')
api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run')
api.add_resource(DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run')
api.add_resource(WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflows/tasks/<string:task_id>/stop')
api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<uuid:node_id>/run')
api.add_resource(PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/published')
api.add_resource(DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs')
api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs/:block_type')
api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs'
'/<string:block_type>')
api.add_resource(ConvertToWorkflowApi, '/apps/<uuid:app_id>/convert-to-workflow')
......@@ -16,18 +16,19 @@ from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
from core.file.message_file_parser import MessageFileParser
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
from models.account import Account
from models.model import App, Conversation, EndUser, Message
from models.workflow import Workflow
logger = logging.getLogger(__name__)
class AdvancedChatAppGenerator(MessageBasedAppGenerator):
def generate(self, app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: Any,
args: dict,
invoke_from: InvokeFrom,
stream: bool = True) \
-> Union[dict, Generator]:
......@@ -35,6 +36,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
Generate App response.
:param app_model: App
:param workflow: Workflow
:param user: account or end user
:param args: request args
:param invoke_from: invoke from source
......@@ -59,16 +61,6 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
if args.get('conversation_id'):
conversation = self._get_conversation_by_user(app_model, args.get('conversation_id'), user)
# get workflow
workflow_engine_manager = WorkflowEngineManager()
if invoke_from == InvokeFrom.DEBUGGER:
workflow = workflow_engine_manager.get_draft_workflow(app_model=app_model)
else:
workflow = workflow_engine_manager.get_published_workflow(app_model=app_model)
if not workflow:
raise ValueError('Workflow not initialized')
# parse files
files = args['files'] if 'files' in args and args['files'] else []
message_file_parser = MessageFileParser(tenant_id=app_model.tenant_id, app_id=app_model.id)
......
import logging
import time
from typing import cast
from core.app.app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig
from core.app.apps.base_app_runner import AppRunner
from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity,
AdvancedChatAppGenerateEntity, InvokeFrom,
)
from core.app.entities.queue_entities import QueueStopEvent
from core.moderation.base import ModerationException
from core.workflow.entities.node_entities import SystemVariable
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
from models.model import App, Conversation, Message
from models.account import Account
from models.model import App, Conversation, Message, EndUser
logger = logging.getLogger(__name__)
......@@ -38,39 +43,113 @@ class AdvancedChatAppRunner(AppRunner):
if not app_record:
raise ValueError("App not found")
workflow = WorkflowEngineManager().get_workflow(app_model=app_record, workflow_id=app_config.workflow_id)
if not workflow:
raise ValueError("Workflow not initialized")
inputs = application_generate_entity.inputs
query = application_generate_entity.query
files = application_generate_entity.files
# moderation
if self.handle_input_moderation(
queue_manager=queue_manager,
app_record=app_record,
app_generate_entity=application_generate_entity,
inputs=inputs,
query=query
):
return
# annotation reply
if self.handle_annotation_reply(
app_record=app_record,
message=message,
query=query,
queue_manager=queue_manager,
app_generate_entity=application_generate_entity
):
return
# fetch user
if application_generate_entity.invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE]:
user = db.session.query(Account).filter(Account.id == application_generate_entity.user_id).first()
else:
user = db.session.query(EndUser).filter(EndUser.id == application_generate_entity.user_id).first()
# RUN WORKFLOW
workflow_engine_manager = WorkflowEngineManager()
result_generator = workflow_engine_manager.run_workflow(
app_model=app_record,
workflow=workflow,
user=user,
user_inputs=inputs,
system_inputs={
SystemVariable.QUERY: query,
SystemVariable.FILES: files,
SystemVariable.CONVERSATION: conversation.id,
}
)
for result in result_generator:
# todo handle workflow and node event
pass
def handle_input_moderation(self, queue_manager: AppQueueManager,
app_record: App,
app_generate_entity: AdvancedChatAppGenerateEntity,
inputs: dict,
query: str) -> bool:
"""
Handle input moderation
:param queue_manager: application queue manager
:param app_record: app record
:param app_generate_entity: application generate entity
:param inputs: inputs
:param query: query
:return:
"""
try:
# process sensitive_word_avoidance
_, inputs, query = self.moderation_for_inputs(
app_id=app_record.id,
tenant_id=app_config.tenant_id,
app_generate_entity=application_generate_entity,
tenant_id=app_generate_entity.app_config.tenant_id,
app_generate_entity=app_generate_entity,
inputs=inputs,
query=query,
)
except ModerationException as e:
# TODO
self.direct_output(
self._stream_output(
queue_manager=queue_manager,
app_generate_entity=application_generate_entity,
prompt_messages=prompt_messages,
text=str(e),
stream=application_generate_entity.stream
stream=app_generate_entity.stream,
stopped_by=QueueStopEvent.StopBy.INPUT_MODERATION
)
return
return True
return False
if query:
def handle_annotation_reply(self, app_record: App,
message: Message,
query: str,
queue_manager: AppQueueManager,
app_generate_entity: AdvancedChatAppGenerateEntity) -> bool:
"""
Handle annotation reply
:param app_record: app record
:param message: message
:param query: query
:param queue_manager: application queue manager
:param app_generate_entity: application generate entity
"""
# annotation reply
annotation_reply = self.query_app_annotations_to_reply(
app_record=app_record,
message=message,
query=query,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from
user_id=app_generate_entity.user_id,
invoke_from=app_generate_entity.invoke_from
)
if annotation_reply:
......@@ -79,25 +158,36 @@ class AdvancedChatAppRunner(AppRunner):
pub_from=PublishFrom.APPLICATION_MANAGER
)
# TODO
self.direct_output(
self._stream_output(
queue_manager=queue_manager,
app_generate_entity=application_generate_entity,
prompt_messages=prompt_messages,
text=annotation_reply.content,
stream=application_generate_entity.stream
stream=app_generate_entity.stream,
stopped_by=QueueStopEvent.StopBy.ANNOTATION_REPLY
)
return
return True
# check hosting moderation
# TODO
hosting_moderation_result = self.check_hosting_moderation(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
prompt_messages=prompt_messages
)
return False
if hosting_moderation_result:
return
def _stream_output(self, queue_manager: AppQueueManager,
text: str,
stream: bool,
stopped_by: QueueStopEvent.StopBy) -> None:
"""
Direct output
:param queue_manager: application queue manager
:param text: text
:param stream: stream
:return:
"""
if stream:
index = 0
for token in text:
queue_manager.publish_text_chunk(token, PublishFrom.APPLICATION_MANAGER)
index += 1
time.sleep(0.01)
# todo RUN WORKFLOW
\ No newline at end of file
queue_manager.publish(
QueueStopEvent(stopped_by=stopped_by),
PublishFrom.APPLICATION_MANAGER
)
queue_manager.stop_listen()
......@@ -165,6 +165,7 @@ class QueueStopEvent(AppQueueEvent):
USER_MANUAL = "user-manual"
ANNOTATION_REPLY = "annotation-reply"
OUTPUT_MODERATION = "output-moderation"
INPUT_MODERATION = "input-moderation"
event = QueueEvent.STOP
stopped_by: StopBy
......
......@@ -30,3 +30,12 @@ class NodeType(Enum):
if node_type.value == value:
return node_type
raise ValueError(f'invalid node type value {value}')
class SystemVariable(Enum):
"""
System Variables.
"""
QUERY = 'query'
FILES = 'files'
CONVERSATION = 'conversation'
from enum import Enum
from typing import Optional, Union, Any
from core.workflow.entities.node_entities import SystemVariable
VariableValue = Union[str, int, float, dict, list]
class ValueType(Enum):
"""
Value Type Enum
"""
STRING = "string"
NUMBER = "number"
OBJECT = "object"
ARRAY = "array"
FILE = "file"
class VariablePool:
variables_mapping = {}
def __init__(self, system_variables: dict[SystemVariable, Any]) -> None:
# system variables
# for example:
# {
# 'query': 'abc',
# 'files': []
# }
for system_variable, value in system_variables.items():
self.append_variable('sys', [system_variable.value], value)
def append_variable(self, node_id: str, variable_key_list: list[str], value: VariableValue) -> None:
"""
Append variable
:param node_id: node id
:param variable_key_list: variable key list, like: ['result', 'text']
:param value: value
:return:
"""
if node_id not in self.variables_mapping:
self.variables_mapping[node_id] = {}
variable_key_list_hash = hash(tuple(variable_key_list))
self.variables_mapping[node_id][variable_key_list_hash] = value
def get_variable_value(self, variable_selector: list[str],
target_value_type: Optional[ValueType] = None) -> Optional[VariableValue]:
"""
Get variable
:param variable_selector: include node_id and variables
:param target_value_type: target value type
:return:
"""
if len(variable_selector) < 2:
raise ValueError('Invalid value selector')
node_id = variable_selector[0]
if node_id not in self.variables_mapping:
return None
# fetch variable keys, pop node_id
variable_key_list = variable_selector[1:]
variable_key_list_hash = hash(tuple(variable_key_list))
value = self.variables_mapping[node_id].get(variable_key_list_hash)
if target_value_type:
if target_value_type == ValueType.STRING:
return str(value)
elif target_value_type == ValueType.NUMBER:
return int(value)
elif target_value_type == ValueType.OBJECT:
if not isinstance(value, dict):
raise ValueError('Invalid value type: object')
elif target_value_type == ValueType.ARRAY:
if not isinstance(value, list):
raise ValueError('Invalid value type: array')
return value
from abc import abstractmethod
from typing import Optional
from core.workflow.entities.node_entities import NodeType
from core.workflow.entities.variable_pool import VariablePool
class BaseNode:
_node_type: NodeType
def __int__(self, node_config: dict) -> None:
self._node_config = node_config
@abstractmethod
def run(self, variable_pool: Optional[VariablePool] = None,
run_args: Optional[dict] = None) -> dict:
"""
Run node
:param variable_pool: variable pool
:param run_args: run args
:return:
"""
if variable_pool is None and run_args is None:
raise ValueError("At least one of `variable_pool` or `run_args` must be provided.")
return self._run(
variable_pool=variable_pool,
run_args=run_args
)
@abstractmethod
def _run(self, variable_pool: Optional[VariablePool] = None,
run_args: Optional[dict] = None) -> dict:
"""
Run node
:param variable_pool: variable pool
:param run_args: run args
:return:
"""
raise NotImplementedError
@classmethod
def get_default_config(cls, filters: Optional[dict] = None) -> dict:
"""
......
from typing import Optional
from typing import Optional, Union, Generator
from core.memory.token_buffer_memory import TokenBufferMemory
from core.workflow.entities.node_entities import NodeType
from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode
......@@ -14,7 +15,8 @@ from core.workflow.nodes.template_transform.template_transform_node import Templ
from core.workflow.nodes.tool.tool_node import ToolNode
from core.workflow.nodes.variable_assigner.variable_assigner_node import VariableAssignerNode
from extensions.ext_database import db
from models.model import App
from models.account import Account
from models.model import App, EndUser, Conversation
from models.workflow import Workflow
node_classes = {
......@@ -56,13 +58,20 @@ class WorkflowEngineManager:
return None
# fetch published workflow by workflow_id
return self.get_workflow(app_model, app_model.workflow_id)
def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
"""
Get workflow
"""
# fetch workflow by workflow_id
workflow = db.session.query(Workflow).filter(
Workflow.tenant_id == app_model.tenant_id,
Workflow.app_id == app_model.id,
Workflow.id == app_model.workflow_id
Workflow.id == workflow_id
).first()
# return published workflow
# return workflow
return workflow
def get_default_configs(self) -> list[dict]:
......@@ -96,3 +105,20 @@ class WorkflowEngineManager:
return None
return default_config
def run_workflow(self, app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user_inputs: dict,
system_inputs: Optional[dict] = None) -> Generator:
"""
Run workflow
:param app_model: App instance
:param workflow: Workflow instance
:param user: account or end user
:param user_inputs: user variables inputs
:param system_inputs: system inputs, like: query, files
:return:
"""
# TODO
pass
......@@ -5,8 +5,8 @@ from libs.helper import TimestampField
workflow_fields = {
'id': fields.String,
'graph': fields.Nested(simple_account_fields, attribute='graph_dict'),
'features': fields.Nested(simple_account_fields, attribute='features_dict'),
'graph': fields.Raw(attribute='graph_dict'),
'features': fields.Raw(attribute='features_dict'),
'created_by': fields.Nested(simple_account_fields, attribute='created_by_account'),
'created_at': TimestampField,
'updated_by': fields.Nested(simple_account_fields, attribute='updated_by_account', allow_null=True),
......
......@@ -22,10 +22,10 @@ workflow_run_for_list_fields = {
"id": fields.String,
"sequence_number": fields.Integer,
"version": fields.String,
"graph": fields.String,
"inputs": fields.String,
"graph": fields.Raw(attribute='graph_dict'),
"inputs": fields.Raw(attribute='inputs_dict'),
"status": fields.String,
"outputs": fields.String,
"outputs": fields.Raw(attribute='outputs_dict'),
"error": fields.String,
"elapsed_time": fields.Float,
"total_tokens": fields.Integer,
......@@ -49,10 +49,10 @@ workflow_run_detail_fields = {
"id": fields.String,
"sequence_number": fields.Integer,
"version": fields.String,
"graph": fields.String,
"inputs": fields.String,
"graph": fields.Raw(attribute='graph_dict'),
"inputs": fields.Raw(attribute='inputs_dict'),
"status": fields.String,
"outputs": fields.String,
"outputs": fields.Raw(attribute='outputs_dict'),
"error": fields.String,
"elapsed_time": fields.Float,
"total_tokens": fields.Integer,
......@@ -73,13 +73,13 @@ workflow_run_node_execution_fields = {
"node_id": fields.String,
"node_type": fields.String,
"title": fields.String,
"inputs": fields.String,
"process_data": fields.String,
"outputs": fields.String,
"inputs": fields.Raw(attribute='inputs_dict'),
"process_data": fields.Raw(attribute='process_data_dict'),
"outputs": fields.Raw(attribute='outputs_dict'),
"status": fields.String,
"error": fields.String,
"elapsed_time": fields.Float,
"execution_metadata": fields.String,
"execution_metadata": fields.Raw(attribute='execution_metadata_dict'),
"created_at": TimestampField,
"created_by_role": fields.String,
"created_by_account": fields.Nested(simple_account_fields, attribute='created_by_account', allow_null=True),
......
......@@ -272,6 +272,14 @@ class WorkflowRun(db.Model):
return EndUser.query.get(self.created_by) \
if created_by_role == CreatedByRole.END_USER else None
@property
def graph_dict(self):
return self.graph if not self.graph else json.loads(self.graph)
@property
def inputs_dict(self):
return self.inputs if not self.inputs else json.loads(self.inputs)
@property
def outputs_dict(self):
return self.outputs if not self.outputs else json.loads(self.outputs)
......
import json
from datetime import datetime
from typing import Optional
from typing import Optional, Union, Any, Generator
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import NodeType
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
from models.account import Account
from models.model import App, AppMode
from models.model import App, AppMode, EndUser
from models.workflow import Workflow, WorkflowType
from services.workflow.workflow_converter import WorkflowConverter
......@@ -142,6 +144,39 @@ class WorkflowService:
workflow_engine_manager = WorkflowEngineManager()
return workflow_engine_manager.get_default_config(node_type, filters)
def run_advanced_chat_draft_workflow(self, app_model: App,
user: Union[Account, EndUser],
args: dict,
invoke_from: InvokeFrom) -> Union[dict, Generator]:
"""
Run advanced chatbot draft workflow
"""
# fetch draft workflow by app_model
draft_workflow = self.get_draft_workflow(app_model=app_model)
if not draft_workflow:
raise ValueError('Workflow not initialized')
# run draft workflow
app_generator = AdvancedChatAppGenerator()
response = app_generator.generate(
app_model=app_model,
workflow=draft_workflow,
user=user,
args=args,
invoke_from=invoke_from,
stream=True
)
return response
def run_draft_workflow(self, app_model: App,
user: Union[Account, EndUser],
args: dict,
invoke_from: InvokeFrom) -> Union[dict, Generator]:
# TODO
pass
def convert_to_workflow(self, app_model: App, account: Account) -> App:
"""
Basic mode of chatbot app(expert mode) to workflow
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment