text2sql / core /clients /stock.py
ns-devel
Text2SQL app
38171fa
"""
This module contains the client classes for retrieving stock data from various sources.
"""
import time
import logging
import requests
from bs4 import BeautifulSoup
from django.conf import settings
from core.models import Stock
from data_pipeline.interfaces.api_client import DataClient
from core.constants import MONEYCONTROL_TOPSTOCKS_URL
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)
class StockRankings(DataClient):
    """
    Scrapes the stock ranking table and stores the rankings on ``Stock`` rows.

    ``extract`` downloads the page at ``api_url``, reads the table with id
    ``indicesTable`` and follows each stock's detail link to look up its ISIN.
    ``load`` writes ``self.transformed_data`` into the database; that attribute
    is not populated anywhere in this class, so it is presumably filled by a
    transform step provided elsewhere (confirm against ``DataClient``).
    """

    model = Stock
    api_url = MONEYCONTROL_TOPSTOCKS_URL
    # Seconds to wait for a single HTTP response; without a timeout ``requests``
    # waits indefinitely on a stalled connection.
    request_timeout = 30
    # Pause (seconds) after each detail-page request so the site is not hammered.
    detail_request_delay = 2
    # Cells read from every data row: name, ltp, % change, price change, volume.
    min_columns = 5

    def __init__(self) -> None:
        self.api_response = None
        self.transformed_data = None

    def _fetch_isin(self, session: requests.Session, link: str) -> "str | None":
        """
        Fetch a stock's detail page and return its ISIN.

        Args:
            session (requests.Session): Session reused for the HTTP request.
            link (str): URL of the stock's detail page.

        Returns:
            The ISIN text, or ``None`` when the request fails or the page has
            no ISIN element.
        """
        try:
            logger.info("Fetching stock details from link %s", link)
            response = session.get(link, timeout=self.request_timeout)
            time.sleep(self.detail_request_delay)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # Best effort: a single failing detail page must not abort the scrape.
            logger.exception("Error fetching ISIN from link %s: %s", link, e)
            return None

        detail_soup = BeautifulSoup(response.text, "html.parser")
        # NOTE(review): newer soupsieve releases deprecate ``:contains`` in favour
        # of ``:-soup-contains``; switch once the minimum installed version is
        # confirmed.
        isin_element = detail_soup.select_one('li.clearfix span:contains("ISIN") + p')
        if isin_element is None:
            logger.warning("ISIN not found for link %s", link)
            return None
        return isin_element.get_text(strip=True)

    def extract(self) -> None:
        """
        Scrape the ranking table and store the rows in ``self.api_response``.

        Each stored row is a dict with the keys ``name``, ``ltp``, ``link``,
        ``volume``, ``percentage_change``, ``price_change``, ``rank`` and
        ``isin_number``. If the ranking page itself cannot be fetched the error
        is logged and ``self.api_response`` is left unchanged.

        Raises:
            Exception: If the page has no table with id ``indicesTable``.
        """
        logger.info("Fetching data from %s", self.api_url)
        with requests.Session() as session:
            try:
                response = session.get(self.api_url, timeout=self.request_timeout)
                # Raise an HTTPError for bad responses (4xx or 5xx)
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                logger.error("Error fetching data from %s: %s", self.api_url, e)
                return

            soup = BeautifulSoup(response.text, "html.parser")
            # Find the table containing stock information
            table = soup.find("table", {"id": "indicesTable"})
            if table is None:
                message = (
                    "Table with id 'indicesTable' not found for stock during scraping ranks."
                )
                logger.warning(message)
                raise Exception(message)

            rows = table.find_all("tr")
            logger.info("Found %s rows in the table", len(rows))

            data = []
            # NOTE(review): ``rank`` is the row's position among ALL <tr> elements,
            # so a header row (no <td>) still consumes rank 1 — confirm whether
            # downstream code expects that offset.
            for idx, row in enumerate(rows, start=1):
                columns = row.find_all("td")
                if not columns:
                    # Rows without <td> cells (e.g. headers) carry no stock data.
                    continue
                if len(columns) < self.min_columns:
                    logger.warning(
                        "Skipping row %s: expected at least %s columns, found %s",
                        idx,
                        self.min_columns,
                        len(columns),
                    )
                    continue

                # A bs4 Tag is always truthy, so the anchor must be checked
                # explicitly before reading its href.
                anchor = columns[0].find("a")
                link = anchor.get("href") if anchor is not None else None
                isin_number = self._fetch_isin(session, link) if link else None

                data.append(
                    {
                        "name": columns[0].get_text(strip=True),
                        "ltp": columns[1].get_text(strip=True),
                        "link": link,
                        "volume": columns[4].get_text(strip=True),
                        "percentage_change": columns[2].get_text(strip=True),
                        "price_change": columns[3].get_text(strip=True),
                        "rank": idx,
                        "isin_number": isin_number,
                    }
                )

            isin_fails = sum(1 for entry in data if entry["isin_number"] is None)
            logger.info(
                "ISIN not found for %s stocks out of %s", isin_fails, len(data)
            )
            self.api_response = data

    def load(self) -> None:
        """
        Load the data into the database

        Clears the ``rank`` of every currently ranked stock, then updates (or
        creates) one ``Stock`` per entry of ``self.transformed_data``, matched
        by ISIN. When there is nothing to load, the existing ranks are left
        untouched instead of being wiped.
        """
        if not self.transformed_data:
            logger.warning(
                "No ranking data to load; leaving existing ranks untouched."
            )
            return

        logger.info("Loading ranking data into the database...")
        # clear the rank field
        Stock.objects.exclude(rank=None).update(rank=None)

        for rank, stock_details in enumerate(self.transformed_data, 1):
            isin_number = stock_details["isin_number"]
            # filter().first() instead of get(): several rows can share an ISIN
            # (notably rows stored with a NULL ISIN), which would make get()
            # raise MultipleObjectsReturned and abort the whole load.
            stock = Stock.objects.filter(isin_number=isin_number).first()
            if stock is None:
                logger.warning(
                    f"No matching stock found for ISIN: {stock_details['isin_number']} creating new object..."
                )
                stock = Stock.objects.create(data={"stock_rank": stock_details})
            else:
                # ``data`` may be empty/None on existing rows; merge defensively.
                stock.data = {**(stock.data or {}), "stock_rank": stock_details}

            stock.name = stock_details["name"]
            stock.ltp = stock_details["ltp"]
            stock.percentage_change = stock_details["percentage_change"]
            stock.price_change = stock_details["price_change"]
            stock.link = stock_details["link"]
            stock.volume = stock_details["volume"]
            stock.isin_number = isin_number
            stock.rank = stock_details["rank"]
            stock.save()
            logger.info(
                f"Saved {rank=} {stock.name} | {stock_details=} {stock_details['isin_number']=}"
            )
class StockDetails(DataClient):
    """
    Retrieves and updates stock details from the Morningstar API.
    """

    model = Stock
    api_url = f"https://{settings.MORNINGSTAR_HOST}/stock/get-detail"
    # Seconds to wait for a single HTTP response; without a timeout ``requests``
    # waits indefinitely on a stalled connection.
    request_timeout = 30
    # Number of re-requests attempted after an HTTP 429 response.
    max_retries = 4
    # Base back-off in seconds; the n-th retry waits n times this long.
    retry_backoff_seconds = 30

    def __init__(self, perf_id: str, isin_number: str) -> None:
        """
        Initializes the StockDetails object.

        Args:
            perf_id (str): Performance ID of the stock.
            isin_number (str): ISIN number of the stock.

        Raises:
            ValueError: If ``perf_id`` or ``isin_number`` is empty.
        """
        if not perf_id:
            raise ValueError("Performance ID cannot be empty.")
        if not isin_number:
            raise ValueError("ISIN number cannot be empty.")
        self.api_response = {"details": None}
        # Initialised so that ``load`` can detect a missing transform result.
        self.transformed_data = None
        self.perf_id = perf_id
        self.isin_number = isin_number

    def _request(self) -> requests.Response:
        """
        Perform one GET request for this stock's details.

        Returns:
            requests.Response: The raw API response (status is not checked here).

        Raises:
            requests.exceptions.RequestException: On connection errors or when
                no response arrives within ``request_timeout`` seconds.
        """
        querystring = {"PerformanceId": self.perf_id}
        return requests.get(
            self.api_url,
            headers=settings.MORNINGSTAR_API_HEADERS,
            params=querystring,
            timeout=self.request_timeout,
        )

    def extract(self) -> None:
        """
        Extracts stock details from the Morningstar API.

        An HTTP 429 response is retried up to ``max_retries`` times with a
        linearly growing wait. The parsed JSON body is stored in
        ``self.api_response["details"]`` only when the final response has
        status 200; any other outcome is logged and leaves it unchanged.
        """
        response = self._request()
        for attempt in range(1, self.max_retries + 1):
            if response.status_code != 429:
                # Only rate limiting is retried; success and other errors exit.
                break
            delay = self.retry_backoff_seconds * attempt
            logger.info(
                "API response: %s. Waiting for %s secs",
                response.status_code,
                delay,
            )
            time.sleep(delay)
            response = self._request()

        # The status is evaluated after the loop so that a successful final
        # retry is kept rather than discarded.
        if response.status_code == 200:
            self.api_response["details"] = response.json()
            logger.info("API response: %s", response.status_code)
        elif response.status_code == 429:
            logger.warning(
                "API response: %s. Max retries reached", response.status_code
            )
        else:
            logger.warning("API response: %s. Not retrying", response.status_code)

    def load(self) -> None:
        """
        Loads the retrieved stock details into the database.

        The stock matching ``self.isin_number`` has its ``data`` field replaced
        by ``self.transformed_data``. Nothing is written when no such stock
        exists or when no transformed data is available.
        """
        if self.transformed_data is None:
            # Avoid overwriting stored data with nothing.
            logger.warning(
                "No transformed data available for ISIN: %s", self.isin_number
            )
            return

        stock = Stock.objects.filter(isin_number=self.isin_number).first()
        if stock is None:
            logger.warning(f"No matching stock found for ISIN: {self.isin_number}")
            return

        # NOTE(review): this replaces ``data`` wholesale, dropping any
        # "stock_rank" entry written by StockRankings.load unless the
        # transformed payload carries it — confirm this is intended.
        stock.data = self.transformed_data
        stock.save()
        logger.info(f"Successfully stored data for {stock.isin_number}.")