from operator import itemgetter from typing import Any, Dict, Iterable, Tuple import tiktoken from langchain_core.runnables import RunnablePassthrough def num_tokens_from_string(string: str, encoding_name: str = "cl100k_base") -> int: encoding = tiktoken.get_encoding(encoding_name) num_tokens = len(encoding.encode(string)) return num_tokens def pass_values(x): if not isinstance(x, list): x = [x] return {k: itemgetter(k) for k in x} def prepare_chain(chain,name): chain = propagate_inputs(chain) chain = rename_chain(chain,name) return chain def propagate_inputs(chain): chain_with_values = { "outputs": chain, "inputs": RunnablePassthrough() } | RunnablePassthrough() | flatten_dict return chain_with_values def rename_chain(chain,name): return chain.with_config({"run_name":name}) # Drawn from langchain utils and modified to remove the parent key def _flatten_dict( nested_dict: Dict[str, Any], parent_key: str = "", sep: str = "_" ) -> Iterable[Tuple[str, Any]]: """ Generator that yields flattened items from a nested dictionary for a flat dict. Parameters: nested_dict (dict): The nested dictionary to flatten. parent_key (str): The prefix to prepend to the keys of the flattened dict. sep (str): The separator to use between the parent key and the key of the flattened dictionary. Yields: (str, any): A key-value pair from the flattened dictionary. """ for key, value in nested_dict.items(): new_key = key if isinstance(value, dict): yield from _flatten_dict(value, new_key, sep) else: yield new_key, value def flatten_dict( nested_dict: Dict[str, Any], parent_key: str = "", sep: str = "_" ) -> Dict[str, Any]: """Flattens a nested dictionary into a flat dictionary. Parameters: nested_dict (dict): The nested dictionary to flatten. parent_key (str): The prefix to prepend to the keys of the flattened dict. sep (str): The separator to use between the parent key and the key of the flattened dictionary. Returns: (dict): A flat dictionary. """ flat_dict = {k: v for k, v in _flatten_dict(nested_dict, parent_key, sep)} return flat_dict async def log_event(info,name,config): """Helper function that will run a dummy chain with the given info The astream_event function will catch this chain and stream the dict info to the logger """ chain = RunnablePassthrough().with_config(run_name=name) _ = await chain.ainvoke(info,config)