|
|
|
import os |
|
import time |
|
from typing import Dict, List, Tuple |
|
|
|
import matplotlib.pyplot as plt |
|
import numpy as np |
|
import pandas as pd |
|
import requests |
|
import stripe |
|
import streamlit as st |
|
import yfinance as yf |
|
from langchain import LLMChain, PromptTemplate |
|
from langchain.agents import initialize_agent, load_tools |
|
from langchain.llms import OpenAI |
|
from openai import OpenAI |
|
|
|
|
|
def download_stock_data(
    tickers: List[str], start_date: str, end_date: str, w: str
) -> pd.DataFrame:
    """
    Download adjusted close prices for the given tickers from Yahoo Finance.

    Args:
        tickers (List[str]): List of stock ticker symbols.
        start_date (str): Start date for data retrieval in 'YYYY-MM-DD' format.
        end_date (str): End date for data retrieval in 'YYYY-MM-DD' format.
        w (str): Sampling interval, forwarded unchanged as yfinance's
            ``interval`` argument (yfinance expects strings such as "1d",
            "1wk" or "1mo").

    Returns:
        pd.DataFrame: The "Adj Close" slice of the downloaded data.

    Raises:
        KeyError: If the downloaded frame has no "Adj Close" column.
    """
    data = yf.download(
        tickers,
        start=start_date,
        end=end_date,
        interval=w,
        # Recent yfinance releases default ``auto_adjust`` to True, which
        # removes the "Adj Close" column. Requesting unadjusted data keeps
        # the column available on both old and new versions.
        auto_adjust=False,
    )
    return data["Adj Close"]
|
|
|
|
|
def download_stocks(tickers: List[str]) -> List[pd.DataFrame]:
    """
    Fetch the Yahoo Finance history of every ticker, one request per symbol.

    Args:
        tickers: Ticker symbols to download.

    Returns:
        One DataFrame per ticker, in input order, each trimmed to its last
        255 * 8 rows.
    """
    # Presumably about eight years of daily bars; the code only fixes the row count.
    max_rows = 255 * 8
    return [yf.download(symbol).tail(max_rows) for symbol in tickers]
|
|
|
|
|
def create_portfolio_and_calculate_returns(
    stock_data: pd.DataFrame, top_n: int
) -> pd.DataFrame:
    """
    Back-test a "hold the previous period's best performers" portfolio.

    Period-over-period returns are derived from ``stock_data``. For every
    period the ``top_n`` columns with the highest return are selected, and the
    equal-weighted mean of their returns over the *following* period is
    recorded as the portfolio return. The look-back window is fixed at one row.

    Args:
        stock_data (pd.DataFrame): Price history, one column per asset.
        top_n (int): Number of best-performing assets to hold each period.

    Returns:
        pd.DataFrame: The return series without its last row, extended with:
            - "benchmark": row mean of the first five asset columns,
            - "portfolio_returns": return of the selected assets (the value
              stored on a row is the one realised on the next row of the
              return series),
            - "portfolio_history": list of the assets held,
            - "rolling_benchmark" / "rolling_portfolio_returns": cumulative
              products of (1 + return).
    """
    window_size = 1

    def _leading_columns_mean(row):
        # Positional slice: average of (at most) the first five entries.
        return row[0:5].mean()

    period_returns = stock_data.pct_change().dropna()

    realised_returns = []
    holdings_log = []

    # `split` is the boundary between the look-back window and the holding window.
    for split in range(window_size, len(period_returns), window_size):
        lookback = period_returns[split - window_size : split]
        ranked = lookback.mean().sort_values(ascending=False)
        winners = ranked.head(top_n).index.tolist()

        held = period_returns[split : split + window_size][winners].mean(axis=1)
        realised_returns.extend(held)
        holdings_log.extend([winners] * len(held))

    summary = period_returns.copy()
    # The final window has no following period, so it is dropped.
    summary = summary.iloc[0:-window_size, :]
    summary["benchmark"] = summary.apply(_leading_columns_mean, axis=1)
    summary["portfolio_returns"] = realised_returns
    summary["portfolio_history"] = holdings_log

    benchmark_growth = summary["benchmark"] + 1
    summary["rolling_benchmark"] = benchmark_growth.cumprod()
    portfolio_growth = summary["portfolio_returns"] + 1
    summary["rolling_portfolio_returns"] = portfolio_growth.cumprod()

    return summary
|
|
|
|
|
def portfolio_annualised_performance(
    weights: np.ndarray, mean_returns: np.ndarray, cov_matrix: np.ndarray
) -> Tuple[float, float]:
    """
    Compute the annualised volatility and expected return of a portfolio.

    Both figures are scaled with a factor of 252 periods per year: the return
    is multiplied by 252 and the standard deviation by sqrt(252).

    Args:
        weights (np.ndarray): Share of the total investment allocated to each
            asset.
        mean_returns (np.ndarray): Mean (expected) per-period return of each
            asset.
        cov_matrix (np.ndarray): Covariance matrix of the asset returns; the
            entry at row i, column j is the covariance between assets i and j.

    Returns:
        Tuple of (annualised volatility, annualised expected return), in that
        order.
    """
    periods_per_year = 252

    # Portfolio variance: w^T * (Sigma * w).
    weighted_cov = np.dot(cov_matrix, weights)
    variance = np.dot(weights.T, weighted_cov)
    annual_volatility = np.sqrt(variance) * np.sqrt(periods_per_year)

    annual_return = np.sum(mean_returns * weights) * periods_per_year

    return annual_volatility, annual_return
|
|
|
|
|
def random_portfolios(
    num_portfolios: int,
    num_weights: int,
    mean_returns: np.ndarray,
    cov_matrix: np.ndarray,
    risk_free_rate: float,
) -> Tuple[np.ndarray, List[np.ndarray]]:
    """
    Simulate randomly weighted portfolios and score each one.

    Args:
        num_portfolios (int): How many random portfolios to draw.
        num_weights (int): Number of assets, i.e. the length of every weight
            vector.
        mean_returns (np.ndarray): Mean (expected) returns of the assets.
        cov_matrix (np.ndarray): Covariance matrix of the asset returns.
        risk_free_rate (float): Risk-free rate used in the Sharpe ratio.

    Returns:
        Tuple of (results, weights_record).

        results (np.ndarray): Array of shape (3, num_portfolios). Row 0 holds
            the annualised standard deviation, row 1 the annualised return and
            row 2 the Sharpe ratio of each portfolio.

        weights_record (List[np.ndarray]): The weight vector of each
            portfolio, in the same order as the columns of ``results``.
    """
    weights_record = []
    results = np.zeros((3, num_portfolios))

    for column in range(num_portfolios):
        # Uniform draws rescaled so that the weights sum to one.
        draw = np.random.random(num_weights)
        weights = draw / np.sum(draw)
        weights_record.append(weights)

        volatility, expected_return = portfolio_annualised_performance(
            weights, mean_returns, cov_matrix
        )
        sharpe_ratio = (expected_return - risk_free_rate) / volatility

        results[0, column] = volatility
        results[1, column] = expected_return
        results[2, column] = sharpe_ratio

    return results, weights_record
|
|
|
|
|
def _allocation_row(weights: np.ndarray, labels) -> pd.DataFrame:
    """Build a one-row frame of percentage weights (2 decimals), one column per label."""
    allocation = pd.DataFrame(weights, index=labels, columns=["allocation"])
    allocation.allocation = [round(w * 100, 2) for w in allocation.allocation]
    return allocation.T


def display_simulated_ef_with_random(
    table: pd.DataFrame,
    mean_returns: List[float],
    cov_matrix: np.ndarray,
    num_portfolios: int,
    risk_free_rate: float,
) -> Tuple[plt.Figure, Dict[str, object]]:
    """
    Plot a simulated efficient frontier built from randomly weighted portfolios.

    The maximum-Sharpe and minimum-volatility portfolios among the simulated
    ones are highlighted on the scatter plot and summarised in the returned
    dictionary.

    Args:
        - table (pd.DataFrame): Only its column labels are used, to name the
          assets in the allocation tables.
        - mean_returns (List): Mean return of each asset in the portfolio.
        - cov_matrix (ndarray): Covariance matrix of the asset returns.
        - num_portfolios (int): The number of random portfolios to generate.
        - risk_free_rate (float): The risk-free rate of return.

    Returns:
        - fig (plt.Figure): The scatter plot of the simulated portfolios.
        - details (dict): Summary with the following keys:
            * "Max Sharpe Annualised Return" / "Max Sharpe Annualised Volatility"
            * "Min Volatility Annualised Return" / "Min Volatility Annualised Volatility"
            * "Max Sharpe Allocation" / "Min Volatility Allocation":
              one-row DataFrames of weights in percent.
            * "... Allocation in Percentile": the same rows divided by their sum.
            * "Annualised Return" / "Annualised Volatility": legacy keys that
              carry the minimum-volatility figures (see the note in the code).
    """
    results, weights = random_portfolios(
        num_portfolios, len(mean_returns), mean_returns, cov_matrix, risk_free_rate
    )

    # Rows of `results`: 0 = volatility, 1 = return, 2 = Sharpe ratio.
    max_sharpe_idx = np.argmax(results[2])
    sdp, rp = results[0, max_sharpe_idx], results[1, max_sharpe_idx]
    max_sharpe_allocation = _allocation_row(weights[max_sharpe_idx], table.columns)

    min_vol_idx = np.argmin(results[0])
    sdp_min, rp_min = results[0, min_vol_idx], results[1, min_vol_idx]
    min_vol_allocation = _allocation_row(weights[min_vol_idx], table.columns)

    fig, ax = plt.subplots(figsize=(10, 7))
    ax.scatter(
        results[0, :],
        results[1, :],
        c=results[2, :],
        cmap="YlGnBu",
        marker="o",
        s=10,
        alpha=0.3,
    )
    ax.scatter(sdp, rp, marker="*", color="r", s=500, label="Maximum Sharpe ratio")
    ax.scatter(
        sdp_min, rp_min, marker="*", color="g", s=500, label="Minimum volatility"
    )
    ax.set_title("Simulated Portfolio Optimization based on Efficient Frontier")
    ax.set_xlabel("Annual volatility")
    ax.set_ylabel("Annual returns")
    ax.legend(labelspacing=0.8)

    details = {
        # Legacy keys. The dict literal used to repeat these two keys for both
        # portfolios, so only the minimum-volatility values survived. They are
        # kept with those values so existing key lookups keep working.
        "Annualised Return": round(rp_min, 2),
        "Annualised Volatility": round(sdp_min, 2),
        "Max Sharpe Allocation": max_sharpe_allocation,
        "Max Sharpe Allocation in Percentile": max_sharpe_allocation.div(
            max_sharpe_allocation.sum(axis=1), axis=0
        ),
        "Min Volatility Allocation": min_vol_allocation,
        "Min Volatility Allocation in Percentile": min_vol_allocation.div(
            min_vol_allocation.sum(axis=1), axis=0
        ),
        # Unambiguous per-portfolio metrics; the max-Sharpe figures were
        # previously lost to the duplicate keys.
        "Max Sharpe Annualised Return": round(rp, 2),
        "Max Sharpe Annualised Volatility": round(sdp, 2),
        "Min Volatility Annualised Return": round(rp_min, 2),
        "Min Volatility Annualised Volatility": round(sdp_min, 2),
    }

    return fig, details
|
|
|
|
|
# OpenAI credential, read once at import time; importing this module raises
# KeyError when the environment variable is not set.
OPENAI_API_KEY: str = os.environ["OPENAI_API_KEY"]
|
|
|
|
|
def run_langchain_agent_(question: str = "What is your question?") -> str:
    """
    Answer a question with a LangChain zero-shot ReAct agent.

    The agent is backed by a LangChain OpenAI LLM (temperature 0.1), has access
    to the "wikipedia" and "llm-math" tools, and stops after at most five
    iterations. The question is passed to the agent verbatim.

    Parameters:
        question (str): The question to be answered by the agent. Defaults to
            "What is your question?".

    Returns:
        str: The output response from the agent after processing the question.
    """
    # This module imports `OpenAI` from both `langchain.llms` and `openai`.
    # The later import wins, so the bare name refers to the OpenAI SDK client,
    # whose constructor does not accept `temperature`. Import the LangChain
    # wrapper under an explicit alias so that the intended class is used.
    from langchain.llms import OpenAI as LangChainOpenAI

    llm = LangChainOpenAI(temperature=0.1)

    tools = load_tools(["wikipedia", "llm-math"], llm=llm)

    agent = initialize_agent(
        tools,
        llm,
        agent="zero-shot-react-description",
        verbose=True,
        max_iterations=5,
    )

    return agent.run(question)
|
|
|
|
|
# Shared OpenAI SDK client, created once at import time and used by `call_gpt`.
openai_client = OpenAI(api_key=OPENAI_API_KEY)
|
|
|
|
|
def call_gpt(prompt: str, content: str) -> str:
    """
    Ask the gpt-3.5-turbo chat model a question with supporting background.

    The request is framed as a short scripted conversation: a generic system
    message, the user's question, an assistant turn asking for background, and
    finally the background content supplied by the user.

    Args:
        prompt (str): The main question or topic that the user wants to address.
        content (str): Additional background information relevant to the prompt.

    Returns:
        str: The message content of the first choice in the model's response.

    Note: relies on the module-level ``openai_client``, which must already be
    created and authenticated before this function is called.
    """
    conversation = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": f"I want to ask you a question: {prompt}"},
        {"role": "assistant", "content": "What is the background content?"},
        {"role": "user", "content": content},
    ]

    completion = openai_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=conversation,
    )

    first_choice = completion.choices[0]
    return first_choice.message.content
|
|
|
|
|
|
|
# Stripe configuration, read once at import time; a missing variable raises
# KeyError on import.
stripe.api_key = os.environ["STRIPE_API_KEY"]

# Price object that `create_checkout_session` sells.
stripe_price_id: str = os.environ["STRIPE_PRICE_ID"]
|
|
|
|
|
|
|
def create_checkout_session():
    """
    Create a one-off Stripe Checkout session for the configured price.

    The session sells a single unit of ``stripe_price_id`` by card payment.

    Returns:
        tuple: ``(session_id, session_url)`` on success. On any error the
        message is shown through ``st.error`` and ``(None, None)`` is returned.
    """
    outcome = (None, None)
    try:
        purchased_items = [{"price": stripe_price_id, "quantity": 1}]
        checkout = stripe.checkout.Session.create(
            payment_method_types=["card"],
            line_items=purchased_items,
            mode="payment",
            success_url="https://39701j614g.execute-api.us-east-1.amazonaws.com/dev/?sessionID={CHECKOUT_SESSION_ID}&key=123",
            cancel_url="https://huggingface.co/spaces/eagle0504/Momentum-Strategy-Screener",
        )
        outcome = (checkout.id, checkout.url)
    except Exception as exc:
        # Report the failure in the UI instead of raising.
        st.error(f"Error creating checkout session: {exc}")
    return outcome
|
|
|
|
|
|
|
def check_payment_status(session_id):
    """
    Tell whether a Stripe Checkout session has been paid.

    Args:
        session_id: Identifier of the Checkout session to look up.

    Returns:
        bool: True when the session's ``payment_status`` equals "paid". False
        otherwise, including when the lookup fails (the error is shown through
        ``st.error``).
    """
    paid = False
    try:
        checkout = stripe.checkout.Session.retrieve(session_id)
        paid = checkout.payment_status == "paid"
    except Exception as exc:
        # Report the failure in the UI and treat the session as unpaid.
        st.error(f"Error checking payment status: {exc}")
    return paid
|
|