File size: 1,781 Bytes
a8b3f00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import json

import requests

from services.auth.api_key_auth_base import ApiKeyAuthBase


class JinaAuth(ApiKeyAuthBase):
    def __init__(self, credentials: dict):
        super().__init__(credentials)
        auth_type = credentials.get("auth_type")
        if auth_type != "bearer":
            raise ValueError("Invalid auth type, Jina Reader auth type must be Bearer")
        self.api_key = credentials.get("config").get("api_key", None)

        if not self.api_key:
            raise ValueError("No API key provided")

    def validate_credentials(self):
        headers = self._prepare_headers()
        options = {
            "url": "https://example.com",
        }
        response = self._post_request("https://r.jina.ai", options, headers)
        if response.status_code == 200:
            return True
        else:
            self._handle_error(response)

    def _prepare_headers(self):
        return {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}

    def _post_request(self, url, data, headers):
        return requests.post(url, headers=headers, json=data)

    def _handle_error(self, response):
        if response.status_code in {402, 409, 500}:
            error_message = response.json().get("error", "Unknown error occurred")
            raise Exception(f"Failed to authorize. Status code: {response.status_code}. Error: {error_message}")
        else:
            if response.text:
                error_message = json.loads(response.text).get("error", "Unknown error occurred")
                raise Exception(f"Failed to authorize. Status code: {response.status_code}. Error: {error_message}")
            raise Exception(f"Unexpected error occurred while trying to authorize. Status code: {response.status_code}")