Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
D
dify
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ai-tech
dify
Commits
1507b0b1
Commit
1507b0b1
authored
Jul 18, 2023
by
jyong
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add update segment and support qa segment
parent
2b150ffd
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
208 additions
and
0 deletions
+208
-0
create_segment_to_index_task.py
api/tasks/create_segment_to_index_task.py
+98
-0
update_segment_index_task.py
api/tasks/update_segment_index_task.py
+110
-0
No files found.
api/tasks/create_segment_to_index_task.py
0 → 100644
View file @
1507b0b1
import
datetime
import
logging
import
time
import
click
from
celery
import
shared_task
from
langchain.schema
import
Document
from
werkzeug.exceptions
import
NotFound
from
core.index.index
import
IndexBuilder
from
extensions.ext_database
import
db
from
extensions.ext_redis
import
redis_client
from
models.dataset
import
DocumentSegment
@shared_task
def create_segment_to_index_task(segment_id: str):
    """
    Async create segment to index.

    Loads the segment, validates its dataset/document are usable, builds a
    langchain Document from the segment content, writes it to the vector
    ('high_quality') and keyword ('economy') indexes, and marks the segment
    completed. On failure the segment is disabled and marked 'error'.

    :param segment_id: primary key of the DocumentSegment to index
    Usage: create_segment_to_index_task.delay(segment_id)
    :raises NotFound: if no segment exists for segment_id
    """
    logging.info(click.style('Start create segment to index: {}'.format(segment_id), fg='green'))
    start_at = time.perf_counter()

    segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first()
    if not segment:
        raise NotFound('Segment not found')

    # Only segments queued for indexing are processed; anything else was
    # already picked up (or cancelled) elsewhere.
    if segment.status != 'waiting':
        return

    # Cache key set by the caller to signal "indexing in progress"; cleared
    # in the finally block regardless of outcome.
    indexing_cache_key = 'segment_{}_indexing'.format(segment.id)

    try:
        # Validate dataset/document BEFORE mutating segment status, so an
        # early return cannot strand the segment in 'indexing' forever.
        # (This mirrors the check ordering used by update_segment_index_task.)
        dataset = segment.dataset

        if not dataset:
            logging.info(click.style('Segment {} has no dataset, pass.'.format(segment.id), fg='cyan'))
            return

        dataset_document = segment.document

        if not dataset_document:
            logging.info(click.style('Segment {} has no document, pass.'.format(segment.id), fg='cyan'))
            return

        if not dataset_document.enabled or dataset_document.archived \
                or dataset_document.indexing_status != 'completed':
            logging.info(click.style('Segment {} document status is invalid, pass.'.format(segment.id), fg='cyan'))
            return

        # update segment status to indexing
        update_params = {
            DocumentSegment.status: "indexing",
            DocumentSegment.indexing_at: datetime.datetime.utcnow()
        }
        DocumentSegment.query.filter_by(id=segment.id).update(update_params)
        db.session.commit()

        document = Document(
            page_content=segment.content,
            metadata={
                "doc_id": segment.index_node_id,
                "doc_hash": segment.index_node_hash,
                "document_id": segment.document_id,
                "dataset_id": segment.dataset_id,
            }
        )

        # save vector index
        index = IndexBuilder.get_index(dataset, 'high_quality')
        if index:
            index.add_texts([document], duplicate_check=True)

        # save keyword index
        index = IndexBuilder.get_index(dataset, 'economy')
        if index:
            index.add_texts([document])

        # update segment to completed
        update_params = {
            DocumentSegment.status: "completed",
            DocumentSegment.completed_at: datetime.datetime.utcnow()
        }
        DocumentSegment.query.filter_by(id=segment.id).update(update_params)
        db.session.commit()

        end_at = time.perf_counter()
        logging.info(click.style('Segment created to index: {} latency: {}'.format(segment.id, end_at - start_at), fg='green'))
    except Exception as e:
        logging.exception("create segment to index failed")
        # Record the failure on the segment itself so operators can see why
        # indexing stopped; the segment is disabled to keep it out of search.
        segment.enabled = False
        segment.disabled_at = datetime.datetime.utcnow()
        segment.status = 'error'
        segment.error = str(e)
        db.session.commit()
    finally:
        # Always release the "indexing" flag so the segment can be retried.
        redis_client.delete(indexing_cache_key)
api/tasks/update_segment_index_task.py
0 → 100644
View file @
1507b0b1
import
datetime
import
logging
import
time
import
click
from
celery
import
shared_task
from
langchain.schema
import
Document
from
werkzeug.exceptions
import
NotFound
from
core.index.index
import
IndexBuilder
from
extensions.ext_database
import
db
from
extensions.ext_redis
import
redis_client
from
models.dataset
import
DocumentSegment
@shared_task
def update_segment_index_task(segment_id: str):
    """
    Async update segment index.

    Loads the segment, validates its dataset/document are usable, deletes the
    segment's old entries from the vector ('high_quality') and keyword
    ('economy') indexes, re-adds the current content, and marks the segment
    completed. On failure the segment is disabled and marked 'error'.

    :param segment_id: primary key of the DocumentSegment to re-index
    Usage: update_segment_index_task.delay(segment_id)
    :raises NotFound: if no segment exists for segment_id
    """
    logging.info(click.style('Start update segment index: {}'.format(segment_id), fg='green'))
    start_at = time.perf_counter()

    segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first()
    if not segment:
        raise NotFound('Segment not found')

    # Only segments explicitly flagged for update are processed.
    if segment.status != 'updating':
        return

    # Cache key set by the caller to signal "indexing in progress"; cleared
    # in the finally block regardless of outcome.
    indexing_cache_key = 'segment_{}_indexing'.format(segment.id)

    try:
        dataset = segment.dataset

        if not dataset:
            logging.info(click.style('Segment {} has no dataset, pass.'.format(segment.id), fg='cyan'))
            return

        dataset_document = segment.document

        if not dataset_document:
            logging.info(click.style('Segment {} has no document, pass.'.format(segment.id), fg='cyan'))
            return

        if not dataset_document.enabled or dataset_document.archived \
                or dataset_document.indexing_status != 'completed':
            logging.info(click.style('Segment {} document status is invalid, pass.'.format(segment.id), fg='cyan'))
            return

        # update segment status to indexing
        update_params = {
            DocumentSegment.status: "indexing",
            DocumentSegment.indexing_at: datetime.datetime.utcnow()
        }
        DocumentSegment.query.filter_by(id=segment.id).update(update_params)
        db.session.commit()

        # Fetch both indexes once and reuse them for delete and add
        # (the original code redundantly re-fetched them before adding).
        vector_index = IndexBuilder.get_index(dataset, 'high_quality')
        kw_index = IndexBuilder.get_index(dataset, 'economy')

        # delete from vector index
        if vector_index:
            vector_index.delete_by_ids([segment.index_node_id])

        # delete from keyword index
        # Guard against a missing index, consistent with the vector-index
        # guard above (the original called delete_by_ids unconditionally).
        if kw_index:
            kw_index.delete_by_ids([segment.index_node_id])

        # add new index
        document = Document(
            page_content=segment.content,
            metadata={
                "doc_id": segment.index_node_id,
                "doc_hash": segment.index_node_hash,
                "document_id": segment.document_id,
                "dataset_id": segment.dataset_id,
            }
        )

        # save vector index
        if vector_index:
            vector_index.add_texts([document], duplicate_check=True)

        # save keyword index
        if kw_index:
            kw_index.add_texts([document])

        # update segment to completed
        update_params = {
            DocumentSegment.status: "completed",
            DocumentSegment.completed_at: datetime.datetime.utcnow()
        }
        DocumentSegment.query.filter_by(id=segment.id).update(update_params)
        db.session.commit()

        end_at = time.perf_counter()
        logging.info(click.style('Segment update index: {} latency: {}'.format(segment.id, end_at - start_at), fg='green'))
    except Exception as e:
        logging.exception("update segment index failed")
        # Record the failure on the segment itself so operators can see why
        # re-indexing stopped; the segment is disabled to keep it out of search.
        segment.enabled = False
        segment.disabled_at = datetime.datetime.utcnow()
        segment.status = 'error'
        segment.error = str(e)
        db.session.commit()
    finally:
        # Always release the "indexing" flag so the segment can be retried.
        redis_client.delete(indexing_cache_key)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment