Commit 76569834 authored by John Wang's avatar John Wang

Merge branch 'feat/universal-chat' into deploy/dev

parents dd21c0ca b63f1fa5
...@@ -392,6 +392,15 @@ class PubHandler: ...@@ -392,6 +392,15 @@ class PubHandler:
def _is_stopped(self): def _is_stopped(self):
return redis_client.get(self._stopped_cache_key) is not None return redis_client.get(self._stopped_cache_key) is not None
@classmethod
def ping(cls, user: Union[Account | EndUser], task_id: str):
content = {
'event': 'ping'
}
channel = cls.generate_channel_name(user, task_id)
redis_client.publish(channel, json.dumps(content))
@classmethod @classmethod
def stop(cls, user: Union[Account | EndUser], task_id: str): def stop(cls, user: Union[Account | EndUser], task_id: str):
stopped_cache_key = cls.generate_stopped_cache_key(user, task_id) stopped_cache_key = cls.generate_stopped_cache_key(user, task_id)
......
...@@ -174,7 +174,7 @@ class CompletionService: ...@@ -174,7 +174,7 @@ class CompletionService:
generate_worker_thread.start() generate_worker_thread.start()
# wait for 5 minutes to close the thread # wait for 10 minutes to close the thread
cls.countdown_and_close(generate_worker_thread, pubsub, user, generate_task_id) cls.countdown_and_close(generate_worker_thread, pubsub, user, generate_task_id)
return cls.compact_response(pubsub, streaming) return cls.compact_response(pubsub, streaming)
...@@ -236,6 +236,9 @@ class CompletionService: ...@@ -236,6 +236,9 @@ class CompletionService:
def close_pubsub(): def close_pubsub():
sleep_iterations = 0 sleep_iterations = 0
while sleep_iterations < timeout and worker_thread.is_alive(): while sleep_iterations < timeout and worker_thread.is_alive():
if sleep_iterations > 0 and sleep_iterations % 10 == 0:
PubHandler.ping(user, generate_task_id)
time.sleep(1) time.sleep(1)
sleep_iterations += 1 sleep_iterations += 1
...@@ -422,6 +425,8 @@ class CompletionService: ...@@ -422,6 +425,8 @@ class CompletionService:
yield "data: " + json.dumps(cls.get_chain_response_data(result.get('data'))) + "\n\n" yield "data: " + json.dumps(cls.get_chain_response_data(result.get('data'))) + "\n\n"
elif event == 'agent_thought': elif event == 'agent_thought':
yield "data: " + json.dumps(cls.get_agent_thought_response_data(result.get('data'))) + "\n\n" yield "data: " + json.dumps(cls.get_agent_thought_response_data(result.get('data'))) + "\n\n"
else:
yield "data: " + json.dumps(result) + "\n\n"
except ValueError as e: except ValueError as e:
if e.args[0] != "I/O operation on closed file.": # ignore this error if e.args[0] != "I/O operation on closed file.": # ignore this error
logging.exception(e) logging.exception(e)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment