ai-tech / dify

Commit a1aee4e7, authored Jun 05, 2023 by jyong
add integration dataset and document sync from notion
Parent: a842f1c2

Showing 4 changed files with 134 additions and 46 deletions (+134 −46)
api/controllers/console/auth/data_source_oauth.py (+20 −0)
api/controllers/console/datasets/data_source.py (+23 −2)
api/libs/oauth_data_source.py (+23 −0)
api/tasks/document_indexing_sync_task.py (+68 −44)
api/controllers/console/auth/data_source_oauth.py

@@ -63,5 +63,25 @@ class OAuthDataSourceCallback(Resource):
         return redirect(f'{current_app.config.get("CONSOLE_URL")}?oauth_data_source=success')
 
 
+class OAuthDataSourceSync(Resource):
+    def get(self, provider, binding_id):
+        provider = str(provider)
+        binding_id = str(binding_id)
+        OAUTH_DATASOURCE_PROVIDERS = get_oauth_providers()
+        with current_app.app_context():
+            oauth_provider = OAUTH_DATASOURCE_PROVIDERS.get(provider)
+        if not oauth_provider:
+            return {'error': 'Invalid provider'}, 400
+        try:
+            oauth_provider.sync_data_source(binding_id)
+        except requests.exceptions.HTTPError as e:
+            logging.exception(
+                f"An error occurred during the OAuthCallback process with {provider}: {e.response.text}")
+            return {'error': 'OAuth data source process failed'}, 400
+
+        return {'result': 'success'}, 200
+
+
 api.add_resource(OAuthDataSource, '/oauth/data-source/<string:provider>')
 api.add_resource(OAuthDataSourceCallback, '/oauth/data-source/callback/<string:provider>')
+api.add_resource(OAuthDataSourceSync, '/oauth/data-source/<string:provider>/<uuid:binding_id>/sync')
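
For reference, a minimal sketch of how a console client could hit the new sync route. The host, the '/console/api' prefix, and the session-cookie auth are assumptions for illustration; they are not part of this diff.

    import requests

    # Assumed host, console API prefix, and cookie-based auth -- not in this diff.
    BASE_URL = 'http://localhost:5001/console/api'
    session = requests.Session()
    session.cookies.set('session', '<console-session-cookie>')

    # GET /oauth/data-source/<string:provider>/<uuid:binding_id>/sync
    binding_id = '00000000-0000-0000-0000-000000000000'  # hypothetical binding id
    resp = session.get(f'{BASE_URL}/oauth/data-source/notion/{binding_id}/sync')
    print(resp.status_code, resp.json())  # 200 {'result': 'success'} on success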
api/controllers/console/datasets/data_source.py

@@ -18,6 +18,7 @@ from libs.oauth_data_source import NotionOAuth
 from models.dataset import Document
 from models.source import DataSourceBinding
 from services.dataset_service import DatasetService, DocumentService
+from tasks.document_indexing_sync_task import document_indexing_sync_task
 
 cache = TTLCache(maxsize=None, ttl=30)
 
@@ -231,7 +232,7 @@ class DataSourceNotionApi(Resource):
         return response, 200
 
 
-class DataSourceNotionSyncApi(Resource):
+class DataSourceNotionDatasetSyncApi(Resource):
 
     @setup_required
     @login_required
@@ -244,7 +245,26 @@ class DataSourceNotionSyncApi(Resource):
         documents = DocumentService.get_document_by_dataset_id(dataset_id_str)
         for document in documents:
             document_indexing_sync_task.delay(dataset_id, document.id)
         return 200
+
+
+class DataSourceNotionDocumentSyncApi(Resource):
+
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def get(self, dataset_id, document_id):
+        dataset_id_str = str(dataset_id)
+        document_id_str = str(document_id)
+        dataset = DatasetService.get_dataset(dataset_id_str)
+        if dataset is None:
+            raise NotFound("Dataset not found.")
+        document = DocumentService.get_document(dataset_id_str, document_id_str)
+        if document is None:
+            raise NotFound("Document not found.")
+        document_indexing_sync_task.delay(dataset_id, document.id)
+        return 200
 
@@ -252,4 +272,5 @@ api.add_resource(DataSourceApi, '/data-source/integrates', '/data-source/integra
 api.add_resource(DataSourceNotionListApi, '/notion/pre-import/pages')
 api.add_resource(DataSourceNotionApi, '/notion/workspaces/<uuid:workspace_id>/pages/<uuid:page_id>/preview',
                  '/datasets/notion-indexing-estimate')
-api.add_resource(DataSourceNotionSyncApi, '/datasets/<uuid:dataset_id>/notion/sync')
+api.add_resource(DataSourceNotionDatasetSyncApi, '/datasets/<uuid:dataset_id>/notion/sync')
+api.add_resource(DataSourceNotionDocumentSyncApi, '/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/notion/sync')
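
The two new console routes split sync granularity: the dataset-level route enqueues document_indexing_sync_task once per document in the dataset, while the document-level route enqueues it for a single document. A usage sketch, under the same assumed host, prefix, and auth as above:

    import requests

    BASE_URL = 'http://localhost:5001/console/api'          # assumed prefix
    session = requests.Session()
    session.cookies.set('session', '<console-session-cookie>')  # assumed auth

    dataset_id = '11111111-1111-1111-1111-111111111111'     # hypothetical ids
    document_id = '22222222-2222-2222-2222-222222222222'

    # Dataset-level: one Celery task per document in the dataset.
    session.get(f'{BASE_URL}/datasets/{dataset_id}/notion/sync')

    # Document-level: a single task for one document.
    session.get(f'{BASE_URL}/datasets/{dataset_id}/documents/{document_id}/notion/sync')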
api/libs/oauth_data_source.py

@@ -84,6 +84,29 @@ class NotionOAuth(OAuthDataSource):
             db.session.add(new_data_source_binding)
             db.session.commit()
 
+    def sync_data_source(self, binding_id: str):
+        # save data source binding
+        data_source_binding = DataSourceBinding.query.filter(
+            db.and_(
+                DataSourceBinding.tenant_id == current_user.current_tenant_id,
+                DataSourceBinding.provider == 'notion',
+                DataSourceBinding.id == binding_id,
+                DataSourceBinding.disabled == False
+            )
+        ).first()
+        if data_source_binding:
+            # get all authorized pages
+            pages = self.get_authorized_pages(data_source_binding.access_token)
+            source_info = json.loads(data_source_binding.source_info)
+            source_info['pages'] = pages
+            source_info['total'] = len(pages)
+            data_source_binding.source_info = source_info
+            data_source_binding.disabled = False
+            db.session.add(data_source_binding)
+            db.session.commit()
+        else:
+            raise ValueError('Data source binding not found')
+
     def get_authorized_pages(self, access_token: str):
         pages = []
         data = {
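
sync_data_source re-reads the authorized pages and rewrites the binding's source_info JSON in place. A sketch of the resulting shape with hypothetical page entries; the exact per-page fields come from get_authorized_pages, which is truncated in this diff:

    import json

    # Hypothetical source_info for a Notion binding after sync_data_source();
    # the per-page fields here are illustrative only.
    source_info = {
        'workspace_id': 'ws-123',
        'pages': [
            {'page_id': 'page-abc', 'page_name': 'Roadmap'},
            {'page_id': 'page-def', 'page_name': 'Meeting notes'},
        ],
    }
    source_info['total'] = len(source_info['pages'])  # mirrors the code above
    print(json.dumps(source_info, indent=2))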
api/tasks/document_indexing_sync_task.py

@@ -6,12 +6,14 @@ import click
 from celery import shared_task
 from werkzeug.exceptions import NotFound
+from core.data_source.notion import NotionPageReader
 from core.index.keyword_table_index import KeywordTableIndex
 from core.index.vector_index import VectorIndex
 from core.indexing_runner import IndexingRunner, DocumentIsPausedException
 from core.llm.error import ProviderTokenNotInitError
 from extensions.ext_database import db
 from models.dataset import Document, Dataset, DocumentSegment
+from models.source import DataSourceBinding
 
 
 @shared_task
@@ -21,9 +23,9 @@ def document_indexing_sync_task(dataset_id: str, document_id: str):
     :param dataset_id:
     :param document_id:
 
-    Usage: document_indexing_update_task.delay(dataset_id, document_id)
+    Usage: document_indexing_sync_task.delay(dataset_id, document_id)
     """
-    logging.info(click.style('Start update document: {}'.format(document_id), fg='green'))
+    logging.info(click.style('Start sync document: {}'.format(document_id), fg='green'))
     start_at = time.perf_counter()
 
     document = db.session.query(Document).filter(
@@ -34,52 +36,74 @@ def document_indexing_sync_task(dataset_id: str, document_id: str):
     if not document:
         raise NotFound('Document not found')
 
-    document.indexing_status = 'parsing'
-    document.processing_started_at = datetime.datetime.utcnow()
-    db.session.commit()
+    data_source_info = document.data_source_info_dict
+    if document.data_source_type == 'notion_import':
+        if not data_source_info or 'notion_page_id' not in data_source_info \
+                or 'notion_workspace_id' not in data_source_info:
+            raise ValueError("no notion page found")
+        workspace_id = data_source_info['notion_workspace_id']
+        page_id = data_source_info['notion_page_id']
+        page_edited_time = data_source_info['last_edited_time']
+        data_source_binding = DataSourceBinding.query.filter(
+            db.and_(
+                DataSourceBinding.tenant_id == document.tenant_id,
+                DataSourceBinding.provider == 'notion',
+                DataSourceBinding.disabled == False,
+                DataSourceBinding.source_info['workspace_id'] == f'"{workspace_id}"'
+            )
+        ).first()
+        if not data_source_binding:
+            raise ValueError('Data source binding not found.')
+        reader = NotionPageReader(integration_token=data_source_binding.access_token)
+        last_edited_time = reader.get_page_last_edited_time(page_id)
+        # check the page is updated
+        if last_edited_time != page_edited_time:
+            document.indexing_status = 'parsing'
+            document.processing_started_at = datetime.datetime.utcnow()
+            db.session.commit()
 
-    # delete all document segment and index
-    try:
-        dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
-        if not dataset:
-            raise Exception('Dataset not found')
+            # delete all document segment and index
+            try:
+                dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
+                if not dataset:
+                    raise Exception('Dataset not found')
 
-        vector_index = VectorIndex(dataset=dataset)
-        keyword_table_index = KeywordTableIndex(dataset=dataset)
+                vector_index = VectorIndex(dataset=dataset)
+                keyword_table_index = KeywordTableIndex(dataset=dataset)
 
-        segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
-        index_node_ids = [segment.index_node_id for segment in segments]
+                segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
+                index_node_ids = [segment.index_node_id for segment in segments]
 
-        # delete from vector index
-        vector_index.del_nodes(index_node_ids)
+                # delete from vector index
+                vector_index.del_nodes(index_node_ids)
 
-        # delete from keyword index
-        if index_node_ids:
-            keyword_table_index.del_nodes(index_node_ids)
+                # delete from keyword index
+                if index_node_ids:
+                    keyword_table_index.del_nodes(index_node_ids)
 
-        for segment in segments:
-            db.session.delete(segment)
+                for segment in segments:
+                    db.session.delete(segment)
 
-        end_at = time.perf_counter()
-        logging.info(
-            click.style('Cleaned document when document update data source or process rule: {} latency: {}'.format(
-                document_id, end_at - start_at), fg='green'))
-    except Exception:
-        logging.exception("Cleaned document when document update data source or process rule failed")
+                end_at = time.perf_counter()
+                logging.info(
+                    click.style('Cleaned document when document update data source or process rule: {} latency: {}'.format(
+                        document_id, end_at - start_at), fg='green'))
+            except Exception:
+                logging.exception("Cleaned document when document update data source or process rule failed")
 
-    try:
-        indexing_runner = IndexingRunner()
-        indexing_runner.run(document)
-        end_at = time.perf_counter()
-        logging.info(click.style('update document: {} latency: {}'.format(document.id, end_at - start_at), fg='green'))
-    except DocumentIsPausedException:
-        logging.info(click.style('Document update paused, document id: {}'.format(document.id), fg='yellow'))
-    except ProviderTokenNotInitError as e:
-        document.indexing_status = 'error'
-        document.error = str(e.description)
-        document.stopped_at = datetime.datetime.utcnow()
-        db.session.commit()
-    except Exception as e:
-        logging.exception("consume update document failed")
-        document.indexing_status = 'error'
-        document.error = str(e)
-        document.stopped_at = datetime.datetime.utcnow()
-        db.session.commit()
+            try:
+                indexing_runner = IndexingRunner()
+                indexing_runner.run(document)
+                end_at = time.perf_counter()
+                logging.info(click.style('update document: {} latency: {}'.format(document.id, end_at - start_at), fg='green'))
+            except DocumentIsPausedException:
+                logging.info(click.style('Document update paused, document id: {}'.format(document.id), fg='yellow'))
+            except ProviderTokenNotInitError as e:
+                document.indexing_status = 'error'
+                document.error = str(e.description)
+                document.stopped_at = datetime.datetime.utcnow()
+                db.session.commit()
+            except Exception as e:
+                logging.exception("consume update document failed")
+                document.indexing_status = 'error'
+                document.error = str(e)
+                document.stopped_at = datetime.datetime.utcnow()
+                db.session.commit()
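
The re-index is gated on NotionPageReader.get_page_last_edited_time. As a standalone sketch, the same check can be made against the public Notion API's retrieve-page endpoint, which returns a last_edited_time timestamp; this is an assumption about the mechanism, not necessarily how NotionPageReader implements it:

    import requests

    def get_page_last_edited_time(integration_token: str, page_id: str) -> str:
        # Retrieve-page endpoint of the public Notion API; the pinned
        # Notion-Version value here is an assumption.
        resp = requests.get(
            f'https://api.notion.com/v1/pages/{page_id}',
            headers={
                'Authorization': f'Bearer {integration_token}',
                'Notion-Version': '2022-06-28',
            },
        )
        resp.raise_for_status()
        return resp.json()['last_edited_time']  # ISO 8601, e.g. '2023-06-05T09:00:00.000Z'

    # The task re-indexes only when this differs from the stored value:
    # if get_page_last_edited_time(token, page_id) != page_edited_time: ...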