Commit 6cfda369 authored by takatost's avatar takatost

refactor workflow runner

parent 5a57ed25
...@@ -147,9 +147,12 @@ class WorkflowTaskStopApi(Resource): ...@@ -147,9 +147,12 @@ class WorkflowTaskStopApi(Resource):
""" """
Stop workflow task Stop workflow task
""" """
# TODO
workflow_service = WorkflowService() workflow_service = WorkflowService()
workflow_service.stop_workflow_task(app_model=app_model, task_id=task_id, account=current_user) workflow_service.stop_workflow_task(
task_id=task_id,
user=current_user,
invoke_from=InvokeFrom.DEBUGGER
)
return { return {
"result": "success" "result": "success"
......
...@@ -11,7 +11,7 @@ from core.app.app_config.features.file_upload.manager import FileUploadConfigMan ...@@ -11,7 +11,7 @@ from core.app.app_config.features.file_upload.manager import FileUploadConfigMan
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.advanced_chat.app_runner import AdvancedChatAppRunner from core.app.apps.advanced_chat.app_runner import AdvancedChatAppRunner
from core.app.apps.advanced_chat.generate_task_pipeline import AdvancedChatAppGenerateTaskPipeline from core.app.apps.advanced_chat.generate_task_pipeline import AdvancedChatAppGenerateTaskPipeline
from core.app.apps.base_app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
...@@ -123,11 +123,13 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): ...@@ -123,11 +123,13 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
worker_thread.start() worker_thread.start()
# return response or stream generator # return response or stream generator
return self._handle_response( return self._handle_advanced_chat_response(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
workflow=workflow,
queue_manager=queue_manager, queue_manager=queue_manager,
conversation=conversation, conversation=conversation,
message=message, message=message,
user=user,
stream=stream stream=stream
) )
...@@ -159,7 +161,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): ...@@ -159,7 +161,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
conversation=conversation, conversation=conversation,
message=message message=message
) )
except ConversationTaskStoppedException: except GenerateTaskStoppedException:
pass pass
except InvokeAuthorizationError: except InvokeAuthorizationError:
queue_manager.publish_error( queue_manager.publish_error(
...@@ -177,33 +179,40 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): ...@@ -177,33 +179,40 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
finally: finally:
db.session.remove() db.session.remove()
def _handle_response(self, application_generate_entity: AdvancedChatAppGenerateEntity, def _handle_advanced_chat_response(self, application_generate_entity: AdvancedChatAppGenerateEntity,
queue_manager: AppQueueManager, workflow: Workflow,
conversation: Conversation, queue_manager: AppQueueManager,
message: Message, conversation: Conversation,
stream: bool = False) -> Union[dict, Generator]: message: Message,
user: Union[Account, EndUser],
stream: bool = False) -> Union[dict, Generator]:
""" """
Handle response. Handle response.
:param application_generate_entity: application generate entity :param application_generate_entity: application generate entity
:param workflow: workflow
:param queue_manager: queue manager :param queue_manager: queue manager
:param conversation: conversation :param conversation: conversation
:param message: message :param message: message
:param user: account or end user
:param stream: is stream :param stream: is stream
:return: :return:
""" """
# init generate task pipeline # init generate task pipeline
generate_task_pipeline = AdvancedChatAppGenerateTaskPipeline( generate_task_pipeline = AdvancedChatAppGenerateTaskPipeline(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
workflow=workflow,
queue_manager=queue_manager, queue_manager=queue_manager,
conversation=conversation, conversation=conversation,
message=message message=message,
user=user,
stream=stream
) )
try: try:
return generate_task_pipeline.process(stream=stream) return generate_task_pipeline.process()
except ValueError as e: except ValueError as e:
if e.args[0] == "I/O operation on closed file.": # ignore this error if e.args[0] == "I/O operation on closed file.": # ignore this error
raise ConversationTaskStoppedException() raise GenerateTaskStoppedException()
else: else:
logger.exception(e) logger.exception(e)
raise e raise e
......
import logging import logging
import time import time
from typing import cast from typing import Optional, cast
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig
from core.app.apps.advanced_chat.workflow_event_trigger_callback import WorkflowEventTriggerCallback from core.app.apps.advanced_chat.workflow_event_trigger_callback import WorkflowEventTriggerCallback
...@@ -8,16 +8,14 @@ from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom ...@@ -8,16 +8,14 @@ from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.base_app_runner import AppRunner from core.app.apps.base_app_runner import AppRunner
from core.app.entities.app_invoke_entities import ( from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity, AdvancedChatAppGenerateEntity,
InvokeFrom,
) )
from core.app.entities.queue_entities import QueueAnnotationReplyEvent, QueueStopEvent, QueueTextChunkEvent from core.app.entities.queue_entities import QueueAnnotationReplyEvent, QueueStopEvent, QueueTextChunkEvent
from core.moderation.base import ModerationException from core.moderation.base import ModerationException
from core.workflow.entities.node_entities import SystemVariable from core.workflow.entities.node_entities import SystemVariable
from core.workflow.workflow_engine_manager import WorkflowEngineManager from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db from extensions.ext_database import db
from models.account import Account from models.model import App, Conversation, Message
from models.model import App, Conversation, EndUser, Message from models.workflow import Workflow
from models.workflow import WorkflowRunTriggeredFrom
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -46,7 +44,7 @@ class AdvancedChatAppRunner(AppRunner): ...@@ -46,7 +44,7 @@ class AdvancedChatAppRunner(AppRunner):
if not app_record: if not app_record:
raise ValueError("App not found") raise ValueError("App not found")
workflow = WorkflowEngineManager().get_workflow(app_model=app_record, workflow_id=app_config.workflow_id) workflow = self.get_workflow(app_model=app_record, workflow_id=app_config.workflow_id)
if not workflow: if not workflow:
raise ValueError("Workflow not initialized") raise ValueError("Workflow not initialized")
...@@ -74,19 +72,10 @@ class AdvancedChatAppRunner(AppRunner): ...@@ -74,19 +72,10 @@ class AdvancedChatAppRunner(AppRunner):
): ):
return return
# fetch user
if application_generate_entity.invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE]:
user = db.session.query(Account).filter(Account.id == application_generate_entity.user_id).first()
else:
user = db.session.query(EndUser).filter(EndUser.id == application_generate_entity.user_id).first()
# RUN WORKFLOW # RUN WORKFLOW
workflow_engine_manager = WorkflowEngineManager() workflow_engine_manager = WorkflowEngineManager()
workflow_engine_manager.run_workflow( workflow_engine_manager.run_workflow(
workflow=workflow, workflow=workflow,
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING
if application_generate_entity.invoke_from == InvokeFrom.DEBUGGER else WorkflowRunTriggeredFrom.APP_RUN,
user=user,
user_inputs=inputs, user_inputs=inputs,
system_inputs={ system_inputs={
SystemVariable.QUERY: query, SystemVariable.QUERY: query,
...@@ -99,6 +88,20 @@ class AdvancedChatAppRunner(AppRunner): ...@@ -99,6 +88,20 @@ class AdvancedChatAppRunner(AppRunner):
)] )]
) )
def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
"""
Get workflow
"""
# fetch workflow by workflow_id
workflow = db.session.query(Workflow).filter(
Workflow.tenant_id == app_model.tenant_id,
Workflow.app_id == app_model.id,
Workflow.id == workflow_id
).first()
# return workflow
return workflow
def handle_input_moderation(self, queue_manager: AppQueueManager, def handle_input_moderation(self, queue_manager: AppQueueManager,
app_record: App, app_record: App,
app_generate_entity: AdvancedChatAppGenerateEntity, app_generate_entity: AdvancedChatAppGenerateEntity,
......
...@@ -4,9 +4,10 @@ import time ...@@ -4,9 +4,10 @@ import time
from collections.abc import Generator from collections.abc import Generator
from typing import Optional, Union from typing import Optional, Union
from pydantic import BaseModel from pydantic import BaseModel, Extra
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.workflow_based_generate_task_pipeline import WorkflowBasedGenerateTaskPipeline
from core.app.entities.app_invoke_entities import ( from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity, AdvancedChatAppGenerateEntity,
InvokeFrom, InvokeFrom,
...@@ -16,25 +17,35 @@ from core.app.entities.queue_entities import ( ...@@ -16,25 +17,35 @@ from core.app.entities.queue_entities import (
QueueErrorEvent, QueueErrorEvent,
QueueMessageFileEvent, QueueMessageFileEvent,
QueueMessageReplaceEvent, QueueMessageReplaceEvent,
QueueNodeFinishedEvent, QueueNodeFailedEvent,
QueueNodeStartedEvent, QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueuePingEvent, QueuePingEvent,
QueueRetrieverResourcesEvent, QueueRetrieverResourcesEvent,
QueueStopEvent, QueueStopEvent,
QueueTextChunkEvent, QueueTextChunkEvent,
QueueWorkflowFinishedEvent, QueueWorkflowFailedEvent,
QueueWorkflowStartedEvent, QueueWorkflowStartedEvent,
QueueWorkflowSucceededEvent,
) )
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.model_runtime.entities.llm_entities import LLMUsage from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.moderation.output_moderation import ModerationRule, OutputModeration from core.moderation.output_moderation import ModerationRule, OutputModeration
from core.tools.tool_file_manager import ToolFileManager from core.tools.tool_file_manager import ToolFileManager
from core.workflow.entities.node_entities import NodeType from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeType, SystemVariable
from events.message_event import message_was_created from events.message_event import message_was_created
from extensions.ext_database import db from extensions.ext_database import db
from models.model import Conversation, Message, MessageFile from models.account import Account
from models.workflow import WorkflowNodeExecution, WorkflowNodeExecutionStatus, WorkflowRun, WorkflowRunStatus from models.model import Conversation, EndUser, Message, MessageFile
from models.workflow import (
Workflow,
WorkflowNodeExecution,
WorkflowNodeExecutionStatus,
WorkflowRun,
WorkflowRunStatus,
WorkflowRunTriggeredFrom,
)
from services.annotation_service import AppAnnotationService from services.annotation_service import AppAnnotationService
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -47,41 +58,63 @@ class TaskState(BaseModel): ...@@ -47,41 +58,63 @@ class TaskState(BaseModel):
answer: str = "" answer: str = ""
metadata: dict = {} metadata: dict = {}
usage: LLMUsage usage: LLMUsage
workflow_run_id: Optional[str] = None
workflow_run: Optional[WorkflowRun] = None
start_at: Optional[float] = None
total_tokens: int = 0
total_steps: int = 0
current_node_execution: Optional[WorkflowNodeExecution] = None
current_node_execution_start_at: Optional[float] = None
class Config:
"""Configuration for this pydantic object."""
extra = Extra.forbid
arbitrary_types_allowed = True
class AdvancedChatAppGenerateTaskPipeline: class AdvancedChatAppGenerateTaskPipeline(WorkflowBasedGenerateTaskPipeline):
""" """
AdvancedChatAppGenerateTaskPipeline is a class that generate stream output and state management for Application. AdvancedChatAppGenerateTaskPipeline is a class that generate stream output and state management for Application.
""" """
def __init__(self, application_generate_entity: AdvancedChatAppGenerateEntity, def __init__(self, application_generate_entity: AdvancedChatAppGenerateEntity,
workflow: Workflow,
queue_manager: AppQueueManager, queue_manager: AppQueueManager,
conversation: Conversation, conversation: Conversation,
message: Message) -> None: message: Message,
user: Union[Account, EndUser],
stream: bool) -> None:
""" """
Initialize GenerateTaskPipeline. Initialize GenerateTaskPipeline.
:param application_generate_entity: application generate entity :param application_generate_entity: application generate entity
:param workflow: workflow
:param queue_manager: queue manager :param queue_manager: queue manager
:param conversation: conversation :param conversation: conversation
:param message: message :param message: message
:param user: user
:param stream: stream
""" """
self._application_generate_entity = application_generate_entity self._application_generate_entity = application_generate_entity
self._workflow = workflow
self._queue_manager = queue_manager self._queue_manager = queue_manager
self._conversation = conversation self._conversation = conversation
self._message = message self._message = message
self._user = user
self._task_state = TaskState( self._task_state = TaskState(
usage=LLMUsage.empty_usage() usage=LLMUsage.empty_usage()
) )
self._start_at = time.perf_counter() self._start_at = time.perf_counter()
self._output_moderation_handler = self._init_output_moderation() self._output_moderation_handler = self._init_output_moderation()
self._stream = stream
def process(self, stream: bool) -> Union[dict, Generator]: def process(self) -> Union[dict, Generator]:
""" """
Process generate task pipeline. Process generate task pipeline.
:return: :return:
""" """
if stream: if self._stream:
return self._process_stream_response() return self._process_stream_response()
else: else:
return self._process_blocking_response() return self._process_blocking_response()
...@@ -112,22 +145,17 @@ class AdvancedChatAppGenerateTaskPipeline: ...@@ -112,22 +145,17 @@ class AdvancedChatAppGenerateTaskPipeline:
self._task_state.answer = annotation.content self._task_state.answer = annotation.content
elif isinstance(event, QueueWorkflowStartedEvent): elif isinstance(event, QueueWorkflowStartedEvent):
self._task_state.workflow_run_id = event.workflow_run_id self._on_workflow_start()
elif isinstance(event, QueueNodeFinishedEvent): elif isinstance(event, QueueNodeStartedEvent):
workflow_node_execution = self._get_workflow_node_execution(event.workflow_node_execution_id) self._on_node_start(event)
if workflow_node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED.value: elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent):
if workflow_node_execution.node_type == NodeType.LLM.value: self._on_node_finished(event)
outputs = workflow_node_execution.outputs_dict elif isinstance(event, QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent):
usage_dict = outputs.get('usage', {}) self._on_workflow_finished(event)
self._task_state.metadata['usage'] = usage_dict workflow_run = self._task_state.workflow_run
elif isinstance(event, QueueStopEvent | QueueWorkflowFinishedEvent):
if isinstance(event, QueueWorkflowFinishedEvent): if workflow_run.status != WorkflowRunStatus.SUCCEEDED.value:
workflow_run = self._get_workflow_run(event.workflow_run_id) raise self._handle_error(QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}')))
if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value:
outputs = workflow_run.outputs
self._task_state.answer = outputs.get('text', '')
else:
raise self._handle_error(QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}')))
# response moderation # response moderation
if self._output_moderation_handler: if self._output_moderation_handler:
...@@ -173,8 +201,9 @@ class AdvancedChatAppGenerateTaskPipeline: ...@@ -173,8 +201,9 @@ class AdvancedChatAppGenerateTaskPipeline:
yield self._yield_response(data) yield self._yield_response(data)
break break
elif isinstance(event, QueueWorkflowStartedEvent): elif isinstance(event, QueueWorkflowStartedEvent):
workflow_run = self._get_workflow_run(event.workflow_run_id) self._on_workflow_start()
self._task_state.workflow_run_id = workflow_run.id workflow_run = self._task_state.workflow_run
response = { response = {
'event': 'workflow_started', 'event': 'workflow_started',
'task_id': self._application_generate_entity.task_id, 'task_id': self._application_generate_entity.task_id,
...@@ -188,7 +217,9 @@ class AdvancedChatAppGenerateTaskPipeline: ...@@ -188,7 +217,9 @@ class AdvancedChatAppGenerateTaskPipeline:
yield self._yield_response(response) yield self._yield_response(response)
elif isinstance(event, QueueNodeStartedEvent): elif isinstance(event, QueueNodeStartedEvent):
workflow_node_execution = self._get_workflow_node_execution(event.workflow_node_execution_id) self._on_node_start(event)
workflow_node_execution = self._task_state.current_node_execution
response = { response = {
'event': 'node_started', 'event': 'node_started',
'task_id': self._application_generate_entity.task_id, 'task_id': self._application_generate_entity.task_id,
...@@ -204,8 +235,10 @@ class AdvancedChatAppGenerateTaskPipeline: ...@@ -204,8 +235,10 @@ class AdvancedChatAppGenerateTaskPipeline:
} }
yield self._yield_response(response) yield self._yield_response(response)
elif isinstance(event, QueueNodeFinishedEvent): elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent):
workflow_node_execution = self._get_workflow_node_execution(event.workflow_node_execution_id) self._on_node_finished(event)
workflow_node_execution = self._task_state.current_node_execution
if workflow_node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED.value: if workflow_node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED.value:
if workflow_node_execution.node_type == NodeType.LLM.value: if workflow_node_execution.node_type == NodeType.LLM.value:
outputs = workflow_node_execution.outputs_dict outputs = workflow_node_execution.outputs_dict
...@@ -234,16 +267,11 @@ class AdvancedChatAppGenerateTaskPipeline: ...@@ -234,16 +267,11 @@ class AdvancedChatAppGenerateTaskPipeline:
} }
yield self._yield_response(response) yield self._yield_response(response)
elif isinstance(event, QueueStopEvent | QueueWorkflowFinishedEvent): elif isinstance(event, QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent):
if isinstance(event, QueueStopEvent): self._on_workflow_finished(event)
workflow_run = self._get_workflow_run(self._task_state.workflow_run_id) workflow_run = self._task_state.workflow_run
else:
workflow_run = self._get_workflow_run(event.workflow_run_id)
if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value: if workflow_run.status != WorkflowRunStatus.SUCCEEDED.value:
outputs = workflow_run.outputs_dict
self._task_state.answer = outputs.get('text', '')
else:
err_event = QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}')) err_event = QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}'))
data = self._error_to_stream_response_data(self._handle_error(err_event)) data = self._error_to_stream_response_data(self._handle_error(err_event))
yield self._yield_response(data) yield self._yield_response(data)
...@@ -252,7 +280,7 @@ class AdvancedChatAppGenerateTaskPipeline: ...@@ -252,7 +280,7 @@ class AdvancedChatAppGenerateTaskPipeline:
workflow_run_response = { workflow_run_response = {
'event': 'workflow_finished', 'event': 'workflow_finished',
'task_id': self._application_generate_entity.task_id, 'task_id': self._application_generate_entity.task_id,
'workflow_run_id': event.workflow_run_id, 'workflow_run_id': workflow_run.id,
'data': { 'data': {
'id': workflow_run.id, 'id': workflow_run.id,
'workflow_id': workflow_run.workflow_id, 'workflow_id': workflow_run.workflow_id,
...@@ -390,6 +418,102 @@ class AdvancedChatAppGenerateTaskPipeline: ...@@ -390,6 +418,102 @@ class AdvancedChatAppGenerateTaskPipeline:
else: else:
continue continue
def _on_workflow_start(self) -> None:
self._task_state.start_at = time.perf_counter()
workflow_run = self._init_workflow_run(
workflow=self._workflow,
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING
if self._application_generate_entity.invoke_from == InvokeFrom.DEBUGGER
else WorkflowRunTriggeredFrom.APP_RUN,
user=self._user,
user_inputs=self._application_generate_entity.inputs,
system_inputs={
SystemVariable.QUERY: self._message.query,
SystemVariable.FILES: self._application_generate_entity.files,
SystemVariable.CONVERSATION: self._conversation.id,
}
)
self._task_state.workflow_run = workflow_run
def _on_node_start(self, event: QueueNodeStartedEvent) -> None:
workflow_node_execution = self._init_node_execution_from_workflow_run(
workflow_run=self._task_state.workflow_run,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_data.title,
node_run_index=event.node_run_index,
predecessor_node_id=event.predecessor_node_id
)
self._task_state.current_node_execution = workflow_node_execution
self._task_state.current_node_execution_start_at = time.perf_counter()
self._task_state.total_steps += 1
def _on_node_finished(self, event: QueueNodeSucceededEvent | QueueNodeFailedEvent) -> None:
if isinstance(event, QueueNodeSucceededEvent):
workflow_node_execution = self._workflow_node_execution_success(
workflow_node_execution=self._task_state.current_node_execution,
start_at=self._task_state.current_node_execution_start_at,
inputs=event.inputs,
process_data=event.process_data,
outputs=event.outputs,
execution_metadata=event.execution_metadata
)
if event.execution_metadata and event.execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS):
self._task_state.total_tokens += (
int(event.execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS)))
if workflow_node_execution.node_type == NodeType.LLM.value:
outputs = workflow_node_execution.outputs_dict
usage_dict = outputs.get('usage', {})
self._task_state.metadata['usage'] = usage_dict
else:
workflow_node_execution = self._workflow_node_execution_failed(
workflow_node_execution=self._task_state.current_node_execution,
start_at=self._task_state.current_node_execution_start_at,
error=event.error
)
self._task_state.current_node_execution = workflow_node_execution
def _on_workflow_finished(self, event: QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent) -> None:
if isinstance(event, QueueStopEvent):
workflow_run = self._workflow_run_failed(
workflow_run=self._task_state.workflow_run,
start_at=self._task_state.start_at,
total_tokens=self._task_state.total_tokens,
total_steps=self._task_state.total_steps,
status=WorkflowRunStatus.STOPPED,
error='Workflow stopped.'
)
elif isinstance(event, QueueWorkflowFailedEvent):
workflow_run = self._workflow_run_failed(
workflow_run=self._task_state.workflow_run,
start_at=self._task_state.start_at,
total_tokens=self._task_state.total_tokens,
total_steps=self._task_state.total_steps,
status=WorkflowRunStatus.FAILED,
error=event.error
)
else:
workflow_run = self._workflow_run_success(
workflow_run=self._task_state.workflow_run,
start_at=self._task_state.start_at,
total_tokens=self._task_state.total_tokens,
total_steps=self._task_state.total_steps,
outputs=self._task_state.current_node_execution.outputs
if self._task_state.current_node_execution else None
)
self._task_state.workflow_run = workflow_run
if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value:
outputs = workflow_run.outputs_dict
self._task_state.answer = outputs.get('text', '')
def _get_workflow_run(self, workflow_run_id: str) -> WorkflowRun: def _get_workflow_run(self, workflow_run_id: str) -> WorkflowRun:
""" """
Get workflow run. Get workflow run.
...@@ -397,11 +521,6 @@ class AdvancedChatAppGenerateTaskPipeline: ...@@ -397,11 +521,6 @@ class AdvancedChatAppGenerateTaskPipeline:
:return: :return:
""" """
workflow_run = db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first() workflow_run = db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first()
if workflow_run:
# Because the workflow_run will be modified in the sub-thread,
# and the first query in the main thread will cache the entity,
# you need to expire the entity after the query
db.session.expire(workflow_run)
return workflow_run return workflow_run
def _get_workflow_node_execution(self, workflow_node_execution_id: str) -> WorkflowNodeExecution: def _get_workflow_node_execution(self, workflow_node_execution_id: str) -> WorkflowNodeExecution:
...@@ -412,11 +531,6 @@ class AdvancedChatAppGenerateTaskPipeline: ...@@ -412,11 +531,6 @@ class AdvancedChatAppGenerateTaskPipeline:
""" """
workflow_node_execution = (db.session.query(WorkflowNodeExecution) workflow_node_execution = (db.session.query(WorkflowNodeExecution)
.filter(WorkflowNodeExecution.id == workflow_node_execution_id).first()) .filter(WorkflowNodeExecution.id == workflow_node_execution_id).first())
if workflow_node_execution:
# Because the workflow_node_execution will be modified in the sub-thread,
# and the first query in the main thread will cache the entity,
# you need to expire the entity after the query
db.session.expire(workflow_node_execution)
return workflow_node_execution return workflow_node_execution
def _save_message(self) -> None: def _save_message(self) -> None:
...@@ -428,7 +542,7 @@ class AdvancedChatAppGenerateTaskPipeline: ...@@ -428,7 +542,7 @@ class AdvancedChatAppGenerateTaskPipeline:
self._message.answer = self._task_state.answer self._message.answer = self._task_state.answer
self._message.provider_response_latency = time.perf_counter() - self._start_at self._message.provider_response_latency = time.perf_counter() - self._start_at
self._message.workflow_run_id = self._task_state.workflow_run_id self._message.workflow_run_id = self._task_state.workflow_run.id
if self._task_state.metadata and self._task_state.metadata.get('usage'): if self._task_state.metadata and self._task_state.metadata.get('usage'):
usage = LLMUsage(**self._task_state.metadata['usage']) usage = LLMUsage(**self._task_state.metadata['usage'])
......
from typing import Optional
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.entities.queue_entities import ( from core.app.entities.queue_entities import (
QueueNodeFinishedEvent, QueueNodeFailedEvent,
QueueNodeStartedEvent, QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueueTextChunkEvent, QueueTextChunkEvent,
QueueWorkflowFinishedEvent, QueueWorkflowFailedEvent,
QueueWorkflowStartedEvent, QueueWorkflowStartedEvent,
QueueWorkflowSucceededEvent,
) )
from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback
from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeType from core.workflow.entities.node_entities import NodeType
from models.workflow import Workflow, WorkflowNodeExecution, WorkflowRun from models.workflow import Workflow
class WorkflowEventTriggerCallback(BaseWorkflowCallback): class WorkflowEventTriggerCallback(BaseWorkflowCallback):
...@@ -17,39 +22,91 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback): ...@@ -17,39 +22,91 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback):
self._queue_manager = queue_manager self._queue_manager = queue_manager
self._streamable_node_ids = self._fetch_streamable_node_ids(workflow.graph_dict) self._streamable_node_ids = self._fetch_streamable_node_ids(workflow.graph_dict)
def on_workflow_run_started(self, workflow_run: WorkflowRun) -> None: def on_workflow_run_started(self) -> None:
""" """
Workflow run started Workflow run started
""" """
self._queue_manager.publish( self._queue_manager.publish(
QueueWorkflowStartedEvent(workflow_run_id=workflow_run.id), QueueWorkflowStartedEvent(),
PublishFrom.APPLICATION_MANAGER
)
def on_workflow_run_succeeded(self) -> None:
"""
Workflow run succeeded
"""
self._queue_manager.publish(
QueueWorkflowSucceededEvent(),
PublishFrom.APPLICATION_MANAGER PublishFrom.APPLICATION_MANAGER
) )
def on_workflow_run_finished(self, workflow_run: WorkflowRun) -> None: def on_workflow_run_failed(self, error: str) -> None:
""" """
Workflow run finished Workflow run failed
""" """
self._queue_manager.publish( self._queue_manager.publish(
QueueWorkflowFinishedEvent(workflow_run_id=workflow_run.id), QueueWorkflowFailedEvent(
error=error
),
PublishFrom.APPLICATION_MANAGER PublishFrom.APPLICATION_MANAGER
) )
def on_workflow_node_execute_started(self, workflow_node_execution: WorkflowNodeExecution) -> None: def on_workflow_node_execute_started(self, node_id: str,
node_type: NodeType,
node_data: BaseNodeData,
node_run_index: int = 1,
predecessor_node_id: Optional[str] = None) -> None:
""" """
Workflow node execute started Workflow node execute started
""" """
self._queue_manager.publish( self._queue_manager.publish(
QueueNodeStartedEvent(workflow_node_execution_id=workflow_node_execution.id), QueueNodeStartedEvent(
node_id=node_id,
node_type=node_type,
node_data=node_data,
node_run_index=node_run_index,
predecessor_node_id=predecessor_node_id
),
PublishFrom.APPLICATION_MANAGER
)
def on_workflow_node_execute_succeeded(self, node_id: str,
node_type: NodeType,
node_data: BaseNodeData,
inputs: Optional[dict] = None,
process_data: Optional[dict] = None,
outputs: Optional[dict] = None,
execution_metadata: Optional[dict] = None) -> None:
"""
Workflow node execute succeeded
"""
self._queue_manager.publish(
QueueNodeSucceededEvent(
node_id=node_id,
node_type=node_type,
node_data=node_data,
inputs=inputs,
process_data=process_data,
outputs=outputs,
execution_metadata=execution_metadata
),
PublishFrom.APPLICATION_MANAGER PublishFrom.APPLICATION_MANAGER
) )
def on_workflow_node_execute_finished(self, workflow_node_execution: WorkflowNodeExecution) -> None: def on_workflow_node_execute_failed(self, node_id: str,
node_type: NodeType,
node_data: BaseNodeData,
error: str) -> None:
""" """
Workflow node execute finished Workflow node execute failed
""" """
self._queue_manager.publish( self._queue_manager.publish(
QueueNodeFinishedEvent(workflow_node_execution_id=workflow_node_execution.id), QueueNodeFailedEvent(
node_id=node_id,
node_type=node_type,
node_data=node_data,
error=error
),
PublishFrom.APPLICATION_MANAGER PublishFrom.APPLICATION_MANAGER
) )
......
...@@ -11,7 +11,7 @@ from core.app.app_config.easy_ui_based_app.model_config.converter import ModelCo ...@@ -11,7 +11,7 @@ from core.app.app_config.easy_ui_based_app.model_config.converter import ModelCo
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.agent_chat.app_config_manager import AgentChatAppConfigManager from core.app.apps.agent_chat.app_config_manager import AgentChatAppConfigManager
from core.app.apps.agent_chat.app_runner import AgentChatAppRunner from core.app.apps.agent_chat.app_runner import AgentChatAppRunner
from core.app.apps.base_app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity, InvokeFrom from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity, InvokeFrom
...@@ -177,7 +177,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator): ...@@ -177,7 +177,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
conversation=conversation, conversation=conversation,
message=message message=message
) )
except ConversationTaskStoppedException: except GenerateTaskStoppedException:
pass pass
except InvokeAuthorizationError: except InvokeAuthorizationError:
queue_manager.publish_error( queue_manager.publish_error(
......
...@@ -11,11 +11,8 @@ from core.app.entities.app_invoke_entities import InvokeFrom ...@@ -11,11 +11,8 @@ from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import ( from core.app.entities.queue_entities import (
AppQueueEvent, AppQueueEvent,
QueueErrorEvent, QueueErrorEvent,
QueueMessage,
QueueMessageEndEvent,
QueuePingEvent, QueuePingEvent,
QueueStopEvent, QueueStopEvent,
QueueWorkflowFinishedEvent,
) )
from extensions.ext_redis import redis_client from extensions.ext_redis import redis_client
...@@ -103,22 +100,16 @@ class AppQueueManager: ...@@ -103,22 +100,16 @@ class AppQueueManager:
:return: :return:
""" """
self._check_for_sqlalchemy_models(event.dict()) self._check_for_sqlalchemy_models(event.dict())
self._publish(event, pub_from)
message = self.construct_queue_message(event)
self._q.put(message)
if isinstance(event, QueueStopEvent
| QueueErrorEvent
| QueueMessageEndEvent
| QueueWorkflowFinishedEvent):
self.stop_listen()
if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped():
raise ConversationTaskStoppedException()
@abstractmethod @abstractmethod
def construct_queue_message(self, event: AppQueueEvent) -> QueueMessage: def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
"""
Publish event to queue
:param event:
:param pub_from:
:return:
"""
raise NotImplementedError raise NotImplementedError
@classmethod @classmethod
...@@ -182,5 +173,5 @@ class AppQueueManager: ...@@ -182,5 +173,5 @@ class AppQueueManager:
"that cause thread safety issues is not allowed.") "that cause thread safety issues is not allowed.")
class ConversationTaskStoppedException(Exception): class GenerateTaskStoppedException(Exception):
pass pass
...@@ -9,7 +9,7 @@ from pydantic import ValidationError ...@@ -9,7 +9,7 @@ from pydantic import ValidationError
from core.app.app_config.easy_ui_based_app.model_config.converter import ModelConfigConverter from core.app.app_config.easy_ui_based_app.model_config.converter import ModelConfigConverter
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.base_app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom
from core.app.apps.chat.app_config_manager import ChatAppConfigManager from core.app.apps.chat.app_config_manager import ChatAppConfigManager
from core.app.apps.chat.app_runner import ChatAppRunner from core.app.apps.chat.app_runner import ChatAppRunner
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
...@@ -177,7 +177,7 @@ class ChatAppGenerator(MessageBasedAppGenerator): ...@@ -177,7 +177,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
conversation=conversation, conversation=conversation,
message=message message=message
) )
except ConversationTaskStoppedException: except GenerateTaskStoppedException:
pass pass
except InvokeAuthorizationError: except InvokeAuthorizationError:
queue_manager.publish_error( queue_manager.publish_error(
......
...@@ -9,7 +9,7 @@ from pydantic import ValidationError ...@@ -9,7 +9,7 @@ from pydantic import ValidationError
from core.app.app_config.easy_ui_based_app.model_config.converter import ModelConfigConverter from core.app.app_config.easy_ui_based_app.model_config.converter import ModelConfigConverter
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.base_app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom
from core.app.apps.completion.app_config_manager import CompletionAppConfigManager from core.app.apps.completion.app_config_manager import CompletionAppConfigManager
from core.app.apps.completion.app_runner import CompletionAppRunner from core.app.apps.completion.app_runner import CompletionAppRunner
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
...@@ -166,7 +166,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator): ...@@ -166,7 +166,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
queue_manager=queue_manager, queue_manager=queue_manager,
message=message message=message
) )
except ConversationTaskStoppedException: except GenerateTaskStoppedException:
pass pass
except InvokeAuthorizationError: except InvokeAuthorizationError:
queue_manager.publish_error( queue_manager.publish_error(
......
...@@ -7,7 +7,7 @@ from sqlalchemy import and_ ...@@ -7,7 +7,7 @@ from sqlalchemy import and_
from core.app.app_config.entities import EasyUIBasedAppModelConfigFrom from core.app.app_config.entities import EasyUIBasedAppModelConfigFrom
from core.app.apps.base_app_generator import BaseAppGenerator from core.app.apps.base_app_generator import BaseAppGenerator
from core.app.apps.base_app_queue_manager import AppQueueManager, ConversationTaskStoppedException from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException
from core.app.apps.easy_ui_based_generate_task_pipeline import EasyUIBasedGenerateTaskPipeline from core.app.apps.easy_ui_based_generate_task_pipeline import EasyUIBasedGenerateTaskPipeline
from core.app.entities.app_invoke_entities import ( from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity, AdvancedChatAppGenerateEntity,
...@@ -60,7 +60,7 @@ class MessageBasedAppGenerator(BaseAppGenerator): ...@@ -60,7 +60,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
return generate_task_pipeline.process(stream=stream) return generate_task_pipeline.process(stream=stream)
except ValueError as e: except ValueError as e:
if e.args[0] == "I/O operation on closed file.": # ignore this error if e.args[0] == "I/O operation on closed file.": # ignore this error
raise ConversationTaskStoppedException() raise GenerateTaskStoppedException()
else: else:
logger.exception(e) logger.exception(e)
raise e raise e
......
from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom
from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import ( from core.app.entities.queue_entities import (
AppQueueEvent, AppQueueEvent,
MessageQueueMessage, MessageQueueMessage,
QueueErrorEvent,
QueueMessage, QueueMessage,
QueueMessageEndEvent,
QueueStopEvent,
QueueWorkflowFailedEvent,
QueueWorkflowSucceededEvent,
) )
...@@ -28,3 +33,31 @@ class MessageBasedAppQueueManager(AppQueueManager): ...@@ -28,3 +33,31 @@ class MessageBasedAppQueueManager(AppQueueManager):
app_mode=self._app_mode, app_mode=self._app_mode,
event=event event=event
) )
def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
"""
Publish event to queue
:param event:
:param pub_from:
:return:
"""
message = MessageQueueMessage(
task_id=self._task_id,
message_id=self._message_id,
conversation_id=self._conversation_id,
app_mode=self._app_mode,
event=event
)
self._q.put(message)
if isinstance(event, QueueStopEvent
| QueueErrorEvent
| QueueMessageEndEvent
| QueueWorkflowSucceededEvent
| QueueWorkflowFailedEvent):
self.stop_listen()
if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped():
raise GenerateTaskStoppedException()
...@@ -9,7 +9,7 @@ from pydantic import ValidationError ...@@ -9,7 +9,7 @@ from pydantic import ValidationError
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.base_app_generator import BaseAppGenerator from core.app.apps.base_app_generator import BaseAppGenerator
from core.app.apps.base_app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.apps.workflow.app_queue_manager import WorkflowAppQueueManager from core.app.apps.workflow.app_queue_manager import WorkflowAppQueueManager
from core.app.apps.workflow.app_runner import WorkflowAppRunner from core.app.apps.workflow.app_runner import WorkflowAppRunner
...@@ -95,7 +95,9 @@ class WorkflowAppGenerator(BaseAppGenerator): ...@@ -95,7 +95,9 @@ class WorkflowAppGenerator(BaseAppGenerator):
# return response or stream generator # return response or stream generator
return self._handle_response( return self._handle_response(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
workflow=workflow,
queue_manager=queue_manager, queue_manager=queue_manager,
user=user,
stream=stream stream=stream
) )
...@@ -117,7 +119,7 @@ class WorkflowAppGenerator(BaseAppGenerator): ...@@ -117,7 +119,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
queue_manager=queue_manager queue_manager=queue_manager
) )
except ConversationTaskStoppedException: except GenerateTaskStoppedException:
pass pass
except InvokeAuthorizationError: except InvokeAuthorizationError:
queue_manager.publish_error( queue_manager.publish_error(
...@@ -136,19 +138,25 @@ class WorkflowAppGenerator(BaseAppGenerator): ...@@ -136,19 +138,25 @@ class WorkflowAppGenerator(BaseAppGenerator):
db.session.remove() db.session.remove()
def _handle_response(self, application_generate_entity: WorkflowAppGenerateEntity, def _handle_response(self, application_generate_entity: WorkflowAppGenerateEntity,
workflow: Workflow,
queue_manager: AppQueueManager, queue_manager: AppQueueManager,
user: Union[Account, EndUser],
stream: bool = False) -> Union[dict, Generator]: stream: bool = False) -> Union[dict, Generator]:
""" """
Handle response. Handle response.
:param application_generate_entity: application generate entity :param application_generate_entity: application generate entity
:param workflow: workflow
:param queue_manager: queue manager :param queue_manager: queue manager
:param user: account or end user
:param stream: is stream :param stream: is stream
:return: :return:
""" """
# init generate task pipeline # init generate task pipeline
generate_task_pipeline = WorkflowAppGenerateTaskPipeline( generate_task_pipeline = WorkflowAppGenerateTaskPipeline(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
workflow=workflow,
queue_manager=queue_manager, queue_manager=queue_manager,
user=user,
stream=stream stream=stream
) )
...@@ -156,7 +164,7 @@ class WorkflowAppGenerator(BaseAppGenerator): ...@@ -156,7 +164,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
return generate_task_pipeline.process() return generate_task_pipeline.process()
except ValueError as e: except ValueError as e:
if e.args[0] == "I/O operation on closed file.": # ignore this error if e.args[0] == "I/O operation on closed file.": # ignore this error
raise ConversationTaskStoppedException() raise GenerateTaskStoppedException()
else: else:
logger.exception(e) logger.exception(e)
raise e raise e
......
from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom
from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import ( from core.app.entities.queue_entities import (
AppQueueEvent, AppQueueEvent,
QueueMessage, QueueErrorEvent,
QueueMessageEndEvent,
QueueStopEvent,
QueueWorkflowFailedEvent,
QueueWorkflowSucceededEvent,
WorkflowQueueMessage, WorkflowQueueMessage,
) )
...@@ -16,9 +20,27 @@ class WorkflowAppQueueManager(AppQueueManager): ...@@ -16,9 +20,27 @@ class WorkflowAppQueueManager(AppQueueManager):
self._app_mode = app_mode self._app_mode = app_mode
def construct_queue_message(self, event: AppQueueEvent) -> QueueMessage: def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
return WorkflowQueueMessage( """
Publish event to queue
:param event:
:param pub_from:
:return:
"""
message = WorkflowQueueMessage(
task_id=self._task_id, task_id=self._task_id,
app_mode=self._app_mode, app_mode=self._app_mode,
event=event event=event
) )
self._q.put(message)
if isinstance(event, QueueStopEvent
| QueueErrorEvent
| QueueMessageEndEvent
| QueueWorkflowSucceededEvent
| QueueWorkflowFailedEvent):
self.stop_listen()
if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped():
raise GenerateTaskStoppedException()
import logging import logging
import time import time
from typing import cast from typing import Optional, cast
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.workflow.app_config_manager import WorkflowAppConfig from core.app.apps.workflow.app_config_manager import WorkflowAppConfig
from core.app.apps.workflow.workflow_event_trigger_callback import WorkflowEventTriggerCallback from core.app.apps.workflow.workflow_event_trigger_callback import WorkflowEventTriggerCallback
from core.app.entities.app_invoke_entities import ( from core.app.entities.app_invoke_entities import (
AppGenerateEntity, AppGenerateEntity,
InvokeFrom,
WorkflowAppGenerateEntity, WorkflowAppGenerateEntity,
) )
from core.app.entities.queue_entities import QueueStopEvent, QueueTextChunkEvent from core.app.entities.queue_entities import QueueStopEvent, QueueTextChunkEvent
...@@ -16,9 +15,8 @@ from core.moderation.input_moderation import InputModeration ...@@ -16,9 +15,8 @@ from core.moderation.input_moderation import InputModeration
from core.workflow.entities.node_entities import SystemVariable from core.workflow.entities.node_entities import SystemVariable
from core.workflow.workflow_engine_manager import WorkflowEngineManager from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db from extensions.ext_database import db
from models.account import Account from models.model import App
from models.model import App, EndUser from models.workflow import Workflow
from models.workflow import WorkflowRunTriggeredFrom
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -43,7 +41,7 @@ class WorkflowAppRunner: ...@@ -43,7 +41,7 @@ class WorkflowAppRunner:
if not app_record: if not app_record:
raise ValueError("App not found") raise ValueError("App not found")
workflow = WorkflowEngineManager().get_workflow(app_model=app_record, workflow_id=app_config.workflow_id) workflow = self.get_workflow(app_model=app_record, workflow_id=app_config.workflow_id)
if not workflow: if not workflow:
raise ValueError("Workflow not initialized") raise ValueError("Workflow not initialized")
...@@ -59,19 +57,10 @@ class WorkflowAppRunner: ...@@ -59,19 +57,10 @@ class WorkflowAppRunner:
): ):
return return
# fetch user
if application_generate_entity.invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE]:
user = db.session.query(Account).filter(Account.id == application_generate_entity.user_id).first()
else:
user = db.session.query(EndUser).filter(EndUser.id == application_generate_entity.user_id).first()
# RUN WORKFLOW # RUN WORKFLOW
workflow_engine_manager = WorkflowEngineManager() workflow_engine_manager = WorkflowEngineManager()
workflow_engine_manager.run_workflow( workflow_engine_manager.run_workflow(
workflow=workflow, workflow=workflow,
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING
if application_generate_entity.invoke_from == InvokeFrom.DEBUGGER else WorkflowRunTriggeredFrom.APP_RUN,
user=user,
user_inputs=inputs, user_inputs=inputs,
system_inputs={ system_inputs={
SystemVariable.FILES: files SystemVariable.FILES: files
...@@ -82,6 +71,20 @@ class WorkflowAppRunner: ...@@ -82,6 +71,20 @@ class WorkflowAppRunner:
)] )]
) )
def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
"""
Get workflow
"""
# fetch workflow by workflow_id
workflow = db.session.query(Workflow).filter(
Workflow.tenant_id == app_model.tenant_id,
Workflow.app_id == app_model.id,
Workflow.id == workflow_id
).first()
# return workflow
return workflow
def handle_input_moderation(self, queue_manager: AppQueueManager, def handle_input_moderation(self, queue_manager: AppQueueManager,
app_record: App, app_record: App,
app_generate_entity: WorkflowAppGenerateEntity, app_generate_entity: WorkflowAppGenerateEntity,
......
...@@ -4,28 +4,35 @@ import time ...@@ -4,28 +4,35 @@ import time
from collections.abc import Generator from collections.abc import Generator
from typing import Optional, Union from typing import Optional, Union
from pydantic import BaseModel from pydantic import BaseModel, Extra
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.workflow_based_generate_task_pipeline import WorkflowBasedGenerateTaskPipeline
from core.app.entities.app_invoke_entities import ( from core.app.entities.app_invoke_entities import (
InvokeFrom,
WorkflowAppGenerateEntity, WorkflowAppGenerateEntity,
) )
from core.app.entities.queue_entities import ( from core.app.entities.queue_entities import (
QueueErrorEvent, QueueErrorEvent,
QueueMessageReplaceEvent, QueueMessageReplaceEvent,
QueueNodeFinishedEvent, QueueNodeFailedEvent,
QueueNodeStartedEvent, QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueuePingEvent, QueuePingEvent,
QueueStopEvent, QueueStopEvent,
QueueTextChunkEvent, QueueTextChunkEvent,
QueueWorkflowFinishedEvent, QueueWorkflowFailedEvent,
QueueWorkflowStartedEvent, QueueWorkflowStartedEvent,
QueueWorkflowSucceededEvent,
) )
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.moderation.output_moderation import ModerationRule, OutputModeration from core.moderation.output_moderation import ModerationRule, OutputModeration
from core.workflow.entities.node_entities import NodeRunMetadataKey, SystemVariable
from extensions.ext_database import db from extensions.ext_database import db
from models.workflow import WorkflowNodeExecution, WorkflowRun, WorkflowRunStatus from models.account import Account
from models.model import EndUser
from models.workflow import Workflow, WorkflowNodeExecution, WorkflowRun, WorkflowRunStatus, WorkflowRunTriggeredFrom
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -36,24 +43,44 @@ class TaskState(BaseModel): ...@@ -36,24 +43,44 @@ class TaskState(BaseModel):
""" """
answer: str = "" answer: str = ""
metadata: dict = {} metadata: dict = {}
workflow_run_id: Optional[str] = None
workflow_run: Optional[WorkflowRun] = None
start_at: Optional[float] = None
total_tokens: int = 0
total_steps: int = 0
class WorkflowAppGenerateTaskPipeline: current_node_execution: Optional[WorkflowNodeExecution] = None
current_node_execution_start_at: Optional[float] = None
class Config:
"""Configuration for this pydantic object."""
extra = Extra.forbid
arbitrary_types_allowed = True
class WorkflowAppGenerateTaskPipeline(WorkflowBasedGenerateTaskPipeline):
""" """
WorkflowAppGenerateTaskPipeline is a class that generate stream output and state management for Application. WorkflowAppGenerateTaskPipeline is a class that generate stream output and state management for Application.
""" """
def __init__(self, application_generate_entity: WorkflowAppGenerateEntity, def __init__(self, application_generate_entity: WorkflowAppGenerateEntity,
workflow: Workflow,
queue_manager: AppQueueManager, queue_manager: AppQueueManager,
user: Union[Account, EndUser],
stream: bool) -> None: stream: bool) -> None:
""" """
Initialize GenerateTaskPipeline. Initialize GenerateTaskPipeline.
:param application_generate_entity: application generate entity :param application_generate_entity: application generate entity
:param workflow: workflow
:param queue_manager: queue manager :param queue_manager: queue manager
:param user: user
:param stream: is stream
""" """
self._application_generate_entity = application_generate_entity self._application_generate_entity = application_generate_entity
self._workflow = workflow
self._queue_manager = queue_manager self._queue_manager = queue_manager
self._user = user
self._task_state = TaskState() self._task_state = TaskState()
self._start_at = time.perf_counter() self._start_at = time.perf_counter()
self._output_moderation_handler = self._init_output_moderation() self._output_moderation_handler = self._init_output_moderation()
...@@ -79,17 +106,15 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -79,17 +106,15 @@ class WorkflowAppGenerateTaskPipeline:
if isinstance(event, QueueErrorEvent): if isinstance(event, QueueErrorEvent):
raise self._handle_error(event) raise self._handle_error(event)
elif isinstance(event, QueueStopEvent | QueueWorkflowFinishedEvent): elif isinstance(event, QueueWorkflowStartedEvent):
if isinstance(event, QueueStopEvent): self._on_workflow_start()
workflow_run = self._get_workflow_run(self._task_state.workflow_run_id) elif isinstance(event, QueueNodeStartedEvent):
else: self._on_node_start(event)
workflow_run = self._get_workflow_run(event.workflow_run_id) elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent):
self._on_node_finished(event)
if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value: elif isinstance(event, QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent):
outputs = workflow_run.outputs_dict self._on_workflow_finished(event)
self._task_state.answer = outputs.get('text', '') workflow_run = self._task_state.workflow_run
else:
raise self._handle_error(QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}')))
# response moderation # response moderation
if self._output_moderation_handler: if self._output_moderation_handler:
...@@ -100,10 +125,12 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -100,10 +125,12 @@ class WorkflowAppGenerateTaskPipeline:
public_event=False public_event=False
) )
# save workflow app log
self._save_workflow_app_log()
response = { response = {
'event': 'workflow_finished',
'task_id': self._application_generate_entity.task_id, 'task_id': self._application_generate_entity.task_id,
'workflow_run_id': event.workflow_run_id, 'workflow_run_id': workflow_run.id,
'data': { 'data': {
'id': workflow_run.id, 'id': workflow_run.id,
'workflow_id': workflow_run.workflow_id, 'workflow_id': workflow_run.workflow_id,
...@@ -135,8 +162,9 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -135,8 +162,9 @@ class WorkflowAppGenerateTaskPipeline:
yield self._yield_response(data) yield self._yield_response(data)
break break
elif isinstance(event, QueueWorkflowStartedEvent): elif isinstance(event, QueueWorkflowStartedEvent):
self._task_state.workflow_run_id = event.workflow_run_id self._on_workflow_start()
workflow_run = self._get_workflow_run(event.workflow_run_id) workflow_run = self._task_state.workflow_run
response = { response = {
'event': 'workflow_started', 'event': 'workflow_started',
'task_id': self._application_generate_entity.task_id, 'task_id': self._application_generate_entity.task_id,
...@@ -150,7 +178,9 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -150,7 +178,9 @@ class WorkflowAppGenerateTaskPipeline:
yield self._yield_response(response) yield self._yield_response(response)
elif isinstance(event, QueueNodeStartedEvent): elif isinstance(event, QueueNodeStartedEvent):
workflow_node_execution = self._get_workflow_node_execution(event.workflow_node_execution_id) self._on_node_start(event)
workflow_node_execution = self._task_state.current_node_execution
response = { response = {
'event': 'node_started', 'event': 'node_started',
'task_id': self._application_generate_entity.task_id, 'task_id': self._application_generate_entity.task_id,
...@@ -166,8 +196,10 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -166,8 +196,10 @@ class WorkflowAppGenerateTaskPipeline:
} }
yield self._yield_response(response) yield self._yield_response(response)
elif isinstance(event, QueueNodeFinishedEvent): elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent):
workflow_node_execution = self._get_workflow_node_execution(event.workflow_node_execution_id) self._on_node_finished(event)
workflow_node_execution = self._task_state.current_node_execution
response = { response = {
'event': 'node_finished', 'event': 'node_finished',
'task_id': self._application_generate_entity.task_id, 'task_id': self._application_generate_entity.task_id,
...@@ -190,20 +222,9 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -190,20 +222,9 @@ class WorkflowAppGenerateTaskPipeline:
} }
yield self._yield_response(response) yield self._yield_response(response)
elif isinstance(event, QueueStopEvent | QueueWorkflowFinishedEvent): elif isinstance(event, QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent):
if isinstance(event, QueueStopEvent): self._on_workflow_finished(event)
workflow_run = self._get_workflow_run(self._task_state.workflow_run_id) workflow_run = self._task_state.workflow_run
else:
workflow_run = self._get_workflow_run(event.workflow_run_id)
if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value:
outputs = workflow_run.outputs_dict
self._task_state.answer = outputs.get('text', '')
else:
err_event = QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}'))
data = self._error_to_stream_response_data(self._handle_error(err_event))
yield self._yield_response(data)
break
# response moderation # response moderation
if self._output_moderation_handler: if self._output_moderation_handler:
...@@ -219,7 +240,7 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -219,7 +240,7 @@ class WorkflowAppGenerateTaskPipeline:
replace_response = { replace_response = {
'event': 'text_replace', 'event': 'text_replace',
'task_id': self._application_generate_entity.task_id, 'task_id': self._application_generate_entity.task_id,
'workflow_run_id': self._task_state.workflow_run_id, 'workflow_run_id': self._task_state.workflow_run.id,
'data': { 'data': {
'text': self._task_state.answer 'text': self._task_state.answer
} }
...@@ -233,7 +254,7 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -233,7 +254,7 @@ class WorkflowAppGenerateTaskPipeline:
workflow_run_response = { workflow_run_response = {
'event': 'workflow_finished', 'event': 'workflow_finished',
'task_id': self._application_generate_entity.task_id, 'task_id': self._application_generate_entity.task_id,
'workflow_run_id': event.workflow_run_id, 'workflow_run_id': workflow_run.id,
'data': { 'data': {
'id': workflow_run.id, 'id': workflow_run.id,
'workflow_id': workflow_run.workflow_id, 'workflow_id': workflow_run.workflow_id,
...@@ -244,7 +265,7 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -244,7 +265,7 @@ class WorkflowAppGenerateTaskPipeline:
'total_tokens': workflow_run.total_tokens, 'total_tokens': workflow_run.total_tokens,
'total_steps': workflow_run.total_steps, 'total_steps': workflow_run.total_steps,
'created_at': int(workflow_run.created_at.timestamp()), 'created_at': int(workflow_run.created_at.timestamp()),
'finished_at': int(workflow_run.finished_at.timestamp()) 'finished_at': int(workflow_run.finished_at.timestamp()) if workflow_run.finished_at else None
} }
} }
...@@ -279,7 +300,7 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -279,7 +300,7 @@ class WorkflowAppGenerateTaskPipeline:
response = { response = {
'event': 'text_replace', 'event': 'text_replace',
'task_id': self._application_generate_entity.task_id, 'task_id': self._application_generate_entity.task_id,
'workflow_run_id': self._task_state.workflow_run_id, 'workflow_run_id': self._task_state.workflow_run.id,
'data': { 'data': {
'text': event.text 'text': event.text
} }
...@@ -291,6 +312,95 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -291,6 +312,95 @@ class WorkflowAppGenerateTaskPipeline:
else: else:
continue continue
def _on_workflow_start(self) -> None:
self._task_state.start_at = time.perf_counter()
workflow_run = self._init_workflow_run(
workflow=self._workflow,
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING
if self._application_generate_entity.invoke_from == InvokeFrom.DEBUGGER
else WorkflowRunTriggeredFrom.APP_RUN,
user=self._user,
user_inputs=self._application_generate_entity.inputs,
system_inputs={
SystemVariable.FILES: self._application_generate_entity.files
}
)
self._task_state.workflow_run = workflow_run
def _on_node_start(self, event: QueueNodeStartedEvent) -> None:
workflow_node_execution = self._init_node_execution_from_workflow_run(
workflow_run=self._task_state.workflow_run,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_data.title,
node_run_index=event.node_run_index,
predecessor_node_id=event.predecessor_node_id
)
self._task_state.current_node_execution = workflow_node_execution
self._task_state.current_node_execution_start_at = time.perf_counter()
self._task_state.total_steps += 1
def _on_node_finished(self, event: QueueNodeSucceededEvent | QueueNodeFailedEvent) -> None:
if isinstance(event, QueueNodeSucceededEvent):
workflow_node_execution = self._workflow_node_execution_success(
workflow_node_execution=self._task_state.current_node_execution,
start_at=self._task_state.current_node_execution_start_at,
inputs=event.inputs,
process_data=event.process_data,
outputs=event.outputs,
execution_metadata=event.execution_metadata
)
if event.execution_metadata and event.execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS):
self._task_state.total_tokens += (
int(event.execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS)))
else:
workflow_node_execution = self._workflow_node_execution_failed(
workflow_node_execution=self._task_state.current_node_execution,
start_at=self._task_state.current_node_execution_start_at,
error=event.error
)
self._task_state.current_node_execution = workflow_node_execution
def _on_workflow_finished(self, event: QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent) -> None:
if isinstance(event, QueueStopEvent):
workflow_run = self._workflow_run_failed(
workflow_run=self._task_state.workflow_run,
start_at=self._task_state.start_at,
total_tokens=self._task_state.total_tokens,
total_steps=self._task_state.total_steps,
status=WorkflowRunStatus.STOPPED,
error='Workflow stopped.'
)
elif isinstance(event, QueueWorkflowFailedEvent):
workflow_run = self._workflow_run_failed(
workflow_run=self._task_state.workflow_run,
start_at=self._task_state.start_at,
total_tokens=self._task_state.total_tokens,
total_steps=self._task_state.total_steps,
status=WorkflowRunStatus.FAILED,
error=event.error
)
else:
workflow_run = self._workflow_run_success(
workflow_run=self._task_state.workflow_run,
start_at=self._task_state.start_at,
total_tokens=self._task_state.total_tokens,
total_steps=self._task_state.total_steps,
outputs=self._task_state.current_node_execution.outputs
if self._task_state.current_node_execution else None
)
self._task_state.workflow_run = workflow_run
if workflow_run.status == WorkflowRunStatus.SUCCEEDED.value:
outputs = workflow_run.outputs_dict
self._task_state.answer = outputs.get('text', '')
def _get_workflow_run(self, workflow_run_id: str) -> WorkflowRun: def _get_workflow_run(self, workflow_run_id: str) -> WorkflowRun:
""" """
Get workflow run. Get workflow run.
...@@ -298,11 +408,6 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -298,11 +408,6 @@ class WorkflowAppGenerateTaskPipeline:
:return: :return:
""" """
workflow_run = db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first() workflow_run = db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first()
if workflow_run:
# Because the workflow_run will be modified in the sub-thread,
# and the first query in the main thread will cache the entity,
# you need to expire the entity after the query
db.session.expire(workflow_run)
return workflow_run return workflow_run
def _get_workflow_node_execution(self, workflow_node_execution_id: str) -> WorkflowNodeExecution: def _get_workflow_node_execution(self, workflow_node_execution_id: str) -> WorkflowNodeExecution:
...@@ -313,11 +418,6 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -313,11 +418,6 @@ class WorkflowAppGenerateTaskPipeline:
""" """
workflow_node_execution = (db.session.query(WorkflowNodeExecution) workflow_node_execution = (db.session.query(WorkflowNodeExecution)
.filter(WorkflowNodeExecution.id == workflow_node_execution_id).first()) .filter(WorkflowNodeExecution.id == workflow_node_execution_id).first())
if workflow_node_execution:
# Because the workflow_node_execution will be modified in the sub-thread,
# and the first query in the main thread will cache the entity,
# you need to expire the entity after the query
db.session.expire(workflow_node_execution)
return workflow_node_execution return workflow_node_execution
def _save_workflow_app_log(self) -> None: def _save_workflow_app_log(self) -> None:
...@@ -335,7 +435,7 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -335,7 +435,7 @@ class WorkflowAppGenerateTaskPipeline:
""" """
response = { response = {
'event': 'text_chunk', 'event': 'text_chunk',
'workflow_run_id': self._task_state.workflow_run_id, 'workflow_run_id': self._task_state.workflow_run.id,
'task_id': self._application_generate_entity.task_id, 'task_id': self._application_generate_entity.task_id,
'data': { 'data': {
'text': text 'text': text
...@@ -398,7 +498,6 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -398,7 +498,6 @@ class WorkflowAppGenerateTaskPipeline:
return { return {
'event': 'error', 'event': 'error',
'task_id': self._application_generate_entity.task_id, 'task_id': self._application_generate_entity.task_id,
'workflow_run_id': self._task_state.workflow_run_id,
**data **data
} }
......
from typing import Optional
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.entities.queue_entities import ( from core.app.entities.queue_entities import (
QueueNodeFinishedEvent, QueueNodeFailedEvent,
QueueNodeStartedEvent, QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueueTextChunkEvent, QueueTextChunkEvent,
QueueWorkflowFinishedEvent, QueueWorkflowFailedEvent,
QueueWorkflowStartedEvent, QueueWorkflowStartedEvent,
QueueWorkflowSucceededEvent,
) )
from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback
from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeType from core.workflow.entities.node_entities import NodeType
from models.workflow import Workflow, WorkflowNodeExecution, WorkflowRun from models.workflow import Workflow
class WorkflowEventTriggerCallback(BaseWorkflowCallback): class WorkflowEventTriggerCallback(BaseWorkflowCallback):
...@@ -17,39 +22,91 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback): ...@@ -17,39 +22,91 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback):
self._queue_manager = queue_manager self._queue_manager = queue_manager
self._streamable_node_ids = self._fetch_streamable_node_ids(workflow.graph_dict) self._streamable_node_ids = self._fetch_streamable_node_ids(workflow.graph_dict)
def on_workflow_run_started(self, workflow_run: WorkflowRun) -> None: def on_workflow_run_started(self) -> None:
""" """
Workflow run started Workflow run started
""" """
self._queue_manager.publish( self._queue_manager.publish(
QueueWorkflowStartedEvent(workflow_run_id=workflow_run.id), QueueWorkflowStartedEvent(),
PublishFrom.APPLICATION_MANAGER
)
def on_workflow_run_succeeded(self) -> None:
"""
Workflow run succeeded
"""
self._queue_manager.publish(
QueueWorkflowSucceededEvent(),
PublishFrom.APPLICATION_MANAGER PublishFrom.APPLICATION_MANAGER
) )
def on_workflow_run_finished(self, workflow_run: WorkflowRun) -> None: def on_workflow_run_failed(self, error: str) -> None:
""" """
Workflow run finished Workflow run failed
""" """
self._queue_manager.publish( self._queue_manager.publish(
QueueWorkflowFinishedEvent(workflow_run_id=workflow_run.id), QueueWorkflowFailedEvent(
error=error
),
PublishFrom.APPLICATION_MANAGER PublishFrom.APPLICATION_MANAGER
) )
def on_workflow_node_execute_started(self, workflow_node_execution: WorkflowNodeExecution) -> None: def on_workflow_node_execute_started(self, node_id: str,
node_type: NodeType,
node_data: BaseNodeData,
node_run_index: int = 1,
predecessor_node_id: Optional[str] = None) -> None:
""" """
Workflow node execute started Workflow node execute started
""" """
self._queue_manager.publish( self._queue_manager.publish(
QueueNodeStartedEvent(workflow_node_execution_id=workflow_node_execution.id), QueueNodeStartedEvent(
node_id=node_id,
node_type=node_type,
node_data=node_data,
node_run_index=node_run_index,
predecessor_node_id=predecessor_node_id
),
PublishFrom.APPLICATION_MANAGER
)
def on_workflow_node_execute_succeeded(self, node_id: str,
node_type: NodeType,
node_data: BaseNodeData,
inputs: Optional[dict] = None,
process_data: Optional[dict] = None,
outputs: Optional[dict] = None,
execution_metadata: Optional[dict] = None) -> None:
"""
Workflow node execute succeeded
"""
self._queue_manager.publish(
QueueNodeSucceededEvent(
node_id=node_id,
node_type=node_type,
node_data=node_data,
inputs=inputs,
process_data=process_data,
outputs=outputs,
execution_metadata=execution_metadata
),
PublishFrom.APPLICATION_MANAGER PublishFrom.APPLICATION_MANAGER
) )
def on_workflow_node_execute_finished(self, workflow_node_execution: WorkflowNodeExecution) -> None: def on_workflow_node_execute_failed(self, node_id: str,
node_type: NodeType,
node_data: BaseNodeData,
error: str) -> None:
""" """
Workflow node execute finished Workflow node execute failed
""" """
self._queue_manager.publish( self._queue_manager.publish(
QueueNodeFinishedEvent(workflow_node_execution_id=workflow_node_execution.id), QueueNodeFailedEvent(
node_id=node_id,
node_type=node_type,
node_data=node_data,
error=error
),
PublishFrom.APPLICATION_MANAGER PublishFrom.APPLICATION_MANAGER
) )
......
import json
import time
from datetime import datetime
from typing import Optional, Union
from core.model_runtime.utils.encoders import jsonable_encoder
from core.workflow.entities.node_entities import NodeType
from extensions.ext_database import db
from models.account import Account
from models.model import EndUser
from models.workflow import (
CreatedByRole,
Workflow,
WorkflowNodeExecution,
WorkflowNodeExecutionStatus,
WorkflowNodeExecutionTriggeredFrom,
WorkflowRun,
WorkflowRunStatus,
WorkflowRunTriggeredFrom,
)
class WorkflowBasedGenerateTaskPipeline:
def _init_workflow_run(self, workflow: Workflow,
triggered_from: WorkflowRunTriggeredFrom,
user: Union[Account, EndUser],
user_inputs: dict,
system_inputs: Optional[dict] = None) -> WorkflowRun:
"""
Init workflow run
:param workflow: Workflow instance
:param triggered_from: triggered from
:param user: account or end user
:param user_inputs: user variables inputs
:param system_inputs: system inputs, like: query, files
:return:
"""
max_sequence = db.session.query(db.func.max(WorkflowRun.sequence_number)) \
.filter(WorkflowRun.tenant_id == workflow.tenant_id) \
.filter(WorkflowRun.app_id == workflow.app_id) \
.scalar() or 0
new_sequence_number = max_sequence + 1
# init workflow run
workflow_run = WorkflowRun(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
sequence_number=new_sequence_number,
workflow_id=workflow.id,
type=workflow.type,
triggered_from=triggered_from.value,
version=workflow.version,
graph=workflow.graph,
inputs=json.dumps({**user_inputs, **jsonable_encoder(system_inputs)}),
status=WorkflowRunStatus.RUNNING.value,
created_by_role=(CreatedByRole.ACCOUNT.value
if isinstance(user, Account) else CreatedByRole.END_USER.value),
created_by=user.id
)
db.session.add(workflow_run)
db.session.commit()
return workflow_run
def _workflow_run_success(self, workflow_run: WorkflowRun,
start_at: float,
total_tokens: int,
total_steps: int,
outputs: Optional[dict] = None) -> WorkflowRun:
"""
Workflow run success
:param workflow_run: workflow run
:param start_at: start time
:param total_tokens: total tokens
:param total_steps: total steps
:param outputs: outputs
:return:
"""
workflow_run.status = WorkflowRunStatus.SUCCEEDED.value
workflow_run.outputs = outputs
workflow_run.elapsed_time = time.perf_counter() - start_at
workflow_run.total_tokens = total_tokens
workflow_run.total_steps = total_steps
workflow_run.finished_at = datetime.utcnow()
db.session.commit()
return workflow_run
def _workflow_run_failed(self, workflow_run: WorkflowRun,
start_at: float,
total_tokens: int,
total_steps: int,
status: WorkflowRunStatus,
error: str) -> WorkflowRun:
"""
Workflow run failed
:param workflow_run: workflow run
:param start_at: start time
:param total_tokens: total tokens
:param total_steps: total steps
:param status: status
:param error: error message
:return:
"""
workflow_run.status = status.value
workflow_run.error = error
workflow_run.elapsed_time = time.perf_counter() - start_at
workflow_run.total_tokens = total_tokens
workflow_run.total_steps = total_steps
workflow_run.finished_at = datetime.utcnow()
db.session.commit()
return workflow_run
def _init_node_execution_from_workflow_run(self, workflow_run: WorkflowRun,
node_id: str,
node_type: NodeType,
node_title: str,
node_run_index: int = 1,
predecessor_node_id: Optional[str] = None) -> WorkflowNodeExecution:
"""
Init workflow node execution from workflow run
:param workflow_run: workflow run
:param node_id: node id
:param node_type: node type
:param node_title: node title
:param node_run_index: run index
:param predecessor_node_id: predecessor node id if exists
:return:
"""
# init workflow node execution
workflow_node_execution = WorkflowNodeExecution(
tenant_id=workflow_run.tenant_id,
app_id=workflow_run.app_id,
workflow_id=workflow_run.workflow_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
workflow_run_id=workflow_run.id,
predecessor_node_id=predecessor_node_id,
index=node_run_index,
node_id=node_id,
node_type=node_type.value,
title=node_title,
status=WorkflowNodeExecutionStatus.RUNNING.value,
created_by_role=workflow_run.created_by_role,
created_by=workflow_run.created_by
)
db.session.add(workflow_node_execution)
db.session.commit()
return workflow_node_execution
def _workflow_node_execution_success(self, workflow_node_execution: WorkflowNodeExecution,
start_at: float,
inputs: Optional[dict] = None,
process_data: Optional[dict] = None,
outputs: Optional[dict] = None,
execution_metadata: Optional[dict] = None) -> WorkflowNodeExecution:
"""
Workflow node execution success
:param workflow_node_execution: workflow node execution
:param start_at: start time
:param inputs: inputs
:param process_data: process data
:param outputs: outputs
:param execution_metadata: execution metadata
:return:
"""
workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
workflow_node_execution.elapsed_time = time.perf_counter() - start_at
workflow_node_execution.inputs = json.dumps(inputs) if inputs else None
workflow_node_execution.process_data = json.dumps(process_data) if process_data else None
workflow_node_execution.outputs = json.dumps(outputs) if outputs else None
workflow_node_execution.execution_metadata = json.dumps(jsonable_encoder(execution_metadata)) \
if execution_metadata else None
workflow_node_execution.finished_at = datetime.utcnow()
db.session.commit()
return workflow_node_execution
def _workflow_node_execution_failed(self, workflow_node_execution: WorkflowNodeExecution,
start_at: float,
error: str) -> WorkflowNodeExecution:
"""
Workflow node execution failed
:param workflow_node_execution: workflow node execution
:param start_at: start time
:param error: error message
:return:
"""
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
workflow_node_execution.error = error
workflow_node_execution.elapsed_time = time.perf_counter() - start_at
workflow_node_execution.finished_at = datetime.utcnow()
db.session.commit()
return workflow_node_execution
from enum import Enum from enum import Enum
from typing import Any from typing import Any, Optional
from pydantic import BaseModel from pydantic import BaseModel
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeType
class QueueEvent(Enum): class QueueEvent(Enum):
...@@ -16,9 +18,11 @@ class QueueEvent(Enum): ...@@ -16,9 +18,11 @@ class QueueEvent(Enum):
MESSAGE_REPLACE = "message_replace" MESSAGE_REPLACE = "message_replace"
MESSAGE_END = "message_end" MESSAGE_END = "message_end"
WORKFLOW_STARTED = "workflow_started" WORKFLOW_STARTED = "workflow_started"
WORKFLOW_FINISHED = "workflow_finished" WORKFLOW_SUCCEEDED = "workflow_succeeded"
WORKFLOW_FAILED = "workflow_failed"
NODE_STARTED = "node_started" NODE_STARTED = "node_started"
NODE_FINISHED = "node_finished" NODE_SUCCEEDED = "node_succeeded"
NODE_FAILED = "node_failed"
RETRIEVER_RESOURCES = "retriever_resources" RETRIEVER_RESOURCES = "retriever_resources"
ANNOTATION_REPLY = "annotation_reply" ANNOTATION_REPLY = "annotation_reply"
AGENT_THOUGHT = "agent_thought" AGENT_THOUGHT = "agent_thought"
...@@ -96,15 +100,21 @@ class QueueWorkflowStartedEvent(AppQueueEvent): ...@@ -96,15 +100,21 @@ class QueueWorkflowStartedEvent(AppQueueEvent):
QueueWorkflowStartedEvent entity QueueWorkflowStartedEvent entity
""" """
event = QueueEvent.WORKFLOW_STARTED event = QueueEvent.WORKFLOW_STARTED
workflow_run_id: str
class QueueWorkflowFinishedEvent(AppQueueEvent): class QueueWorkflowSucceededEvent(AppQueueEvent):
""" """
QueueWorkflowFinishedEvent entity QueueWorkflowSucceededEvent entity
""" """
event = QueueEvent.WORKFLOW_FINISHED event = QueueEvent.WORKFLOW_SUCCEEDED
workflow_run_id: str
class QueueWorkflowFailedEvent(AppQueueEvent):
"""
QueueWorkflowFailedEvent entity
"""
event = QueueEvent.WORKFLOW_FAILED
error: str
class QueueNodeStartedEvent(AppQueueEvent): class QueueNodeStartedEvent(AppQueueEvent):
...@@ -112,17 +122,45 @@ class QueueNodeStartedEvent(AppQueueEvent): ...@@ -112,17 +122,45 @@ class QueueNodeStartedEvent(AppQueueEvent):
QueueNodeStartedEvent entity QueueNodeStartedEvent entity
""" """
event = QueueEvent.NODE_STARTED event = QueueEvent.NODE_STARTED
workflow_node_execution_id: str
node_id: str
node_type: NodeType
node_data: BaseNodeData
node_run_index: int = 1
predecessor_node_id: Optional[str] = None
class QueueNodeFinishedEvent(AppQueueEvent):
class QueueNodeSucceededEvent(AppQueueEvent):
""" """
QueueNodeFinishedEvent entity QueueNodeSucceededEvent entity
""" """
event = QueueEvent.NODE_FINISHED event = QueueEvent.NODE_SUCCEEDED
workflow_node_execution_id: str
node_id: str
node_type: NodeType
node_data: BaseNodeData
inputs: Optional[dict] = None
process_data: Optional[dict] = None
outputs: Optional[dict] = None
execution_metadata: Optional[dict] = None
error: Optional[str] = None
class QueueNodeFailedEvent(AppQueueEvent):
"""
QueueNodeFailedEvent entity
"""
event = QueueEvent.NODE_FAILED
node_id: str
node_type: NodeType
node_data: BaseNodeData
error: str
class QueueAgentThoughtEvent(AppQueueEvent): class QueueAgentThoughtEvent(AppQueueEvent):
""" """
QueueAgentThoughtEvent entity QueueAgentThoughtEvent entity
......
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Optional
from models.workflow import WorkflowNodeExecution, WorkflowRun from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeType
class BaseWorkflowCallback(ABC): class BaseWorkflowCallback(ABC):
@abstractmethod @abstractmethod
def on_workflow_run_started(self, workflow_run: WorkflowRun) -> None: def on_workflow_run_started(self) -> None:
""" """
Workflow run started Workflow run started
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def on_workflow_run_finished(self, workflow_run: WorkflowRun) -> None: def on_workflow_run_succeeded(self) -> None:
""" """
Workflow run finished Workflow run succeeded
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def on_workflow_node_execute_started(self, workflow_node_execution: WorkflowNodeExecution) -> None: def on_workflow_run_failed(self, error: str) -> None:
"""
Workflow run failed
"""
raise NotImplementedError
@abstractmethod
def on_workflow_node_execute_started(self, node_id: str,
node_type: NodeType,
node_data: BaseNodeData,
node_run_index: int = 1,
predecessor_node_id: Optional[str] = None) -> None:
""" """
Workflow node execute started Workflow node execute started
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def on_workflow_node_execute_finished(self, workflow_node_execution: WorkflowNodeExecution) -> None: def on_workflow_node_execute_succeeded(self, node_id: str,
node_type: NodeType,
node_data: BaseNodeData,
inputs: Optional[dict] = None,
process_data: Optional[dict] = None,
outputs: Optional[dict] = None,
execution_metadata: Optional[dict] = None) -> None:
""" """
Workflow node execute finished Workflow node execute succeeded
"""
raise NotImplementedError
@abstractmethod
def on_workflow_node_execute_failed(self, node_id: str,
node_type: NodeType,
node_data: BaseNodeData,
error: str) -> None:
"""
Workflow node execute failed
""" """
raise NotImplementedError raise NotImplementedError
...@@ -38,4 +67,3 @@ class BaseWorkflowCallback(ABC): ...@@ -38,4 +67,3 @@ class BaseWorkflowCallback(ABC):
Publish text chunk Publish text chunk
""" """
raise NotImplementedError raise NotImplementedError
from typing import Optional
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from models.workflow import WorkflowNodeExecution, WorkflowRun from core.workflow.nodes.base_node import BaseNode
from models.workflow import Workflow
class WorkflowNodeAndResult:
node: BaseNode
result: Optional[NodeRunResult] = None
def __init__(self, node: BaseNode, result: Optional[NodeRunResult] = None):
self.node = node
self.result = result
class WorkflowRunState: class WorkflowRunState:
workflow_run: WorkflowRun workflow: Workflow
start_at: float start_at: float
user_inputs: dict user_inputs: dict
variable_pool: VariablePool variable_pool: VariablePool
total_tokens: int = 0 total_tokens: int = 0
workflow_node_executions: list[WorkflowNodeExecution] = [] workflow_nodes_and_results: list[WorkflowNodeAndResult] = []
def __init__(self, workflow_run: WorkflowRun, def __init__(self, workflow: Workflow, start_at: float, user_inputs: dict, variable_pool: VariablePool):
start_at: float, self.workflow = workflow
user_inputs: dict,
variable_pool: VariablePool) -> None:
self.workflow_run = workflow_run
self.start_at = start_at self.start_at = start_at
self.user_inputs = user_inputs self.user_inputs = user_inputs
self.variable_pool = variable_pool self.variable_pool = variable_pool
...@@ -43,7 +43,7 @@ class DirectAnswerNode(BaseNode): ...@@ -43,7 +43,7 @@ class DirectAnswerNode(BaseNode):
# publish answer as stream # publish answer as stream
for word in answer: for word in answer:
self.publish_text_chunk(word) self.publish_text_chunk(word)
time.sleep(0.01) time.sleep(0.01) # todo sleep 0.01
return NodeRunResult( return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED, status=WorkflowNodeExecutionStatus.SUCCEEDED,
......
import json
import time import time
from datetime import datetime from typing import Optional
from typing import Optional, Union
from core.model_runtime.utils.encoders import jsonable_encoder from core.app.apps.base_app_queue_manager import GenerateTaskStoppedException
from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType
from core.workflow.entities.variable_pool import VariablePool, VariableValue from core.workflow.entities.variable_pool import VariablePool, VariableValue
from core.workflow.entities.workflow_entities import WorkflowRunState from core.workflow.entities.workflow_entities import WorkflowNodeAndResult, WorkflowRunState
from core.workflow.nodes.base_node import BaseNode from core.workflow.nodes.base_node import BaseNode
from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode
...@@ -21,18 +19,9 @@ from core.workflow.nodes.start.start_node import StartNode ...@@ -21,18 +19,9 @@ from core.workflow.nodes.start.start_node import StartNode
from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode
from core.workflow.nodes.tool.tool_node import ToolNode from core.workflow.nodes.tool.tool_node import ToolNode
from core.workflow.nodes.variable_assigner.variable_assigner_node import VariableAssignerNode from core.workflow.nodes.variable_assigner.variable_assigner_node import VariableAssignerNode
from extensions.ext_database import db
from models.account import Account
from models.model import App, EndUser
from models.workflow import ( from models.workflow import (
CreatedByRole,
Workflow, Workflow,
WorkflowNodeExecution,
WorkflowNodeExecutionStatus, WorkflowNodeExecutionStatus,
WorkflowNodeExecutionTriggeredFrom,
WorkflowRun,
WorkflowRunStatus,
WorkflowRunTriggeredFrom,
WorkflowType, WorkflowType,
) )
...@@ -53,20 +42,6 @@ node_classes = { ...@@ -53,20 +42,6 @@ node_classes = {
class WorkflowEngineManager: class WorkflowEngineManager:
def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
"""
Get workflow
"""
# fetch workflow by workflow_id
workflow = db.session.query(Workflow).filter(
Workflow.tenant_id == app_model.tenant_id,
Workflow.app_id == app_model.id,
Workflow.id == workflow_id
).first()
# return workflow
return workflow
def get_default_configs(self) -> list[dict]: def get_default_configs(self) -> list[dict]:
""" """
Get default block configs Get default block configs
...@@ -100,16 +75,12 @@ class WorkflowEngineManager: ...@@ -100,16 +75,12 @@ class WorkflowEngineManager:
return default_config return default_config
def run_workflow(self, workflow: Workflow, def run_workflow(self, workflow: Workflow,
triggered_from: WorkflowRunTriggeredFrom,
user: Union[Account, EndUser],
user_inputs: dict, user_inputs: dict,
system_inputs: Optional[dict] = None, system_inputs: Optional[dict] = None,
callbacks: list[BaseWorkflowCallback] = None) -> None: callbacks: list[BaseWorkflowCallback] = None) -> None:
""" """
Run workflow Run workflow
:param workflow: Workflow instance :param workflow: Workflow instance
:param triggered_from: triggered from
:param user: account or end user
:param user_inputs: user variables inputs :param user_inputs: user variables inputs
:param system_inputs: system inputs, like: query, files :param system_inputs: system inputs, like: query, files
:param callbacks: workflow callbacks :param callbacks: workflow callbacks
...@@ -130,18 +101,13 @@ class WorkflowEngineManager: ...@@ -130,18 +101,13 @@ class WorkflowEngineManager:
raise ValueError('edges in workflow graph must be a list') raise ValueError('edges in workflow graph must be a list')
# init workflow run # init workflow run
workflow_run = self._init_workflow_run( if callbacks:
workflow=workflow, for callback in callbacks:
triggered_from=triggered_from, callback.on_workflow_run_started()
user=user,
user_inputs=user_inputs,
system_inputs=system_inputs,
callbacks=callbacks
)
# init workflow run state # init workflow run state
workflow_run_state = WorkflowRunState( workflow_run_state = WorkflowRunState(
workflow_run=workflow_run, workflow=workflow,
start_at=time.perf_counter(), start_at=time.perf_counter(),
user_inputs=user_inputs, user_inputs=user_inputs,
variable_pool=VariablePool( variable_pool=VariablePool(
...@@ -166,7 +132,7 @@ class WorkflowEngineManager: ...@@ -166,7 +132,7 @@ class WorkflowEngineManager:
has_entry_node = True has_entry_node = True
# max steps 30 reached # max steps 30 reached
if len(workflow_run_state.workflow_node_executions) > 30: if len(workflow_run_state.workflow_nodes_and_results) > 30:
raise ValueError('Max steps 30 reached.') raise ValueError('Max steps 30 reached.')
# or max execution time 10min reached # or max execution time 10min reached
...@@ -188,14 +154,14 @@ class WorkflowEngineManager: ...@@ -188,14 +154,14 @@ class WorkflowEngineManager:
if not has_entry_node: if not has_entry_node:
self._workflow_run_failed( self._workflow_run_failed(
workflow_run_state=workflow_run_state,
error='Start node not found in workflow graph.', error='Start node not found in workflow graph.',
callbacks=callbacks callbacks=callbacks
) )
return return
except GenerateTaskStoppedException as e:
return
except Exception as e: except Exception as e:
self._workflow_run_failed( self._workflow_run_failed(
workflow_run_state=workflow_run_state,
error=str(e), error=str(e),
callbacks=callbacks callbacks=callbacks
) )
...@@ -203,112 +169,33 @@ class WorkflowEngineManager: ...@@ -203,112 +169,33 @@ class WorkflowEngineManager:
# workflow run success # workflow run success
self._workflow_run_success( self._workflow_run_success(
workflow_run_state=workflow_run_state,
callbacks=callbacks callbacks=callbacks
) )
def _init_workflow_run(self, workflow: Workflow, def _workflow_run_success(self, callbacks: list[BaseWorkflowCallback] = None) -> None:
triggered_from: WorkflowRunTriggeredFrom,
user: Union[Account, EndUser],
user_inputs: dict,
system_inputs: Optional[dict] = None,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun:
"""
Init workflow run
:param workflow: Workflow instance
:param triggered_from: triggered from
:param user: account or end user
:param user_inputs: user variables inputs
:param system_inputs: system inputs, like: query, files
:param callbacks: workflow callbacks
:return:
"""
max_sequence = db.session.query(db.func.max(WorkflowRun.sequence_number)) \
.filter(WorkflowRun.tenant_id == workflow.tenant_id) \
.filter(WorkflowRun.app_id == workflow.app_id) \
.scalar() or 0
new_sequence_number = max_sequence + 1
# init workflow run
workflow_run = WorkflowRun(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
sequence_number=new_sequence_number,
workflow_id=workflow.id,
type=workflow.type,
triggered_from=triggered_from.value,
version=workflow.version,
graph=workflow.graph,
inputs=json.dumps({**user_inputs, **jsonable_encoder(system_inputs)}),
status=WorkflowRunStatus.RUNNING.value,
created_by_role=(CreatedByRole.ACCOUNT.value
if isinstance(user, Account) else CreatedByRole.END_USER.value),
created_by=user.id
)
db.session.add(workflow_run)
db.session.commit()
if callbacks:
for callback in callbacks:
callback.on_workflow_run_started(workflow_run)
return workflow_run
def _workflow_run_success(self, workflow_run_state: WorkflowRunState,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun:
""" """
Workflow run success Workflow run success
:param workflow_run_state: workflow run state
:param callbacks: workflow callbacks :param callbacks: workflow callbacks
:return: :return:
""" """
workflow_run = workflow_run_state.workflow_run
workflow_run.status = WorkflowRunStatus.SUCCEEDED.value
# fetch last workflow_node_executions
last_workflow_node_execution = workflow_run_state.workflow_node_executions[-1]
if last_workflow_node_execution:
workflow_run.outputs = last_workflow_node_execution.outputs
workflow_run.elapsed_time = time.perf_counter() - workflow_run_state.start_at
workflow_run.total_tokens = workflow_run_state.total_tokens
workflow_run.total_steps = len(workflow_run_state.workflow_node_executions)
workflow_run.finished_at = datetime.utcnow()
db.session.commit()
if callbacks: if callbacks:
for callback in callbacks: for callback in callbacks:
callback.on_workflow_run_finished(workflow_run) callback.on_workflow_run_succeeded()
return workflow_run def _workflow_run_failed(self, error: str,
callbacks: list[BaseWorkflowCallback] = None) -> None:
def _workflow_run_failed(self, workflow_run_state: WorkflowRunState,
error: str,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun:
""" """
Workflow run failed Workflow run failed
:param workflow_run_state: workflow run state
:param error: error message :param error: error message
:param callbacks: workflow callbacks :param callbacks: workflow callbacks
:return: :return:
""" """
workflow_run = workflow_run_state.workflow_run
workflow_run.status = WorkflowRunStatus.FAILED.value
workflow_run.error = error
workflow_run.elapsed_time = time.perf_counter() - workflow_run_state.start_at
workflow_run.total_tokens = workflow_run_state.total_tokens
workflow_run.total_steps = len(workflow_run_state.workflow_node_executions)
workflow_run.finished_at = datetime.utcnow()
db.session.commit()
if callbacks: if callbacks:
for callback in callbacks: for callback in callbacks:
callback.on_workflow_run_finished(workflow_run) callback.on_workflow_run_failed(
error=error
return workflow_run )
def _get_next_node(self, graph: dict, def _get_next_node(self, graph: dict,
predecessor_node: Optional[BaseNode] = None, predecessor_node: Optional[BaseNode] = None,
...@@ -384,18 +271,24 @@ class WorkflowEngineManager: ...@@ -384,18 +271,24 @@ class WorkflowEngineManager:
def _run_workflow_node(self, workflow_run_state: WorkflowRunState, def _run_workflow_node(self, workflow_run_state: WorkflowRunState,
node: BaseNode, node: BaseNode,
predecessor_node: Optional[BaseNode] = None, predecessor_node: Optional[BaseNode] = None,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution: callbacks: list[BaseWorkflowCallback] = None) -> None:
# init workflow node execution if callbacks:
start_at = time.perf_counter() for callback in callbacks:
workflow_node_execution = self._init_node_execution_from_workflow_run( callback.on_workflow_node_execute_started(
workflow_run_state=workflow_run_state, node_id=node.node_id,
node_type=node.node_type,
node_data=node.node_data,
node_run_index=len(workflow_run_state.workflow_nodes_and_results) + 1,
predecessor_node_id=predecessor_node.node_id if predecessor_node else None
)
workflow_nodes_and_result = WorkflowNodeAndResult(
node=node, node=node,
predecessor_node=predecessor_node, result=None
callbacks=callbacks
) )
# add to workflow node executions # add to workflow_nodes_and_results
workflow_run_state.workflow_node_executions.append(workflow_node_execution) workflow_run_state.workflow_nodes_and_results.append(workflow_nodes_and_result)
# run node, result must have inputs, process_data, outputs, execution_metadata # run node, result must have inputs, process_data, outputs, execution_metadata
node_run_result = node.run( node_run_result = node.run(
...@@ -406,24 +299,34 @@ class WorkflowEngineManager: ...@@ -406,24 +299,34 @@ class WorkflowEngineManager:
if node_run_result.status == WorkflowNodeExecutionStatus.FAILED: if node_run_result.status == WorkflowNodeExecutionStatus.FAILED:
# node run failed # node run failed
self._workflow_node_execution_failed( if callbacks:
workflow_node_execution=workflow_node_execution, for callback in callbacks:
start_at=start_at, callback.on_workflow_node_execute_failed(
error=node_run_result.error, node_id=node.node_id,
callbacks=callbacks node_type=node.node_type,
) node_data=node.node_data,
error=node_run_result.error
)
raise ValueError(f"Node {node.node_data.title} run failed: {node_run_result.error}") raise ValueError(f"Node {node.node_data.title} run failed: {node_run_result.error}")
# set end node output if in chat # set end node output if in chat
self._set_end_node_output_if_in_chat(workflow_run_state, node, node_run_result) self._set_end_node_output_if_in_chat(workflow_run_state, node, node_run_result)
workflow_nodes_and_result.result = node_run_result
# node run success # node run success
self._workflow_node_execution_success( if callbacks:
workflow_node_execution=workflow_node_execution, for callback in callbacks:
start_at=start_at, callback.on_workflow_node_execute_succeeded(
result=node_run_result, node_id=node.node_id,
callbacks=callbacks node_type=node.node_type,
) node_data=node.node_data,
inputs=node_run_result.inputs,
process_data=node_run_result.process_data,
outputs=node_run_result.outputs,
execution_metadata=node_run_result.metadata
)
if node_run_result.outputs: if node_run_result.outputs:
for variable_key, variable_value in node_run_result.outputs.items(): for variable_key, variable_value in node_run_result.outputs.items():
...@@ -438,105 +341,9 @@ class WorkflowEngineManager: ...@@ -438,105 +341,9 @@ class WorkflowEngineManager:
if node_run_result.metadata and node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS): if node_run_result.metadata and node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS):
workflow_run_state.total_tokens += int(node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS)) workflow_run_state.total_tokens += int(node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS))
return workflow_node_execution
def _init_node_execution_from_workflow_run(self, workflow_run_state: WorkflowRunState,
node: BaseNode,
predecessor_node: Optional[BaseNode] = None,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
"""
Init workflow node execution from workflow run
:param workflow_run_state: workflow run state
:param node: current node
:param predecessor_node: predecessor node if exists
:param callbacks: workflow callbacks
:return:
"""
workflow_run = workflow_run_state.workflow_run
# init workflow node execution
workflow_node_execution = WorkflowNodeExecution(
tenant_id=workflow_run.tenant_id,
app_id=workflow_run.app_id,
workflow_id=workflow_run.workflow_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
workflow_run_id=workflow_run.id,
predecessor_node_id=predecessor_node.node_id if predecessor_node else None,
index=len(workflow_run_state.workflow_node_executions) + 1,
node_id=node.node_id,
node_type=node.node_type.value,
title=node.node_data.title,
status=WorkflowNodeExecutionStatus.RUNNING.value,
created_by_role=workflow_run.created_by_role,
created_by=workflow_run.created_by
)
db.session.add(workflow_node_execution)
db.session.commit()
if callbacks:
for callback in callbacks:
callback.on_workflow_node_execute_started(workflow_node_execution)
return workflow_node_execution
def _workflow_node_execution_success(self, workflow_node_execution: WorkflowNodeExecution,
start_at: float,
result: NodeRunResult,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
"""
Workflow node execution success
:param workflow_node_execution: workflow node execution
:param start_at: start time
:param result: node run result
:param callbacks: workflow callbacks
:return:
"""
workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
workflow_node_execution.elapsed_time = time.perf_counter() - start_at
workflow_node_execution.inputs = json.dumps(result.inputs) if result.inputs else None
workflow_node_execution.process_data = json.dumps(result.process_data) if result.process_data else None
workflow_node_execution.outputs = json.dumps(result.outputs) if result.outputs else None
workflow_node_execution.execution_metadata = json.dumps(jsonable_encoder(result.metadata)) \
if result.metadata else None
workflow_node_execution.finished_at = datetime.utcnow()
db.session.commit()
if callbacks:
for callback in callbacks:
callback.on_workflow_node_execute_finished(workflow_node_execution)
return workflow_node_execution
def _workflow_node_execution_failed(self, workflow_node_execution: WorkflowNodeExecution,
start_at: float,
error: str,
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
"""
Workflow node execution failed
:param workflow_node_execution: workflow node execution
:param start_at: start time
:param error: error message
:param callbacks: workflow callbacks
:return:
"""
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
workflow_node_execution.error = error
workflow_node_execution.elapsed_time = time.perf_counter() - start_at
workflow_node_execution.finished_at = datetime.utcnow()
db.session.commit()
if callbacks:
for callback in callbacks:
callback.on_workflow_node_execute_finished(workflow_node_execution)
return workflow_node_execution
def _set_end_node_output_if_in_chat(self, workflow_run_state: WorkflowRunState, def _set_end_node_output_if_in_chat(self, workflow_run_state: WorkflowRunState,
node: BaseNode, node: BaseNode,
node_run_result: NodeRunResult): node_run_result: NodeRunResult) -> None:
""" """
Set end node output if in chat Set end node output if in chat
:param workflow_run_state: workflow run state :param workflow_run_state: workflow run state
...@@ -544,21 +351,19 @@ class WorkflowEngineManager: ...@@ -544,21 +351,19 @@ class WorkflowEngineManager:
:param node_run_result: node run result :param node_run_result: node run result
:return: :return:
""" """
if workflow_run_state.workflow_run.type == WorkflowType.CHAT.value and node.node_type == NodeType.END: if workflow_run_state.workflow.type == WorkflowType.CHAT.value and node.node_type == NodeType.END:
workflow_node_execution_before_end = workflow_run_state.workflow_node_executions[-2] workflow_nodes_and_result_before_end = workflow_run_state.workflow_nodes_and_results[-2]
if workflow_node_execution_before_end: if workflow_nodes_and_result_before_end:
if workflow_node_execution_before_end.node_type == NodeType.LLM.value: if workflow_nodes_and_result_before_end.node.node_type == NodeType.LLM.value:
if not node_run_result.outputs: if not node_run_result.outputs:
node_run_result.outputs = {} node_run_result.outputs = {}
node_run_result.outputs['text'] = workflow_node_execution_before_end.outputs_dict.get('text') node_run_result.outputs['text'] = workflow_nodes_and_result_before_end.result.outputs.get('text')
elif workflow_node_execution_before_end.node_type == NodeType.DIRECT_ANSWER.value: elif workflow_nodes_and_result_before_end.node.node_type == NodeType.DIRECT_ANSWER.value:
if not node_run_result.outputs: if not node_run_result.outputs:
node_run_result.outputs = {} node_run_result.outputs = {}
node_run_result.outputs['text'] = workflow_node_execution_before_end.outputs_dict.get('answer') node_run_result.outputs['text'] = workflow_nodes_and_result_before_end.result.outputs.get('answer')
return node_run_result
def _append_variables_recursively(self, variable_pool: VariablePool, def _append_variables_recursively(self, variable_pool: VariablePool,
node_id: str, node_id: str,
......
...@@ -5,6 +5,7 @@ from typing import Optional, Union ...@@ -5,6 +5,7 @@ from typing import Optional, Union
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.apps.workflow.app_generator import WorkflowAppGenerator from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
...@@ -44,10 +45,14 @@ class WorkflowService: ...@@ -44,10 +45,14 @@ class WorkflowService:
if not app_model.workflow_id: if not app_model.workflow_id:
return None return None
workflow_engine_manager = WorkflowEngineManager()
# fetch published workflow by workflow_id # fetch published workflow by workflow_id
return workflow_engine_manager.get_workflow(app_model, app_model.workflow_id) workflow = db.session.query(Workflow).filter(
Workflow.tenant_id == app_model.tenant_id,
Workflow.app_id == app_model.id,
Workflow.id == app_model.workflow_id
).first()
return workflow
def sync_draft_workflow(self, app_model: App, def sync_draft_workflow(self, app_model: App,
graph: dict, graph: dict,
...@@ -201,6 +206,14 @@ class WorkflowService: ...@@ -201,6 +206,14 @@ class WorkflowService:
return response return response
def stop_workflow_task(self, task_id: str,
user: Union[Account, EndUser],
invoke_from: InvokeFrom) -> None:
"""
Stop workflow task
"""
AppQueueManager.set_stop_flag(task_id, invoke_from, user.id)
def convert_to_workflow(self, app_model: App, account: Account) -> App: def convert_to_workflow(self, app_model: App, account: Account) -> App:
""" """
Basic mode of chatbot app(expert mode) to workflow Basic mode of chatbot app(expert mode) to workflow
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment