ai-tech / dify

Commit 7d786468, authored Mar 04, 2024 by takatost

Merge branch 'feat/workflow-backend' into deploy/dev

Parents: 5d2ff412, e4df0293

Showing 17 changed files with 440 additions and 90 deletions (+440, -90)
api/controllers/console/app/workflow.py (+77, -9)
api/core/agent/cot_agent_runner.py (+2, -2)
api/core/agent/fc_agent_runner.py (+2, -2)
api/core/app/apps/advanced_chat/app_generator.py (+5, -13)
api/core/app/apps/advanced_chat/app_runner.py (+134, -43)
api/core/app/apps/base_app_runner.py (+1, -1)
api/core/app/apps/chat/app_runner.py (+1, -1)
api/core/app/apps/completion/app_runner.py (+1, -1)
api/core/app/entities/queue_entities.py (+1, -0)
api/core/workflow/entities/node_entities.py (+9, -0)
api/core/workflow/entities/variable_pool.py (+82, -0)
api/core/workflow/nodes/base_node.py (+37, -0)
api/core/workflow/workflow_engine_manager.py (+30, -4)
api/fields/workflow_fields.py (+2, -2)
api/fields/workflow_run_fields.py (+10, -10)
api/models/workflow.py (+8, -0)
api/services/workflow_service.py (+38, -2)
api/controllers/console/app/workflow.py (+77, -9)

import json
import logging
from collections.abc import Generator

from flask import Response, stream_with_context
from flask_restful import Resource, marshal_with, reqparse
from werkzeug.exceptions import InternalServerError, NotFound

import services
from controllers.console import api
-from controllers.console.app.error import DraftWorkflowNotExist
+from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist
from controllers.console.app.wraps import get_app_model
from controllers.console.setup import setup_required
from controllers.console.wraps import account_initialization_required
from core.app.entities.app_invoke_entities import InvokeFrom
from fields.workflow_fields import workflow_fields
from libs.helper import uuid_value
from libs.login import current_user, login_required
from models.model import App, AppMode
from services.workflow_service import WorkflowService

logger = logging.getLogger(__name__)


class DraftWorkflowApi(Resource):
    @setup_required
    ...

@@ -59,23 +68,80 @@ class DraftWorkflowApi(Resource):
        }

class AdvancedChatDraftWorkflowRunApi(Resource):
    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT])
    def post(self, app_model: App):
        """
        Run draft workflow
        """
        parser = reqparse.RequestParser()
        parser.add_argument('inputs', type=dict, required=True, location='json')
        parser.add_argument('query', type=str, location='json', default='')
        parser.add_argument('files', type=list, required=False, location='json')
        parser.add_argument('conversation_id', type=uuid_value, location='json')
        args = parser.parse_args()

        workflow_service = WorkflowService()
        try:
            response = workflow_service.run_advanced_chat_draft_workflow(
                app_model=app_model,
                user=current_user,
                args=args,
                invoke_from=InvokeFrom.DEBUGGER
            )
        except services.errors.conversation.ConversationNotExistsError:
            raise NotFound("Conversation Not Exists.")
        except services.errors.conversation.ConversationCompletedError:
            raise ConversationCompletedError()
        except ValueError as e:
            raise e
        except Exception as e:
            logging.exception("internal server error.")
            raise InternalServerError()

        def generate() -> Generator:
            yield from response

        return Response(stream_with_context(generate()), status=200, mimetype='text/event-stream')

class DraftWorkflowRunApi(Resource):
    @setup_required
    @login_required
    @account_initialization_required
-    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
+    @get_app_model(mode=[AppMode.WORKFLOW])
    def post(self, app_model: App):
        """
        Run draft workflow
        """
-        # TODO
+        parser = reqparse.RequestParser()
+        parser.add_argument('inputs', type=dict, required=True, location='json')
+        args = parser.parse_args()
+
        workflow_service = WorkflowService()
-        workflow_service.run_draft_workflow(app_model=app_model, account=current_user)
-
-        # TODO
-        return {
-            "result": "success"
-        }
+        try:
+            response = workflow_service.run_draft_workflow(
+                app_model=app_model,
+                user=current_user,
+                args=args,
+                invoke_from=InvokeFrom.DEBUGGER
+            )
+        except ValueError as e:
+            raise e
+        except Exception as e:
+            logging.exception("internal server error.")
+            raise InternalServerError()
+
+        def generate() -> Generator:
+            yield from response
+
+        return Response(stream_with_context(generate()), status=200, mimetype='text/event-stream')

class WorkflowTaskStopApi(Resource):
    ...

@@ -214,10 +280,12 @@ class ConvertToWorkflowApi(Resource):

api.add_resource(DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft')
+api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run')
api.add_resource(DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run')
api.add_resource(WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflows/tasks/<string:task_id>/stop')
api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<uuid:node_id>/run')
api.add_resource(PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/published')
api.add_resource(DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs')
-api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs/:block_type')
+api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs'
+                                        '/<string:block_type>')
api.add_resource(ConvertToWorkflowApi, '/apps/<uuid:app_id>/convert-to-workflow')
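The two new draft-run routes answer with a text/event-stream body, so a client has to consume them as a stream. Below is a minimal sketch of exercising the advanced-chat route; the base URL, the /console/api prefix, the app id, and the auth header are placeholder assumptions, not part of this commit.

# Minimal sketch (not part of the commit): calling the new advanced-chat draft run route.
import requests

API_BASE = "http://localhost:5001/console/api"   # assumption: local console API server
APP_ID = "00000000-0000-0000-0000-000000000000"  # placeholder app id

resp = requests.post(
    f"{API_BASE}/apps/{APP_ID}/advanced-chat/workflows/draft/run",
    headers={"Authorization": "Bearer <console-access-token>"},  # placeholder token
    json={"inputs": {}, "query": "hello", "files": []},
    stream=True,  # the endpoint responds with text/event-stream
)

for line in resp.iter_lines():
    if line:
        print(line.decode("utf-8"))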
api/core/agent/cot_agent_runner.py (+2, -2)

@@ -132,8 +132,8 @@ class CotAgentRunner(BaseAgentRunner):
            input=query
        )

-        # recale llm max tokens
-        self.recale_llm_max_tokens(self.model_config, prompt_messages)
+        # recalc llm max tokens
+        self.recalc_llm_max_tokens(self.model_config, prompt_messages)

        # invoke model
        chunks: Generator[LLMResultChunk, None, None] = model_instance.invoke_llm(
            prompt_messages=prompt_messages,
...
api/core/agent/fc_agent_runner.py (+2, -2)

@@ -107,8 +107,8 @@ class FunctionCallAgentRunner(BaseAgentRunner):
            messages_ids=message_file_ids
        )

-        # recale llm max tokens
-        self.recale_llm_max_tokens(self.model_config, prompt_messages)
+        # recalc llm max tokens
+        self.recalc_llm_max_tokens(self.model_config, prompt_messages)

        # invoke model
        chunks: Union[Generator[LLMResultChunk, None, None], LLMResult] = model_instance.invoke_llm(
            prompt_messages=prompt_messages,
...
api/core/app/apps/advanced_chat/app_generator.py (+5, -13)

@@ -2,7 +2,7 @@ import logging
import threading
import uuid
from collections.abc import Generator
-from typing import Any, Union
+from typing import Union

from flask import Flask, current_app
from pydantic import ValidationError
...

@@ -16,18 +16,19 @@ from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
from core.file.message_file_parser import MessageFileParser
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
-from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
from models.account import Account
from models.model import App, Conversation, EndUser, Message
+from models.workflow import Workflow

logger = logging.getLogger(__name__)


class AdvancedChatAppGenerator(MessageBasedAppGenerator):
    def generate(self, app_model: App,
+                 workflow: Workflow,
                 user: Union[Account, EndUser],
-                args: Any,
+                args: dict,
                 invoke_from: InvokeFrom,
                 stream: bool = True) \
            -> Union[dict, Generator]:
...

@@ -35,6 +36,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
        Generate App response.

        :param app_model: App
+       :param workflow: Workflow
        :param user: account or end user
        :param args: request args
        :param invoke_from: invoke from source
...

@@ -59,16 +61,6 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
        if args.get('conversation_id'):
            conversation = self._get_conversation_by_user(app_model, args.get('conversation_id'), user)

-        # get workflow
-        workflow_engine_manager = WorkflowEngineManager()
-        if invoke_from == InvokeFrom.DEBUGGER:
-            workflow = workflow_engine_manager.get_draft_workflow(app_model=app_model)
-        else:
-            workflow = workflow_engine_manager.get_published_workflow(app_model=app_model)
-
-        if not workflow:
-            raise ValueError('Workflow not initialized')
-
        # parse files
        files = args['files'] if 'files' in args and args['files'] else []
        message_file_parser = MessageFileParser(tenant_id=app_model.tenant_id, app_id=app_model.id)
...
api/core/app/apps/advanced_chat/app_runner.py (+134, -43)

import logging
+import time
from typing import cast

from core.app.app_queue_manager import AppQueueManager, PublishFrom
...

@@ -6,10 +7,15 @@ from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.base_app_runner import AppRunner
from core.app.entities.app_invoke_entities import (
    AdvancedChatAppGenerateEntity,
    InvokeFrom,
)
from core.app.entities.queue_entities import QueueStopEvent
from core.moderation.base import ModerationException
from core.workflow.entities.node_entities import SystemVariable
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
-from models.model import App, Conversation, Message
+from models.account import Account
+from models.model import App, Conversation, EndUser, Message

logger = logging.getLogger(__name__)

...
@@ -38,66 +44,151 @@ class AdvancedChatAppRunner(AppRunner):
        if not app_record:
            raise ValueError("App not found")

        workflow = WorkflowEngineManager().get_workflow(app_model=app_record, workflow_id=app_config.workflow_id)
        if not workflow:
            raise ValueError("Workflow not initialized")

        inputs = application_generate_entity.inputs
        query = application_generate_entity.query
        files = application_generate_entity.files

-        try:
-            # process sensitive_word_avoidance
-            _, inputs, query = self.moderation_for_inputs(
-                app_id=app_record.id,
-                tenant_id=app_config.tenant_id,
-                app_generate_entity=application_generate_entity,
-                inputs=inputs,
-                query=query,
-            )
-        except ModerationException as e:
-            # TODO
-            self.direct_output(
-                queue_manager=queue_manager,
-                app_generate_entity=application_generate_entity,
-                prompt_messages=prompt_messages,
-                text=str(e),
-                stream=application_generate_entity.stream
-            )
-            return
-
-        if query:
-            # annotation reply
-            annotation_reply = self.query_app_annotations_to_reply(
-                app_record=app_record,
-                message=message,
-                query=query,
-                user_id=application_generate_entity.user_id,
-                invoke_from=application_generate_entity.invoke_from
-            )
-
-            if annotation_reply:
-                queue_manager.publish_annotation_reply(
-                    message_annotation_id=annotation_reply.id,
-                    pub_from=PublishFrom.APPLICATION_MANAGER
-                )
-
-                # TODO
-                self.direct_output(
-                    queue_manager=queue_manager,
-                    app_generate_entity=application_generate_entity,
-                    prompt_messages=prompt_messages,
-                    text=annotation_reply.content,
-                    stream=application_generate_entity.stream
-                )
-                return
-
-        # check hosting moderation
-        # TODO
-        hosting_moderation_result = self.check_hosting_moderation(
-            application_generate_entity=application_generate_entity,
-            queue_manager=queue_manager,
-            prompt_messages=prompt_messages
-        )
-
-        if hosting_moderation_result:
-            return
-
-        # todo RUN WORKFLOW
+        # moderation
+        if self.handle_input_moderation(
+            queue_manager=queue_manager,
+            app_record=app_record,
+            app_generate_entity=application_generate_entity,
+            inputs=inputs,
+            query=query
+        ):
+            return
+
+        # annotation reply
+        if self.handle_annotation_reply(
+            app_record=app_record,
+            message=message,
+            query=query,
+            queue_manager=queue_manager,
+            app_generate_entity=application_generate_entity
+        ):
+            return
+
+        # fetch user
+        if application_generate_entity.invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE]:
+            user = db.session.query(Account).filter(Account.id == application_generate_entity.user_id).first()
+        else:
+            user = db.session.query(EndUser).filter(EndUser.id == application_generate_entity.user_id).first()
+
+        # RUN WORKFLOW
+        workflow_engine_manager = WorkflowEngineManager()
+        result_generator = workflow_engine_manager.run_workflow(
+            app_model=app_record,
+            workflow=workflow,
+            user=user,
+            user_inputs=inputs,
+            system_inputs={
+                SystemVariable.QUERY: query,
+                SystemVariable.FILES: files,
+                SystemVariable.CONVERSATION: conversation.id,
+            }
+        )
+
+        for result in result_generator:
+            # todo handle workflow and node event
+            pass
+
+    def handle_input_moderation(self, queue_manager: AppQueueManager,
+                                app_record: App,
+                                app_generate_entity: AdvancedChatAppGenerateEntity,
+                                inputs: dict,
+                                query: str) -> bool:
+        """
+        Handle input moderation
+        :param queue_manager: application queue manager
+        :param app_record: app record
+        :param app_generate_entity: application generate entity
+        :param inputs: inputs
+        :param query: query
+        :return:
+        """
+        try:
+            # process sensitive_word_avoidance
+            _, inputs, query = self.moderation_for_inputs(
+                app_id=app_record.id,
+                tenant_id=app_generate_entity.app_config.tenant_id,
+                app_generate_entity=app_generate_entity,
+                inputs=inputs,
+                query=query,
+            )
+        except ModerationException as e:
+            self._stream_output(
+                queue_manager=queue_manager,
+                text=str(e),
+                stream=app_generate_entity.stream,
+                stopped_by=QueueStopEvent.StopBy.INPUT_MODERATION
+            )
+            return True
+
+        return False
+
+    def handle_annotation_reply(self, app_record: App,
+                                message: Message,
+                                query: str,
+                                queue_manager: AppQueueManager,
+                                app_generate_entity: AdvancedChatAppGenerateEntity) -> bool:
+        """
+        Handle annotation reply
+        :param app_record: app record
+        :param message: message
+        :param query: query
+        :param queue_manager: application queue manager
+        :param app_generate_entity: application generate entity
+        """
+        annotation_reply = self.query_app_annotations_to_reply(
+            app_record=app_record,
+            message=message,
+            query=query,
+            user_id=app_generate_entity.user_id,
+            invoke_from=app_generate_entity.invoke_from
+        )
+
+        if annotation_reply:
+            queue_manager.publish_annotation_reply(
+                message_annotation_id=annotation_reply.id,
+                pub_from=PublishFrom.APPLICATION_MANAGER
+            )
+
+            self._stream_output(
+                queue_manager=queue_manager,
+                text=annotation_reply.content,
+                stream=app_generate_entity.stream,
+                stopped_by=QueueStopEvent.StopBy.ANNOTATION_REPLY
+            )
+            return True
+
+        return False
+
+    def _stream_output(self, queue_manager: AppQueueManager,
+                       text: str,
+                       stream: bool,
+                       stopped_by: QueueStopEvent.StopBy) -> None:
+        """
+        Direct output
+        :param queue_manager: application queue manager
+        :param text: text
+        :param stream: stream
+        :return:
+        """
+        if stream:
+            index = 0
+            for token in text:
+                queue_manager.publish_text_chunk(token, PublishFrom.APPLICATION_MANAGER)
+                index += 1
+                time.sleep(0.01)
+
+        queue_manager.publish(
+            QueueStopEvent(stopped_by=stopped_by),
+            PublishFrom.APPLICATION_MANAGER
+        )
+
+        queue_manager.stop_listen()
api/core/app/apps/base_app_runner.py (+1, -1)

@@ -84,7 +84,7 @@ class AppRunner:

        return rest_tokens

-    def recale_llm_max_tokens(self, model_config: ModelConfigWithCredentialsEntity,
+    def recalc_llm_max_tokens(self, model_config: ModelConfigWithCredentialsEntity,
                              prompt_messages: list[PromptMessage]):
        # recalc max_tokens if sum(prompt_token + max_tokens) over model token limit
        model_type_instance = model_config.provider_model_bundle.model_type_instance
...
api/core/app/apps/chat/app_runner.py (+1, -1)

@@ -189,7 +189,7 @@ class ChatAppRunner(AppRunner):
            return

        # Re-calculate the max tokens if sum(prompt_token + max_tokens) over model token limit
-        self.recale_llm_max_tokens(
+        self.recalc_llm_max_tokens(
            model_config=application_generate_entity.model_config,
            prompt_messages=prompt_messages
        )
...
api/core/app/apps/completion/app_runner.py (+1, -1)

@@ -149,7 +149,7 @@ class CompletionAppRunner(AppRunner):
            return

        # Re-calculate the max tokens if sum(prompt_token + max_tokens) over model token limit
-        self.recale_llm_max_tokens(
+        self.recalc_llm_max_tokens(
            model_config=application_generate_entity.model_config,
            prompt_messages=prompt_messages
        )
...
api/core/app/entities/queue_entities.py (+1, -0)

@@ -165,6 +165,7 @@ class QueueStopEvent(AppQueueEvent):
        USER_MANUAL = "user-manual"
        ANNOTATION_REPLY = "annotation-reply"
        OUTPUT_MODERATION = "output-moderation"
+        INPUT_MODERATION = "input-moderation"

    event = QueueEvent.STOP
    stopped_by: StopBy
...
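The new stop reason is what _stream_output in the advanced-chat runner publishes when input moderation trips. A tiny sketch of constructing the event, assuming QueueStopEvent is a pydantic model like the other queue entities:

# Sketch only (not part of the commit): building a stop event with the new reason.
from core.app.entities.queue_entities import QueueStopEvent

event = QueueStopEvent(stopped_by=QueueStopEvent.StopBy.INPUT_MODERATION)
assert event.stopped_by.value == "input-moderation"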
api/core/workflow/entities/node_entities.py (+9, -0)

@@ -30,3 +30,12 @@ class NodeType(Enum):
            if node_type.value == value:
                return node_type

        raise ValueError(f'invalid node type value {value}')


class SystemVariable(Enum):
    """
    System Variables.
    """
    QUERY = 'query'
    FILES = 'files'
    CONVERSATION = 'conversation'
api/core/workflow/entities/variable_pool.py (new file, +82)

from enum import Enum
from typing import Any, Optional, Union

from core.workflow.entities.node_entities import SystemVariable

VariableValue = Union[str, int, float, dict, list]


class ValueType(Enum):
    """
    Value Type Enum
    """
    STRING = "string"
    NUMBER = "number"
    OBJECT = "object"
    ARRAY = "array"
    FILE = "file"


class VariablePool:
    variables_mapping = {}

    def __init__(self, system_variables: dict[SystemVariable, Any]) -> None:
        # system variables
        # for example:
        # {
        #     'query': 'abc',
        #     'files': []
        # }
        for system_variable, value in system_variables.items():
            self.append_variable('sys', [system_variable.value], value)

    def append_variable(self, node_id: str, variable_key_list: list[str], value: VariableValue) -> None:
        """
        Append variable
        :param node_id: node id
        :param variable_key_list: variable key list, like: ['result', 'text']
        :param value: value
        :return:
        """
        if node_id not in self.variables_mapping:
            self.variables_mapping[node_id] = {}

        variable_key_list_hash = hash(tuple(variable_key_list))

        self.variables_mapping[node_id][variable_key_list_hash] = value

    def get_variable_value(self, variable_selector: list[str],
                           target_value_type: Optional[ValueType] = None) -> Optional[VariableValue]:
        """
        Get variable
        :param variable_selector: include node_id and variables
        :param target_value_type: target value type
        :return:
        """
        if len(variable_selector) < 2:
            raise ValueError('Invalid value selector')

        node_id = variable_selector[0]
        if node_id not in self.variables_mapping:
            return None

        # fetch variable keys, pop node_id
        variable_key_list = variable_selector[1:]

        variable_key_list_hash = hash(tuple(variable_key_list))

        value = self.variables_mapping[node_id].get(variable_key_list_hash)

        if target_value_type:
            if target_value_type == ValueType.STRING:
                return str(value)
            elif target_value_type == ValueType.NUMBER:
                return int(value)
            elif target_value_type == ValueType.OBJECT:
                if not isinstance(value, dict):
                    raise ValueError('Invalid value type: object')
            elif target_value_type == ValueType.ARRAY:
                if not isinstance(value, list):
                    raise ValueError('Invalid value type: array')

        return value
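A short usage sketch of the new VariablePool (not part of the commit): system variables land under the reserved 'sys' node id, node outputs are appended under their node id, and selectors are [node_id, *keys].

# Usage sketch for the VariablePool above; not part of the commit.
from core.workflow.entities.node_entities import SystemVariable
from core.workflow.entities.variable_pool import ValueType, VariablePool

pool = VariablePool(system_variables={
    SystemVariable.QUERY: 'what is dify?',
    SystemVariable.FILES: [],
})

# a node appends its output under its node id and a key path
pool.append_variable('node_1', ['result', 'text'], 'hello')

# selectors are [node_id, *keys]; system variables live under 'sys'
assert pool.get_variable_value(['sys', 'query']) == 'what is dify?'
assert pool.get_variable_value(['node_1', 'result', 'text'], ValueType.STRING) == 'hello'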
api/core/workflow/nodes/base_node.py (+37, -0)

from abc import abstractmethod
from typing import Optional

from core.workflow.entities.node_entities import NodeType
from core.workflow.entities.variable_pool import VariablePool


class BaseNode:
    _node_type: NodeType

    def __int__(self, node_config: dict) -> None:
        self._node_config = node_config

    @abstractmethod
    def run(self, variable_pool: Optional[VariablePool] = None,
            run_args: Optional[dict] = None) -> dict:
        """
        Run node
        :param variable_pool: variable pool
        :param run_args: run args
        :return:
        """
        if variable_pool is None and run_args is None:
            raise ValueError("At least one of `variable_pool` or `run_args` must be provided.")

        return self._run(variable_pool=variable_pool, run_args=run_args)

    @abstractmethod
    def _run(self, variable_pool: Optional[VariablePool] = None,
             run_args: Optional[dict] = None) -> dict:
        """
        Run node
        :param variable_pool: variable pool
        :param run_args: run args
        :return:
        """
        raise NotImplementedError

    @classmethod
    def get_default_config(cls, filters: Optional[dict] = None) -> dict:
        """
        ...
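To illustrate the BaseNode contract (a single-node debug run passes run_args, a full workflow run passes the VariablePool), here is a hypothetical concrete node. EchoNode is not part of the commit, and the NodeType member it reuses is an assumption.

# Hypothetical node, for illustration only; not part of the commit.
from typing import Optional

from core.workflow.entities.node_entities import NodeType
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.nodes.base_node import BaseNode


class EchoNode(BaseNode):
    _node_type = NodeType.CODE  # assumption: reusing an existing NodeType member

    def _run(self, variable_pool: Optional[VariablePool] = None,
             run_args: Optional[dict] = None) -> dict:
        # single-node debug runs pass run_args; workflow runs pass the pool
        if run_args is not None:
            return {'text': run_args.get('text', '')}
        return {'text': variable_pool.get_variable_value(['sys', 'query'])}


# EchoNode().run(run_args={'text': 'hi'}) returns {'text': 'hi'}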
api/core/workflow/workflow_engine_manager.py (+30, -4)

-from typing import Optional
+from collections.abc import Generator
+from typing import Optional, Union

from core.workflow.entities.node_entities import NodeType
from core.workflow.nodes.code.code_node import CodeNode
...

@@ -14,7 +15,8 @@ from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode
from core.workflow.nodes.tool.tool_node import ToolNode
from core.workflow.nodes.variable_assigner.variable_assigner_node import VariableAssignerNode
from extensions.ext_database import db
-from models.model import App
+from models.account import Account
+from models.model import App, EndUser
from models.workflow import Workflow

node_classes = {
...

@@ -56,13 +58,20 @@ class WorkflowEngineManager:
            return None

        # fetch published workflow by workflow_id
        return self.get_workflow(app_model, app_model.workflow_id)

    def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
        """
        Get workflow
        """
        # fetch workflow by workflow_id
        workflow = db.session.query(Workflow).filter(
            Workflow.tenant_id == app_model.tenant_id,
            Workflow.app_id == app_model.id,
-            Workflow.id == app_model.workflow_id
+            Workflow.id == workflow_id
        ).first()

-        # return published workflow
+        # return workflow
        return workflow

    def get_default_configs(self) -> list[dict]:
...

@@ -96,3 +105,20 @@ class WorkflowEngineManager:
            return None

        return default_config

    def run_workflow(self, app_model: App,
                     workflow: Workflow,
                     user: Union[Account, EndUser],
                     user_inputs: dict,
                     system_inputs: Optional[dict] = None) -> Generator:
        """
        Run workflow
        :param app_model: App instance
        :param workflow: Workflow instance
        :param user: account or end user
        :param user_inputs: user variables inputs
        :param system_inputs: system inputs, like: query, files
        :return:
        """
        # TODO
        pass
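run_workflow is still a stub in this commit. Purely as an illustration of how the pieces added here could fit together (this is not the commit's implementation), the system_inputs built by the advanced-chat runner could seed a VariablePool before the graph is walked:

# Illustrative only; run_workflow is a TODO in this commit.
from core.workflow.entities.node_entities import SystemVariable
from core.workflow.entities.variable_pool import VariablePool


def seed_pool(system_inputs: dict, user_inputs: dict) -> VariablePool:
    pool = VariablePool(system_variables=system_inputs or {})
    # assumption: user inputs keyed by variable name under a flat 'start' namespace
    for key, value in (user_inputs or {}).items():
        pool.append_variable('start', [key], value)
    return pool


pool = seed_pool(
    system_inputs={SystemVariable.QUERY: 'hi', SystemVariable.FILES: []},
    user_inputs={'name': 'dify'},
)
assert pool.get_variable_value(['sys', 'query']) == 'hi'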
api/fields/workflow_fields.py (+2, -2)

@@ -5,8 +5,8 @@ from libs.helper import TimestampField

workflow_fields = {
    'id': fields.String,
-    'graph': fields.Nested(simple_account_fields, attribute='graph_dict'),
-    'features': fields.Nested(simple_account_fields, attribute='features_dict'),
+    'graph': fields.Raw(attribute='graph_dict'),
+    'features': fields.Raw(attribute='features_dict'),
    'created_by': fields.Nested(simple_account_fields, attribute='created_by_account'),
    'created_at': TimestampField,
    'updated_by': fields.Nested(simple_account_fields, attribute='updated_by_account', allow_null=True),
...
api/fields/workflow_run_fields.py (+10, -10)

@@ -22,10 +22,10 @@ workflow_run_for_list_fields = {
    "id": fields.String,
    "sequence_number": fields.Integer,
    "version": fields.String,
-    "graph": fields.String,
-    "inputs": fields.String,
+    "graph": fields.Raw(attribute='graph_dict'),
+    "inputs": fields.Raw(attribute='inputs_dict'),
    "status": fields.String,
-    "outputs": fields.String,
+    "outputs": fields.Raw(attribute='outputs_dict'),
    "error": fields.String,
    "elapsed_time": fields.Float,
    "total_tokens": fields.Integer,
...

@@ -49,10 +49,10 @@ workflow_run_detail_fields = {
    "id": fields.String,
    "sequence_number": fields.Integer,
    "version": fields.String,
-    "graph": fields.String,
-    "inputs": fields.String,
+    "graph": fields.Raw(attribute='graph_dict'),
+    "inputs": fields.Raw(attribute='inputs_dict'),
    "status": fields.String,
-    "outputs": fields.String,
+    "outputs": fields.Raw(attribute='outputs_dict'),
    "error": fields.String,
    "elapsed_time": fields.Float,
    "total_tokens": fields.Integer,
...

@@ -73,13 +73,13 @@ workflow_run_node_execution_fields = {
    "node_id": fields.String,
    "node_type": fields.String,
    "title": fields.String,
-    "inputs": fields.String,
-    "process_data": fields.String,
-    "outputs": fields.String,
+    "inputs": fields.Raw(attribute='inputs_dict'),
+    "process_data": fields.Raw(attribute='process_data_dict'),
+    "outputs": fields.Raw(attribute='outputs_dict'),
    "status": fields.String,
    "error": fields.String,
    "elapsed_time": fields.Float,
-    "execution_metadata": fields.String,
+    "execution_metadata": fields.Raw(attribute='execution_metadata_dict'),
    "created_at": TimestampField,
    "created_by_role": fields.String,
    "created_by_account": fields.Nested(simple_account_fields, attribute='created_by_account', allow_null=True),
...
api/models/workflow.py (+8, -0)

@@ -272,6 +272,14 @@ class WorkflowRun(db.Model):
        return EndUser.query.get(self.created_by) \
            if created_by_role == CreatedByRole.END_USER else None

    @property
    def graph_dict(self):
        return self.graph if not self.graph else json.loads(self.graph)

    @property
    def inputs_dict(self):
        return self.inputs if not self.inputs else json.loads(self.inputs)

    @property
    def outputs_dict(self):
        return self.outputs if not self.outputs else json.loads(self.outputs)
...
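The fields.Raw(attribute=...) changes in the two fields modules pair with these *_dict properties: marshalling now emits parsed JSON instead of raw strings. A self-contained sketch follows; WorkflowRunStub is a hypothetical stand-in for the SQLAlchemy model.

# Sketch of how fields.Raw(attribute='...') resolves the *_dict properties.
import json
from flask_restful import fields, marshal

workflow_run_fields_subset = {
    'graph': fields.Raw(attribute='graph_dict'),
    'inputs': fields.Raw(attribute='inputs_dict'),
}


class WorkflowRunStub:  # hypothetical stand-in for models.workflow.WorkflowRun
    graph = json.dumps({'nodes': []})
    inputs = json.dumps({'query': 'hi'})

    @property
    def graph_dict(self):
        return self.graph if not self.graph else json.loads(self.graph)

    @property
    def inputs_dict(self):
        return self.inputs if not self.inputs else json.loads(self.inputs)


print(marshal(WorkflowRunStub(), workflow_run_fields_subset))
# graph and inputs marshal as parsed dicts rather than raw JSON strings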
api/services/workflow_service.py (+38, -2)

import json
from collections.abc import Generator
from datetime import datetime
-from typing import Optional
+from typing import Optional, Union

from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import NodeType
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
from models.account import Account
-from models.model import App, AppMode
+from models.model import App, AppMode, EndUser
from models.workflow import Workflow, WorkflowType
from services.workflow.workflow_converter import WorkflowConverter
...

@@ -142,6 +145,39 @@ class WorkflowService:
        workflow_engine_manager = WorkflowEngineManager()
        return workflow_engine_manager.get_default_config(node_type, filters)

    def run_advanced_chat_draft_workflow(self, app_model: App,
                                         user: Union[Account, EndUser],
                                         args: dict,
                                         invoke_from: InvokeFrom) -> Union[dict, Generator]:
        """
        Run advanced chatbot draft workflow
        """
        # fetch draft workflow by app_model
        draft_workflow = self.get_draft_workflow(app_model=app_model)
        if not draft_workflow:
            raise ValueError('Workflow not initialized')

        # run draft workflow
        app_generator = AdvancedChatAppGenerator()
        response = app_generator.generate(
            app_model=app_model,
            workflow=draft_workflow,
            user=user,
            args=args,
            invoke_from=invoke_from,
            stream=True
        )

        return response

    def run_draft_workflow(self, app_model: App,
                           user: Union[Account, EndUser],
                           args: dict,
                           invoke_from: InvokeFrom) -> Union[dict, Generator]:
        # TODO
        pass

    def convert_to_workflow(self, app_model: App, account: Account) -> App:
        """
        Basic mode of chatbot app(expert mode) to workflow
        ...