Source code for quantlab.data.api_clients

"""
API client wrappers for external data sources

Provides unified interface to:
- Polygon.io (real-time quotes, options chains)
- Alpha Vantage (news sentiment, treasury rates)
- yfinance (fundamentals, analyst data)
"""

import time
import requests
from datetime import datetime, date
from typing import Optional, List, Dict, Any
from polygon import RESTClient as PolygonRESTClient
import yfinance as yf
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock

from ..utils.logger import setup_logger

logger = setup_logger(__name__)


[docs] class RateLimiter: """Simple rate limiter for API calls"""
[docs] def __init__(self, requests_per_minute: int): """ Initialize rate limiter Args: requests_per_minute: Maximum requests allowed per minute """ self.requests_per_minute = requests_per_minute self.min_interval = 60.0 / requests_per_minute self.last_request_time = 0.0
[docs] def wait_if_needed(self): """Wait if necessary to respect rate limit""" current_time = time.time() time_since_last = current_time - self.last_request_time if time_since_last < self.min_interval: sleep_time = self.min_interval - time_since_last logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s") time.sleep(sleep_time) self.last_request_time = time.time()
[docs] class PolygonClient: """ Wrapper for Polygon.io API Provides: - Real-time stock quotes - Options chains - Historical OHLCV data - Company details - Technical indicators (SMA, EMA, MACD, RSI) """
[docs] def __init__(self, api_key: str, rate_limit: int = 10000): """ Initialize Polygon client Args: api_key: Polygon API key rate_limit: Requests per minute (default: 10000 for unlimited Starter plan) """ self.client = PolygonRESTClient(api_key) self.rate_limiter = RateLimiter(rate_limit) self.api_key = api_key logger.info("✓ Polygon client initialized")
[docs] def get_stock_snapshot(self, ticker: str) -> Optional[Dict[str, Any]]: """ Get current stock snapshot Args: ticker: Stock ticker symbol Returns: Dictionary with price data or None """ try: # No rate limiting for unlimited Polygon plan snapshot = self.client.get_snapshot_ticker("stocks", ticker) if not snapshot or not snapshot.day: logger.warning(f"No snapshot data for {ticker}") return None return { "ticker": ticker, "price": snapshot.day.close, "open": snapshot.day.open, "high": snapshot.day.high, "low": snapshot.day.low, "volume": snapshot.day.volume, "vwap": snapshot.day.vwap if hasattr(snapshot.day, 'vwap') else None, "change_percent": snapshot.todaysChangePerc if hasattr(snapshot, 'todaysChangePerc') else None, "timestamp": datetime.now() } except Exception as e: logger.error(f"Failed to get snapshot for {ticker}: {e}") return None
def _fetch_option_snapshot(self, ticker: str, contract) -> Optional[Dict[str, Any]]: """Fetch single option snapshot (for parallel execution)""" try: snapshot = self.client.get_snapshot_option(ticker, contract.ticker) option_data = { "contract_ticker": contract.ticker, "underlying_ticker": ticker, "strike_price": contract.strike_price, "expiration_date": contract.expiration_date, "option_type": contract.contract_type, } # Add pricing and Greeks from snapshot if snapshot and snapshot.details: details = snapshot.details option_data.update({ "bid": details.bid if hasattr(details, 'bid') else None, "ask": details.ask if hasattr(details, 'ask') else None, "last_price": details.last_price if hasattr(details, 'last_price') else None, "volume": details.volume if hasattr(details, 'volume') else None, "open_interest": details.open_interest if hasattr(details, 'open_interest') else None, }) if snapshot and snapshot.greeks: greeks = snapshot.greeks option_data.update({ "delta": greeks.delta if hasattr(greeks, 'delta') else None, "gamma": greeks.gamma if hasattr(greeks, 'gamma') else None, "theta": greeks.theta if hasattr(greeks, 'theta') else None, "vega": greeks.vega if hasattr(greeks, 'vega') else None, }) if snapshot and snapshot.implied_volatility: option_data["implied_volatility"] = snapshot.implied_volatility return option_data except Exception as e: logger.debug(f"Failed to get snapshot for {contract.ticker}: {e}") return None
[docs] def get_options_chain( self, ticker: str, expiration_date: Optional[date] = None, contract_type: Optional[str] = None, max_workers: int = 50 ) -> List[Dict[str, Any]]: """ Get options chain for a ticker (parallel fetching) Args: ticker: Underlying ticker symbol expiration_date: Optional specific expiration date contract_type: Optional 'call' or 'put' filter max_workers: Number of parallel threads (default: 50 for unlimited API) Returns: List of option contracts """ try: # Get options contracts list contracts = list(self.client.list_options_contracts( underlying_ticker=ticker, expiration_date=expiration_date.isoformat() if expiration_date else None, contract_type=contract_type, limit=1000 )) logger.info(f"Fetching {len(contracts)} option contracts in parallel with {max_workers} workers...") options = [] # Fetch snapshots in parallel with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all fetch tasks future_to_contract = { executor.submit(self._fetch_option_snapshot, ticker, contract): contract for contract in contracts } # Collect results as they complete completed = 0 for future in as_completed(future_to_contract): completed += 1 if completed % 50 == 0: logger.info(f"Progress: {completed}/{len(contracts)} contracts fetched...") result = future.result() if result: options.append(result) logger.info(f"✓ Retrieved {len(options)} option contracts for {ticker} (parallel)") return options except Exception as e: logger.error(f"Failed to get options chain for {ticker}: {e}") return []
[docs] def get_market_holidays(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> List[str]: """ Get market holidays from Polygon API Args: start_date: Start date in YYYY-MM-DD format (default: current year) end_date: End date in YYYY-MM-DD format (default: next year) Returns: List of holiday dates in YYYY-MM-DD format """ try: # Default to current year through next year if not specified if not start_date: start_date = f"{datetime.now().year}-01-01" if not end_date: end_date = f"{datetime.now().year + 1}-12-31" # Use the marketstatus endpoint to get market holidays url = f"https://api.polygon.io/v1/marketstatus/upcoming" params = {"apiKey": self.api_key} response = requests.get(url, params=params) response.raise_for_status() data = response.json() holidays = [] if data and isinstance(data, list): for item in data: if item.get('status') == 'closed' and item.get('date'): holidays.append(item['date']) logger.info(f"✓ Retrieved {len(holidays)} market holidays from Polygon") return holidays except Exception as e: logger.warning(f"Failed to get market holidays from Polygon: {e}, using default US holidays") # Fallback to common US market holidays for current and next year year = datetime.now().year default_holidays = [ # Current year f"{year}-01-01", # New Year's Day f"{year}-01-15", # MLK Day (approx) f"{year}-02-19", # Presidents Day (approx) f"{year}-04-07", # Good Friday (approx) f"{year}-05-27", # Memorial Day (approx) f"{year}-06-19", # Juneteenth f"{year}-07-04", # Independence Day f"{year}-09-02", # Labor Day (approx) f"{year}-11-28", # Thanksgiving (approx) f"{year}-12-25", # Christmas # Next year f"{year+1}-01-01", f"{year+1}-01-20", f"{year+1}-02-17", f"{year+1}-04-18", f"{year+1}-05-26", f"{year+1}-06-19", f"{year+1}-07-04", f"{year+1}-09-01", f"{year+1}-11-27", f"{year+1}-12-25", ] return default_holidays
[docs] def get_intraday_aggregates( self, ticker: str, multiplier: int = 1, timespan: str = "minute", from_date: str = None, to_date: str = None, limit: int = 50000 ) -> Optional[List[Dict[str, Any]]]: """ Get intraday aggregate bars (minute-level data) from Polygon. Args: ticker: Stock ticker symbol multiplier: Size of the timespan multiplier (e.g., 1, 5, 15, 30) timespan: Size of the time window ('minute', 'hour') from_date: Start date in YYYY-MM-DD format (default: today) to_date: End date in YYYY-MM-DD format (default: today) limit: Maximum number of results (default: 50000, max: 50000) Returns: List of dictionaries with OHLCV data or None Example: >>> client.get_intraday_aggregates("AAPL", multiplier=5, timespan="minute") >>> # Get 1 year of 5-minute data >>> client.get_intraday_aggregates("AAPL", multiplier=5, from_date="2024-01-01", to_date="2025-01-01") """ try: from datetime import datetime, timedelta import pandas as pd # Default to today if not specified if not from_date: from_date = datetime.now().date().isoformat() if not to_date: to_date = datetime.now().date().isoformat() logger.info(f"Fetching {multiplier}-{timespan} aggregates for {ticker} from {from_date} to {to_date}") # Use Polygon SDK to get aggregates aggs = list(self.client.list_aggs( ticker=ticker, multiplier=multiplier, timespan=timespan, from_=from_date, to=to_date, limit=limit, adjusted=True, sort="asc" )) if not aggs: logger.warning(f"No intraday data for {ticker}") return None # Convert to list of dicts with timestamps in ET timezone from datetime import timezone import pytz # US Eastern timezone et_tz = pytz.timezone('US/Eastern') result = [] for agg in aggs: # Polygon returns timestamps in milliseconds (UTC) # Convert to ET for proper display and rangebreaks utc_dt = datetime.fromtimestamp(agg.timestamp / 1000, tz=timezone.utc) et_dt = utc_dt.astimezone(et_tz) # Convert to naive datetime for consistency with existing code # (Plotly works better with naive datetimes when using rangebreaks) result.append({ "date": et_dt.replace(tzinfo=None), # Store as naive datetime in ET "open": agg.open, "high": agg.high, "low": agg.low, "close": agg.close, "volume": agg.volume, "vwap": agg.vwap if hasattr(agg, 'vwap') else None, "transactions": agg.transactions if hasattr(agg, 'transactions') else None, }) logger.info(f"✓ Retrieved {len(result)} {multiplier}-{timespan} bars for {ticker}") return result except Exception as e: logger.error(f"Failed to get intraday aggregates for {ticker}: {e}") return None
[docs] def get_technical_indicators(self, ticker: str) -> Optional[Dict[str, Any]]: """ Get technical indicators from Polygon API Args: ticker: Stock ticker symbol Returns: Dictionary with SMA, EMA, MACD, RSI or None """ try: indicators = {} # Get SMA (20-day and 50-day) sma_20 = self.client.get_sma( ticker=ticker, timespan="day", adjusted=True, window=20, series_type="close", order="desc", limit=1, ) if sma_20.values: indicators["sma_20"] = sma_20.values[0].value sma_50 = self.client.get_sma( ticker=ticker, timespan="day", adjusted=True, window=50, series_type="close", order="desc", limit=1, ) if sma_50.values: indicators["sma_50"] = sma_50.values[0].value # Get EMA (12-day and 26-day) ema_12 = self.client.get_ema( ticker=ticker, timespan="day", adjusted=True, window=12, series_type="close", order="desc", limit=1, ) if ema_12.values: indicators["ema_12"] = ema_12.values[0].value ema_26 = self.client.get_ema( ticker=ticker, timespan="day", adjusted=True, window=26, series_type="close", order="desc", limit=1, ) if ema_26.values: indicators["ema_26"] = ema_26.values[0].value # Get RSI (14-day) rsi = self.client.get_rsi( ticker=ticker, timespan="day", adjusted=True, window=14, series_type="close", order="desc", limit=1, ) if rsi.values: indicators["rsi_14"] = rsi.values[0].value # Get MACD macd = self.client.get_macd( ticker=ticker, timespan="day", adjusted=True, short_window=12, long_window=26, signal_window=9, series_type="close", order="desc", limit=1, ) if macd.values: macd_val = macd.values[0] indicators["macd_line"] = macd_val.value indicators["macd_signal"] = macd_val.signal indicators["macd_histogram"] = macd_val.histogram logger.info(f"✓ Retrieved technical indicators from Polygon for {ticker}") return indicators except Exception as e: logger.error(f"Failed to get technical indicators for {ticker}: {e}") return None
[docs] class AlphaVantageClient: """ Wrapper for Alpha Vantage API Provides: - News sentiment analysis - Treasury rates - Economic indicators """ BASE_URL = "https://www.alphavantage.co/query"
[docs] def __init__(self, api_key: str, rate_limit: int = 5): """ Initialize Alpha Vantage client Args: api_key: Alpha Vantage API key rate_limit: Requests per minute (default: 5 for free tier) """ self.api_key = api_key self.rate_limiter = RateLimiter(rate_limit) logger.info("✓ Alpha Vantage client initialized")
[docs] def get_treasury_rate(self, maturity: str = "3month") -> Optional[float]: """ Get current Treasury rate Args: maturity: '3month', '2year', '5year', '10year', '30year' Returns: Treasury rate as decimal (e.g., 0.0407 for 4.07%) or None """ try: self.rate_limiter.wait_if_needed() params = { "function": "TREASURY_YIELD", "interval": "daily", "maturity": maturity, "apikey": self.api_key } response = requests.get(self.BASE_URL, params=params) response.raise_for_status() data = response.json() if "data" in data and len(data["data"]) > 0: # Get most recent rate latest = data["data"][0] rate = float(latest["value"]) / 100 # Convert to decimal logger.info(f"✓ Treasury rate ({maturity}): {rate*100:.3f}%") return rate logger.warning("No Treasury rate data available") return None except Exception as e: logger.error(f"Failed to get Treasury rate: {e}") return None
[docs] def get_news_sentiment( self, tickers: List[str], limit: int = 50 ) -> Optional[Dict[str, Any]]: """ Get news sentiment for tickers Args: tickers: List of ticker symbols limit: Maximum number of articles Returns: Dictionary with sentiment data or None """ try: self.rate_limiter.wait_if_needed() params = { "function": "NEWS_SENTIMENT", "tickers": ",".join(tickers), "limit": limit, "apikey": self.api_key } response = requests.get(self.BASE_URL, params=params) response.raise_for_status() data = response.json() if "feed" not in data: logger.warning("No news sentiment data available") return None articles = data["feed"] # Aggregate sentiment scores sentiment_scores = [] relevance_scores = [] for article in articles: if "overall_sentiment_score" in article: sentiment_scores.append(float(article["overall_sentiment_score"])) # Check ticker-specific sentiment for ticker_sentiment in article.get("ticker_sentiment", []): if ticker_sentiment.get("ticker") in tickers: if "relevance_score" in ticker_sentiment: relevance_scores.append(float(ticker_sentiment["relevance_score"])) if not sentiment_scores: logger.warning("No sentiment scores found") return None avg_sentiment = sum(sentiment_scores) / len(sentiment_scores) avg_relevance = sum(relevance_scores) / len(relevance_scores) if relevance_scores else 0 # Classify sentiment if avg_sentiment > 0.15: label = "bullish" elif avg_sentiment < -0.15: label = "bearish" else: label = "neutral" result = { "sentiment_score": avg_sentiment, "sentiment_label": label, "articles_analyzed": len(articles), "average_relevance": avg_relevance, "positive_articles": sum(1 for s in sentiment_scores if s > 0.15), "negative_articles": sum(1 for s in sentiment_scores if s < -0.15), "neutral_articles": sum(1 for s in sentiment_scores if -0.15 <= s <= 0.15), } logger.info(f"✓ Analyzed {len(articles)} articles: {label} ({avg_sentiment:.3f})") return result except Exception as e: logger.error(f"Failed to get news sentiment: {e}") return None
[docs] class YFinanceClient: """ Wrapper for yfinance library Provides: - VIX data - Fundamental data - Analyst recommendations - Institutional holdings """
[docs] def __init__(self): """Initialize yfinance client""" logger.info("✓ yfinance client initialized")
[docs] def get_vix(self) -> Optional[Dict[str, float]]: """ Get current VIX data Returns: Dictionary with VIX metrics or None """ try: vix = yf.Ticker("^VIX") hist = vix.history(period="5d") if hist.empty: logger.warning("No VIX data available") return None current_vix = hist['Close'].iloc[-1] avg_vix_5d = hist['Close'].mean() result = { "vix": current_vix, "vix_5d_avg": avg_vix_5d, "timestamp": datetime.now() } logger.info(f"✓ VIX: {current_vix:.2f} (5d avg: {avg_vix_5d:.2f})") return result except Exception as e: logger.error(f"Failed to get VIX data: {e}") return None
[docs] def get_fundamentals(self, ticker: str) -> Optional[Dict[str, Any]]: """ Get fundamental data for a ticker Args: ticker: Stock ticker symbol Returns: Dictionary with fundamental metrics or None """ try: stock = yf.Ticker(ticker) info = stock.info if not info: logger.warning(f"No fundamental data for {ticker}") return None result = { "ticker": ticker, "market_cap": info.get("marketCap"), "pe_ratio": info.get("trailingPE"), "forward_pe": info.get("forwardPE"), "peg_ratio": info.get("pegRatio"), "price_to_book": info.get("priceToBook"), "profit_margin": info.get("profitMargins"), "operating_margin": info.get("operatingMargins"), "return_on_equity": info.get("returnOnEquity"), "return_on_assets": info.get("returnOnAssets"), "revenue_growth": info.get("revenueGrowth"), "earnings_growth": info.get("earningsGrowth"), "total_cash": info.get("totalCash"), "total_debt": info.get("totalDebt"), "debt_to_equity": info.get("debtToEquity"), "current_ratio": info.get("currentRatio"), "target_price": info.get("targetMeanPrice"), "recommendation": info.get("recommendationKey"), "num_analysts": info.get("numberOfAnalystOpinions"), "timestamp": datetime.now() } logger.info(f"✓ Retrieved fundamentals for {ticker}") return result except Exception as e: logger.error(f"Failed to get fundamentals for {ticker}: {e}") return None
[docs] def get_institutional_holders(self, ticker: str) -> Optional[List[Dict[str, Any]]]: """ Get institutional holders for a ticker Args: ticker: Stock ticker symbol Returns: List of institutional holders or None """ try: stock = yf.Ticker(ticker) holders = stock.institutional_holders if holders is None or holders.empty: logger.warning(f"No institutional holder data for {ticker}") return None result = [] for _, row in holders.iterrows(): result.append({ "holder": row.get("Holder"), "shares": row.get("Shares"), "date_reported": row.get("Date Reported"), "percent_out": row.get("% Out"), "value": row.get("Value") }) logger.info(f"✓ Retrieved {len(result)} institutional holders for {ticker}") return result except Exception as e: logger.error(f"Failed to get institutional holders for {ticker}: {e}") return None