ai-tech / dify · Commits

Commit 6cc4f47d
authored Jun 05, 2023 by Jyong

add notion sync

parent a5a61197

Showing 2 changed files with 168 additions and 149 deletions (+168 / -149)
api/controllers/console/datasets/datasets_document.py    +8   -12
api/services/dataset_service.py                           +160 -137
File: api/controllers/console/datasets/datasets_document.py
...
@@ -101,6 +101,7 @@ class DocumentResource(Resource):
         return documents


 class GetProcessRuleApi(Resource):
     @setup_required
     @login_required
...
@@ -364,16 +365,11 @@ class DocumentIndexingStatusApi(DocumentResource):
         documents = self.get_batch_documents(dataset_id, batch)
         documents_status = []
         for document in documents:
-            completed_segments = DocumentSegment.query \
-                .filter(DocumentSegment.completed_at.isnot(None),
-                        DocumentSegment.document_id == str(document.id),
-                        DocumentSegment.status != 're_segment') \
-                .count()
-            total_segments = DocumentSegment.query \
-                .filter_by(document_id=str(document.id)) \
-                .count()
+            completed_segments = DocumentSegment.query.filter(DocumentSegment.completed_at.isnot(None),
+                                                              DocumentSegment.document_id == str(document.id),
+                                                              DocumentSegment.status != 're_segment').count()
+            total_segments = DocumentSegment.query.filter(DocumentSegment.document_id == str(document.id),
+                                                          DocumentSegment.status != 're_segment').count()
             document.completed_segments = completed_segments
             document.total_segments = total_segments
             documents_status.append(marshal(document, self.document_status_fields))
...
@@ -427,7 +423,7 @@ class DocumentDetailApi(DocumentResource):
                 'disabled_by': document.disabled_by,
                 'archived': document.archived,
                 'segment_count': document.segment_count,
-                'average_segment_length': document.average_segment_length,
+                'average_segment_length': document.average_segment_length,
                 'hit_count': document.hit_count,
                 'display_status': document.display_status
             }
...
@@ -447,7 +443,7 @@ class DocumentDetailApi(DocumentResource):
                 'created_at': document.created_at.timestamp(),
                 'tokens': document.tokens,
                 'indexing_status': document.indexing_status,
-                'completed_at': int(document.completed_at.timestamp()) if document.completed_at else None,
+                'completed_at': int(document.completed_at.timestamp()) if document.completed_at else None,
                 'updated_at': int(document.updated_at.timestamp()) if document.updated_at else None,
                 'indexing_latency': document.indexing_latency,
                 'error': document.error,
...
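For reference, the counting pattern used in DocumentIndexingStatusApi above can be reproduced outside dify. The sketch below is illustrative only: it uses a standalone DocumentSegment model and an in-memory SQLite database (neither is dify's code) and assumes SQLAlchemy 1.4+. Both counts exclude segments whose status is 're_segment', which is what keeps a document that is being re-segmented from skewing its reported progress.

# Standalone sketch of the "completed vs. total segments" counting pattern.
# The model and database here are illustrative, not dify's.
from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DocumentSegment(Base):
    __tablename__ = 'document_segments'
    id = Column(Integer, primary_key=True)
    document_id = Column(String, nullable=False)
    status = Column(String, default='waiting')
    completed_at = Column(DateTime, nullable=True)


engine = create_engine('sqlite://')          # throwaway in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    document_id = 'doc-1'
    # Segments that finished indexing, ignoring rows flagged for re-segmentation.
    completed_segments = session.query(DocumentSegment).filter(
        DocumentSegment.completed_at.isnot(None),
        DocumentSegment.document_id == document_id,
        DocumentSegment.status != 're_segment'
    ).count()
    # Total segments of the current pass, with the same 're_segment' exclusion.
    total_segments = session.query(DocumentSegment).filter(
        DocumentSegment.document_id == document_id,
        DocumentSegment.status != 're_segment'
    ).count()
    print(completed_segments, total_segments)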
File: api/services/dataset_service.py
...
@@ -382,90 +382,92 @@ class DocumentService:
         if dataset.indexing_technique == 'high_quality':
             IndexBuilder.get_default_service_context(dataset.tenant_id)
-        # save process rule
-        if not dataset_process_rule:
-            process_rule = document_data["process_rule"]
-            if process_rule["mode"] == "custom":
-                dataset_process_rule = DatasetProcessRule(
-                    dataset_id=dataset.id,
-                    mode=process_rule["mode"],
-                    rules=json.dumps(process_rule["rules"]),
-                    created_by=account.id
-                )
-            elif process_rule["mode"] == "automatic":
-                dataset_process_rule = DatasetProcessRule(
-                    dataset_id=dataset.id,
-                    mode=process_rule["mode"],
-                    rules=json.dumps(DatasetProcessRule.AUTOMATIC_RULES),
-                    created_by=account.id
-                )
-            db.session.add(dataset_process_rule)
-            db.session.commit()
-        position = DocumentService.get_documents_position(dataset.id)
-        batch = time.strftime('%Y%m%d%H%M%S') + str(random.randint(100000, 999999))
-        document_ids = []
-        documents = []
-        if document_data["data_source"]["type"] == "upload_file":
-            upload_file_list = document_data["data_source"]["info"]
-            for upload_file in upload_file_list:
-                file_id = upload_file["upload_file_id"]
-                file = db.session.query(UploadFile).filter(
-                    UploadFile.tenant_id == dataset.tenant_id,
-                    UploadFile.id == file_id
-                ).first()
-
-                # raise error if file not found
-                if not file:
-                    raise FileNotExistsError()
-
-                file_name = file.name
-                data_source_info = {
-                    "upload_file_id": file_id,
-                }
-                document = DocumentService.save_document(dataset, dataset_process_rule.id,
-                                                         document_data["data_source"]["type"],
-                                                         data_source_info, created_from, position,
-                                                         account, file_name, batch)
-                db.session.add(document)
-                db.session.flush()
-                document_ids.append(document.id)
-                documents.append(document)
-                position += 1
-        elif document_data["data_source"]["type"] == "notion_import":
-            notion_info_list = document_data["data_source"]['info']
-            for notion_info in notion_info_list:
-                workspace_id = notion_info['workspace_id']
-                data_source_binding = DataSourceBinding.query.filter(
-                    db.and_(
-                        DataSourceBinding.tenant_id == current_user.current_tenant_id,
-                        DataSourceBinding.provider == 'notion',
-                        DataSourceBinding.disabled == False,
-                        DataSourceBinding.source_info['workspace_id'] == f'"{workspace_id}"'
-                    )
-                ).first()
-                if not data_source_binding:
-                    raise ValueError('Data source binding not found.')
-                for page in notion_info['pages']:
-                    data_source_info = {
-                        "notion_workspace_id": workspace_id,
-                        "notion_page_id": page['page_id'],
-                    }
-                    document = DocumentService.save_document(dataset, dataset_process_rule.id,
-                                                             document_data["data_source"]["type"],
-                                                             data_source_info, created_from, position,
-                                                             account, page['page_name'], batch)
-                    db.session.add(document)
-                    db.session.flush()
-                    document_ids.append(document.id)
-                    documents.append(document)
-                    position += 1
-        db.session.commit()
-
-        # trigger async task
-        document_indexing_task.delay(dataset.id, document_ids)
+        documents = []
+        if 'original_document_id' in document_data and document_data["original_document_id"]:
+            document = DocumentService.update_document_with_dataset_id(dataset, document_data, account)
+            documents.append(document)
+        else:
+            # save process rule
+            if not dataset_process_rule:
+                process_rule = document_data["process_rule"]
+                if process_rule["mode"] == "custom":
+                    dataset_process_rule = DatasetProcessRule(
+                        dataset_id=dataset.id,
+                        mode=process_rule["mode"],
+                        rules=json.dumps(process_rule["rules"]),
+                        created_by=account.id
+                    )
+                elif process_rule["mode"] == "automatic":
+                    dataset_process_rule = DatasetProcessRule(
+                        dataset_id=dataset.id,
+                        mode=process_rule["mode"],
+                        rules=json.dumps(DatasetProcessRule.AUTOMATIC_RULES),
+                        created_by=account.id
+                    )
+                db.session.add(dataset_process_rule)
+                db.session.commit()
+            position = DocumentService.get_documents_position(dataset.id)
+            batch = time.strftime('%Y%m%d%H%M%S') + str(random.randint(100000, 999999))
+            document_ids = []
+            if document_data["data_source"]["type"] == "upload_file":
+                upload_file_list = document_data["data_source"]["info"]
+                for upload_file in upload_file_list:
+                    file_id = upload_file["upload_file_id"]
+                    file = db.session.query(UploadFile).filter(
+                        UploadFile.tenant_id == dataset.tenant_id,
+                        UploadFile.id == file_id
+                    ).first()
+
+                    # raise error if file not found
+                    if not file:
+                        raise FileNotExistsError()
+
+                    file_name = file.name
+                    data_source_info = {
+                        "upload_file_id": file_id,
+                    }
+                    document = DocumentService.save_document(dataset, dataset_process_rule.id,
+                                                             document_data["data_source"]["type"],
+                                                             data_source_info, created_from, position,
+                                                             account, file_name, batch)
+                    db.session.add(document)
+                    db.session.flush()
+                    document_ids.append(document.id)
+                    documents.append(document)
+                    position += 1
+            elif document_data["data_source"]["type"] == "notion_import":
+                notion_info_list = document_data["data_source"]['info']
+                for notion_info in notion_info_list:
+                    workspace_id = notion_info['workspace_id']
+                    data_source_binding = DataSourceBinding.query.filter(
+                        db.and_(
+                            DataSourceBinding.tenant_id == current_user.current_tenant_id,
+                            DataSourceBinding.provider == 'notion',
+                            DataSourceBinding.disabled == False,
+                            DataSourceBinding.source_info['workspace_id'] == f'"{workspace_id}"'
+                        )
+                    ).first()
+                    if not data_source_binding:
+                        raise ValueError('Data source binding not found.')
+                    for page in notion_info['pages']:
+                        data_source_info = {
+                            "notion_workspace_id": workspace_id,
+                            "notion_page_id": page['page_id'],
+                        }
+                        document = DocumentService.save_document(dataset, dataset_process_rule.id,
+                                                                 document_data["data_source"]["type"],
+                                                                 data_source_info, created_from, position,
+                                                                 account, page['page_name'], batch)
+                        db.session.add(document)
+                        db.session.flush()
+                        document_ids.append(document.id)
+                        documents.append(document)
+                        position += 1
+            db.session.commit()
+
+            # trigger async task
+            document_indexing_task.delay(dataset.id, document_ids)

         return documents
...
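For context on the notion_import branch above, a hedged sketch of the document_data payload it expects follows. Only the key names come from the diff; every value is an invented placeholder.

# Illustrative payload only; all identifier values are placeholders.
document_data = {
    "data_source": {
        "type": "notion_import",
        "info": [
            {
                # Must match a non-disabled DataSourceBinding with provider 'notion'
                # for the current tenant, otherwise the service raises ValueError.
                "workspace_id": "ws-1234",
                "pages": [
                    {"page_id": "page-abcd", "page_name": "Product FAQ"},
                ],
            },
        ],
    },
    "process_rule": {
        "mode": "automatic",  # or "custom" together with an explicit "rules" object
    },
}
# For each page, the service stores data_source_info as
# {"notion_workspace_id": ..., "notion_page_id": ...} on the new Document row,
# then enqueues document_indexing_task.delay(dataset.id, document_ids).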
@@ -486,76 +488,97 @@ class DocumentService:
         )

         return document

-    @staticmethod
-    def update_document_with_dataset_id(dataset: Dataset, document_data: dict,
-                                        account: Account, dataset_process_rule: Optional[DatasetProcessRule] = None,
-                                        created_from: str = 'web'):
-        document = DocumentService.get_document(dataset.id, document_data["original_document_id"])
-        if document.display_status != 'available':
-            raise ValueError("Document is not available")
-        # save process rule
-        if 'process_rule' in document_data and document_data['process_rule']:
-            process_rule = document_data["process_rule"]
-            if process_rule["mode"] == "custom":
-                dataset_process_rule = DatasetProcessRule(
-                    dataset_id=dataset.id,
-                    mode=process_rule["mode"],
-                    rules=json.dumps(process_rule["rules"]),
-                    created_by=account.id
-                )
-            elif process_rule["mode"] == "automatic":
-                dataset_process_rule = DatasetProcessRule(
-                    dataset_id=dataset.id,
-                    mode=process_rule["mode"],
-                    rules=json.dumps(DatasetProcessRule.AUTOMATIC_RULES),
-                    created_by=account.id
-                )
-            db.session.add(dataset_process_rule)
-            document.dataset_process_rule_id = dataset_process_rule.id
-        # update document data source
-        if 'data_source' in document_data and document_data['data_source']:
-            file_name = ''
-            data_source_info = {}
-            if document_data["data_source"]["type"] == "upload_file":
-                file_id = document_data["data_source"]["info"]
-                file = db.session.query(UploadFile).filter(
-                    UploadFile.tenant_id == dataset.tenant_id,
-                    UploadFile.id == file_id
-                ).first()
-
-                # raise error if file not found
-                if not file:
-                    raise FileNotExistsError()
-
-                file_name = file.name
-                data_source_info = {
-                    "upload_file_id": file_id,
-                }
-            document.data_source_type = document_data["data_source"]["type"]
-            document.data_source_info = json.dumps(data_source_info)
-            document.name = file_name
-        # update document to be waiting
-        document.indexing_status = 'waiting'
-        document.completed_at = None
-        document.processing_started_at = None
-        document.parsing_completed_at = None
-        document.cleaning_completed_at = None
-        document.splitting_completed_at = None
-        document.updated_at = datetime.datetime.utcnow()
-        document.created_from = created_from
-        db.session.add(document)
-        db.session.commit()
-        # update document segment
-        update_params = {
-            DocumentSegment.status: 're_segment'
-        }
-        DocumentSegment.query.filter_by(document_id=document.id).update(update_params)
-        db.session.commit()
-        # trigger async task
-        document_indexing_update_task.delay(document.dataset_id, document.id)
-
-        return document
+    @staticmethod
+    def update_document_with_dataset_id(dataset: Dataset, document_data: dict,
+                                        account: Account, dataset_process_rule: Optional[DatasetProcessRule] = None,
+                                        created_from: str = 'web'):
+        document = DocumentService.get_document(dataset.id, document_data["original_document_id"])
+        if document.display_status != 'available':
+            raise ValueError("Document is not available")
+        # save process rule
+        if 'process_rule' in document_data and document_data['process_rule']:
+            process_rule = document_data["process_rule"]
+            if process_rule["mode"] == "custom":
+                dataset_process_rule = DatasetProcessRule(
+                    dataset_id=dataset.id,
+                    mode=process_rule["mode"],
+                    rules=json.dumps(process_rule["rules"]),
+                    created_by=account.id
+                )
+            elif process_rule["mode"] == "automatic":
+                dataset_process_rule = DatasetProcessRule(
+                    dataset_id=dataset.id,
+                    mode=process_rule["mode"],
+                    rules=json.dumps(DatasetProcessRule.AUTOMATIC_RULES),
+                    created_by=account.id
+                )
+            db.session.add(dataset_process_rule)
+            db.session.commit()
+            document.dataset_process_rule_id = dataset_process_rule.id
+        # update document data source
+        if 'data_source' in document_data and document_data['data_source']:
+            file_name = ''
+            data_source_info = {}
+            if document_data["data_source"]["type"] == "upload_file":
+                upload_file_list = document_data["data_source"]["info"]
+                for upload_file in upload_file_list:
+                    file_id = upload_file["upload_file_id"]
+                    file = db.session.query(UploadFile).filter(
+                        UploadFile.tenant_id == dataset.tenant_id,
+                        UploadFile.id == file_id
+                    ).first()
+
+                    # raise error if file not found
+                    if not file:
+                        raise FileNotExistsError()
+
+                    file_name = file.name
+                    data_source_info = {
+                        "upload_file_id": file_id,
+                    }
+            elif document_data["data_source"]["type"] == "notion_import":
+                notion_info_list = document_data["data_source"]['info']
+                for notion_info in notion_info_list:
+                    workspace_id = notion_info['workspace_id']
+                    data_source_binding = DataSourceBinding.query.filter(
+                        db.and_(
+                            DataSourceBinding.tenant_id == current_user.current_tenant_id,
+                            DataSourceBinding.provider == 'notion',
+                            DataSourceBinding.disabled == False,
+                            DataSourceBinding.source_info['workspace_id'] == f'"{workspace_id}"'
+                        )
+                    ).first()
+                    if not data_source_binding:
+                        raise ValueError('Data source binding not found.')
+                    for page in notion_info['pages']:
+                        data_source_info = {
+                            "notion_workspace_id": workspace_id,
+                            "notion_page_id": page['page_id'],
+                        }
+            document.data_source_type = document_data["data_source"]["type"]
+            document.data_source_info = json.dumps(data_source_info)
+            document.name = file_name
+        # update document to be waiting
+        document.indexing_status = 'waiting'
+        document.completed_at = None
+        document.processing_started_at = None
+        document.parsing_completed_at = None
+        document.cleaning_completed_at = None
+        document.splitting_completed_at = None
+        document.updated_at = datetime.datetime.utcnow()
+        document.created_from = created_from
+        db.session.add(document)
+        db.session.commit()
+        # update document segment
+        update_params = {
+            DocumentSegment.status: 're_segment'
+        }
+        DocumentSegment.query.filter_by(document_id=document.id).update(update_params)
+        db.session.commit()
+        # trigger async task
+        document_indexing_update_task.delay(document.dataset_id, document.id)
+
+        return document

     @staticmethod
     def save_document_without_dataset_id(tenant_id: str, document_data: dict, account: Account):
...
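As a usage note for the rewritten update_document_with_dataset_id above: this is the method the new 'original_document_id' branch in save_document_with_dataset_id delegates to. The sketch below shows the kind of payload that drives that path; key names come from the diff, the values are placeholders.

# Illustrative only: re-index an existing document from a fresh upload.
document_data = {
    "original_document_id": "11111111-2222-3333-4444-555555555555",      # placeholder id
    "data_source": {
        "type": "upload_file",
        "info": [
            {"upload_file_id": "66666666-7777-8888-9999-000000000000"},  # placeholder id
        ],
    },
    "process_rule": {"mode": "automatic"},
}
# update_document_with_dataset_id rejects documents whose display_status is not
# 'available', resets the document to 'waiting', marks its existing segments as
# 're_segment', and enqueues
# document_indexing_update_task.delay(document.dataset_id, document.id).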