Commit 3c37c22e authored by jyong's avatar jyong

add thread pool

parent ca606103
import asyncio
import concurrent
import datetime
import json
......@@ -8,25 +7,17 @@ import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process
from typing import Optional, List, cast
import openai
from billiard.pool import Pool
from flask import current_app, Flask
from flask_login import current_user
from langchain.embeddings import OpenAIEmbeddings
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter, TextSplitter
from core.data_loader.file_extractor import FileExtractor
from core.data_loader.loader.notion import NotionLoader
from core.docstore.dataset_docstore import DatesetDocumentStore
from core.embedding.cached_embedding import CacheEmbedding
from core.generator.llm_generator import LLMGenerator
from core.index.index import IndexBuilder
from core.index.keyword_table_index.keyword_table_index import KeywordTableIndex, KeywordTableConfig
from core.index.vector_index.vector_index import VectorIndex
from core.llm.error import ProviderTokenNotInitError
from core.llm.llm_builder import LLMBuilder
from core.llm.streamable_open_ai import StreamableOpenAI
......@@ -516,19 +507,23 @@ class IndexingRunner:
model_name='gpt-3.5-turbo',
max_tokens=2000
)
threads = []
for doc in documents:
document_format_thread = threading.Thread(target=self.format_document, kwargs={
'llm': llm, 'document_node': doc, 'split_documents': split_documents, 'document_form': document_form})
threads.append(document_format_thread)
document_format_thread.start()
for thread in threads:
thread.join()
# threads = []
# for doc in documents:
# document_format_thread = threading.Thread(target=self.format_document, kwargs={
# 'llm': llm, 'document_node': doc, 'split_documents': split_documents, 'document_form': document_form})
# threads.append(document_format_thread)
# document_format_thread.start()
# for thread in threads:
# thread.join()
with ThreadPoolExecutor() as executor:
future_to_doc = {executor.submit(self.format_document, llm, doc, document_form): doc for doc in documents}
for future in concurrent.futures.as_completed(future_to_doc):
split_documents.extend(future.result())
all_documents.extend(split_documents)
return all_documents
def format_document(self, llm: StreamableOpenAI, document_node, split_documents: List, document_form: str):
def format_document(self, llm: StreamableOpenAI, document_node, document_form: str):
print(document_node.page_content)
format_documents = []
if document_node.page_content is None or not document_node.page_content.strip():
......@@ -559,7 +554,7 @@ class IndexingRunner:
format_documents.extend(qa_documents)
except Exception as e:
logging.error(str(e))
split_documents.extend(format_documents)
return format_documents
def _split_to_documents_for_estimate(self, text_docs: List[Document], splitter: TextSplitter,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment