Workflow-Engine / api /services /tools /tools_transform_service.py
Severian's picture
initial commit
a8b3f00
raw
history blame
10.9 kB
import json
import logging
from typing import Optional, Union
from configs import dify_config
from core.tools.entities.api_entities import UserTool, UserToolProvider
from core.tools.entities.common_entities import I18nObject
from core.tools.entities.tool_bundle import ApiToolBundle
from core.tools.entities.tool_entities import (
ApiProviderAuthType,
ToolParameter,
ToolProviderCredentials,
ToolProviderType,
)
from core.tools.provider.api_tool_provider import ApiToolProviderController
from core.tools.provider.builtin_tool_provider import BuiltinToolProviderController
from core.tools.provider.workflow_tool_provider import WorkflowToolProviderController
from core.tools.tool.tool import Tool
from core.tools.tool.workflow_tool import WorkflowTool
from core.tools.utils.configuration import ToolConfigurationManager
from models.tools import ApiToolProvider, BuiltinToolProvider, WorkflowToolProvider
logger = logging.getLogger(__name__)
class ToolTransformService:
@staticmethod
def get_tool_provider_icon_url(provider_type: str, provider_name: str, icon: str) -> Union[str, dict]:
"""
get tool provider icon url
"""
url_prefix = dify_config.CONSOLE_API_URL + "/console/api/workspaces/current/tool-provider/"
if provider_type == ToolProviderType.BUILT_IN.value:
return url_prefix + "builtin/" + provider_name + "/icon"
elif provider_type in {ToolProviderType.API.value, ToolProviderType.WORKFLOW.value}:
try:
return json.loads(icon)
except:
return {"background": "#252525", "content": "\ud83d\ude01"}
return ""
@staticmethod
def repack_provider(provider: Union[dict, UserToolProvider]):
"""
repack provider
:param provider: the provider dict
"""
if isinstance(provider, dict) and "icon" in provider:
provider["icon"] = ToolTransformService.get_tool_provider_icon_url(
provider_type=provider["type"], provider_name=provider["name"], icon=provider["icon"]
)
elif isinstance(provider, UserToolProvider):
provider.icon = ToolTransformService.get_tool_provider_icon_url(
provider_type=provider.type.value, provider_name=provider.name, icon=provider.icon
)
@staticmethod
def builtin_provider_to_user_provider(
provider_controller: BuiltinToolProviderController,
db_provider: Optional[BuiltinToolProvider],
decrypt_credentials: bool = True,
) -> UserToolProvider:
"""
convert provider controller to user provider
"""
result = UserToolProvider(
id=provider_controller.identity.name,
author=provider_controller.identity.author,
name=provider_controller.identity.name,
description=I18nObject(
en_US=provider_controller.identity.description.en_US,
zh_Hans=provider_controller.identity.description.zh_Hans,
pt_BR=provider_controller.identity.description.pt_BR,
ja_JP=provider_controller.identity.description.ja_JP,
),
icon=provider_controller.identity.icon,
label=I18nObject(
en_US=provider_controller.identity.label.en_US,
zh_Hans=provider_controller.identity.label.zh_Hans,
pt_BR=provider_controller.identity.label.pt_BR,
ja_JP=provider_controller.identity.label.ja_JP,
),
type=ToolProviderType.BUILT_IN,
masked_credentials={},
is_team_authorization=False,
tools=[],
labels=provider_controller.tool_labels,
)
# get credentials schema
schema = provider_controller.get_credentials_schema()
for name, value in schema.items():
result.masked_credentials[name] = ToolProviderCredentials.CredentialsType.default(value.type)
# check if the provider need credentials
if not provider_controller.need_credentials:
result.is_team_authorization = True
result.allow_delete = False
elif db_provider:
result.is_team_authorization = True
if decrypt_credentials:
credentials = db_provider.credentials
# init tool configuration
tool_configuration = ToolConfigurationManager(
tenant_id=db_provider.tenant_id, provider_controller=provider_controller
)
# decrypt the credentials and mask the credentials
decrypted_credentials = tool_configuration.decrypt_tool_credentials(credentials=credentials)
masked_credentials = tool_configuration.mask_tool_credentials(credentials=decrypted_credentials)
result.masked_credentials = masked_credentials
result.original_credentials = decrypted_credentials
return result
@staticmethod
def api_provider_to_controller(
db_provider: ApiToolProvider,
) -> ApiToolProviderController:
"""
convert provider controller to user provider
"""
# package tool provider controller
controller = ApiToolProviderController.from_db(
db_provider=db_provider,
auth_type=ApiProviderAuthType.API_KEY
if db_provider.credentials["auth_type"] == "api_key"
else ApiProviderAuthType.NONE,
)
return controller
@staticmethod
def workflow_provider_to_controller(db_provider: WorkflowToolProvider) -> WorkflowToolProviderController:
"""
convert provider controller to provider
"""
return WorkflowToolProviderController.from_db(db_provider)
@staticmethod
def workflow_provider_to_user_provider(
provider_controller: WorkflowToolProviderController, labels: Optional[list[str]] = None
):
"""
convert provider controller to user provider
"""
return UserToolProvider(
id=provider_controller.provider_id,
author=provider_controller.identity.author,
name=provider_controller.identity.name,
description=I18nObject(
en_US=provider_controller.identity.description.en_US,
zh_Hans=provider_controller.identity.description.zh_Hans,
),
icon=provider_controller.identity.icon,
label=I18nObject(
en_US=provider_controller.identity.label.en_US,
zh_Hans=provider_controller.identity.label.zh_Hans,
),
type=ToolProviderType.WORKFLOW,
masked_credentials={},
is_team_authorization=True,
tools=[],
labels=labels or [],
)
@staticmethod
def api_provider_to_user_provider(
provider_controller: ApiToolProviderController,
db_provider: ApiToolProvider,
decrypt_credentials: bool = True,
labels: Optional[list[str]] = None,
) -> UserToolProvider:
"""
convert provider controller to user provider
"""
username = "Anonymous"
try:
username = db_provider.user.name
except Exception as e:
logger.error(f"failed to get user name for api provider {db_provider.id}: {str(e)}")
# add provider into providers
credentials = db_provider.credentials
result = UserToolProvider(
id=db_provider.id,
author=username,
name=db_provider.name,
description=I18nObject(
en_US=db_provider.description,
zh_Hans=db_provider.description,
),
icon=db_provider.icon,
label=I18nObject(
en_US=db_provider.name,
zh_Hans=db_provider.name,
),
type=ToolProviderType.API,
masked_credentials={},
is_team_authorization=True,
tools=[],
labels=labels or [],
)
if decrypt_credentials:
# init tool configuration
tool_configuration = ToolConfigurationManager(
tenant_id=db_provider.tenant_id, provider_controller=provider_controller
)
# decrypt the credentials and mask the credentials
decrypted_credentials = tool_configuration.decrypt_tool_credentials(credentials=credentials)
masked_credentials = tool_configuration.mask_tool_credentials(credentials=decrypted_credentials)
result.masked_credentials = masked_credentials
return result
@staticmethod
def tool_to_user_tool(
tool: Union[ApiToolBundle, WorkflowTool, Tool],
credentials: Optional[dict] = None,
tenant_id: Optional[str] = None,
labels: Optional[list[str]] = None,
) -> UserTool:
"""
convert tool to user tool
"""
if isinstance(tool, Tool):
# fork tool runtime
tool = tool.fork_tool_runtime(
runtime={
"credentials": credentials,
"tenant_id": tenant_id,
}
)
# get tool parameters
parameters = tool.parameters or []
# get tool runtime parameters
runtime_parameters = tool.get_runtime_parameters() or []
# override parameters
current_parameters = parameters.copy()
for runtime_parameter in runtime_parameters:
found = False
for index, parameter in enumerate(current_parameters):
if parameter.name == runtime_parameter.name and parameter.form == runtime_parameter.form:
current_parameters[index] = runtime_parameter
found = True
break
if not found and runtime_parameter.form == ToolParameter.ToolParameterForm.FORM:
current_parameters.append(runtime_parameter)
return UserTool(
author=tool.identity.author,
name=tool.identity.name,
label=tool.identity.label,
description=tool.description.human,
parameters=current_parameters,
labels=labels,
)
if isinstance(tool, ApiToolBundle):
return UserTool(
author=tool.author,
name=tool.operation_id,
label=I18nObject(en_US=tool.operation_id, zh_Hans=tool.operation_id),
description=I18nObject(en_US=tool.summary or "", zh_Hans=tool.summary or ""),
parameters=tool.parameters,
labels=labels,
)