Commit 8245cc5f authored by jyong's avatar jyong

1. support cover duplicate document

2. add error document retry
parent 70525653
......@@ -233,7 +233,7 @@ class DatasetDocumentListApi(Resource):
location='json')
parser.add_argument('data_source', type=dict, required=False, location='json')
parser.add_argument('process_rule', type=dict, required=False, location='json')
parser.add_argument('duplicate', type=bool, nullable=False, location='json')
parser.add_argument('duplicate', type=bool, default=True, nullable=False, location='json')
parser.add_argument('original_document_id', type=str, required=False, location='json')
parser.add_argument('doc_form', type=str, default='text_model', required=False, nullable=False, location='json')
parser.add_argument('doc_language', type=str, default='English', required=False, nullable=False,
......@@ -883,6 +883,39 @@ class DocumentRecoverApi(DocumentResource):
return {'result': 'success'}, 204
class DocumentRetryApi(DocumentResource):
@setup_required
@login_required
@account_initialization_required
def post(self, dataset_id, document_id):
"""retry document."""
dataset_id = str(dataset_id)
document_id = str(document_id)
dataset = DatasetService.get_dataset(dataset_id)
if not dataset:
raise NotFound('Dataset not found.')
document = DocumentService.get_document(dataset.id, document_id)
# 404 if document not found
if document is None:
raise NotFound("Document Not Exists.")
# 403 if document is archived
if DocumentService.check_archived(document):
raise ArchivedDocumentImmutableError()
# 400 if document is completed
if document.indexing_status == 'completed':
raise DocumentAlreadyFinishedError()
# retry document
DocumentService.retry_document(document)
return {'result': 'success'}, 204
api.add_resource(GetProcessRuleApi, '/datasets/process-rule')
api.add_resource(DatasetDocumentListApi,
'/datasets/<uuid:dataset_id>/documents')
......
......@@ -45,7 +45,8 @@ from tasks.delete_segment_from_index_task import delete_segment_from_index_task
from tasks.document_indexing_task import document_indexing_task
from tasks.document_indexing_update_task import document_indexing_update_task
from tasks.recover_document_indexing_task import recover_document_indexing_task
from tasks.duplicate_document_indexing_task import duplicate_document_indexing_task
from tasks.retry_document_indexing_task import retry_document_indexing_task
class DatasetService:
......@@ -440,6 +441,18 @@ class DocumentService:
# trigger async task
recover_document_indexing_task.delay(document.dataset_id, document.id)
@staticmethod
def retry_document(document):
# retry document indexing
document.indexing_status = 'waiting'
db.session.add(document)
db.session.commit()
# add retry flag
retry_indexing_cache_key = 'document_{}_is_retried'.format(document.id)
redis_client.setex(retry_indexing_cache_key, 600, 1)
# trigger async task
retry_document_indexing_task.delay(document.dataset_id, [document.id])
@staticmethod
def get_documents_position(dataset_id):
document = Document.query.filter_by(dataset_id=dataset_id).order_by(Document.position.desc()).first()
......@@ -537,6 +550,7 @@ class DocumentService:
db.session.commit()
position = DocumentService.get_documents_position(dataset.id)
document_ids = []
duplicate_document_ids = []
if document_data["data_source"]["type"] == "upload_file":
upload_file_list = document_data["data_source"]["info_list"]['file_info_list']['file_ids']
for file_id in upload_file_list:
......@@ -553,6 +567,28 @@ class DocumentService:
data_source_info = {
"upload_file_id": file_id,
}
# check duplicate
if document_data.get('duplicate', True):
document = Document.query.filter_by(
dataset_id=dataset.id,
tenant_id=current_user.current_tenant_id,
data_source_type='upload_file',
enabled=True,
name=file_name
).first()
if document:
document.dataset_process_rule_id = dataset_process_rule.id
document.updated_at = datetime.datetime.utcnow()
document.created_from = created_from
document.doc_form = document_data['doc_form']
document.doc_language = document_data['doc_language']
document.data_source_info = json.dumps(data_source_info)
document.batch = batch
document.indexing_status = 'waiting'
db.session.add(document)
documents.append(document)
duplicate_document_ids.append(document.id)
continue
document = DocumentService.build_document(dataset, dataset_process_rule.id,
document_data["data_source"]["type"],
document_data["doc_form"],
......@@ -618,7 +654,10 @@ class DocumentService:
db.session.commit()
# trigger async task
document_indexing_task.delay(dataset.id, document_ids)
if document_ids:
document_indexing_task.delay(dataset.id, document_ids)
if duplicate_document_ids:
duplicate_document_indexing_task.delay(dataset.id, duplicate_document_ids)
return documents, batch
......@@ -752,7 +791,6 @@ class DocumentService:
db.session.commit()
# trigger async task
document_indexing_update_task.delay(document.dataset_id, document.id)
return document
@staticmethod
......
import datetime
import logging
import time
import click
from celery import shared_task
from flask import current_app
from core.indexing_runner import DocumentIsPausedException, IndexingRunner
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from extensions.ext_database import db
from models.dataset import Dataset, Document, DocumentSegment
from services.feature_service import FeatureService
@shared_task(queue='dataset')
def duplicate_document_indexing_task(dataset_id: str, document_ids: list):
"""
Async process document
:param dataset_id:
:param document_ids:
Usage: duplicate_document_indexing_task.delay(dataset_id, document_id)
"""
documents = []
start_at = time.perf_counter()
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
# check document limit
features = FeatureService.get_features(dataset.tenant_id)
try:
if features.billing.enabled:
vector_space = features.vector_space
count = len(document_ids)
batch_upload_limit = int(current_app.config['BATCH_UPLOAD_LIMIT'])
if count > batch_upload_limit:
raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.")
if 0 < vector_space.limit <= vector_space.size:
raise ValueError("Your total number of documents plus the number of uploads have over the limit of "
"your subscription.")
except Exception as e:
for document_id in document_ids:
document = db.session.query(Document).filter(
Document.id == document_id,
Document.dataset_id == dataset_id
).first()
if document:
document.indexing_status = 'error'
document.error = str(e)
document.stopped_at = datetime.datetime.utcnow()
db.session.add(document)
db.session.commit()
return
for document_id in document_ids:
logging.info(click.style('Start process document: {}'.format(document_id), fg='green'))
document = db.session.query(Document).filter(
Document.id == document_id,
Document.dataset_id == dataset_id
).first()
if document:
# clean old data
index_type = document.doc_form
index_processor = IndexProcessorFactory(index_type).init_index_processor()
segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
if segments:
index_node_ids = [segment.index_node_id for segment in segments]
# delete from vector index
index_processor.clean(dataset, index_node_ids)
for segment in segments:
db.session.delete(segment)
db.session.commit()
document.indexing_status = 'parsing'
document.processing_started_at = datetime.datetime.utcnow()
documents.append(document)
db.session.add(document)
db.session.commit()
try:
indexing_runner = IndexingRunner()
indexing_runner.run(documents)
end_at = time.perf_counter()
logging.info(click.style('Processed dataset: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green'))
except DocumentIsPausedException as ex:
logging.info(click.style(str(ex), fg='yellow'))
except Exception:
pass
import datetime
import logging
import time
import click
from celery import shared_task
from flask import current_app
from core.indexing_runner import DocumentIsPausedException, IndexingRunner
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import Dataset, Document, DocumentSegment
from services.feature_service import FeatureService
@shared_task(queue='dataset')
def retry_document_indexing_task(dataset_id: str, document_id: str):
"""
Async process document
:param dataset_id:
:param document_id:
Usage: retry_document_indexing_task.delay(dataset_id, document_id)
"""
documents = []
start_at = time.perf_counter()
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
retry_indexing_cache_key = 'document_{}_is_retried'.format(document_id)
# check document limit
features = FeatureService.get_features(dataset.tenant_id)
try:
if features.billing.enabled:
vector_space = features.vector_space
if 0 < vector_space.limit <= vector_space.size:
raise ValueError("Your total number of documents plus the number of uploads have over the limit of "
"your subscription.")
except Exception as e:
document = db.session.query(Document).filter(
Document.id == document_id,
Document.dataset_id == dataset_id
).first()
if document:
document.indexing_status = 'error'
document.error = str(e)
document.stopped_at = datetime.datetime.utcnow()
db.session.add(document)
db.session.commit()
redis_client.delete(retry_indexing_cache_key)
return
logging.info(click.style('Start retry document: {}'.format(document_id), fg='green'))
try:
document = db.session.query(Document).filter(
Document.id == document_id,
Document.dataset_id == dataset_id
).first()
if document:
# clean old data
index_processor = IndexProcessorFactory(document.doc_form).init_index_processor()
segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
if segments:
index_node_ids = [segment.index_node_id for segment in segments]
# delete from vector index
index_processor.clean(dataset, index_node_ids)
for segment in segments:
db.session.delete(segment)
db.session.commit()
document.indexing_status = 'parsing'
document.processing_started_at = datetime.datetime.utcnow()
documents.append(document)
db.session.add(document)
db.session.commit()
indexing_runner = IndexingRunner()
indexing_runner.run(documents)
end_at = time.perf_counter()
redis_client.delete(retry_indexing_cache_key)
logging.info(click.style('Retry dataset: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green'))
except Exception as ex:
logging.info(click.style(str(ex), fg='yellow'))
redis_client.delete(retry_indexing_cache_key)
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment