Source code for dbs_annotator.utils.updater

"""Check GitHub Releases for newer versions of the app.

Design goals:

* Silent on failure -- a missing network connection, a GitHub outage, or a
  rate limit must never block startup or show an error dialog to the user.
* At most one check per cooldown window (default 24 h) so repeated launches
  do not spam the GitHub API. The last-check timestamp is persisted with
  :class:`QSettings` so it survives between sessions but never leaks PII.
* The HTTP fetch runs on a worker thread via :class:`QThreadPool`; results
  are reported back through Qt signals, and ``update_available`` is only
  emitted if a strictly-newer version is found.
* The user can always trigger a check from a menu / button with
  ``force=True`` (even when automatic checks are disabled in preferences).
* Among all published (non-draft) releases, only the **highest** version
  greater than the running build is considered (PEP 440 ordering, including
  alpha / beta / rc). GitHub's ``/releases/latest`` endpoint is not used
  because it omits pre-releases.

The release repository is hardcoded to the canonical upstream; change
:data:`DEFAULT_RELEASES_REPO` if the project moves.
"""

from __future__ import annotations

import json
import logging
import ssl
import urllib.error
import urllib.request
from collections.abc import Callable
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any, cast

import certifi
from packaging.version import InvalidVersion, Version
from PySide6.QtCore import QObject, QRunnable, QSettings, QThreadPool, Signal

from ..config import RELEASES_GITHUB_REPO
from ..version import get_version

logger = logging.getLogger(__name__)

#: Owner/repo pair on GitHub whose releases advertise new builds.
DEFAULT_RELEASES_REPO = RELEASES_GITHUB_REPO

#: Default minimum interval between two non-forced update checks.
DEFAULT_COOLDOWN = timedelta(hours=24)
#: Default timeout, in seconds, applied to each HTTP request to the GitHub API.
DEFAULT_TIMEOUT_SECONDS = 10
#: ``QSettings`` key holding the ISO 8601 timestamp of the last completed check.
_LAST_CHECK_KEY = "updater/last_check_iso"
#: ``QSettings`` key holding the "automatic checks enabled" preference.
_AUTO_CHECK_KEY = "updater/auto_check_enabled"
#: Number of releases requested per API page (the ``per_page`` query parameter).
_RELEASES_PAGE_SIZE = 100
#: Upper bound on the number of release pages fetched during one check.
_MAX_RELEASE_PAGES = 5


def _ca_bundle_path() -> str:
    """Return a filesystem path to certifi's CA bundle.

    ``certifi.where()`` is tried first. If the path it reports is not an
    existing file (Briefcase/MSI layouts may break ``where()``), the bundle
    is looked up through :mod:`importlib.resources` instead.

    Returns:
        Path to ``cacert.pem``. If neither lookup yields an existing file, a
        warning is logged and the path reported by ``certifi.where()`` is
        returned unchanged so the caller fails in the usual place.
    """
    path = certifi.where()
    if Path(path).is_file():
        return path
    try:
        import atexit
        from contextlib import ExitStack
        from importlib.resources import as_file, files

        ref = files("certifi").joinpath("cacert.pem")
        # ``as_file`` may materialise the resource as a temporary file that
        # is deleted when its context exits. The returned path is used after
        # this function returns, so keep the context open until interpreter
        # shutdown instead of closing it here. This fallback is rare and the
        # bundle is small, so a per-call extraction is acceptable.
        stack = ExitStack()
        atexit.register(stack.close)
        bundle = stack.enter_context(as_file(ref))
        if not bundle.is_file():
            raise FileNotFoundError(str(bundle))
        return str(bundle)
    except Exception:
        # Best effort only: update checks must never break the application.
        logger.warning(
            "certifi CA bundle not found at %r; HTTPS update checks may fail",
            path,
        )
        return path


def _ssl_context() -> ssl.SSLContext:
    """Build the TLS context used for HTTPS update checks.

    The default context is created with an explicit ``cafile`` so that
    packaged apps (Briefcase MSI/ZIP on Windows) verify certificates against
    the CA bundle resolved by :func:`_ca_bundle_path`.
    """
    bundle_path = _ca_bundle_path()
    context = ssl.create_default_context(cafile=bundle_path)
    return context


@dataclass(frozen=True)
class ReleaseInfo:
    """Metadata for a GitHub release that is newer than the running app."""

    #: Version string of the release (the parsed tag rendered back to text).
    version: str
    #: Tag name as reported by GitHub.
    tag_name: str
    #: Value of the release's ``html_url`` field.
    html_url: str
    #: Value of the release's ``published_at`` field, kept as text.
    published_at: str
    #: Value of the release's ``body`` field (the release notes).
    body: str
    #: ``True`` if GitHub marked the release as pre-release or the tag parses
    #: as a PEP 440 pre-release (alpha / beta / rc).
    is_prerelease: bool
def _parse_version(tag: str) -> Version | None:
    """Parse a release tag or version string with ``packaging.version``.

    Leading ``v``/``V`` characters and surrounding whitespace are removed
    before parsing.

    Returns ``None`` if the tag does not look like a PEP 440-compatible
    version -- most commonly a lightweight tag used for infrastructure.
    Such tags are ignored for update-check purposes.
    """
    candidate = tag.lstrip("vV").strip()
    try:
        return Version(candidate)
    except InvalidVersion:
        return None


def _coerce_bool(value: object, default: bool) -> bool:
    """Interpret a stored settings value as a boolean.

    Args:
        value: Raw value read from settings. Real booleans are returned
            as-is; strings are matched case-insensitively against
            ``false/0/no/""`` and ``true/1/yes``.
        default: Result for ``None`` and for strings that match neither set.

    Returns:
        The coerced boolean; any other type falls back to ``bool(value)``.
    """
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        s = value.lower().strip()
        if s in ("false", "0", "no", ""):
            return False
        if s in ("true", "1", "yes"):
            return True
        return default
    if value is None:
        return default
    return bool(value)


class _CheckSignals(QObject):
    """Qt signals for a check worker.

    The worker emits these from its pool thread. NOTE(review): handlers are
    expected to run on the main thread (Qt queues cross-thread emissions to
    the receiver's thread) and may therefore touch the GUI -- confirm the
    receiver lives on the main thread.
    """

    #: Emitted with a :class:`ReleaseInfo` when a newer release exists.
    update_available = Signal(object)
    #: Emitted when the check succeeded and nothing newer was found.
    up_to_date = Signal()
    #: Emitted with the error text when the check raised.
    failed = Signal(str)


class _CheckWorker(QRunnable):
    """Single-shot worker that queries the GitHub Releases API."""

    def __init__(
        self,
        repo: str,
        current_version: str,
        timeout: float,
        signals: _CheckSignals,
    ) -> None:
        """Store the parameters of one check.

        Args:
            repo: ``owner/repo`` path segment used in the API URL.
            current_version: Version of the running app; also sent in the
                ``User-Agent`` header.
            timeout: Timeout passed to each ``urlopen`` call.
            signals: Signal carrier used to report the outcome.
        """
        super().__init__()
        self._repo = repo
        self._current_version = current_version
        self._timeout = timeout
        self._signals = signals

    def run(self) -> None:
        """Run the check and emit exactly one of the three outcome signals."""
        try:
            latest = self._fetch_newest_applicable_release()
        except Exception as exc:
            # Thread boundary: every failure is reported through ``failed``
            # rather than propagating out of the pool thread.
            logger.info("Update check failed: %s", exc)
            self._signals.failed.emit(str(exc))
            return
        if latest is None:
            self._signals.up_to_date.emit()
            return
        self._signals.update_available.emit(latest)

    @staticmethod
    def _text_field(payload: dict[str, Any], key: str) -> str:
        """Return ``payload[key]`` as text; missing or null becomes ``""``."""
        value = payload.get(key)
        return "" if value is None else str(value)

    def _request(self, url: str) -> urllib.request.Request:
        """Build a GET request for *url* with the GitHub API headers."""
        return urllib.request.Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "User-Agent": f"DBSAnnotator/{self._current_version}",
            },
        )

    def _urlopen_json(self, url: str) -> object:
        """Fetch *url* and return the decoded JSON document."""
        request = self._request(url)
        with urllib.request.urlopen(
            request, timeout=self._timeout, context=_ssl_context()
        ) as response:
            return json.loads(response.read().decode("utf-8"))

    def _fetch_releases_page(self, page: int) -> list[dict]:
        """Fetch one page of releases.

        Returns:
            The decoded list, or ``[]`` if the payload is not a JSON list.

        Raises:
            RuntimeError: If the API answers 404 for the repository.
            urllib.error.HTTPError: For any other HTTP error status.
        """
        url = (
            f"https://api.github.com/repos/{self._repo}/releases"
            f"?per_page={_RELEASES_PAGE_SIZE}&page={page}"
        )
        try:
            payload = self._urlopen_json(url)
        except urllib.error.HTTPError as exc:
            if exc.code == 404:
                raise RuntimeError(
                    f"GitHub releases API returned 404 for {self._repo}."
                ) from exc
            raise
        if not isinstance(payload, list):
            return []
        return cast(list[dict[str, Any]], payload)

    def _fetch_all_releases(self) -> list[dict]:
        """Collect releases page by page, up to ``_MAX_RELEASE_PAGES`` pages."""
        merged: list[dict] = []
        for page in range(1, _MAX_RELEASE_PAGES + 1):
            batch = self._fetch_releases_page(page)
            merged.extend(batch)
            # A short page means there is nothing further to request.
            if len(batch) < _RELEASES_PAGE_SIZE:
                break
        return merged

    def _fetch_newest_applicable_release(self) -> ReleaseInfo | None:
        """Return single newest published release with version *>* local.

        Returns:
            The highest-versioned non-draft release newer than the running
            build, or ``None`` if there is none.

        Raises:
            ValueError: If the installed version is not a valid PEP 440
                version.
            RuntimeError: If the API returned no releases at all.
        """
        local = _parse_version(self._current_version)
        if local is None:
            raise ValueError(
                f"Installed version {self._current_version!r} is not a valid "
                "PEP 440 version."
            )
        payloads = self._fetch_all_releases()
        if not payloads:
            raise RuntimeError(
                f"No published releases returned for {self._repo} "
                "(empty list or network issue)."
            )
        best_remote: Version | None = None
        best_payload: dict | None = None
        for payload in payloads:
            # The list is only cast, not validated, so skip anything that is
            # not a JSON object instead of failing on ``.get``.
            if not isinstance(payload, dict) or payload.get("draft"):
                continue
            tag = self._text_field(payload, "tag_name")
            if not tag:
                continue
            remote = _parse_version(tag)
            if remote is None or remote <= local:
                continue
            if best_remote is None or remote > best_remote:
                best_remote = remote
                best_payload = payload
        if best_remote is None or best_payload is None:
            return None
        gh_prerelease = bool(best_payload.get("prerelease"))
        is_prerelease = gh_prerelease or best_remote.is_prerelease
        # JSON ``null`` fields (for example a release published without
        # notes) must become empty text, not the literal string "None".
        return ReleaseInfo(
            version=str(best_remote),
            tag_name=self._text_field(best_payload, "tag_name"),
            html_url=self._text_field(best_payload, "html_url"),
            published_at=self._text_field(best_payload, "published_at"),
            body=self._text_field(best_payload, "body"),
            is_prerelease=is_prerelease,
        )
class UpdateChecker(QObject):
    """Orchestrates background update checks with a configurable cooldown.

    Create one of these on the main thread (typically owned by the main
    window) and call :meth:`check_async`. A ``check_async(force=True)`` call
    bypasses the cooldown -- wire it to a "Check for updates" menu action.

    Automatic checks respect :meth:`auto_update_checks_enabled` (stored in
    ``QSettings`` under :data:`_AUTO_CHECK_KEY`).
    """

    #: Emitted with a :class:`ReleaseInfo` when a newer release exists.
    update_available = Signal(object)
    #: Emitted when a check completed and nothing newer was found.
    up_to_date = Signal()
    #: Emitted with the error text when a check failed.
    failed = Signal(str)

    def __init__(
        self,
        repo: str = DEFAULT_RELEASES_REPO,
        current_version: str | None = None,
        cooldown: timedelta = DEFAULT_COOLDOWN,
        timeout: float = DEFAULT_TIMEOUT_SECONDS,
        parent: QObject | None = None,
    ) -> None:
        """Configure the checker.

        Args:
            repo: ``owner/repo`` whose releases are queried.
            current_version: Fixed version to compare against, or ``None``
                to resolve it via ``get_version()`` on each check.
            cooldown: Minimum interval between non-forced checks.
            timeout: Timeout handed to the worker for each HTTP request.
            parent: Optional Qt parent object.
        """
        super().__init__(parent)
        self._repo = repo
        # None => resolve via get_version() on each check (matches Help UI / metadata).
        self._version_override = current_version
        self._cooldown = cooldown
        self._timeout = timeout
        self._settings = QSettings()
        self._check_in_progress = False

        # Worker outcomes arrive on this internal carrier and are relayed
        # through the public signals after bookkeeping.
        self._signals = _CheckSignals()
        self._signals.update_available.connect(self._on_update_available)
        self._signals.up_to_date.connect(self._on_up_to_date)
        self._signals.failed.connect(self._on_failed)

    def is_busy(self) -> bool:
        """True while a background GitHub API request is in flight."""
        return self._check_in_progress

    def _installed_version(self) -> str:
        """Return the version override if set, else ``get_version()``."""
        if self._version_override is not None:
            return self._version_override
        return get_version()

    def auto_update_checks_enabled(self) -> bool:
        """Whether startup / periodic background checks are allowed."""
        raw = self._settings.value(_AUTO_CHECK_KEY, True)
        return _coerce_bool(raw, True)

    def set_auto_update_checks_enabled(self, enabled: bool) -> None:
        """Persist preference for automatic update checks."""
        self._settings.setValue(_AUTO_CHECK_KEY, enabled)
        self._settings.sync()

    def _finish_check(self) -> None:
        """Clear the in-flight flag so a new check can be scheduled."""
        self._check_in_progress = False

    def _on_update_available(self, release: ReleaseInfo) -> None:
        """Record the check time and relay the newer release."""
        self._finish_check()
        self._record_check_time()
        self.update_available.emit(release)

    def _on_up_to_date(self) -> None:
        """Record the check time and relay the up-to-date result."""
        self._finish_check()
        self._record_check_time()
        self.up_to_date.emit()

    def _on_failed(self, error: str) -> None:
        """Relay a failed check without starting the cooldown."""
        self._finish_check()
        # Intentionally do NOT record a check time on hard failures so the
        # next launch retries instead of waiting out the cooldown.
        self.failed.emit(error)

    def check_async(
        self,
        *,
        force: bool = False,
        now: Callable[[], datetime] = lambda: datetime.now(UTC),
    ) -> bool:
        """Schedule a background check.

        Args:
            force: If True, bypass the cooldown and automatic-check opt-out.
            now: Injectable clock, only for tests.

        Returns:
            True if a check was scheduled. False if, for a non-forced call,
            automatic checks are disabled or the cooldown has not elapsed,
            or (forced or not) if a check is already in flight.
        """
        if not force and not self.auto_update_checks_enabled():
            return False
        if not force and not self._cooldown_elapsed(now()):
            return False
        if self._check_in_progress:
            return False
        # Build the worker before raising the busy flag: if resolving the
        # installed version raises, the flag must not be left set, or every
        # later check would be silently refused.
        worker = _CheckWorker(
            repo=self._repo,
            current_version=self._installed_version(),
            timeout=self._timeout,
            signals=self._signals,
        )
        self._check_in_progress = True
        QThreadPool.globalInstance().start(worker)
        return True

    def _cooldown_elapsed(self, now: datetime) -> bool:
        """Return True if at least the cooldown has passed since the last check.

        A missing, non-string or unparseable stored timestamp counts as
        elapsed. Naive datetimes on either side are treated as UTC.
        """
        # QSettings.value overloads are loose in stubs; narrow before fromisoformat.
        raw = self._settings.value(_LAST_CHECK_KEY, "")
        if not isinstance(raw, str) or not raw:
            return True
        try:
            last = datetime.fromisoformat(raw)
        except ValueError:
            return True
        if last.tzinfo is None:
            last = last.replace(tzinfo=UTC)
        # Apply the same rule to an injected naive clock so the subtraction
        # below cannot mix naive and aware values.
        if now.tzinfo is None:
            now = now.replace(tzinfo=UTC)
        return (now - last) >= self._cooldown

    def _record_check_time(self) -> None:
        """Store the current UTC time as the last completed check."""
        self._settings.setValue(_LAST_CHECK_KEY, datetime.now(UTC).isoformat())