"""
Session data exporter for DBS Annotator.
This module provides functionality to export session data to Word and PDF.
"""
import csv
import os
import re
import shutil
import subprocess
import tempfile
from datetime import datetime
from typing import Protocol, cast
import pandas as pd
from docx import Document
from docx.document import Document as DocumentType
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.shared import Inches
from PySide6.QtCore import Qt, QTimer
from PySide6.QtGui import QPainter, QPixmap
from PySide6.QtWidgets import QMessageBox, QWidget
from .. import __app_name__, __version__
from ..config import PLACEHOLDERS
from ..config_electrode_models import ELECTRODE_MODELS, MANUFACTURERS, ContactState
from ..models import ElectrodeCanvas, is_session_scale_value_omitted
from .tsv_columns import BLOCK_ID_COLUMN
class _ExportTransientParent(Protocol):
"""QWidget host that may store a reference to a transient export message."""
_export_transient_msg: QMessageBox | None
[docs]
class SessionExporter:
"""
Handles exporting session data to various formats.
This class provides methods to export the collected session data
to Word and PDF.
"""
def __init__(self, session_data):
"""
Initialize the session exporter.
Args:
session_data: The SessionData instance containing collected data
"""
self.session_data = session_data
# Scale optimization preferences: list of (name, min, max, mode, custom_value)
# mode: "low", "high", "custom", "ignore"
self.scale_optimization_prefs: list = []
def set_scale_optimization_prefs(self, prefs: list) -> None:
"""Set scale optimization preferences for best block calculation."""
self.scale_optimization_prefs = prefs or []
def _generate_bids_report_filename(self, extension: str = ".docx") -> str:
"""Generate BIDS-friendly report filename from TSV file path."""
tsv_path = getattr(self.session_data, "file_path", "") or ""
today_str = datetime.now().astimezone().strftime("%Y%m%d")
if tsv_path:
base = os.path.basename(tsv_path)
import re
sub_match = re.search(r"sub-([^_]+)", base)
run_match = re.search(r"run-([0-9]+)", base)
task_match = re.search(r"task-([^_]+)", base)
patient_id = sub_match.group(1) if sub_match else "unknown"
run_num = run_match.group(1) if run_match else "01"
task = task_match.group(1) if task_match else "programming"
return (
f"sub-{patient_id}_ses-{today_str}_task-{task}"
f"_run-{run_num}_report{extension}"
)
# Fallback
fallback_time = datetime.now().astimezone().strftime("%H%M%S")
return f"dbs_session_report_{today_str}_{fallback_time}{extension}"
def _extract_bids_info_from_path(self) -> tuple:
"""Extract patient ID and session number from BIDS filename."""
tsv_path = getattr(self.session_data, "file_path", "") or ""
patient_id = ""
session_num = ""
if tsv_path:
base = os.path.basename(tsv_path)
# Parse sub-XXX and ses-XXX from filename
import re
sub_match = re.search(r"sub-([^_]+)", base)
ses_match = re.search(r"ses-([^_]+)", base)
if sub_match:
patient_id = sub_match.group(1)
if ses_match:
raw_session = ses_match.group(1)
# Format session date from 20250103 to 2025-01-03
if len(raw_session) == 8 and raw_session.isdigit():
try:
year = raw_session[:4]
month = raw_session[4:6]
day = raw_session[6:8]
session_num = f"{year}-{month}-{day}"
except Exception:
session_num = raw_session
else:
session_num = raw_session
return patient_id, session_num
def _show_transient_message(
self,
parent: QWidget | None,
title: str,
text: str,
*,
msecs: int = 2000,
icon: QMessageBox.Icon = QMessageBox.Icon.Information,
) -> None:
msg = QMessageBox(parent)
msg.setIcon(icon)
msg.setWindowTitle(title)
msg.setText(text)
msg.setStandardButtons(QMessageBox.StandardButton.NoButton)
msg.setWindowModality(Qt.WindowModality.NonModal)
msg.show()
if parent is not None:
try:
cast(_ExportTransientParent, parent)._export_transient_msg = msg
except Exception:
pass
# Use a dedicated QTimer owned by the message box so it reliably fires.
# Some Qt builds do not ship QWeakPointer.
timer = QTimer(msg)
timer.setSingleShot(True)
def _close_msg() -> None:
try:
msg.accept()
except Exception:
try:
msg.close()
except Exception:
pass
if parent is not None:
try:
host = cast(_ExportTransientParent, parent)
if host._export_transient_msg is msg:
host._export_transient_msg = None
except Exception:
pass
timer.timeout.connect(_close_msg)
timer.start(max(0, int(msecs)))
def _convert_docx_to_pdf(self, docx_path: str, pdf_path: str) -> None:
"""Convert a Word document to PDF using the best available method.
Tries in order:
1. docx2pdf (requires Microsoft Word COM)
2. Word COM via PowerShell subprocess
3. LibreOffice headless
Raises RuntimeError if no conversion method succeeds.
"""
errors: list[str] = []
# 1. Try docx2pdf
try:
from docx2pdf import convert as _docx2pdf_convert
_docx2pdf_convert(docx_path, pdf_path)
if os.path.exists(pdf_path):
return
except Exception as exc:
errors.append(f"docx2pdf: {exc}")
# 2. Try Word COM via PowerShell (Windows)
try:
abs_docx = os.path.abspath(docx_path).replace("'", "''")
abs_pdf = os.path.abspath(pdf_path).replace("'", "''")
ps_script = (
"$w = New-Object -ComObject Word.Application; "
"$w.Visible = $false; "
f"$d = $w.Documents.Open('{abs_docx}'); "
f"$d.SaveAs2('{abs_pdf}', 17); "
"$d.Close(); $w.Quit()"
)
subprocess.run(
["powershell", "-NoProfile", "-Command", ps_script],
check=True,
capture_output=True,
timeout=60,
)
if os.path.exists(pdf_path):
return
except Exception as exc:
errors.append(f"Word COM (PowerShell): {exc}")
# 3. Try LibreOffice headless
soffice = shutil.which("soffice")
if soffice:
try:
out_dir = os.path.dirname(os.path.abspath(pdf_path))
subprocess.run(
[
soffice,
"--headless",
"--convert-to",
"pdf",
"--outdir",
out_dir,
os.path.abspath(docx_path),
],
check=True,
capture_output=True,
timeout=60,
)
# LibreOffice outputs with same basename
lo_output = os.path.join(
out_dir, os.path.splitext(os.path.basename(docx_path))[0] + ".pdf"
)
if lo_output != pdf_path and os.path.exists(lo_output):
shutil.move(lo_output, pdf_path)
if os.path.exists(pdf_path):
return
except Exception as exc:
errors.append(f"LibreOffice: {exc}")
else:
errors.append("LibreOffice: soffice not found on PATH")
detail = "\n".join(errors)
raise RuntimeError(
f"Could not convert to PDF. Tried all available methods:\n{detail}\n\n"
"Please export to Word (.docx) and convert to PDF manually."
)
def _read_session_data(self) -> pd.DataFrame | None:
"""
Read session data from the TSV file.
Returns:
DataFrame with session data or None if error
"""
try:
if hasattr(self.session_data, "file_path") and self.session_data.file_path:
from .tsv_columns import read_session_tsv
return read_session_tsv(self.session_data.file_path)
return None
except Exception:
return None
def _normalize_block_id_column(self, df: pd.DataFrame) -> pd.DataFrame:
from .tsv_columns import normalize_block_id_dataframe
normalized = normalize_block_id_dataframe(df)
return df if normalized is None else normalized
def _get_manufacturer_for_model(self, model_name: str) -> str:
"""Return the manufacturer string for a given electrode model name."""
if not model_name:
return ""
for manufacturer, models in (MANUFACTURERS or {}).items():
try:
if model_name in models:
return str(manufacturer)
except Exception:
continue
return ""
def _pick_latest_session_row(self, df: pd.DataFrame) -> pd.Series | None:
"""Return the row with the highest session_ID and block_ID."""
if df is None or df.empty:
return None
# Prefer: latest session_ID, then latest block_ID
if "session_ID" in df.columns:
try:
s = pd.to_numeric(df["session_ID"], errors="coerce")
max_sid = s.max()
if pd.notna(max_sid):
df_sid = df.loc[s == max_sid]
return self._pick_latest_row(df_sid)
except Exception:
pass
return self._pick_latest_row(df)
def _add_summary_section(
self,
doc: DocumentType,
df: pd.DataFrame,
df_initial: pd.DataFrame,
df_final: pd.DataFrame,
) -> None:
"""Add the initial clinical notes section to the Word document."""
doc.add_heading("Initial Clinical Notes", level=1)
latest_init = self._pick_latest_session_row(df_initial)
if latest_init is not None:
init_sid = latest_init.get("session_ID", None)
if init_sid is not None and "session_ID" in df_initial.columns:
df_init_session = df_initial[df_initial["session_ID"] == init_sid]
else:
df_init_session = df_initial
scale_names = []
scale_values = []
seen = set()
for _, r in df_init_session.iterrows():
sname = r.get("scale_name", "")
sval = r.get("scale_value", "")
if pd.notna(sname) and str(sname).strip():
name_str = str(sname).strip()
val_str = str(sval) if pd.notna(sval) else ""
if is_session_scale_value_omitted(val_str):
continue
key = (name_str, val_str)
if key not in seen:
seen.add(key)
scale_names.append(name_str)
scale_values.append(val_str)
for sn, sv in zip(scale_names, scale_values, strict=False):
doc.add_paragraph(f"{sn}: {sv}")
notes = str(latest_init.get("notes", "") or "")
if notes.strip():
doc.add_paragraph(f"Initial Notes: {notes}")
def _add_programming_summary(
self,
doc: DocumentType,
df: pd.DataFrame,
df_initial: pd.DataFrame,
df_final: pd.DataFrame,
) -> None:
"""Add programming summary with session statistics."""
from docx.shared import Pt
doc.add_heading("Programming Summary", level=1)
if df is None or df.empty:
doc.add_paragraph("No session data available.")
return
# Session duration (from first to last timestamp)
duration_str = "N/A"
try:
if "time" in df.columns and "date" in df.columns:
timestamps = pd.to_datetime(
df["date"].astype(str).str.cat(df["time"].astype(str), sep=" "),
errors="coerce",
).dropna()
elif "time" in df.columns:
timestamps = pd.to_datetime(df["time"], errors="coerce").dropna()
else:
timestamps = pd.Series(dtype="datetime64[ns]")
if len(timestamps) >= 2:
duration = timestamps.max() - timestamps.min()
total_mins = int(duration.total_seconds() / 60)
if total_mins >= 60:
hours = total_mins // 60
mins = total_mins % 60
duration_str = f"{hours}h {mins}min"
else:
duration_str = f"{total_mins} min"
except Exception:
pass
# Number of configurations tested
df_normalized = self._normalize_block_id_column(df)
num_configs = 0
if "block_ID" in df_normalized.columns:
num_configs = df_normalized["block_ID"].nunique()
# Parameter ranges per side (Left / Right)
def _param_range(series, *, split_sum: bool = False):
parsed_vals: list[float] = []
for raw in series:
if pd.isna(raw):
continue
text = str(raw).strip()
if not text:
continue
if split_sum and "_" in text:
try:
parts = [float(p.strip()) for p in text.split("_") if p.strip()]
if parts:
parsed_vals.append(sum(parts))
continue
except ValueError:
pass
# Extract first numeric token (tolerates units like "60 µs")
m = re.search(r"[-+]?\d*\.?\d+", text)
if m:
try:
parsed_vals.append(float(m.group(0)))
except ValueError:
pass
vals = pd.Series(parsed_vals, dtype=float).dropna()
if len(vals) == 0:
return "N/A"
if vals.min() == vals.max():
return f"{vals.min()}"
return vals.min(), vals.max()
amp_l = amp_r = freq_l = freq_r = pw_l = pw_r = "N/A"
try:
for prefix, side_label in [("left_", "L"), ("right_", "R")]:
amp_col = f"{prefix}amplitude"
freq_col = f"{prefix}stim_freq"
pw_col = f"{prefix}pulse_width"
if amp_col in df.columns:
r = _param_range(df[amp_col], split_sum=True)
val = (
f"{r[0]:.1f} - {r[1]:.1f} mA"
if isinstance(r, tuple)
else (f"{float(r):.1f} mA" if r != "N/A" else r)
)
if side_label == "L":
amp_l = val
else:
amp_r = val
if freq_col in df.columns:
r = _param_range(df[freq_col])
val = (
f"{r[0]:.0f} - {r[1]:.0f} Hz"
if isinstance(r, tuple)
else (f"{float(r):.0f} Hz" if r != "N/A" else r)
)
if side_label == "L":
freq_l = val
else:
freq_r = val
if pw_col in df.columns:
r = _param_range(df[pw_col])
val = (
f"{r[0]:.0f} - {r[1]:.0f} µs"
if isinstance(r, tuple)
else (f"{float(r):.0f} µs" if r != "N/A" else r)
)
if side_label == "L":
pw_l = val
else:
pw_r = val
except Exception:
pass
# Add summary paragraphs
summary_items = [
f"Session Duration: {duration_str}",
f"Configurations Tested: {num_configs}",
f"Amplitude Range: L: {amp_l} | R: {amp_r}",
f"Frequency Range: L: {freq_l} | R: {freq_r}",
f"Pulse Width Range: L: {pw_l} | R: {pw_r}",
]
for item in summary_items:
para = doc.add_paragraph(item)
for run in para.runs:
run.font.size = Pt(11)
doc.add_paragraph("")
def _create_lateral_table_data(self, df):
"""
Create lateral table structure for Word and PDF exports.
Returns DataFrame with lateral structure:
- Left side parameters in first row
- Right side parameters in second row
- Non-lateral data merged vertically
- Multiple scales from same block grouped in single cell
"""
if df.empty:
return df
df = self._normalize_block_id_column(df)
# Group by block_ID to consolidate multiple scales
if "block_ID" in df.columns:
grouped = df.groupby("block_ID", sort=False, dropna=False)
else:
grouped = [(0, df)]
# Create new lateral structure
lateral_data = []
# Process each block
for bid, block_df in grouped:
# Get first row to extract common values
first_row = block_df.iloc[0]
# Collect all scales for this block
scale_pairs = []
seen_pairs = set()
for _, row in block_df.iterrows():
sname = row.get("scale_name", "")
sval = row.get("scale_value", "")
if pd.notna(sname) and str(sname).strip():
name_str = str(sname).strip()
val_str = str(sval) if pd.notna(sval) else ""
if is_session_scale_value_omitted(val_str):
continue
key = (name_str, val_str)
if key not in seen_pairs:
seen_pairs.add(key)
scale_pairs.append(key)
scale_names = [p[0] for p in scale_pairs]
scale_values = [p[1] for p in scale_pairs]
# Join multiple scales with newlines for internal separation
combined_scale_name = "\n".join(scale_names) if scale_names else ""
combined_scale_value = "\n".join(scale_values) if scale_values else ""
# Left side row
left_row = {}
right_row = {}
# Keep block_ID in the output for styling logic (excluded from
# display columns later).
left_row[BLOCK_ID_COLUMN] = bid
right_row[BLOCK_ID_COLUMN] = bid
# Common columns (non-lateral) - use combined scales with internal lines
left_row["program_ID"] = first_row.get("program_ID") or first_row.get(
"group_ID", ""
)
left_row["scale_name"] = combined_scale_name
left_row["scale_value"] = combined_scale_value
left_row["notes"] = first_row.get("notes", "")
right_row["program_ID"] = first_row.get("program_ID") or first_row.get(
"group_ID", ""
)
right_row["scale_name"] = combined_scale_name
right_row["scale_value"] = combined_scale_value
right_row["notes"] = first_row.get("notes", "")
# Lateral columns - map to generic names
lateral_mappings = {
"left_stim_freq": "frequency",
"left_cathode": "cathode",
"left_anode": "anode",
"left_amplitude": "amplitude",
"left_pulse_width": "pulse_width",
"right_stim_freq": "frequency",
"right_cathode": "cathode",
"right_anode": "anode",
"right_amplitude": "amplitude",
"right_pulse_width": "pulse_width",
}
# Left side parameters
for left_col, generic_col in lateral_mappings.items():
if left_col.startswith("left_"):
left_row[generic_col] = first_row.get(left_col, "")
# Right side parameters
for right_col, _generic_cbuild_scales_chartol in lateral_mappings.items():
if right_col.startswith("right_"):
right_row[generic_col] = first_row.get(right_col, "")
# Add lateral indicator
left_row["laterality"] = "L"
right_row["laterality"] = "R"
lateral_data.append(left_row)
lateral_data.append(right_row)
return pd.DataFrame(lateral_data)
def _add_session_data_table(
self,
doc: DocumentType,
df_table: pd.DataFrame,
with_chart: bool = True,
with_table: bool = True,
) -> None:
"""Add the lateral session-data table to the Word document."""
doc.add_heading("Session Data", level=1)
if df_table is None or df_table.empty:
return
df_table = self._normalize_block_id_column(df_table)
lateral_df = self._create_lateral_table_data(df_table)
# Chart BEFORE the table
if with_chart:
self._add_scales_timeline_chart(doc, lateral_df)
if not with_table:
return
columns_to_exclude = [
"date",
"time",
"onset",
"block_ID",
"session_ID",
"is_initial",
"electrode_model",
]
display_columns = [
col for col in lateral_df.columns if col not in columns_to_exclude
]
lateral_cols = [
"laterality",
"frequency",
"anode",
"cathode",
"amplitude",
"pulse_width",
]
common_cols = ["group_ID", "scale_name", "scale_value", "notes"]
lateral_cols = [col for col in lateral_cols if col in display_columns]
common_cols = [col for col in common_cols if col in display_columns]
ordered_columns = lateral_cols + common_cols
table = doc.add_table(rows=lateral_df.shape[0] + 1, cols=len(ordered_columns))
table.style = "Table Grid"
table.autofit = False
# Define column widths in inches
section = doc.sections[0]
page_width_inches = (
int(section.page_width or 0)
- int(section.left_margin or 0)
- int(section.right_margin or 0)
) / 914400
base_in = {
"laterality": 0.30,
"group_ID": 0.40,
"frequency": 0.50,
"anode": 0.45,
"cathode": 0.60,
"amplitude": 0.60,
"pulse_width": 0.50,
"scale_name": 1.10,
"scale_value": 0.60,
}
widths_in = [base_in.get(c, 0.5) for c in ordered_columns]
if "notes" in ordered_columns:
notes_idx = ordered_columns.index("notes")
used = sum(w for j, w in enumerate(widths_in) if j != notes_idx)
widths_in[notes_idx] = max(2.5, page_width_inches - used)
# Apply widths to each cell in every row (required for python-docx)
widths_twips = [Inches(max(0.25, w)) for w in widths_in]
for row in table.rows:
for idx, cell in enumerate(row.cells):
cell.width = widths_twips[idx]
# Header row
hdr_cells = table.rows[0].cells
for i, col_name in enumerate(ordered_columns):
hdr_cells[i].text = self._column_header(col_name)
for paragraph in hdr_cells[i].paragraphs:
for run in paragraph.runs:
run.font.bold = True
# Find best and second-best blocks for green highlighting
best_block_ids, second_best_ids = self._find_best_and_second_best_blocks(
lateral_df
)
prev_block_id = None
scale_name_idx = (
ordered_columns.index("scale_name")
if "scale_name" in ordered_columns
else -1
)
scale_value_idx = (
ordered_columns.index("scale_value")
if "scale_value" in ordered_columns
else -1
)
for i, (_, row) in enumerate(lateral_df.iterrows()):
row_cells = table.rows[i + 1].cells
# Highlight best block(s) with darker green, second-best with lighter green
current_block_id = row.get("block_ID", None)
if best_block_ids and current_block_id in best_block_ids:
self._highlight_cells_green(row_cells, intensity="best")
elif second_best_ids and current_block_id in second_best_ids:
self._highlight_cells_green(row_cells, intensity="second")
current_block_id = row.get("block_ID", None)
if (
prev_block_id is not None
and current_block_id != prev_block_id
and row.get("laterality") == "L"
):
for cell in row_cells:
self._set_cell_border_top(cell, sz=24)
prev_block_id = current_block_id
scale_name_lines = None
scale_value_lines = None
if scale_name_idx >= 0 and scale_value_idx >= 0:
try:
raw_sn = row.get("scale_name", "")
raw_sv = row.get("scale_value", "")
sn_text = str(raw_sn) if pd.notna(raw_sn) else ""
sv_text = str(raw_sv) if pd.notna(raw_sv) else ""
sn_parts = [s.strip() for s in sn_text.split("\n") if s.strip()]
sv_parts = [s.strip() for s in sv_text.split("\n") if s.strip()]
while len(sv_parts) < len(sn_parts):
sv_parts.append("")
filtered_names: list[str] = []
filtered_values: list[str] = []
for name, val in zip(sn_parts, sv_parts, strict=False):
if not name or is_session_scale_value_omitted(val):
continue
filtered_names.append(name)
filtered_values.append(val)
sn_text = "\n".join(filtered_names)
sv_text = "\n".join(filtered_values)
scale_name_lines = sn_text.split("\n") if sn_text else [""]
scale_value_lines = sv_text.split("\n") if sv_text else [""]
max_len = max(len(scale_name_lines), len(scale_value_lines))
scale_name_lines += [""] * (max_len - len(scale_name_lines))
scale_value_lines += [""] * (max_len - len(scale_value_lines))
except Exception:
scale_name_lines = None
scale_value_lines = None
for j, col in enumerate(ordered_columns):
if col not in row:
continue
# Format numeric values: use int for frequency/pulse_width
# when they have no decimals.
cell_value = str(row[col]) if pd.notna(row[col]) else ""
if col in ["frequency", "pulse_width"]:
try:
val = float(row[col])
if val.is_integer():
cell_value = str(int(val))
except (ValueError, TypeError):
pass
if col in common_cols:
merged_target_cell = None
did_merge = False
if row.get("laterality") == "R" and i > 0:
prev_cell = table.rows[i].cells[j]
prev_cell.merge(row_cells[j])
row_cells[j].text = ""
merged_target_cell = prev_cell
did_merge = True
target_cell = (
merged_target_cell
if did_merge and merged_target_cell is not None
else row_cells[j]
)
if (
col == "scale_name"
and scale_name_lines is not None
and len(scale_name_lines) > 1
):
target_cell.text = "\n".join(scale_name_lines)
elif (
col == "scale_value"
and scale_value_lines is not None
and len(scale_value_lines) > 1
):
target_cell.text = "\n".join(scale_value_lines)
else:
target_cell.text = cell_value
elif col == "cathode" and "_" in cell_value:
# Multi-contact cathode: show stacked with Total label
contacts = cell_value.replace("_", "\n")
row_cells[j].text = contacts + "\nTotal"
elif col == "amplitude" and "_" in cell_value:
# Multi-contact amplitude: show stacked values with total
parts = cell_value.split("_")
try:
# Validate all parts are numbers and calculate total
values = [float(p) for p in parts]
total = sum(values)
total_str = f"{total:.2f}".rstrip("0").rstrip(".")
row_cells[j].text = "\n".join(parts) + f"\n{total_str}"
except (ValueError, TypeError):
row_cells[j].text = cell_value
else:
row_cells[j].text = cell_value
# Add legend and clinical disclaimer below table
self._add_table_legend(doc, best_block_ids, second_best_ids)
def _add_table_legend(
self, doc: DocumentType, best_ids: list, second_ids: list
) -> None:
"""Add color legend and clinical disclaimer below the session data table."""
from docx.shared import Pt, RGBColor
# Only add legend if there are highlighted blocks
if not best_ids and not second_ids:
return
doc.add_paragraph() # spacing
# Legend paragraph
legend_para = doc.add_paragraph()
legend_para.add_run("Legend: ").bold = True
if best_ids:
best_run = legend_para.add_run("■ ")
best_run.font.color.rgb = RGBColor(0x96, 0xD2, 0xA0)
legend_para.add_run("Optimal configuration ")
if second_ids:
second_run = legend_para.add_run("■ ")
second_run.font.color.rgb = RGBColor(0xC8, 0xEB, 0xCD)
legend_para.add_run("Second-best configuration")
# Show target values used for optimization
if self.scale_optimization_prefs:
targets_para = doc.add_paragraph()
targets_para.add_run("Scale targets: ").bold = True
target_parts = []
for pref in self.scale_optimization_prefs:
if len(pref) >= 5:
name, smin, smax, mode, custom_val = pref
if mode == "ignore":
continue
elif mode == "min":
target_parts.append(f"{name}: min")
elif mode == "max":
target_parts.append(f"{name}: max")
elif mode == "custom":
target_parts.append(f"{name}: {custom_val}")
if target_parts:
targets_para.add_run("; ".join(target_parts))
for run in targets_para.runs:
run.font.size = Pt(9)
# Clinical disclaimer
disclaimer_para = doc.add_paragraph()
disclaimer_run = disclaimer_para.add_run(
"Note: The highlighted rows are derived exclusively from the recorded "
"session scale values and represent a computational ranking intended "
"solely as a reference. This color-coded indication does not constitute "
"clinical guidance."
)
disclaimer_run.font.size = Pt(9)
disclaimer_run.font.italic = True
def _add_scales_timeline_chart(
self, doc: DocumentType, lateral_df: pd.DataFrame
) -> None:
"""Add a rainbow-colored timeline chart of session scales with a
general index line."""
import math as _math
from .report_chart_utils import add_chart_to_doc, build_scales_chart
# Guard: need valid input
if lateral_df is None or lateral_df.empty:
return
if (
"scale_name" not in lateral_df.columns
or "scale_value" not in lateral_df.columns
):
return
if "block_ID" not in lateral_df.columns:
return
# Use L rows only to avoid duplicates
if "laterality" in lateral_df.columns:
df_l = lateral_df[lateral_df["laterality"] == "L"].copy()
else:
df_l = lateral_df.copy()
if df_l.empty:
df_l = lateral_df.drop_duplicates(subset=["block_ID"]).copy()
# Collect scale values per block
scale_data: dict[str, dict[int, float]] = {}
for _, row in df_l.iterrows():
try:
bid = int(row.get("block_ID", 0))
except (ValueError, TypeError):
continue
names = str(row.get("scale_name", "") or "").split("\n")
values = str(row.get("scale_value", "") or "").split("\n")
for i, name in enumerate(names):
name = name.strip()
if not name:
continue
val_str = values[i].strip() if i < len(values) else ""
try:
val = float(val_str)
except ValueError:
continue
if _math.isnan(val):
continue
scale_data.setdefault(name, {})[bid] = val
if not scale_data:
return
png = build_scales_chart(
scale_data,
self.scale_optimization_prefs,
title="Session Scales Timeline",
x_label="Block",
y_label="Scale Value",
)
add_chart_to_doc(doc, png)
def _column_header(self, col: str) -> str:
"""Map an internal column name to a human-readable table header."""
placeholder_map = {
"scale_name": PLACEHOLDERS.get("scale_name"),
"scale_value": PLACEHOLDERS.get("scale_value"),
"frequency": PLACEHOLDERS.get("frequency"),
"anode": "+",
"cathode": "-",
"amplitude": PLACEHOLDERS.get("amplitude"),
"pulse_width": PLACEHOLDERS.get("pulse_width"),
"program_ID": "Prog",
"laterality": "",
}
if col in placeholder_map and placeholder_map[col] is not None:
return str(placeholder_map[col])
return str(col).replace("_", " ").title()
def _pick_latest_row(self, df: pd.DataFrame) -> pd.Series | None:
"""Return the row with the highest block_ID, or the last row."""
if df is None or df.empty:
return None
if "block_ID" in df.columns:
try:
bid = pd.to_numeric(df["block_ID"], errors="coerce")
result = df.loc[bid.idxmax()]
if isinstance(result, pd.DataFrame):
return result.iloc[-1]
return result
except Exception:
return df.iloc[-1]
return df.iloc[-1]
def _find_best_and_second_best_blocks(self, lateral_df: pd.DataFrame) -> tuple:
"""
Find block_IDs with the best and second-best scores based on the
current optimization preferences.
Returns a tuple: (best_block_ids, second_best_block_ids)
Each is a list (may have multiple if tied).
"""
if lateral_df is None or lateral_df.empty:
return [], []
if (
"block_ID" not in lateral_df.columns
or "scale_value" not in lateral_df.columns
):
return [], []
if "scale_name" not in lateral_df.columns:
return [], []
try:
# Build preference lookup: scale_name -> (mode, custom_value)
pref_lookup = {}
for pref in self.scale_optimization_prefs:
if len(pref) >= 5:
name, _, _, mode, custom_val = pref
pref_lookup[name.strip().lower()] = (mode, custom_val)
# Get unique blocks (use only L rows to avoid double counting)
df_l = lateral_df[lateral_df.get("laterality", "") == "L"].copy()
if df_l.empty:
df_l = lateral_df.drop_duplicates(subset=["block_ID"]).copy()
block_scores = {}
for _, row in df_l.iterrows():
bid = row.get("block_ID")
if bid is None:
continue
scale_name_str = str(row.get("scale_name", "") or "")
scale_val_str = str(row.get("scale_value", "") or "")
names = scale_name_str.split("\n")
values = scale_val_str.split("\n")
# Calculate weighted score for this block
total_score = 0.0
has_value = False
for i, val_line in enumerate(values):
val_line = val_line.strip()
if not val_line:
continue
try:
val = float(val_line)
except ValueError:
continue
import math as _math
if _math.isnan(val):
continue
# Get the corresponding scale name
scale_name = names[i].strip().lower() if i < len(names) else ""
mode, custom_val = pref_lookup.get(scale_name, ("min", ""))
if mode == "ignore":
continue # Skip this scale
has_value = True
if mode in ("low", "min"):
# Lower is better - use value directly as score (lower = better)
total_score += val
elif mode in ("high", "max"):
# Higher is better - negate so lower score = better
total_score -= val
elif mode == "custom":
# Closer to custom value is better - use absolute distance
try:
target = float(custom_val) if custom_val else 0.0
total_score += abs(val - target)
except ValueError:
total_score += val
if has_value:
block_scores[bid] = total_score
if not block_scores:
return [], []
# Sort unique scores
unique_scores = sorted(set(block_scores.values()))
# Best blocks (lowest score)
best_score = unique_scores[0]
best_blocks = [
bid for bid, score in block_scores.items() if score == best_score
]
# Second best blocks (second lowest score, if exists)
second_best_blocks = []
if len(unique_scores) > 1:
second_score = unique_scores[1]
second_best_blocks = [
bid for bid, score in block_scores.items() if score == second_score
]
return best_blocks, second_best_blocks
except Exception:
return [], []
def _highlight_cells_green(self, row_cells, intensity: str = "best") -> None:
"""
Apply green background to all cells in a row.
Args:
row_cells: List of cells to highlight
intensity: "best" for darker green, "second" for lighter green
"""
# Best = darker green, Second = lighter green
color = "96D2A0" if intensity == "best" else "C8EBCD"
for cell in row_cells:
try:
shading_elm = OxmlElement("w:shd")
shading_elm.set(qn("w:fill"), color)
cell._tc.get_or_add_tcPr().append(shading_elm)
except Exception:
pass
def _set_cell_border_top(self, cell, sz=12):
"""Set top border of a cell to specified size (in eighths of a point)."""
try:
tc = cell._tc
tcPr = tc.get_or_add_tcPr() # noqa: N806
tcBorders = OxmlElement("w:tcBorders") # noqa: N806
top = OxmlElement("w:top")
top.set(qn("w:val"), "single")
top.set(qn("w:sz"), str(sz))
top.set(qn("w:space"), "0")
top.set(qn("w:color"), "000000")
tcBorders.append(top)
tcPr.append(tcBorders)
except Exception:
pass
def _set_paragraph_bottom_border(
self, paragraph, sz: int = 6, color: str = "000000"
) -> None:
"""Draw a bottom border line under a Word paragraph."""
try:
pPr = paragraph._p.get_or_add_pPr() # noqa: N806
pBdr = OxmlElement("w:pBdr") # noqa: N806
bottom = OxmlElement("w:bottom")
bottom.set(qn("w:val"), "single")
bottom.set(qn("w:sz"), str(sz))
bottom.set(qn("w:space"), "1")
bottom.set(qn("w:color"), str(color))
pBdr.append(bottom)
pPr.append(pBdr)
except Exception:
pass
def _write_multiline_cell_with_dividers(
self, cell, lines: list, divider_sz: int = 12, divider_color: str = "000000"
) -> None:
"""Write each line as its own paragraph and draw a full-width
divider under each line except the last."""
try:
cell.text = ""
if not lines:
return
p0 = cell.paragraphs[0]
p0.text = str(lines[0])
if len(lines) > 1:
self._set_paragraph_bottom_border(
p0, sz=divider_sz, color=divider_color
)
for k in range(1, len(lines)):
p = cell.add_paragraph(str(lines[k]))
if k < len(lines) - 1:
self._set_paragraph_bottom_border(
p, sz=divider_sz, color=divider_color
)
except Exception:
try:
cell.text = "\n".join([str(x) for x in (lines or [])])
except Exception:
pass
def _apply_contact_tokens_to_canvas(
self, canvas: ElectrodeCanvas, anode_text: str, cathode_text: str
) -> None:
"""Parse anode/cathode token strings and set the corresponding canvas states."""
model = canvas.model
if not model:
return
canvas.contact_states.clear()
canvas.case_state = ContactState.OFF
def apply_tokens(text: str, state: int) -> None:
if not text:
return
for token in str(text).split("_"):
token = token.strip()
if not token:
continue
if token == "case":
canvas.case_state = state
continue
if token.startswith("E") and len(token) >= 2:
try:
if token[-1].isalpha():
idx = int(token[1:-1])
seg_char = token[-1].lower()
seg_map = {"a": 0, "b": 1, "c": 2}
if seg_char in seg_map:
canvas.contact_states[(idx, seg_map[seg_char])] = state
else:
idx = int(token[1:])
if model.is_directional:
for seg in range(3):
canvas.contact_states[(idx, seg)] = state
else:
canvas.contact_states[(idx, 0)] = state
except Exception:
continue
apply_tokens(anode_text, ContactState.ANODIC)
apply_tokens(cathode_text, ContactState.CATHODIC)
canvas.update()
# Force white background by temporarily overriding paintEvent
original_paint = canvas.paintEvent
def white_bg_paint(event):
painter = QPainter(canvas)
painter.fillRect(canvas.rect(), Qt.GlobalColor.white)
original_paint(event)
canvas.paintEvent = white_bg_paint # type: ignore[assignment] # ty: ignore[invalid-assignment]
def _render_electrode_png(
self,
model_name: str,
anode_text: str,
cathode_text: str,
target_size_px: tuple[int, int] = (440, 900),
) -> str | None:
model = ELECTRODE_MODELS.get(model_name)
if not model:
return None
canvas = ElectrodeCanvas()
canvas.set_model(model)
canvas.resize(*target_size_px)
try:
canvas.set_export_mode(True)
except Exception:
pass
self._apply_contact_tokens_to_canvas(canvas, anode_text, cathode_text)
# Force white background by temporarily overriding paintEvent
original_paint = canvas.paintEvent
def white_bg_paint(event):
painter = QPainter(canvas)
painter.fillRect(canvas.rect(), Qt.GlobalColor.white)
original_paint(event)
canvas.paintEvent = white_bg_paint # type: ignore[assignment] # ty: ignore[invalid-assignment]
pixmap = QPixmap(canvas.size())
pixmap.fill(Qt.GlobalColor.white)
canvas.render(pixmap)
# Crop white borders
image = pixmap.toImage()
from PySide6.QtGui import QColor as _QColor
# Find bounding box of non-white content
left, top, right, bottom = image.width(), image.height(), 0, 0
white_rgb = _QColor(Qt.GlobalColor.white).rgb()
for y in range(image.height()):
for x in range(image.width()):
if image.pixel(x, y) != white_rgb:
left = min(left, x)
top = min(top, y)
right = max(right, x)
bottom = max(bottom, y)
if right > left and bottom > top:
margin = 20 # small margin in pixels
left = max(0, left - margin)
top = max(0, top - margin)
right = min(image.width() - 1, right + margin)
bottom = min(image.height() - 1, bottom + margin)
cropped = pixmap.copy(left, top, right - left + 1, bottom - top + 1)
else:
cropped = pixmap
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
tmp.close()
cropped.save(tmp.name, "PNG")
return tmp.name
def _add_electrode_config_section(
self, doc: DocumentType, df: pd.DataFrame, df_initial: pd.DataFrame
) -> None:
"""Add the initial vs final electrode configuration images to the document."""
if df is None or df.empty:
return
if "session_ID" not in df.columns or "is_initial" not in df.columns:
return
dfc = df.copy()
dfc["session_ID"] = pd.to_numeric(dfc["session_ID"], errors="coerce")
dfc["is_initial"] = (
pd.to_numeric(dfc["is_initial"], errors="coerce").fillna(0).astype(int)
)
df_init = dfc[dfc["is_initial"] == 1]
df_final = dfc[dfc["is_initial"] == 0]
if df_init.empty or df_final.empty:
return
init_session = int(dfc.loc[df_init.index, "session_ID"].max())
final_session = int(dfc.loc[df_final.index, "session_ID"].max())
init_row = self._pick_latest_row(df_init[df_init["session_ID"] == init_session])
final_row = self._pick_latest_row(
df_final[df_final["session_ID"] == final_session]
)
if init_row is None or final_row is None:
return
init_model = str(init_row.get("electrode_model", "") or "")
final_model = str(final_row.get("electrode_model", "") or "")
if not init_model or not final_model:
return
paths = {
"Init L": self._render_electrode_png(
init_model,
str(init_row.get("left_anode", "") or ""),
str(init_row.get("left_cathode", "") or ""),
),
"Init R": self._render_electrode_png(
init_model,
str(init_row.get("right_anode", "") or ""),
str(init_row.get("right_cathode", "") or ""),
),
"Final L": self._render_electrode_png(
final_model,
str(final_row.get("left_anode", "") or ""),
str(final_row.get("left_cathode", "") or ""),
),
"Final R": self._render_electrode_png(
final_model,
str(final_row.get("right_anode", "") or ""),
str(final_row.get("right_cathode", "") or ""),
),
}
if not all(paths.values()):
return
doc.add_heading("Electrode Configurations", level=1)
# Add electrode model info
latest_init = self._pick_latest_session_row(df_initial)
if latest_init is None:
return
model = str(latest_init.get("electrode_model", "") or "")
manufacturer = self._get_manufacturer_for_model(model)
if model:
if manufacturer:
doc.add_paragraph(f"Electrode model: {manufacturer} | {model}")
else:
doc.add_paragraph(f"Electrode model: {model}")
# 4 columns x 4 rows table, no borders
# Row 0: "Initial Settings" (merged cols 0-1),
# "Final Settings" (merged cols 2-3)
# Row 1: Left, Right, Left, Right
# Row 2: Anode/Cathode config text
# Row 3: PNG images
t = doc.add_table(rows=4, cols=4)
t.autofit = False
# Remove all borders
tbl = t._tbl
tblPr = tbl.tblPr if tbl.tblPr is not None else tbl._add_tblPr() # noqa: N806
borders = OxmlElement("w:tblBorders")
for border_name in ("top", "left", "bottom", "right", "insideH", "insideV"):
border = OxmlElement(f"w:{border_name}")
border.set(qn("w:val"), "none")
border.set(qn("w:sz"), "0")
border.set(qn("w:space"), "0")
border.set(qn("w:color"), "auto")
borders.append(border)
tblPr.append(borders)
# Row 0: merged headers
cell_init = t.cell(0, 0).merge(t.cell(0, 1))
cell_init.text = "Initial Settings"
for p in cell_init.paragraphs:
p.alignment = WD_ALIGN_PARAGRAPH.CENTER
for run in p.runs:
run.bold = True
cell_final = t.cell(0, 2).merge(t.cell(0, 3))
cell_final.text = "Final Settings"
for p in cell_final.paragraphs:
p.alignment = WD_ALIGN_PARAGRAPH.CENTER
for run in p.runs:
run.bold = True
# Row 1: Left / Right labels
for c, label in enumerate(["Left", "Right", "Left", "Right"]):
cell = t.cell(1, c)
cell.text = label
for p in cell.paragraphs:
p.alignment = WD_ALIGN_PARAGRAPH.CENTER
# Captions in order: Init L, Init R, Final L, Final R
all_captions = [
(
str(init_row.get("left_anode", "") or ""),
str(init_row.get("left_cathode", "") or ""),
),
(
str(init_row.get("right_anode", "") or ""),
str(init_row.get("right_cathode", "") or ""),
),
(
str(final_row.get("left_anode", "") or ""),
str(final_row.get("left_cathode", "") or ""),
),
(
str(final_row.get("right_anode", "") or ""),
str(final_row.get("right_cathode", "") or ""),
),
]
all_img_paths = [
paths["Init L"],
paths["Init R"],
paths["Final L"],
paths["Final R"],
]
# Row 2: Config text
for c, (anode_txt, cathode_txt) in enumerate(all_captions):
cell = t.cell(2, c)
cell.text = ""
p = cell.paragraphs[0]
p.text = f"Anode: {anode_txt}\nCathode: {cathode_txt}".strip()
p.alignment = WD_ALIGN_PARAGRAPH.CENTER
# Row 3: PNG images
for c, img_path in enumerate(all_img_paths):
cell = t.cell(3, c)
cell.text = ""
p = cell.paragraphs[0]
p.alignment = WD_ALIGN_PARAGRAPH.CENTER
run = p.add_run()
run.add_picture(img_path, width=Inches(1.15))
for pth in paths.values():
if pth is not None:
try:
os.unlink(pth)
except Exception:
pass
def export_to_pdf(self, parent: QWidget | None = None, sections=None) -> bool:
"""Export session data to PDF by generating a Word report and converting it."""
try:
if not self.session_data.is_file_open():
QMessageBox.warning(
parent,
"No Session Data",
"No session file is currently open. Please start a session first.",
)
return False
default_filename = self._generate_bids_report_filename(".pdf")
start_dir = os.path.dirname(
getattr(self.session_data, "file_path", "") or ""
)
start_path = (
os.path.join(start_dir, default_filename)
if start_dir
else default_filename
)
from PySide6.QtWidgets import QFileDialog
pdf_path, _ = QFileDialog.getSaveFileName(
parent,
"Export Session Report",
start_path,
"PDF Files (*.pdf);;All Files (*)",
)
if not pdf_path:
return False
if not pdf_path.endswith(".pdf"):
pdf_path += ".pdf"
docx_path = os.path.splitext(pdf_path)[0] + "_tmp.docx"
ok = self._export_to_word_path(docx_path, sections=sections)
if not ok:
return False
try:
self._convert_docx_to_pdf(docx_path, pdf_path)
finally:
try:
os.unlink(docx_path)
except Exception:
pass
self._show_transient_message(
parent,
"Export Completed",
f"Report saved successfully:\n{pdf_path}",
msecs=2000,
)
self._open_file(pdf_path)
return True
except Exception as e:
QMessageBox.critical(
parent,
"Export Error",
f"Failed to export session data to PDF:\n{str(e)}",
)
return False
@staticmethod
def _open_file(path: str) -> None:
"""Open a file with the system default application."""
try:
import subprocess
import sys
if sys.platform == "win32":
os.startfile(path) # noqa: S606
elif sys.platform == "darwin":
subprocess.Popen(["open", path]) # noqa: S603
else:
subprocess.Popen(["xdg-open", path]) # noqa: S603
except Exception:
pass
def export_to_word(self, parent: QWidget | None = None, sections=None) -> bool:
"""
Export session data to Word format.
Args:
parent: Parent widget for dialog display
sections: List of section keys to include
Returns:
True if export was successful, False otherwise
"""
try:
# Get the current session data
if not self.session_data.is_file_open():
QMessageBox.warning(
parent,
"No Session Data",
"No session file is currently open. Please start a session first.",
)
return False
# Generate BIDS-friendly default filename from TSV path
default_filename = self._generate_bids_report_filename(".docx")
# Use same directory as TSV file
start_dir = os.path.dirname(
getattr(self.session_data, "file_path", "") or ""
)
start_path = (
os.path.join(start_dir, default_filename)
if start_dir
else default_filename
)
# Get save location
from PySide6.QtWidgets import QFileDialog
file_path, _ = QFileDialog.getSaveFileName(
parent,
"Export Session Report",
start_path,
"Word Files (*.docx);;All Files (*)",
)
if not file_path:
return False # User cancelled
# Ensure .docx extension
if not file_path.endswith(".docx"):
file_path += ".docx"
ok = self._export_to_word_path(file_path, sections=sections)
if not ok:
QMessageBox.warning(
parent,
"No Data to Export",
"No session data has been recorded yet.",
)
return False
self._show_transient_message(
parent,
"Export Completed",
f"Report saved successfully:\n{file_path}",
msecs=2000,
)
return True
except Exception as e:
QMessageBox.critical(
parent,
"Export Error",
f"Failed to export session data to Word:\n{str(e)}",
)
return False
def _export_to_word_path(self, file_path: str, sections=None) -> bool:
"""Generate the Word report at an explicit path (used also by PDF export)."""
df = self._read_session_data()
if df is None or df.empty:
return False
df = df.copy()
df = self._normalize_block_id_column(df)
if "is_initial" in df.columns:
df["is_initial"] = (
pd.to_numeric(df["is_initial"], errors="coerce").fillna(0).astype(int)
)
df_initial: pd.DataFrame = (
df[df["is_initial"] == 1] if "is_initial" in df.columns else df.iloc[0:0]
)
df_table: pd.DataFrame = (
df[df["is_initial"] != 1] if "is_initial" in df.columns else df
)
doc = Document()
section = doc.sections[0]
section.left_margin = Inches(0.5) # default ~1.0
section.right_margin = Inches(0.5) # default ~1.0
section.top_margin = Inches(0.75)
section.bottom_margin = Inches(0.75)
title = doc.add_heading("Clinical DBS Session Report", 0)
title.alignment = WD_ALIGN_PARAGRAPH.CENTER
timestamp = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S")
doc.add_paragraph(f"Generated on: {timestamp} by {__app_name__} v{__version__}")
patient_id, session_num = self._extract_bids_info_from_path()
if patient_id or session_num:
info_parts = []
if patient_id:
info_parts.append(f"Patient ID: {patient_id}")
if session_num:
info_parts.append(f"Session: {session_num}")
doc.add_paragraph(" ".join(info_parts))
# Determine which sections to include (default: all except the
# parent section when children exist).
all_keys = [
"initial_notes",
"session_data",
"session_data_graph",
"session_data_table",
"electrode_config",
"programming_summary",
]
if sections is not None:
active = set(sections)
else:
# Default: all sections, but use children instead of parent for session_data
active = set(all_keys) - {"session_data"}
# Render in the defined order
for key in all_keys:
if key not in active:
continue
if key == "initial_notes":
doc.add_paragraph("")
self._add_summary_section(doc, df, df_initial, df_table)
if key == "session_data":
# Treat parent as both graph and table
doc.add_paragraph("")
self._add_session_data_table(
doc, df_table, with_chart=True, with_table=True
)
elif key == "session_data_graph":
# Handle graph separately - always add heading
doc.add_paragraph("")
self._add_session_data_table(
doc, df_table, with_chart=True, with_table=False
)
elif key == "session_data_table":
# Handle table separately - always add heading
doc.add_paragraph("")
self._add_session_data_table(
doc, df_table, with_chart=False, with_table=True
)
if key == "electrode_config":
doc.add_paragraph("")
self._add_electrode_config_section(doc, df, df_initial)
if key == "programming_summary":
doc.add_paragraph("")
self._add_programming_summary(doc, df, df_initial, df_table)
doc.save(file_path)
return True
def _add_report_footer(self, doc: DocumentType) -> None:
"""Add footer with patient ID and session number."""
from docx.shared import Pt
patient_id, session_num = self._extract_bids_info_from_path()
if patient_id or session_num:
doc.add_paragraph("")
doc.add_paragraph("")
footer_para = doc.add_paragraph()
footer_run = footer_para.add_run("─" * 50)
footer_run.font.size = Pt(8)
info_para = doc.add_paragraph()
if patient_id:
info_para.add_run(f"Patient ID: {patient_id} ")
if session_num:
info_para.add_run(f"Session: {session_num}")
for run in info_para.runs:
run.font.size = Pt(10)
def _read_simple_annotations(self) -> list[tuple[str, str]]:
"""Read (timestamp, annotation) pairs from the simple annotations TSV."""
file_path = getattr(self.session_data, "file_path", None)
if not file_path and getattr(self.session_data, "tsv_file", None) is not None:
try:
file_path = self.session_data.tsv_file.name
except Exception:
file_path = None
if not file_path or not os.path.exists(file_path):
return []
items: list[tuple[str, str]] = []
try:
with open(file_path, newline="", encoding="utf-8-sig") as f:
reader = csv.DictReader(f, delimiter="\t")
for row in reader:
if not row:
continue
norm = {}
for k, v in row.items():
try:
kk = str(k).strip().lstrip("\ufeff").lower()
except Exception:
continue
norm[kk] = v
# Extract date and time separately
date_str = norm.get("date", "")
time_str = norm.get("time", "")
# Build timestamp: if both exist, combine; otherwise
# fall back to time / timestamp / date.
if date_str and time_str:
t = f"{date_str} {time_str}"
else:
t = str(
norm.get("time", "")
or norm.get("timestamp", "")
or norm.get("date", "")
or ""
)
a = str(
norm.get("notes", "")
or norm.get("annotation", "")
or norm.get("note", "")
or norm.get("text", "")
or ""
)
# Fallback: if headers are unexpected, use the
# first/second column values.
if not (t.strip() or a.strip()):
try:
vals = list(norm.values())
t = str(vals[0] or "") if len(vals) >= 1 else ""
a = str(vals[1] or "") if len(vals) >= 2 else ""
except Exception:
t, a = "", ""
if a.strip() or t.strip():
items.append((t, a))
except Exception:
return []
return items
def _export_annotations_to_word_path(self, file_path: str) -> bool:
"""Generate the annotations Word report at an explicit path."""
annotations = self._read_simple_annotations()
if not annotations:
return False
doc = Document()
title = doc.add_heading("Annotations Report", 0)
title.alignment = WD_ALIGN_PARAGRAPH.CENTER
timestamp = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S")
doc.add_paragraph(f"Generated on: {timestamp} by {__app_name__} v{__version__}")
doc.add_paragraph("")
for t, a in annotations:
line = f"{t}: {a}" if t.strip() else a
doc.add_paragraph(line)
doc.save(file_path)
return True
def export_annotations_to_word(self, parent: QWidget | None = None) -> bool:
"""Export simple annotations to a Word document."""
try:
if not self.session_data.is_file_open():
QMessageBox.warning(
parent,
"No Session Data",
"No annotation file is currently open. "
"Please open or create one first.",
)
return False
if getattr(self.session_data, "tsv_file", None) is not None:
try:
self.session_data.tsv_file.flush()
except Exception:
pass
from PySide6.QtWidgets import QFileDialog
default_filename = self._generate_bids_report_filename(".docx")
start_dir = os.path.dirname(
getattr(self.session_data, "file_path", "") or ""
)
start_path = (
os.path.join(start_dir, default_filename)
if start_dir
else default_filename
)
file_path, _ = QFileDialog.getSaveFileName(
parent,
"Export Annotations Report",
start_path,
"Word Files (*.docx);;All Files (*)",
)
if not file_path:
return False
if not file_path.endswith(".docx"):
file_path += ".docx"
ok = self._export_annotations_to_word_path(file_path)
if not ok:
QMessageBox.warning(
parent,
"No Data to Export",
"No annotations have been recorded yet.",
)
return False
self._show_transient_message(
parent,
"Export Completed",
f"Report saved successfully:\n{file_path}",
msecs=2000,
)
return True
except Exception as e:
QMessageBox.critical(
parent,
"Export Error",
f"Failed to export annotations to Word:\n{str(e)}",
)
return False
def export_annotations_to_pdf(self, parent: QWidget | None = None) -> bool:
"""Export simple annotations to PDF via intermediate Word conversion."""
try:
if not self.session_data.is_file_open():
QMessageBox.warning(
parent,
"No Session Data",
"No annotation file is currently open. "
"Please open or create one first.",
)
return False
if getattr(self.session_data, "tsv_file", None) is not None:
try:
self.session_data.tsv_file.flush()
except Exception:
pass
from PySide6.QtWidgets import QFileDialog
default_filename = self._generate_bids_report_filename(".pdf")
start_dir = os.path.dirname(
getattr(self.session_data, "file_path", "") or ""
)
start_path = (
os.path.join(start_dir, default_filename)
if start_dir
else default_filename
)
pdf_path, _ = QFileDialog.getSaveFileName(
parent,
"Export Annotations Report",
start_path,
"PDF Files (*.pdf);;All Files (*)",
)
if not pdf_path:
return False
if not pdf_path.endswith(".pdf"):
pdf_path += ".pdf"
docx_path = os.path.splitext(pdf_path)[0] + "_tmp.docx"
ok = self._export_annotations_to_word_path(docx_path)
if not ok:
QMessageBox.warning(
parent,
"No Data to Export",
"No annotations have been recorded yet.",
)
return False
try:
self._convert_docx_to_pdf(docx_path, pdf_path)
finally:
try:
os.unlink(docx_path)
except Exception:
pass
self._show_transient_message(
parent,
"Export Completed",
f"Report saved successfully:\n{pdf_path}",
msecs=2000,
)
return True
except Exception as e:
QMessageBox.critical(
parent,
"Export Error",
f"Failed to export annotations to PDF:\n{str(e)}",
)
return False