def test_distribution_plateau_multi_wave_relies_on_second_wave_metrics(): """Second-wave stall during plateau should promote a bearish distribution alert.""" > result = detect_divergence( price_change_pct=4.25, cmf_value=0.21, buyer_volume=1_200_000.0, seller_volume=1_010_000.0, flow_context={ "short_term_delta_pct": 1.6, "short_term_dominance": "buyer", "short_term_buyers_pct": 52.0, "closing_delta_pct": 0.9, "closing_dominance": "buyer", "full_session_delta_pct": 0.7, "full_session_dominance": "buyer", "plateau_gain_pct": 4.3, "plateau_range_pct": 1.3, "plateau_duration_minutes": 62.0, "plateau_slope_pct": -1.15, "second_wave_peak_pct": 4.05, "second_wave_gain_pct": 0.45, "second_wave_retracement_pct": 1.22, "second_wave_start_minutes": 235.0, }, ) tests/indicators/test_divergence_detector.py:276: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ def detect_divergence( *, price_change_pct: float, cmf_value: float, buyer_volume: Optional[float] = None, seller_volume: Optional[float] = None, thresholds: Optional[Dict[str, float]] = None, flow_context: Optional[Dict[str, object]] = None, symbol: Optional[str] = None, session_day: Optional[date] = None, earnings_context: Optional[Dict[str, object]] = None, ) -> DivergenceDetectionResult: """ Detect bullish or bearish divergence based on price change and CMF. Args: price_change_pct: Percent change over the comparison window (e.g. last hour). cmf_value: Chaikin Money Flow value for the same window. buyer_volume: Optional aggregate buyer volume for the evaluation window. seller_volume: Optional aggregate seller volume for the evaluation window. thresholds: Optional overrides for sensitivity scoring. flow_context: Optional flow window context (e.g., opening vs. closing deltas) used to detect late-session buyer/seller acceleration patterns. """ price_pct = float(price_change_pct or 0.0) cmf = float(cmf_value or 0.0) params = dict(DEFAULT_THRESHOLDS) if thresholds: params.update( {k: float(v) for k, v in thresholds.items() if isinstance(v, (int, float))} ) flow_context = flow_context or {} closing_delta = _safe_float(flow_context.get("closing_delta_pct")) closing_dominance = str(flow_context.get("closing_dominance") or "").strip().lower() or None opening_delta = _safe_float(flow_context.get("opening_delta_pct")) opening_dominance = str(flow_context.get("opening_dominance") or "").strip().lower() or None full_session_delta = _safe_float(flow_context.get("full_session_delta_pct")) full_session_dominance = str(flow_context.get("full_session_dominance") or "").strip().lower() or None short_term_delta = _safe_float(flow_context.get("short_term_delta_pct")) short_term_dominance = str(flow_context.get("short_term_dominance") or "").strip().lower() or None trailing_delta = _safe_float(flow_context.get("trailing_delta_pct")) trailing_dominance = str(flow_context.get("trailing_dominance") or "").strip().lower() or None trailing_strength = str(flow_context.get("trailing_strength") or "").strip().lower() or None cmf_short_slope = _safe_float(flow_context.get("cmf_short_slope")) cmf_long_slope = _safe_float(flow_context.get("cmf_long_slope")) opening_recovery_pct: Optional[float] = None if opening_delta is not None and full_session_delta is not None: opening_recovery_pct = full_session_delta - opening_delta short_term_buyers_pct = _safe_float(flow_context.get("short_term_buyers_pct")) short_term_sellers_pct = _safe_float(flow_context.get("short_term_sellers_pct")) short_term_strength = str(flow_context.get("short_term_strength") or "").strip().lower() or None plateau_gain_pct = _safe_float(flow_context.get("plateau_gain_pct")) plateau_range_pct = _safe_float(flow_context.get("plateau_range_pct")) plateau_duration_minutes = _safe_float(flow_context.get("plateau_duration_minutes")) plateau_slope_pct = _safe_float(flow_context.get("plateau_slope_pct")) second_wave_peak_pct = _safe_float(flow_context.get("second_wave_peak_pct")) second_wave_gain_pct = _safe_float(flow_context.get("second_wave_gain_pct")) second_wave_retracement_pct = _safe_float(flow_context.get("second_wave_retracement_pct")) second_wave_start_minutes = _safe_float(flow_context.get("second_wave_start_minutes")) plateau_wave_count_raw = flow_context.get("plateau_wave_count") plateau_wave_count = None try: if plateau_wave_count_raw is not None: plateau_wave_count = int(plateau_wave_count_raw) except (TypeError, ValueError): # pragma: no cover - defensive plateau_wave_count = None buyer_total = float(buyer_volume) if buyer_volume is not None else 0.0 seller_total = float(seller_volume) if seller_volume is not None else 0.0 volume_provided = buyer_volume is not None or seller_volume is not None volume_total = buyer_total + seller_total volume_bias = ( (buyer_total - seller_total) / volume_total if volume_provided and volume_total > 0 else 0.0 ) volume_bias_required = max(float(params.get("volume_bias_min", 0.0)), 0.0) volume_direction: Optional[str] = None volume_override_reason_string: Optional[str] = None volume_override = False flow_acceleration_delta: Optional[float] = None pattern_override: Optional[str] = None recommendation_override: Optional[str] = None strength_hint: Optional[str] = None confidence_hint: Optional[float] = None score_hint: Optional[float] = None reversal_metadata: Dict[str, object] = {} earnings_context_resolved = _resolve_recent_earnings_context( symbol, session_day, earnings_context, ) if volume_provided and volume_total > 0: if volume_bias > 0: volume_direction = "buyer" elif volume_bias < 0: volume_direction = "seller" distribution_triggered = False distribution_metadata: Dict[str, object] = {} coil_metadata: Dict[str, object] = {} coil_breakout_detected = False coil_detected = False coil_dissipation_triggered = False distribution_price_cutoff = abs(float(params.get("distribution_price_abs", 2.0))) distribution_price_gain_min = float(params.get("distribution_price_gain_min", 0.0)) distribution_price_gain_max = float(params.get("distribution_price_gain_max", 0.0)) distribution_cmf_min = float(params.get("distribution_cmf_min", 0.2)) distribution_delta_min = float(params.get("distribution_delta_min", 9.0)) distribution_volume_bias_min = float(params.get("distribution_volume_bias_min", 0.04)) distribution_buyers_pct_min = float(params.get("distribution_buyers_pct_min", 55.0)) distribution_opening_delta_min = float( params.get("distribution_opening_delta_min", 1.5) ) distribution_cluster_span_min_minutes = float( params.get("distribution_cluster_span_min_minutes", 45.0) ) distribution_cluster_late_minutes_min = float( params.get("distribution_cluster_late_minutes_min", 210.0) ) distribution_trailing_guard = float(params.get("distribution_trailing_guard", 0.0)) distribution_cmf_reinforce = float(params.get("distribution_cmf_reinforce", 0.45)) plateau_gain_min = float(params.get("distribution_plateau_gain_min", 3.0)) plateau_range_max = float(params.get("distribution_plateau_range_max", 1.5)) plateau_duration_min = float(params.get("distribution_plateau_duration_min", 20.0)) plateau_slope_max = float(params.get("distribution_plateau_slope_max", 1.0)) plateau_slope_min = float(params.get("distribution_plateau_slope_min", -3.0)) plateau_delta_guard = float(params.get("distribution_plateau_delta_guard", 4.0)) plateau_wave_count_min = int(params.get("distribution_plateau_wave_count_min", 2)) distribution_positive_gain_guard = float(params.get("distribution_positive_gain_guard", 3.5)) second_wave_gain_max = float(params.get("distribution_second_wave_gain_max", 1.2)) second_wave_retracement_min = float(params.get("distribution_second_wave_retracement_min", 0.6)) second_wave_start_min_minutes = float(params.get("distribution_second_wave_start_min_minutes", 120.0)) terminal_gain_min = float(params.get("distribution_terminal_gain_min", plateau_gain_min)) terminal_duration_max = float(params.get("distribution_terminal_duration_max", 18.0)) terminal_wave_start_min = float(params.get("distribution_terminal_wave_start_min_minutes", 300.0)) terminal_closing_delta_min = float(params.get("distribution_terminal_closing_delta_min", 4.5)) terminal_buyers_pct_min = float(params.get("distribution_terminal_buyers_pct_min", 52.0)) terminal_volume_bias_min = float(params.get("distribution_terminal_volume_bias_min", 0.035)) terminal_slope_max = float(params.get("distribution_terminal_slope_max", 0.2)) flow_exhaustion_price_guard = float(params.get("distribution_flow_exhaustion_price_guard", terminal_gain_min)) flow_exhaustion_delta_min = float(params.get("distribution_flow_exhaustion_delta_min", 6.0)) flow_exhaustion_ratio_min = float(params.get("distribution_flow_exhaustion_ratio_min", 3.0)) flow_exhaustion_short_delta_min = float(params.get("distribution_flow_exhaustion_short_delta_min", 6.0)) flow_exhaustion_delta_gap_min = float(params.get("distribution_flow_exhaustion_delta_gap_min", 2.0)) flow_exhaustion_full_ratio_max = float(params.get("distribution_flow_exhaustion_full_ratio_max", 0.75)) flow_exhaustion_cmf_min = float(params.get("distribution_flow_exhaustion_cmf_min", 0.12)) flow_exhaustion_full_delta_min = float(params.get("distribution_flow_exhaustion_full_delta_min", 2.0)) coil_price_min = float(params.get("coil_price_min", 1.8)) coil_price_max = float(params.get("coil_price_max", 4.5)) coil_cmf_floor = float(params.get("coil_cmf_floor", -0.08)) coil_opening_delta_min = float(params.get("coil_opening_delta_min", 4.0)) coil_closing_delta_min = float(params.get("coil_closing_delta_min", -2.0)) coil_plateau_gain_min = float(params.get("coil_plateau_gain_min", 3.0)) coil_plateau_range_max = float(params.get("coil_plateau_range_max", 3.0)) coil_plateau_duration_min = float(params.get("coil_plateau_duration_min", 180.0)) coil_plateau_slope_min = float(params.get("coil_plateau_slope_min", -2.6)) coil_plateau_slope_max = float(params.get("coil_plateau_slope_max", 0.8)) coil_volume_bias_floor = float(params.get("coil_volume_bias_floor", -0.06)) coil_second_wave_ratio_max = float(params.get("coil_second_wave_ratio_max", 0.75)) coil_short_duration_max = float(params.get("coil_short_duration_max", 45.0)) coil_breakout_gain_min = float(params.get("coil_breakout_gain_min", 3.0)) coil_breakout_ratio_min = float(params.get("coil_breakout_ratio_min", 2.0)) coil_breakout_short_delta_max = float(params.get("coil_breakout_short_delta_max", -0.5)) coil_breakout_closing_delta_max = float(params.get("coil_breakout_closing_delta_max", 2.5)) coil_breakout_recommendation = params.get( "coil_breakout_recommendation", "Energy dissipated after coil breakout; fade the move.", ) distribution_trailing_collapse_min = float(params.get("distribution_trailing_collapse_min", 5.0)) distribution_cmf_slope_max = float(params.get("distribution_cmf_slope_max", -0.03)) coil_distribution_trailing_delta_min = float(params.get("coil_distribution_trailing_delta_min", 6.0)) coil_distribution_cmf_ceiling = float(params.get("coil_distribution_cmf_ceiling", 0.05)) direction: Optional[str] = None trailing_delta_reference: Optional[float] = None if trailing_delta is not None: trailing_delta_reference = trailing_delta elif closing_delta is not None: trailing_delta_reference = closing_delta elif short_term_delta is not None: trailing_delta_reference = short_term_delta short_delta_reference = short_term_delta if short_term_delta is not None else closing_delta buyers_pct_reference = short_term_buyers_pct if short_term_buyers_pct is not None else None def _buyer_supports_distribution( *, min_pct: float, volume_minimum: float, opening_threshold: Optional[float], ) -> Tuple[bool, bool]: """Return whether buying support meets distribution gates and if opening support was used.""" support = False used_opening = False if buyers_pct_reference is not None: support = buyers_pct_reference >= min_pct if not support and volume_bias is not None: support = volume_bias >= volume_minimum if ( not support and opening_threshold is not None and opening_threshold > 0 and opening_dominance == "buyer" and opening_delta is not None and opening_delta >= opening_threshold ): support = True used_opening = True return support, used_opening distribution_opening_support_used = False distribution_soft_trigger = False def _normalize_direction_label(value: object) -> Optional[str]: if value is None: return None text = str(value).strip().lower() if not text or text in {"none", "nan"}: return None return text volume_cluster_direction = _normalize_direction_label(flow_context.get("volume_cluster_direction")) volume_cluster_price_direction = _normalize_direction_label( flow_context.get("volume_cluster_price_direction") ) volume_cluster_cmf_direction = _normalize_direction_label( flow_context.get("volume_cluster_cmf_direction") ) volume_cluster_status = _normalize_direction_label(flow_context.get("volume_cluster_status")) volume_cluster_span_hours = _safe_float(flow_context.get("volume_cluster_span_hours")) volume_cluster_span_minutes = _safe_float(flow_context.get("volume_cluster_span_minutes")) volume_cluster_latest_minutes = _safe_float(flow_context.get("volume_cluster_latest_minutes")) cluster_indices_raw = flow_context.get("volume_cluster_indices") volume_cluster_indices_list: Optional[List[int]] = None if isinstance(cluster_indices_raw, (list, tuple)): cleaned_indices: List[int] = [] for item in cluster_indices_raw: try: cleaned_indices.append(int(item)) except (TypeError, ValueError): continue if cleaned_indices: volume_cluster_indices_list = cleaned_indices cluster_times_raw = flow_context.get("volume_cluster_times") volume_cluster_times_list: Optional[List[str]] = None if isinstance(cluster_times_raw, (list, tuple)): cleaned_times: List[str] = [] for item in cluster_times_raw: text = str(item).strip() if text: cleaned_times.append(text) if cleaned_times: volume_cluster_times_list = cleaned_times cluster_volumes_raw = flow_context.get("volume_cluster_volumes") volume_cluster_volumes_list: Optional[List[float]] = None if isinstance(cluster_volumes_raw, (list, tuple)): cleaned_volumes: List[float] = [] for item in cluster_volumes_raw: try: cleaned_volumes.append(float(item)) except (TypeError, ValueError): continue if cleaned_volumes: volume_cluster_volumes_list = cleaned_volumes cluster_volume_down = volume_cluster_direction in {"down", "bearish"} cluster_volume_flat = volume_cluster_direction in {"flat", "neutral"} cluster_price_up = volume_cluster_price_direction in {"up", "bullish"} cluster_cmf_down = volume_cluster_cmf_direction in {"down", "bearish"} volume_cluster_conflict = False if cluster_price_up and cluster_volume_down: volume_cluster_conflict = True elif cluster_price_up and cluster_volume_flat and cluster_cmf_down: volume_cluster_conflict = True cluster_span_minutes_resolved: Optional[float] = volume_cluster_span_minutes if cluster_span_minutes_resolved is None and volume_cluster_span_hours is not None: cluster_span_minutes_resolved = volume_cluster_span_hours * 60.0 exhaustion_detected = False if ( price_pct is not None and price_pct >= max(distribution_price_gain_min, 1.2) and cmf <= max(0.05, distribution_cmf_min * 0.5) and closing_delta is not None and closing_delta >= distribution_delta_min and short_term_delta is not None and short_term_delta <= max(2.5, distribution_delta_min * 0.25) ): exhaustion_detected = True distribution_price_ok = False distribution_price_relaxed = False if price_pct is not None: if price_pct <= -distribution_price_cutoff: negative_flow_present = False if closing_delta is not None and closing_delta <= 0: negative_flow_present = True if full_session_delta is not None and full_session_delta <= 0: negative_flow_present = True if opening_delta is not None and opening_delta <= -distribution_delta_min: negative_flow_present = True if negative_flow_present: distribution_price_ok = True else: relaxed_gain_floor = max(min(distribution_price_gain_min * 0.25, 0.6), 0.2) if price_pct >= relaxed_gain_floor: heavy_seller_close = ( closing_delta is not None and closing_delta <= -max(distribution_delta_min * 0.8, 3.5) ) heavy_seller_short = ( short_term_delta is not None and short_term_delta <= -max(distribution_delta_min * 1.5, 3.0) ) cmf_confirms = cmf is not None and cmf <= distribution_cmf_min + 0.08 if heavy_seller_close and heavy_seller_short and cmf_confirms: distribution_price_ok = True distribution_price_relaxed = True if not distribution_price_ok and distribution_price_gain_min > 0 and price_pct >= distribution_price_gain_min: if distribution_price_gain_max > 0 and price_pct > distribution_price_gain_max: distribution_price_ok = False else: bearish_flow_present = False if opening_delta is not None and opening_delta <= 0: bearish_flow_present = True if full_session_delta is not None and full_session_delta <= 0: bearish_flow_present = True if closing_delta is not None and closing_delta <= 0: bearish_flow_present = True distribution_price_ok = bearish_flow_present failed_distribution_trigger = False failed_distribution_metadata: Dict[str, object] = {} failed_opening_guard = max(distribution_opening_delta_min, 3.0) failed_recovery_guard = max(distribution_delta_min * 0.4, 2.0) failed_price_guard = max(distribution_price_cutoff * 0.35, 0.6) failed_full_session_guard = -max(distribution_delta_min * 0.25, 0.5) failed_cmf_ceiling = max(0.08, distribution_cmf_min + 0.2) trailing_collapse = ( trailing_delta_reference is not None and trailing_delta_reference <= -distribution_trailing_collapse_min ) cmf_slope_break = cmf_short_slope is not None and cmf_short_slope <= distribution_cmf_slope_max opening_support_strong = ( opening_delta is not None and opening_delta >= failed_opening_guard and opening_dominance in {"buyer", "neutral"} ) opening_unwound = ( opening_recovery_pct is not None and opening_recovery_pct <= -failed_recovery_guard ) price_collapse = price_pct is not None and price_pct <= -failed_price_guard session_flow_flipped = ( full_session_delta is not None and full_session_delta <= failed_full_session_guard ) cmf_collapsed = cmf <= failed_cmf_ceiling collapse_confirmed = price_collapse or trailing_collapse if ( not distribution_triggered and opening_support_strong and (opening_unwound or trailing_collapse) and collapse_confirmed and (cmf_collapsed or cmf_slope_break) and (full_session_delta is None or session_flow_flipped or full_session_dominance == "seller") ): failed_distribution_trigger = True distribution_triggered = True distribution_soft_trigger = True direction = "bearish" strength_hint = strength_hint or "moderate" confidence_hint = max(confidence_hint or 0.0, 0.78) score_hint = max(score_hint or 0.0, params.get("moderate_score", 60.0) + 12.0) pattern_override = pattern_override or "Bearish Distribution Divergence" recommendation_override = recommendation_override or "Sell into failed accumulation; early buyers unwound." volume_override = True volume_direction = "buyer" volume_override_reason_string = volume_override_reason_string or "failed bullish divergence" distribution_opening_support_used = True failed_distribution_metadata = { "distribution_triggered": True, "distribution_failed_breakout": True, "distribution_opening_delta_pct": opening_delta, "distribution_opening_recovery_pct": opening_recovery_pct, "distribution_price_collapse_pct": price_pct, "distribution_cmf_collapse": cmf, } if full_session_delta is not None: failed_distribution_metadata["distribution_full_session_delta_pct"] = full_session_delta if closing_delta is not None: failed_distribution_metadata["distribution_closing_delta_pct"] = closing_delta if short_term_delta is not None: failed_distribution_metadata["distribution_short_term_delta_pct"] = short_term_delta if buyers_pct_reference is not None: failed_distribution_metadata["distribution_short_term_buyers_pct"] = buyers_pct_reference if short_term_dominance: failed_distribution_metadata["distribution_short_term_dominance"] = short_term_dominance if volume_bias: failed_distribution_metadata["distribution_volume_bias"] = volume_bias if trailing_delta_reference is not None: failed_distribution_metadata["distribution_trailing_delta_pct"] = trailing_delta_reference if trailing_strength: failed_distribution_metadata["distribution_trailing_strength"] = trailing_strength if trailing_dominance: failed_distribution_metadata["distribution_trailing_dominance"] = trailing_dominance if cmf_short_slope is not None: failed_distribution_metadata["distribution_cmf_short_slope"] = cmf_short_slope if cmf_long_slope is not None: failed_distribution_metadata["distribution_cmf_long_slope"] = cmf_long_slope if failed_distribution_trigger: distribution_metadata = dict(distribution_metadata) distribution_metadata.update(failed_distribution_metadata) if ( distribution_price_ok and cmf >= distribution_cmf_min and short_delta_reference is not None and short_delta_reference >= distribution_delta_min ): buyers_dominant, used_opening_support = _buyer_supports_distribution( min_pct=distribution_buyers_pct_min, volume_minimum=distribution_volume_bias_min, opening_threshold=distribution_opening_delta_min, ) if used_opening_support: distribution_opening_support_used = True trailing_guard_reference = trailing_delta_reference if trailing_guard_reference is None and closing_delta is not None: trailing_guard_reference = closing_delta trailing_guard_ok = True if ( distribution_trailing_guard > 0 and trailing_guard_reference is not None ): trailing_guard_ok = trailing_guard_reference >= distribution_trailing_guard if buyers_dominant and trailing_guard_ok: direction = "bearish" distribution_triggered = True volume_override = True volume_direction = "buyer" volume_override_reason_string = "distribution buyer dominance" confidence_hint = max(confidence_hint or 0.0, 0.85) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 10.0) strength_hint = "strong" pattern_override = "Bearish Distribution Divergence" recommendation_override = "Sell into strength; distribution flow detected." distribution_metadata = { "distribution_triggered": True, "distribution_delta_pct": short_delta_reference, "distribution_buyers_pct": buyers_pct_reference, "distribution_volume_bias": volume_bias, "distribution_strength": short_term_strength or closing_dominance, "distribution_trailing_delta_pct": trailing_guard_reference, } if distribution_price_relaxed: distribution_metadata["distribution_price_relaxed"] = True if distribution_opening_support_used: distribution_metadata["distribution_opening_support"] = True if cmf >= distribution_cmf_reinforce: confidence_hint = max(confidence_hint or 0.0, 0.9) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 20.0) distribution_metadata["distribution_cmf_reinforced"] = True if short_term_strength in {"strong", "extreme"}: confidence_hint = max(confidence_hint or 0.0, 0.9) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 15.0) if trailing_strength: distribution_metadata["distribution_trailing_strength"] = trailing_strength if trailing_dominance: distribution_metadata["distribution_trailing_dominance"] = trailing_dominance if ( not distribution_triggered and distribution_price_ok and (short_delta_reference is None or short_delta_reference < distribution_delta_min) ): cmf_fallback_cutoff = min(distribution_cmf_min + 0.02, -0.05) cmf_negative = cmf <= cmf_fallback_cutoff closing_sellers = closing_delta is not None and closing_delta <= max(-distribution_trailing_guard, -0.5) session_soft = True if full_session_delta is not None: session_soft = full_session_delta <= max(distribution_delta_min * 0.75, 1.5) short_term_flat = True if short_term_delta is not None: short_term_flat = abs(short_term_delta) <= max(distribution_delta_min * 0.5, 1.0) if short_term_flat and short_term_dominance: short_term_flat = short_term_dominance in {"neutral", "seller"} base_soft_gate = cmf_negative and closing_sellers and session_soft and short_term_flat cluster_timely = True if volume_cluster_latest_minutes is not None: cluster_timely = volume_cluster_latest_minutes >= distribution_cluster_late_minutes_min cluster_span_gate = True if cluster_span_minutes_resolved is not None: cluster_span_gate = cluster_span_minutes_resolved >= distribution_cluster_span_min_minutes cluster_conflict_ready = ( volume_cluster_conflict and cluster_timely and cluster_span_gate and (cmf_negative or cluster_cmf_down) ) price_gain_required = max(distribution_price_gain_min, 1.0) if distribution_price_relaxed: price_gain_required = min(price_gain_required, relaxed_gain_floor) price_gain_ok = price_pct is not None and price_pct >= price_gain_required if price_gain_ok and (base_soft_gate or cluster_conflict_ready): fallback_support, fallback_opening_support = _buyer_supports_distribution( min_pct=max(distribution_buyers_pct_min - 1.5, 49.5), volume_minimum=max(distribution_volume_bias_min * 0.8, 0.008), opening_threshold=distribution_opening_delta_min, ) explicit_opening_support = ( opening_dominance == "buyer" and opening_delta is not None and opening_delta >= distribution_opening_delta_min ) if fallback_support: distribution_triggered = True distribution_soft_trigger = True if fallback_opening_support or explicit_opening_support: distribution_opening_support_used = True direction = "bearish" volume_override = True volume_direction = "buyer" if not volume_override_reason_string: volume_override_reason_string = ( "volume-price cluster divergence" if cluster_conflict_ready else "distribution opening support" ) confidence_hint = max(confidence_hint or 0.0, 0.84 if cluster_conflict_ready else 0.82) score_hint = max( score_hint or 0.0, params.get("strong_score", 75.0) + (10.0 if cluster_conflict_ready else 8.0), ) strength_hint = strength_hint or short_term_strength or "moderate" pattern_override = pattern_override or "Bearish Distribution Divergence" recommendation_override = recommendation_override or "Sell into strength; distribution flow detected." distribution_metadata = dict(distribution_metadata) distribution_metadata.update( { "distribution_triggered": True, "distribution_delta_pct": short_delta_reference, "distribution_buyers_pct": buyers_pct_reference, "distribution_volume_bias": volume_bias, "distribution_strength": short_term_strength or closing_dominance, "distribution_trailing_delta_pct": closing_delta, "distribution_short_term_flat": True, } ) if cluster_conflict_ready: distribution_metadata["distribution_volume_cluster_conflict"] = True if volume_cluster_direction: distribution_metadata["distribution_volume_cluster_direction"] = volume_cluster_direction if volume_cluster_price_direction: distribution_metadata[ "distribution_volume_cluster_price_direction" ] = volume_cluster_price_direction if volume_cluster_cmf_direction: distribution_metadata["distribution_volume_cluster_cmf_direction"] = volume_cluster_cmf_direction if volume_cluster_span_hours is not None: distribution_metadata["distribution_volume_cluster_span_hours"] = volume_cluster_span_hours if cluster_span_minutes_resolved is not None: distribution_metadata["distribution_volume_cluster_span_minutes"] = cluster_span_minutes_resolved if volume_cluster_latest_minutes is not None: distribution_metadata[ "distribution_volume_cluster_latest_minutes" ] = volume_cluster_latest_minutes if volume_cluster_indices_list: distribution_metadata["distribution_volume_cluster_indices"] = volume_cluster_indices_list if volume_cluster_times_list: distribution_metadata["distribution_volume_cluster_times"] = volume_cluster_times_list if volume_cluster_volumes_list: distribution_metadata["distribution_volume_cluster_volumes"] = volume_cluster_volumes_list if volume_cluster_status: distribution_metadata["distribution_volume_cluster_status"] = volume_cluster_status if distribution_opening_support_used: distribution_metadata["distribution_opening_support"] = True if session_soft: distribution_metadata["distribution_session_soft"] = True if cmf_negative: distribution_metadata["distribution_cmf_soft_threshold"] = True distribution_metadata["distribution_soft_trigger"] = True if distribution_price_relaxed: distribution_metadata["distribution_price_relaxed"] = True late_reversal_triggered = False absorption_triggered = False late_reversal_detected = ( price_pct is not None and price_pct >= 1.0 and closing_delta is not None and closing_delta >= max(distribution_delta_min * 0.6, 2.0) and (opening_delta is None or opening_delta <= 0.5) and (short_term_delta is None or short_term_delta <= 0.5) and (cmf is None or cmf > -0.25) ) short_term_seller_pressure = ( short_term_dominance == "seller" and ( short_term_delta is None or short_term_delta <= -max(distribution_delta_min, 3.0) ) ) closing_seller_pressure = ( closing_dominance == "seller" and closing_delta is not None and closing_delta <= -max(distribution_delta_min * 0.85, 4.0) ) session_seller_pressure = ( full_session_dominance == "seller" and ( full_session_delta is None or full_session_delta <= -max(distribution_delta_min * 0.75, 2.0) ) ) relaxed_gain_floor = max(min(distribution_price_gain_min * 0.15, 0.4), 0.15) heavy_distribution_detected = ( not distribution_triggered and price_pct is not None and price_pct >= relaxed_gain_floor and short_term_seller_pressure and closing_seller_pressure and (cmf is None or cmf <= distribution_cmf_min + 0.1) ) if heavy_distribution_detected: distribution_triggered = True distribution_price_relaxed = True distribution_soft_trigger = True direction = "bearish" volume_override = True volume_direction = "buyer" if not volume_override_reason_string: volume_override_reason_string = "heavy seller distribution" confidence_hint = max(confidence_hint or 0.0, 0.83) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 9.0) strength_hint = strength_hint or short_term_strength or "strong" pattern_override = pattern_override or "Bearish Distribution Divergence" recommendation_override = recommendation_override or "Sell into strength; distribution flow detected." distribution_metadata = { "distribution_triggered": True, "distribution_price_relaxed": True, "distribution_heavy_seller_pressure": True, "distribution_buyers_pct": buyers_pct_reference, "distribution_volume_bias": volume_bias, "distribution_strength": short_term_strength or closing_dominance, "distribution_closing_delta_pct": closing_delta, "distribution_short_term_delta_pct": short_term_delta, } if opening_dominance == "buyer" and opening_delta is not None: distribution_metadata["distribution_opening_support"] = True distribution_opening_support_used = True if session_seller_pressure: distribution_metadata["distribution_session_soft"] = True absorption_detected = ( price_pct is not None and price_pct >= -1.0 and price_pct <= 2.6 and closing_delta is not None and closing_delta <= -max(distribution_delta_min * 0.6, 3.5) and (cmf is None or cmf > -0.25) and (volume_bias is None or abs(volume_bias) <= 0.12) and ( full_session_delta is None or full_session_delta >= -6.0 ) and not short_term_seller_pressure and not closing_seller_pressure and not session_seller_pressure ) if late_reversal_detected: distribution_triggered = False distribution_metadata = {} direction = "bullish" volume_override = True volume_direction = "buyer" volume_override_reason_string = "late session buyer reversal" strength_hint = strength_hint or "strong" confidence_hint = max(confidence_hint or 0.0, 0.82) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 12.0) pattern_override = pattern_override or "Bullish Reversal Divergence" recommendation_override = recommendation_override or "Buy the squeeze; sellers exhausted into close." late_reversal_triggered = True if absorption_detected: distribution_triggered = False distribution_metadata = {} direction = "bullish" volume_override = True volume_direction = "buyer" volume_override_reason_string = volume_override_reason_string or "seller absorption" confidence_hint = max(confidence_hint or 0.0, 0.78) score_hint = max(score_hint or 0.0, params.get("moderate_score", 60.0) + 18.0) pattern_override = pattern_override or "Bullish Absorption Divergence" recommendation_override = recommendation_override or "Accumulation spotted; sellers failed to break support." absorption_triggered = True if not absorption_triggered: wave_ratio_ok = True if ( second_wave_gain_pct is not None and plateau_gain_pct is not None and plateau_gain_pct > 0 ): wave_ratio_limit = max(coil_second_wave_ratio_max * plateau_gain_pct, plateau_gain_pct - 1.0) wave_ratio_ok = second_wave_gain_pct <= wave_ratio_limit coil_window_ok = ( price_pct >= coil_price_min and price_pct <= coil_price_max and cmf >= coil_cmf_floor and opening_delta is not None and opening_delta >= coil_opening_delta_min and ( closing_delta is None or closing_delta >= coil_closing_delta_min ) and plateau_gain_pct is not None and plateau_gain_pct >= coil_plateau_gain_min and plateau_range_pct is not None and plateau_range_pct <= coil_plateau_range_max and plateau_duration_minutes is not None and plateau_duration_minutes >= coil_plateau_duration_min and plateau_slope_pct is not None and coil_plateau_slope_min <= plateau_slope_pct <= coil_plateau_slope_max and (volume_bias is None or volume_bias >= coil_volume_bias_floor) ) if coil_window_ok and wave_ratio_ok: coil_detected = True distribution_triggered = False distribution_soft_trigger = False distribution_opening_support_used = False distribution_metadata = {} direction = "bullish" volume_override = True volume_direction = "buyer" volume_override_reason_string = volume_override_reason_string or "coil convergence" confidence_hint = max(confidence_hint or 0.0, 0.79) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 10.0) strength_hint = strength_hint or "strong" pattern_override = pattern_override or "Bullish Coil Divergence" recommendation_override = recommendation_override or "Coiling advance; expect breakout continuation." coil_metadata = { "coil_detected": True, "coil_price_pct": price_pct, "coil_cmf_value": cmf, "coil_opening_delta_pct": opening_delta, "coil_closing_delta_pct": closing_delta, "coil_plateau_gain_pct": plateau_gain_pct, "coil_plateau_range_pct": plateau_range_pct, "coil_plateau_duration_minutes": plateau_duration_minutes, "coil_plateau_slope_pct": plateau_slope_pct, } if second_wave_gain_pct is not None: coil_metadata["coil_second_wave_gain_pct"] = second_wave_gain_pct if second_wave_peak_pct is not None: coil_metadata["coil_second_wave_peak_pct"] = second_wave_peak_pct if second_wave_retracement_pct is not None: coil_metadata["coil_second_wave_retracement_pct"] = second_wave_retracement_pct if opening_dominance: coil_metadata["coil_opening_dominance"] = opening_dominance if closing_dominance: coil_metadata["coil_closing_dominance"] = closing_dominance distribution_metadata.update(coil_metadata) short_duration_coil = ( plateau_duration_minutes is not None and plateau_duration_minutes <= coil_short_duration_max ) if ( not coil_detected and short_duration_coil and plateau_gain_pct is not None and plateau_gain_pct >= coil_breakout_gain_min and plateau_range_pct is not None and plateau_range_pct > 0 and price_pct >= coil_breakout_gain_min ): coil_ratio = price_pct / max(plateau_range_pct, 0.1) short_term_pressure = ( short_term_delta is not None and short_term_delta <= coil_breakout_short_delta_max ) closing_pressure = ( closing_delta is None or closing_delta <= coil_breakout_closing_delta_max ) if coil_ratio >= coil_breakout_ratio_min and short_term_pressure and closing_pressure: coil_breakout_detected = True distribution_triggered = False distribution_soft_trigger = False distribution_opening_support_used = False distribution_metadata = {} direction = "bearish" volume_override = True volume_direction = "seller" volume_override_reason_string = "coil breakout exhaustion" confidence_hint = max(confidence_hint or 0.0, 0.82) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 14.0) strength_hint = strength_hint or "strong" pattern_override = "Bearish Coil Exhaustion Divergence" recommendation_override = coil_breakout_recommendation coil_metadata.update( { "coil_detected": True, "coil_breakout_detected": True, "coil_breakout_energy_dissipated": True, "coil_breakout_ratio": coil_ratio, "coil_breakout_short_delta_pct": short_term_delta, "coil_breakout_closing_delta_pct": closing_delta, "coil_breakout_price_gain_pct": price_pct, "coil_breakout_plateau_range_pct": plateau_range_pct, } ) coil_metadata.setdefault("coil_price_pct", price_pct) coil_metadata.setdefault("coil_plateau_gain_pct", plateau_gain_pct) coil_metadata.setdefault("coil_plateau_range_pct", plateau_range_pct) coil_metadata.setdefault("coil_plateau_duration_minutes", plateau_duration_minutes) coil_metadata.setdefault("coil_plateau_slope_pct", plateau_slope_pct) coil_metadata.setdefault("coil_breakout_recommendation", coil_breakout_recommendation) coil_energy_cmf_gate = ( cmf is None or cmf <= coil_distribution_cmf_ceiling or cmf_slope_break ) trailing_collapse_ready = ( trailing_delta_reference is not None and trailing_delta_reference <= -coil_distribution_trailing_delta_min ) if ( not coil_detected and not coil_breakout_detected and short_duration_coil and trailing_collapse_ready and coil_energy_cmf_gate ): coil_dissipation_triggered = True distribution_triggered = True distribution_soft_trigger = False distribution_opening_support_used = False direction = "bearish" volume_override = True volume_direction = "seller" volume_override_reason_string = "coil energy dissipation" confidence_hint = max(confidence_hint or 0.0, 0.84) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 16.0) strength_hint = strength_hint or "strong" pattern_override = "Bearish Coil Exhaustion Divergence" recommendation_override = coil_breakout_recommendation coil_metadata.setdefault("coil_detected", True) coil_metadata.update( { "distribution_triggered": True, "coil_energy_dissipated": True, "coil_trailing_delta_pct": trailing_delta_reference, "coil_trailing_strength": trailing_strength or trailing_dominance, "coil_trailing_dominance": trailing_dominance, "coil_closing_delta_pct": closing_delta, "coil_short_term_delta_pct": short_term_delta, "volume_override_reason": "coil energy dissipation", } ) coil_metadata.setdefault("coil_price_pct", price_pct) coil_metadata.setdefault("coil_plateau_gain_pct", plateau_gain_pct) coil_metadata.setdefault("coil_plateau_range_pct", plateau_range_pct) coil_metadata.setdefault("coil_plateau_duration_minutes", plateau_duration_minutes) coil_metadata.setdefault("coil_plateau_slope_pct", plateau_slope_pct) coil_metadata.setdefault("coil_cmf_value", cmf) if cmf_short_slope is not None: coil_metadata["coil_cmf_short_slope"] = cmf_short_slope if cmf_long_slope is not None: coil_metadata["coil_cmf_long_slope"] = cmf_long_slope distribution_metadata = dict(distribution_metadata) distribution_metadata.update(coil_metadata) bullish_override_active = late_reversal_triggered or (absorption_detected and not coil_breakout_detected and not coil_detected) or coil_detected plateau_detected = False plateau_multi_wave = False second_wave_gate_ok = False buyers_ok = False delta_ok = True breakout_triggered = False breakout_recovery_triggered = False terminal_spike_detected = False forced_distribution_trigger = False terminal_wave_gate = False flow_exhaustion_detected = False flow_exhaustion_ratio = None flow_exhaustion_gap = None if ( not coil_detected and not coil_breakout_detected and plateau_gain_pct is not None and plateau_gain_pct >= plateau_gain_min and plateau_range_pct is not None and plateau_range_pct <= plateau_range_max and plateau_duration_minutes is not None and plateau_duration_minutes >= plateau_duration_min ): slope_ok = True if plateau_slope_pct is not None: slope_ok = plateau_slope_min <= plateau_slope_pct <= plateau_slope_max buyers_ok, plateau_opening_support = _buyer_supports_distribution( min_pct=distribution_buyers_pct_min, volume_minimum=distribution_volume_bias_min, opening_threshold=distribution_opening_delta_min, ) if plateau_opening_support: distribution_opening_support_used = True if closing_delta is not None: delta_ok = closing_delta >= -plateau_delta_guard if short_term_delta is not None: delta_ok = delta_ok and short_term_delta >= -plateau_delta_guard wave_gate_ok = False if plateau_wave_count is None: wave_gate_ok = True elif plateau_wave_count >= plateau_wave_count_min: wave_gate_ok = True plateau_multi_wave = True if second_wave_gain_pct is not None and second_wave_retracement_pct is not None: gain_ok = second_wave_gain_pct <= second_wave_gain_max retrace_ok = second_wave_retracement_pct >= second_wave_retracement_min start_ok = True if second_wave_start_minutes is not None and second_wave_start_min_minutes > 0: start_ok = second_wave_start_minutes >= second_wave_start_min_minutes peak_ok = True if second_wave_peak_pct is not None: peak_ok = second_wave_peak_pct >= max(plateau_gain_min * 0.5, plateau_gain_min - 1.0) second_wave_gate_ok = gain_ok and retrace_ok and start_ok and peak_ok if second_wave_gate_ok: plateau_multi_wave = True wave_gate_ok = wave_gate_ok or second_wave_gate_ok if ( slope_ok and buyers_ok and (delta_ok or exhaustion_detected) and (cmf >= distribution_cmf_min or exhaustion_detected) and wave_gate_ok ): plateau_detected = True if ( not coil_detected and not coil_breakout_detected and not plateau_detected and (second_wave_gate_ok or exhaustion_detected) and plateau_gain_pct is not None and plateau_gain_pct >= plateau_gain_min and plateau_range_pct is not None and plateau_range_pct <= plateau_range_max * 1.2 and plateau_duration_minutes is not None and plateau_duration_minutes >= plateau_duration_min * 0.8 and (cmf >= max(distribution_cmf_min * 0.7, 0.05) or exhaustion_detected) ): relaxed_min_pct = max(distribution_buyers_pct_min - 2.0, 45.0) relaxed_volume_min = max(distribution_volume_bias_min * 0.6, 0.01) buyers_ok, relaxed_opening_support = _buyer_supports_distribution( min_pct=relaxed_min_pct, volume_minimum=relaxed_volume_min, opening_threshold=max(distribution_opening_delta_min - 0.5, 0.5), ) if relaxed_opening_support: distribution_opening_support_used = True delta_guard_ok = True if closing_delta is not None: delta_guard_ok = closing_delta >= -plateau_delta_guard * 1.2 if short_term_delta is not None: delta_guard_ok = delta_guard_ok and short_term_delta >= -plateau_delta_guard * 1.2 if buyers_ok and delta_guard_ok: plateau_detected = True plateau_multi_wave = True terminal_buyers_ok = False terminal_opening_support = False if ( buyers_pct_reference is not None or volume_bias is not None or (opening_dominance == "buyer" and opening_delta is not None) ): terminal_buyers_ok, terminal_opening_support = _buyer_supports_distribution( min_pct=terminal_buyers_pct_min, volume_minimum=terminal_volume_bias_min, opening_threshold=max(distribution_opening_delta_min, 1.0), ) if terminal_opening_support: distribution_opening_support_used = True if ( not coil_detected and not distribution_triggered and plateau_gain_pct is not None and plateau_gain_pct >= max(terminal_gain_min, plateau_gain_min) and plateau_duration_minutes is not None and plateau_duration_minutes <= terminal_duration_max and closing_delta is not None and closing_delta >= terminal_closing_delta_min and terminal_buyers_ok ): slope_gate = True if plateau_slope_pct is not None: slope_gate = plateau_slope_pct <= terminal_slope_max wave_gate = False if second_wave_start_minutes is not None: wave_gate = second_wave_start_minutes >= terminal_wave_start_min if not wave_gate and plateau_wave_count is not None: wave_gate = plateau_wave_count >= max(plateau_wave_count_min, 2) if not wave_gate and second_wave_gain_pct is not None: wave_gate = True if not wave_gate and plateau_duration_minutes <= terminal_duration_max * 0.75: wave_gate = True if wave_gate and slope_gate: terminal_spike_detected = True forced_distribution_trigger = True distribution_triggered = True direction = "bearish" plateau_multi_wave = plateau_multi_wave or wave_gate terminal_wave_gate = wave_gate if volume_override_reason_string is None: volume_override_reason_string = "terminal distribution spike" pattern_override = "Bearish Distribution Divergence" if recommendation_override is None: recommendation_override = "Sell into strength; terminal distribution spike detected." confidence_hint = max(confidence_hint or 0.0, 0.9) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 20.0) if not strength_hint or strength_hint == "moderate": strength_hint = "strong" if ( not coil_detected and not coil_breakout_detected and not distribution_triggered and price_pct is not None and closing_delta is not None and short_term_delta is not None ): abs_price = abs(price_pct) buyers_threshold = max(distribution_buyers_pct_min - 1.0, 52.0) buyers_volume_threshold = max(distribution_volume_bias_min * 0.9, 0.015) buyers_dominant, exhaustion_opening_support = _buyer_supports_distribution( min_pct=buyers_threshold, volume_minimum=buyers_volume_threshold, opening_threshold=max(distribution_opening_delta_min, 1.0), ) if exhaustion_opening_support: distribution_opening_support_used = True cmf_strength = cmf >= max(flow_exhaustion_cmf_min, distribution_cmf_min * 0.8) if ( abs_price <= flow_exhaustion_price_guard and closing_delta >= flow_exhaustion_delta_min and short_term_delta >= flow_exhaustion_short_delta_min and buyers_dominant and cmf_strength ): delta_gap_value: Optional[float] = None if full_session_delta is not None: delta_gap_value = short_term_delta - full_session_delta delta_ratio = closing_delta / max(abs_price, 0.75) full_delta_ok = ( full_session_delta is not None and full_session_delta >= flow_exhaustion_full_delta_min ) full_ratio_ok = True if full_session_delta is not None and closing_delta > 0: full_ratio_ok = (full_session_delta / closing_delta) <= flow_exhaustion_full_ratio_max elif full_session_delta is None: full_ratio_ok = False gap_ok = True if delta_gap_value is not None: gap_ok = delta_gap_value >= flow_exhaustion_delta_gap_min if ( delta_ratio >= flow_exhaustion_ratio_min and gap_ok and full_ratio_ok and full_delta_ok ): flow_exhaustion_detected = True flow_exhaustion_ratio = delta_ratio flow_exhaustion_gap = delta_gap_value forced_distribution_trigger = True distribution_triggered = True direction = "bearish" volume_override = True volume_direction = "buyer" volume_override_reason_string = "distribution flow exhaustion" strength_hint = strength_hint or "strong" confidence_hint = max(confidence_hint or 0.0, 0.88) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 16.0) pattern_override = "Bearish Distribution Divergence" recommendation_override = "Sell into strength; flow exhaustion detected." distribution_metadata = dict(distribution_metadata) distribution_metadata.update( { "distribution_triggered": True, "distribution_delta_pct": short_term_delta, "distribution_buyers_pct": buyers_pct_reference, "distribution_volume_bias": volume_bias, "distribution_strength": short_term_strength or closing_dominance, "distribution_trailing_delta_pct": closing_delta, "distribution_flow_exhaustion": True, "distribution_flow_ratio": delta_ratio, } ) if distribution_opening_support_used: distribution_metadata["distribution_opening_support"] = True if delta_gap_value is not None: distribution_metadata["distribution_flow_delta_gap_pct"] = delta_gap_value if not coil_detected and not coil_breakout_detected and (plateau_detected or exhaustion_detected or terminal_spike_detected or flow_exhaustion_detected): forced_distribution = False if not distribution_triggered and not bullish_override_active: direction = "bearish" distribution_triggered = True forced_distribution = True elif forced_distribution_trigger: direction = "bearish" forced_distribution = True distribution_metadata = dict(distribution_metadata) plateau_payload: Dict[str, object] = { "plateau_detected": bool(plateau_detected), "plateau_gain_pct": plateau_gain_pct, "plateau_range_pct": plateau_range_pct, "plateau_duration_minutes": plateau_duration_minutes, "plateau_slope_pct": plateau_slope_pct, "plateau_wave_count": plateau_wave_count, } if distribution_opening_support_used: plateau_payload["distribution_opening_support"] = True if volume_cluster_conflict: plateau_payload["distribution_volume_cluster_conflict"] = True if volume_cluster_direction: plateau_payload["volume_cluster_direction"] = volume_cluster_direction if volume_cluster_price_direction: plateau_payload["volume_cluster_price_direction"] = volume_cluster_price_direction if ( buyers_pct_reference is not None and "distribution_buyers_pct" not in distribution_metadata ): plateau_payload["distribution_buyers_pct"] = buyers_pct_reference if ( volume_bias is not None and "distribution_volume_bias" not in distribution_metadata ): plateau_payload["distribution_volume_bias"] = volume_bias if ( short_delta_reference is not None and "distribution_delta_pct" not in distribution_metadata ): plateau_payload["distribution_delta_pct"] = short_delta_reference if ( trailing_delta_reference is not None and "distribution_trailing_delta_pct" not in distribution_metadata ): plateau_payload["distribution_trailing_delta_pct"] = trailing_delta_reference if ( trailing_strength and "distribution_trailing_strength" not in distribution_metadata ): plateau_payload["distribution_trailing_strength"] = trailing_strength if ( trailing_dominance and "distribution_trailing_dominance" not in distribution_metadata ): plateau_payload["distribution_trailing_dominance"] = trailing_dominance if ( short_term_strength and "distribution_strength" not in distribution_metadata ): plateau_payload["distribution_strength"] = short_term_strength if distribution_soft_trigger: plateau_payload["distribution_soft_trigger"] = True if plateau_multi_wave: plateau_payload["plateau_multi_wave"] = True if second_wave_peak_pct is not None: plateau_payload["second_wave_peak_pct"] = second_wave_peak_pct if second_wave_gain_pct is not None: plateau_payload["second_wave_gain_pct"] = second_wave_gain_pct if second_wave_retracement_pct is not None: plateau_payload["second_wave_retracement_pct"] = second_wave_retracement_pct if second_wave_start_minutes is not None: plateau_payload["second_wave_start_minutes"] = second_wave_start_minutes if exhaustion_detected: plateau_payload["distribution_exhaustion"] = True if terminal_spike_detected: plateau_payload["terminal_spike_detected"] = True plateau_payload["terminal_closing_delta_pct"] = closing_delta plateau_payload["terminal_wave_gate"] = terminal_wave_gate if flow_exhaustion_detected: plateau_payload["distribution_flow_exhaustion"] = True if flow_exhaustion_ratio is not None: plateau_payload["distribution_flow_ratio"] = flow_exhaustion_ratio if flow_exhaustion_gap is not None: plateau_payload["distribution_flow_delta_gap_pct"] = flow_exhaustion_gap plateau_payload["distribution_triggered"] = bool(distribution_triggered) if bullish_override_active and not forced_distribution: plateau_payload["distribution_bullish_override"] = True distribution_metadata.update(plateau_payload) if distribution_triggered: volume_override = True volume_direction = "buyer" if not volume_override_reason_string: volume_override_reason_string = "distribution plateau" confidence_hint = max(confidence_hint or 0.0, 0.88) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 18.0) if not strength_hint: strength_hint = "strong" if not pattern_override: pattern_override = "Bearish Distribution Divergence" if not recommendation_override: recommendation_override = "Sell into strength; plateau distribution detected." breakout_price_min = float(params.get("breakout_price_gain_min", 0.0)) breakout_cmf_min = float(params.get("breakout_cmf_min", 0.0)) breakout_opening_max = float(params.get("breakout_opening_delta_max", -3.0)) breakout_full_session_max = float(params.get("breakout_full_session_max", -0.5)) breakout_closing_min = float(params.get("breakout_closing_delta_min", 0.5)) breakout_volume_bias_abs_max = float(params.get("breakout_volume_bias_abs_max", 0.04)) breakout_recovery_price_min = float(params.get("breakout_recovery_price_gain_min", 0.0)) breakout_recovery_opening_max = float(params.get("breakout_recovery_opening_delta_max", -8.0)) breakout_recovery_full_session_min = float(params.get("breakout_recovery_full_session_min", 0.0)) breakout_recovery_closing_min = float(params.get("breakout_recovery_closing_delta_min", -3.0)) breakout_recovery_cmf_min = float(params.get("breakout_recovery_cmf_min", 0.0)) breakout_recovery_volume_abs_max = float(params.get("breakout_recovery_volume_bias_abs_max", 0.06)) breakout_recovery_spread_min = float(params.get("breakout_recovery_spread_min", 0.0)) def _effective_pct(value: Optional[float]) -> Optional[float]: if value is None: return None scaled = float(value) if abs(scaled) < 1.0: scaled *= 100.0 return scaled opening_delta_effective = _effective_pct(opening_delta) full_session_delta_effective = _effective_pct(full_session_delta) closing_delta_effective = _effective_pct(closing_delta) price_pct_breakout = price_pct if abs(price_pct_breakout) < 1.0: price_pct_breakout *= 100.0 if ( not distribution_triggered and price_pct is not None and price_pct_breakout >= breakout_price_min > 0 and cmf >= breakout_cmf_min ): opening_ok = ( opening_delta_effective is not None and opening_delta_effective <= breakout_opening_max ) full_session_ok = ( full_session_delta_effective is not None and full_session_delta_effective <= breakout_full_session_max ) closing_ok = ( closing_delta_effective is not None and closing_delta_effective >= breakout_closing_min ) volume_ok = True if volume_bias is not None and breakout_volume_bias_abs_max > 0: volume_ok = abs(volume_bias) <= breakout_volume_bias_abs_max if opening_ok and full_session_ok and closing_ok and volume_ok: direction = "bullish" breakout_triggered = True volume_override = True volume_direction = "buyer" volume_override_reason_string = "breakout price thrust" pattern_override = pattern_override or "Bullish Breakout Divergence" recommendation_override = recommendation_override or "Ride momentum; buyers absorbing supply despite heavy open selling." confidence_hint = max(confidence_hint or 0.0, 0.88) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 15.0) if not strength_hint or strength_hint == "moderate": strength_hint = "strong" if ( not distribution_triggered and not breakout_triggered and price_pct is not None and price_pct_breakout >= breakout_recovery_price_min > 0 and cmf >= breakout_recovery_cmf_min ): recovery_opening_ok = ( opening_delta_effective is not None and opening_delta_effective <= breakout_recovery_opening_max ) recovery_full_session_ok = ( full_session_delta_effective is not None and full_session_delta_effective >= breakout_recovery_full_session_min ) recovery_closing_ok = True if closing_delta_effective is not None: recovery_closing_ok = closing_delta_effective >= breakout_recovery_closing_min volume_ok = True if volume_bias is not None and breakout_recovery_volume_abs_max > 0: volume_ok = abs(volume_bias) <= breakout_recovery_volume_abs_max spread_ok = ( opening_recovery_pct is not None and breakout_recovery_spread_min is not None and opening_recovery_pct >= breakout_recovery_spread_min ) if (recovery_opening_ok or spread_ok) and recovery_full_session_ok and recovery_closing_ok and volume_ok: direction = "bullish" breakout_triggered = True breakout_recovery_triggered = True volume_override = True volume_direction = "buyer" if not volume_override_reason_string: volume_override_reason_string = "breakout recovery" pattern_override = pattern_override or "Bullish Breakout Reclaim Divergence" recommendation_override = ( recommendation_override or "Momentum reclaimed after early selling; expect continuation." ) confidence_hint = max(confidence_hint or 0.0, 0.9) score_hint = max(score_hint or 0.0, params.get("strong_score", 75.0) + 18.0) if not strength_hint or strength_hint == "moderate": strength_hint = "strong" if ( not distribution_triggered and not breakout_triggered and direction is None and not terminal_spike_detected ): if price_pct <= -0.5 and cmf >= 0.05: direction = "bullish" elif price_pct >= 0.5 and cmf <= -0.05: direction = "bearish" if distribution_triggered or breakout_triggered: volume_override = True if ( not distribution_triggered and not breakout_triggered and direction and volume_provided and volume_total > 0 and volume_bias_required > 0 and not volume_override ): if direction == "bullish": volume_aligned = volume_bias >= volume_bias_required else: volume_aligned = volume_bias <= -volume_bias_required if not volume_aligned: if abs(volume_bias) >= volume_bias_required: override_direction = "bullish" if volume_bias > 0 else "bearish" if override_direction != direction: direction = override_direction volume_override = True volume_override_reason_string = ( "dominant buyers" if volume_bias > 0 else "dominant sellers" ) else: direction = None else: direction = None if ( not distribution_triggered and not breakout_triggered and direction is None and volume_provided and volume_total > 0 and volume_bias_required > 0 and abs(volume_bias) >= volume_bias_required ): if volume_bias > 0 and price_pct >= -0.25: direction = "bullish" volume_override = True volume_direction = "buyer" volume_override_reason_string = "dominant buyers" elif volume_bias < 0 and price_pct <= 0.25: direction = "bearish" volume_override = True volume_direction = "seller" volume_override_reason_string = "dominant sellers" flow_closing_threshold = max(float(params.get("flow_closing_delta_min", 0.0)), 0.0) flow_delta_threshold = max(float(params.get("flow_acceleration_delta_min", 0.0)), 0.0) flow_cmf_threshold = float(params.get("flow_acceleration_cmf_min", 0.0)) if ( not distribution_triggered and closing_delta is not None and closing_dominance in {"buyer", "seller"} ): opening_reference = opening_delta if opening_delta is not None else 0.0 session_support = True if full_session_delta is not None: if closing_dominance == "buyer": session_support = full_session_delta >= -0.5 else: session_support = full_session_delta <= 0.5 if closing_dominance == "buyer": improvement = closing_delta - max(opening_reference, 0.0) qualifies = ( closing_delta >= flow_closing_threshold and improvement >= flow_delta_threshold and cmf >= flow_cmf_threshold and session_support ) override_direction = "bullish" else: improvement = abs(closing_delta) - max(abs(opening_reference), 0.0) qualifies = ( closing_delta <= -flow_closing_threshold and improvement >= flow_delta_threshold and cmf <= -flow_cmf_threshold and session_support ) override_direction = "bearish" if qualifies: flow_acceleration_delta = improvement override_allowed = ( direction is None or direction == override_direction or not (late_reversal_triggered or absorption_triggered) ) if override_allowed: direction = override_direction volume_override = True volume_direction = closing_dominance volume_override_reason_string = ( "late session buyer acceleration" if closing_dominance == "buyer" else "late session seller acceleration" ) if direction is None: reversal_info = _evaluate_reversal_candidate( price_pct=price_pct, cmf=cmf, opening_delta=opening_delta, opening_dominance=opening_dominance, closing_delta=closing_delta, closing_dominance=closing_dominance, full_session_delta=full_session_delta, params=params, ) if reversal_info: direction = "bullish" pattern_override = reversal_info.get("pattern") recommendation_override = reversal_info.get("recommendation") strength_hint = reversal_info.get("strength") confidence_hint = reversal_info.get("confidence") score_hint = reversal_info.get("score") reversal_metadata = dict(reversal_info.get("metadata") or {}) params["cmf_abs"] = min( params.get("cmf_abs", DEFAULT_THRESHOLDS["cmf_abs"]), max(0.12, float(reversal_metadata.get("reversal_cmf_min") or 0.12)), ) volume_override = True volume_override_reason_string = "reversal_flow_resilience" details: Dict[str, object] = { "price_change_pct": price_pct, "cmf_value": cmf, "thresholds": params, } if abs(price_pct) < 1.0: details["price_change_pct_effective"] = price_pct_breakout if reversal_metadata: details.update(reversal_metadata) if confidence_hint is not None: details["reversal_confidence_hint"] = confidence_hint if score_hint is not None: details["reversal_score_hint"] = score_hint if buyer_volume is not None: details["buyer_volume"] = buyer_total if seller_volume is not None: details["seller_volume"] = seller_total if volume_provided: details.update( { "volume_total": volume_total, "volume_bias": volume_bias, "volume_bias_threshold": volume_bias_required, } ) if volume_direction: details["volume_direction"] = volume_direction if closing_delta is not None: details["closing_delta_pct"] = closing_delta if opening_delta is not None: details["opening_delta_pct"] = opening_delta if full_session_delta is not None: details["full_session_delta_pct"] = full_session_delta if opening_recovery_pct is not None: details["opening_recovery_pct"] = opening_recovery_pct if closing_dominance: details["closing_dominance"] = closing_dominance if opening_dominance: details["opening_dominance"] = opening_dominance if full_session_dominance: details["full_session_dominance"] = full_session_dominance if short_term_delta is not None: details["short_term_delta_pct"] = short_term_delta if short_term_dominance: details["short_term_dominance"] = short_term_dominance if short_term_buyers_pct is not None: details["short_term_buyers_pct"] = short_term_buyers_pct if short_term_sellers_pct is not None: details["short_term_sellers_pct"] = short_term_sellers_pct if short_term_strength: details["short_term_strength"] = short_term_strength if flow_acceleration_delta is not None: details["flow_acceleration_delta"] = flow_acceleration_delta if volume_override: details["volume_override"] = True if volume_override_reason_string: details["volume_override_reason"] = volume_override_reason_string elif volume_direction: details["volume_override_reason"] = ( "dominant buyers" if volume_direction == "buyer" else "dominant sellers" ) if distribution_metadata: details.update(distribution_metadata) if coil_metadata and not distribution_metadata.get("coil_detected"): details.update(coil_metadata) if plateau_gain_pct is not None: details["plateau_gain_pct"] = plateau_gain_pct if plateau_range_pct is not None: details["plateau_range_pct"] = plateau_range_pct if plateau_duration_minutes is not None: details["plateau_duration_minutes"] = plateau_duration_minutes if plateau_slope_pct is not None: details["plateau_slope_pct"] = plateau_slope_pct if plateau_multi_wave: details["plateau_multi_wave"] = True if second_wave_peak_pct is not None: details["second_wave_peak_pct"] = second_wave_peak_pct if second_wave_gain_pct is not None: details["second_wave_gain_pct"] = second_wave_gain_pct if second_wave_retracement_pct is not None: details["second_wave_retracement_pct"] = second_wave_retracement_pct if second_wave_start_minutes is not None: details["second_wave_start_minutes"] = second_wave_start_minutes if breakout_triggered: details["breakout_triggered"] = True if opening_delta is not None: details["breakout_opening_delta_pct"] = opening_delta if full_session_delta is not None: details["breakout_full_session_delta_pct"] = full_session_delta if closing_delta is not None: details["breakout_closing_delta_pct"] = closing_delta details["breakout_volume_bias_abs_max"] = breakout_volume_bias_abs_max details["breakout_price_gain_min"] = breakout_price_min details["breakout_cmf_min"] = breakout_cmf_min if breakout_recovery_triggered: details["breakout_recovery_triggered"] = True details["breakout_recovery_price_gain_min"] = breakout_recovery_price_min details["breakout_recovery_opening_delta_max"] = breakout_recovery_opening_max details["breakout_recovery_full_session_min"] = breakout_recovery_full_session_min details["breakout_recovery_closing_delta_min"] = breakout_recovery_closing_min details["breakout_recovery_volume_bias_abs_max"] = breakout_recovery_volume_abs_max if opening_recovery_pct is not None: details["breakout_recovery_spread_pct"] = opening_recovery_pct if earnings_context_resolved: details["earnings_snapshot"] = { "date": earnings_context_resolved.get("earnings_date"), "time": earnings_context_resolved.get("time"), "days_since": earnings_context_resolved.get("days_since"), "eps_actual": earnings_context_resolved.get("eps_actual"), "eps_estimate": earnings_context_resolved.get("eps_estimate"), "eps_diff": earnings_context_resolved.get("eps_diff"), "eps_surprise_pct": earnings_context_resolved.get("eps_surprise_pct"), "recency_weight": earnings_context_resolved.get("recency_weight"), "scope": earnings_context_resolved.get("scope"), } > if earnings_adjustment_details: E UnboundLocalError: local variable 'earnings_adjustment_details' referenced before assignment rtrader/indicators/divergence_detector.py:1986: UnboundLocalError