"""
This module contains the client classes for retrieving stock data from various sources.
"""
import logging
import time
from typing import Optional

import requests
from bs4 import BeautifulSoup
from django.conf import settings

from core.models import Stock
from data_pipeline.interfaces.api_client import DataClient
from core.constants import MONEYCONTROL_TOPSTOCKS_URL

logger = logging.getLogger(__name__)

# Upper bound (seconds) on every HTTP call so a stalled server cannot hang the pipeline.
REQUEST_TIMEOUT_SECONDS = 30

# Pause (seconds) after each per-stock detail page request during scraping.
DETAIL_PAGE_DELAY_SECONDS = 2

# A ranking row is read through column index 4, so it needs at least this many <td> cells.
MIN_RANKING_COLUMNS = 5

# Rate-limit (HTTP 429) handling: wait 30s, 60s, 90s, 120s before giving up.
RATE_LIMIT_BACKOFF_SECONDS = 30
MAX_RATE_LIMIT_RETRIES = 4


class StockRankings(DataClient):
    """
    Scrapes the ``indicesTable`` table served at ``MONEYCONTROL_TOPSTOCKS_URL``
    and stores the resulting ranking on ``Stock`` rows.

    ``extract`` fills ``api_response`` with one dict per stock row; ``load``
    reads ``transformed_data`` (populated outside this class) and writes it to
    the database.
    """

    model = Stock
    api_url = MONEYCONTROL_TOPSTOCKS_URL

    def __init__(self) -> None:
        self.api_response = None
        self.transformed_data = None

    def extract(self) -> None:
        """
        Scrape the ranking table and store the rows in ``self.api_response``.

        Each stored item is a dict with the keys ``name``, ``ltp``, ``link``,
        ``volume``, ``percentage_change``, ``price_change``, ``rank`` and
        ``isin_number``. ``rank`` is the 1-based position among the stock rows
        that were actually collected. ``isin_number`` is ``None`` when it
        could not be resolved from the stock's detail page.

        If the ranking page itself cannot be fetched, the error is logged and
        ``self.api_response`` is left untouched.

        Raises:
            RuntimeError: If the page has no table with id ``indicesTable``.
        """
        logger.info("Fetching data from %s", self.api_url)
        with requests.Session() as session:
            try:
                response = session.get(self.api_url, timeout=REQUEST_TIMEOUT_SECONDS)
                response.raise_for_status()  # Raise an HTTPError for bad responses (4xx or 5xx)
            except requests.exceptions.RequestException as e:
                logger.error("Error fetching data from %s: %s", self.api_url, e)
                return

            soup = BeautifulSoup(response.text, "html.parser")

            # Find the table containing stock information
            table = soup.find("table", {"id": "indicesTable"})
            if table is None:
                message = (
                    "Table with id 'indicesTable' not found for stock during scraping ranks."
                )
                logger.warning(message)
                raise RuntimeError(message)

            rows = table.find_all("tr")
            logger.info("Found %s rows in the table", len(rows))

            data = []
            isin_fails = 0

            # Extract data from each row
            for row in rows:
                columns = row.find_all("td")
                if not columns:
                    # Rows without <td> cells (e.g. a header row) carry no stock.
                    continue
                if len(columns) < MIN_RANKING_COLUMNS:
                    logger.warning(
                        "Skipping row with %s cells, expected at least %s",
                        len(columns),
                        MIN_RANKING_COLUMNS,
                    )
                    continue

                # The first cell may lack an anchor, and the anchor may lack an href.
                anchor = columns[0].find("a")
                link = anchor.get("href") if anchor is not None else None

                isin_number = None
                if link is not None:
                    isin_number = self._fetch_isin(session, link)
                if isin_number is None:
                    isin_fails += 1

                data.append(
                    {
                        "name": columns[0].get_text(strip=True),
                        "ltp": columns[1].get_text(strip=True),
                        "link": link,
                        "volume": columns[4].get_text(strip=True),
                        "percentage_change": columns[2].get_text(strip=True),
                        "price_change": columns[3].get_text(strip=True),
                        # Count only collected stock rows so skipped or
                        # header rows do not shift the ranking.
                        "rank": len(data) + 1,
                        "isin_number": isin_number,
                    }
                )

            logger.info(
                "ISIN not found for %s stocks out of %s", isin_fails, len(data)
            )
            self.api_response = data

    def _fetch_isin(self, session: requests.Session, link: str) -> Optional[str]:
        """
        Fetch a stock's detail page and return the ISIN shown on it.

        Args:
            session (requests.Session): Session used for the request.
            link (str): URL of the stock's detail page.

        Returns:
            The ISIN text, or ``None`` when the request fails or the page has
            no (non-empty) ISIN element. Failures are logged, never raised.
        """
        logger.info("Fetching stock details from link %s", link)
        try:
            response = session.get(link, timeout=REQUEST_TIMEOUT_SECONDS)
            time.sleep(DETAIL_PAGE_DELAY_SECONDS)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.exception("Error fetching ISIN from link %s: %s", link, e)
            return None

        soup = BeautifulSoup(response.text, "html.parser")
        # NOTE(review): newer soupsieve releases deprecate ":contains" in favour
        # of ":-soup-contains" - confirm the installed version before switching.
        isin_element = soup.select_one('li.clearfix span:contains("ISIN") + p')
        isin_number = (
            isin_element.get_text(strip=True) if isin_element is not None else ""
        )
        if not isin_number:
            logger.warning("ISIN not found for link %s", link)
            return None
        return isin_number

    def load(self) -> None:
        """
        Load the data into the database

        Clears ``rank`` on every currently ranked ``Stock``, then updates (or
        creates) one ``Stock`` per entry of ``self.transformed_data``, matching
        on ``isin_number``. Entries without an ISIN are skipped because they
        cannot be matched to a specific row. When ``self.transformed_data`` is
        ``None`` nothing is changed.
        """
        logger.info("Loading ranking data into the database...")

        if self.transformed_data is None:
            # Bail out before clearing ranks so a failed extract does not wipe them.
            logger.warning("No ranking data to load, existing ranks left unchanged.")
            return

        # clear the rank field
        Stock.objects.exclude(rank=None).update(rank=None)

        for rank, stock_details in enumerate(self.transformed_data, 1):
            isin_number = stock_details["isin_number"]
            if not isin_number:
                # Looking up isin_number=None would match the same NULL-ISIN row
                # for every such stock and overwrite it repeatedly.
                logger.warning(
                    "Skipping rank=%r %s: no ISIN available to match a stock",
                    rank,
                    stock_details["name"],
                )
                continue

            # filter().first() tolerates duplicate ISINs where get() would raise.
            stock = Stock.objects.filter(isin_number=isin_number).first()
            if stock is None:
                logger.warning(
                    "No matching stock found for ISIN: %s creating new object...",
                    isin_number,
                )
                stock = Stock.objects.create(data={"stock_rank": stock_details})
            else:
                # data may be None on an existing row; start from an empty dict then.
                stock.data = {**(stock.data or {}), "stock_rank": stock_details}

            stock.name = stock_details["name"]
            stock.ltp = stock_details["ltp"]
            stock.percentage_change = stock_details["percentage_change"]
            stock.price_change = stock_details["price_change"]
            stock.link = stock_details["link"]
            stock.volume = stock_details["volume"]
            stock.isin_number = isin_number
            stock.rank = stock_details["rank"]
            stock.save()
            logger.info(
                "Saved rank=%r %s | stock_details=%r stock_details['isin_number']=%r",
                rank,
                stock.name,
                stock_details,
                isin_number,
            )


class StockDetails(DataClient):
    """
    Retrieves and updates stock details from the Morningstar API.
    """

    model = Stock
    api_url = f"https://{settings.MORNINGSTAR_HOST}/stock/get-detail"

    def __init__(self, perf_id: str, isin_number: str) -> None:
        """
        Initializes the StockDetails object.

        Args:
            perf_id (str): Performance ID of the stock.
            isin_number (str): ISIN number of the stock.

        Raises:
            ValueError: If ``perf_id`` or ``isin_number`` is empty.
        """
        if not perf_id:
            raise ValueError("Performance ID cannot be empty.")
        if not isin_number:
            raise ValueError("ISIN number cannot be empty.")
        self.api_response = {"details": None}
        # Read by load(); initialised here so it always exists on the instance.
        self.transformed_data = None
        self.perf_id = perf_id
        self.isin_number = isin_number

    def _request(self) -> requests.Response:
        """
        Send one GET request for this stock's details and return the response.
        """
        querystring = {"PerformanceId": self.perf_id}
        return requests.get(
            self.api_url,
            headers=settings.MORNINGSTAR_API_HEADERS,
            params=querystring,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )

    def extract(self) -> None:
        """
        Extracts stock details from the Morningstar API.

        A 429 response is retried up to ``MAX_RATE_LIMIT_RETRIES`` times with a
        linearly growing wait. On a 200 response the decoded JSON is stored in
        ``self.api_response["details"]``; on any other final status that entry
        keeps its previous value and a warning is logged.
        """
        response = self._request()

        retries = 0
        while response.status_code == 429 and retries < MAX_RATE_LIMIT_RETRIES:
            retries += 1
            wait_seconds = RATE_LIMIT_BACKOFF_SECONDS * retries
            logger.info(
                "API response: %s. Waiting for %s secs",
                response.status_code,
                wait_seconds,
            )
            time.sleep(wait_seconds)
            response = self._request()

        # Evaluate the final response after the loop so that a success on the
        # last allowed retry is kept instead of being discarded.
        if response.status_code == 200:
            self.api_response["details"] = response.json()
        elif response.status_code == 429:
            logger.warning(
                "API response: %s. Max retries reached", response.status_code
            )
        else:
            logger.warning(
                "API response: %s. Not retrying a non rate-limit error",
                response.status_code,
            )
        logger.info("API response: %s", response.status_code)

    def load(self) -> None:
        """
        Loads the retrieved stock details into the database.

        Replaces ``data`` on the first ``Stock`` whose ``isin_number`` matches
        this client's ISIN. Nothing is written when there is no transformed
        data or no matching stock.
        """
        if self.transformed_data is None:
            # Avoid overwriting stored data with None when nothing was transformed.
            logger.warning(
                "No transformed data to store for ISIN: %s", self.isin_number
            )
            return

        stock = Stock.objects.filter(isin_number=self.isin_number).first()
        if stock is None:
            logger.warning("No matching stock found for ISIN: %s", self.isin_number)
            return

        stock.data = self.transformed_data
        stock.save()
        logger.info("Successfully stored data for %s.", stock.isin_number)