Commit 98f29f9a authored by takatost's avatar takatost

add AdvancedChatAppGenerateTaskPipeline

parent d31abcb1
......@@ -8,19 +8,24 @@ from sqlalchemy.orm import DeclarativeMeta
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import (
AnnotationReplyEvent,
AppQueueEvent,
QueueAgentMessageEvent,
QueueAgentThoughtEvent,
QueueAnnotationReplyEvent,
QueueErrorEvent,
QueueLLMChunkEvent,
QueueMessage,
QueueMessageEndEvent,
QueueMessageEvent,
QueueMessageFileEvent,
QueueMessageReplaceEvent,
QueueNodeFinishedEvent,
QueueNodeStartedEvent,
QueuePingEvent,
QueueRetrieverResourcesEvent,
QueueStopEvent,
QueueTextChunkEvent,
QueueWorkflowFinishedEvent,
QueueWorkflowStartedEvent,
)
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
from extensions.ext_redis import redis_client
......@@ -97,18 +102,30 @@ class AppQueueManager:
"""
self._q.put(None)
def publish_chunk_message(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None:
def publish_llm_chunk(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None:
"""
Publish chunk message to channel
Publish llm chunk to channel
:param chunk: chunk
:param chunk: llm chunk
:param pub_from: publish from
:return:
"""
self.publish(QueueMessageEvent(
self.publish(QueueLLMChunkEvent(
chunk=chunk
), pub_from)
def publish_text_chunk(self, text: str, pub_from: PublishFrom) -> None:
"""
Publish text chunk to channel
:param text: text
:param pub_from: publish from
:return:
"""
self.publish(QueueTextChunkEvent(
text=text
), pub_from)
def publish_agent_chunk_message(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None:
"""
Publish agent chunk message to channel
......@@ -146,7 +163,7 @@ class AppQueueManager:
:param pub_from: publish from
:return:
"""
self.publish(AnnotationReplyEvent(message_annotation_id=message_annotation_id), pub_from)
self.publish(QueueAnnotationReplyEvent(message_annotation_id=message_annotation_id), pub_from)
def publish_message_end(self, llm_result: LLMResult, pub_from: PublishFrom) -> None:
"""
......@@ -158,6 +175,42 @@ class AppQueueManager:
self.publish(QueueMessageEndEvent(llm_result=llm_result), pub_from)
self.stop_listen()
def publish_workflow_started(self, workflow_run_id: str, pub_from: PublishFrom) -> None:
"""
Publish workflow started
:param workflow_run_id: workflow run id
:param pub_from: publish from
:return:
"""
self.publish(QueueWorkflowStartedEvent(workflow_run_id=workflow_run_id), pub_from)
def publish_workflow_finished(self, workflow_run_id: str, pub_from: PublishFrom) -> None:
"""
Publish workflow finished
:param workflow_run_id: workflow run id
:param pub_from: publish from
:return:
"""
self.publish(QueueWorkflowFinishedEvent(workflow_run_id=workflow_run_id), pub_from)
def publish_node_started(self, workflow_node_execution_id: str, pub_from: PublishFrom) -> None:
"""
Publish node started
:param workflow_node_execution_id: workflow node execution id
:param pub_from: publish from
:return:
"""
self.publish(QueueNodeStartedEvent(workflow_node_execution_id=workflow_node_execution_id), pub_from)
def publish_node_finished(self, workflow_node_execution_id: str, pub_from: PublishFrom) -> None:
"""
Publish node finished
:param workflow_node_execution_id: workflow node execution id
:param pub_from: publish from
:return:
"""
self.publish(QueueNodeFinishedEvent(workflow_node_execution_id=workflow_node_execution_id), pub_from)
def publish_agent_thought(self, message_agent_thought: MessageAgentThought, pub_from: PublishFrom) -> None:
"""
Publish agent thought
......
from typing import Optional
from core.app.app_config.base_app_config_manager import BaseAppConfigManager
from core.app.app_config.common.sensitive_word_avoidance.manager import SensitiveWordAvoidanceConfigManager
......@@ -12,7 +11,7 @@ from core.app.app_config.features.suggested_questions_after_answer.manager impor
)
from core.app.app_config.features.text_to_speech.manager import TextToSpeechConfigManager
from core.app.app_config.workflow_ui_based_app.variables.manager import WorkflowVariablesConfigManager
from models.model import App, AppMode, Conversation
from models.model import App, AppMode
from models.workflow import Workflow
......@@ -26,8 +25,7 @@ class AdvancedChatAppConfig(WorkflowUIBasedAppConfig):
class AdvancedChatAppConfigManager(BaseAppConfigManager):
@classmethod
def get_app_config(cls, app_model: App,
workflow: Workflow,
conversation: Optional[Conversation] = None) -> AdvancedChatAppConfig:
workflow: Workflow) -> AdvancedChatAppConfig:
features_dict = workflow.features_dict
app_config = AdvancedChatAppConfig(
......
import logging
import threading
import uuid
from collections.abc import Generator
from typing import Any, Union
from flask import Flask, current_app
from pydantic import ValidationError
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.advanced_chat.app_runner import AdvancedChatAppRunner
from core.app.apps.advanced_chat.generate_task_pipeline import AdvancedChatAppGenerateTaskPipeline
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
from core.file.message_file_parser import MessageFileParser
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
from models.account import Account
from models.model import App, Conversation, EndUser, Message
logger = logging.getLogger(__name__)
class AdvancedChatAppGenerator(MessageBasedAppGenerator):
    def generate(self, app_model: App,
                 user: Union[Account, EndUser],
                 args: Any,
                 invoke_from: InvokeFrom,
                 stream: bool = True) \
            -> Union[dict, Generator]:
        """
        Generate App response.

        :param app_model: App
        :param user: account or end user
        :param args: request args
        :param invoke_from: invoke from source
        :param stream: is stream
        :return: blocking dict result, or a generator when streaming
        :raises ValueError: if query is missing/non-string or the workflow is not initialized
        """
        if not args.get('query'):
            raise ValueError('query is required')

        query = args['query']
        if not isinstance(query, str):
            raise ValueError('query must be a string')

        # strip NUL bytes, which cannot be stored in text columns
        query = query.replace('\x00', '')
        inputs = args['inputs']

        extras = {
            "auto_generate_conversation_name": args['auto_generate_name'] if 'auto_generate_name' in args else True
        }

        # get conversation (only when the caller continues an existing one)
        conversation = None
        if args.get('conversation_id'):
            conversation = self._get_conversation_by_user(app_model, args.get('conversation_id'), user)

        # get workflow: the debugger runs the draft, everyone else the published version
        workflow_engine_manager = WorkflowEngineManager()
        if invoke_from == InvokeFrom.DEBUGGER:
            workflow = workflow_engine_manager.get_draft_workflow(app_model=app_model)
        else:
            workflow = workflow_engine_manager.get_published_workflow(app_model=app_model)

        if not workflow:
            raise ValueError('Workflow not initialized')

        # parse files
        files = args['files'] if 'files' in args and args['files'] else []
        message_file_parser = MessageFileParser(tenant_id=app_model.tenant_id, app_id=app_model.id)
        file_upload_entity = FileUploadConfigManager.convert(workflow.features_dict)
        if file_upload_entity:
            file_objs = message_file_parser.validate_and_transform_files_arg(
                files,
                file_upload_entity,
                user
            )
        else:
            # file upload is disabled for this workflow: ignore any supplied files
            file_objs = []

        # convert to app config
        app_config = AdvancedChatAppConfigManager.get_app_config(
            app_model=app_model,
            workflow=workflow
        )

        # init application generate entity
        application_generate_entity = AdvancedChatAppGenerateEntity(
            task_id=str(uuid.uuid4()),
            app_config=app_config,
            conversation_id=conversation.id if conversation else None,
            # an existing conversation keeps its original inputs
            inputs=conversation.inputs if conversation else self._get_cleaned_inputs(inputs, app_config),
            query=query,
            files=file_objs,
            user_id=user.id,
            stream=stream,
            invoke_from=invoke_from,
            extras=extras
        )

        # init generate records (creates the conversation when it was None)
        (
            conversation,
            message
        ) = self._init_generate_records(application_generate_entity, conversation)

        # init queue manager
        queue_manager = AppQueueManager(
            task_id=application_generate_entity.task_id,
            user_id=application_generate_entity.user_id,
            invoke_from=application_generate_entity.invoke_from,
            conversation_id=conversation.id,
            app_mode=conversation.mode,
            message_id=message.id
        )

        # run the app in a worker thread; results are consumed via the queue
        worker_thread = threading.Thread(target=self._generate_worker, kwargs={
            'flask_app': current_app._get_current_object(),
            'application_generate_entity': application_generate_entity,
            'queue_manager': queue_manager,
            'conversation_id': conversation.id,
            'message_id': message.id,
        })

        worker_thread.start()

        # return response or stream generator
        return self._handle_response(
            application_generate_entity=application_generate_entity,
            queue_manager=queue_manager,
            conversation=conversation,
            message=message,
            stream=stream
        )

    def _generate_worker(self, flask_app: Flask,
                         application_generate_entity: AdvancedChatAppGenerateEntity,
                         queue_manager: AppQueueManager,
                         conversation_id: str,
                         message_id: str) -> None:
        """
        Generate worker in a new thread.

        Errors are forwarded to the client through the queue manager rather
        than raised, because this runs detached from the request thread.

        :param flask_app: Flask app
        :param application_generate_entity: application generate entity
        :param queue_manager: queue manager
        :param conversation_id: conversation ID
        :param message_id: message ID
        :return:
        """
        with flask_app.app_context():
            try:
                # get conversation and message
                conversation = self._get_conversation(conversation_id)
                message = self._get_message(message_id)

                # chatbot app
                runner = AdvancedChatAppRunner()
                runner.run(
                    application_generate_entity=application_generate_entity,
                    queue_manager=queue_manager,
                    conversation=conversation,
                    message=message
                )
            except ConversationTaskStoppedException:
                # user-initiated stop: not an error, nothing to publish
                pass
            except InvokeAuthorizationError:
                # do not leak the original auth error detail to the client
                queue_manager.publish_error(
                    InvokeAuthorizationError('Incorrect API key provided'),
                    PublishFrom.APPLICATION_MANAGER
                )
            except ValidationError as e:
                logger.exception("Validation Error when generating")
                queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
            except (ValueError, InvokeError) as e:
                queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
            except Exception as e:
                logger.exception("Unknown Error when generating")
                queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
            finally:
                # release this thread's scoped DB session
                db.session.remove()

    def _handle_response(self, application_generate_entity: AdvancedChatAppGenerateEntity,
                         queue_manager: AppQueueManager,
                         conversation: Conversation,
                         message: Message,
                         stream: bool = False) -> Union[dict, Generator]:
        """
        Handle response.

        :param application_generate_entity: application generate entity
        :param queue_manager: queue manager
        :param conversation: conversation
        :param message: message
        :param stream: is stream
        :return: pipeline result (dict, or generator when streaming)
        """
        # init generate task pipeline
        generate_task_pipeline = AdvancedChatAppGenerateTaskPipeline(
            application_generate_entity=application_generate_entity,
            queue_manager=queue_manager,
            conversation=conversation,
            message=message
        )

        try:
            return generate_task_pipeline.process(stream=stream)
        except ValueError as e:
            if e.args[0] == "I/O operation on closed file.":  # ignore this error
                # client disconnected mid-stream; treat as a stop, not a failure
                raise ConversationTaskStoppedException()
            else:
                logger.exception(e)
                raise e
        finally:
            db.session.remove()
import logging
from typing import cast
from core.app.app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig
from core.app.apps.base_app_runner import AppRunner
from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity,
)
from core.moderation.base import ModerationException
from extensions.ext_database import db
from models.model import App, Conversation, Message
logger = logging.getLogger(__name__)
class AdvancedChatAppRunner(AppRunner):
    """
    AdvancedChat Application Runner
    """

    def run(self, application_generate_entity: AdvancedChatAppGenerateEntity,
            queue_manager: AppQueueManager,
            conversation: Conversation,
            message: Message) -> None:
        """
        Run application.

        :param application_generate_entity: application generate entity
        :param queue_manager: application queue manager
        :param conversation: conversation
        :param message: message
        :return:
        :raises ValueError: if the app record cannot be found
        """
        app_config = application_generate_entity.app_config
        app_config = cast(AdvancedChatAppConfig, app_config)

        app_record = db.session.query(App).filter(App.id == app_config.app_id).first()
        if not app_record:
            raise ValueError("App not found")

        inputs = application_generate_entity.inputs
        query = application_generate_entity.query
        # kept for the upcoming workflow run (see TODO at the bottom)
        files = application_generate_entity.files

        # BUGFIX: `prompt_messages` was referenced below without ever being
        # assigned, so every moderation / annotation-reply / hosting-moderation
        # path crashed with NameError. Advanced chat assembles no LLM prompt at
        # this stage, so pass an empty list until the workflow run builds real
        # prompt messages (TODO).
        prompt_messages = []

        # moderation
        try:
            # process sensitive_word_avoidance
            _, inputs, query = self.moderation_for_inputs(
                app_id=app_record.id,
                tenant_id=app_config.tenant_id,
                app_generate_entity=application_generate_entity,
                inputs=inputs,
                query=query,
            )
        except ModerationException as e:
            # moderation rejected the input: answer directly with the preset text
            # TODO
            self.direct_output(
                queue_manager=queue_manager,
                app_generate_entity=application_generate_entity,
                prompt_messages=prompt_messages,
                text=str(e),
                stream=application_generate_entity.stream
            )
            return

        if query:
            # annotation reply: a matching annotation short-circuits generation
            annotation_reply = self.query_app_annotations_to_reply(
                app_record=app_record,
                message=message,
                query=query,
                user_id=application_generate_entity.user_id,
                invoke_from=application_generate_entity.invoke_from
            )

            if annotation_reply:
                queue_manager.publish_annotation_reply(
                    message_annotation_id=annotation_reply.id,
                    pub_from=PublishFrom.APPLICATION_MANAGER
                )

                # TODO
                self.direct_output(
                    queue_manager=queue_manager,
                    app_generate_entity=application_generate_entity,
                    prompt_messages=prompt_messages,
                    text=annotation_reply.content,
                    stream=application_generate_entity.stream
                )
                return

        # check hosting moderation
        # TODO
        hosting_moderation_result = self.check_hosting_moderation(
            application_generate_entity=application_generate_entity,
            queue_manager=queue_manager,
            prompt_messages=prompt_messages
        )

        if hosting_moderation_result:
            return

        # todo RUN WORKFLOW
......@@ -187,7 +187,7 @@ class AppRunner:
if stream:
index = 0
for token in text:
queue_manager.publish_chunk_message(LLMResultChunk(
queue_manager.publish_llm_chunk(LLMResultChunk(
model=app_generate_entity.model_config.model,
prompt_messages=prompt_messages,
delta=LLMResultChunkDelta(
......@@ -261,7 +261,7 @@ class AppRunner:
usage = None
for result in invoke_result:
if not agent:
queue_manager.publish_chunk_message(result, PublishFrom.APPLICATION_MANAGER)
queue_manager.publish_llm_chunk(result, PublishFrom.APPLICATION_MANAGER)
else:
queue_manager.publish_agent_chunk_message(result, PublishFrom.APPLICATION_MANAGER)
......
......@@ -8,14 +8,15 @@ from sqlalchemy import and_
from core.app.app_config.entities import EasyUIBasedAppModelConfigFrom
from core.app.app_queue_manager import AppQueueManager, ConversationTaskStoppedException
from core.app.apps.base_app_generator import BaseAppGenerator
from core.app.apps.easy_ui_based_generate_task_pipeline import EasyUIBasedGenerateTaskPipeline
from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity,
AgentChatAppGenerateEntity,
AppGenerateEntity,
ChatAppGenerateEntity,
CompletionAppGenerateEntity,
InvokeFrom,
)
from core.app.generate_task_pipeline import GenerateTaskPipeline
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from extensions.ext_database import db
from models.account import Account
......@@ -31,7 +32,8 @@ class MessageBasedAppGenerator(BaseAppGenerator):
def _handle_response(self, application_generate_entity: Union[
ChatAppGenerateEntity,
CompletionAppGenerateEntity,
AgentChatAppGenerateEntity
AgentChatAppGenerateEntity,
AdvancedChatAppGenerateEntity
],
queue_manager: AppQueueManager,
conversation: Conversation,
......@@ -47,7 +49,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
:return:
"""
# init generate task pipeline
generate_task_pipeline = GenerateTaskPipeline(
generate_task_pipeline = EasyUIBasedGenerateTaskPipeline(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
......@@ -114,7 +116,8 @@ class MessageBasedAppGenerator(BaseAppGenerator):
application_generate_entity: Union[
ChatAppGenerateEntity,
CompletionAppGenerateEntity,
AgentChatAppGenerateEntity
AgentChatAppGenerateEntity,
AdvancedChatAppGenerateEntity
],
conversation: Optional[Conversation] = None) \
-> tuple[Conversation, Message]:
......@@ -135,10 +138,19 @@ class MessageBasedAppGenerator(BaseAppGenerator):
from_source = 'console'
account_id = application_generate_entity.user_id
override_model_configs = None
if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS \
and app_config.app_mode in [AppMode.AGENT_CHAT, AppMode.CHAT, AppMode.COMPLETION]:
override_model_configs = app_config.app_model_config_dict
if isinstance(application_generate_entity, AdvancedChatAppGenerateEntity):
app_model_config_id = None
override_model_configs = None
model_provider = None
model_id = None
else:
app_model_config_id = app_config.app_model_config_id
model_provider = application_generate_entity.model_config.provider
model_id = application_generate_entity.model_config.model
override_model_configs = None
if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS \
and app_config.app_mode in [AppMode.AGENT_CHAT, AppMode.CHAT, AppMode.COMPLETION]:
override_model_configs = app_config.app_model_config_dict
# get conversation introduction
introduction = self._get_conversation_introduction(application_generate_entity)
......@@ -146,9 +158,9 @@ class MessageBasedAppGenerator(BaseAppGenerator):
if not conversation:
conversation = Conversation(
app_id=app_config.app_id,
app_model_config_id=app_config.app_model_config_id,
model_provider=application_generate_entity.model_config.provider,
model_id=application_generate_entity.model_config.model,
app_model_config_id=app_model_config_id,
model_provider=model_provider,
model_id=model_id,
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
mode=app_config.app_mode.value,
name='New conversation',
......@@ -167,8 +179,8 @@ class MessageBasedAppGenerator(BaseAppGenerator):
message = Message(
app_id=app_config.app_id,
model_provider=application_generate_entity.model_config.provider,
model_id=application_generate_entity.model_config.model,
model_provider=model_provider,
model_id=model_id,
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
conversation_id=conversation.id,
inputs=application_generate_entity.inputs,
......
......@@ -10,14 +10,19 @@ class QueueEvent(Enum):
"""
QueueEvent enum
"""
MESSAGE = "message"
LLM_CHUNK = "llm_chunk"
TEXT_CHUNK = "text_chunk"
AGENT_MESSAGE = "agent_message"
MESSAGE_REPLACE = "message-replace"
MESSAGE_END = "message-end"
RETRIEVER_RESOURCES = "retriever-resources"
ANNOTATION_REPLY = "annotation-reply"
AGENT_THOUGHT = "agent-thought"
MESSAGE_FILE = "message-file"
MESSAGE_REPLACE = "message_replace"
MESSAGE_END = "message_end"
WORKFLOW_STARTED = "workflow_started"
WORKFLOW_FINISHED = "workflow_finished"
NODE_STARTED = "node_started"
NODE_FINISHED = "node_finished"
RETRIEVER_RESOURCES = "retriever_resources"
ANNOTATION_REPLY = "annotation_reply"
AGENT_THOUGHT = "agent_thought"
MESSAGE_FILE = "message_file"
ERROR = "error"
PING = "ping"
STOP = "stop"
......@@ -30,13 +35,22 @@ class AppQueueEvent(BaseModel):
event: QueueEvent
class QueueLLMChunkEvent(AppQueueEvent):
    """
    QueueLLMChunkEvent entity

    Replaces the old QueueMessageEvent; this span previously held merge
    residue mixing both class definitions.
    """
    event = QueueEvent.LLM_CHUNK
    # streamed LLM result chunk forwarded to listeners
    chunk: LLMResultChunk
class QueueTextChunkEvent(AppQueueEvent):
    """
    QueueTextChunkEvent entity
    """
    event = QueueEvent.TEXT_CHUNK
    # raw text fragment; publishers must supply it as `chunk_text`
    chunk_text: str
class QueueAgentMessageEvent(AppQueueEvent):
"""
QueueMessageEvent entity
......@@ -61,9 +75,9 @@ class QueueRetrieverResourcesEvent(AppQueueEvent):
retriever_resources: list[dict]
class QueueAnnotationReplyEvent(AppQueueEvent):
    """
    QueueAnnotationReplyEvent entity

    Replaces the old AnnotationReplyEvent; this span previously held merge
    residue mixing both class definitions.
    """
    event = QueueEvent.ANNOTATION_REPLY
    # id of the MessageAnnotation used to answer directly
    message_annotation_id: str
......@@ -76,6 +90,38 @@ class QueueMessageEndEvent(AppQueueEvent):
event = QueueEvent.MESSAGE_END
llm_result: LLMResult
class QueueWorkflowStartedEvent(AppQueueEvent):
    """
    QueueWorkflowStartedEvent entity
    """
    event = QueueEvent.WORKFLOW_STARTED
    # id of the workflow run that has just started
    workflow_run_id: str
class QueueWorkflowFinishedEvent(AppQueueEvent):
    """
    QueueWorkflowFinishedEvent entity
    """
    event = QueueEvent.WORKFLOW_FINISHED
    # id of the workflow run that has just finished
    workflow_run_id: str
class QueueNodeStartedEvent(AppQueueEvent):
    """
    QueueNodeStartedEvent entity
    """
    event = QueueEvent.NODE_STARTED
    # id of the workflow node execution that has just started
    workflow_node_execution_id: str
class QueueNodeFinishedEvent(AppQueueEvent):
    """
    QueueNodeFinishedEvent entity
    """
    event = QueueEvent.NODE_FINISHED
    # id of the workflow node execution that has just finished
    workflow_node_execution_id: str
class QueueAgentThoughtEvent(AppQueueEvent):
"""
......@@ -84,13 +130,15 @@ class QueueAgentThoughtEvent(AppQueueEvent):
event = QueueEvent.AGENT_THOUGHT
agent_thought_id: str
class QueueMessageFileEvent(AppQueueEvent):
    """
    QueueMessageFileEvent entity
    """
    # (fixed docstring: previously copy-pasted from QueueAgentThoughtEvent)
    event = QueueEvent.MESSAGE_FILE
    # id of the MessageFile row to stream to the client
    message_file_id: str
class QueueErrorEvent(AppQueueEvent):
"""
QueueErrorEvent entity
......
from typing import Optional
from extensions.ext_database import db
from models.model import App
from models.workflow import Workflow
class WorkflowEngineManager:
    def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
        """
        Return the app's draft workflow, or None if it has no draft.
        """
        # a draft is identified by the sentinel version string 'draft'
        return db.session.query(Workflow).filter(
            Workflow.tenant_id == app_model.tenant_id,
            Workflow.app_id == app_model.id,
            Workflow.version == 'draft'
        ).first()

    def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
        """
        Return the workflow the app currently publishes, or None.
        """
        if not app_model.workflow_id:
            # the app has never published a workflow
            return None

        # the published workflow is pinned by the app's workflow_id pointer
        return db.session.query(Workflow).filter(
            Workflow.tenant_id == app_model.tenant_id,
            Workflow.app_id == app_model.id,
            Workflow.id == app_model.workflow_id
        ).first()
from core.app.entities.app_invoke_entities import ChatAppGenerateEntity
from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity, ChatAppGenerateEntity
from core.entities.provider_entities import QuotaUnit
from events.message_event import message_was_created
from extensions.ext_database import db
......@@ -8,7 +8,10 @@ from models.provider import Provider, ProviderType
@message_was_created.connect
def handle(sender, **kwargs):
message = sender
application_generate_entity: ChatAppGenerateEntity = kwargs.get('application_generate_entity')
application_generate_entity = kwargs.get('application_generate_entity')
if not isinstance(application_generate_entity, ChatAppGenerateEntity | AgentChatAppGenerateEntity):
return
model_config = application_generate_entity.model_config
provider_model_bundle = model_config.provider_model_bundle
......
from core.llm_generator.llm_generator import LLMGenerator
from events.message_event import message_was_created
from extensions.ext_database import db
from models.model import AppMode
@message_was_created.connect
......@@ -15,7 +16,7 @@ def handle(sender, **kwargs):
auto_generate_conversation_name = extras.get('auto_generate_conversation_name', True)
if auto_generate_conversation_name and is_first_message:
if conversation.mode == 'chat':
if conversation.mode != AppMode.COMPLETION.value:
app_model = conversation.app
if not app_model:
return
......
from datetime import datetime
from core.app.entities.app_invoke_entities import ChatAppGenerateEntity
from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity, ChatAppGenerateEntity
from events.message_event import message_was_created
from extensions.ext_database import db
from models.provider import Provider
......@@ -9,7 +9,10 @@ from models.provider import Provider
@message_was_created.connect
def handle(sender, **kwargs):
message = sender
application_generate_entity: ChatAppGenerateEntity = kwargs.get('application_generate_entity')
application_generate_entity = kwargs.get('application_generate_entity')
if not isinstance(application_generate_entity, ChatAppGenerateEntity | AgentChatAppGenerateEntity):
return
db.session.query(Provider).filter(
Provider.tenant_id == application_generate_entity.app_config.tenant_id,
......
......@@ -451,10 +451,10 @@ class Conversation(db.Model):
id = db.Column(UUID, server_default=db.text('uuid_generate_v4()'))
app_id = db.Column(UUID, nullable=False)
app_model_config_id = db.Column(UUID, nullable=False)
model_provider = db.Column(db.String(255), nullable=False)
app_model_config_id = db.Column(UUID, nullable=True)
model_provider = db.Column(db.String(255), nullable=True)
override_model_configs = db.Column(db.Text)
model_id = db.Column(db.String(255), nullable=False)
model_id = db.Column(db.String(255), nullable=True)
mode = db.Column(db.String(255), nullable=False)
name = db.Column(db.String(255), nullable=False)
summary = db.Column(db.Text)
......
......@@ -272,6 +272,10 @@ class WorkflowRun(db.Model):
return EndUser.query.get(self.created_by) \
if created_by_role == CreatedByRole.END_USER else None
@property
def outputs_dict(self):
return self.outputs if not self.outputs else json.loads(self.outputs)
class WorkflowNodeExecutionTriggeredFrom(Enum):
"""
......@@ -294,6 +298,28 @@ class WorkflowNodeExecutionTriggeredFrom(Enum):
raise ValueError(f'invalid workflow node execution triggered from value {value}')
class WorkflowNodeExecutionStatus(Enum):
    """
    Workflow Node Execution Status Enum
    """
    RUNNING = 'running'
    SUCCEEDED = 'succeeded'
    FAILED = 'failed'

    @classmethod
    def value_of(cls, value: str) -> 'WorkflowNodeExecutionStatus':
        """
        Get the status member for the given raw value.

        (Docstring fixed: previously copy-pasted from a "mode" enum.)

        :param value: status value
        :return: matching enum member
        :raises ValueError: if no member has the given value
        """
        for status in cls:
            if status.value == value:
                return status

        raise ValueError(f'invalid workflow node execution status value {value}')
class WorkflowNodeExecution(db.Model):
"""
Workflow Node Execution
......@@ -387,6 +413,21 @@ class WorkflowNodeExecution(db.Model):
return EndUser.query.get(self.created_by) \
if created_by_role == CreatedByRole.END_USER else None
@property
def inputs_dict(self):
return self.inputs if not self.inputs else json.loads(self.inputs)
@property
def outputs_dict(self):
return self.outputs if not self.outputs else json.loads(self.outputs)
@property
def process_data_dict(self):
return self.process_data if not self.process_data else json.loads(self.process_data)
@property
def execution_metadata_dict(self):
return self.execution_metadata if not self.execution_metadata else json.loads(self.execution_metadata)
class WorkflowAppLog(db.Model):
"""
......
......@@ -4,6 +4,7 @@ from typing import Optional
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
from models.account import Account
from models.model import App, AppMode
......@@ -21,15 +22,10 @@ class WorkflowService:
"""
Get draft workflow
"""
# fetch draft workflow by app_model
workflow = db.session.query(Workflow).filter(
Workflow.tenant_id == app_model.tenant_id,
Workflow.app_id == app_model.id,
Workflow.version == 'draft'
).first()
workflow_engine_manager = WorkflowEngineManager()
# return draft workflow
return workflow
return workflow_engine_manager.get_draft_workflow(app_model=app_model)
def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
"""
......@@ -38,15 +34,10 @@ class WorkflowService:
if not app_model.workflow_id:
return None
# fetch published workflow by workflow_id
workflow = db.session.query(Workflow).filter(
Workflow.tenant_id == app_model.tenant_id,
Workflow.app_id == app_model.id,
Workflow.id == app_model.workflow_id
).first()
workflow_engine_manager = WorkflowEngineManager()
# return published workflow
return workflow
return workflow_engine_manager.get_published_workflow(app_model=app_model)
def sync_draft_workflow(self, app_model: App,
graph: dict,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment