ai-tech / dify / Commits

Commit cee92d9d
Authored Aug 01, 2023 by jyong
Parents: 5bc7a783, 2be29bb1

    Merge branch 'feat/qa-dataset-multi-import' into deploy/dev

    # Conflicts:
    #	api/tasks/disable_segment_from_index_task.py

Showing 13 changed files with 388 additions and 63 deletions (+388, -63)
api/controllers/console/datasets/datasets.py                        +5    -2
api/controllers/console/datasets/datasets_document.py               +20   -1
api/controllers/console/datasets/datasets_segments.py               +108  -3
api/core/generator/llm_generator.py                                 +2    -16
api/core/indexing_runner.py                                         +39   -17
api/core/prompt/prompts.py                                          +2    -2
api/migrations/versions/2c8af9671032_add_qa_document_language.py    +32   -0
api/models/dataset.py                                               +1    -0
api/requirements.txt                                                +2    -1
api/services/dataset_service.py                                     +25   -17
api/tasks/batch_create_segment_to_index_task.py                     +90   -0
api/tasks/delete_segment_from_index_task.py                         +58   -0
api/tasks/disable_segment_from_index_task.py                        +4    -4
api/controllers/console/datasets/datasets.py

@@ -221,6 +221,7 @@ class DatasetIndexingEstimateApi(Resource):
         parser.add_argument('info_list', type=dict, required=True, nullable=True, location='json')
         parser.add_argument('process_rule', type=dict, required=True, nullable=True, location='json')
         parser.add_argument('doc_form', type=str, default='text_model', required=False, nullable=False, location='json')
+        parser.add_argument('doc_language', type=str, default='English', required=False, nullable=False, location='json')
         args = parser.parse_args()
         # validate args
         DocumentService.estimate_args_validate(args)

@@ -235,12 +236,14 @@ class DatasetIndexingEstimateApi(Resource):
                 raise NotFound("File not found.")

             indexing_runner = IndexingRunner()
-            response = indexing_runner.file_indexing_estimate(file_details, args['process_rule'], args['doc_form'])
+            response = indexing_runner.file_indexing_estimate(file_details, args['process_rule'],
+                                                              args['doc_form'], args['doc_language'])
         elif args['info_list']['data_source_type'] == 'notion_import':
             indexing_runner = IndexingRunner()
             response = indexing_runner.notion_indexing_estimate(args['info_list']['notion_info_list'],
-                                                                args['process_rule'], args['doc_form'])
+                                                                args['process_rule'], args['doc_form'],
+                                                                args['doc_language'])
         else:
             raise ValueError('Data source type not support')
         return response, 200
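For orientation, here is a minimal client-side sketch of what a console request to this estimate endpoint might carry once doc_language is accepted. The JSON keys mirror the reqparse arguments above; the host, route path and auth header are assumptions, and the remaining info_list fields are elided.

import requests

payload = {
    "info_list": {"data_source_type": "upload_file"},   # remaining file details elided
    "process_rule": {"mode": "automatic"},
    "doc_form": "qa_model",       # QA-model estimates are the ones that invoke the LLM preview
    "doc_language": "English",    # new field; the parser falls back to 'English' when omitted
}

resp = requests.post(
    "http://localhost:5001/console/api/datasets/indexing-estimate",   # assumed URL
    json=payload,
    headers={"Authorization": "Bearer <console-token>"},              # assumed auth scheme
)
print(resp.json()["total_segments"])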
api/controllers/console/datasets/datasets_document.py

@@ -272,6 +272,7 @@ class DatasetDocumentListApi(Resource):
         parser.add_argument('duplicate', type=bool, nullable=False, location='json')
         parser.add_argument('original_document_id', type=str, required=False, location='json')
         parser.add_argument('doc_form', type=str, default='text_model', required=False, nullable=False, location='json')
+        parser.add_argument('doc_language', type=str, default='English', required=False, nullable=False, location='json')
         args = parser.parse_args()

         if not dataset.indexing_technique and not args['indexing_technique']:

@@ -317,6 +318,7 @@ class DatasetInitApi(Resource):
         parser.add_argument('data_source', type=dict, required=True, nullable=True, location='json')
         parser.add_argument('process_rule', type=dict, required=True, nullable=True, location='json')
         parser.add_argument('doc_form', type=str, default='text_model', required=False, nullable=False, location='json')
+        parser.add_argument('doc_language', type=str, default='English', required=False, nullable=False, location='json')
         args = parser.parse_args()
         # validate args

@@ -537,7 +539,8 @@ class DocumentIndexingStatusApi(DocumentResource):
         document.completed_segments = completed_segments
         document.total_segments = total_segments
-
+        if document.is_paused:
+            document.indexing_status = 'paused'
         return marshal(document, self.document_status_fields)

@@ -794,6 +797,22 @@ class DocumentStatusApi(DocumentResource):
             remove_document_from_index_task.delay(document_id)

             return {'result': 'success'}, 200
+        elif action == "un_archive":
+            if not document.archived:
+                raise InvalidActionError('Document is not archived.')
+
+            document.archived = False
+            document.archived_at = None
+            document.archived_by = None
+            document.updated_at = datetime.utcnow()
+            db.session.commit()
+
+            # Set cache to prevent indexing the same document multiple times
+            redis_client.setex(indexing_cache_key, 600, 1)
+
+            add_document_to_index_task.delay(document_id)
+
+            return {'result': 'success'}, 200
         else:
             raise InvalidActionError()
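Two behavioural changes stand out here: the indexing-status response now reports a paused document as 'paused', and a new "un_archive" action restores an archived document and re-queues it for indexing. A stripped-down sketch of the status override, using only the attribute names from the hunk above:

def resolve_indexing_status(document):
    # Mirrors the branch added to DocumentIndexingStatusApi.get: a paused document
    # reports 'paused' regardless of the indexing_status value stored in the database.
    if document.is_paused:
        return 'paused'
    return document.indexing_status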
api/controllers/console/datasets/datasets_segments.py

 # -*- coding:utf-8 -*-
+import uuid
 from datetime import datetime
+
+from flask import request
 from flask_login import login_required, current_user
 from flask_restful import Resource, reqparse, fields, marshal
 from werkzeug.exceptions import NotFound, Forbidden

 import services
 from controllers.console import api
-from controllers.console.datasets.error import InvalidActionError
+from controllers.console.datasets.error import InvalidActionError, NoFileUploadedError, TooManyFilesError
 from controllers.console.setup import setup_required
 from controllers.console.wraps import account_initialization_required
 from extensions.ext_database import db

@@ -17,7 +19,9 @@ from models.dataset import DocumentSegment
 from libs.helper import TimestampField
 from services.dataset_service import DatasetService, DocumentService, SegmentService
 from tasks.enable_segment_to_index_task import enable_segment_to_index_task
-from tasks.remove_segment_from_index_task import remove_segment_from_index_task
+from tasks.disable_segment_from_index_task import disable_segment_from_index_task
+from tasks.batch_create_segment_to_index_task import batch_create_segment_to_index_task
+import pandas as pd

 segment_fields = {
     'id': fields.String,

@@ -197,7 +201,7 @@ class DatasetDocumentSegmentApi(Resource):
             # Set cache to prevent indexing the same segment multiple times
             redis_client.setex(indexing_cache_key, 600, 1)

-            remove_segment_from_index_task.delay(segment.id)
+            disable_segment_from_index_task.delay(segment.id)

             return {'result': 'success'}, 200
         else:

@@ -283,6 +287,104 @@ class DatasetDocumentSegmentUpdateApi(Resource):
             'doc_form': document.doc_form
         }, 200

+    @setup_required
+    @login_required
+    @account_initialization_required
+    def delete(self, dataset_id, document_id, segment_id):
+        # check dataset
+        dataset_id = str(dataset_id)
+        dataset = DatasetService.get_dataset(dataset_id)
+        if not dataset:
+            raise NotFound('Dataset not found.')
+        # check document
+        document_id = str(document_id)
+        document = DocumentService.get_document(dataset_id, document_id)
+        if not document:
+            raise NotFound('Document not found.')
+        # check segment
+        segment_id = str(segment_id)
+        segment = DocumentSegment.query.filter(
+            DocumentSegment.id == str(segment_id),
+            DocumentSegment.tenant_id == current_user.current_tenant_id
+        ).first()
+        if not segment:
+            raise NotFound('Segment not found.')
+        # The role of the current user in the ta table must be admin or owner
+        if current_user.current_tenant.current_role not in ['admin', 'owner']:
+            raise Forbidden()
+        try:
+            DatasetService.check_dataset_permission(dataset, current_user)
+        except services.errors.account.NoPermissionError as e:
+            raise Forbidden(str(e))
+        SegmentService.delete_segment(segment, document, dataset)
+        return {'result': 'success'}, 200
+
+
+class DatasetDocumentSegmentBatchImportApi(Resource):
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def post(self, dataset_id, document_id):
+        # check dataset
+        dataset_id = str(dataset_id)
+        dataset = DatasetService.get_dataset(dataset_id)
+        if not dataset:
+            raise NotFound('Dataset not found.')
+        # check document
+        document_id = str(document_id)
+        document = DocumentService.get_document(dataset_id, document_id)
+        if not document:
+            raise NotFound('Document not found.')
+        # get file from request
+        file = request.files['file']
+        # check file
+        if 'file' not in request.files:
+            raise NoFileUploadedError()
+        if len(request.files) > 1:
+            raise TooManyFilesError()
+        # check file type
+        if not file.filename.endswith('.csv'):
+            raise ValueError("Invalid file type. Only CSV files are allowed")
+        try:
+            # Skip the first row
+            df = pd.read_csv(file)
+            result = []
+            for index, row in df.iterrows():
+                data = {'content': row[0], 'answer': row[1]}
+                result.append(data)
+            if len(result) == 0:
+                raise ValueError("The CSV file is empty.")
+            # async job
+            job_id = str(uuid.uuid4())
+            indexing_cache_key = 'segment_batch_import_{}'.format(str(job_id))
+            # send batch add segments task
+            redis_client.setnx(indexing_cache_key, 'waiting')
+            batch_create_segment_to_index_task.delay(str(job_id), result, dataset_id, document_id,
+                                                     current_user.current_tenant_id, current_user.id)
+        except Exception as e:
+            return {'error': str(e)}, 500
+        return {'job_id': job_id, 'job_status': 'waiting'}, 200
+
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def get(self, job_id):
+        job_id = str(job_id)
+        indexing_cache_key = 'segment_batch_import_{}'.format(job_id)
+        cache_result = redis_client.get(indexing_cache_key)
+        if cache_result is None:
+            raise ValueError("The job is not exist.")
+        return {
+            'job_id': job_id,
+            'job_status': cache_result.decode()
+        }, 200
+

 api.add_resource(DatasetDocumentSegmentListApi,
                  '/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/segments')

@@ -292,3 +394,6 @@ api.add_resource(DatasetDocumentSegmentAddApi,
                  '/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/segment')
 api.add_resource(DatasetDocumentSegmentUpdateApi,
                  '/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/segments/<uuid:segment_id>')
+api.add_resource(DatasetDocumentSegmentBatchImportApi,
+                 '/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/segments/batch_import',
+                 '/datasets/batch_import_status/<uuid:job_id>')
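The new batch-import endpoint expects a CSV whose first column is the segment content and whose second column is the answer (used when the document is a qa_model document), and it returns a job_id that can be polled via the companion status route. A hedged client-side sketch follows; the host, console prefix and auth header are assumptions, only the two registered paths come from the diff above.

import io
import requests

base = "http://localhost:5001/console/api"          # assumed prefix
auth = {"Authorization": "Bearer <console-token>"}  # assumed auth scheme
dataset_id, document_id = "<dataset-uuid>", "<document-uuid>"

# pd.read_csv treats the first row as a header, so data rows start on line 2.
csv_body = "content,answer\nWhat does this endpoint do?,It batch-imports QA segments.\n"

resp = requests.post(
    f"{base}/datasets/{dataset_id}/documents/{document_id}/segments/batch_import",
    files={"file": ("segments.csv", io.BytesIO(csv_body.encode()), "text/csv")},
    headers=auth,
)
job_id = resp.json()["job_id"]

# Poll until the Redis-backed status flips from 'waiting' to 'completed' or 'error'.
status = requests.get(f"{base}/datasets/batch_import_status/{job_id}", headers=auth).json()["job_status"]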
api/core/generator/llm_generator.py

@@ -188,22 +188,8 @@ class LLMGenerator:
         return rule_config

     @classmethod
-    async def generate_qa_document(cls, llm: StreamableOpenAI, query):
-        prompt = GENERATOR_QA_PROMPT
-
-        if isinstance(llm, BaseChatModel):
-            prompt = [SystemMessage(content=prompt), HumanMessage(content=query)]
-
-        response = llm.generate([prompt])
-
-        answer = response.generations[0][0].text
-        return answer.strip()
-
-    @classmethod
-    def generate_qa_document_sync(cls, llm: StreamableOpenAI, query):
-        prompt = GENERATOR_QA_PROMPT
+    def generate_qa_document_sync(cls, llm: StreamableOpenAI, query: str, document_language: str):
+        prompt = GENERATOR_QA_PROMPT.format(language=document_language)

         if isinstance(llm, BaseChatModel):
             prompt = [SystemMessage(content=prompt), HumanMessage(content=query)]
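generate_qa_document_sync returns the raw model text, which IndexingRunner.format_split_text then turns into question/answer pairs. That helper's body is not part of this diff, so the regex-based parser below is only an illustration of the Q1:/A1: format the prompt requests, not the project's actual implementation.

import re

def parse_qa_pairs(text: str):
    # Capture "Qn: ... An: ..." pairs; re.S lets '.' span the newlines the prompt asks for.
    pattern = r"Q\d+:\s*(.*?)\s*A\d+:\s*(.*?)\s*(?=Q\d+:|$)"
    return [{"question": q.strip(), "answer": a.strip()}
            for q, a in re.findall(pattern, text, flags=re.S)]

sample = "Q1:\nWhat is the main topic?\nA1:\nMulti-language QA generation.\n"
print(parse_qa_pairs(sample))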
api/core/indexing_runner.py

@@ -70,14 +70,6 @@ class IndexingRunner:
                 dataset_document=dataset_document,
                 processing_rule=processing_rule
             )
-
-            # new_documents = []
-            # for document in documents:
-            #     response = LLMGenerator.generate_qa_document(dataset.tenant_id, document.page_content)
-            #     document_qa_list = self.format_split_text(response)
-            #     for result in document_qa_list:
-            #         document = Document(page_content=result['question'], metadata={'source': result['answer']})
-            #         new_documents.append(document)
             # build index
             self._build_index(
                 dataset=dataset,
                 dataset_document=dataset_document,

@@ -228,7 +220,7 @@ class IndexingRunner:
         db.session.commit()

     def file_indexing_estimate(self, file_details: List[UploadFile], tmp_processing_rule: dict,
-                               doc_form: str = None) -> dict:
+                               doc_form: str = None, doc_language: str = 'English') -> dict:
         """
         Estimate the indexing for the document.
         """

@@ -268,7 +260,7 @@ class IndexingRunner:
                 model_name='gpt-3.5-turbo',
                 max_tokens=2000
             )
-            response = LLMGenerator.generate_qa_document_sync(llm, preview_texts[0])
+            response = LLMGenerator.generate_qa_document_sync(llm, preview_texts[0], doc_language)
             document_qa_list = self.format_split_text(response)
             return {
                 "total_segments": total_segments * 20,

@@ -287,7 +279,8 @@ class IndexingRunner:
             "preview": preview_texts
         }

-    def notion_indexing_estimate(self, notion_info_list: list, tmp_processing_rule: dict, doc_form: str = None) -> dict:
+    def notion_indexing_estimate(self, notion_info_list: list, tmp_processing_rule: dict,
+                                 doc_form: str = None, doc_language: str = 'English') -> dict:
         """
         Estimate the indexing for the document.
         """

@@ -345,7 +338,7 @@ class IndexingRunner:
                 model_name='gpt-3.5-turbo',
                 max_tokens=2000
             )
-            response = LLMGenerator.generate_qa_document_sync(llm, preview_texts[0])
+            response = LLMGenerator.generate_qa_document_sync(llm, preview_texts[0], doc_language)
             document_qa_list = self.format_split_text(response)
             return {
                 "total_segments": total_segments * 20,

@@ -452,7 +445,8 @@ class IndexingRunner:
             splitter=splitter,
             processing_rule=processing_rule,
             tenant_id=dataset.tenant_id,
-            document_form=dataset_document.doc_form
+            document_form=dataset_document.doc_form,
+            document_language=dataset_document.doc_language
         )

         # save node to document segment

@@ -489,7 +483,8 @@ class IndexingRunner:
         return documents

     def _split_to_documents(self, text_docs: List[Document], splitter: TextSplitter,
-                            processing_rule: DatasetProcessRule, tenant_id: str, document_form: str) -> List[Document]:
+                            processing_rule: DatasetProcessRule, tenant_id: str,
+                            document_form: str, document_language: str) -> List[Document]:
         """
         Split the text documents into nodes.
         """

@@ -523,7 +518,8 @@ class IndexingRunner:
                 sub_documents = all_documents[i:i + 10]
                 for doc in sub_documents:
                     document_format_thread = threading.Thread(target=self.format_qa_document, kwargs={
-                        'llm': llm, 'document_node': doc, 'all_qa_documents': all_qa_documents})
+                        'llm': llm, 'document_node': doc, 'all_qa_documents': all_qa_documents,
+                        'document_language': document_language})
                     threads.append(document_format_thread)
                     document_format_thread.start()
                 for thread in threads:

@@ -531,13 +527,13 @@ class IndexingRunner:
             return all_qa_documents
         return all_documents

-    def format_qa_document(self, llm: StreamableOpenAI, document_node, all_qa_documents):
+    def format_qa_document(self, llm: StreamableOpenAI, document_node, all_qa_documents, document_language):
         format_documents = []
         if document_node.page_content is None or not document_node.page_content.strip():
             return
         try:
             # qa model document
-            response = LLMGenerator.generate_qa_document_sync(llm, document_node.page_content)
+            response = LLMGenerator.generate_qa_document_sync(llm, document_node.page_content, document_language)
             document_qa_list = self.format_split_text(response)
             qa_documents = []
             for result in document_qa_list:

@@ -716,6 +712,32 @@ class IndexingRunner:
         DocumentSegment.query.filter_by(document_id=dataset_document_id).update(update_params)

         db.session.commit()

+    def batch_add_segments(self, segments: List[DocumentSegment], dataset: Dataset):
+        """
+        Batch add segments index processing
+        """
+        documents = []
+        for segment in segments:
+            document = Document(
+                page_content=segment.content,
+                metadata={
+                    "doc_id": segment.index_node_id,
+                    "doc_hash": segment.index_node_hash,
+                    "document_id": segment.document_id,
+                    "dataset_id": segment.dataset_id,
+                }
+            )
+            documents.append(document)
+        # save vector index
+        index = IndexBuilder.get_index(dataset, 'high_quality')
+        if index:
+            index.add_texts(documents, duplicate_check=True)
+        # save keyword index
+        index = IndexBuilder.get_index(dataset, 'economy')
+        if index:
+            index.add_texts(documents)
+

 class DocumentIsPausedException(Exception):
     pass
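The QA path in _split_to_documents fans the per-document LLM calls out across threads, ten documents at a time, and now forwards document_language into each worker. Reduced to a generic sketch of that pattern (the worker stands in for format_qa_document; appending to a shared list is safe here because of the GIL):

import threading

def process_in_batches(items, worker, batch_size=10):
    results = []
    for i in range(0, len(items), batch_size):
        threads = []
        for item in items[i:i + batch_size]:
            t = threading.Thread(target=worker, kwargs={'item': item, 'results': results})
            threads.append(t)
            t.start()
        for t in threads:          # wait for the whole batch before starting the next one
            t.join()
    return results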
api/core/prompt/prompts.py

@@ -44,13 +44,13 @@ SUGGESTED_QUESTIONS_AFTER_ANSWER_INSTRUCTION_PROMPT = (
 )

 GENERATOR_QA_PROMPT = (
-    "Please respond according to the language of the user's input text. If the text is in language [A], you must also reply in language [A].\n"
+    'The user will send a long text. Please think step by step.'
     'Step 1: Understand and summarize the main content of this text.\n'
     'Step 2: What key information or concepts are mentioned in this text?\n'
     'Step 3: Decompose or combine multiple pieces of information and concepts.\n'
     'Step 4: Generate 20 questions and answers based on these key information and concepts.'
     'The questions should be clear and detailed, and the answers should be detailed and complete.\n'
-    "Answer in the following format: Q1:\nA1:\nQ2:\nA2:...\n"
+    "Answer must be the language:{language} and in the following format: Q1:\nA1:\nQ2:\nA2:...\n"
 )

 RULE_CONFIG_GENERATE_TEMPLATE = """Given MY INTENDED AUDIENCES and HOPING TO SOLVE using a language model, please select \
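Because {language} is the only brace pair in the template, a plain str.format call is enough to pin the answer language before the prompt is handed to the model. A trivial check, assuming it runs inside the api package:

from core.prompt.prompts import GENERATOR_QA_PROMPT

system_prompt = GENERATOR_QA_PROMPT.format(language='Chinese')
assert 'Answer must be the language:Chinese' in system_prompt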
api/migrations/versions/2c8af9671032_add_qa_document_language.py  (new file, mode 100644)

"""add_qa_document_language

Revision ID: 2c8af9671032
Revises: 8d2d099ceb74
Create Date: 2023-08-01 18:57:27.294973

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '2c8af9671032'
down_revision = '8d2d099ceb74'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('documents', schema=None) as batch_op:
        batch_op.add_column(sa.Column('doc_language', sa.String(length=255), nullable=True))

    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('documents', schema=None) as batch_op:
        batch_op.drop_column('doc_language')

    # ### end Alembic commands ###
api/models/dataset.py

@@ -208,6 +208,7 @@ class Document(db.Model):
     doc_metadata = db.Column(db.JSON, nullable=True)
     doc_form = db.Column(db.String(
         255), nullable=False, server_default=db.text("'text_model'::character varying"))
+    doc_language = db.Column(db.String(255), nullable=True)

     DATA_SOURCES = ['upload_file', 'notion_import']
api/requirements.txt

@@ -40,4 +40,5 @@ newspaper3k==0.2.8
 google-api-python-client==2.90.0
 wikipedia==1.4.0
 readabilipy==0.2.0
-google-search-results==2.4.2
\ No newline at end of file
+google-search-results==2.4.2
+pandas==1.5.3
\ No newline at end of file
api/services/dataset_service.py

@@ -32,8 +32,9 @@ from tasks.document_indexing_task import document_indexing_task
 from tasks.document_indexing_update_task import document_indexing_update_task
 from tasks.create_segment_to_index_task import create_segment_to_index_task
 from tasks.update_segment_index_task import update_segment_index_task
-from tasks.update_segment_keyword_index_task \
-    import update_segment_keyword_index_task
+from tasks.recover_document_indexing_task import recover_document_indexing_task
+from tasks.update_segment_keyword_index_task import update_segment_keyword_index_task
+from tasks.delete_segment_from_index_task import delete_segment_from_index_task


 class DatasetService:

@@ -373,7 +374,7 @@ class DocumentService:
         indexing_cache_key = 'document_{}_is_paused'.format(document.id)
         redis_client.delete(indexing_cache_key)
         # trigger async task
-        document_indexing_task.delay(document.dataset_id, document.id)
+        recover_document_indexing_task.delay(document.dataset_id, document.id)

     @staticmethod
     def get_documents_position(dataset_id):

@@ -451,6 +452,7 @@ class DocumentService:
                 document = DocumentService.save_document(dataset, dataset_process_rule.id,
                                                          document_data["data_source"]["type"],
                                                          document_data["doc_form"],
+                                                         document_data["doc_language"],
                                                          data_source_info, created_from, position,
                                                          account, file_name, batch)
                 db.session.add(document)

@@ -496,20 +498,11 @@ class DocumentService:
                     document = DocumentService.save_document(dataset, dataset_process_rule.id,
                                                              document_data["data_source"]["type"],
                                                              document_data["doc_form"],
+                                                             document_data["doc_language"],
                                                              data_source_info, created_from, position,
                                                              account, page['page_name'], batch)
-                    # if page['type'] == 'database':
-                    #     document.splitting_completed_at = datetime.datetime.utcnow()
-                    #     document.cleaning_completed_at = datetime.datetime.utcnow()
-                    #     document.parsing_completed_at = datetime.datetime.utcnow()
-                    #     document.completed_at = datetime.datetime.utcnow()
-                    #     document.indexing_status = 'completed'
-                    #     document.word_count = 0
-                    #     document.tokens = 0
-                    #     document.indexing_latency = 0
                     db.session.add(document)
                     db.session.flush()
-                    # if page['type'] != 'database':
                     document_ids.append(document.id)
                     documents.append(document)
                     position += 1

@@ -521,15 +514,15 @@ class DocumentService:
         db.session.commit()

         # trigger async task
-        #document_index_created.send(dataset.id, document_ids=document_ids)
         document_indexing_task.delay(dataset.id, document_ids)

         return documents, batch

     @staticmethod
     def save_document(dataset: Dataset, process_rule_id: str, data_source_type: str, document_form: str,
-                      data_source_info: dict, created_from: str, position: int, account: Account, name: str,
-                      batch: str):
+                      document_language: str, data_source_info: dict, created_from: str, position: int,
+                      account: Account, name: str, batch: str):
         document = Document(
             tenant_id=dataset.tenant_id,
             dataset_id=dataset.id,

@@ -541,7 +534,8 @@ class DocumentService:
             name=name,
             created_from=created_from,
             created_by=account.id,
-            doc_form=document_form
+            doc_form=document_form,
+            doc_language=document_language
         )

         return document

@@ -938,3 +932,17 @@ class SegmentService:
         redis_client.setex(indexing_cache_key, 600, 1)
         update_segment_index_task.delay(segment.id, args['keywords'])
         return segment
+
+    @classmethod
+    def delete_segment(cls, segment: DocumentSegment, document: Document, dataset: Dataset):
+        indexing_cache_key = 'segment_{}_delete_indexing'.format(segment.id)
+        cache_result = redis_client.get(indexing_cache_key)
+        if cache_result is not None:
+            raise ValueError("Segment is deleting.")
+        # send delete segment index task
+        redis_client.setex(indexing_cache_key, 600, 1)
+        # enabled segment need to delete index
+        if segment.enabled:
+            delete_segment_from_index_task.delay(segment.id, segment.index_node_id, dataset.id, document.id)
+        db.session.delete(segment)
+        db.session.commit()
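SegmentService.delete_segment uses a short-lived Redis key as an in-flight guard, so a second delete request fails fast instead of queueing a duplicate Celery task; the async task clears the key in its finally block, and the 600-second TTL covers the case where it never runs. The same pattern, reduced to hypothetical helper functions with the key name and TTL taken from delete_segment above:

from extensions.ext_redis import redis_client

def is_segment_delete_in_flight(segment_id: str) -> bool:
    return redis_client.get('segment_{}_delete_indexing'.format(segment_id)) is not None

def mark_segment_delete_in_flight(segment_id: str) -> None:
    # 600 s TTL: the marker expires on its own if the async task never gets to clear it
    redis_client.setex('segment_{}_delete_indexing'.format(segment_id), 600, 1)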
api/tasks/batch_create_segment_to_index_task.py  (new file, mode 100644)

import datetime
import logging
import time
import uuid
from typing import Optional, List

import click
from celery import shared_task
from sqlalchemy import func
from werkzeug.exceptions import NotFound

from core.index.index import IndexBuilder
from core.indexing_runner import IndexingRunner
from core.llm.token_calculator import TokenCalculator
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs import helper
from models.dataset import DocumentSegment, Dataset, Document


@shared_task
def batch_create_segment_to_index_task(job_id: str, content: List, dataset_id: str, document_id: str,
                                       tenant_id: str, user_id: str):
    """
    Async batch create segment to index
    :param job_id:
    :param content:
    :param dataset_id:
    :param document_id:
    :param tenant_id:
    :param user_id:

    Usage: batch_create_segment_to_index_task.delay(segment_id)
    """
    logging.info(click.style('Start batch create segment jobId: {}'.format(job_id), fg='green'))
    start_at = time.perf_counter()

    indexing_cache_key = 'segment_batch_import_{}'.format(job_id)

    try:
        dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
        if not dataset:
            raise ValueError('Dataset not exist.')

        dataset_document = db.session.query(Document).filter(Document.id == document_id).first()
        if not dataset_document:
            raise ValueError('Document not exist.')

        if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed':
            raise ValueError('Document is not available.')
        document_segments = []
        for segment in content:
            content = segment['content']
            answer = segment['answer']
            doc_id = str(uuid.uuid4())
            segment_hash = helper.generate_text_hash(content)

            # calc embedding use tokens
            tokens = TokenCalculator.get_num_tokens('text-embedding-ada-002', content)
            max_position = db.session.query(func.max(DocumentSegment.position)).filter(
                DocumentSegment.document_id == dataset_document.id
            ).scalar()
            segment_document = DocumentSegment(
                tenant_id=tenant_id,
                dataset_id=dataset_id,
                document_id=document_id,
                index_node_id=doc_id,
                index_node_hash=segment_hash,
                position=max_position + 1 if max_position else 1,
                content=content,
                word_count=len(content),
                tokens=tokens,
                created_by=user_id,
                indexing_at=datetime.datetime.utcnow(),
                status='completed',
                completed_at=datetime.datetime.utcnow()
            )
            if dataset_document.doc_form == 'qa_model':
                segment_document.answer = answer
            db.session.add(segment_document)
            document_segments.append(segment_document)
        # add index to db
        indexing_runner = IndexingRunner()
        indexing_runner.batch_add_segments(document_segments, dataset)
        db.session.commit()
        redis_client.setex(indexing_cache_key, 600, 'completed')
        end_at = time.perf_counter()
        logging.info(click.style('Segment batch created job: {} latency: {}'.format(job_id, end_at - start_at), fg='green'))
    except Exception as e:
        logging.exception("Segments batch created index failed:{}".format(str(e)))
        redis_client.setex(indexing_cache_key, 600, 'error')
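For completeness, enqueueing the task directly mirrors what the console controller above does: seed the Redis status key so the status endpoint has something to report, then hand the parsed rows to Celery. The IDs here are placeholders.

import uuid
from extensions.ext_redis import redis_client
from tasks.batch_create_segment_to_index_task import batch_create_segment_to_index_task

job_id = str(uuid.uuid4())
segments = [{'content': 'What is a document segment?', 'answer': 'One indexed chunk of a document.'}]

redis_client.setnx('segment_batch_import_{}'.format(job_id), 'waiting')
batch_create_segment_to_index_task.delay(job_id, segments, '<dataset-id>', '<document-id>',
                                         '<tenant-id>', '<user-id>')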
api/tasks/delete_segment_from_index_task.py  (new file, mode 100644)

import logging
import time

import click
from celery import shared_task
from werkzeug.exceptions import NotFound

from core.index.index import IndexBuilder
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import DocumentSegment, Dataset, Document


@shared_task
def delete_segment_from_index_task(segment_id: str, index_node_id: str, dataset_id: str, document_id: str):
    """
    Async Remove segment from index
    :param segment_id:
    :param index_node_id:
    :param dataset_id:
    :param document_id:

    Usage: delete_segment_from_index_task.delay(segment_id)
    """
    logging.info(click.style('Start delete segment from index: {}'.format(segment_id), fg='green'))
    start_at = time.perf_counter()
    indexing_cache_key = 'segment_{}_delete_indexing'.format(segment_id)
    try:
        dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
        if not dataset:
            logging.info(click.style('Segment {} has no dataset, pass.'.format(segment_id), fg='cyan'))
            return

        dataset_document = db.session.query(Document).filter(Document.id == document_id).first()
        if not dataset_document:
            logging.info(click.style('Segment {} has no document, pass.'.format(segment_id), fg='cyan'))
            return

        if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed':
            logging.info(click.style('Segment {} document status is invalid, pass.'.format(segment_id), fg='cyan'))
            return

        vector_index = IndexBuilder.get_index(dataset, 'high_quality')
        kw_index = IndexBuilder.get_index(dataset, 'economy')

        # delete from vector index
        if vector_index:
            vector_index.delete_by_ids([index_node_id])

        # delete from keyword index
        kw_index.delete_by_ids([index_node_id])

        end_at = time.perf_counter()
        logging.info(click.style('Segment deleted from index: {} latency: {}'.format(segment_id, end_at - start_at), fg='green'))
    except Exception:
        logging.exception("delete segment from index failed")
    finally:
        redis_client.delete(indexing_cache_key)
api/tasks/remove_segment_from_index_task.py → api/tasks/disable_segment_from_index_task.py

@@ -12,14 +12,14 @@ from models.dataset import DocumentSegment


 @shared_task(queue='dataset')
-def remove_segment_from_index_task(segment_id: str):
+def disable_segment_from_index_task(segment_id: str):
     """
-    Async Remove segment from index
+    Async disable segment from index
     :param segment_id:

-    Usage: remove_segment_from_index.delay(segment_id)
+    Usage: disable_segment_from_index_task.delay(segment_id)
     """
-    logging.info(click.style('Start remove segment from index: {}'.format(segment_id), fg='green'))
+    logging.info(click.style('Start disable segment from index: {}'.format(segment_id), fg='green'))
     start_at = time.perf_counter()

     segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first()
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment