Source code for cli.main

"""Command line entry point for the kde-cpi application."""

import asyncio
import json
from collections import defaultdict
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from datetime import UTC, datetime
from decimal import DivisionByZero, InvalidOperation
from pathlib import Path

import click
import structlog

from kde_cpi.data import (
    CpiDatabaseLoader,
    CpiDatasetBuilder,
    Dataset,
    Observation,
    load_full_history,
    update_current_periods,
)
from kde_cpi.logging import configure_logging
from kde_cpi.math import StatSummary, compute_statistics
from kde_cpi.output import generate_density_plot, generate_histogram_plot
from kde_cpi.output.utils import format_percent

DSN_HELP = "PostgreSQL connection string. May also be set via the KDE_CPI_DSN env var."
SCHEMA_HELP = (
    "Target database schema for CPI tables. May also be set via the KDE_CPI_SCHEMA env var."
)
DATA_FILE_HELP = "One or more specific CPI data partitions to ingest (e.g. cu.data.0.Current)."

LOG_FORMAT_CHOICES = ("console", "json")
LOG_LEVEL_CHOICES = ("critical", "error", "warning", "info", "debug")

logger = structlog.get_logger(__name__)


[docs] @dataclass(slots=True) class ObservationCache: """Precomputed lookup tables for per-series observations.""" observations: dict[str, dict[tuple[int, str], Observation]] latest: dict[str, tuple[tuple[int, str], Observation]] periods: list[tuple[int, str]]
[docs] @dataclass(frozen=True) class GrowthComponent: """Single YoY growth component derived from CPI series observations.""" series_id: str item_code: str item_name: str display_level: int series_title: str value: float year: int period: str
def _require_dsn(ctx: click.Context, override: str | None) -> str: """Return the resolved DSN, raising when none is provided.""" ctx.ensure_object(dict) dsn = override or ctx.obj.get("dsn") if not dsn: raise click.UsageError("A PostgreSQL DSN is required; pass --dsn or set KDE_CPI_DSN.") return dsn def _resolve_schema(ctx: click.Context, override: str | None) -> str: """Return the schema name, defaulting to the contextual configuration.""" ctx.ensure_object(dict) schema = override or ctx.obj.get("schema") or "public" return schema def _build_dataset(*, current_only: bool, data_files: Sequence[str] | None) -> Dataset: """Load CPI data using the shared dataset builder.""" build_log = logger.bind(scope="dataset-build", current_only=current_only) build_log.debug("dataset.build_start", data_files=list(data_files) if data_files else []) builder = CpiDatasetBuilder() try: if current_only: dataset = builder.load_current_observations() else: dataset = builder.load_dataset(data_files=data_files) finally: builder.close() if not dataset.observations: build_log.warning("dataset.build_empty", reason="no observations parsed") else: build_log.debug("dataset.build_complete", observations=len(dataset.observations)) return dataset def _echo_dataset_summary(action: str, dataset: Dataset) -> None: """Emit a concise data volume summary for terminal feedback.""" logger.info( "dataset.summary", action=action, series=len(dataset.series), observations=len(dataset.observations), areas=len(dataset.areas), items=len(dataset.items), ) click.echo( f"{action}: {len(dataset.series)} series, " f"{len(dataset.observations)} observations " f"across {len(dataset.areas)} areas and {len(dataset.items)} items." ) def _write_dataset(output: Path, dataset: Dataset) -> None: """Serialize a dataset to disk.""" output.parent.mkdir(parents=True, exist_ok=True) output.write_text(json.dumps(dataset.to_dict(), indent=2)) click.echo(f"Wrote dataset snapshot to {output}") logger.debug("dataset.snapshot_written", output=str(output)) @click.group() @click.option("--dsn", envvar="KDE_CPI_DSN", help=DSN_HELP, default=None) @click.option( "--schema", envvar="KDE_CPI_SCHEMA", default="public", show_default=True, help=SCHEMA_HELP, ) @click.option( "--log-level", type=click.Choice(LOG_LEVEL_CHOICES, case_sensitive=False), envvar="KDE_CPI_LOG_LEVEL", default="info", show_default=True, help="Verbosity for structured logs.", ) @click.option( "--log-format", type=click.Choice(LOG_FORMAT_CHOICES, case_sensitive=False), envvar="KDE_CPI_LOG_FORMAT", default="console", show_default=True, help="Render logs as console-friendly text or JSON.", ) @click.pass_context def cli( ctx: click.Context, dsn: str | None, schema: str, log_level: str, log_format: str, ) -> None: """Manage CPI ingestion, database loading, and reporting workflows.""" configure_logging(level=log_level, json_output=log_format.lower() == "json") ctx.ensure_object(dict) ctx.obj.update({"dsn": dsn, "schema": schema}) logger.bind(command_group="kde-cpi").debug( "cli.initialized", dsn=bool(dsn), schema=schema, log_level=log_level.lower(), log_format=log_format.lower(), ) @cli.command("fetch-dataset") @click.option( "--output", "output_path", type=click.Path(dir_okay=False, path_type=Path), help="Optional path to save the assembled dataset as JSON.", ) @click.option( "--data-file", "data_files", multiple=True, help=DATA_FILE_HELP, ) @click.option( "--current-only", is_flag=True, default=False, help="Limit ingestion to the current-year CPI partition.", ) def fetch_dataset( *, output_path: Path | None, data_files: tuple[str, ...], current_only: bool, ) -> None: """Download CPI flat files, stitch them together, and report counts.""" if current_only and data_files: raise click.UsageError("--current-only cannot be combined with --data-file.") cmd_log = logger.bind(command="fetch-dataset", current_only=current_only) cmd_log.info("command.start", data_files=list(data_files)) dataset = _build_dataset(current_only=current_only, data_files=data_files or None) _echo_dataset_summary("Fetched dataset", dataset) if output_path: _write_dataset(output_path, dataset) cmd_log.info( "dataset.written", output=str(output_path), records=len(dataset.observations), ) GROUP_BY_CHOICES = ("display-level", "item-code-length", "series-name-length") SERIES_LOCK_ALIASES: dict[str, str] = { "area": "area_code", "area_code": "area_code", "item": "item_code", "item_code": "item_code", "seasonal": "seasonal", "seasonality": "seasonal", "base": "base_code", "base_code": "base_code", "base_period": "base_period", "periodicity": "periodicity_code", "periodicity_code": "periodicity_code", } def _parse_series_locks(values: Sequence[str]) -> dict[str, str]: """Convert CLI-provided lock expressions into canonical metadata filters.""" locks: dict[str, str] = {} for raw in values: token = raw.strip() if not token: continue if "=" not in token: raise click.BadParameter( "Expected KEY=VALUE format for --series-lock.", param_hint="--series-lock" ) key, value = (part.strip() for part in token.split("=", 1)) if not key or not value: raise click.BadParameter( "Series locks require both a key and a value.", param_hint="--series-lock" ) canonical = SERIES_LOCK_ALIASES.get(key.lower()) if canonical is None: valid_keys = ", ".join(sorted(set(SERIES_LOCK_ALIASES.values()))) raise click.BadParameter( f"Unsupported series metadata key: {key!r}. Choose from: {valid_keys}.", param_hint="--series-lock", ) if canonical in locks: raise click.BadParameter( f"Duplicate lock for {canonical!r} detected.", param_hint="--series-lock" ) locks[canonical] = value return locks @cli.command("analyze") @click.option( "--group-by", type=click.Choice(GROUP_BY_CHOICES, case_sensitive=False), default="display-level", show_default=True, help="Choose how to bucket series before computing weighted statistics.", ) @click.option( "--length-bin-size", type=int, default=5, show_default=True, help="Bucket size when grouping legacy series-name-length (deprecated).", ) @click.option( "--output-dir", type=click.Path(file_okay=False, path_type=Path), default=Path("out"), show_default=True, help="Root directory for analysis artifacts.", ) @click.option( "--source", type=click.Choice(["database", "flatfiles"], case_sensitive=False), default="database", show_default=True, help="Where to read CPI data from before analysis.", ) @click.option( "--data-file", "data_files", multiple=True, help=DATA_FILE_HELP, ) @click.option( "--current-only", is_flag=True, default=False, help="Limit ingestion to the current-year CPI partition before analyzing (flatfiles only).", ) @click.option( "--selectable-only/--include-unselectable", default=True, show_default=True, help="Restrict the analysis set to items flagged as selectable in CPI metadata.", ) @click.option( "--series-lock", "series_lock_args", multiple=True, help="Constrain series metadata with KEY=VALUE locks (e.g. area_code=0000).", ) @click.option( "--min-sample-size", type=int, default=0, show_default=True, help="Warn when a group sample falls below this size. Set to 0 to disable.", ) @click.option( "--skip-small-samples/--include-small-samples", default=False, show_default=True, help="Skip KDE artifacts when the group is smaller than the minimum sample size.", ) @click.pass_context def analyze( ctx: click.Context, *, group_by: str, length_bin_size: int, output_dir: Path, source: str, data_files: tuple[str, ...], current_only: bool, selectable_only: bool, series_lock_args: tuple[str, ...], min_sample_size: int, skip_small_samples: bool, ) -> None: """Compute YoY growth distributions and emit charts/statistics.""" original_group = group_by.lower() legacy_series_grouping = original_group == "series-name-length" group_by_normalized = original_group source = source.lower() series_locks = _parse_series_locks(series_lock_args) if legacy_series_grouping: logger.warning( "analysis.group_by_legacy", original="series-name-length", replacement="item-code-length", ) group_by_normalized = "item-code-length" _validate_source_args(source, current_only=current_only, data_files=data_files) if legacy_series_grouping and length_bin_size <= 0: raise click.BadParameter("length-bin-size must be a positive integer.") cmd_log = logger.bind(command="analyze", group_by=group_by_normalized, source=source) cmd_log.info( "command.start", data_files=list(data_files), current_only=current_only, selectable_only=selectable_only, length_bin_size=length_bin_size, series_locks=series_locks, min_sample_size=min_sample_size, skip_small_samples=skip_small_samples, ) if source == "database": resolved_dsn = _require_dsn(ctx, None) resolved_schema = _resolve_schema(ctx, None) dataset = _load_dataset_from_database(resolved_dsn, resolved_schema) else: dataset = _build_dataset(current_only=current_only, data_files=data_files or None) components, _ = _compute_growth_components( dataset, selectable_only=selectable_only, target_period=None, series_locks=series_locks, ) if not components: raise click.ClickException("No year-over-year components were available for analysis.") analysis_dir = _create_analysis_dir(output_dir, group_by_normalized) groups = _group_components(components, group_by_normalized, length_bin_size=length_bin_size) group_summaries = [] for label, comps in groups.items(): if not comps: continue if _should_skip_sample( len(comps), min_sample_size=min_sample_size, skip_small_samples=skip_small_samples, scope="group", label=str(label), ): continue group_summaries.append(_render_group_reports(analysis_dir, label, comps)) generated_at = datetime.now(UTC) summary_payload = { "generated_at": generated_at.isoformat(), "group_by": group_by_normalized, "components_total": len(components), "group_count": len(group_summaries), "output_dir": str(analysis_dir), "groups": group_summaries, "series_locks": series_locks, "min_sample_size": min_sample_size, "skip_small_samples": skip_small_samples, } summary_path = analysis_dir / "summary.json" summary_path.write_text(json.dumps(summary_payload, indent=2)) click.echo(f"Analysis artifacts written to {analysis_dir}") cmd_log.info("command.completed", output=str(analysis_dir), groups=len(group_summaries)) @cli.command("compute") @click.option( "--date", help="Target month (YYYY-MM). Defaults to the latest available observations.", ) @click.option( "--group-by", type=click.Choice(GROUP_BY_CHOICES, case_sensitive=False), default="display-level", show_default=True, help="Grouping dimension for the summary.", ) @click.option( "--length-bin-size", type=int, default=5, show_default=True, help="Legacy bin size for series-name-length (deprecated).", ) @click.option( "--output", type=click.Path(dir_okay=False, path_type=Path), help="Optional path to write the JSON summary.", ) @click.option( "--source", type=click.Choice(["database", "flatfiles"], case_sensitive=False), default="database", show_default=True, help="Source for CPI components prior to analysis.", ) @click.option( "--data-file", "data_files", multiple=True, help=DATA_FILE_HELP, ) @click.option( "--current-only", is_flag=True, default=False, help="Limit flatfile ingestion to the current partition.", ) @click.option( "--selectable-only/--include-unselectable", default=True, show_default=True, help="Filter to selectable CPI items only.", ) @click.option( "--series-lock", "series_lock_args", multiple=True, help="Constrain series metadata with KEY=VALUE locks (e.g. seasonal=U).", ) @click.option( "--min-sample-size", type=int, default=0, show_default=True, help="Warn when a group sample falls below this size. Set to 0 to disable.", ) @click.option( "--skip-small-samples/--include-small-samples", default=False, show_default=True, help="Skip reporting groups that do not meet the minimum sample size.", ) @click.pass_context def compute( ctx: click.Context, *, date: str | None, group_by: str, length_bin_size: int, output: Path | None, source: str, data_files: tuple[str, ...], current_only: bool, selectable_only: bool, series_lock_args: tuple[str, ...], min_sample_size: int, skip_small_samples: bool, ) -> None: """Compute KDE-mode inflation summary without generating plots.""" group_by_normalized = group_by.lower() series_locks = _parse_series_locks(series_lock_args) legacy_series_grouping = group_by_normalized == "series-name-length" if legacy_series_grouping: logger.warning( "compute.group_by_legacy", original="series-name-length", replacement="item-code-length", ) group_by_normalized = "item-code-length" if legacy_series_grouping and length_bin_size <= 0: raise click.BadParameter( "length-bin-size must be a positive integer when using the legacy option." ) _validate_source_args(source, current_only=current_only, data_files=data_files) dataset, cache = _load_analysis_dataset( ctx, source=source, current_only=current_only, data_files=data_files, ) target_period: tuple[int, str] | None = None date_label = "latest" if date: year, period_code, dt = _parse_month(date) target_period = (year, period_code) date_label = dt.strftime("%Y-%m") elif cache.periods: latest_year, latest_period = cache.periods[-1] date_label = _format_period_label(latest_year, latest_period) components, _ = _compute_growth_components( dataset, selectable_only=selectable_only, target_period=target_period, cache=cache, series_locks=series_locks, ) if not components: raise click.ClickException("No components were available for the requested configuration.") groups = _group_components(components, group_by_normalized, length_bin_size=length_bin_size) summaries: list[dict[str, object]] = [] for label, comps in groups.items(): if not comps: continue if _should_skip_sample( len(comps), min_sample_size=min_sample_size, skip_small_samples=skip_small_samples, scope="group", label=str(label), ): continue summaries.append(_build_group_summary(label, comps)) payload = { "generated_at": datetime.now(UTC).isoformat(), "date": date_label, "group_by": group_by_normalized, "source": source.lower(), "selectable_only": selectable_only, "component_count": len(components), "group_count": len(summaries), "groups": summaries, "series_locks": series_locks, "min_sample_size": min_sample_size, "skip_small_samples": skip_small_samples, } document = json.dumps(payload, indent=2) if output: output.parent.mkdir(parents=True, exist_ok=True) output.write_text(document) click.echo(f"Wrote summary to {output}") else: click.echo(document) @cli.command("panel") @click.option("--start", required=True, help="Start month (YYYY-MM).") @click.option("--end", required=True, help="End month (YYYY-MM).") @click.option( "--group-by", type=click.Choice(GROUP_BY_CHOICES, case_sensitive=False), default="display-level", show_default=True, help="Grouping dimension for the panel.", ) @click.option( "--length-bin-size", type=int, default=5, show_default=True, help="Legacy bin size for series-name-length (deprecated).", ) @click.option( "--export", type=click.Path(dir_okay=False, path_type=Path), required=True, help="Destination file (.csv or .parquet).", ) @click.option( "--source", type=click.Choice(["database", "flatfiles"], case_sensitive=False), default="database", show_default=True, help="Source for CPI components prior to analysis.", ) @click.option( "--data-file", "data_files", multiple=True, help=DATA_FILE_HELP, ) @click.option( "--current-only", is_flag=True, default=False, help="Limit flatfile ingestion to the current partition.", ) @click.option( "--selectable-only/--include-unselectable", default=True, show_default=True, help="Filter to selectable CPI items only.", ) @click.option( "--series-lock", "series_lock_args", multiple=True, help="Constrain series metadata with KEY=VALUE locks (e.g. area_code=0000).", ) @click.option( "--min-sample-size", type=int, default=0, show_default=True, help="Warn when a group sample falls below this size. Set to 0 to disable.", ) @click.option( "--skip-small-samples/--include-small-samples", default=False, show_default=True, help="Skip rows whose group samples do not meet the minimum.", ) @click.pass_context def panel( ctx: click.Context, *, start: str, end: str, group_by: str, length_bin_size: int, export: Path, source: str, data_files: tuple[str, ...], current_only: bool, selectable_only: bool, series_lock_args: tuple[str, ...], min_sample_size: int, skip_small_samples: bool, ) -> None: """Generate a tidy panel of KDE-mode metrics over a date range.""" group_by_normalized = group_by.lower() series_locks = _parse_series_locks(series_lock_args) legacy_series_grouping = group_by_normalized == "series-name-length" if legacy_series_grouping: logger.warning( "panel.group_by_legacy", original="series-name-length", replacement="item-code-length", ) group_by_normalized = "item-code-length" if legacy_series_grouping and length_bin_size <= 0: raise click.BadParameter("length-bin-size must be positive when using the legacy option.") _validate_source_args(source, current_only=current_only, data_files=data_files) _, _, start_dt = _parse_month(start) _, _, end_dt = _parse_month(end) months = _month_sequence(start_dt, end_dt) dataset, cache = _load_analysis_dataset( ctx, source=source, current_only=current_only, data_files=data_files, ) rows: list[dict[str, object]] = [] for year, period_code, dt in months: date_label = dt.strftime("%Y-%m") components, cache = _compute_growth_components( dataset, selectable_only=selectable_only, target_period=(year, period_code), cache=cache, series_locks=series_locks, ) if not components: continue if _should_skip_sample( len(components), min_sample_size=min_sample_size, skip_small_samples=skip_small_samples, scope="period", label=date_label, ): continue groups = _group_components(components, group_by_normalized, length_bin_size=length_bin_size) for label, comps in groups.items(): if not comps: continue if _should_skip_sample( len(comps), min_sample_size=min_sample_size, skip_small_samples=skip_small_samples, scope="group", label=f"{date_label}:{label}", ): continue summary = _build_group_summary(label, comps) rows.append( _flatten_summary_row( date=date_label, group_label=label, summary=summary, group_by=group_by_normalized, selectable_only=selectable_only, source=source.lower(), ) ) if not rows: raise click.ClickException("No rows were produced for the requested range.") export.parent.mkdir(parents=True, exist_ok=True) if export.suffix.lower() == ".csv": _write_csv(rows, export) elif export.suffix.lower() in {".parquet", ".pq"}: _write_parquet(rows, export) else: raise click.BadParameter( "Export path must end with .csv or .parquet", param_hint="--export" ) click.echo(f"Panel written to {export}") @cli.command("metrics-timeseries") @click.option( "--start", required=True, help="Start of the analysis window (YYYY-MM).", ) @click.option( "--end", required=True, help="End of the analysis window (YYYY-MM).", ) @click.option( "--export", type=click.Path(dir_okay=False, path_type=Path), required=True, help="Destination file (.csv or .parquet) for the metrics time series.", ) @click.option( "--source", type=click.Choice(["flatfiles", "database"], case_sensitive=False), default="flatfiles", show_default=True, help="Where to source CPI observations.", ) @click.option( "--data-file", "data_files", multiple=True, help=DATA_FILE_HELP, ) @click.option( "--current-only", is_flag=True, default=False, help="Limit flatfile ingestion to the current partition.", ) @click.option( "--selectable-only/--include-unselectable", default=True, show_default=True, help="Filter to selectable CPI items only.", ) @click.option( "--series-lock", "series_lock_args", multiple=True, help="Constrain series metadata with KEY=VALUE locks (e.g. base_code=SA0).", ) @click.option( "--min-sample-size", type=int, default=0, show_default=True, help="Warn when the sample falls below this size. Set to 0 to disable.", ) @click.option( "--skip-small-samples/--include-small-samples", default=False, show_default=True, help="Skip periods that do not meet the minimum sample size.", ) @click.pass_context def metrics_timeseries( ctx: click.Context, *, start: str, end: str, export: Path, source: str, data_files: tuple[str, ...], current_only: bool, selectable_only: bool, series_lock_args: tuple[str, ...], min_sample_size: int, skip_small_samples: bool, ) -> None: """Aggregate KDE metrics into a tidy time series across months.""" _validate_source_args(source, current_only=current_only, data_files=data_files) _, _, start_dt = _parse_month(start) _, _, end_dt = _parse_month(end) months = _month_sequence(start_dt, end_dt) series_locks = _parse_series_locks(series_lock_args) logger.info( "timeseries.window", start=start, end=end, months=len(months), selectable_only=selectable_only, source=source, series_locks=series_locks, min_sample_size=min_sample_size, skip_small_samples=skip_small_samples, ) dataset, cache = _load_analysis_dataset( ctx, source=source, current_only=current_only, data_files=data_files, ) rows: list[dict[str, object]] = [] for year, period_code, dt in months: components, cache = _compute_growth_components( dataset, selectable_only=selectable_only, target_period=(year, period_code), cache=cache, series_locks=series_locks, ) if not components: logger.debug("timeseries.no_components", year=year, period=period_code) continue date_label = dt.strftime("%Y-%m") if _should_skip_sample( len(components), min_sample_size=min_sample_size, skip_small_samples=skip_small_samples, scope="period", label=date_label, ): continue stats = compute_statistics( [comp.value for comp in components], [1.0] * len(components), ) rows.append( _flatten_timeseries_row( date=date_label, year=year, period=period_code, stats=stats, component_count=len(components), selectable_only=selectable_only, source=source.lower(), ) ) if not rows: raise click.ClickException("No rows were produced for the requested range.") export.parent.mkdir(parents=True, exist_ok=True) if export.suffix.lower() == ".csv": _write_csv(rows, export) elif export.suffix.lower() in {".parquet", ".pq"}: _write_parquet(rows, export) else: raise click.BadParameter( "Export path must end with .csv or .parquet", param_hint="--export" ) click.echo(f"Metrics time series written to {export}") @cli.command("load-full") @click.option("--dsn", envvar="KDE_CPI_DSN", help=DSN_HELP, default=None) @click.option( "--schema", envvar="KDE_CPI_SCHEMA", default=None, help=SCHEMA_HELP, ) @click.option( "--no-truncate/--truncate", default=False, show_default=True, help="Skip truncating existing CPI tables before loading.", ) @click.option( "--data-file", "data_files", multiple=True, help=DATA_FILE_HELP, ) @click.pass_context def load_full( ctx: click.Context, *, dsn: str | None, schema: str | None, no_truncate: bool, data_files: tuple[str, ...], ) -> None: """Load the entire CPI history into PostgreSQL.""" resolved_dsn = _require_dsn(ctx, dsn) resolved_schema = _resolve_schema(ctx, schema) cmd_log = logger.bind(command="load-full", schema=resolved_schema) cmd_log.info( "command.start", truncate=not no_truncate, data_files=list(data_files), ) dataset = asyncio.run( load_full_history( resolved_dsn, schema=resolved_schema, truncate=not no_truncate, data_files=data_files or None, ) ) _echo_dataset_summary("Loaded dataset", dataset) cmd_log.info("command.completed", observations=len(dataset.observations)) @cli.command("update-current") @click.option("--dsn", envvar="KDE_CPI_DSN", help=DSN_HELP, default=None) @click.option( "--schema", envvar="KDE_CPI_SCHEMA", default=None, help=SCHEMA_HELP, ) @click.pass_context def update_current(ctx: click.Context, *, dsn: str | None, schema: str | None) -> None: """Refresh only the current-year CPI observations.""" resolved_dsn = _require_dsn(ctx, dsn) resolved_schema = _resolve_schema(ctx, schema) cmd_log = logger.bind(command="update-current", schema=resolved_schema) cmd_log.info("command.start") dataset = asyncio.run(update_current_periods(resolved_dsn, schema=resolved_schema)) _echo_dataset_summary("Updated current partitions", dataset) cmd_log.info("command.completed", observations=len(dataset.observations)) @cli.command("ensure-schema") @click.option("--dsn", envvar="KDE_CPI_DSN", help=DSN_HELP, default=None) @click.option( "--schema", envvar="KDE_CPI_SCHEMA", default=None, help=SCHEMA_HELP, ) @click.pass_context def ensure_schema(ctx: click.Context, *, dsn: str | None, schema: str | None) -> None: """Create the CPI tables if they are missing.""" resolved_dsn = _require_dsn(ctx, dsn) resolved_schema = _resolve_schema(ctx, schema) cmd_log = logger.bind(command="ensure-schema", schema=resolved_schema) cmd_log.info("command.start") async def _run() -> None: loader = CpiDatabaseLoader(dsn=resolved_dsn, schema=resolved_schema) try: await loader.ensure_schema() finally: await loader.close() asyncio.run(_run()) click.echo(f"Ensured schema objects in {resolved_schema}.") cmd_log.info("command.completed") @cli.command("sync-metadata") @click.option("--dsn", envvar="KDE_CPI_DSN", help=DSN_HELP, default=None) @click.option( "--schema", envvar="KDE_CPI_SCHEMA", default=None, help=SCHEMA_HELP, ) @click.option( "--current-only", is_flag=True, default=False, help="Use the smaller current-year partition to refresh metadata.", ) @click.option( "--data-file", "data_files", multiple=True, help=DATA_FILE_HELP, ) @click.pass_context def sync_metadata( ctx: click.Context, *, dsn: str | None, schema: str | None, current_only: bool, data_files: tuple[str, ...], ) -> None: """Upsert mapping tables and series definitions without touching observations.""" if current_only and data_files: raise click.UsageError("--current-only cannot be combined with --data-file.") dataset = _build_dataset(current_only=current_only, data_files=data_files or None) resolved_dsn = _require_dsn(ctx, dsn) resolved_schema = _resolve_schema(ctx, schema) cmd_log = logger.bind(command="sync-metadata", schema=resolved_schema) cmd_log.info( "command.start", current_only=current_only, data_files=list(data_files), ) async def _run() -> None: loader = CpiDatabaseLoader(dsn=resolved_dsn, schema=resolved_schema) try: await loader.sync_metadata(dataset) finally: await loader.close() asyncio.run(_run()) _echo_dataset_summary("Synced metadata using dataset", dataset) cmd_log.info("command.completed", observations=len(dataset.observations)) def _create_analysis_dir(base: Path, group_by: str) -> Path: """Return a timestamped output directory for analysis artifacts.""" timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") slug = group_by.replace("-", "_") attempt = 0 while True: suffix = f"_{attempt}" if attempt else "" candidate = base / f"analysis_{slug}_{timestamp}{suffix}" try: candidate.mkdir(parents=True, exist_ok=False) return candidate except FileExistsError: attempt += 1 def _normalize_period(period: str) -> str: """Normalize CPI period codes for consistent lookups.""" return period.strip().upper() def _period_rank(period: str) -> int: """Return a sortable rank for CPI period codes (monthly-aware).""" period = _normalize_period(period) if len(period) >= 2 and period[1:].isdigit(): return int(period[1:]) return 0 def _period_sort_key(year: int, period: str) -> tuple[int, int, str]: """Provide a sorting tuple for period keys.""" return (year, _period_rank(period), _normalize_period(period)) def _build_observation_cache(dataset: Dataset) -> ObservationCache: """Precompute per-series observation lookups and metadata.""" observations: dict[str, dict[tuple[int, str], Observation]] = {} latest: dict[str, tuple[tuple[int, str], Observation]] = {} period_set: set[tuple[int, str]] = set() for obs in dataset.observations: period_code = _normalize_period(obs.period) key = (obs.year, period_code) period_set.add(key) series_entries = observations.setdefault(obs.series_id, {}) series_entries[key] = obs order_key = _period_sort_key(obs.year, period_code) existing = latest.get(obs.series_id) if existing is None or order_key > _period_sort_key(*existing[0]): latest[obs.series_id] = (key, obs) periods = sorted(period_set, key=lambda item: _period_sort_key(item[0], item[1])) return ObservationCache(observations=observations, latest=latest, periods=periods) def _load_dataset_from_database(dsn: str, schema: str) -> Dataset: """Load CPI data from PostgreSQL into a Dataset.""" async def _run() -> Dataset: loader = CpiDatabaseLoader(dsn=dsn, schema=schema) try: return await loader.fetch_dataset() finally: await loader.close() logger.info("analysis.load_from_db", schema=schema) dataset = asyncio.run(_run()) logger.info( "analysis.load_from_db_complete", series=len(dataset.series), observations=len(dataset.observations), ) return dataset def _series_matches(series: object, locks: Mapping[str, str] | None) -> bool: """Return True when a series satisfies all requested metadata locks.""" if not locks: return True if series is None: return False for key, expected in locks.items(): actual = getattr(series, key, None) if actual is None: return False if isinstance(actual, str): if actual.strip().upper() != expected.strip().upper(): return False else: if str(actual) != expected: return False return True def _compute_growth_components( dataset: Dataset, *, selectable_only: bool, target_period: tuple[int, str] | None = None, cache: ObservationCache | None = None, series_locks: Mapping[str, str] | None = None, ) -> tuple[list[GrowthComponent], ObservationCache]: """Derive YoY growth components per series from the dataset.""" cache = cache or _build_observation_cache(dataset) components: list[GrowthComponent] = [] normalized_target: tuple[int, str] | None = None if target_period is not None: normalized_target = (target_period[0], _normalize_period(target_period[1])) for series_id, series_obs in cache.observations.items(): series = dataset.series.get(series_id) if not _series_matches(series, series_locks): continue if normalized_target is not None: current_key = normalized_target else: latest_entry = cache.latest.get(series_id) if latest_entry is None: continue current_key = latest_entry[0] current = series_obs.get(current_key) if current is None: continue prev_key = (current_key[0] - 1, current_key[1]) previous = series_obs.get(prev_key) if previous is None: continue value = _compute_yoy(current, previous) if value is None: continue if series is None: continue item = dataset.items.get(series.item_code) if item is None: continue if selectable_only and not item.selectable: continue components.append( GrowthComponent( series_id=series_id, item_code=series.item_code, item_name=item.name, display_level=item.display_level, series_title=series.series_title, value=value, year=current.year, period=_normalize_period(current.period), ) ) logger.debug("analysis.components_computed", count=len(components)) return components, cache def _compute_yoy(current: Observation, previous: Observation) -> float | None: """Return the year-over-year change between two observations.""" if current.value.is_nan() or previous.value.is_nan(): return None if previous.value == 0: return None try: delta = (current.value - previous.value) / previous.value except (DivisionByZero, InvalidOperation): # pragma: no cover - defensive return None return float(delta) def _group_components( components: list[GrowthComponent], group_by: str, *, length_bin_size: int, ) -> dict[str, list[GrowthComponent]]: """Group components according to the requested strategy.""" groups: dict[str, list[GrowthComponent]] = defaultdict(list) if group_by == "display-level": for comp in components: groups[str(comp.display_level)].append(comp) elif group_by == "item-code-length": for comp in components: code = (comp.item_code or "").strip() length = len(code) label = f"{length} chars" groups[label].append(comp) else: raise ValueError(f"Unsupported group_by value: {group_by}") def sort_key(item: tuple[str, list[GrowthComponent]]) -> tuple[int, str]: label = item[0] for token in label.split(): if token.isdigit(): return (int(token), label) return (0, label) return dict(sorted(groups.items(), key=sort_key)) def _should_skip_sample( sample_size: int, *, min_sample_size: int, skip_small_samples: bool, scope: str, label: str, ) -> bool: """Emit warnings for undersized samples and indicate whether processing should stop.""" if min_sample_size <= 0 or sample_size >= min_sample_size: return False msg = f"Sample size {sample_size} below minimum {min_sample_size} for {scope} '{label}'." click.secho(f"Warning: {msg}", err=True) logger.warning( "analysis.small_sample", scope=scope, label=label, sample_size=sample_size, min_sample_size=min_sample_size, skip=skip_small_samples, ) return skip_small_samples def _render_group_reports( base_dir: Path, label: str, components: list[GrowthComponent], ) -> dict[str, object]: """Generate plots and summary payloads for a component group.""" group_dir = base_dir / f"group_{_sanitize_label(label)}" group_dir.mkdir(parents=True, exist_ok=True) values = [comp.value for comp in components] weights = [1.0] * len(components) density_report = generate_density_plot( values, weights, output_dir=group_dir, filename="density.png" ) histogram_report = generate_histogram_plot( values, weights, output_dir=group_dir, filename="histogram.png" ) group_summary = _build_group_summary(label, components, stats=density_report.statistics) group_summary["density_plot"] = str(density_report.path.relative_to(base_dir)) group_summary["histogram_plot"] = str(histogram_report.path.relative_to(base_dir)) (group_dir / "summary.json").write_text(json.dumps(group_summary, indent=2)) return group_summary def _build_group_summary( label: str, components: list[GrowthComponent], *, stats: StatSummary | None = None, ) -> dict[str, object]: """Create a JSON-friendly summary for a group of components.""" values = [comp.value for comp in components] weights = [1.0] * len(components) stats_obj = stats or compute_statistics(values, weights) stats_payload = _stats_to_dict(stats_obj) top_examples = sorted(components, key=lambda comp: abs(comp.value), reverse=True)[:5] examples = [ { "series_id": comp.series_id, "item_code": comp.item_code, "item_name": comp.item_name, "series_title": comp.series_title, "yoy": comp.value, "yoy_percent": format_percent(comp.value), } for comp in top_examples ] return { "label": label, "count": len(components), "stats": stats_payload, "examples": examples, } def _parse_month(value: str) -> tuple[int, str, datetime]: """Convert YYYY-MM strings into (year, period_code, datetime) tuples.""" try: dt = datetime.strptime(value, "%Y-%m") except ValueError as exc: # pragma: no cover - defensive raise ValueError(f"Invalid date '{value}'. Expected format YYYY-MM.") from exc period = f"M{dt.month:02d}" return dt.year, period, dt def _month_sequence(start: datetime, end: datetime) -> list[tuple[int, str, datetime]]: """Return inclusive list of (year, period_code, datetime) between two dates.""" if start > end: raise ValueError("start date must be before end date.") months: list[tuple[int, str, datetime]] = [] cursor = datetime(start.year, start.month, 1, tzinfo=start.tzinfo) end_marker = datetime(end.year, end.month, 1, tzinfo=end.tzinfo) while cursor <= end_marker: months.append((cursor.year, f"M{cursor.month:02d}", cursor)) if cursor.month == 12: cursor = datetime(cursor.year + 1, 1, 1, tzinfo=cursor.tzinfo) else: cursor = datetime(cursor.year, cursor.month + 1, 1, tzinfo=cursor.tzinfo) return months def _format_period_label(year: int, period: str) -> str: """Return YYYY-MM style labels when possible.""" period = _normalize_period(period) if period.startswith("M") and period[1:].isdigit(): return f"{year}-{int(period[1:]):02d}" return f"{year}-{period}" def _load_analysis_dataset( ctx: click.Context, *, source: str, current_only: bool, data_files: Sequence[str], ) -> tuple[Dataset, ObservationCache]: """Load CPI data from the requested source and build cache metadata.""" source = source.lower() if source == "database": resolved_dsn = _require_dsn(ctx, None) resolved_schema = _resolve_schema(ctx, None) dataset = _load_dataset_from_database(resolved_dsn, resolved_schema) else: dataset = _build_dataset(current_only=current_only, data_files=tuple(data_files) or None) cache = _build_observation_cache(dataset) return dataset, cache def _validate_source_args(source: str, *, current_only: bool, data_files: Sequence[str]) -> None: """Enforce valid flag combinations for dataset sourcing.""" source = source.lower() if source == "database": if data_files: raise click.UsageError("--data-file is only valid when --source flatfiles.") if current_only: raise click.UsageError("--current-only is only valid when --source flatfiles.") else: if current_only and data_files: raise click.UsageError("--current-only cannot be combined with --data-file.") def _flatten_summary_row( *, date: str, group_label: str, summary: Mapping[str, object], group_by: str, selectable_only: bool, source: str, ) -> dict[str, object]: """Flatten a group summary into a tabular row.""" stats_value = summary.get("stats", {}) stats: dict[str, object] if isinstance(stats_value, Mapping): stats = dict(stats_value) else: stats = {} return { "date": date, "group_label": group_label, "group_by": group_by, "selectable_only": selectable_only, "source": source, "count": summary.get("count", 0), "mode": stats.get("weighted_kde_mode"), "mode_percent": stats.get("weighted_kde_mode_percent"), "mean": stats.get("weighted_mean"), "median": stats.get("weighted_median"), "trimmed_mean": stats.get("trimmed_mean"), "std": stats.get("weighted_std"), "skewness": stats.get("weighted_skewness"), "kurtosis": stats.get("weighted_kurtosis"), "effective_sample_size": stats.get("effective_sample_size"), } def _flatten_timeseries_row( *, date: str, year: int, period: str, stats: StatSummary, component_count: int, selectable_only: bool, source: str, ) -> dict[str, object]: """Flatten time-series statistics into a single row.""" payload = _stats_to_dict(stats) return { "date": date, "year": year, "period": period, "selectable_only": selectable_only, "source": source, "component_count": component_count, **payload, } def _write_csv(rows: list[dict[str, object]], path: Path) -> None: """Write panel rows to CSV via pandas.""" import pandas as pd # type: ignore df = pd.DataFrame(rows) df.to_csv(path, index=False) def _write_parquet(rows: list[dict[str, object]], path: Path) -> None: """Write panel rows to parquet via pandas/pyarrow.""" import pandas as pd # type: ignore df = pd.DataFrame(rows) try: df.to_parquet(path, index=False) except (ImportError, ValueError) as exc: # pragma: no cover - optional deps raise click.ClickException( "Writing parquet requires pandas with pyarrow or fastparquet installed." ) from exc def _stats_to_dict(stats: StatSummary) -> dict[str, object]: """Convert a StatSummary into JSON-friendly primitives.""" return { "weighted_mean": stats.weighted_mean, "weighted_mean_percent": format_percent(stats.weighted_mean), "weighted_median": stats.weighted_median, "weighted_median_percent": format_percent(stats.weighted_median), "trimmed_mean": stats.trimmed_mean, "trimmed_mean_percent": format_percent(stats.trimmed_mean), "weighted_std": stats.weighted_std, "weighted_skewness": stats.weighted_skewness, "weighted_kurtosis": stats.weighted_kurtosis, "weighted_kde_mode": stats.weighted_kde_mode, "weighted_kde_mode_percent": format_percent(stats.weighted_kde_mode), "effective_sample_size": stats.effective_sample_size, } def _sanitize_label(label: str) -> str: """Return a filesystem-friendly version of the provided label.""" safe = [ch if ch.isalnum() or ch in {"-", "_"} else "_" for ch in label] cleaned = "".join(safe).strip("_") return cleaned or "group" if __name__ == "__main__": cli()