Commit 13937fc1 authored by Yeuoly's avatar Yeuoly Committed by takatost

feat: code

parent 17cd5122
......@@ -132,3 +132,7 @@ SSRF_PROXY_HTTP_URL=
SSRF_PROXY_HTTPS_URL=
BATCH_UPLOAD_LIMIT=10
# CODE EXECUTION CONFIGURATION
CODE_EXECUTION_ENDPOINT=
CODE_EXECUTION_API_KEY=
......@@ -59,7 +59,9 @@ DEFAULTS = {
'CAN_REPLACE_LOGO': 'False',
'ETL_TYPE': 'dify',
'KEYWORD_STORE': 'jieba',
'BATCH_UPLOAD_LIMIT': 20
'BATCH_UPLOAD_LIMIT': 20,
'CODE_EXECUTION_ENDPOINT': '',
'CODE_EXECUTION_API_KEY': ''
}
......@@ -293,6 +295,9 @@ class Config:
self.BATCH_UPLOAD_LIMIT = get_env('BATCH_UPLOAD_LIMIT')
self.CODE_EXECUTION_ENDPOINT = get_env('CODE_EXECUTION_ENDPOINT')
self.CODE_EXECUTION_API_KEY = get_env('CODE_EXECUTION_API_KEY')
self.API_COMPRESSION_ENABLED = get_bool_env('API_COMPRESSION_ENABLED')
......
from os import environ
from httpx import post
from yarl import URL
from pydantic import BaseModel
from core.workflow.nodes.code.python_template import PythonTemplateTransformer
# Code Executor
# Connection settings for the remote code execution (sandbox) service.
# Both values are read from the process environment once, when this module is
# imported, and fall back to an empty string when the variable is not set.
# NOTE(review): the application config appears to define the same two keys;
# confirm that reading os.environ directly here is intended.
CODE_EXECUTION_ENDPOINT = environ.get('CODE_EXECUTION_ENDPOINT', '')
CODE_EXECUTION_API_KEY = environ.get('CODE_EXECUTION_API_KEY', '')
class CodeExecutionException(Exception):
    """
    Raised when a code execution request fails or the service reports an error.
    """
class CodeExecutionResponse(BaseModel):
    """
    Response body returned by the code execution service.
    """

    class Data(BaseModel):
        """
        Output streams captured from the executed program.
        """
        # standard output; the executor extracts the result from it
        stdout: str
        # standard error; the executor treats any non-empty value as a failure
        stderr: str

    # status code; the executor treats any non-zero value as a failure
    code: int
    # message; used as the error text when code is non-zero
    message: str
    # captured output streams
    data: Data
class CodeExecutor:
    """
    Client for the remote code execution (sandbox) service.
    """

    @classmethod
    def execute_code(cls, language: str, code: str, inputs: dict) -> dict:
        """
        Execute code
        :param language: code language, forwarded to the service unchanged
        :param code: code
        :param inputs: inputs
        :return: the value parsed from the runner output
        :raises CodeExecutionException: if the request fails, the service answers
            with a non-200 status, the response cannot be parsed, the service
            reports an error, or the result cannot be extracted from stdout
        """
        # NOTE(review): the runner is always built from the Python template,
        # whatever `language` is — confirm other languages are not expected yet.
        runner = PythonTemplateTransformer.transform_caller(code, inputs)

        url = URL(CODE_EXECUTION_ENDPOINT) / 'v1' / 'sandbox' / 'run'
        headers = {
            'X-Api-Key': CODE_EXECUTION_API_KEY
        }
        data = {
            'language': language,
            'code': runner,
        }

        # Only the network call lives in this try; the status checks below
        # raise the domain exception directly instead of raising a generic
        # Exception just to catch it again.
        try:
            response = post(str(url), json=data, headers=headers)
        except Exception as e:
            raise CodeExecutionException('Failed to execute code') from e

        if response.status_code == 503:
            raise CodeExecutionException('Code execution service is unavailable')
        if response.status_code != 200:
            raise CodeExecutionException('Failed to execute code')

        # Decoding and schema validation are both "the response is unusable";
        # `except Exception` (not a bare except) keeps KeyboardInterrupt and
        # SystemExit propagating.
        try:
            parsed = CodeExecutionResponse(**response.json())
        except Exception as e:
            raise CodeExecutionException('Failed to parse response') from e

        if parsed.code != 0:
            raise CodeExecutionException(parsed.message)

        if parsed.data.stderr:
            raise CodeExecutionException(parsed.data.stderr)

        # transform_response raises ValueError when the result markers are
        # missing or the payload is not valid JSON; surface that as the one
        # exception type callers of this class handle.
        try:
            return PythonTemplateTransformer.transform_response(parsed.data.stdout)
        except ValueError as e:
            raise CodeExecutionException(str(e)) from e
\ No newline at end of file
from typing import Optional
from typing import Optional, cast, Union
from core.workflow.entities.node_entities import NodeRunResult, NodeType
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.nodes.base_node import BaseNode
from core.workflow.nodes.code.entities import CodeNodeData
from core.workflow.nodes.code.code_executor import CodeExecutor, CodeExecutionException
from models.workflow import WorkflowNodeExecutionStatus
# Limits applied when validating values returned by user code.
# Numbers must lie within the signed 64-bit integer range.
MAX_NUMBER = (1 << 63) - 1
MIN_NUMBER = -(1 << 63)
# Float outputs are rounded to this many decimal places.
MAX_PRECISION = 20
# Maximum nesting depth allowed for object outputs.
MAX_DEPTH = 5
# Maximum length of a single string output.
MAX_STRING_LENGTH = 1000
# Maximum number of items in an array[string] output.
MAX_STRING_ARRAY_LENGTH = 30
class CodeNode(BaseNode):
_node_data_cls = CodeNodeData
node_type = NodeType.CODE
@classmethod
def get_default_config(cls, filters: Optional[dict] = None) -> dict:
"""
......@@ -62,3 +76,167 @@ class CodeNode(BaseNode):
]
}
}
def _run(self, variable_pool: Optional[VariablePool] = None,
         run_args: Optional[dict] = None) -> NodeRunResult:
    """
    Run code
    :param variable_pool: variable pool
    :param run_args: run args
    :return: a SUCCEEDED result carrying the inputs and validated outputs, or a
        FAILED result carrying the error message
    :raises ValueError: if single step debug is requested (run_args without a
        variable pool)
    """
    node_data: CodeNodeData = cast(self._node_data_cls, self.node_data)

    # SINGLE DEBUG NOT IMPLEMENTED YET
    if variable_pool is None and run_args:
        raise ValueError("Not support single step debug.")

    # Get code language
    code_language = node_data.code_language
    code = node_data.code

    # Get variables
    variables = {}
    for variable_selector in node_data.variables:
        variables[variable_selector.variable] = variable_pool.get_variable_value(
            variable_selector=variable_selector.value_selector
        )

    # Run code and validate its result. Validation sits inside the try because
    # it reports schema mismatches with ValueError; those must produce a FAILED
    # node result rather than an unhandled exception.
    try:
        result = CodeExecutor.execute_code(
            language=code_language,
            code=code,
            inputs=variables
        )

        # Transform result
        result = self._transform_result(result, node_data.outputs)
    except (CodeExecutionException, ValueError) as e:
        return NodeRunResult(
            status=WorkflowNodeExecutionStatus.FAILED,
            error=str(e)
        )

    return NodeRunResult(
        status=WorkflowNodeExecutionStatus.SUCCEEDED,
        inputs=variables,
        outputs=result
    )
def _check_string(self, value: str, variable: str) -> str:
    """
    Check string
    :param value: value to validate
    :param variable: variable name used in error messages
    :return: the value with NUL characters removed
    :raises ValueError: if the value is not a string or is longer than
        MAX_STRING_LENGTH
    """
    if isinstance(value, str):
        if len(value) <= MAX_STRING_LENGTH:
            # strip NUL characters from the accepted value
            return value.replace('\x00', '')

        raise ValueError(f'{variable} in input form must be less than {MAX_STRING_LENGTH} characters')

    raise ValueError(f"{variable} in input form must be a string")
def _check_number(self, value: Union[int, float], variable: str) -> Union[int, float]:
    """
    Check number
    :param value: value to validate
    :param variable: variable name used in error messages
    :return: the value; floats are rounded to MAX_PRECISION decimal places
    :raises ValueError: if the value is not an int or float, or lies outside
        the range MIN_NUMBER..MAX_NUMBER
    """
    is_number = isinstance(value, (int, float))
    if not is_number:
        raise ValueError(f"{variable} in input form must be a number")

    too_small = value < MIN_NUMBER
    too_large = value > MAX_NUMBER
    if too_small or too_large:
        raise ValueError(f'{variable} in input form is out of range.')

    return round(value, MAX_PRECISION) if isinstance(value, float) else value
def _transform_result(self, result: dict, output_schema: dict[str, CodeNodeData.Output],
                      prefix: str = '',
                      depth: int = 1) -> dict:
    """
    Transform result

    Validates `result` against `output_schema` and returns a new dict holding
    only the declared outputs, each one checked (and, for floats, rounded).
    :param result: result
    :param output_schema: output schema
    :param prefix: dotted path of the enclosing object, used in error messages
    :param depth: current nesting level, starting at 1
    :return: the validated outputs
    :raises ValueError: if nesting exceeds MAX_DEPTH, a value is missing or has
        the wrong type, a limit is exceeded, or an output type is unsupported
    """
    if depth > MAX_DEPTH:
        raise ValueError("Depth limit reached, object too deep.")

    # User code may return anything; fail with the same exception type as every
    # other schema violation instead of an AttributeError further down.
    if not isinstance(result, dict):
        raise ValueError(f'Output is not an object, got {type(result)} instead.')

    transformed_result = {}
    for output_name, output_config in output_schema.items():
        # Dotted path used in every message; no leading dot at the top level.
        full_name = f'{prefix}.{output_name}' if prefix else output_name
        # A missing key yields None, which fails the type checks below with a
        # ValueError rather than escaping as a KeyError.
        value = result.get(output_name)
        output_type = output_config.type

        if output_type == 'object':
            # check if output is object
            if not isinstance(value, dict):
                raise ValueError(
                    f'Output {full_name} is not an object, got {type(value)} instead.'
                )
            if output_config.children is None:
                raise ValueError(f'Output {full_name} is an object but its schema declares no children.')

            transformed_result[output_name] = self._transform_result(
                result=value,
                output_schema=output_config.children,
                prefix=full_name,
                depth=depth + 1
            )
        elif output_type == 'number':
            # check if number available; keep the checked value so the
            # rounding applied by _check_number is not discarded
            transformed_result[output_name] = self._check_number(
                value=value,
                variable=full_name
            )
        elif output_type == 'string':
            # check if string available
            transformed_result[output_name] = self._check_string(
                value=value,
                variable=full_name
            )
        elif output_type == 'array[number]':
            # check if array of number available
            if not isinstance(value, list):
                raise ValueError(
                    f'Output {full_name} is not an array, got {type(value)} instead.'
                )

            transformed_result[output_name] = [
                self._check_number(value=item, variable=full_name)
                for item in value
            ]
        elif output_type == 'array[string]':
            # check if array of string available
            if not isinstance(value, list):
                raise ValueError(
                    f'Output {full_name} is not an array, got {type(value)} instead.'
                )

            if len(value) > MAX_STRING_ARRAY_LENGTH:
                raise ValueError(
                    f'{full_name} in input form must be less than {MAX_STRING_ARRAY_LENGTH} elements'
                )

            transformed_result[output_name] = [
                self._check_string(value=item, variable=full_name)
                for item in value
            ]
        else:
            raise ValueError(f'Output type {output_type} is not supported.')

    return transformed_result
\ No newline at end of file
from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.variable_entities import VariableSelector
from pydantic import BaseModel
from typing import Literal, Union
class CodeNodeData(BaseNodeData):
    """
    Code Node Data.
    """

    class Output(BaseModel):
        """
        Schema of one declared output of the code node.
        """
        # declared type of the output value
        type: Literal['string', 'number', 'object', 'array[string]', 'array[number]']
        # nested output schema, keyed by output name; read for 'object' outputs
        children: Union[None, dict[str, 'Output']]

    # selectors resolved from the variable pool and passed to the code as inputs
    variables: list[VariableSelector]
    # NOTE(review): not read by the code node in the visible code — confirm it is needed
    answer: str
    # language identifier forwarded to the code executor
    code_language: str
    # source code to execute
    code: str
    # declared outputs, keyed by output name
    outputs: dict[str, Output]
import json
import re
# Template of the script sent to the code execution service.
# `{{code}}` is replaced with the user code (which must define `main`) and
# `{{inputs}}` with an expression evaluating to the keyword arguments of `main`.
# The script prints main's return value as JSON between two <<RESULT>> markers
# so it can be told apart from anything else the user code writes to stdout.
PYTHON_RUNNER = """# declare main function here
{{code}}

# execute main function, and return the result
# inputs is a dict, and it is unpacked into the keyword arguments of main
output = main(**{{inputs}})

# convert output to json and print
import json
output = json.dumps(output, indent=4)

result = f'''<<RESULT>>
{output}
<<RESULT>>'''

print(result)
"""
class PythonTemplateTransformer:
    """
    Builds the Python runner script for user code and parses the runner output.
    """

    @classmethod
    def transform_caller(cls, code: str, inputs: dict) -> str:
        """
        Transform code to python runner
        :param code: code
        :param inputs: inputs, must be JSON serializable
        :return: the runner source with code and inputs filled in
        """
        # transform inputs to json string
        inputs_str = json.dumps(inputs)

        # JSON text is not Python source: null / true / false would be
        # undefined names in the runner. Embed the JSON as a string literal
        # (repr always yields a valid one) and decode it inside the runner.
        inputs_expr = f"__import__('json').loads({inputs_str!r})"

        # replace code and inputs in a single pass, so placeholder-like text
        # inside the user code or the inputs is never substituted itself
        replacements = {
            '{{code}}': code,
            '{{inputs}}': inputs_expr,
        }
        return re.sub(
            r'\{\{(?:code|inputs)\}\}',
            lambda found: replacements[found.group(0)],
            PYTHON_RUNNER
        )

    @classmethod
    def transform_response(cls, response: str) -> dict:
        """
        Transform response to dict
        :param response: response
        :return: the JSON value found between the <<RESULT>> markers
        :raises ValueError: if the markers are missing or the text between them
            is not valid JSON
        """
        # extract result
        found = re.search(r'<<RESULT>>(.*)<<RESULT>>', response, re.DOTALL)
        if not found:
            raise ValueError('Failed to parse result')

        return json.loads(found.group(1))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment