Commit b368d9ab authored by takatost's avatar takatost

fix stream bugs

parent 56e39e87
...@@ -54,7 +54,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): ...@@ -54,7 +54,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
inputs = args['inputs'] inputs = args['inputs']
extras = { extras = {
"auto_generate_conversation_name": args['auto_generate_name'] if 'auto_generate_name' in args else True "auto_generate_conversation_name": args['auto_generate_name'] if 'auto_generate_name' in args else False
} }
# get conversation # get conversation
......
...@@ -346,7 +346,7 @@ class AdvancedChatAppGenerateTaskPipeline: ...@@ -346,7 +346,7 @@ class AdvancedChatAppGenerateTaskPipeline:
yield self._yield_response(response) yield self._yield_response(response)
elif isinstance(event, QueueTextChunkEvent): elif isinstance(event, QueueTextChunkEvent):
delta_text = event.chunk_text delta_text = event.text
if delta_text is None: if delta_text is None:
continue continue
......
...@@ -76,7 +76,7 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback): ...@@ -76,7 +76,7 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback):
streamable_node_ids = [] streamable_node_ids = []
end_node_ids = [] end_node_ids = []
for node_config in graph.get('nodes'): for node_config in graph.get('nodes'):
if node_config.get('type') == NodeType.END.value: if node_config.get('data', {}).get('type') == NodeType.END.value:
end_node_ids.append(node_config.get('id')) end_node_ids.append(node_config.get('id'))
for edge_config in graph.get('edges'): for edge_config in graph.get('edges'):
......
...@@ -15,6 +15,7 @@ from core.app.entities.queue_entities import ( ...@@ -15,6 +15,7 @@ from core.app.entities.queue_entities import (
QueueMessageEndEvent, QueueMessageEndEvent,
QueuePingEvent, QueuePingEvent,
QueueStopEvent, QueueStopEvent,
QueueWorkflowFinishedEvent,
) )
from extensions.ext_redis import redis_client from extensions.ext_redis import redis_client
...@@ -36,7 +37,8 @@ class AppQueueManager: ...@@ -36,7 +37,8 @@ class AppQueueManager:
self._invoke_from = invoke_from self._invoke_from = invoke_from
user_prefix = 'account' if self._invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end-user' user_prefix = 'account' if self._invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end-user'
redis_client.setex(AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}") redis_client.setex(AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800,
f"{user_prefix}-{self._user_id}")
q = queue.Queue() q = queue.Queue()
...@@ -106,7 +108,10 @@ class AppQueueManager: ...@@ -106,7 +108,10 @@ class AppQueueManager:
self._q.put(message) self._q.put(message)
if isinstance(event, QueueStopEvent | QueueErrorEvent | QueueMessageEndEvent): if isinstance(event, QueueStopEvent
| QueueErrorEvent
| QueueMessageEndEvent
| QueueWorkflowFinishedEvent):
self.stop_listen() self.stop_listen()
if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped(): if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped():
......
...@@ -248,7 +248,7 @@ class WorkflowAppGenerateTaskPipeline: ...@@ -248,7 +248,7 @@ class WorkflowAppGenerateTaskPipeline:
yield self._yield_response(workflow_run_response) yield self._yield_response(workflow_run_response)
elif isinstance(event, QueueTextChunkEvent): elif isinstance(event, QueueTextChunkEvent):
delta_text = event.chunk_text delta_text = event.text
if delta_text is None: if delta_text is None:
continue continue
......
...@@ -76,7 +76,7 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback): ...@@ -76,7 +76,7 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback):
streamable_node_ids = [] streamable_node_ids = []
end_node_ids = [] end_node_ids = []
for node_config in graph.get('nodes'): for node_config in graph.get('nodes'):
if node_config.get('type') == NodeType.END.value: if node_config.get('data', {}).get('type') == NodeType.END.value:
if node_config.get('data', {}).get('outputs', {}).get('type', '') == 'plain-text': if node_config.get('data', {}).get('outputs', {}).get('type', '') == 'plain-text':
end_node_ids.append(node_config.get('id')) end_node_ids.append(node_config.get('id'))
......
...@@ -48,7 +48,7 @@ class QueueTextChunkEvent(AppQueueEvent): ...@@ -48,7 +48,7 @@ class QueueTextChunkEvent(AppQueueEvent):
QueueTextChunkEvent entity QueueTextChunkEvent entity
""" """
event = QueueEvent.TEXT_CHUNK event = QueueEvent.TEXT_CHUNK
chunk_text: str text: str
class QueueAgentMessageEvent(AppQueueEvent): class QueueAgentMessageEvent(AppQueueEvent):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment