Source code for quantlab.data.data_manager

"""
Unified Data Manager

Routes data requests to appropriate sources:
- Historical data: Parquet files (fast, local)
- Real-time data: Polygon API
- Sentiment: Alpha Vantage API
- Fundamentals: yfinance API

Implements smart routing and fallback strategies.
"""

from datetime import datetime, date, timedelta
from typing import Optional, List, Dict, Any
import json

from .api_clients import PolygonClient, AlphaVantageClient, YFinanceClient
from .parquet_reader import ParquetReader
from .database import DatabaseManager
from .lookup_tables import LookupTableManager
from ..models.ticker_data import TickerSnapshot, OptionContract, FundamentalData, SentimentData
from ..analysis.greeks_calculator import BlackScholesGreeks, calculate_advanced_greeks
from ..analysis.technical_indicators import TechnicalAnalysis
from ..utils.logger import setup_logger
from ..utils.config import Config

logger = setup_logger(__name__)


class DataManager:
    """
    Unified data manager with smart routing

    Sits in front of the Parquet reader, the database and the Polygon,
    Alpha Vantage and yfinance API clients so callers use one entry point.

    Features:
    - Automatic source selection (Parquet vs API)
    - Caching with TTL
    - Fallback strategies
    - Greeks calculation
    """
def __init__(
    self,
    config: Config,
    db: DatabaseManager,
    parquet: ParquetReader
):
    """
    Store the injected collaborators and build the API clients

    Args:
        config: Configuration object (supplies the Polygon and
            Alpha Vantage API keys and rate limits)
        db: Database manager (also handed to the lookup table manager)
        parquet: Parquet reader
    """
    # Injected collaborators
    self.config, self.db, self.parquet = config, db, parquet

    # Polygon client
    polygon_settings = {
        "api_key": config.polygon_api_key,
        "rate_limit": config.polygon_rate_limit,
    }
    self.polygon = PolygonClient(**polygon_settings)

    # Alpha Vantage client
    alphavantage_settings = {
        "api_key": config.alphavantage_api_key,
        "rate_limit": config.alphavantage_rate_limit,
    }
    self.alphavantage = AlphaVantageClient(**alphavantage_settings)

    # yfinance client takes no configuration
    self.yfinance = YFinanceClient()

    # Lookup tables share the same database handle
    self.lookup = LookupTableManager(db)

    logger.info("✓ Data manager initialized with all sources")
# ===== STOCK DATA =====
def get_stock_price(
    self,
    ticker: str,
    date: Optional[date] = None,
    use_cache: bool = True
) -> Optional[TickerSnapshot]:
    """
    Return a price snapshot for one ticker

    Routing:
    - Cache first (when use_cache is True)
    - date given: one daily row from the Parquet reader (not cached)
    - date is None: live snapshot from the Polygon client, then cached

    Args:
        ticker: Stock ticker symbol
        date: Optional date (None = real-time)
        use_cache: Whether to consult the snapshot cache before fetching

    Returns:
        TickerSnapshot, or None when no data is available or any error
        occurs (errors are logged, never raised)
    """
    try:
        if use_cache:
            hit = self._get_cached_snapshot(ticker, date)
            if hit:
                logger.debug(f"Cache hit for {ticker}")
                return hit

        if date is not None:
            # Historical request: single-day window from Parquet
            frame = self.parquet.get_stock_daily(
                tickers=[ticker],
                start_date=date,
                end_date=date,
                limit=1
            )
            if frame is None or frame.empty:
                logger.warning(f"No historical data for {ticker} on {date}")
                return None

            bar = frame.iloc[0]
            # Timestamp-like values expose .date(); plain dates pass through
            if hasattr(bar['date'], 'date'):
                bar_date = bar['date'].date()
            else:
                bar_date = bar['date']

            return TickerSnapshot(
                ticker=ticker,
                date=bar_date,
                open=float(bar['open']),
                high=float(bar['high']),
                low=float(bar['low']),
                close=float(bar['close']),
                volume=int(bar['volume']),
                data_source="parquet",
                fetched_at=datetime.now()
            )

        # Real-time request: Polygon snapshot
        quote = self.polygon.get_stock_snapshot(ticker)
        if not quote:
            return None

        live = TickerSnapshot(
            ticker=ticker,
            date=datetime.now().date(),
            open=quote["open"],
            high=quote["high"],
            low=quote["low"],
            close=quote["price"],
            volume=quote["volume"],
            vwap=quote.get("vwap"),
            change_percent=quote.get("change_percent"),
            data_source="polygon",
            fetched_at=datetime.now()
        )
        # Only live snapshots are written back to the cache
        self._cache_snapshot(live)
        return live

    except Exception as e:
        logger.error(f"Failed to get stock price for {ticker}: {e}")
        return None
def get_intraday_prices(
    self,
    ticker: str,
    interval: str = "1min",
    from_date: Optional[str] = None,
    to_date: Optional[str] = None,
    limit: int = 50000,
    include_extended_hours: bool = False
) -> Optional[Any]:  # Returns pandas DataFrame
    """
    Fetch intraday bars from the Polygon client as a DataFrame

    Args:
        ticker: Stock ticker symbol
        interval: Time interval ('1min', '5min', '15min', '30min', '1hour')
        from_date: Start date in YYYY-MM-DD format (default: today)
        to_date: End date in YYYY-MM-DD format (default: today)
        limit: Maximum number of bars (default: 50000)
        include_extended_hours: If True, keep every bar returned.
            If False (default), keep only bars whose 'date' falls
            between 09:30 (inclusive) and 16:00 (exclusive).
            NOTE(review): the filter uses the raw hour/minute of the
            'date' column, so it presumably expects US Eastern
            timestamps - confirm against the Polygon client.

    Returns:
        pandas DataFrame built from the aggregates (documented columns:
        date, open, high, low, close, volume, vwap), or None when the
        interval is invalid, no data is returned, or an error occurs

    Example:
        >>> # Get today's 5-minute data (regular hours only)
        >>> df = data_manager.get_intraday_prices("AAPL", interval="5min")
        >>> # Get 1 year of 5-minute data including pre/post market
        >>> df = data_manager.get_intraday_prices("AAPL", interval="5min",
        ...     from_date="2024-01-01", to_date="2025-01-01", include_extended_hours=True)
    """
    try:
        import pandas as pd

        # Interval label -> (multiplier, timespan) expected by the client
        supported = {
            "1min": (1, "minute"),
            "5min": (5, "minute"),
            "15min": (15, "minute"),
            "30min": (30, "minute"),
            "1hour": (1, "hour"),
        }
        spec = supported.get(interval)
        if spec is None:
            logger.error(f"Invalid interval: {interval}. Valid options: {list(supported.keys())}")
            return None
        multiplier, timespan = spec

        bars = self.polygon.get_intraday_aggregates(
            ticker=ticker,
            multiplier=multiplier,
            timespan=timespan,
            from_date=from_date,
            to_date=to_date,
            limit=limit
        )
        if not bars:
            logger.warning(f"No intraday data for {ticker}")
            return None

        frame = pd.DataFrame(bars)

        if not include_extended_hours:
            # Regular session mask: at/after 09:30 and strictly before 16:00
            total_before = len(frame)
            hour = frame['date'].dt.hour
            minute = frame['date'].dt.minute
            at_or_after_open = (hour > 9) | ((hour == 9) & (minute >= 30))
            before_close = hour < 16
            frame = frame[at_or_after_open & before_close].copy()

            dropped = total_before - len(frame)
            if dropped > 0:
                logger.info(f" Filtered out {dropped} extended hours bars (pre-market/after-hours)")

        logger.info(f"✓ Retrieved {len(frame)} intraday bars for {ticker} ({interval})")
        return frame

    except Exception as e:
        logger.error(f"Failed to get intraday prices for {ticker}: {e}")
        return None
# ===== OPTIONS DATA =====
def get_options_chain(
    self,
    ticker: str,
    expiration_date: Optional[date] = None,
    option_type: Optional[str] = None,
    min_itm_pct: float = 5.0,
    max_itm_pct: float = 20.0,
    use_cache: bool = True
) -> List[OptionContract]:
    """
    Get options chain with calculated advanced Greeks

    Fetches the current stock price and risk-free rate, pulls the chain
    from the Polygon client, keeps contracts whose in-the-money
    percentage lies in [min_itm_pct, max_itm_pct], and enriches them
    with Greeks whenever an implied volatility is present.

    A contract that cannot be processed (missing field, non-positive
    strike, Greeks calculation error) is logged and skipped; it no
    longer discards the rest of the chain.

    Args:
        ticker: Underlying ticker symbol
        expiration_date: Optional specific expiration
        option_type: Optional 'call' or 'put' filter
        min_itm_pct: Minimum ITM percentage
        max_itm_pct: Maximum ITM percentage
        use_cache: Accepted for interface compatibility; this method
            does not currently read or write an options cache

    Returns:
        List of OptionContract objects with Greeks; empty list when the
        price or chain is unavailable or an unexpected error occurs
    """
    try:
        # Get current stock price
        current_price_snap = self.get_stock_price(ticker)
        if not current_price_snap:
            logger.error(f"Cannot get options: no price for {ticker}")
            return []

        current_price = float(current_price_snap.close)  # Convert Decimal to float

        # Get risk-free rate
        risk_free_rate = self._get_risk_free_rate()

        # Get options from Polygon
        options_data = self.polygon.get_options_chain(
            ticker=ticker,
            expiration_date=expiration_date,
            contract_type=option_type
        )

        if not options_data:
            logger.warning(f"No options data for {ticker}")
            return []

        # Process and filter options
        options = []
        for opt in options_data:
            try:
                strike = float(opt["strike_price"])  # Convert Decimal to float
                if strike <= 0:
                    # The ITM formula divides by the strike
                    raise ValueError(f"non-positive strike {strike}")

                # Calculate ITM percentage
                if opt["option_type"] == "call":
                    itm_pct = ((current_price - strike) / strike) * 100
                else:
                    itm_pct = ((strike - current_price) / strike) * 100

                # Filter by ITM range
                if not (min_itm_pct <= itm_pct <= max_itm_pct):
                    continue

                # Calculate advanced Greeks if we have IV
                iv = opt.get("implied_volatility")
                if iv:
                    days = BlackScholesGreeks.days_to_expiry(opt["expiration_date"])
                    greeks = calculate_advanced_greeks(
                        stock_price=current_price,
                        strike_price=strike,
                        days_to_expiry=days,
                        risk_free_rate=risk_free_rate,
                        implied_volatility=iv,
                        option_type=opt["option_type"]
                    )

                    if not opt.get("delta"):
                        # Override with calculated Greeks if Polygon doesn't provide them
                        opt.update(greeks)
                    else:
                        # Add advanced Greeks to existing first-order Greeks
                        opt["vanna"] = greeks.get("vanna")
                        opt["charm"] = greeks.get("charm")
                        opt["vomma"] = greeks.get("vomma")

                # Create OptionContract
                contract = OptionContract(
                    contract_ticker=opt["contract_ticker"],
                    underlying_ticker=ticker,
                    strike_price=strike,
                    expiration_date=opt["expiration_date"],
                    option_type=opt["option_type"],
                    bid=opt.get("bid"),
                    ask=opt.get("ask"),
                    last_price=opt.get("last_price"),
                    volume=opt.get("volume"),
                    open_interest=opt.get("open_interest"),
                    implied_volatility=iv,
                    delta=opt.get("delta"),
                    gamma=opt.get("gamma"),
                    theta=opt.get("theta"),
                    vega=opt.get("vega"),
                    vanna=opt.get("vanna"),
                    charm=opt.get("charm"),
                    vomma=opt.get("vomma"),
                    itm_percentage=itm_pct,
                    data_source="polygon",
                    fetched_at=datetime.now()
                )
            except (KeyError, TypeError, ValueError, ArithmeticError) as e:
                # Isolate the failure to this contract and keep the rest
                logger.warning(
                    f"Skipping malformed option contract "
                    f"{opt.get('contract_ticker')} for {ticker}: {e}"
                )
                continue

            options.append(contract)

        logger.info(f"✓ Retrieved {len(options)} options for {ticker} (ITM: {min_itm_pct}-{max_itm_pct}%)")
        return options

    except Exception as e:
        logger.error(f"Failed to get options chain for {ticker}: {e}")
        return []
# ===== FUNDAMENTALS =====
def get_fundamentals(
    self,
    ticker: str,
    use_cache: bool = True
) -> Optional[FundamentalData]:
    """
    Return fundamentals for a ticker, preferring the cache over yfinance

    Args:
        ticker: Stock ticker symbol
        use_cache: Whether to consult cached data before fetching

    Returns:
        FundamentalData, or None when yfinance returns nothing or an
        error occurs (errors are logged, never raised)
    """
    # Keys copied one-to-one from the yfinance payload (missing -> None)
    metric_names = (
        "market_cap",
        "pe_ratio",
        "forward_pe",
        "peg_ratio",
        "price_to_book",
        "profit_margin",
        "operating_margin",
        "return_on_equity",
        "return_on_assets",
        "revenue_growth",
        "earnings_growth",
        "total_cash",
        "total_debt",
        "debt_to_equity",
        "current_ratio",
        "target_price",
        "recommendation",
        "num_analysts",
    )

    try:
        if use_cache:
            hit = self._get_cached_fundamentals(ticker)
            if hit:
                return hit

        payload = self.yfinance.get_fundamentals(ticker)
        if not payload:
            return None

        record = FundamentalData(
            ticker=ticker,
            date=datetime.now().date(),
            **{name: payload.get(name) for name in metric_names},
            data_source="yfinance",
            fetched_at=datetime.now()
        )

        # Freshly fetched data is always written back to the cache
        self._cache_fundamentals(record)
        return record

    except Exception as e:
        logger.error(f"Failed to get fundamentals for {ticker}: {e}")
        return None
# ===== SENTIMENT =====
def get_sentiment(
    self,
    tickers: List[str],
    use_cache: bool = True
) -> Optional[SentimentData]:
    """
    Return news sentiment for one or more tickers via Alpha Vantage

    The cache is only read and written for single-ticker requests; for
    any other list length the result is labelled with the
    comma-joined tickers and is not cached.

    Args:
        tickers: List of ticker symbols
        use_cache: Whether to consult cached data before fetching

    Returns:
        SentimentData, or None when the API returns nothing or an error
        occurs (errors are logged, never raised)
    """
    # Keys that must be present in the Alpha Vantage payload
    required_keys = (
        "sentiment_score",
        "sentiment_label",
        "articles_analyzed",
        "positive_articles",
        "negative_articles",
        "neutral_articles",
        "average_relevance",
    )

    try:
        single = len(tickers) == 1

        if use_cache and single:
            hit = self._get_cached_sentiment(tickers[0])
            if hit:
                return hit

        payload = self.alphavantage.get_news_sentiment(tickers)
        if not payload:
            return None

        if single:
            label = tickers[0]
        else:
            label = ",".join(tickers)

        record = SentimentData(
            ticker=label,
            date=datetime.now().date(),
            **{key: payload[key] for key in required_keys},
            data_source="alphavantage",
            fetched_at=datetime.now()
        )

        if single:
            self._cache_sentiment(record)

        return record

    except Exception as e:
        logger.error(f"Failed to get sentiment for {tickers}: {e}")
        return None
# ===== MARKET DATA =====
def get_vix(self) -> Optional[Dict[str, float]]:
    """
    Fetch VIX metrics through the yfinance client

    Pure pass-through: no caching and no exception handling happens
    here, so anything raised by the client propagates to the caller.

    Returns:
        Whatever the client's get_vix() returns (annotated as a
        dictionary of VIX metrics, or None)
    """
    vix_metrics = self.yfinance.get_vix()
    return vix_metrics
def _get_risk_free_rate(self) -> float:
    """
    Get current risk-free rate (3-month Treasury)

    Strategy:
    1. Check lookup table (max age: 1 day)
    2. Fallback to Alpha Vantage API
    3. Default to 4.5%

    Returns:
        Risk-free rate as decimal, or default 0.045 when neither source
        yields a truthy rate or a lookup raises
    """
    default_rate = 0.045
    try:
        # Try lookup table first
        rate = self.lookup.get_treasury_rate('3month', max_age_days=1)
        if rate:
            logger.debug(f"Using cached Treasury rate: {rate*100:.3f}%")
            return rate

        # Fallback to API
        logger.debug("Treasury rate not in cache, fetching from API...")
        rate = self.alphavantage.get_treasury_rate("3month")
        if rate:
            return rate

        logger.warning("Using default risk-free rate: 4.5%")
        return default_rate

    except Exception as e:
        # Best-effort: never fail the caller, but do not swallow
        # KeyboardInterrupt/SystemExit and do not hide the cause.
        logger.warning(f"Risk-free rate lookup failed ({e}); using default 4.5%")
        return default_rate

# ===== CACHING HELPERS =====

def _get_cached_snapshot(self, ticker: str, date: Optional[date]) -> Optional[TickerSnapshot]:
    """
    Get cached ticker snapshot (15 minute TTL)

    Args:
        ticker: Stock ticker symbol
        date: Snapshot date; None means today

    Returns:
        The most recently fetched matching snapshot, or None on a cache
        miss or lookup error (errors are logged at debug level)
    """
    try:
        if date is None:
            date = datetime.now().date()

        # DuckDB compatible timestamp comparison
        result = self.db.execute(
            """
            SELECT * FROM ticker_snapshots
            WHERE ticker = ?
              AND date = ?
              AND fetched_at > (CURRENT_TIMESTAMP - INTERVAL '15 minutes')
            ORDER BY fetched_at DESC
            LIMIT 1
            """,
            [ticker, date]
        ).fetchone()

        if result:
            # Positional mapping relies on the table's column order
            # (index 0 is the row id) - assumed to match the INSERT in
            # _cache_snapshot.
            return TickerSnapshot(
                ticker=result[1],
                date=result[2],
                open=result[3],
                high=result[4],
                low=result[5],
                close=result[6],
                volume=result[7],
                vwap=result[8],
                change_percent=result[9],
                data_source=result[10],
                fetched_at=result[11]
            )
        return None

    except Exception as e:
        logger.debug(f"Cache lookup failed: {e}")
        return None

def _cache_snapshot(self, snapshot: TickerSnapshot):
    """
    Cache ticker snapshot

    Upserts on (ticker, date, data_source). Failures are logged at
    debug level and otherwise ignored (caching is best-effort).
    """
    try:
        # Generate a random non-negative 63-bit ID
        import uuid
        snapshot_id = uuid.uuid4().int & ((1 << 63) - 1)

        self.db.execute(
            """
            INSERT INTO ticker_snapshots
            (id, ticker, date, open, high, low, close, volume, vwap, change_percent, data_source, fetched_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (ticker, date, data_source) DO UPDATE SET
                open = EXCLUDED.open,
                high = EXCLUDED.high,
                low = EXCLUDED.low,
                close = EXCLUDED.close,
                volume = EXCLUDED.volume,
                vwap = EXCLUDED.vwap,
                change_percent = EXCLUDED.change_percent,
                fetched_at = EXCLUDED.fetched_at
            """,
            [
                snapshot_id,
                snapshot.ticker,
                snapshot.date,
                snapshot.open,
                snapshot.high,
                snapshot.low,
                snapshot.close,
                snapshot.volume,
                snapshot.vwap,
                snapshot.change_percent,
                snapshot.data_source,
                snapshot.fetched_at
            ]
        )
    except Exception as e:
        logger.debug(f"Failed to cache snapshot: {e}")

def _get_cached_fundamentals(self, ticker: str) -> Optional[FundamentalData]:
    """
    Get cached fundamentals (24hr TTL)

    Returns:
        The most recently fetched row for the ticker, or None on a
        cache miss or lookup error (errors are logged at debug level)
    """
    try:
        result = self.db.execute(
            """
            SELECT * FROM fundamental_data
            WHERE ticker = ?
              AND fetched_at > (CURRENT_TIMESTAMP - INTERVAL '24 hours')
            ORDER BY fetched_at DESC
            LIMIT 1
            """,
            [ticker]
        ).fetchone()

        if result:
            # Positional mapping mirrors the column list written by
            # _cache_fundamentals (index 0 is the row id). Columns 21
            # and 22 are the ownership fields, restored here so a cache
            # round-trip no longer drops them.
            return FundamentalData(
                ticker=result[1],
                date=result[2],
                market_cap=result[3],
                pe_ratio=result[4],
                forward_pe=result[5],
                peg_ratio=result[6],
                price_to_book=result[7],
                profit_margin=result[8],
                operating_margin=result[9],
                return_on_equity=result[10],
                return_on_assets=result[11],
                revenue_growth=result[12],
                earnings_growth=result[13],
                total_cash=result[14],
                total_debt=result[15],
                debt_to_equity=result[16],
                current_ratio=result[17],
                target_price=result[18],
                recommendation=result[19],
                num_analysts=result[20],
                institutional_ownership=result[21],
                insider_ownership=result[22],
                data_source=result[23],
                fetched_at=result[24]
            )
        return None

    except Exception as e:
        logger.debug(f"Cache lookup failed: {e}")
        return None

def _cache_fundamentals(self, data: FundamentalData):
    """
    Cache fundamental data

    Upserts on (ticker, date, data_source). Failures are logged at
    debug level and otherwise ignored (caching is best-effort).
    """
    try:
        # Generate a random non-negative 63-bit ID
        import uuid
        data_id = uuid.uuid4().int & ((1 << 63) - 1)

        self.db.execute(
            """
            INSERT INTO fundamental_data
            (id, ticker, date, market_cap, pe_ratio, forward_pe, peg_ratio, price_to_book,
             profit_margin, operating_margin, return_on_equity, return_on_assets,
             revenue_growth, earnings_growth, total_cash, total_debt, debt_to_equity,
             current_ratio, target_price, recommendation, num_analysts,
             institutional_ownership, insider_ownership, data_source, fetched_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (ticker, date, data_source) DO UPDATE SET
                market_cap = EXCLUDED.market_cap,
                pe_ratio = EXCLUDED.pe_ratio,
                forward_pe = EXCLUDED.forward_pe,
                peg_ratio = EXCLUDED.peg_ratio,
                price_to_book = EXCLUDED.price_to_book,
                profit_margin = EXCLUDED.profit_margin,
                operating_margin = EXCLUDED.operating_margin,
                return_on_equity = EXCLUDED.return_on_equity,
                return_on_assets = EXCLUDED.return_on_assets,
                revenue_growth = EXCLUDED.revenue_growth,
                earnings_growth = EXCLUDED.earnings_growth,
                total_cash = EXCLUDED.total_cash,
                total_debt = EXCLUDED.total_debt,
                debt_to_equity = EXCLUDED.debt_to_equity,
                current_ratio = EXCLUDED.current_ratio,
                target_price = EXCLUDED.target_price,
                recommendation = EXCLUDED.recommendation,
                num_analysts = EXCLUDED.num_analysts,
                institutional_ownership = EXCLUDED.institutional_ownership,
                insider_ownership = EXCLUDED.insider_ownership,
                fetched_at = EXCLUDED.fetched_at
            """,
            [
                data_id,
                data.ticker,
                data.date,
                data.market_cap,
                data.pe_ratio,
                data.forward_pe,
                data.peg_ratio,
                data.price_to_book,
                data.profit_margin,
                data.operating_margin,
                data.return_on_equity,
                data.return_on_assets,
                data.revenue_growth,
                data.earnings_growth,
                data.total_cash,
                data.total_debt,
                data.debt_to_equity,
                data.current_ratio,
                data.target_price,
                data.recommendation,
                data.num_analysts,
                data.institutional_ownership,
                data.insider_ownership,
                data.data_source,
                data.fetched_at
            ]
        )
    except Exception as e:
        logger.debug(f"Failed to cache fundamentals: {e}")

def _get_cached_sentiment(self, ticker: str) -> Optional[SentimentData]:
    """
    Get cached sentiment (1hr TTL)

    Returns:
        The most recently fetched row for the ticker, or None on a
        cache miss or lookup error (errors are logged at debug level)
    """
    try:
        result = self.db.execute(
            """
            SELECT * FROM sentiment_data
            WHERE ticker = ?
              AND fetched_at > (CURRENT_TIMESTAMP - INTERVAL '1 hour')
            ORDER BY fetched_at DESC
            LIMIT 1
            """,
            [ticker]
        ).fetchone()

        if result:
            # Positional mapping mirrors the column list written by
            # _cache_sentiment (index 0 is the row id).
            return SentimentData(
                ticker=result[1],
                date=result[2],
                sentiment_score=result[3],
                sentiment_label=result[4],
                articles_analyzed=result[5],
                positive_articles=result[6],
                negative_articles=result[7],
                neutral_articles=result[8],
                average_relevance=result[9],
                buzz_score=result[10],
                data_source=result[11],
                fetched_at=result[12]
            )
        return None

    except Exception as e:
        logger.debug(f"Cache lookup failed: {e}")
        return None

def _cache_sentiment(self, data: SentimentData):
    """
    Cache sentiment data

    Upserts on (ticker, date, data_source). Failures are logged at
    debug level and otherwise ignored (caching is best-effort).
    """
    try:
        # Generate a random non-negative 63-bit ID
        import uuid
        data_id = uuid.uuid4().int & ((1 << 63) - 1)

        self.db.execute(
            """
            INSERT INTO sentiment_data
            (id, ticker, date, sentiment_score, sentiment_label, articles_analyzed,
             positive_articles, negative_articles, neutral_articles, average_relevance,
             buzz_score, data_source, fetched_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (ticker, date, data_source) DO UPDATE SET
                sentiment_score = EXCLUDED.sentiment_score,
                sentiment_label = EXCLUDED.sentiment_label,
                articles_analyzed = EXCLUDED.articles_analyzed,
                positive_articles = EXCLUDED.positive_articles,
                negative_articles = EXCLUDED.negative_articles,
                neutral_articles = EXCLUDED.neutral_articles,
                average_relevance = EXCLUDED.average_relevance,
                buzz_score = EXCLUDED.buzz_score,
                fetched_at = EXCLUDED.fetched_at
            """,
            [
                data_id,
                data.ticker,
                data.date,
                data.sentiment_score,
                data.sentiment_label,
                data.articles_analyzed,
                data.positive_articles,
                data.negative_articles,
                data.neutral_articles,
                data.average_relevance,
                data.buzz_score,
                data.data_source,
                data.fetched_at
            ]
        )
    except Exception as e:
        logger.debug(f"Failed to cache sentiment: {e}")

# ===== TECHNICAL INDICATORS =====
def get_technical_indicators(
    self,
    ticker: str,
    days: int = 200,
    verify_calculations: bool = False
) -> Optional[Dict[str, Any]]:
    """
    Get technical indicators using hybrid approach:
    - Primary: Polygon API for SMA, EMA, MACD, RSI (faster, pre-calculated)
    - Fallback: Calculate from Parquet historical data
    - Always calculate: Bollinger Bands, ATR, Stochastic, OBV, ADX (not in Polygon)

    Calculated values that come out as NaN (for example when the
    history is shorter than the indicator window) are treated as
    unavailable: Bollinger/ATR/Stochastic/OBV/ADX are stored as None and
    SMA/EMA/RSI/MACD fallbacks are simply omitted.

    Args:
        ticker: Stock ticker symbol
        days: Number of calendar days of historical data for
            fallback/calculated indicators (default: 200)
        verify_calculations: If True, compare calculated vs API values for verification

    Returns:
        Dictionary with keys "timestamp", "trend", "momentum",
        "volatility", "volume", "trend_strength", "data_source",
        "signals" (plus "current_price" when history is available and
        "verification" when requested), or None when neither source has
        data or an error occurs
    """
    try:
        result = {
            "timestamp": datetime.now().isoformat(),
            "trend": {},
            "momentum": {},
            "volatility": {},
            "volume": {},
            "trend_strength": {},
            "data_source": {}  # Track which source was used for each indicator
        }

        # Step 1: Try to get indicators from Polygon API first
        polygon_indicators = None
        try:
            polygon_indicators = self.polygon.get_technical_indicators(ticker)
            if polygon_indicators:
                logger.debug(f"✓ Got technical indicators from Polygon API for {ticker}")
        except Exception as e:
            logger.debug(f"Polygon API indicators failed, will use fallback: {e}")

        # Step 2: Get historical data for calculated indicators (and fallback)
        end_date = datetime.now().date()
        start_date = end_date - timedelta(days=days)

        df = self.parquet.get_stock_daily(
            tickers=[ticker],
            start_date=start_date,
            end_date=end_date
        )

        if df is None or df.empty:
            # If we have Polygon data, we can still proceed
            if polygon_indicators:
                logger.warning(f"No historical data for {ticker}, using Polygon API only")
            else:
                logger.error(f"No historical data and no Polygon data for {ticker}")
                return None

        # Ensure required columns exist for calculations
        required_cols = ['open', 'high', 'low', 'close', 'volume']
        has_historical_data = (
            df is not None
            and not df.empty
            and all(col in df.columns for col in required_cols)
        )

        # Step 3: Calculate indicators from historical data
        # (for fallback and non-Polygon indicators)
        calculated_indicators = {}

        if has_historical_data:
            from ..analysis.technical_indicators import TechnicalIndicators
            import pandas as pd

            def _latest(series):
                """Last value of a series as float, or None when it is NaN."""
                value = series.iloc[-1]
                return None if pd.isna(value) else float(value)

            # NOTE(review): .iloc[-1] is taken as the latest bar, which
            # assumes the reader returns rows in ascending date order -
            # confirm against ParquetReader.
            close = df['close']

            # Current price
            result["current_price"] = float(close.iloc[-1])

            # Calculate SMA, EMA, RSI, MACD (for fallback or verification)
            latest = {
                "sma_20": _latest(TechnicalIndicators.sma(close, 20)),
                "sma_50": _latest(TechnicalIndicators.sma(close, 50)),
                "ema_12": _latest(TechnicalIndicators.ema(close, 12)),
                "ema_26": _latest(TechnicalIndicators.ema(close, 26)),
                "rsi_14": _latest(TechnicalIndicators.rsi(close, 14)),
            }
            macd_line, signal_line, histogram = TechnicalIndicators.macd(close, 12, 26, 9)
            latest["macd_line"] = _latest(macd_line)
            latest["macd_signal"] = _latest(signal_line)
            latest["macd_histogram"] = _latest(histogram)

            # Keep only usable numbers so NaN never reaches the result,
            # the verification step or the signal interpretation.
            calculated_indicators = {
                name: value for name, value in latest.items() if value is not None
            }

            # Calculate Bollinger Bands (not in Polygon API)
            bb_upper, bb_middle, bb_lower = TechnicalIndicators.bollinger_bands(close, 20, 2.0)
            result["volatility"]["bb_upper"] = _latest(bb_upper)
            result["volatility"]["bb_middle"] = _latest(bb_middle)
            result["volatility"]["bb_lower"] = _latest(bb_lower)

            # Calculate ATR (not in Polygon API)
            result["volatility"]["atr_14"] = _latest(TechnicalIndicators.atr(df, 14))

            # Calculate Stochastic (not in Polygon API)
            stoch_k, stoch_d = TechnicalIndicators.stochastic(df, 14, 3, 3)
            result["momentum"]["stochastic_k"] = _latest(stoch_k)
            result["momentum"]["stochastic_d"] = _latest(stoch_d)

            # Calculate OBV (not in Polygon API)
            result["volume"]["obv"] = _latest(TechnicalIndicators.obv(df))

            # Calculate ADX (not in Polygon API)
            result["trend_strength"]["adx_14"] = _latest(TechnicalIndicators.adx(df, 14))

        # Step 4: Use Polygon API data when available, fallback to calculated
        single_value_indicators = (
            ("trend", "sma_20"),
            ("trend", "sma_50"),
            ("trend", "ema_12"),
            ("trend", "ema_26"),
            ("momentum", "rsi_14"),
        )
        for group, name in single_value_indicators:
            if polygon_indicators and polygon_indicators.get(name):
                result[group][name] = polygon_indicators[name]
                result["data_source"][name] = "polygon"
            elif name in calculated_indicators:
                result[group][name] = calculated_indicators[name]
                result["data_source"][name] = "calculated"

        # MACD is a three-part indicator sourced as a unit. Parts are
        # read with .get() so a missing signal/histogram yields None
        # instead of aborting the whole request.
        macd_parts = ("macd_line", "macd_signal", "macd_histogram")
        if polygon_indicators and polygon_indicators.get("macd_line"):
            for part in macd_parts:
                result["momentum"][part] = polygon_indicators.get(part)
            result["data_source"]["macd"] = "polygon"
        elif "macd_line" in calculated_indicators:
            for part in macd_parts:
                result["momentum"][part] = calculated_indicators.get(part)
            result["data_source"]["macd"] = "calculated"

        # Step 5: Verification - compare calculated vs API values
        if verify_calculations and polygon_indicators and calculated_indicators:
            verification = self._verify_indicator_calculations(polygon_indicators, calculated_indicators)
            result["verification"] = verification
            logger.info(f"Verification: {verification}")

        # Step 6: Generate trading signals
        result["signals"] = self._interpret_technical_signals(result)

        source_summary = "Polygon API" if polygon_indicators else "calculated"
        logger.info(f"✓ Retrieved technical indicators for {ticker} (primary source: {source_summary})")
        return result

    except Exception as e:
        logger.error(f"Failed to get technical indicators for {ticker}: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return None
def _verify_indicator_calculations(
    self,
    polygon_data: Dict[str, float],
    calculated_data: Dict[str, float],
    tolerance_pct: float = 1.0
) -> Dict[str, Any]:
    """
    Verify that our calculated indicators match Polygon API values

    Only indicators present in both inputs with usable numeric values
    are compared; entries that are None or NaN on either side are
    skipped rather than raising or producing a meaningless difference.

    Args:
        polygon_data: Indicators from Polygon API
        calculated_data: Indicators calculated from historical data
        tolerance_pct: Acceptable difference percentage (default: 1%)

    Returns:
        Dictionary with "status" ("pass" or "fail"), "differences"
        (per-indicator polygon/calculated/diff_pct/within_tolerance)
        and "tolerance_pct"
    """
    import math

    def _usable(value):
        """True when value is a real number (neither None nor NaN)."""
        return value is not None and not math.isnan(value)

    verification = {
        "status": "pass",
        "differences": {},
        "tolerance_pct": tolerance_pct
    }

    indicators_to_check = (
        "sma_20", "sma_50", "ema_12", "ema_26",
        "rsi_14", "macd_line", "macd_signal", "macd_histogram",
    )

    for indicator in indicators_to_check:
        if indicator not in polygon_data or indicator not in calculated_data:
            continue

        polygon_val = polygon_data[indicator]
        calculated_val = calculated_data[indicator]
        if not (_usable(polygon_val) and _usable(calculated_val)):
            continue

        # Percentage difference relative to the Polygon value; a zero
        # reference counts as 0% only when the calculation is also zero.
        if polygon_val != 0:
            diff_pct = abs((calculated_val - polygon_val) / polygon_val) * 100
        else:
            diff_pct = 0 if calculated_val == 0 else 100

        verification["differences"][indicator] = {
            "polygon": polygon_val,
            "calculated": calculated_val,
            "diff_pct": diff_pct,
            "within_tolerance": diff_pct <= tolerance_pct
        }

        if diff_pct > tolerance_pct:
            verification["status"] = "fail"
            logger.warning(f"Indicator {indicator} diff: {diff_pct:.2f}% (Polygon: {polygon_val:.4f}, Calculated: {calculated_val:.4f})")

    return verification

def _interpret_technical_signals(self, indicators: Dict[str, Any]) -> Dict[str, str]:
    """
    Interpret technical indicators into trading signals

    A signal is emitted only when every value it needs is a usable
    number. A reading of exactly 0 (e.g. RSI or Stochastic %K at the
    bottom of its range) is a valid value and is interpreted; None and
    NaN are treated as missing.

    Args:
        indicators: Dictionary of calculated indicators (reads
            "current_price" and the "momentum", "volatility",
            "trend_strength" and "trend" sub-dictionaries)

    Returns:
        Dictionary of signal interpretations keyed by "rsi", "macd",
        "bollinger", "stochastic", "trend_strength" and "ma_trend"
        (only the ones that could be determined)
    """
    import math

    def _usable(value):
        """True when value is a real number (neither None nor NaN)."""
        return value is not None and not math.isnan(value)

    signals = {}
    momentum = indicators.get("momentum", {})
    volatility = indicators.get("volatility", {})
    trend = indicators.get("trend", {})
    current_price = indicators.get("current_price")

    # RSI signal
    rsi = momentum.get("rsi_14")
    if _usable(rsi):
        if rsi > 70:
            signals["rsi"] = "Overbought (>70)"
        elif rsi < 30:
            signals["rsi"] = "Oversold (<30)"
        else:
            signals["rsi"] = "Neutral (30-70)"

    # MACD signal
    macd_hist = momentum.get("macd_histogram")
    if _usable(macd_hist):
        if macd_hist > 0:
            signals["macd"] = "Bullish (histogram > 0)"
        else:
            signals["macd"] = "Bearish (histogram < 0)"

    # Bollinger Bands signal
    bb_upper = volatility.get("bb_upper")
    bb_lower = volatility.get("bb_lower")
    if _usable(current_price) and _usable(bb_upper) and _usable(bb_lower):
        if current_price > bb_upper:
            signals["bollinger"] = "Overbought (above upper band)"
        elif current_price < bb_lower:
            signals["bollinger"] = "Oversold (below lower band)"
        else:
            signals["bollinger"] = "Normal range"

    # Stochastic signal
    stoch_k = momentum.get("stochastic_k")
    if _usable(stoch_k):
        if stoch_k > 80:
            signals["stochastic"] = "Overbought (>80)"
        elif stoch_k < 20:
            signals["stochastic"] = "Oversold (<20)"
        else:
            signals["stochastic"] = "Neutral (20-80)"

    # ADX trend strength
    adx = indicators.get("trend_strength", {}).get("adx_14")
    if _usable(adx):
        if adx > 25:
            signals["trend_strength"] = "Strong trend (ADX > 25)"
        elif adx > 20:
            signals["trend_strength"] = "Moderate trend (ADX 20-25)"
        else:
            signals["trend_strength"] = "Weak/No trend (ADX < 20)"

    # Moving average crossover
    sma_20 = trend.get("sma_20")
    sma_50 = trend.get("sma_50")
    if _usable(current_price) and _usable(sma_20) and _usable(sma_50):
        if current_price > sma_20 > sma_50:
            signals["ma_trend"] = "Bullish (price > SMA20 > SMA50)"
        elif current_price < sma_20 < sma_50:
            signals["ma_trend"] = "Bearish (price < SMA20 < SMA50)"
        else:
            signals["ma_trend"] = "Mixed signals"

    return signals