H2OTest / llm_studio /app_utils /setting_utils.py
elineve's picture
Upload 301 files
07423df
import errno
import functools
import logging
import os
import pickle
import signal
import traceback
from typing import Any, List
import keyring
import yaml
from h2o_wave import Q, ui
from keyring.errors import KeyringLocked, PasswordDeleteError
from llm_studio.app_utils.config import default_cfg
from llm_studio.app_utils.utils import get_database_dir, get_user_id
__all__ = [
"load_user_settings_and_secrets",
"load_default_user_settings",
"save_user_settings_and_secrets",
"Secrets",
]
logger = logging.getLogger(__name__)
SECRET_KEYS = [
key
for key in default_cfg.user_settings
if any(password in key for password in ["token", "key"])
]
USER_SETTING_KEYS = [key for key in default_cfg.user_settings if key not in SECRET_KEYS]
async def save_user_settings_and_secrets(q: Q):
await _save_secrets(q)
_save_user_settings(q)
def load_user_settings_and_secrets(q: Q):
_maybe_migrate_to_yaml(q)
_load_secrets(q)
_load_user_settings(q)
def load_default_user_settings(q: Q, clear_secrets=True):
for key in default_cfg.user_settings:
q.client[key] = default_cfg.user_settings[key]
if clear_secrets:
_clear_secrets(q, key)
class NoSaver:
"""
Base class that provides methods for saving, loading, and deleting password entries.
Attributes:
username (str): The username associated with the password entries.
root_dir (str): The root directory.
Methods:
save(name: str, password: str) -> None:
Save a password entry with the given name and password.
load(name: str) -> str:
Load and return the password associated with the given name.
delete(name: str) -> None:
Delete the password entry with the given name.
"""
def __init__(self, username: str, root_dir: str):
self.username = username
self.root_dir = root_dir
def save(self, name: str, password: str):
pass
def load(self, name: str) -> str:
return ""
def delete(self, name: str):
pass
class KeyRingSaver(NoSaver):
"""
A class for saving, loading, and deleting passwords using the keyring library.
Some machines may not have keyring installed, so this class may not be available.
"""
def __init__(self, username: str, root_dir: str):
super().__init__(username, root_dir)
self.namespace = f"{username}_h2o_llmstudio"
def save(self, name: str, password: str):
keyring.set_password(self.namespace, name, password)
def load(self, name: str) -> str:
return keyring.get_password(self.namespace, name) or "" # type: ignore
def delete(self, name: str):
try:
keyring.delete_password(self.namespace, name)
except (KeyringLocked, PasswordDeleteError):
pass
except Exception as e:
logger.warning(f"Error deleting password for keyring: {e}")
class EnvFileSaver(NoSaver):
"""
This module provides the EnvFileSaver class, which is used to save, load,
and delete name-password pairs in an environment file.
Only use this class if you are sure that the environment file is secure.
"""
@property
def filename(self):
return os.path.join(self.root_dir, f"{self.username}.env")
def save(self, name: str, password: str):
data = {}
if os.path.exists(self.filename):
with open(self.filename, "r") as f:
data = yaml.safe_load(f)
data[name] = password
with open(self.filename, "w") as f:
yaml.safe_dump(data, f)
def load(self, name: str) -> str:
if not os.path.exists(self.filename):
return ""
with open(self.filename, "r") as f:
data = yaml.safe_load(f)
return data.get(name, "")
def delete(self, name: str):
if os.path.exists(self.filename):
with open(self.filename, "r") as f:
data = yaml.safe_load(f)
if data and name in data:
del data[name]
with open(self.filename, "w") as f:
yaml.safe_dump(data, f)
# https://stackoverflow.com/questions/2281850/timeout-function-if-it-takes-too-long-to-finish
class TimeoutError(Exception):
pass
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
@functools.wraps(func)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return wrapper
return decorator
@timeout(3)
def check_if_keyring_works():
"""
Test if keyring is working. On misconfigured machines,
Keyring may hang up to 2 minutes with the following error:
jeepney.wrappers.DBusErrorResponse:
[org.freedesktop.DBus.Error.TimedOut]
("Failed to activate service 'org.freedesktop.secrets':
timed out (service_start_timeout=120000ms)",)
To avoid waiting for 2 minutes, we kill the process after 3 seconds.
"""
keyring.get_password("service", "username")
class Secrets:
"""
Factory class to get the secrets' handler.
"""
_secrets = {
"Do not save credentials permanently": NoSaver,
".env File": EnvFileSaver,
}
try:
check_if_keyring_works()
logger.info("Keyring is correctly configured on this machine.")
_secrets["Keyring"] = KeyRingSaver
except TimeoutError:
logger.warning(
"Error loading keyring due to timeout. Disabling keyring save option."
)
except Exception as e:
logger.warning(f"Error loading keyring: {e}. Disabling keyring save option.")
@classmethod
def names(cls) -> List[str]:
return sorted(cls._secrets.keys())
@classmethod
def get(cls, name: str) -> Any:
return cls._secrets.get(name)
def _save_user_settings(q: Q):
user_settings = {key: q.client[key] for key in USER_SETTING_KEYS}
with open(_get_usersettings_path(q), "w") as f:
yaml.dump(user_settings, f)
def _load_user_settings(q: Q):
if os.path.isfile(_get_usersettings_path(q)):
logger.info("Reading user settings")
with open(_get_usersettings_path(q), "r") as f:
user_settings = yaml.load(f, Loader=yaml.FullLoader)
for key in USER_SETTING_KEYS:
q.client[key] = user_settings.get(key, default_cfg.user_settings[key])
else:
logger.info("No user settings found. Using default settings.")
# User may have deleted the user settings file. We load the default settings.
# Secrets may still be stored in keyring or env file.
load_default_user_settings(q, clear_secrets=False)
async def _save_secrets(q: Q):
secret_name, secrets_handler = _get_secrets_handler(q)
for key in SECRET_KEYS:
try:
_clear_secrets(q, key, excludes=tuple(secret_name))
if q.client[key]:
secrets_handler.save(key, q.client[key])
except Exception:
exception = str(traceback.format_exc())
logger.error(f"Could not save password {key} to {secret_name}")
q.page["meta"].dialog = ui.dialog(
title="Could not save secrets. "
"Please choose another Credential Handler.",
name="secrets_error",
items=[
ui.text(
f"The following error occurred when"
f" using {secret_name}: {exception}."
),
ui.button(
name="settings/close_error_dialog", label="Close", primary=True
),
],
closable=True,
)
q.client["keep_meta"] = True
await q.page.save()
break
else: # if no exception
# force dataset connector updated when the user decides to click on save
q.client["dataset/import/s3_bucket"] = q.client["default_aws_bucket_name"]
q.client["dataset/import/s3_access_key"] = q.client["default_aws_access_key"]
q.client["dataset/import/s3_secret_key"] = q.client["default_aws_secret_key"]
q.client["dataset/import/kaggle_access_key"] = q.client[
"default_kaggle_username"
]
q.client["dataset/import/kaggle_secret_key"] = q.client[
"default_kaggle_secret_key"
]
def _load_secrets(q: Q):
secret_name, secrets_handler = _get_secrets_handler(q)
for key in SECRET_KEYS:
try:
q.client[key] = secrets_handler.load(key) or default_cfg.user_settings[key]
except Exception:
logger.error(f"Could not load password {key} from {secret_name}")
q.client[key] = ""
def _get_secrets_handler(q: Q):
secret_name = (
q.client["credential_saver"] or default_cfg.user_settings["credential_saver"]
)
secrets_handler = Secrets.get(secret_name)(
username=get_user_id(q), root_dir=get_database_dir(q)
)
return secret_name, secrets_handler
def _clear_secrets(q: Q, name: str, excludes=tuple()):
for secret_name in Secrets.names():
if secret_name not in excludes:
secrets_handler = Secrets.get(secret_name)(
username=get_user_id(q), root_dir=get_database_dir(q)
)
secrets_handler.delete(name)
def _maybe_migrate_to_yaml(q: Q):
"""
Migrate user settings from a pickle file to a YAML file.
"""
# prior, we used to save the user settings in a pickle file
old_usersettings_path = os.path.join(
get_database_dir(q), f"{get_user_id(q)}.settings"
)
if not os.path.isfile(old_usersettings_path):
return
try:
with open(old_usersettings_path, "rb") as f:
user_settings = pickle.load(f)
secret_name, secrets_handler = _get_secrets_handler(q)
logger.info(f"Migrating token using {secret_name}")
for key in SECRET_KEYS:
if key in user_settings:
secrets_handler.save(key, user_settings[key])
with open(_get_usersettings_path(q), "w") as f:
yaml.dump(
{
key: value
for key, value in user_settings.items()
if key in USER_SETTING_KEYS
},
f,
)
os.remove(old_usersettings_path)
logger.info(f"Successfully migrated tokens to {secret_name}. Old file deleted.")
except Exception as e:
logger.info(
f"Could not migrate tokens. "
f"Please delete {old_usersettings_path} and set your credentials again."
f"Error: \n\n {e} {traceback.format_exc()}"
)
def _get_usersettings_path(q: Q):
return os.path.join(get_database_dir(q), f"{get_user_id(q)}.yaml")