File size: 15,244 Bytes
a8b3f00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
import re
import uuid
from json import dumps as json_dumps
from json import loads as json_loads
from json.decoder import JSONDecodeError
from typing import Optional

from requests import get
from yaml import YAMLError, safe_load

from core.tools.entities.common_entities import I18nObject
from core.tools.entities.tool_bundle import ApiToolBundle
from core.tools.entities.tool_entities import ApiProviderSchemaType, ToolParameter
from core.tools.errors import ToolApiSchemaError, ToolNotSupportedError, ToolProviderNotFoundError


class ApiBasedToolSchemaParser:
    @staticmethod
    def parse_openapi_to_tool_bundle(
        openapi: dict, extra_info: Optional[dict], warning: Optional[dict]
    ) -> list[ApiToolBundle]:
        """
        parse openapi dict to tool bundle, one bundle per (path, http method) operation

        :param openapi: the openapi dict; request body schemas given as ``$ref`` are resolved
            against this dict and written back into it, and a generated ``operationId`` is
            stored on operations that have none
        :param extra_info: optional dict, ``description`` is set to the description of the document
        :param warning: optional dict, ``duplicated_parameter`` is set when an operation ends up
            with more than one parameter of the same name
        :return: the tool bundle
        :raises ToolProviderNotFoundError: if the document declares no server
        """
        warning = warning if warning is not None else {}
        extra_info = extra_info if extra_info is not None else {}

        # set description to extra_info
        extra_info["description"] = openapi["info"].get("description", "")

        # a missing "servers" key is reported the same way as an empty server list
        servers = openapi.get("servers") or []
        if not servers:
            raise ToolProviderNotFoundError("No server found in the openapi yaml.")

        # only the first server is used to build the tool urls
        server_url = servers[0]["url"]

        # list all interfaces, the order of this tuple is the order of the bundles within a path
        methods = ("get", "post", "put", "delete", "patch", "head", "options", "trace")
        interfaces = []
        for path, path_item in openapi["paths"].items():
            for method in methods:
                if method in path_item:
                    interfaces.append({"path": path, "method": method, "operation": path_item[method]})

        bundles = []
        for interface in interfaces:
            operation = interface["operation"]

            # convert parameters
            parameters = []
            for parameter in operation.get("parameters", []):
                tool_parameter = ToolParameter(
                    name=parameter["name"],
                    label=I18nObject(en_US=parameter["name"], zh_Hans=parameter["name"]),
                    human_description=I18nObject(
                        en_US=parameter.get("description", ""), zh_Hans=parameter.get("description", "")
                    ),
                    # STRING is the fallback, it is replaced below when a known type is declared
                    type=ToolParameter.ToolParameterType.STRING,
                    required=parameter.get("required", False),
                    form=ToolParameter.ToolParameterForm.LLM,
                    llm_description=parameter.get("description"),
                    default=parameter["schema"]["default"]
                    if "schema" in parameter and "default" in parameter["schema"]
                    else None,
                )

                # check if there is a type
                typ = ApiBasedToolSchemaParser._get_tool_parameter_type(parameter)
                if typ:
                    tool_parameter.type = typ

                parameters.append(tool_parameter)

            # check if there is a request body
            if "requestBody" in operation and "content" in operation["requestBody"]:
                # tracked per operation, so an empty content mapping can neither hit an
                # unbound name nor reuse the content type of a previous operation
                last_content = None
                for content in operation["requestBody"]["content"].values():
                    last_content = content
                    if "schema" not in content:
                        continue

                    if "$ref" in content["schema"]:
                        # walk the reference from the document root, the first segment ("#") is dropped
                        root = openapi
                        for ref in content["schema"]["$ref"].split("/")[1:]:
                            root = root[ref]
                        # overwrite the reference with the schema it points to
                        content["schema"] = root

                # parse body parameters, only the last declared content type contributes them
                if last_content is not None and "schema" in last_content:
                    body_schema = last_content["schema"]
                    required = body_schema.get("required", [])
                    properties = body_schema.get("properties", {})
                    for name, prop in properties.items():
                        body_parameter = ToolParameter(
                            name=name,
                            label=I18nObject(en_US=name, zh_Hans=name),
                            human_description=I18nObject(
                                en_US=prop.get("description", ""), zh_Hans=prop.get("description", "")
                            ),
                            type=ToolParameter.ToolParameterType.STRING,
                            required=name in required,
                            form=ToolParameter.ToolParameterForm.LLM,
                            llm_description=prop.get("description", ""),
                            default=prop.get("default", None),
                        )

                        # check if there is a type
                        typ = ApiBasedToolSchemaParser._get_tool_parameter_type(prop)
                        if typ:
                            body_parameter.type = typ

                        parameters.append(body_parameter)

            # check if parameters is duplicated
            parameters_count = {}
            for parameter in parameters:
                parameters_count[parameter.name] = parameters_count.get(parameter.name, 0) + 1
            for name, count in parameters_count.items():
                if count > 1:
                    # a single key is used, so only the last duplicated name is reported
                    warning["duplicated_parameter"] = f"Parameter {name} is duplicated."

            # check if there is a operation id, use $path_$method as operation id if not
            if "operationId" not in operation:
                path = interface["path"]
                if path.startswith("/"):
                    path = path[1:]
                # remove special characters like / to ensure the operation id is valid ^[a-zA-Z0-9_-]{1,64}$
                path = re.sub(r"[^a-zA-Z0-9_-]", "", path)
                if not path:
                    # nothing usable is left of the path (e.g. "/"), fall back to a random id
                    path = str(uuid.uuid4())

                operation["operationId"] = f'{path}_{interface["method"]}'

            # create tool bundle, the description is preferred over the summary
            bundles.append(
                ApiToolBundle(
                    server_url=server_url + interface["path"],
                    method=interface["method"],
                    summary=operation["description"]
                    if "description" in operation
                    else operation.get("summary", None),
                    operation_id=operation["operationId"],
                    parameters=parameters,
                    author="",
                    icon=None,
                    openapi=operation,
                )
            )

        return bundles

    @staticmethod
    def _get_tool_parameter_type(parameter: dict) -> Optional[ToolParameter.ToolParameterType]:
        """
        get the tool parameter type of an openapi parameter or schema property

        The type is read from ``parameter["type"]`` and, if that key is absent, from
        ``parameter["schema"]["type"]``.

        :param parameter: the parameter or property dict, None is treated as an empty dict
        :return: the tool parameter type, or None if no type is declared or the declared type
            is not one of integer, number, boolean, string
        """
        parameter = parameter or {}
        typ = None
        if "type" in parameter:
            typ = parameter["type"]
        elif "schema" in parameter and "type" in parameter["schema"]:
            typ = parameter["schema"]["type"]

        # the type may be a list such as ["string", "null"]; use the single non-null entry
        # when there is exactly one, otherwise leave the type undecided
        if isinstance(typ, list):
            candidates = [entry for entry in typ if entry != "null"]
            typ = candidates[0] if len(candidates) == 1 else None

        # tuple membership compares by equality, so an unhashable value cannot raise here
        if typ in ("integer", "number"):
            return ToolParameter.ToolParameterType.NUMBER
        if typ == "boolean":
            return ToolParameter.ToolParameterType.BOOLEAN
        if typ == "string":
            return ToolParameter.ToolParameterType.STRING
        return None

    @staticmethod
    def parse_openapi_yaml_to_tool_bundle(
        yaml: str, extra_info: Optional[dict], warning: Optional[dict]
    ) -> list[ApiToolBundle]:
        """
        parse openapi yaml to tool bundle

        :param yaml: the yaml string
        :param extra_info: optional dict, passed on to parse_openapi_to_tool_bundle
        :param warning: optional dict, passed on to parse_openapi_to_tool_bundle
        :return: the tool bundle
        :raises ToolApiSchemaError: if the yaml is empty or its top level is not a mapping
        """
        warning = warning if warning is not None else {}
        extra_info = extra_info if extra_info is not None else {}

        # errors raised by safe_load for malformed yaml are not caught here
        openapi = safe_load(yaml)
        # an empty document loads as None, and a scalar or sequence document loads as
        # str/number/list; none of these can be indexed like an openapi document
        if not isinstance(openapi, dict):
            raise ToolApiSchemaError("Invalid openapi yaml.")
        return ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(openapi, extra_info=extra_info, warning=warning)

    @staticmethod
    def parse_swagger_to_openapi(swagger: dict, extra_info: Optional[dict], warning: Optional[dict]) -> dict:
        """
        parse swagger to openapi

        :param swagger: the swagger dict, it must contain non-empty ``servers`` and ``paths``
        :param extra_info: not used by this method
        :param warning: optional dict, ``missing_summary`` is set when an operation has neither
            a summary nor a description
        :return: the openapi dict
        :raises ToolApiSchemaError: if there is no server, no path, or an operation without operationId
        """
        # the parameter is optional, without this the warning assignment below fails on None
        warning = warning if warning is not None else {}

        # convert swagger to openapi
        info = swagger.get("info", {"title": "Swagger", "description": "Swagger", "version": "1.0.0"})

        servers = swagger.get("servers", [])
        if not servers:
            raise ToolApiSchemaError("No server found in the swagger yaml.")

        openapi = {
            "openapi": "3.0.0",
            "info": {
                "title": info.get("title", "Swagger"),
                "description": info.get("description", "Swagger"),
                "version": info.get("version", "1.0.0"),
            },
            "servers": servers,
            "paths": {},
            "components": {"schemas": {}},
        }

        # check paths
        paths = swagger.get("paths")
        if not paths:
            raise ToolApiSchemaError("No paths found in the swagger yaml.")

        # same method names as parse_openapi_to_tool_bundle looks for
        methods = {"get", "post", "put", "delete", "patch", "head", "options", "trace"}

        # convert paths
        for path, path_item in paths.items():
            converted = {}
            openapi["paths"][path] = converted
            for method, operation in path_item.items():
                # a path item also holds keys that are not operations (path-level "parameters",
                # "$ref", "x-..." extensions), they must not be rejected for a missing operationId
                if method not in methods:
                    continue

                if "operationId" not in operation:
                    raise ToolApiSchemaError(f"No operationId found in operation {method} {path}.")

                if not operation.get("summary") and not operation.get("description"):
                    warning["missing_summary"] = f"No summary or description found in operation {method} {path}."

                converted[method] = {
                    "operationId": operation["operationId"],
                    "summary": operation.get("summary", ""),
                    "description": operation.get("description", ""),
                    "parameters": operation.get("parameters", []),
                    "responses": operation.get("responses", {}),
                }

                if "requestBody" in operation:
                    converted[method]["requestBody"] = operation["requestBody"]

        # convert definitions, the section is optional so its absence means no schemas
        for name, definition in (swagger.get("definitions") or {}).items():
            openapi["components"]["schemas"][name] = definition

        return openapi

    @staticmethod
    def parse_openai_plugin_json_to_tool_bundle(
        json: str, extra_info: Optional[dict], warning: Optional[dict]
    ) -> list[ApiToolBundle]:
        """
        parse openai plugin json to tool bundle

        The manifest must contain ``api.url`` and ``api.type``. The openapi yaml is downloaded
        from ``api.url`` and parsed with parse_openapi_yaml_to_tool_bundle.

        :param json: the json string of the plugin manifest
        :param extra_info: optional dict, passed on to parse_openapi_yaml_to_tool_bundle
        :param warning: optional dict, passed on to parse_openapi_yaml_to_tool_bundle
        :return: the tool bundle
        :raises ToolProviderNotFoundError: if the manifest is invalid or the download does not
            answer with status 200
        :raises ToolNotSupportedError: if ``api.type`` is not ``openapi``
        """
        warning = warning if warning is not None else {}
        extra_info = extra_info if extra_info is not None else {}

        try:
            openai_plugin = json_loads(json)
            api = openai_plugin["api"]
            api_url = api["url"]
            api_type = api["type"]
        except (ValueError, KeyError, TypeError) as e:
            # ValueError covers malformed json, KeyError a missing field, TypeError a manifest
            # or "api" entry that is not a mapping; interpreter exits are no longer swallowed
            raise ToolProviderNotFoundError("Invalid openai plugin json.") from e

        if api_type != "openapi":
            raise ToolNotSupportedError("Only openapi is supported now.")

        # get openapi yaml
        # NOTE(review): api_url comes from the submitted manifest and is requested as-is;
        # confirm that the deployment restricts which hosts this process may reach (SSRF).
        response = get(api_url, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "}, timeout=5)

        if response.status_code != 200:
            raise ToolProviderNotFoundError("cannot get openapi yaml from url.")

        return ApiBasedToolSchemaParser.parse_openapi_yaml_to_tool_bundle(
            response.text, extra_info=extra_info, warning=warning
        )

    @staticmethod
    def auto_parse_to_tool_bundle(
        content: str, extra_info: Optional[dict] = None, warning: Optional[dict] = None
    ) -> tuple[list[ApiToolBundle], str]:
        """
        auto parse to tool bundle

        The content is decoded as json, then as yaml. The decoded document is tried as an
        openapi document, then as a swagger document, then as an openai plugin manifest, and
        the first parser that succeeds decides the schema type.

        :param content: the content
        :param extra_info: optional dict, passed on to the parsers
        :param warning: optional dict, passed on to the parsers
        :return: tools bundle, schema_type
        :raises ToolApiSchemaError: if the content is neither json nor yaml, is not a mapping,
            or matches none of the supported schema types
        """
        warning = warning if warning is not None else {}
        extra_info = extra_info if extra_info is not None else {}

        content = content.strip()
        loaded_content = None
        json_error = None
        yaml_error = None

        try:
            loaded_content = json_loads(content)
        except JSONDecodeError as e:
            json_error = e

        if loaded_content is None:
            try:
                loaded_content = safe_load(content)
            except YAMLError as e:
                yaml_error = e
        if loaded_content is None:
            raise ToolApiSchemaError(
                f"Invalid api schema, schema is neither json nor yaml. json error: {str(json_error)},"
                f" yaml error: {str(yaml_error)}"
            )
        # plain text is a valid yaml scalar and a json array is valid json, but every parser
        # below indexes the document by key
        if not isinstance(loaded_content, dict):
            raise ToolApiSchemaError("Invalid api schema, schema must be a json or yaml object.")

        # A document of the wrong kind surfaces either as one of the tool errors raised by
        # the parsers or as a failed lookup on a missing or mistyped field. All of these mean
        # "try the next schema type"; catching only ToolApiSchemaError left the fallbacks and
        # the combined error at the end unreachable for such documents.
        # NOTE(review): assumes ToolProviderNotFoundError is not a subclass of
        # ToolApiSchemaError - confirm against core.tools.errors.
        schema_mismatch = (ToolApiSchemaError, ToolProviderNotFoundError, KeyError, TypeError, AttributeError)
        plugin_mismatch = (ToolNotSupportedError,) + schema_mismatch

        def describe(error: Optional[Exception]) -> str:
            # a KeyError renders as just the quoted key, name the class for builtin errors
            if isinstance(error, (KeyError, TypeError, AttributeError)):
                return f"{type(error).__name__}: {error}"
            return str(error)

        swagger_error = None
        openapi_error = None
        openapi_plugin_error = None

        try:
            openapi = ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(
                loaded_content, extra_info=extra_info, warning=warning
            )
            return openapi, ApiProviderSchemaType.OPENAPI.value
        except schema_mismatch as e:
            openapi_error = e

        # openapi parse error, fallback to swagger
        try:
            converted_swagger = ApiBasedToolSchemaParser.parse_swagger_to_openapi(
                loaded_content, extra_info=extra_info, warning=warning
            )
            return ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(
                converted_swagger, extra_info=extra_info, warning=warning
            ), ApiProviderSchemaType.SWAGGER.value
        except schema_mismatch as e:
            swagger_error = e

        # swagger parse error, fallback to openai plugin
        try:
            openapi_plugin = ApiBasedToolSchemaParser.parse_openai_plugin_json_to_tool_bundle(
                json_dumps(loaded_content), extra_info=extra_info, warning=warning
            )
            return openapi_plugin, ApiProviderSchemaType.OPENAI_PLUGIN.value
        except plugin_mismatch as e:
            # maybe it's not plugin at all
            openapi_plugin_error = e

        raise ToolApiSchemaError(
            f"Invalid api schema, openapi error: {describe(openapi_error)}, swagger error: {describe(swagger_error)},"
            f" openapi plugin error: {describe(openapi_plugin_error)}"
        )