Commit a15348ef authored by jyong

Merge branch 'feat/add-thread-control' into deploy/dev

# Conflicts:
#	api/controllers/console/datasets/datasets_segments.py
#	api/core/indexing_runner.py
#	api/migrations/versions/8d2d099ceb74_add_qa_model_support.py
parents 5a168730 3c37c22e
api/controllers/console/datasets/datasets_segments.py
@@ -16,7 +16,6 @@ from models.dataset import DocumentSegment
from libs.helper import TimestampField
from services.dataset_service import DatasetService, DocumentService, SegmentService
from tasks.test_task import test_task
from tasks.enable_segment_to_index_task import enable_segment_to_index_task
from tasks.remove_segment_from_index_task import remove_segment_from_index_task
@@ -285,15 +284,6 @@ class DatasetDocumentSegmentUpdateApi(Resource):
}, 200
class DatasetDocumentTest(Resource):
@setup_required
@login_required
@account_initialization_required
def patch(self):
test_task.delay()
return 200
api.add_resource(DatasetDocumentSegmentListApi,
'/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/segments')
api.add_resource(DatasetDocumentSegmentApi,
@@ -302,5 +292,3 @@ api.add_resource(DatasetDocumentSegmentAddApi,
'/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/segment')
api.add_resource(DatasetDocumentSegmentUpdateApi,
'/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/segments/<uuid:segment_id>')
api.add_resource(DatasetDocumentTest,
'/datasets/test')
api/core/indexing_runner.py
import asyncio
import concurrent
import datetime
import json
@@ -8,25 +7,17 @@ import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process
from typing import Optional, List, cast
import openai
from billiard.pool import Pool
from flask import current_app, Flask
from flask_login import current_user
from langchain.embeddings import OpenAIEmbeddings
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter, TextSplitter
from core.data_loader.file_extractor import FileExtractor
from core.data_loader.loader.notion import NotionLoader
from core.docstore.dataset_docstore import DatesetDocumentStore
from core.embedding.cached_embedding import CacheEmbedding
from core.generator.llm_generator import LLMGenerator
from core.index.index import IndexBuilder
from core.index.keyword_table_index.keyword_table_index import KeywordTableIndex, KeywordTableConfig
from core.index.vector_index.vector_index import VectorIndex
from core.llm.error import ProviderTokenNotInitError
from core.llm.llm_builder import LLMBuilder
from core.llm.streamable_open_ai import StreamableOpenAI
@@ -516,34 +507,23 @@ class IndexingRunner:
model_name='gpt-3.5-turbo',
max_tokens=2000
)
threads = []
for doc in documents:
document_format_thread = threading.Thread(target=self.format_document, kwargs={
'llm': llm, 'document_node': doc, 'split_documents': split_documents, 'document_form': document_form})
threads.append(document_format_thread)
document_format_thread.start()
for thread in threads:
thread.join()
#asyncio.run(self.format_document(llm, documents, split_documents, document_form))
# threads.append(task)
# await asyncio.gather(*threads)
# asyncio.run(main())
#await asyncio.gather(say('Hello', 2), say('World', 1))
# with Pool(5) as pool:
# threads = []
# for doc in documents:
# result = pool.apply_async(format_document, kwds={'flask_app': current_app._get_current_object(), 'document_node': doc, 'split_documents': split_documents})
# if result.ready():
# split_documents.extend(result.get())
# with ThreadPoolExecutor() as executor:
# future_to_doc = {executor.submit(self.format_document, llm, doc, document_form): doc for doc in documents}
# for future in concurrent.futures.as_completed(future_to_doc):
# split_documents.extend(future.result())
#self.format_document(llm, documents, split_documents, document_form)
# document_format_thread = threading.Thread(target=self.format_document, kwargs={
# 'llm': llm, 'document_node': doc, 'split_documents': split_documents, 'document_form': document_form})
# threads.append(document_format_thread)
# document_format_thread.start()
# for thread in threads:
# thread.join()
with ThreadPoolExecutor() as executor:
future_to_doc = {executor.submit(self.format_document, llm, doc, document_form): doc for doc in documents}
for future in concurrent.futures.as_completed(future_to_doc):
split_documents.extend(future.result())
all_documents.extend(split_documents)
return all_documents
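Note on the hunk above: the removed `threading.Thread` loop handed every worker the same mutable `split_documents` list, while the `ThreadPoolExecutor` version collects what each worker returns via `concurrent.futures.as_completed`. A minimal, self-contained sketch of that pattern (the `format_one` stub is a hypothetical stand-in for `IndexingRunner.format_document`):

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from typing import List

def format_one(document_node: str) -> List[str]:
    # Hypothetical stand-in for format_document: returns its results
    # instead of appending to a caller-owned shared list.
    return [document_node.upper()]

def format_all(documents: List[str]) -> List[str]:
    split_documents: List[str] = []
    with ThreadPoolExecutor() as executor:
        # Map each future back to the document it was submitted for.
        future_to_doc = {executor.submit(format_one, doc): doc for doc in documents}
        for future in concurrent.futures.as_completed(future_to_doc):
            # result() yields the worker's return value and re-raises
            # any exception raised inside the worker thread.
            split_documents.extend(future.result())
    return split_documents

print(format_all(['alpha', 'beta', 'gamma']))
```

Joining results in the submitting thread keeps all list mutation on one thread, which the earlier `Thread`/`join` and pool experiments shown above were approximating by hand.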
def format_document(self, llm: StreamableOpenAI, document_node, split_documents: List, document_form: str):
def format_document(self, llm: StreamableOpenAI, document_node, document_form: str):
print(document_node.page_content)
format_documents = []
if document_node.page_content is None or not document_node.page_content.strip():
@@ -572,9 +552,9 @@ class IndexingRunner:
qa_document.metadata['doc_hash'] = hash
qa_documents.append(qa_document)
format_documents.extend(qa_documents)
except Exception:
logging.error("sss")
split_documents.extend(format_documents)
except Exception as e:
logging.error(str(e))
return format_documents
def _split_to_documents_for_estimate(self, text_docs: List[Document], splitter: TextSplitter,
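A related detail from the `format_document` hunk: the worker now logs the actual exception text (the old branch logged a fixed placeholder) and still returns whatever it managed to format, so one failing document no longer poisons a shared list or aborts the batch. A hedged sketch of that shape, with a hypothetical `generate_qa` standing in for the LLM call:

```python
import logging
from typing import List

def generate_qa(text: str) -> List[str]:
    # Hypothetical stand-in for the LLM-backed QA generation step.
    if 'bad' in text:
        raise ValueError('model rejected the document')
    return [f'Q/A for: {text}']

def format_document(page_content: str) -> List[str]:
    format_documents: List[str] = []
    if page_content is None or not page_content.strip():
        return format_documents
    try:
        format_documents.extend(generate_qa(page_content))
    except Exception as e:
        # Log the real error message; a constant placeholder hides the cause.
        logging.error(str(e))
    # Partial (possibly empty) results still flow back to the caller.
    return format_documents

print(format_document('thread pools'))  # -> ['Q/A for: thread pools']
print(format_document('bad document'))  # -> [] (error logged, not raised)
```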
@@ -32,10 +32,11 @@ class DatasetRetrieverTool(BaseTool):
@classmethod
def from_dataset(cls, dataset: Dataset, **kwargs):
description = dataset.description.replace('\n', '').replace('\r', '')
description = dataset.description
if not description:
description = 'useful for when you want to answer queries about the ' + dataset.name
description = description.replace('\n', '').replace('\r', '')
description += '\nID of dataset MUST be ' + dataset.id
return cls(
tenant_id=dataset.tenant_id,
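Why the reorder in `from_dataset` matters: the old line called `.replace()` on `dataset.description` before checking whether it was set, which raises `AttributeError` whenever the description is `None`; the new order falls back to a generated description first and sanitizes afterwards. A minimal sketch (this `Dataset` dataclass is a hypothetical stand-in for the real model):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Dataset:
    id: str
    name: str
    description: Optional[str] = None

def build_description(dataset: Dataset) -> str:
    description = dataset.description
    if not description:
        # Fall back before any string methods are called on a None value.
        description = 'useful for when you want to answer queries about the ' + dataset.name
    description = description.replace('\n', '').replace('\r', '')
    description += '\nID of dataset MUST be ' + dataset.id
    return description

print(build_description(Dataset(id='d1', name='handbook')))                         # fallback path
print(build_description(Dataset(id='d2', name='faq', description='Line1\nLine2')))  # sanitized path
```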
@@ -76,6 +76,9 @@ const Main: FC<IMainProps> = () => {
const media = useBreakpoints()
const isMobile = media === MediaType.mobile
useEffect(() => {
document.title = `${t('explore.sidebar.chat')} - Dify`
}, [])
/*
* app info
*/