Commit 3d6b0669 authored by takatost's avatar takatost

optimize db connections

parent 0386061f
...@@ -27,6 +27,7 @@ DEFAULTS = { ...@@ -27,6 +27,7 @@ DEFAULTS = {
'CHECK_UPDATE_URL': 'https://updates.dify.ai', 'CHECK_UPDATE_URL': 'https://updates.dify.ai',
'DEPLOY_ENV': 'PRODUCTION', 'DEPLOY_ENV': 'PRODUCTION',
'SQLALCHEMY_POOL_SIZE': 30, 'SQLALCHEMY_POOL_SIZE': 30,
'SQLALCHEMY_MAX_OVERFLOW': 10,
'SQLALCHEMY_POOL_RECYCLE': 3600, 'SQLALCHEMY_POOL_RECYCLE': 3600,
'SQLALCHEMY_ECHO': 'False', 'SQLALCHEMY_ECHO': 'False',
'SENTRY_TRACES_SAMPLE_RATE': 1.0, 'SENTRY_TRACES_SAMPLE_RATE': 1.0,
...@@ -148,6 +149,7 @@ class Config: ...@@ -148,6 +149,7 @@ class Config:
self.SQLALCHEMY_DATABASE_URI = f"postgresql://{db_credentials['DB_USERNAME']}:{db_credentials['DB_PASSWORD']}@{db_credentials['DB_HOST']}:{db_credentials['DB_PORT']}/{db_credentials['DB_DATABASE']}{db_extras}" self.SQLALCHEMY_DATABASE_URI = f"postgresql://{db_credentials['DB_USERNAME']}:{db_credentials['DB_PASSWORD']}@{db_credentials['DB_HOST']}:{db_credentials['DB_PORT']}/{db_credentials['DB_DATABASE']}{db_extras}"
self.SQLALCHEMY_ENGINE_OPTIONS = { self.SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': int(get_env('SQLALCHEMY_POOL_SIZE')), 'pool_size': int(get_env('SQLALCHEMY_POOL_SIZE')),
'max_overflow': int(get_env('SQLALCHEMY_MAX_OVERFLOW')),
'pool_recycle': int(get_env('SQLALCHEMY_POOL_RECYCLE')) 'pool_recycle': int(get_env('SQLALCHEMY_POOL_RECYCLE'))
} }
......
...@@ -95,6 +95,12 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): ...@@ -95,6 +95,12 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
extras=extras extras=extras
) )
workflow = db.session.query(Workflow).filter(Workflow.id == workflow.id).first()
user = (db.session.query(Account).filter(Account.id == user.id).first()
if isinstance(user, Account)
else db.session.query(EndUser).filter(EndUser.id == user.id).first())
db.session.close()
# init generate records # init generate records
( (
conversation, conversation,
...@@ -153,6 +159,8 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): ...@@ -153,6 +159,8 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
conversation = self._get_conversation(conversation_id) conversation = self._get_conversation(conversation_id)
message = self._get_message(message_id) message = self._get_message(message_id)
db.session.close()
# chatbot app # chatbot app
runner = AdvancedChatAppRunner() runner = AdvancedChatAppRunner()
runner.run( runner.run(
...@@ -177,7 +185,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): ...@@ -177,7 +185,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
logger.exception("Unknown Error when generating") logger.exception("Unknown Error when generating")
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER) queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
finally: finally:
db.session.remove() db.session.close()
def _handle_advanced_chat_response(self, application_generate_entity: AdvancedChatAppGenerateEntity, def _handle_advanced_chat_response(self, application_generate_entity: AdvancedChatAppGenerateEntity,
workflow: Workflow, workflow: Workflow,
...@@ -198,6 +206,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): ...@@ -198,6 +206,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
:return: :return:
""" """
# init generate task pipeline # init generate task pipeline
generate_task_pipeline = AdvancedChatAppGenerateTaskPipeline( generate_task_pipeline = AdvancedChatAppGenerateTaskPipeline(
application_generate_entity=application_generate_entity, application_generate_entity=application_generate_entity,
workflow=workflow, workflow=workflow,
...@@ -216,5 +225,3 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): ...@@ -216,5 +225,3 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
else: else:
logger.exception(e) logger.exception(e)
raise e raise e
# finally:
# db.session.remove()
...@@ -122,6 +122,8 @@ class AdvancedChatAppGenerateTaskPipeline(WorkflowBasedGenerateTaskPipeline): ...@@ -122,6 +122,8 @@ class AdvancedChatAppGenerateTaskPipeline(WorkflowBasedGenerateTaskPipeline):
self._output_moderation_handler = self._init_output_moderation() self._output_moderation_handler = self._init_output_moderation()
self._stream = stream self._stream = stream
db.session.close()
def process(self) -> Union[dict, Generator]: def process(self) -> Union[dict, Generator]:
""" """
Process generate task pipeline. Process generate task pipeline.
......
...@@ -177,6 +177,9 @@ class MessageBasedAppGenerator(BaseAppGenerator): ...@@ -177,6 +177,9 @@ class MessageBasedAppGenerator(BaseAppGenerator):
db.session.add(conversation) db.session.add(conversation)
db.session.commit() db.session.commit()
conversation = db.session.query(Conversation).filter(Conversation.id == conversation.id).first()
db.session.close()
message = Message( message = Message(
app_id=app_config.app_id, app_id=app_config.app_id,
model_provider=model_provider, model_provider=model_provider,
...@@ -204,6 +207,9 @@ class MessageBasedAppGenerator(BaseAppGenerator): ...@@ -204,6 +207,9 @@ class MessageBasedAppGenerator(BaseAppGenerator):
db.session.add(message) db.session.add(message)
db.session.commit() db.session.commit()
message = db.session.query(Message).filter(Message.id == message.id).first()
db.session.close()
for file in application_generate_entity.files: for file in application_generate_entity.files:
message_file = MessageFile( message_file = MessageFile(
message_id=message.id, message_id=message.id,
...@@ -218,6 +224,8 @@ class MessageBasedAppGenerator(BaseAppGenerator): ...@@ -218,6 +224,8 @@ class MessageBasedAppGenerator(BaseAppGenerator):
db.session.add(message_file) db.session.add(message_file)
db.session.commit() db.session.commit()
db.session.close()
return conversation, message return conversation, message
def _get_conversation_introduction(self, application_generate_entity: AppGenerateEntity) -> str: def _get_conversation_introduction(self, application_generate_entity: AppGenerateEntity) -> str:
......
...@@ -99,6 +99,8 @@ class WorkflowAppGenerateTaskPipeline(WorkflowBasedGenerateTaskPipeline): ...@@ -99,6 +99,8 @@ class WorkflowAppGenerateTaskPipeline(WorkflowBasedGenerateTaskPipeline):
self._output_moderation_handler = self._init_output_moderation() self._output_moderation_handler = self._init_output_moderation()
self._stream = stream self._stream = stream
db.session.close()
def process(self) -> Union[dict, Generator]: def process(self) -> Union[dict, Generator]:
""" """
Process generate task pipeline. Process generate task pipeline.
......
...@@ -61,6 +61,9 @@ class WorkflowBasedGenerateTaskPipeline: ...@@ -61,6 +61,9 @@ class WorkflowBasedGenerateTaskPipeline:
db.session.add(workflow_run) db.session.add(workflow_run)
db.session.commit() db.session.commit()
workflow_run = db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run.id).first()
db.session.close()
return workflow_run return workflow_run
def _workflow_run_success(self, workflow_run: WorkflowRun, def _workflow_run_success(self, workflow_run: WorkflowRun,
...@@ -85,6 +88,7 @@ class WorkflowBasedGenerateTaskPipeline: ...@@ -85,6 +88,7 @@ class WorkflowBasedGenerateTaskPipeline:
workflow_run.finished_at = datetime.utcnow() workflow_run.finished_at = datetime.utcnow()
db.session.commit() db.session.commit()
db.session.close()
return workflow_run return workflow_run
...@@ -112,6 +116,7 @@ class WorkflowBasedGenerateTaskPipeline: ...@@ -112,6 +116,7 @@ class WorkflowBasedGenerateTaskPipeline:
workflow_run.finished_at = datetime.utcnow() workflow_run.finished_at = datetime.utcnow()
db.session.commit() db.session.commit()
db.session.close()
return workflow_run return workflow_run
...@@ -151,6 +156,10 @@ class WorkflowBasedGenerateTaskPipeline: ...@@ -151,6 +156,10 @@ class WorkflowBasedGenerateTaskPipeline:
db.session.add(workflow_node_execution) db.session.add(workflow_node_execution)
db.session.commit() db.session.commit()
workflow_node_execution = (db.session.query(WorkflowNodeExecution)
.filter(WorkflowNodeExecution.id == workflow_node_execution.id).first())
db.session.close()
return workflow_node_execution return workflow_node_execution
def _workflow_node_execution_success(self, workflow_node_execution: WorkflowNodeExecution, def _workflow_node_execution_success(self, workflow_node_execution: WorkflowNodeExecution,
...@@ -179,6 +188,7 @@ class WorkflowBasedGenerateTaskPipeline: ...@@ -179,6 +188,7 @@ class WorkflowBasedGenerateTaskPipeline:
workflow_node_execution.finished_at = datetime.utcnow() workflow_node_execution.finished_at = datetime.utcnow()
db.session.commit() db.session.commit()
db.session.close()
return workflow_node_execution return workflow_node_execution
...@@ -198,5 +208,6 @@ class WorkflowBasedGenerateTaskPipeline: ...@@ -198,5 +208,6 @@ class WorkflowBasedGenerateTaskPipeline:
workflow_node_execution.finished_at = datetime.utcnow() workflow_node_execution.finished_at = datetime.utcnow()
db.session.commit() db.session.commit()
db.session.close()
return workflow_node_execution return workflow_node_execution
...@@ -19,6 +19,7 @@ from core.workflow.nodes.start.start_node import StartNode ...@@ -19,6 +19,7 @@ from core.workflow.nodes.start.start_node import StartNode
from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode
from core.workflow.nodes.tool.tool_node import ToolNode from core.workflow.nodes.tool.tool_node import ToolNode
from core.workflow.nodes.variable_assigner.variable_assigner_node import VariableAssignerNode from core.workflow.nodes.variable_assigner.variable_assigner_node import VariableAssignerNode
from extensions.ext_database import db
from models.workflow import ( from models.workflow import (
Workflow, Workflow,
WorkflowNodeExecutionStatus, WorkflowNodeExecutionStatus,
...@@ -282,6 +283,8 @@ class WorkflowEngineManager: ...@@ -282,6 +283,8 @@ class WorkflowEngineManager:
predecessor_node_id=predecessor_node.node_id if predecessor_node else None predecessor_node_id=predecessor_node.node_id if predecessor_node else None
) )
db.session.close()
workflow_nodes_and_result = WorkflowNodeAndResult( workflow_nodes_and_result = WorkflowNodeAndResult(
node=node, node=node,
result=None result=None
...@@ -339,6 +342,8 @@ class WorkflowEngineManager: ...@@ -339,6 +342,8 @@ class WorkflowEngineManager:
if node_run_result.metadata and node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS): if node_run_result.metadata and node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS):
workflow_run_state.total_tokens += int(node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS)) workflow_run_state.total_tokens += int(node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS))
db.session.close()
def _set_end_node_output_if_in_chat(self, workflow_run_state: WorkflowRunState, def _set_end_node_output_if_in_chat(self, workflow_run_state: WorkflowRunState,
node: BaseNode, node: BaseNode,
node_run_result: NodeRunResult) -> None: node_run_result: NodeRunResult) -> None:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment