Commit f53d3343 authored by jyong

support modifying segment keywords

parent 4f1b4b73
@@ -230,6 +230,7 @@ class DatasetDocumentSegmentAddApi(Resource):
         parser = reqparse.RequestParser()
         parser.add_argument('content', type=str, required=True, nullable=False, location='json')
         parser.add_argument('answer', type=str, required=False, nullable=True, location='json')
+        parser.add_argument('keywords', type=list, required=False, nullable=True, location='json')
         args = parser.parse_args()
         SegmentService.segment_create_args_validate(args, document)
         segment = SegmentService.create_segment(args, document)
@@ -273,6 +274,7 @@ class DatasetDocumentSegmentUpdateApi(Resource):
         parser = reqparse.RequestParser()
         parser.add_argument('content', type=str, required=True, nullable=False, location='json')
         parser.add_argument('answer', type=str, required=False, nullable=True, location='json')
+        parser.add_argument('keywords', type=list, required=False, nullable=True, location='json')
         args = parser.parse_args()
         SegmentService.segment_create_args_validate(args, document)
         segment = SegmentService.update_segment(args, segment, document)
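For orientation, a minimal sketch of the request body these two endpoints now accept. The host, route, and auth header below are assumptions inferred from the resource names above; this diff does not show the actual URL map:

    import requests

    # Hypothetical route and token; the real console API paths are not part of this diff.
    resp = requests.post(
        'https://dify.example.com/console/api/datasets/<dataset_id>/documents/<document_id>/segment',
        headers={'Authorization': 'Bearer <console-session-token>'},
        json={
            'content': 'Celery is a distributed task queue for Python.',
            'answer': None,                        # only meaningful for qa_model documents
            'keywords': ['celery', 'task queue'],  # optional; new in this commit
        },
    )
    resp.raise_for_status()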
@@ -205,6 +205,16 @@ class KeywordTableIndex(BaseIndex):
             document_segment.keywords = keywords
             db.session.commit()
 
+    def create_segment_keywords(self, node_id: str, keywords: List[str]):
+        keyword_table = self._get_dataset_keyword_table()
+        self._update_segment_keywords(node_id, keywords)
+        keyword_table = self._add_text_to_keyword_table(keyword_table, node_id, keywords)
+        self._save_dataset_keyword_table(keyword_table)
+
+    def update_segment_keywords_index(self, node_id: str, keywords: List[str]):
+        keyword_table = self._get_dataset_keyword_table()
+        keyword_table = self._add_text_to_keyword_table(keyword_table, node_id, keywords)
+        self._save_dataset_keyword_table(keyword_table)
+
 class KeywordTableRetriever(BaseRetriever, BaseModel):
     index: KeywordTableIndex
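Both new methods funnel through the private helper _add_text_to_keyword_table, whose body is outside this diff. As a rough sketch of what such a merge plausibly does, assuming the keyword table maps each keyword to the set of index node ids that contain it:

    from typing import Dict, List, Set

    def add_text_to_keyword_table(keyword_table: Dict[str, Set[str]],
                                  node_id: str,
                                  keywords: List[str]) -> Dict[str, Set[str]]:
        # Point every supplied keyword at this segment's index node.
        for keyword in keywords:
            keyword_table.setdefault(keyword, set()).add(node_id)
        return keyword_table

The practical difference between the two public methods is then just that create_segment_keywords also persists the keyword list onto the DocumentSegment row via _update_segment_keywords.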
@@ -31,7 +31,8 @@ from tasks.document_indexing_task import document_indexing_task
 from tasks.document_indexing_update_task import document_indexing_update_task
 from tasks.create_segment_to_index_task import create_segment_to_index_task
 from tasks.update_segment_index_task import update_segment_index_task
+from tasks.update_segment_keyword_index_task import update_segment_keyword_index_task
 
 
 class DatasetService:
@@ -896,7 +897,7 @@ class SegmentService:
         db.session.commit()
 
         indexing_cache_key = 'segment_{}_indexing'.format(segment_document.id)
         redis_client.setex(indexing_cache_key, 600, 1)
-        create_segment_to_index_task.delay(segment_document.id)
+        create_segment_to_index_task.delay(segment_document.id, args['keywords'])
 
         return segment_document
 
     @classmethod
@@ -909,8 +910,12 @@
         if segment.content == content:
             if document.doc_form == 'qa_model':
                 segment.answer = args['answer']
+            segment.keywords = args['keywords']
             db.session.add(segment)
             db.session.commit()
+            # update segment index task
+            redis_client.setex(indexing_cache_key, 600, 1)
+            update_segment_keyword_index_task.delay(segment.id)
         else:
             segment_hash = helper.generate_text_hash(content)
             # calc embedding use tokens
@@ -928,5 +933,5 @@
             db.session.commit()
             # update segment index task
             redis_client.setex(indexing_cache_key, 600, 1)
-            update_segment_index_task.delay(segment.id)
+            update_segment_index_task.delay(segment.id, args['keywords'])
         return segment
import datetime
import logging
import time
from typing import Optional, List
import click
from celery import shared_task
@@ -14,11 +15,11 @@ from models.dataset import DocumentSegment
 @shared_task
-def create_segment_to_index_task(segment_id: str):
+def create_segment_to_index_task(segment_id: str, keywords: Optional[List[str]] = None):
     """
     Async create segment to index
     :param segment_id:
+    :param keywords:
     Usage: create_segment_to_index_task.delay(segment_id)
     """
     logging.info(click.style('Start create segment to index: {}'.format(segment_id), fg='green'))
@@ -75,7 +76,10 @@ def create_segment_to_index_task(segment_id: str):
         # save keyword index
         index = IndexBuilder.get_index(dataset, 'economy')
         if index:
-            index.add_texts([document])
+            if keywords and len(keywords) > 0:
+                index.create_segment_keywords(segment.index_node_id, keywords)
+            else:
+                index.add_texts([document])
 
         # update segment to completed
         update_params = {
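The effect of the new branch: explicitly supplied keywords are indexed verbatim via create_segment_keywords, while the old index.add_texts path remains the fallback (presumably extracting keywords itself; its body is not in this diff). Dispatch then looks like:

    # caller supplies keywords: stored and indexed as given
    create_segment_to_index_task.delay(segment.id, ['invoice', 'refund policy'])

    # no keywords: falls back to index.add_texts([document])
    create_segment_to_index_task.delay(segment.id)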
import datetime
import logging
import time
from typing import List, Optional
import click
from celery import shared_task
@@ -14,11 +15,11 @@ from models.dataset import DocumentSegment
 @shared_task
-def update_segment_index_task(segment_id: str):
+def update_segment_index_task(segment_id: str, keywords: Optional[List[str]] = None):
     """
     Async update segment index
     :param segment_id:
+    :param keywords:
     Usage: update_segment_index_task.delay(segment_id)
     """
     logging.info(click.style('Start update segment index: {}'.format(segment_id), fg='green'))
@@ -87,7 +88,10 @@ def update_segment_index_task(segment_id: str):
         # save keyword index
         index = IndexBuilder.get_index(dataset, 'economy')
         if index:
-            index.add_texts([document])
+            if keywords and len(keywords) > 0:
+                index.create_segment_keywords(segment.index_node_id, keywords)
+            else:
+                index.add_texts([document])
 
         # update segment to completed
         update_params = {
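update_segment_index_task mirrors the create task's branch exactly. For local verification, both tasks can run inline instead of through a worker: Celery's standard task_always_eager flag makes .delay() execute synchronously. Here celery_app stands in for however this project exposes its Celery instance, which the diff does not show:

    celery_app.conf.task_always_eager = True  # hypothetical handle; run tasks in-process
    update_segment_index_task.delay(segment.id, ['keyword-a', 'keyword-b'])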
import datetime
import logging
import time
from typing import List, Optional
import click
from celery import shared_task
from werkzeug.exceptions import NotFound
from core.index.index import IndexBuilder
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import DocumentSegment
@shared_task
def update_segment_keyword_index_task(segment_id: str):
"""
Async update segment index
:param segment_id:
Usage: update_segment_keyword_index_task.delay(segment_id)
"""
logging.info(click.style('Start update segment keyword index: {}'.format(segment_id), fg='green'))
start_at = time.perf_counter()
segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first()
if not segment:
raise NotFound('Segment not found')
indexing_cache_key = 'segment_{}_indexing'.format(segment.id)
try:
dataset = segment.dataset
if not dataset:
logging.info(click.style('Segment {} has no dataset, pass.'.format(segment.id), fg='cyan'))
return
dataset_document = segment.document
if not dataset_document:
logging.info(click.style('Segment {} has no document, pass.'.format(segment.id), fg='cyan'))
return
if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed':
logging.info(click.style('Segment {} document status is invalid, pass.'.format(segment.id), fg='cyan'))
return
kw_index = IndexBuilder.get_index(dataset, 'economy')
# delete from keyword index
kw_index.delete_by_ids([segment.index_node_id])
# add new index
document = Document(
page_content=segment.content,
metadata={
"doc_id": segment.index_node_id,
"doc_hash": segment.index_node_hash,
"document_id": segment.document_id,
"dataset_id": segment.dataset_id,
}
)
# save keyword index
index = IndexBuilder.get_index(dataset, 'economy')
if index:
index.update_segment_keywords_index(segment.index_node_id, segment.keywords)
end_at = time.perf_counter()
logging.info(click.style('Segment update index: {} latency: {}'.format(segment.id, end_at - start_at), fg='green'))
except Exception as e:
logging.exception("update segment index failed")
segment.enabled = False
segment.disabled_at = datetime.datetime.utcnow()
segment.status = 'error'
segment.error = str(e)
db.session.commit()
finally:
redis_client.delete(indexing_cache_key)
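Note the failure path: any exception disables the segment (enabled = False, status = 'error') rather than retrying, and the lock key is always released in finally. A caller that wants to confirm the outcome could poll for the lock to clear and then re-read the row. This helper is hypothetical, not part of the commit:

    import time

    def wait_for_keyword_reindex(segment_id: str, timeout: float = 60.0) -> bool:
        # Block until the indexing lock disappears, then report segment health.
        deadline = time.monotonic() + timeout
        while redis_client.get('segment_{}_indexing'.format(segment_id)):
            if time.monotonic() > deadline:
                return False
            time.sleep(0.5)
        segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first()
        return bool(segment and segment.enabled and segment.status != 'error')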