|
""" |
|
This module contains the client classes for retrieving stock data from various sources. |
|
""" |
|
|
|
import time |
|
import logging |
|
|
|
import requests |
|
from bs4 import BeautifulSoup |
|
|
|
from django.conf import settings |
|
from core.models import Stock |
|
from data_pipeline.interfaces.api_client import DataClient |
|
from core.constants import MONEYCONTROL_TOPSTOCKS_URL |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class StockRankings(DataClient):
    """
    Scrapes the top-stocks listing page and stores the ranking on ``Stock`` rows.

    ``extract`` downloads the page at ``api_url``, reads the table with id
    ``indicesTable`` and follows each row's link to look up the stock's ISIN;
    the resulting list of dicts is stored in ``api_response``.

    ``load`` writes the records found in ``transformed_data`` to the database.
    NOTE(review): nothing in this class populates ``transformed_data`` —
    presumably a transform step on ``DataClient`` does; confirm against the
    base class.
    """

    model = Stock
    api_url = MONEYCONTROL_TOPSTOCKS_URL

    # Seconds to wait for a single HTTP response; without a timeout a stalled
    # server would block the scrape indefinitely.
    request_timeout = 30
    # Pause (seconds) after each detail-page request.
    detail_request_delay = 2
    # A data row is read up to index 4 (name, ltp, % change, change, volume).
    min_columns = 5

    def __init__(self) -> None:
        """Start with no extracted and no transformed data."""
        self.api_response = None
        self.transformed_data = None

    def _fetch_isin(self, session, link):
        """
        Return the ISIN shown on the stock page at ``link``, or ``None``.

        ``None`` is returned both when the request fails and when the page has
        no ISIN element; each case is logged.
        """
        try:
            logger.info("Fetching stock details from link %s", link)
            response = session.get(link, timeout=self.request_timeout)
            time.sleep(self.detail_request_delay)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.exception("Error fetching ISIN from link %s: %s", link, e)
            return None

        detail_page = BeautifulSoup(response.text, "html.parser")
        # NOTE(review): newer soupsieve releases deprecate ``:contains`` in
        # favour of ``:-soup-contains`` — confirm the installed version before
        # renaming the selector.
        isin_element = detail_page.select_one(
            'li.clearfix span:contains("ISIN") + p'
        )
        if isin_element is None:
            logger.warning("ISIN not found for link %s", link)
            return None
        return isin_element.get_text(strip=True)

    def extract(self) -> None:
        """
        Scrape the ranking table into ``self.api_response``.

        If the listing page cannot be fetched the error is logged and
        ``api_response`` is left untouched.

        Raises:
            RuntimeError: the page has no table with id ``indicesTable``.
        """
        logger.info("Fetching data from %s", self.api_url)
        with requests.Session() as session:
            try:
                response = session.get(self.api_url, timeout=self.request_timeout)
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                logger.error("Error fetching data from %s: %s", self.api_url, e)
                return

            listing_page = BeautifulSoup(response.text, "html.parser")
            table = listing_page.find("table", {"id": "indicesTable"})
            if table is None:
                message = (
                    "Table with id 'indicesTable' not found for stock "
                    "during scraping ranks."
                )
                logger.warning(message)
                raise RuntimeError(message)

            records = []
            rows = table.find_all("tr")
            logger.info("Found %s rows in the table", len(rows))
            isin_fails = 0

            for row in rows:
                columns = row.find_all("td")
                if len(columns) < self.min_columns:
                    # Header rows (no <td>) and malformed rows carry no stock.
                    continue

                anchor = columns[0].find("a")
                link = anchor.get("href") if anchor is not None else None
                isin_number = self._fetch_isin(session, link) if link else None
                if isin_number is None:
                    isin_fails += 1

                records.append(
                    {
                        "name": columns[0].get_text(strip=True),
                        "ltp": columns[1].get_text(strip=True),
                        "link": link,
                        "volume": columns[4].get_text(strip=True),
                        "percentage_change": columns[2].get_text(strip=True),
                        "price_change": columns[3].get_text(strip=True),
                        # Rank counts data rows only, so a header row does not
                        # push the first stock to rank 2.
                        "rank": len(records) + 1,
                        "isin_number": isin_number,
                    }
                )

            logger.info(
                "ISIN not found for %s stocks out of %s", isin_fails, len(records)
            )

        self.api_response = records

    def load(self) -> None:
        """
        Load the data into the database.

        Existing ranks are cleared only once there is data to replace them
        with. Records without an ISIN are skipped: looking them up would match
        every stock whose ISIN is NULL, so they cannot be tied to one row.
        """
        logger.info("Loading ranking data into the database...")

        if not self.transformed_data:
            logger.warning("No ranking data to load; existing ranks are kept.")
            return

        Stock.objects.exclude(rank=None).update(rank=None)

        for rank, stock_details in enumerate(self.transformed_data, 1):
            isin_number = stock_details["isin_number"]
            if not isin_number:
                logger.warning(
                    "Skipping %s: no ISIN available to match a stock.",
                    stock_details["name"],
                )
                continue

            # filter().first() tolerates duplicate ISIN rows instead of
            # raising MultipleObjectsReturned.
            stock = Stock.objects.filter(isin_number=isin_number).first()
            if stock is None:
                logger.warning(
                    f"No matching stock found for ISIN: {isin_number} creating new object..."
                )
                # Build unsaved; the single save() below inserts the full row.
                stock = Stock(data={"stock_rank": stock_details})
            else:
                if stock.data is None:
                    stock.data = {}
                stock.data.update({"stock_rank": stock_details})

            stock.name = stock_details["name"]
            stock.ltp = stock_details["ltp"]
            stock.percentage_change = stock_details["percentage_change"]
            stock.price_change = stock_details["price_change"]
            stock.link = stock_details["link"]
            stock.volume = stock_details["volume"]
            stock.isin_number = isin_number
            stock.rank = stock_details["rank"]
            stock.save()

            logger.info(
                f"Saved {rank=} {stock.name} | {stock_details=} {stock_details['isin_number']=}"
            )
|
|
|
|
|
class StockDetails(DataClient):
    """
    Retrieves and updates stock details from the Morningstar API.

    ``extract`` stores the decoded JSON response under
    ``api_response["details"]``; ``load`` writes ``transformed_data`` to the
    ``data`` field of the ``Stock`` with the matching ISIN.
    NOTE(review): nothing in this class populates ``transformed_data`` —
    presumably a transform step on ``DataClient`` does; confirm against the
    base class.
    """

    model = Stock
    api_url = f"https://{settings.MORNINGSTAR_HOST}/stock/get-detail"

    # Seconds to wait for a response; without a timeout a stalled connection
    # would block the caller indefinitely.
    request_timeout = 30
    # Retries allowed after the initial request when the API answers 429.
    max_retries = 4
    # Wait before retry k is k * retry_backoff_seconds.
    retry_backoff_seconds = 30

    def __init__(self, perf_id: str, isin_number: str) -> None:
        """
        Initializes the StockDetails object.

        Args:
            perf_id (str): Performance ID of the stock.
            isin_number (str): ISIN number of the stock.

        Raises:
            ValueError: if ``perf_id`` or ``isin_number`` is empty.
        """
        if not perf_id:
            raise ValueError("Performance ID cannot be empty.")
        if not isin_number:
            raise ValueError("ISIN number cannot be empty.")

        self.api_response = {"details": None}
        # Defined up front so load() can tell "nothing transformed" apart
        # from real data instead of failing on a missing attribute.
        self.transformed_data = None
        self.perf_id = perf_id
        self.isin_number = isin_number

    def _request(self) -> requests.Response:
        """Send one GET request for this stock's details and return the response."""
        querystring = {"PerformanceId": self.perf_id}
        return requests.get(
            self.api_url,
            headers=settings.MORNINGSTAR_API_HEADERS,
            params=querystring,
            timeout=self.request_timeout,
        )

    def extract(self) -> None:
        """
        Extracts stock details from the Morningstar API.

        A 429 response is retried up to ``max_retries`` times, waiting longer
        before each attempt. Any other non-200 status is logged and not
        retried. Only a 200 response is stored in ``api_response["details"]``;
        otherwise it stays ``None``.
        """
        response = self._request()

        retries = 0
        while response.status_code == 429 and retries < self.max_retries:
            retries += 1
            wait_seconds = self.retry_backoff_seconds * retries
            logger.info(
                "API response: %s. Waiting for %s secs",
                response.status_code,
                wait_seconds,
            )
            time.sleep(wait_seconds)
            response = self._request()

        # The status is checked after the final retry as well, so a retry
        # that succeeds is never thrown away.
        if response.status_code == 200:
            self.api_response["details"] = response.json()
            logger.info("API response: %s", response.status_code)
        elif response.status_code == 429:
            logger.warning(
                "API response: %s. Max retries reached", response.status_code
            )
        else:
            logger.warning("API response: %s. Not retrying", response.status_code)

    def load(self) -> None:
        """
        Loads the retrieved stock details into the database.

        Nothing is written when there is no transformed data or when no stock
        matches the ISIN.
        """
        if self.transformed_data is None:
            # Saving None would erase whatever the stock already holds.
            logger.warning(
                "No transformed data for ISIN: %s; nothing stored.",
                self.isin_number,
            )
            return

        stock = Stock.objects.filter(isin_number=self.isin_number).first()
        if stock is None:
            logger.warning("No matching stock found for ISIN: %s", self.isin_number)
            return

        stock.data = self.transformed_data
        stock.save()
        logger.info("Successfully stored data for %s.", stock.isin_number)
|
|