File size: 2,601 Bytes
a8b3f00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import logging
from typing import Optional

import requests

from configs import dify_config
from services.recommend_app.buildin.buildin_retrieval import BuildInRecommendAppRetrieval
from services.recommend_app.recommend_app_base import RecommendAppRetrievalBase
from services.recommend_app.recommend_app_type import RecommendAppType

logger = logging.getLogger(__name__)


class RemoteRecommendAppRetrieval(RecommendAppRetrievalBase):
    """
    Retrieval recommended app from dify official
    """

    def get_recommend_app_detail(self, app_id: str):
        try:
            result = self.fetch_recommended_app_detail_from_dify_official(app_id)
        except Exception as e:
            logger.warning(f"fetch recommended app detail from dify official failed: {e}, switch to built-in.")
            result = BuildInRecommendAppRetrieval.fetch_recommended_app_detail_from_builtin(app_id)
        return result

    def get_recommended_apps_and_categories(self, language: str) -> dict:
        try:
            result = self.fetch_recommended_apps_from_dify_official(language)
        except Exception as e:
            logger.warning(f"fetch recommended apps from dify official failed: {e}, switch to built-in.")
            result = BuildInRecommendAppRetrieval.fetch_recommended_apps_from_builtin(language)
        return result

    def get_type(self) -> str:
        return RecommendAppType.REMOTE

    @classmethod
    def fetch_recommended_app_detail_from_dify_official(cls, app_id: str) -> Optional[dict]:
        """
        Fetch recommended app detail from dify official.
        :param app_id: App ID
        :return:
        """
        domain = dify_config.HOSTED_FETCH_APP_TEMPLATES_REMOTE_DOMAIN
        url = f"{domain}/apps/{app_id}"
        response = requests.get(url, timeout=(3, 10))
        if response.status_code != 200:
            return None

        return response.json()

    @classmethod
    def fetch_recommended_apps_from_dify_official(cls, language: str) -> dict:
        """
        Fetch recommended apps from dify official.
        :param language: language
        :return:
        """
        domain = dify_config.HOSTED_FETCH_APP_TEMPLATES_REMOTE_DOMAIN
        url = f"{domain}/apps?language={language}"
        response = requests.get(url, timeout=(3, 10))
        if response.status_code != 200:
            raise ValueError(f"fetch recommended apps failed, status code: {response.status_code}")

        result = response.json()

        if "categories" in result:
            result["categories"] = sorted(result["categories"])

        return result