Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
D
dify
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ai-tech
dify
Commits
63721834
Commit
63721834
authored
Mar 06, 2024
by
takatost
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
refactor workflow generate pipeline
parent
5963e7d1
Changes
30
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
30 changed files
with
1175 additions
and
110 deletions
+1175
-110
completion.py
api/controllers/console/app/completion.py
+1
-1
completion.py
api/controllers/console/explore/completion.py
+1
-1
completion.py
api/controllers/service_api/app/completion.py
+1
-1
completion.py
api/controllers/web/completion.py
+1
-1
base_agent_runner.py
api/core/agent/base_agent_runner.py
+1
-1
cot_agent_runner.py
api/core/agent/cot_agent_runner.py
+22
-9
fc_agent_runner.py
api/core/agent/fc_agent_runner.py
+20
-10
app_generator.py
api/core/app/apps/advanced_chat/app_generator.py
+3
-2
app_runner.py
api/core/app/apps/advanced_chat/app_runner.py
+11
-8
generate_task_pipeline.py
api/core/app/apps/advanced_chat/generate_task_pipeline.py
+9
-3
app_generator.py
api/core/app/apps/agent_chat/app_generator.py
+3
-2
app_runner.py
api/core/app/apps/agent_chat/app_runner.py
+6
-4
base_app_queue_manager.py
api/core/app/apps/base_app_queue_manager.py
+181
-0
base_app_runner.py
api/core/app/apps/base_app_runner.py
+38
-20
app_generator.py
api/core/app/apps/chat/app_generator.py
+3
-2
app_runner.py
api/core/app/apps/chat/app_runner.py
+6
-4
app_generator.py
api/core/app/apps/completion/app_generator.py
+4
-3
app_runner.py
api/core/app/apps/completion/app_runner.py
+1
-1
easy_ui_based_generate_task_pipeline.py
api/core/app/apps/easy_ui_based_generate_task_pipeline.py
+15
-10
message_based_app_generator.py
api/core/app/apps/message_based_app_generator.py
+1
-1
message_based_app_queue_manager.py
api/core/app/apps/message_based_app_queue_manager.py
+29
-0
app_generator.py
api/core/app/apps/workflow/app_generator.py
+164
-0
app_queue_manager.py
api/core/app/apps/workflow/app_queue_manager.py
+23
-0
app_runner.py
api/core/app/apps/workflow/app_runner.py
+156
-0
generate_task_pipeline.py
api/core/app/apps/workflow/generate_task_pipeline.py
+408
-0
app_invoke_entities.py
api/core/app/entities/app_invoke_entities.py
+2
-2
index_tool_callback_handler.py
api/core/callback_handler/index_tool_callback_handler.py
+6
-2
workflow_event_trigger_callback.py
api/core/callback_handler/workflow_event_trigger_callback.py
+25
-16
output_moderation.py
api/core/moderation/output_moderation.py
+15
-4
workflow_service.py
api/services/workflow_service.py
+19
-2
No files found.
api/controllers/console/app/completion.py
View file @
63721834
...
...
@@ -21,7 +21,7 @@ from controllers.console.app.error import (
from
controllers.console.app.wraps
import
get_app_model
from
controllers.console.setup
import
setup_required
from
controllers.console.wraps
import
account_initialization_required
from
core.app.app_queue_manager
import
AppQueueManager
from
core.app.app
s.base_app
_queue_manager
import
AppQueueManager
from
core.app.entities.app_invoke_entities
import
InvokeFrom
from
core.errors.error
import
ModelCurrentlyNotSupportError
,
ProviderTokenNotInitError
,
QuotaExceededError
from
core.model_runtime.errors.invoke
import
InvokeError
...
...
api/controllers/console/explore/completion.py
View file @
63721834
...
...
@@ -21,7 +21,7 @@ from controllers.console.app.error import (
)
from
controllers.console.explore.error
import
NotChatAppError
,
NotCompletionAppError
from
controllers.console.explore.wraps
import
InstalledAppResource
from
core.app.app_queue_manager
import
AppQueueManager
from
core.app.app
s.base_app
_queue_manager
import
AppQueueManager
from
core.app.entities.app_invoke_entities
import
InvokeFrom
from
core.errors.error
import
ModelCurrentlyNotSupportError
,
ProviderTokenNotInitError
,
QuotaExceededError
from
core.model_runtime.errors.invoke
import
InvokeError
...
...
api/controllers/service_api/app/completion.py
View file @
63721834
...
...
@@ -19,7 +19,7 @@ from controllers.service_api.app.error import (
ProviderQuotaExceededError
,
)
from
controllers.service_api.wraps
import
FetchUserArg
,
WhereisUserArg
,
validate_app_token
from
core.app.app_queue_manager
import
AppQueueManager
from
core.app.app
s.base_app
_queue_manager
import
AppQueueManager
from
core.app.entities.app_invoke_entities
import
InvokeFrom
from
core.errors.error
import
ModelCurrentlyNotSupportError
,
ProviderTokenNotInitError
,
QuotaExceededError
from
core.model_runtime.errors.invoke
import
InvokeError
...
...
api/controllers/web/completion.py
View file @
63721834
...
...
@@ -20,7 +20,7 @@ from controllers.web.error import (
ProviderQuotaExceededError
,
)
from
controllers.web.wraps
import
WebApiResource
from
core.app.app_queue_manager
import
AppQueueManager
from
core.app.app
s.base_app
_queue_manager
import
AppQueueManager
from
core.app.entities.app_invoke_entities
import
InvokeFrom
from
core.errors.error
import
ModelCurrentlyNotSupportError
,
ProviderTokenNotInitError
,
QuotaExceededError
from
core.model_runtime.errors.invoke
import
InvokeError
...
...
api/core/agent/base_agent_runner.py
View file @
63721834
...
...
@@ -6,8 +6,8 @@ from mimetypes import guess_extension
from
typing
import
Optional
,
Union
,
cast
from
core.agent.entities
import
AgentEntity
,
AgentToolEntity
from
core.app.app_queue_manager
import
AppQueueManager
from
core.app.apps.agent_chat.app_config_manager
import
AgentChatAppConfig
from
core.app.apps.base_app_queue_manager
import
AppQueueManager
from
core.app.apps.base_app_runner
import
AppRunner
from
core.app.entities.app_invoke_entities
import
(
AgentChatAppGenerateEntity
,
...
...
api/core/agent/cot_agent_runner.py
View file @
63721834
...
...
@@ -5,7 +5,8 @@ from typing import Literal, Union
from
core.agent.base_agent_runner
import
BaseAgentRunner
from
core.agent.entities
import
AgentPromptEntity
,
AgentScratchpadUnit
from
core.app.app_queue_manager
import
PublishFrom
from
core.app.apps.base_app_queue_manager
import
PublishFrom
from
core.app.entities.queue_entities
import
QueueAgentThoughtEvent
,
QueueMessageEndEvent
,
QueueMessageFileEvent
from
core.model_runtime.entities.llm_entities
import
LLMResult
,
LLMResultChunk
,
LLMResultChunkDelta
,
LLMUsage
from
core.model_runtime.entities.message_entities
import
(
AssistantPromptMessage
,
...
...
@@ -121,7 +122,9 @@ class CotAgentRunner(BaseAgentRunner):
)
if
iteration_step
>
1
:
self
.
queue_manager
.
publish_agent_thought
(
agent_thought
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
queue_manager
.
publish
(
QueueAgentThoughtEvent
(
agent_thought_id
=
agent_thought
.
id
),
PublishFrom
.
APPLICATION_MANAGER
)
# update prompt messages
prompt_messages
=
self
.
_organize_cot_prompt_messages
(
...
...
@@ -163,7 +166,9 @@ class CotAgentRunner(BaseAgentRunner):
# publish agent thought if it's first iteration
if
iteration_step
==
1
:
self
.
queue_manager
.
publish_agent_thought
(
agent_thought
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
queue_manager
.
publish
(
QueueAgentThoughtEvent
(
agent_thought_id
=
agent_thought
.
id
),
PublishFrom
.
APPLICATION_MANAGER
)
for
chunk
in
react_chunks
:
if
isinstance
(
chunk
,
dict
):
...
...
@@ -225,7 +230,9 @@ class CotAgentRunner(BaseAgentRunner):
llm_usage
=
usage_dict
[
'usage'
])
if
scratchpad
.
action
and
scratchpad
.
action
.
action_name
.
lower
()
!=
"final answer"
:
self
.
queue_manager
.
publish_agent_thought
(
agent_thought
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
queue_manager
.
publish
(
QueueAgentThoughtEvent
(
agent_thought_id
=
agent_thought
.
id
),
PublishFrom
.
APPLICATION_MANAGER
)
if
not
scratchpad
.
action
:
# failed to extract action, return final answer directly
...
...
@@ -255,7 +262,9 @@ class CotAgentRunner(BaseAgentRunner):
observation
=
answer
,
answer
=
answer
,
messages_ids
=
[])
self
.
queue_manager
.
publish_agent_thought
(
agent_thought
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
queue_manager
.
publish
(
QueueAgentThoughtEvent
(
agent_thought_id
=
agent_thought
.
id
),
PublishFrom
.
APPLICATION_MANAGER
)
else
:
# invoke tool
error_response
=
None
...
...
@@ -282,7 +291,9 @@ class CotAgentRunner(BaseAgentRunner):
self
.
variables_pool
.
set_file
(
tool_name
=
tool_call_name
,
value
=
message_file
.
id
,
name
=
save_as
)
self
.
queue_manager
.
publish_message_file
(
message_file
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
queue_manager
.
publish
(
QueueMessageFileEvent
(
message_file_id
=
message_file
.
id
),
PublishFrom
.
APPLICATION_MANAGER
)
message_file_ids
=
[
message_file
.
id
for
message_file
,
_
in
message_files
]
except
ToolProviderCredentialValidationError
as
e
:
...
...
@@ -318,7 +329,9 @@ class CotAgentRunner(BaseAgentRunner):
answer
=
scratchpad
.
agent_response
,
messages_ids
=
message_file_ids
,
)
self
.
queue_manager
.
publish_agent_thought
(
agent_thought
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
queue_manager
.
publish
(
QueueAgentThoughtEvent
(
agent_thought_id
=
agent_thought
.
id
),
PublishFrom
.
APPLICATION_MANAGER
)
# update prompt tool message
for
prompt_tool
in
prompt_messages_tools
:
...
...
@@ -352,7 +365,7 @@ class CotAgentRunner(BaseAgentRunner):
self
.
update_db_variables
(
self
.
variables_pool
,
self
.
db_variables_pool
)
# publish end event
self
.
queue_manager
.
publish
_message_end
(
LLMResult
(
self
.
queue_manager
.
publish
(
QueueMessageEndEvent
(
llm_result
=
LLMResult
(
model
=
model_instance
.
model
,
prompt_messages
=
prompt_messages
,
message
=
AssistantPromptMessage
(
...
...
@@ -360,7 +373,7 @@ class CotAgentRunner(BaseAgentRunner):
),
usage
=
llm_usage
[
'usage'
]
if
llm_usage
[
'usage'
]
else
LLMUsage
.
empty_usage
(),
system_fingerprint
=
''
),
PublishFrom
.
APPLICATION_MANAGER
)
)
)
,
PublishFrom
.
APPLICATION_MANAGER
)
def
_handle_stream_react
(
self
,
llm_response
:
Generator
[
LLMResultChunk
,
None
,
None
],
usage
:
dict
)
\
->
Generator
[
Union
[
str
,
dict
],
None
,
None
]:
...
...
api/core/agent/fc_agent_runner.py
View file @
63721834
...
...
@@ -4,7 +4,8 @@ from collections.abc import Generator
from
typing
import
Any
,
Union
from
core.agent.base_agent_runner
import
BaseAgentRunner
from
core.app.app_queue_manager
import
PublishFrom
from
core.app.apps.base_app_queue_manager
import
PublishFrom
from
core.app.entities.queue_entities
import
QueueAgentThoughtEvent
,
QueueMessageEndEvent
,
QueueMessageFileEvent
from
core.model_runtime.entities.llm_entities
import
LLMResult
,
LLMResultChunk
,
LLMResultChunkDelta
,
LLMUsage
from
core.model_runtime.entities.message_entities
import
(
AssistantPromptMessage
,
...
...
@@ -135,7 +136,9 @@ class FunctionCallAgentRunner(BaseAgentRunner):
is_first_chunk
=
True
for
chunk
in
chunks
:
if
is_first_chunk
:
self
.
queue_manager
.
publish_agent_thought
(
agent_thought
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
queue_manager
.
publish
(
QueueAgentThoughtEvent
(
agent_thought_id
=
agent_thought
.
id
),
PublishFrom
.
APPLICATION_MANAGER
)
is_first_chunk
=
False
# check if there is any tool call
if
self
.
check_tool_calls
(
chunk
):
...
...
@@ -195,7 +198,9 @@ class FunctionCallAgentRunner(BaseAgentRunner):
if
not
result
.
message
.
content
:
result
.
message
.
content
=
''
self
.
queue_manager
.
publish_agent_thought
(
agent_thought
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
queue_manager
.
publish
(
QueueAgentThoughtEvent
(
agent_thought_id
=
agent_thought
.
id
),
PublishFrom
.
APPLICATION_MANAGER
)
yield
LLMResultChunk
(
model
=
model_instance
.
model
,
...
...
@@ -233,8 +238,9 @@ class FunctionCallAgentRunner(BaseAgentRunner):
messages_ids
=
[],
llm_usage
=
current_llm_usage
)
self
.
queue_manager
.
publish_agent_thought
(
agent_thought
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
queue_manager
.
publish
(
QueueAgentThoughtEvent
(
agent_thought_id
=
agent_thought
.
id
),
PublishFrom
.
APPLICATION_MANAGER
)
final_answer
+=
response
+
'
\n
'
...
...
@@ -275,7 +281,9 @@ class FunctionCallAgentRunner(BaseAgentRunner):
self
.
variables_pool
.
set_file
(
tool_name
=
tool_call_name
,
value
=
message_file
.
id
,
name
=
save_as
)
# publish message file
self
.
queue_manager
.
publish_message_file
(
message_file
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
queue_manager
.
publish
(
QueueMessageFileEvent
(
message_file_id
=
message_file
.
id
),
PublishFrom
.
APPLICATION_MANAGER
)
# add message file ids
message_file_ids
.
append
(
message_file
.
id
)
...
...
@@ -331,7 +339,9 @@ class FunctionCallAgentRunner(BaseAgentRunner):
answer
=
None
,
messages_ids
=
message_file_ids
)
self
.
queue_manager
.
publish_agent_thought
(
agent_thought
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
queue_manager
.
publish
(
QueueAgentThoughtEvent
(
agent_thought_id
=
agent_thought
.
id
),
PublishFrom
.
APPLICATION_MANAGER
)
# update prompt tool
for
prompt_tool
in
prompt_messages_tools
:
...
...
@@ -341,15 +351,15 @@ class FunctionCallAgentRunner(BaseAgentRunner):
self
.
update_db_variables
(
self
.
variables_pool
,
self
.
db_variables_pool
)
# publish end event
self
.
queue_manager
.
publish
_message_end
(
LLMResult
(
self
.
queue_manager
.
publish
(
QueueMessageEndEvent
(
llm_result
=
LLMResult
(
model
=
model_instance
.
model
,
prompt_messages
=
prompt_messages
,
message
=
AssistantPromptMessage
(
content
=
final_answer
,
content
=
final_answer
),
usage
=
llm_usage
[
'usage'
]
if
llm_usage
[
'usage'
]
else
LLMUsage
.
empty_usage
(),
system_fingerprint
=
''
),
PublishFrom
.
APPLICATION_MANAGER
)
)
)
,
PublishFrom
.
APPLICATION_MANAGER
)
def
check_tool_calls
(
self
,
llm_result_chunk
:
LLMResultChunk
)
->
bool
:
"""
...
...
api/core/app/apps/advanced_chat/app_generator.py
View file @
63721834
...
...
@@ -8,11 +8,12 @@ from flask import Flask, current_app
from
pydantic
import
ValidationError
from
core.app.app_config.features.file_upload.manager
import
FileUploadConfigManager
from
core.app.app_queue_manager
import
AppQueueManager
,
ConversationTaskStoppedException
,
PublishFrom
from
core.app.apps.advanced_chat.app_config_manager
import
AdvancedChatAppConfigManager
from
core.app.apps.advanced_chat.app_runner
import
AdvancedChatAppRunner
from
core.app.apps.advanced_chat.generate_task_pipeline
import
AdvancedChatAppGenerateTaskPipeline
from
core.app.apps.base_app_queue_manager
import
AppQueueManager
,
ConversationTaskStoppedException
,
PublishFrom
from
core.app.apps.message_based_app_generator
import
MessageBasedAppGenerator
from
core.app.apps.message_based_app_queue_manager
import
MessageBasedAppQueueManager
from
core.app.entities.app_invoke_entities
import
AdvancedChatAppGenerateEntity
,
InvokeFrom
from
core.file.message_file_parser
import
MessageFileParser
from
core.model_runtime.errors.invoke
import
InvokeAuthorizationError
,
InvokeError
...
...
@@ -101,7 +102,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
)
=
self
.
_init_generate_records
(
application_generate_entity
,
conversation
)
# init queue manager
queue_manager
=
AppQueueManager
(
queue_manager
=
MessageBased
AppQueueManager
(
task_id
=
application_generate_entity
.
task_id
,
user_id
=
application_generate_entity
.
user_id
,
invoke_from
=
application_generate_entity
.
invoke_from
,
...
...
api/core/app/apps/advanced_chat/app_runner.py
View file @
63721834
...
...
@@ -2,14 +2,14 @@ import logging
import
time
from
typing
import
cast
from
core.app.app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.apps.advanced_chat.app_config_manager
import
AdvancedChatAppConfig
from
core.app.apps.base_app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.apps.base_app_runner
import
AppRunner
from
core.app.entities.app_invoke_entities
import
(
AdvancedChatAppGenerateEntity
,
InvokeFrom
,
)
from
core.app.entities.queue_entities
import
Queue
Stop
Event
from
core.app.entities.queue_entities
import
Queue
AnnotationReplyEvent
,
QueueStopEvent
,
QueueTextChunk
Event
from
core.callback_handler.workflow_event_trigger_callback
import
WorkflowEventTriggerCallback
from
core.moderation.base
import
ModerationException
from
core.workflow.entities.node_entities
import
SystemVariable
...
...
@@ -93,7 +93,7 @@ class AdvancedChatAppRunner(AppRunner):
SystemVariable
.
FILES
:
files
,
SystemVariable
.
CONVERSATION
:
conversation
.
id
,
},
callbacks
=
[
WorkflowEventTriggerCallback
(
queue_manager
=
queue_manager
)]
,
callbacks
=
[
WorkflowEventTriggerCallback
(
queue_manager
=
queue_manager
)]
)
def
handle_input_moderation
(
self
,
queue_manager
:
AppQueueManager
,
...
...
@@ -153,9 +153,9 @@ class AdvancedChatAppRunner(AppRunner):
)
if
annotation_reply
:
queue_manager
.
publish
_annotation_reply
(
message_annotation_id
=
annotation_reply
.
id
,
pub_from
=
PublishFrom
.
APPLICATION_MANAGER
queue_manager
.
publish
(
QueueAnnotationReplyEvent
(
message_annotation_id
=
annotation_reply
.
id
)
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
_stream_output
(
...
...
@@ -182,7 +182,11 @@ class AdvancedChatAppRunner(AppRunner):
if
stream
:
index
=
0
for
token
in
text
:
queue_manager
.
publish_text_chunk
(
token
,
PublishFrom
.
APPLICATION_MANAGER
)
queue_manager
.
publish
(
QueueTextChunkEvent
(
text
=
token
),
PublishFrom
.
APPLICATION_MANAGER
)
index
+=
1
time
.
sleep
(
0.01
)
...
...
@@ -190,4 +194,3 @@ class AdvancedChatAppRunner(AppRunner):
QueueStopEvent
(
stopped_by
=
stopped_by
),
PublishFrom
.
APPLICATION_MANAGER
)
queue_manager
.
stop_listen
()
api/core/app/apps/advanced_chat/generate_task_pipeline.py
View file @
63721834
...
...
@@ -6,7 +6,7 @@ from typing import Optional, Union
from
pydantic
import
BaseModel
from
core.app.app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.app
s.base_app
_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.entities.app_invoke_entities
import
(
AdvancedChatAppGenerateEntity
,
InvokeFrom
,
...
...
@@ -46,6 +46,7 @@ class TaskState(BaseModel):
"""
answer
:
str
=
""
metadata
:
dict
=
{}
usage
:
LLMUsage
class
AdvancedChatAppGenerateTaskPipeline
:
...
...
@@ -349,7 +350,12 @@ class AdvancedChatAppGenerateTaskPipeline:
if
self
.
_output_moderation_handler
.
should_direct_output
():
# stop subscribe new token when output moderation should direct output
self
.
_task_state
.
answer
=
self
.
_output_moderation_handler
.
get_final_output
()
self
.
_queue_manager
.
publish_text_chunk
(
self
.
_task_state
.
answer
,
PublishFrom
.
TASK_PIPELINE
)
self
.
_queue_manager
.
publish
(
QueueTextChunkEvent
(
text
=
self
.
_task_state
.
answer
),
PublishFrom
.
TASK_PIPELINE
)
self
.
_queue_manager
.
publish
(
QueueStopEvent
(
stopped_by
=
QueueStopEvent
.
StopBy
.
OUTPUT_MODERATION
),
PublishFrom
.
TASK_PIPELINE
...
...
@@ -558,5 +564,5 @@ class AdvancedChatAppGenerateTaskPipeline:
type
=
sensitive_word_avoidance
.
type
,
config
=
sensitive_word_avoidance
.
config
),
on_message_replace_func
=
self
.
_queue_manager
.
publish_message_replace
queue_manager
=
self
.
_queue_manager
)
api/core/app/apps/agent_chat/app_generator.py
View file @
63721834
...
...
@@ -9,10 +9,11 @@ from pydantic import ValidationError
from
core.app.app_config.easy_ui_based_app.model_config.converter
import
ModelConfigConverter
from
core.app.app_config.features.file_upload.manager
import
FileUploadConfigManager
from
core.app.app_queue_manager
import
AppQueueManager
,
ConversationTaskStoppedException
,
PublishFrom
from
core.app.apps.agent_chat.app_config_manager
import
AgentChatAppConfigManager
from
core.app.apps.agent_chat.app_runner
import
AgentChatAppRunner
from
core.app.apps.base_app_queue_manager
import
AppQueueManager
,
ConversationTaskStoppedException
,
PublishFrom
from
core.app.apps.message_based_app_generator
import
MessageBasedAppGenerator
from
core.app.apps.message_based_app_queue_manager
import
MessageBasedAppQueueManager
from
core.app.entities.app_invoke_entities
import
AgentChatAppGenerateEntity
,
InvokeFrom
from
core.file.message_file_parser
import
MessageFileParser
from
core.model_runtime.errors.invoke
import
InvokeAuthorizationError
,
InvokeError
...
...
@@ -119,7 +120,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
)
=
self
.
_init_generate_records
(
application_generate_entity
,
conversation
)
# init queue manager
queue_manager
=
AppQueueManager
(
queue_manager
=
MessageBased
AppQueueManager
(
task_id
=
application_generate_entity
.
task_id
,
user_id
=
application_generate_entity
.
user_id
,
invoke_from
=
application_generate_entity
.
invoke_from
,
...
...
api/core/app/apps/agent_chat/app_runner.py
View file @
63721834
...
...
@@ -4,10 +4,11 @@ from typing import cast
from
core.agent.cot_agent_runner
import
CotAgentRunner
from
core.agent.entities
import
AgentEntity
from
core.agent.fc_agent_runner
import
FunctionCallAgentRunner
from
core.app.app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.apps.agent_chat.app_config_manager
import
AgentChatAppConfig
from
core.app.apps.base_app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.apps.base_app_runner
import
AppRunner
from
core.app.entities.app_invoke_entities
import
AgentChatAppGenerateEntity
,
ModelConfigWithCredentialsEntity
from
core.app.entities.queue_entities
import
QueueAnnotationReplyEvent
from
core.memory.token_buffer_memory
import
TokenBufferMemory
from
core.model_manager
import
ModelInstance
from
core.model_runtime.entities.llm_entities
import
LLMUsage
...
...
@@ -120,10 +121,11 @@ class AgentChatAppRunner(AppRunner):
)
if
annotation_reply
:
queue_manager
.
publish
_annotation_reply
(
message_annotation_id
=
annotation_reply
.
id
,
pub_from
=
PublishFrom
.
APPLICATION_MANAGER
queue_manager
.
publish
(
QueueAnnotationReplyEvent
(
message_annotation_id
=
annotation_reply
.
id
)
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
direct_output
(
queue_manager
=
queue_manager
,
app_generate_entity
=
application_generate_entity
,
...
...
api/core/app/app_queue_manager.py
→
api/core/app/app
s/base_app
_queue_manager.py
View file @
63721834
import
queue
import
time
from
abc
import
abstractmethod
from
collections.abc
import
Generator
from
enum
import
Enum
from
typing
import
Any
...
...
@@ -9,27 +10,13 @@ from sqlalchemy.orm import DeclarativeMeta
from
core.app.entities.app_invoke_entities
import
InvokeFrom
from
core.app.entities.queue_entities
import
(
AppQueueEvent
,
QueueAgentMessageEvent
,
QueueAgentThoughtEvent
,
QueueAnnotationReplyEvent
,
QueueErrorEvent
,
QueueLLMChunkEvent
,
QueueMessage
,
QueueMessageEndEvent
,
QueueMessageFileEvent
,
QueueMessageReplaceEvent
,
QueueNodeFinishedEvent
,
QueueNodeStartedEvent
,
QueuePingEvent
,
QueueRetrieverResourcesEvent
,
QueueStopEvent
,
QueueTextChunkEvent
,
QueueWorkflowFinishedEvent
,
QueueWorkflowStartedEvent
,
)
from
core.model_runtime.entities.llm_entities
import
LLMResult
,
LLMResultChunk
from
extensions.ext_redis
import
redis_client
from
models.model
import
MessageAgentThought
,
MessageFile
class
PublishFrom
(
Enum
):
...
...
@@ -40,19 +27,13 @@ class PublishFrom(Enum):
class
AppQueueManager
:
def
__init__
(
self
,
task_id
:
str
,
user_id
:
str
,
invoke_from
:
InvokeFrom
,
conversation_id
:
str
,
app_mode
:
str
,
message_id
:
str
)
->
None
:
invoke_from
:
InvokeFrom
)
->
None
:
if
not
user_id
:
raise
ValueError
(
"user is required"
)
self
.
_task_id
=
task_id
self
.
_user_id
=
user_id
self
.
_invoke_from
=
invoke_from
self
.
_conversation_id
=
str
(
conversation_id
)
self
.
_app_mode
=
app_mode
self
.
_message_id
=
str
(
message_id
)
user_prefix
=
'account'
if
self
.
_invoke_from
in
[
InvokeFrom
.
EXPLORE
,
InvokeFrom
.
DEBUGGER
]
else
'end-user'
redis_client
.
setex
(
AppQueueManager
.
_generate_task_belong_cache_key
(
self
.
_task_id
),
1800
,
f
"{user_prefix}-{self._user_id}"
)
...
...
@@ -89,7 +70,6 @@ class AppQueueManager:
QueueStopEvent
(
stopped_by
=
QueueStopEvent
.
StopBy
.
USER_MANUAL
),
PublishFrom
.
TASK_PIPELINE
)
self
.
stop_listen
()
if
elapsed_time
//
10
>
last_ping_time
:
self
.
publish
(
QueuePingEvent
(),
PublishFrom
.
TASK_PIPELINE
)
...
...
@@ -102,137 +82,6 @@ class AppQueueManager:
"""
self
.
_q
.
put
(
None
)
def
publish_llm_chunk
(
self
,
chunk
:
LLMResultChunk
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish llm chunk to channel
:param chunk: llm chunk
:param pub_from: publish from
:return:
"""
self
.
publish
(
QueueLLMChunkEvent
(
chunk
=
chunk
),
pub_from
)
def
publish_text_chunk
(
self
,
text
:
str
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish text chunk to channel
:param text: text
:param pub_from: publish from
:return:
"""
self
.
publish
(
QueueTextChunkEvent
(
text
=
text
),
pub_from
)
def
publish_agent_chunk_message
(
self
,
chunk
:
LLMResultChunk
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish agent chunk message to channel
:param chunk: chunk
:param pub_from: publish from
:return:
"""
self
.
publish
(
QueueAgentMessageEvent
(
chunk
=
chunk
),
pub_from
)
def
publish_message_replace
(
self
,
text
:
str
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish message replace
:param text: text
:param pub_from: publish from
:return:
"""
self
.
publish
(
QueueMessageReplaceEvent
(
text
=
text
),
pub_from
)
def
publish_retriever_resources
(
self
,
retriever_resources
:
list
[
dict
],
pub_from
:
PublishFrom
)
->
None
:
"""
Publish retriever resources
:return:
"""
self
.
publish
(
QueueRetrieverResourcesEvent
(
retriever_resources
=
retriever_resources
),
pub_from
)
def
publish_annotation_reply
(
self
,
message_annotation_id
:
str
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish annotation reply
:param message_annotation_id: message annotation id
:param pub_from: publish from
:return:
"""
self
.
publish
(
QueueAnnotationReplyEvent
(
message_annotation_id
=
message_annotation_id
),
pub_from
)
def
publish_message_end
(
self
,
llm_result
:
LLMResult
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish message end
:param llm_result: llm result
:param pub_from: publish from
:return:
"""
self
.
publish
(
QueueMessageEndEvent
(
llm_result
=
llm_result
),
pub_from
)
self
.
stop_listen
()
def
publish_workflow_started
(
self
,
workflow_run_id
:
str
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish workflow started
:param workflow_run_id: workflow run id
:param pub_from: publish from
:return:
"""
self
.
publish
(
QueueWorkflowStartedEvent
(
workflow_run_id
=
workflow_run_id
),
pub_from
)
def
publish_workflow_finished
(
self
,
workflow_run_id
:
str
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish workflow finished
:param workflow_run_id: workflow run id
:param pub_from: publish from
:return:
"""
self
.
publish
(
QueueWorkflowFinishedEvent
(
workflow_run_id
=
workflow_run_id
),
pub_from
)
def
publish_node_started
(
self
,
workflow_node_execution_id
:
str
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish node started
:param workflow_node_execution_id: workflow node execution id
:param pub_from: publish from
:return:
"""
self
.
publish
(
QueueNodeStartedEvent
(
workflow_node_execution_id
=
workflow_node_execution_id
),
pub_from
)
def
publish_node_finished
(
self
,
workflow_node_execution_id
:
str
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish node finished
:param workflow_node_execution_id: workflow node execution id
:param pub_from: publish from
:return:
"""
self
.
publish
(
QueueNodeFinishedEvent
(
workflow_node_execution_id
=
workflow_node_execution_id
),
pub_from
)
def
publish_agent_thought
(
self
,
message_agent_thought
:
MessageAgentThought
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish agent thought
:param message_agent_thought: message agent thought
:param pub_from: publish from
:return:
"""
self
.
publish
(
QueueAgentThoughtEvent
(
agent_thought_id
=
message_agent_thought
.
id
),
pub_from
)
def
publish_message_file
(
self
,
message_file
:
MessageFile
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish agent thought
:param message_file: message file
:param pub_from: publish from
:return:
"""
self
.
publish
(
QueueMessageFileEvent
(
message_file_id
=
message_file
.
id
),
pub_from
)
def
publish_error
(
self
,
e
,
pub_from
:
PublishFrom
)
->
None
:
"""
Publish error
...
...
@@ -243,7 +92,6 @@ class AppQueueManager:
self
.
publish
(
QueueErrorEvent
(
error
=
e
),
pub_from
)
self
.
stop_listen
()
def
publish
(
self
,
event
:
AppQueueEvent
,
pub_from
:
PublishFrom
)
->
None
:
"""
...
...
@@ -254,22 +102,20 @@ class AppQueueManager:
"""
self
.
_check_for_sqlalchemy_models
(
event
.
dict
())
message
=
QueueMessage
(
task_id
=
self
.
_task_id
,
message_id
=
self
.
_message_id
,
conversation_id
=
self
.
_conversation_id
,
app_mode
=
self
.
_app_mode
,
event
=
event
)
message
=
self
.
construct_queue_message
(
event
)
self
.
_q
.
put
(
message
)
if
isinstance
(
event
,
QueueStopEvent
):
if
isinstance
(
event
,
QueueStopEvent
|
QueueErrorEvent
|
QueueMessageEndEvent
):
self
.
stop_listen
()
if
pub_from
==
PublishFrom
.
APPLICATION_MANAGER
and
self
.
_is_stopped
():
raise
ConversationTaskStoppedException
()
@
abstractmethod
def
construct_queue_message
(
self
,
event
:
AppQueueEvent
)
->
QueueMessage
:
raise
NotImplementedError
@
classmethod
def
set_stop_flag
(
cls
,
task_id
:
str
,
invoke_from
:
InvokeFrom
,
user_id
:
str
)
->
None
:
"""
...
...
api/core/app/apps/base_app_runner.py
View file @
63721834
...
...
@@ -3,13 +3,14 @@ from collections.abc import Generator
from
typing
import
Optional
,
Union
,
cast
from
core.app.app_config.entities
import
ExternalDataVariableEntity
,
PromptTemplateEntity
from
core.app.app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.app
s.base_app
_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.entities.app_invoke_entities
import
(
AppGenerateEntity
,
EasyUIBasedAppGenerateEntity
,
InvokeFrom
,
ModelConfigWithCredentialsEntity
,
)
from
core.app.entities.queue_entities
import
QueueAgentMessageEvent
,
QueueLLMChunkEvent
,
QueueMessageEndEvent
from
core.app.features.annotation_reply.annotation_reply
import
AnnotationReplyFeature
from
core.app.features.hosting_moderation.hosting_moderation
import
HostingModerationFeature
from
core.external_data_tool.external_data_fetch
import
ExternalDataFetch
...
...
@@ -187,25 +188,32 @@ class AppRunner:
if
stream
:
index
=
0
for
token
in
text
:
queue_manager
.
publish_llm_chunk
(
LLMResultChunk
(
chunk
=
LLMResultChunk
(
model
=
app_generate_entity
.
model_config
.
model
,
prompt_messages
=
prompt_messages
,
delta
=
LLMResultChunkDelta
(
index
=
index
,
message
=
AssistantPromptMessage
(
content
=
token
)
)
),
PublishFrom
.
APPLICATION_MANAGER
)
)
queue_manager
.
publish
(
QueueLLMChunkEvent
(
chunk
=
chunk
),
PublishFrom
.
APPLICATION_MANAGER
)
index
+=
1
time
.
sleep
(
0.01
)
queue_manager
.
publish_message_end
(
llm_result
=
LLMResult
(
model
=
app_generate_entity
.
model_config
.
model
,
prompt_messages
=
prompt_messages
,
message
=
AssistantPromptMessage
(
content
=
text
),
usage
=
usage
if
usage
else
LLMUsage
.
empty_usage
()
),
pub_from
=
PublishFrom
.
APPLICATION_MANAGER
queue_manager
.
publish
(
QueueMessageEndEvent
(
llm_result
=
LLMResult
(
model
=
app_generate_entity
.
model_config
.
model
,
prompt_messages
=
prompt_messages
,
message
=
AssistantPromptMessage
(
content
=
text
),
usage
=
usage
if
usage
else
LLMUsage
.
empty_usage
()
),
),
PublishFrom
.
APPLICATION_MANAGER
)
def
_handle_invoke_result
(
self
,
invoke_result
:
Union
[
LLMResult
,
Generator
],
...
...
@@ -241,9 +249,10 @@ class AppRunner:
:param queue_manager: application queue manager
:return:
"""
queue_manager
.
publish_message_end
(
llm_result
=
invoke_result
,
pub_from
=
PublishFrom
.
APPLICATION_MANAGER
queue_manager
.
publish
(
QueueMessageEndEvent
(
llm_result
=
invoke_result
,
),
PublishFrom
.
APPLICATION_MANAGER
)
def
_handle_invoke_result_stream
(
self
,
invoke_result
:
Generator
,
...
...
@@ -261,9 +270,17 @@ class AppRunner:
usage
=
None
for
result
in
invoke_result
:
if
not
agent
:
queue_manager
.
publish_llm_chunk
(
result
,
PublishFrom
.
APPLICATION_MANAGER
)
queue_manager
.
publish
(
QueueLLMChunkEvent
(
chunk
=
result
),
PublishFrom
.
APPLICATION_MANAGER
)
else
:
queue_manager
.
publish_agent_chunk_message
(
result
,
PublishFrom
.
APPLICATION_MANAGER
)
queue_manager
.
publish
(
QueueAgentMessageEvent
(
chunk
=
result
),
PublishFrom
.
APPLICATION_MANAGER
)
text
+=
result
.
delta
.
message
.
content
...
...
@@ -286,9 +303,10 @@ class AppRunner:
usage
=
usage
)
queue_manager
.
publish_message_end
(
llm_result
=
llm_result
,
pub_from
=
PublishFrom
.
APPLICATION_MANAGER
queue_manager
.
publish
(
QueueMessageEndEvent
(
llm_result
=
llm_result
,
),
PublishFrom
.
APPLICATION_MANAGER
)
def
moderation_for_inputs
(
self
,
app_id
:
str
,
...
...
@@ -311,7 +329,7 @@ class AppRunner:
tenant_id
=
tenant_id
,
app_config
=
app_generate_entity
.
app_config
,
inputs
=
inputs
,
query
=
query
,
query
=
query
if
query
else
''
)
def
check_hosting_moderation
(
self
,
application_generate_entity
:
EasyUIBasedAppGenerateEntity
,
...
...
api/core/app/apps/chat/app_generator.py
View file @
63721834
...
...
@@ -9,10 +9,11 @@ from pydantic import ValidationError
from
core.app.app_config.easy_ui_based_app.model_config.converter
import
ModelConfigConverter
from
core.app.app_config.features.file_upload.manager
import
FileUploadConfigManager
from
core.app.app_queue_manager
import
AppQueueManager
,
ConversationTaskStoppedException
,
PublishFrom
from
core.app.app
s.base_app
_queue_manager
import
AppQueueManager
,
ConversationTaskStoppedException
,
PublishFrom
from
core.app.apps.chat.app_config_manager
import
ChatAppConfigManager
from
core.app.apps.chat.app_runner
import
ChatAppRunner
from
core.app.apps.message_based_app_generator
import
MessageBasedAppGenerator
from
core.app.apps.message_based_app_queue_manager
import
MessageBasedAppQueueManager
from
core.app.entities.app_invoke_entities
import
ChatAppGenerateEntity
,
InvokeFrom
from
core.file.message_file_parser
import
MessageFileParser
from
core.model_runtime.errors.invoke
import
InvokeAuthorizationError
,
InvokeError
...
...
@@ -119,7 +120,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
)
=
self
.
_init_generate_records
(
application_generate_entity
,
conversation
)
# init queue manager
queue_manager
=
AppQueueManager
(
queue_manager
=
MessageBased
AppQueueManager
(
task_id
=
application_generate_entity
.
task_id
,
user_id
=
application_generate_entity
.
user_id
,
invoke_from
=
application_generate_entity
.
invoke_from
,
...
...
api/core/app/apps/chat/app_runner.py
View file @
63721834
import
logging
from
typing
import
cast
from
core.app.app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.app
s.base_app
_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.apps.base_app_runner
import
AppRunner
from
core.app.apps.chat.app_config_manager
import
ChatAppConfig
from
core.app.entities.app_invoke_entities
import
(
ChatAppGenerateEntity
,
)
from
core.app.entities.queue_entities
import
QueueAnnotationReplyEvent
from
core.callback_handler.index_tool_callback_handler
import
DatasetIndexToolCallbackHandler
from
core.memory.token_buffer_memory
import
TokenBufferMemory
from
core.model_manager
import
ModelInstance
...
...
@@ -117,10 +118,11 @@ class ChatAppRunner(AppRunner):
)
if
annotation_reply
:
queue_manager
.
publish
_annotation_reply
(
message_annotation_id
=
annotation_reply
.
id
,
pub_from
=
PublishFrom
.
APPLICATION_MANAGER
queue_manager
.
publish
(
QueueAnnotationReplyEvent
(
message_annotation_id
=
annotation_reply
.
id
)
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
direct_output
(
queue_manager
=
queue_manager
,
app_generate_entity
=
application_generate_entity
,
...
...
api/core/app/apps/completion/app_generator.py
View file @
63721834
...
...
@@ -9,10 +9,11 @@ from pydantic import ValidationError
from
core.app.app_config.easy_ui_based_app.model_config.converter
import
ModelConfigConverter
from
core.app.app_config.features.file_upload.manager
import
FileUploadConfigManager
from
core.app.app_queue_manager
import
AppQueueManager
,
ConversationTaskStoppedException
,
PublishFrom
from
core.app.app
s.base_app
_queue_manager
import
AppQueueManager
,
ConversationTaskStoppedException
,
PublishFrom
from
core.app.apps.completion.app_config_manager
import
CompletionAppConfigManager
from
core.app.apps.completion.app_runner
import
CompletionAppRunner
from
core.app.apps.message_based_app_generator
import
MessageBasedAppGenerator
from
core.app.apps.message_based_app_queue_manager
import
MessageBasedAppQueueManager
from
core.app.entities.app_invoke_entities
import
CompletionAppGenerateEntity
,
InvokeFrom
from
core.file.message_file_parser
import
MessageFileParser
from
core.model_runtime.errors.invoke
import
InvokeAuthorizationError
,
InvokeError
...
...
@@ -112,7 +113,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
)
=
self
.
_init_generate_records
(
application_generate_entity
)
# init queue manager
queue_manager
=
AppQueueManager
(
queue_manager
=
MessageBased
AppQueueManager
(
task_id
=
application_generate_entity
.
task_id
,
user_id
=
application_generate_entity
.
user_id
,
invoke_from
=
application_generate_entity
.
invoke_from
,
...
...
@@ -263,7 +264,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
)
=
self
.
_init_generate_records
(
application_generate_entity
)
# init queue manager
queue_manager
=
AppQueueManager
(
queue_manager
=
MessageBased
AppQueueManager
(
task_id
=
application_generate_entity
.
task_id
,
user_id
=
application_generate_entity
.
user_id
,
invoke_from
=
application_generate_entity
.
invoke_from
,
...
...
api/core/app/apps/completion/app_runner.py
View file @
63721834
import
logging
from
typing
import
cast
from
core.app.app_queue_manager
import
AppQueueManager
from
core.app.app
s.base_app
_queue_manager
import
AppQueueManager
from
core.app.apps.base_app_runner
import
AppRunner
from
core.app.apps.completion.app_config_manager
import
CompletionAppConfig
from
core.app.entities.app_invoke_entities
import
(
...
...
api/core/app/apps/easy_ui_based_generate_task_pipeline.py
View file @
63721834
...
...
@@ -6,7 +6,7 @@ from typing import Optional, Union, cast
from
pydantic
import
BaseModel
from
core.app.app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.app
s.base_app
_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.entities.app_invoke_entities
import
(
AgentChatAppGenerateEntity
,
ChatAppGenerateEntity
,
...
...
@@ -385,14 +385,19 @@ class EasyUIBasedGenerateTaskPipeline:
if
self
.
_output_moderation_handler
.
should_direct_output
():
# stop subscribe new token when output moderation should direct output
self
.
_task_state
.
llm_result
.
message
.
content
=
self
.
_output_moderation_handler
.
get_final_output
()
self
.
_queue_manager
.
publish_llm_chunk
(
LLMResultChunk
(
model
=
self
.
_task_state
.
llm_result
.
model
,
prompt_messages
=
self
.
_task_state
.
llm_result
.
prompt_messages
,
delta
=
LLMResultChunkDelta
(
index
=
0
,
message
=
AssistantPromptMessage
(
content
=
self
.
_task_state
.
llm_result
.
message
.
content
)
)
),
PublishFrom
.
TASK_PIPELINE
)
self
.
_queue_manager
.
publish
(
QueueLLMChunkEvent
(
chunk
=
LLMResultChunk
(
model
=
self
.
_task_state
.
llm_result
.
model
,
prompt_messages
=
self
.
_task_state
.
llm_result
.
prompt_messages
,
delta
=
LLMResultChunkDelta
(
index
=
0
,
message
=
AssistantPromptMessage
(
content
=
self
.
_task_state
.
llm_result
.
message
.
content
)
)
)
),
PublishFrom
.
TASK_PIPELINE
)
self
.
_queue_manager
.
publish
(
QueueStopEvent
(
stopped_by
=
QueueStopEvent
.
StopBy
.
OUTPUT_MODERATION
),
PublishFrom
.
TASK_PIPELINE
...
...
@@ -664,5 +669,5 @@ class EasyUIBasedGenerateTaskPipeline:
type
=
sensitive_word_avoidance
.
type
,
config
=
sensitive_word_avoidance
.
config
),
on_message_replace_func
=
self
.
_queue_manager
.
publish_message_replace
queue_manager
=
self
.
_queue_manager
)
api/core/app/apps/message_based_app_generator.py
View file @
63721834
...
...
@@ -6,8 +6,8 @@ from typing import Optional, Union
from
sqlalchemy
import
and_
from
core.app.app_config.entities
import
EasyUIBasedAppModelConfigFrom
from
core.app.app_queue_manager
import
AppQueueManager
,
ConversationTaskStoppedException
from
core.app.apps.base_app_generator
import
BaseAppGenerator
from
core.app.apps.base_app_queue_manager
import
AppQueueManager
,
ConversationTaskStoppedException
from
core.app.apps.easy_ui_based_generate_task_pipeline
import
EasyUIBasedGenerateTaskPipeline
from
core.app.entities.app_invoke_entities
import
(
AdvancedChatAppGenerateEntity
,
...
...
api/core/app/apps/message_based_app_queue_manager.py
0 → 100644
View file @
63721834
from
core.app.apps.base_app_queue_manager
import
AppQueueManager
from
core.app.entities.app_invoke_entities
import
InvokeFrom
from
core.app.entities.queue_entities
import
(
AppQueueEvent
,
QueueMessage
,
)
class MessageBasedAppQueueManager(AppQueueManager):
    """Queue manager for message-based apps (chat / completion / agent-chat).

    Extends the base queue manager with the conversation, message and
    app-mode context that message-oriented pipelines attach to every
    queue message.
    """

    def __init__(self, task_id: str,
                 user_id: str,
                 invoke_from: InvokeFrom,
                 conversation_id: str,
                 app_mode: str,
                 message_id: str) -> None:
        super().__init__(task_id, user_id, invoke_from)

        # ids are normalized to str so UUID objects from the ORM are accepted
        self._conversation_id = str(conversation_id)
        self._app_mode = app_mode
        self._message_id = str(message_id)

    def construct_queue_message(self, event: AppQueueEvent) -> QueueMessage:
        """Wrap *event* into a QueueMessage carrying this manager's context."""
        message = QueueMessage(
            task_id=self._task_id,
            message_id=self._message_id,
            conversation_id=self._conversation_id,
            app_mode=self._app_mode,
            event=event
        )
        return message
api/core/app/apps/workflow/app_generator.py
0 → 100644
View file @
63721834
import
logging
import
threading
import
uuid
from
collections.abc
import
Generator
from
typing
import
Union
from
flask
import
Flask
,
current_app
from
pydantic
import
ValidationError
from
core.app.app_config.features.file_upload.manager
import
FileUploadConfigManager
from
core.app.apps.base_app_generator
import
BaseAppGenerator
from
core.app.apps.base_app_queue_manager
import
AppQueueManager
,
ConversationTaskStoppedException
,
PublishFrom
from
core.app.apps.workflow.app_config_manager
import
WorkflowAppConfigManager
from
core.app.apps.workflow.app_queue_manager
import
WorkflowAppQueueManager
from
core.app.apps.workflow.app_runner
import
WorkflowAppRunner
from
core.app.apps.workflow.generate_task_pipeline
import
WorkflowAppGenerateTaskPipeline
from
core.app.entities.app_invoke_entities
import
InvokeFrom
,
WorkflowAppGenerateEntity
from
core.file.message_file_parser
import
MessageFileParser
from
core.model_runtime.errors.invoke
import
InvokeAuthorizationError
,
InvokeError
from
extensions.ext_database
import
db
from
models.account
import
Account
from
models.model
import
App
,
EndUser
from
models.workflow
import
Workflow
logger
=
logging
.
getLogger
(
__name__
)
class WorkflowAppGenerator(BaseAppGenerator):
    """Generates responses for workflow apps by running the workflow in a
    background thread and streaming results through an app queue manager."""

    def generate(self, app_model: App,
                 workflow: Workflow,
                 user: Union[Account, EndUser],
                 args: dict,
                 invoke_from: InvokeFrom,
                 stream: bool = True) \
            -> Union[dict, Generator]:
        """
        Generate App response.

        :param app_model: App
        :param workflow: Workflow
        :param user: account or end user
        :param args: request args
        :param invoke_from: invoke from source
        :param stream: is stream
        """
        inputs = args['inputs']

        # parse files (a missing or falsy 'files' entry means no attachments)
        raw_files = args['files'] if 'files' in args and args['files'] else []
        message_file_parser = MessageFileParser(tenant_id=app_model.tenant_id, app_id=app_model.id)
        file_upload_entity = FileUploadConfigManager.convert(workflow.features_dict)
        file_objs = []
        if file_upload_entity:
            file_objs = message_file_parser.validate_and_transform_files_arg(
                raw_files,
                file_upload_entity,
                user
            )

        # convert to app config
        app_config = WorkflowAppConfigManager.get_app_config(
            app_model=app_model,
            workflow=workflow
        )

        # init application generate entity
        application_generate_entity = WorkflowAppGenerateEntity(
            task_id=str(uuid.uuid4()),
            app_config=app_config,
            inputs=self._get_cleaned_inputs(inputs, app_config),
            files=file_objs,
            user_id=user.id,
            stream=stream,
            invoke_from=invoke_from
        )

        # init queue manager
        queue_manager = WorkflowAppQueueManager(
            task_id=application_generate_entity.task_id,
            user_id=application_generate_entity.user_id,
            invoke_from=application_generate_entity.invoke_from,
            app_mode=app_model.mode
        )

        # run the workflow in a background thread; results flow back
        # to this request through the queue manager
        worker_thread = threading.Thread(target=self._generate_worker, kwargs={
            'flask_app': current_app._get_current_object(),
            'application_generate_entity': application_generate_entity,
            'queue_manager': queue_manager
        })
        worker_thread.start()

        # return response or stream generator
        return self._handle_response(
            application_generate_entity=application_generate_entity,
            queue_manager=queue_manager,
            stream=stream
        )

    def _generate_worker(self, flask_app: Flask,
                         application_generate_entity: WorkflowAppGenerateEntity,
                         queue_manager: AppQueueManager) -> None:
        """
        Generate worker in a new thread.

        Exceptions raised by the runner are translated into error events
        published on the queue so the consuming task pipeline can surface
        them to the caller.

        :param flask_app: Flask app
        :param application_generate_entity: application generate entity
        :param queue_manager: queue manager
        :return:
        """
        with flask_app.app_context():
            try:
                # workflow app
                app_runner = WorkflowAppRunner()
                app_runner.run(
                    application_generate_entity=application_generate_entity,
                    queue_manager=queue_manager
                )
            except ConversationTaskStoppedException:
                # the task was stopped deliberately; nothing to report
                pass
            except InvokeAuthorizationError:
                queue_manager.publish_error(
                    InvokeAuthorizationError('Incorrect API key provided'),
                    PublishFrom.APPLICATION_MANAGER
                )
            except ValidationError as e:
                logger.exception("Validation Error when generating")
                queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
            except (ValueError, InvokeError) as e:
                queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
            except Exception as e:
                logger.exception("Unknown Error when generating")
                queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
            finally:
                # always release the scoped session owned by this worker thread
                db.session.remove()

    def _handle_response(self, application_generate_entity: WorkflowAppGenerateEntity,
                         queue_manager: AppQueueManager,
                         stream: bool = False) -> Union[dict, Generator]:
        """
        Handle response.
        :param application_generate_entity: application generate entity
        :param queue_manager: queue manager
        :param stream: is stream
        :return:
        """
        # init generate task pipeline
        pipeline = WorkflowAppGenerateTaskPipeline(
            application_generate_entity=application_generate_entity,
            queue_manager=queue_manager,
            stream=stream
        )

        try:
            return pipeline.process()
        except ValueError as e:
            if e.args[0] == "I/O operation on closed file.":
                # the client disconnected mid-stream; treat it as a stop request
                raise ConversationTaskStoppedException()
            logger.exception(e)
            raise e
        finally:
            db.session.remove()
api/core/app/apps/workflow/app_queue_manager.py
0 → 100644
View file @
63721834
from
core.app.apps.base_app_queue_manager
import
AppQueueManager
from
core.app.entities.app_invoke_entities
import
InvokeFrom
from
core.app.entities.queue_entities
import
(
AppQueueEvent
,
QueueMessage
,
)
class WorkflowAppQueueManager(AppQueueManager):
    """Queue manager for standalone workflow apps.

    Unlike message-based apps there is no conversation/message context,
    so queue messages carry only the app mode in addition to the base
    task information.
    """

    def __init__(self, task_id: str,
                 user_id: str,
                 invoke_from: InvokeFrom,
                 app_mode: str) -> None:
        super().__init__(task_id, user_id, invoke_from)

        self._app_mode = app_mode

    def construct_queue_message(self, event: AppQueueEvent) -> QueueMessage:
        """Wrap *event* into a QueueMessage tagged with this app's mode."""
        message = QueueMessage(
            task_id=self._task_id,
            app_mode=self._app_mode,
            event=event
        )
        return message
api/core/app/apps/workflow/app_runner.py
0 → 100644
View file @
63721834
import
logging
import
time
from
typing
import
cast
from
core.app.apps.base_app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.apps.workflow.app_config_manager
import
WorkflowAppConfig
from
core.app.entities.app_invoke_entities
import
(
AppGenerateEntity
,
InvokeFrom
,
WorkflowAppGenerateEntity
,
)
from
core.app.entities.queue_entities
import
QueueStopEvent
,
QueueTextChunkEvent
from
core.callback_handler.workflow_event_trigger_callback
import
WorkflowEventTriggerCallback
from
core.moderation.base
import
ModerationException
from
core.moderation.input_moderation
import
InputModeration
from
core.workflow.entities.node_entities
import
SystemVariable
from
core.workflow.workflow_engine_manager
import
WorkflowEngineManager
from
extensions.ext_database
import
db
from
models.account
import
Account
from
models.model
import
App
,
EndUser
from
models.workflow
import
WorkflowRunTriggeredFrom
logger
=
logging
.
getLogger
(
__name__
)
class WorkflowAppRunner:
    """
    Workflow Application Runner

    Loads the app and its workflow, runs input moderation, resolves the
    invoking user, then executes the workflow through the engine manager,
    publishing progress events via the supplied queue manager.
    """

    def run(self, application_generate_entity: WorkflowAppGenerateEntity,
            queue_manager: AppQueueManager) -> None:
        """
        Run application
        :param application_generate_entity: application generate entity
        :param queue_manager: application queue manager
        :raises ValueError: if the app record or its workflow cannot be found
        :return:
        """
        app_config = application_generate_entity.app_config
        app_config = cast(WorkflowAppConfig, app_config)

        app_record = db.session.query(App).filter(App.id == app_config.app_id).first()
        if not app_record:
            raise ValueError("App not found")

        workflow = WorkflowEngineManager().get_workflow(app_model=app_record, workflow_id=app_config.workflow_id)
        if not workflow:
            raise ValueError("Workflow not initialized")

        inputs = application_generate_entity.inputs
        files = application_generate_entity.files

        # moderation: stop early when input moderation blocks the run
        if self.handle_input_moderation(
                queue_manager=queue_manager,
                app_record=app_record,
                app_generate_entity=application_generate_entity,
                inputs=inputs
        ):
            return

        # fetch user: debugger/explore calls come from console accounts,
        # everything else from end users
        if application_generate_entity.invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE]:
            user = db.session.query(Account).filter(Account.id == application_generate_entity.user_id).first()
        else:
            user = db.session.query(EndUser).filter(EndUser.id == application_generate_entity.user_id).first()

        # RUN WORKFLOW
        workflow_engine_manager = WorkflowEngineManager()
        workflow_engine_manager.run_workflow(
            workflow=workflow,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING
            if application_generate_entity.invoke_from == InvokeFrom.DEBUGGER
            else WorkflowRunTriggeredFrom.APP_RUN,
            user=user,
            user_inputs=inputs,
            system_inputs={
                SystemVariable.FILES: files
            },
            callbacks=[WorkflowEventTriggerCallback(queue_manager=queue_manager)]
        )

    def handle_input_moderation(self, queue_manager: AppQueueManager,
                                app_record: App,
                                app_generate_entity: WorkflowAppGenerateEntity,
                                inputs: dict) -> bool:
        """
        Handle input moderation
        :param queue_manager: application queue manager
        :param app_record: app record
        :param app_generate_entity: application generate entity
        :param inputs: inputs
        :return: True if moderation blocked the run (caller should stop), else False
        """
        try:
            # process sensitive_word_avoidance; delegate to
            # moderation_for_inputs so the check is defined in one place
            _, inputs, _ = self.moderation_for_inputs(
                app_id=app_record.id,
                tenant_id=app_generate_entity.app_config.tenant_id,
                app_generate_entity=app_generate_entity,
                inputs=inputs
            )
            # NOTE(review): the possibly-sanitized `inputs` are discarded here
            # (only the local binding is updated) — confirm this is intended.
        except ModerationException as e:
            if app_generate_entity.stream:
                self._stream_output(
                    queue_manager=queue_manager,
                    text=str(e),
                )

            queue_manager.publish(
                QueueStopEvent(stopped_by=QueueStopEvent.StopBy.INPUT_MODERATION),
                PublishFrom.APPLICATION_MANAGER
            )
            return True

        return False

    def _stream_output(self, queue_manager: AppQueueManager,
                       text: str) -> None:
        """
        Direct output: publish *text* one character at a time so the client
        perceives a token-by-token stream.
        :param queue_manager: application queue manager
        :param text: text
        :return:
        """
        for token in text:
            queue_manager.publish(
                QueueTextChunkEvent(text=token),
                PublishFrom.APPLICATION_MANAGER
            )
            # small delay between chunks to simulate streaming
            time.sleep(0.01)

    def moderation_for_inputs(self, app_id: str,
                              tenant_id: str,
                              app_generate_entity: AppGenerateEntity,
                              inputs: dict) -> tuple[bool, dict, str]:
        """
        Process sensitive_word_avoidance.
        :param app_id: app id
        :param tenant_id: tenant id
        :param app_generate_entity: app generate entity
        :param inputs: inputs
        :return: (flagged, sanitized inputs, sanitized query)
        """
        moderation_feature = InputModeration()
        return moderation_feature.check(
            app_id=app_id,
            tenant_id=tenant_id,
            app_config=app_generate_entity.app_config,
            inputs=inputs,
            query=''  # workflow apps have no query string
        )
api/core/app/apps/workflow/generate_task_pipeline.py
0 → 100644
View file @
63721834
This diff is collapsed.
Click to expand it.
api/core/app/entities/app_invoke_entities.py
View file @
63721834
...
...
@@ -127,9 +127,9 @@ class AdvancedChatAppGenerateEntity(AppGenerateEntity):
query
:
Optional
[
str
]
=
None
class
Workflow
UIBased
AppGenerateEntity
(
AppGenerateEntity
):
class
WorkflowAppGenerateEntity
(
AppGenerateEntity
):
"""
Workflow
UI Based
Application Generate Entity.
Workflow Application Generate Entity.
"""
# app config
app_config
:
WorkflowUIBasedAppConfig
api/core/callback_handler/index_tool_callback_handler.py
View file @
63721834
from
core.app.app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.app
s.base_app
_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.entities.app_invoke_entities
import
InvokeFrom
from
core.app.entities.queue_entities
import
QueueRetrieverResourcesEvent
from
core.rag.models.document
import
Document
from
extensions.ext_database
import
db
from
models.dataset
import
DatasetQuery
,
DocumentSegment
...
...
@@ -82,4 +83,7 @@ class DatasetIndexToolCallbackHandler:
db
.
session
.
add
(
dataset_retriever_resource
)
db
.
session
.
commit
()
self
.
_queue_manager
.
publish_retriever_resources
(
resource
,
PublishFrom
.
APPLICATION_MANAGER
)
self
.
_queue_manager
.
publish
(
QueueRetrieverResourcesEvent
(
retriever_resources
=
resource
),
PublishFrom
.
APPLICATION_MANAGER
)
api/core/callback_handler/workflow_event_trigger_callback.py
View file @
63721834
from
core.app.app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.apps.base_app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.entities.queue_entities
import
(
QueueNodeFinishedEvent
,
QueueNodeStartedEvent
,
QueueTextChunkEvent
,
QueueWorkflowFinishedEvent
,
QueueWorkflowStartedEvent
,
)
from
core.workflow.callbacks.base_workflow_callback
import
BaseWorkflowCallback
from
models.workflow
import
WorkflowNodeExecution
,
WorkflowRun
...
...
@@ -12,43 +19,45 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback):
"""
Workflow run started
"""
self
.
_queue_manager
.
publish
_workflow_started
(
workflow_run_id
=
workflow_run
.
id
,
pub_from
=
PublishFrom
.
TASK_PIPELINE
self
.
_queue_manager
.
publish
(
QueueWorkflowStartedEvent
(
workflow_run_id
=
workflow_run
.
id
)
,
PublishFrom
.
APPLICATION_MANAGER
)
def
on_workflow_run_finished
(
self
,
workflow_run
:
WorkflowRun
)
->
None
:
"""
Workflow run finished
"""
self
.
_queue_manager
.
publish
_workflow_finished
(
workflow_run_id
=
workflow_run
.
id
,
pub_from
=
PublishFrom
.
TASK_PIPELINE
self
.
_queue_manager
.
publish
(
QueueWorkflowFinishedEvent
(
workflow_run_id
=
workflow_run
.
id
)
,
PublishFrom
.
APPLICATION_MANAGER
)
def
on_workflow_node_execute_started
(
self
,
workflow_node_execution
:
WorkflowNodeExecution
)
->
None
:
"""
Workflow node execute started
"""
self
.
_queue_manager
.
publish
_node_started
(
workflow_node_execution_id
=
workflow_node_execution
.
id
,
pub_from
=
PublishFrom
.
TASK_PIPELINE
self
.
_queue_manager
.
publish
(
QueueNodeStartedEvent
(
workflow_node_execution_id
=
workflow_node_execution
.
id
)
,
PublishFrom
.
APPLICATION_MANAGER
)
def
on_workflow_node_execute_finished
(
self
,
workflow_node_execution
:
WorkflowNodeExecution
)
->
None
:
"""
Workflow node execute finished
"""
self
.
_queue_manager
.
publish
_node_finished
(
workflow_node_execution_id
=
workflow_node_execution
.
id
,
pub_from
=
PublishFrom
.
TASK_PIPELINE
self
.
_queue_manager
.
publish
(
QueueNodeFinishedEvent
(
workflow_node_execution_id
=
workflow_node_execution
.
id
)
,
PublishFrom
.
APPLICATION_MANAGER
)
def
on_text_chunk
(
self
,
text
:
str
)
->
None
:
"""
Publish text chunk
"""
self
.
_queue_manager
.
publish_text_chunk
(
text
=
text
,
pub_from
=
PublishFrom
.
TASK_PIPELINE
self
.
_queue_manager
.
publish
(
QueueTextChunkEvent
(
text
=
text
),
PublishFrom
.
APPLICATION_MANAGER
)
api/core/moderation/output_moderation.py
View file @
63721834
...
...
@@ -6,7 +6,8 @@ from typing import Any, Optional
from
flask
import
Flask
,
current_app
from
pydantic
import
BaseModel
from
core.app.app_queue_manager
import
PublishFrom
from
core.app.apps.base_app_queue_manager
import
AppQueueManager
,
PublishFrom
from
core.app.entities.queue_entities
import
QueueMessageReplaceEvent
from
core.moderation.base
import
ModerationAction
,
ModerationOutputsResult
from
core.moderation.factory
import
ModerationFactory
...
...
@@ -25,7 +26,7 @@ class OutputModeration(BaseModel):
app_id
:
str
rule
:
ModerationRule
on_message_replace_func
:
Any
queue_manager
:
AppQueueManager
thread
:
Optional
[
threading
.
Thread
]
=
None
thread_running
:
bool
=
True
...
...
@@ -67,7 +68,12 @@ class OutputModeration(BaseModel):
final_output
=
result
.
text
if
public_event
:
self
.
on_message_replace_func
(
final_output
,
PublishFrom
.
TASK_PIPELINE
)
self
.
queue_manager
.
publish
(
QueueMessageReplaceEvent
(
text
=
final_output
),
PublishFrom
.
TASK_PIPELINE
)
return
final_output
...
...
@@ -117,7 +123,12 @@ class OutputModeration(BaseModel):
# trigger replace event
if
self
.
thread_running
:
self
.
on_message_replace_func
(
final_output
,
PublishFrom
.
TASK_PIPELINE
)
self
.
queue_manager
.
publish
(
QueueMessageReplaceEvent
(
text
=
final_output
),
PublishFrom
.
TASK_PIPELINE
)
if
result
.
action
==
ModerationAction
.
DIRECT_OUTPUT
:
break
...
...
api/services/workflow_service.py
View file @
63721834
...
...
@@ -6,6 +6,7 @@ from typing import Optional, Union
from
core.app.apps.advanced_chat.app_config_manager
import
AdvancedChatAppConfigManager
from
core.app.apps.advanced_chat.app_generator
import
AdvancedChatAppGenerator
from
core.app.apps.workflow.app_config_manager
import
WorkflowAppConfigManager
from
core.app.apps.workflow.app_generator
import
WorkflowAppGenerator
from
core.app.entities.app_invoke_entities
import
InvokeFrom
from
core.workflow.entities.node_entities
import
NodeType
from
core.workflow.workflow_engine_manager
import
WorkflowEngineManager
...
...
@@ -175,8 +176,24 @@ class WorkflowService:
user
:
Union
[
Account
,
EndUser
],
args
:
dict
,
invoke_from
:
InvokeFrom
)
->
Union
[
dict
,
Generator
]:
# TODO
pass
# fetch draft workflow by app_model
draft_workflow
=
self
.
get_draft_workflow
(
app_model
=
app_model
)
if
not
draft_workflow
:
raise
ValueError
(
'Workflow not initialized'
)
# run draft workflow
app_generator
=
WorkflowAppGenerator
()
response
=
app_generator
.
generate
(
app_model
=
app_model
,
workflow
=
draft_workflow
,
user
=
user
,
args
=
args
,
invoke_from
=
invoke_from
,
stream
=
True
)
return
response
def
convert_to_workflow
(
self
,
app_model
:
App
,
account
:
Account
)
->
App
:
"""
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment