"""
Longitudinal report exporter for DBS Annotator.
Combines data from multiple annotation TSV files and generates a unified
longitudinal report in Word or PDF format, with best-entry highlighting
based on user-selected scale optimization preferences.
"""
import os
import re
import tempfile
from datetime import datetime
import pandas as pd
from docx import Document
from docx.document import Document as DocumentType
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.shared import Inches, Pt, RGBColor
from PySide6.QtCore import Qt, QTimer
from PySide6.QtWidgets import QFileDialog, QMessageBox, QWidget
from .. import __app_name__, __version__
from ..config import PLACEHOLDERS
from ..config_electrode_models import ELECTRODE_MODELS, MANUFACTURERS, ContactState
from ..models import is_session_scale_value_omitted
[docs]
class LongitudinalExporter:
"""Generate longitudinal reports from multiple annotation TSV files."""
def __init__(self):
self.scale_optimization_prefs: list = []
self.clinical_scale_prefs: list = []
def set_scale_optimization_prefs(self, prefs: list) -> None:
"""Set scale optimization preferences for best-entry highlighting."""
self.scale_optimization_prefs = prefs or []
def set_clinical_scale_prefs(self, prefs: list | None) -> None:
"""Set clinical scale optimization preferences."""
self.clinical_scale_prefs = prefs or []
# ------------------------------------------------------------------
# Public export API
# ------------------------------------------------------------------
def export_to_word(
    self, file_paths: list[str], parent: QWidget | None = None, sections=None
) -> bool:
    """Export the longitudinal report to a Word (.docx) file.

    Prompts for a destination with a save dialog, builds the report and
    shows a confirmation (or warning / error) message.

    Args:
        file_paths: Annotation TSV files to combine into the report.
        parent: Optional parent widget for the dialogs.
        sections: Optional collection of section keys forwarded to
            ``_build_report``.

    Returns:
        ``True`` when the report was written; ``False`` when the user
        cancelled, no data was found, or an error occurred (every exception
        is reported through a message box, none is propagated).
    """
    try:
        default_name = self._generate_filename(file_paths, ".docx")
        # Start the dialog next to the first input file when there is one.
        start_dir = os.path.dirname(file_paths[0]) if file_paths else ""
        start_path = (
            os.path.join(start_dir, default_name) if start_dir else default_name
        )
        file_path, _ = QFileDialog.getSaveFileName(
            parent,
            "Export Longitudinal Report",
            start_path,
            "Word Files (*.docx);;All Files (*)",
        )
        if not file_path:
            # Dialog cancelled.
            return False
        # Compare case-insensitively so "Report.DOCX" is not turned into
        # "Report.DOCX.docx".
        if not file_path.lower().endswith(".docx"):
            file_path += ".docx"
        ok = self._build_report(file_paths, file_path, sections=sections)
        if not ok:
            QMessageBox.warning(
                parent, "No Data", "No session data found in the loaded files."
            )
            return False
        self._show_transient_message(
            parent, "Export Completed", f"Report saved:\n{file_path}"
        )
        return True
    except Exception as e:
        # UI boundary: surface any failure to the user instead of crashing.
        QMessageBox.critical(
            parent, "Export Error", f"Failed to export report:\n{e}"
        )
        return False
def export_to_pdf(
    self, file_paths: list[str], parent: QWidget | None = None, sections=None
) -> bool:
    """Export the longitudinal report to PDF via an intermediate Word file.

    The report is first written to a uniquely named temporary ``.docx`` in
    the destination directory, converted to PDF, and the temporary file is
    always removed afterwards. On success the PDF is opened with the system
    default application.

    Args:
        file_paths: Annotation TSV files to combine into the report.
        parent: Optional parent widget for the dialogs.
        sections: Optional collection of section keys forwarded to
            ``_build_report``.

    Returns:
        ``True`` when the PDF was written; ``False`` when the user
        cancelled, no data was found, or an error occurred (every exception
        is reported through a message box, none is propagated).
    """
    try:
        default_name = self._generate_filename(file_paths, ".pdf")
        # Start the dialog next to the first input file when there is one.
        start_dir = os.path.dirname(file_paths[0]) if file_paths else ""
        start_path = (
            os.path.join(start_dir, default_name) if start_dir else default_name
        )
        pdf_path, _ = QFileDialog.getSaveFileName(
            parent,
            "Export Longitudinal Report",
            start_path,
            "PDF Files (*.pdf);;All Files (*)",
        )
        if not pdf_path:
            # Dialog cancelled.
            return False
        # Compare case-insensitively so "Report.PDF" is not turned into
        # "Report.PDF.pdf".
        if not pdf_path.lower().endswith(".pdf"):
            pdf_path += ".pdf"

        # Use a unique temporary name in the destination directory. A fixed
        # "<name>_tmp.docx" could overwrite, and then delete, a file the
        # user already has under that name.
        out_dir = os.path.dirname(pdf_path) or "."
        stem = os.path.splitext(os.path.basename(pdf_path))[0]
        fd, docx_tmp = tempfile.mkstemp(
            prefix=f"{stem}_tmp", suffix=".docx", dir=out_dir
        )
        os.close(fd)
        try:
            ok = self._build_report(file_paths, docx_tmp, sections=sections)
            if not ok:
                QMessageBox.warning(
                    parent, "No Data", "No session data found in the loaded files."
                )
                return False
            self._convert_docx_to_pdf(docx_tmp, pdf_path)
        finally:
            # Best-effort cleanup of the intermediate document, whether the
            # build/conversion succeeded or not.
            try:
                os.unlink(docx_tmp)
            except OSError:
                pass
        self._show_transient_message(
            parent, "Export Completed", f"Report saved:\n{pdf_path}"
        )
        self._open_file(pdf_path)
        return True
    except Exception as e:
        # UI boundary: surface any failure to the user instead of crashing.
        QMessageBox.critical(
            parent, "Export Error", f"Failed to export report:\n{e}"
        )
        return False
@staticmethod
def _open_file(path: str) -> None:
    """Launch the system's default application for *path* (best effort).

    Any failure is deliberately ignored: opening the file is a convenience
    and must not turn a successful export into an error.
    """
    try:
        import subprocess
        import sys

        platform = sys.platform
        if platform == "win32":
            os.startfile(path)  # noqa: S606
            return
        # macOS uses "open"; every other platform falls back to "xdg-open".
        opener = "open" if platform == "darwin" else "xdg-open"
        subprocess.Popen([opener, path])  # noqa: S603
    except Exception:
        pass
# ------------------------------------------------------------------
# Report building
# ------------------------------------------------------------------
def _build_report(
    self, file_paths: list[str], out_path: str, sections=None
) -> bool:
    """Read all files, merge them, and write the Word document to *out_path*.

    Args:
        file_paths: Annotation TSV files to combine. They are re-sorted
            here, oldest first, by the earliest timestamp found in each file.
        out_path: Destination path handed to ``Document.save``.
        sections: Optional iterable of section keys to render
            (``sessions_overview``, ``session_data``, ``session_data_graph``,
            ``session_data_table``, ``electrode_config``,
            ``programming_summary``). ``None`` selects the default set.

    Returns:
        ``False`` when no file could be read or the merged data is empty
        (nothing is written in that case); ``True`` once the document has
        been saved.
    """

    def get_file_datetime(path):
        """Sort key: earliest timestamp in the file, else the date in its name."""
        try:
            df = pd.read_csv(path, sep="\t", na_filter=False)
            if "date" in df.columns and "time" in df.columns:
                # Combine date and time to create a datetime for sorting.
                df["datetime"] = pd.to_datetime(
                    df["date"] + " " + df["time"], errors="coerce"
                )
                valid_times = df["datetime"].dropna()
                if not valid_times.empty:
                    return valid_times.min()  # Earliest time in the file
            # Fallback to the "ses-YYYYMMDD" part of the filename.
            basename = os.path.basename(path)
            date_match = re.search(r"ses-(\d{8})", basename)
            if date_match:
                return pd.to_datetime(date_match.group(1), format="%Y%m%d")
            # No date information: this sentinel is older than any real
            # session, so such files sort FIRST in the oldest-to-newest order.
            return pd.Timestamp("1900-01-01")
        except Exception:
            return pd.Timestamp("1900-01-01")

    # Sort files from oldest to newest.
    file_paths = sorted(file_paths, key=get_file_datetime)

    frames = []
    for path in file_paths:
        try:
            from .tsv_columns import read_session_tsv

            df = read_session_tsv(path)
            # Tag each row with its source file for traceability.
            df["_source_file"] = os.path.basename(path)
            frames.append(df)
        except Exception as e:
            # An unreadable file is skipped; the rest still form a report.
            print(f"[WARNING] Could not read {path}: {e}")
    if not frames:
        return False
    df_all = pd.concat(frames, ignore_index=True)
    if df_all.empty:
        return False
    df_all = self._normalize_block_id(df_all)

    # Session rows are those not flagged as initial; without the flag every
    # row counts as a session row.
    if "is_initial" in df_all.columns:
        df_all["is_initial"] = (
            pd.to_numeric(df_all["is_initial"], errors="coerce")
            .fillna(0)
            .astype(int)
        )
        df_session = df_all[df_all["is_initial"] == 0]
    else:
        df_session = df_all

    doc = Document()
    section = doc.sections[0]
    section.left_margin = Inches(0.5)
    section.right_margin = Inches(0.5)
    section.top_margin = Inches(0.75)
    section.bottom_margin = Inches(0.75)

    # Title
    title = doc.add_heading("Longitudinal DBS Report", 0)
    title.alignment = WD_ALIGN_PARAGRAPH.CENTER
    doc.add_paragraph(
        f"Generated on: {datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S')}"
        f" by {__app_name__} v{__version__}"
    )

    # Patient info
    patient_id = self._extract_patient_id(file_paths)
    if patient_id:
        doc.add_paragraph(f"Patient ID: {patient_id}")
    doc.add_paragraph(f"Files included: {len(file_paths)}")
    for fp in file_paths:
        doc.add_paragraph(f"  {os.path.basename(fp)}")
    doc.add_paragraph("")

    # Sections are always rendered in this fixed order.
    all_keys = [
        "sessions_overview",
        "session_data",
        "session_data_graph",
        "session_data_table",
        "electrode_config",
        "programming_summary",
    ]
    if sections is not None:
        active = set(sections)
    else:
        # Default: sessions overview plus the session-data graph.
        # NOTE(review): earlier comments described the default as including
        # both session_data children; the table is not part of the default
        # set - confirm this is intended.
        active = {"sessions_overview", "session_data_graph"}

    # The parent key already renders both children, so the child keys are
    # skipped when it is selected; the "Session Data" heading is written
    # only once even when both children are selected individually.
    parent_selected = "session_data" in active
    session_heading_added = False

    for key in all_keys:
        if key not in active:
            continue
        if key == "sessions_overview":
            doc.add_paragraph("")
            self._add_sessions_overview(doc, df_all, file_paths)
        elif key == "session_data":
            # Treat parent as both children.
            doc.add_paragraph("")
            doc.add_heading("Session Data", level=1)
            session_heading_added = True
            self._add_scales_timeline_chart(doc, df_session, file_paths)
            self._add_longitudinal_data_table(
                doc,
                df_session,
                file_paths,
                include_chart=False,
                include_heading=False,
            )
        elif key == "session_data_graph":
            if parent_selected:
                continue
            doc.add_paragraph("")
            if not session_heading_added:
                doc.add_heading("Session Data", level=1)
                session_heading_added = True
            self._add_scales_timeline_chart(doc, df_session, file_paths)
        elif key == "session_data_table":
            if parent_selected:
                continue
            doc.add_paragraph("")
            if not session_heading_added:
                doc.add_heading("Session Data", level=1)
                session_heading_added = True
            self._add_longitudinal_data_table(
                doc,
                df_session,
                file_paths,
                include_chart=False,
                include_heading=False,
            )
            doc.add_paragraph("")
        elif key == "electrode_config":
            doc.add_paragraph("")
            self._add_electrode_config_section(doc, df_all, file_paths)
        elif key == "programming_summary":
            doc.add_paragraph("")
            self._add_programming_summary(doc, df_all, file_paths)

    doc.save(out_path)
    return True
# ------------------------------------------------------------------
# Report sections
# ------------------------------------------------------------------
def _add_sessions_overview(
    self, doc: DocumentType, df: pd.DataFrame, file_paths: list[str]
) -> None:
    """Add the "Sessions Overview" heading, clinical chart and summary table.

    The table has one row per entry of *file_paths* with: running number,
    file name, the distinct dates found in that file, the number of session
    entries (unique ``block_ID`` values among non-initial rows, or the row
    count when there is no ``block_ID`` column), and the clinical scales
    with their values taken from the initial rows with the highest
    ``block_ID``.

    Args:
        doc: Word document being built.
        df: Merged data of all files; rows are matched to files through the
            ``_source_file`` column when it exists.
        file_paths: Source files, in the order they should be listed.
    """
    doc.add_heading("Sessions Overview", level=1)
    self._add_clinical_scales_timeline_chart(doc, df, file_paths)

    headers = ["#", "File", "Date", "Entries", "Clinical scales", "Values"]
    table = doc.add_table(rows=1 + len(file_paths), cols=len(headers))
    table.style = "Table Grid"
    table.autofit = False

    # Usable page width in inches (python-docx lengths are EMU, 914400/inch).
    section = doc.sections[0]
    page_w = (
        int(section.page_width or 0)
        - int(section.left_margin or 0)
        - int(section.right_margin or 0)
    ) / 914400
    base_w = {
        "#": 0.25,
        "File": 2.0,
        "Date": 1.0,
        "Entries": 0.7,
        "Clinical scales": 1.2,
        "Values": 1.0,
    }
    widths = [base_w.get(h, 0.8) for h in headers]
    # The "Values" column takes whatever width is left (at least 1 inch).
    values_idx = headers.index("Values")
    used = sum(w for j, w in enumerate(widths) if j != values_idx)
    widths[values_idx] = max(1.0, page_w - used)
    col_widths = [Inches(max(0.25, w)) for w in widths]
    for row in table.rows:
        for ci, cell in enumerate(row.cells):
            cell.width = col_widths[ci]

    # Bold header row.
    for i, h in enumerate(headers):
        cell = table.rows[0].cells[i]
        cell.text = h
        for p in cell.paragraphs:
            for run in p.runs:
                run.bold = True

    for idx, fp in enumerate(file_paths):
        row_cells = table.rows[idx + 1].cells
        basename = os.path.basename(fp)
        row_cells[0].text = str(idx + 1)
        row_cells[1].text = basename
        sub_df = (
            df[df["_source_file"] == basename]
            if "_source_file" in df.columns
            else df
        )

        date_str = ""
        if "date" in sub_df.columns and not sub_df.empty:
            dates = sub_df["date"].dropna().unique()
            if len(dates) > 0:
                date_str = ", ".join(str(d) for d in sorted(dates))
        row_cells[2].text = date_str

        # Normalise the initial flag once so the session count and the
        # baseline filter below agree even if the column holds strings.
        is_init = None
        if "is_initial" in sub_df.columns:
            is_init = (
                pd.to_numeric(sub_df["is_initial"], errors="coerce")
                .fillna(0)
                .astype(int)
            )
        session_rows = sub_df if is_init is None else sub_df[is_init == 0]

        # Count unique block_IDs (entries).
        if "block_ID" in session_rows.columns:
            unique_entries = session_rows["block_ID"].nunique()
        else:
            unique_entries = len(session_rows)
        row_cells[3].text = str(unique_entries)

        # Collect scales from the initial rows with the highest block_ID.
        scale_pairs = []
        if "scale_name" in sub_df.columns and "scale_value" in sub_df.columns:
            # Without an is_initial column every row is considered. The
            # explicit copy keeps the helper column off the shared frame.
            baseline_df = (
                sub_df if is_init is None else sub_df[is_init == 1]
            ).copy()
            if not baseline_df.empty and "block_ID" in baseline_df.columns:
                try:
                    baseline_df["block_ID_num"] = pd.to_numeric(
                        baseline_df["block_ID"], errors="coerce"
                    )
                    max_block = baseline_df["block_ID_num"].max()
                    # Several rows can share the highest block_ID.
                    max_block_rows = baseline_df[
                        baseline_df["block_ID_num"] == max_block
                    ]
                    all_scales = {}
                    for _, row in max_block_rows.iterrows():
                        sn = str(row.get("scale_name", "") or "").strip()
                        # Only None means "no value": `value or ""` would
                        # also discard a legitimate numeric score of 0.
                        raw_sv = row.get("scale_value", "")
                        sv = "" if raw_sv is None else str(raw_sv).strip()
                        if not sn:
                            continue
                        # A cell may hold several newline-separated entries.
                        sn_lines = [s.strip() for s in sn.split("\n") if s.strip()]
                        sv_lines = [s.strip() for s in sv.split("\n") if s.strip()]
                        # Pad the values so every name has a partner.
                        sv_lines += [""] * (len(sn_lines) - len(sv_lines))
                        # Keep the first non-omitted value per scale name.
                        for name, val in zip(sn_lines, sv_lines, strict=False):
                            if (
                                name
                                and not is_session_scale_value_omitted(val)
                                and name not in all_scales
                            ):
                                all_scales[name] = val
                    scale_pairs = list(all_scales.items())
                except Exception:
                    # Best effort: leave the scale columns empty on failure.
                    scale_pairs = []
        row_cells[4].text = (
            "\n".join(p[0] for p in scale_pairs) if scale_pairs else ""
        )
        row_cells[5].text = (
            "\n".join(p[1] for p in scale_pairs) if scale_pairs else ""
        )
def _add_electrode_config_section(
    self, doc: DocumentType, df_all: pd.DataFrame, file_paths: list[str]
) -> None:
    """Add per-file electrode configuration (Initial / Final, Left / Right).

    Each file gets its own heading and a 4-column table matching the
    single-session report layout. A page break separates consecutive files.
    Files without rows, or without an electrode model on their
    representative rows, are skipped.

    Args:
        doc: Word document being built.
        df_all: Merged data of all files; nothing is added unless it has
            both ``electrode_model`` and ``_source_file`` columns.
        file_paths: Source files, in rendering order.
    """
    if df_all is None or df_all.empty:
        return
    if "electrode_model" not in df_all.columns:
        return
    if "_source_file" not in df_all.columns:
        return
    doc.add_heading("Electrode Configurations", level=1)
    from docx.enum.text import WD_BREAK

    # The four columns share the usable page width equally. This does not
    # change per file, so it is computed once (lengths are EMU, 914400/inch).
    section = doc.sections[0]
    page_w = (
        int(section.page_width or 0)
        - int(section.left_margin or 0)
        - int(section.right_margin or 0)
    ) / 914400
    col_width = Inches(page_w / 4)

    def _contacts(row, side):
        """Return the (anode, cathode) strings of *row* for one side."""
        if row is None:
            return "", ""
        anode = str(row.get(f"{side}_anode", "") or "")
        cathode = str(row.get(f"{side}_cathode", "") or "")
        return anode, cathode

    any_rendered = False
    for fp in file_paths:
        basename = os.path.basename(fp)
        sub = df_all[df_all["_source_file"] == basename].copy()
        if sub.empty:
            continue
        if "is_initial" in sub.columns:
            sub["is_initial"] = (
                pd.to_numeric(sub["is_initial"], errors="coerce")
                .fillna(0)
                .astype(int)
            )
            df_init = sub[sub["is_initial"] == 1]
            df_final = sub[sub["is_initial"] == 0]
        else:
            # Without the flag every row is treated as a "final" row.
            df_init = sub.iloc[0:0]
            df_final = sub
        if df_init.empty and df_final.empty:
            continue

        # Pick representative rows.
        init_row = self._pick_latest_row(df_init) if not df_init.empty else None
        final_row = self._pick_latest_row(df_final) if not df_final.empty else None

        # The final row's model wins; the initial row is the fallback.
        model_name = ""
        for candidate in (final_row, init_row):
            if candidate is not None:
                m = str(candidate.get("electrode_model", "") or "").strip()
                if m:
                    model_name = m
                    break
        if not model_name:
            continue

        # Page break before each file except the first.
        if any_rendered:
            para = doc.add_paragraph()
            para.add_run().add_break(WD_BREAK.PAGE)
        any_rendered = True

        # File sub-heading
        label = basename.replace("_events.tsv", "").replace(".tsv", "")
        doc.add_heading(label, level=2)
        manufacturer = self._get_manufacturer_for_model(model_name)
        if manufacturer:
            doc.add_paragraph(f"Electrode model: {manufacturer} | {model_name}")
        else:
            doc.add_paragraph(f"Electrode model: {model_name}")

        i_la, i_lc = _contacts(init_row, "left")
        i_ra, i_rc = _contacts(init_row, "right")
        f_la, f_lc = _contacts(final_row, "left")
        f_ra, f_rc = _contacts(final_row, "right")
        init_model = (
            str(init_row.get("electrode_model", "") or model_name)
            if init_row is not None
            else model_name
        )
        final_model = (
            str(final_row.get("electrode_model", "") or model_name)
            if final_row is not None
            else model_name
        )

        # Column order of the table: Init L | Init R | Final L | Final R.
        configs = [
            (i_la, i_lc),
            (i_ra, i_rc),
            (f_la, f_lc),
            (f_ra, f_rc),
        ]
        render_plan = [
            (init_row, init_model),
            (init_row, init_model),
            (final_row, final_model),
            (final_row, final_model),
        ]

        tmp_files = []
        try:
            # Render PNGs; each file is registered for cleanup as soon as
            # it exists so a later failure cannot leak it.
            png_list = []
            for (src_row, model), (anode, cathode) in zip(render_plan, configs):
                png = (
                    self._render_electrode_png(model, anode, cathode)
                    if src_row is not None
                    else None
                )
                if png:
                    tmp_files.append(png)
                png_list.append(png)

            # 4-column x 4-row table
            t = doc.add_table(rows=4, cols=4)
            t.autofit = False
            for row in t.rows:
                for cell in row.cells:
                    cell.width = col_width

            # Remove borders
            tbl = t._tbl
            tblPr = tbl.tblPr if tbl.tblPr is not None else tbl._add_tblPr()  # noqa: N806
            borders = OxmlElement("w:tblBorders")
            for bname in ("top", "left", "bottom", "right", "insideH", "insideV"):
                b = OxmlElement(f"w:{bname}")
                b.set(qn("w:val"), "none")
                b.set(qn("w:sz"), "0")
                b.set(qn("w:space"), "0")
                b.set(qn("w:color"), "auto")
                borders.append(b)
            tblPr.append(borders)

            # Row 0: "Initial Settings" (merged 0-1) | "Final Settings" (merged 2-3)
            for merged_start, merged_end, heading_text in [
                (0, 1, "Initial Settings"),
                (2, 3, "Final Settings"),
            ]:
                cell = t.cell(0, merged_start).merge(t.cell(0, merged_end))
                cell.text = heading_text
                for p in cell.paragraphs:
                    p.alignment = WD_ALIGN_PARAGRAPH.CENTER
                    for run in p.runs:
                        run.bold = True

            # Row 1: Left | Right | Left | Right
            for col, txt in enumerate(["Left", "Right", "Left", "Right"]):
                t.cell(1, col).text = txt
                for p in t.cell(1, col).paragraphs:
                    p.alignment = WD_ALIGN_PARAGRAPH.CENTER

            # Row 2: config text
            for col, (anode, cathode) in enumerate(configs):
                t.cell(2, col).text = f"+{anode}\n-{cathode}".strip()
                for p in t.cell(2, col).paragraphs:
                    p.alignment = WD_ALIGN_PARAGRAPH.CENTER

            # Row 3: electrode images
            for col, png in enumerate(png_list):
                cell = t.cell(3, col)
                cell.text = ""
                p = cell.paragraphs[0]
                p.alignment = WD_ALIGN_PARAGRAPH.CENTER
                if png:
                    run = p.add_run()
                    try:
                        run.add_picture(png, width=Inches(1.15))
                    except Exception:
                        # Best effort: a broken image leaves the cell empty.
                        pass
        finally:
            # Cleanup temp PNG files, also when building the table failed.
            for pth in tmp_files:
                try:
                    os.unlink(pth)
                except OSError:
                    pass
        doc.add_paragraph("")
def _add_programming_summary(
    self, doc: DocumentType, df_all: pd.DataFrame, file_paths: list[str]
) -> None:
    """Add a per-session programming summary table.

    One row per source file that has data: a label derived from the file
    name, the number of distinct ``block_ID`` values among its non-initial
    rows, and the min-max range of amplitude, frequency and pulse width for
    each side.

    Args:
        doc: Word document being built.
        df_all: Merged data of all files; nothing is added unless it has a
            ``_source_file`` column.
        file_paths: Source files, in row order.
    """
    if df_all is None or df_all.empty:
        return
    if "_source_file" not in df_all.columns:
        return
    doc.add_heading("Programming Summary", level=1)
    headers = [
        "Session",
        "Configurations",
        "Amplitude (L)",
        "Amplitude (R)",
        "Frequency (L)",
        "Frequency (R)",
        "Pulse Width (L)",
        "Pulse Width (R)",
    ]

    def _range_str(series, unit="", split_sum: bool = False):
        """Format the numeric range of *series* as "min-max unit" or "N/A".

        With *split_sum*, an underscore-separated value such as "1.5_2.0"
        is replaced by the sum of its parts; otherwise (or when a part is
        not numeric) the first number found in the text is used.
        """
        parsed_vals: list[float] = []
        for raw in series:
            if pd.isna(raw):
                continue
            text = str(raw).strip()
            if not text:
                continue
            if split_sum and "_" in text:
                try:
                    parts = [
                        float(p.strip()) for p in text.split("_") if p.strip()
                    ]
                    if parts:
                        parsed_vals.append(sum(parts))
                        continue
                except ValueError:
                    pass
            m = re.search(r"[-+]?\d*\.?\d+", text)
            if m:
                try:
                    parsed_vals.append(float(m.group(0)))
                except ValueError:
                    pass
        vals = pd.Series(parsed_vals, dtype=float).dropna()
        if vals.empty:
            return "N/A"
        mn, mx = vals.min(), vals.max()
        if mn == mx:
            return f"{mn:.1f}{unit}" if unit else f"{mn:g}"
        return f"{mn:.1f}–{mx:.1f}{unit}" if unit else f"{mn:g}–{mx:g}"

    # Stand-in for a missing column; the explicit dtype avoids the
    # "default dtype for empty Series" warning of older pandas versions.
    no_values = pd.Series(dtype=object)

    rows_data = []
    for fp in file_paths:
        basename = os.path.basename(fp)
        sub = df_all[df_all["_source_file"] == basename]
        if sub.empty:
            continue
        if "is_initial" in sub.columns:
            sub_sess = sub[
                pd.to_numeric(sub["is_initial"], errors="coerce")
                .fillna(0)
                .astype(int)
                == 0
            ]
        else:
            sub_sess = sub
        label = basename.replace("_events.tsv", "").replace(".tsv", "")
        sub_n = self._normalize_block_id(sub_sess)
        n_configs = (
            sub_n["block_ID"].nunique() if "block_ID" in sub_n.columns else 0
        )
        amp_l = _range_str(
            sub_sess.get("left_amplitude", no_values), " mA", split_sum=True
        )
        amp_r = _range_str(
            sub_sess.get("right_amplitude", no_values), " mA", split_sum=True
        )
        freq_l = _range_str(sub_sess.get("left_stim_freq", no_values), " Hz")
        freq_r = _range_str(sub_sess.get("right_stim_freq", no_values), " Hz")
        pw_l = _range_str(sub_sess.get("left_pulse_width", no_values), " µs")
        pw_r = _range_str(sub_sess.get("right_pulse_width", no_values), " µs")
        rows_data.append(
            [label, str(n_configs), amp_l, amp_r, freq_l, freq_r, pw_l, pw_r]
        )

    if not rows_data:
        doc.add_paragraph("No programming data available.")
        return

    table = doc.add_table(rows=1 + len(rows_data), cols=len(headers))
    table.style = "Table Grid"

    # Usable page width in inches (python-docx lengths are EMU, 914400/inch).
    section = doc.sections[0]
    page_w = (
        int(section.page_width or 0)
        - int(section.left_margin or 0)
        - int(section.right_margin or 0)
    ) / 914400
    base_w = {
        "Session": 2.0,
        "Configurations": 0.8,
        "Amplitude (L)": 1.2,
        "Amplitude (R)": 1.2,
        "Frequency (L)": 1.2,
        "Frequency (R)": 1.2,
        "Pulse Width (L)": 1.2,
        "Pulse Width (R)": 1.2,
    }
    widths = [base_w.get(h, 1.0) for h in headers]
    # The last column takes whatever width is left (at least 1 inch).
    last_idx = len(headers) - 1
    used = sum(w for j, w in enumerate(widths) if j != last_idx)
    widths[last_idx] = max(1.0, page_w - used)
    col_widths = [Inches(max(0.25, w)) for w in widths]
    for row in table.rows:
        for idx, cell in enumerate(row.cells):
            cell.width = col_widths[idx]

    # Bold header row.
    for i, h in enumerate(headers):
        cell = table.rows[0].cells[i]
        cell.text = h
        for p in cell.paragraphs:
            for run in p.runs:
                run.bold = True

    for r_idx, row_vals in enumerate(rows_data):
        for c_idx, val in enumerate(row_vals):
            table.rows[r_idx + 1].cells[c_idx].text = val
    doc.add_paragraph("")
def _add_longitudinal_data_table(
    self,
    doc: DocumentType,
    df_session: pd.DataFrame,
    file_paths: list[str] | None = None,
    include_chart: bool = True,
    include_heading: bool = True,
) -> None:
    """Add the main longitudinal data table with best-entry highlighting.

    The session rows are converted by ``_create_lateral_table`` and written
    into one table. Rows whose entry id is reported by
    ``_find_best_and_second_best`` are highlighted, and a legend is
    appended after the table. A short notice paragraph is written instead
    when there is no session data or no displayable column.

    Args:
        doc: Word document
        df_session: Session data DataFrame
        file_paths: List of file paths for chart generation
        include_chart: Whether to include the scales timeline chart
        include_heading: Whether to include the "Session Data" heading
    """
    if include_heading:
        doc.add_heading("Session Data", level=1)
    # The chart is only drawn when file paths were supplied.
    if include_chart and file_paths:
        self._add_scales_timeline_chart(doc, df_session, file_paths)
    if df_session is None or df_session.empty:
        doc.add_paragraph("No session data available.")
        return
    lateral_df = self._create_lateral_table(df_session)
    if lateral_df.empty:
        doc.add_paragraph("No session data available.")
        return
    # Ensure date column is present (populated inside _create_lateral_table)
    if "date" not in lateral_df.columns:
        lateral_df["date"] = ""
    # Bookkeeping columns that are never shown in the table.
    columns_to_exclude = [
        "time",
        "onset",
        "block_ID",
        "session_ID",
        "source",
        "is_initial",
        "electrode_model",
        "_source_file",
        "_global_entry_id",
    ]
    display_cols = [c for c in lateral_df.columns if c not in columns_to_exclude]
    # Columns are shown in this fixed order; the "common" columns are the
    # ones merged across consecutive rows further down.
    lateral_cols = [
        "date",
        "laterality",
        "frequency",
        "anode",
        "cathode",
        "amplitude",
        "pulse_width",
    ]
    common_cols = ["group_ID", "scale_name", "scale_value", "notes"]
    lateral_cols = [c for c in lateral_cols if c in display_cols]
    common_cols = [c for c in common_cols if c in display_cols]
    ordered = lateral_cols + common_cols
    if not ordered:
        doc.add_paragraph("No displayable columns found.")
        return
    # One header row plus one table row per lateral row.
    table = doc.add_table(rows=lateral_df.shape[0] + 1, cols=len(ordered))
    table.style = "Table Grid"
    table.autofit = False
    # Column widths
    # Usable page width in inches (python-docx lengths are EMU, 914400/inch).
    section = doc.sections[0]
    page_w = (
        int(section.page_width or 0)
        - int(section.left_margin or 0)
        - int(section.right_margin or 0)
    ) / 914400
    base_w = {
        "date": 0.65,
        "laterality": 0.25,
        "group_ID": 0.35,
        "frequency": 0.45,
        "anode": 0.45,
        "cathode": 0.60,
        "amplitude": 0.60,
        "pulse_width": 0.50,
        "scale_name": 1.00,
        "scale_value": 0.55,
    }
    widths = [base_w.get(c, 0.5) for c in ordered]
    # The notes column takes the remaining width (at least 1.5 inches).
    if "notes" in ordered:
        ni = ordered.index("notes")
        used = sum(w for j, w in enumerate(widths) if j != ni)
        widths[ni] = max(1.5, page_w - used)
    w_twips = [Inches(max(0.25, w)) for w in widths]
    for row in table.rows:
        for idx, cell in enumerate(row.cells):
            cell.width = w_twips[idx]
    # Header
    for i, col in enumerate(ordered):
        cell = table.rows[0].cells[i]
        cell.text = self._column_header(col)
        for p in cell.paragraphs:
            for run in p.runs:
                run.bold = True
    # Find best / second-best entries
    best_ids, second_ids = self._find_best_and_second_best(lateral_df)
    prev_entry_id = None
    for i, (_, row) in enumerate(lateral_df.iterrows()):
        # Table row i + 1 holds data row i (row 0 is the header).
        row_cells = table.rows[i + 1].cells
        entry_id = row.get("_global_entry_id", None)
        if best_ids and entry_id in best_ids:
            self._highlight_cells(row_cells, "best")
        elif second_ids and entry_id in second_ids:
            self._highlight_cells(row_cells, "second")
        # Separator between entries
        # Drawn as a top border on the "L" row that starts a new entry.
        if (
            prev_entry_id is not None
            and entry_id != prev_entry_id
            and row.get("laterality") == "L"
        ):
            for cell in row_cells:
                self._set_cell_border_top(cell, sz=24)
        prev_entry_id = entry_id
        for j, col in enumerate(ordered):
            val = row.get(col, "")
            cell_text = str(val) if pd.notna(val) else ""
            # Whole-number frequency / pulse width values are shown without
            # a decimal part; anything non-numeric keeps its text.
            if col in ("frequency", "pulse_width"):
                try:
                    v = float(val)
                    cell_text = str(int(v)) if v == int(v) else str(v)
                except (ValueError, TypeError):
                    pass
            if col in common_cols and row.get("laterality") == "R" and i > 0:
                # Shared columns of an "R" row are merged into the cell of
                # the table row directly above it (table.rows[i]).
                prev_cell = table.rows[i].cells[j]
                prev_cell.merge(row_cells[j])
                row_cells[j].text = ""
            elif col == "cathode" and "_" in cell_text:
                # Multi-contact cathode: show stacked with Total label
                contacts = cell_text.replace("_", "\n")
                row_cells[j].text = contacts + "\nTotal"
            elif col == "amplitude" and "_" in cell_text:
                # Multi-contact amplitude: show stacked values with total
                parts = cell_text.split("_")
                try:
                    # Validate all parts are numbers and calculate total
                    values = [float(p) for p in parts]
                    total = sum(values)
                    # Two decimals at most, without trailing zeros.
                    total_str = f"{total:.2f}".rstrip("0").rstrip(".")
                    row_cells[j].text = "\n".join(parts) + f"\n{total_str}"
                except (ValueError, TypeError):
                    row_cells[j].text = cell_text
            else:
                row_cells[j].text = cell_text
    # Legend
    self._add_table_legend(doc, best_ids, second_ids)
def _add_scales_timeline_chart(
    self,
    doc: DocumentType,
    df_session: pd.DataFrame,
    file_paths: list[str],
) -> None:
    """Insert the "Session Scale Trends" chart built from all session rows.

    Nothing is added when the frame is missing or empty, lacks the
    ``scale_name`` / ``scale_value`` columns, or yields no plottable data.
    Points and tick labels come from ``_collect_session_scale_data``.
    """
    from .report_chart_utils import add_chart_to_doc, build_scales_chart

    if df_session is None or df_session.empty:
        return
    needed_columns = {"scale_name", "scale_value"}
    if not needed_columns.issubset(df_session.columns):
        return

    series_by_scale, tick_positions = self._collect_session_scale_data(
        df_session, file_paths
    )
    if not series_by_scale:
        return

    add_chart_to_doc(
        doc,
        build_scales_chart(
            series_by_scale,
            self.scale_optimization_prefs,
            title="Session Scale Trends",
            x_label="Session Block",
            y_label="Scale Value",
            x_ticks=tick_positions,
            rotate_x_ticks=True,
        ),
    )
def _add_clinical_scales_timeline_chart(
    self,
    doc: DocumentType,
    df_all: pd.DataFrame,
    file_paths: list[str],
) -> None:
    """Insert the "Clinical Scale Trends" chart of baseline scale values.

    Data comes from ``_collect_clinical_scale_data`` (one value per file
    and scale, taken from the ``is_initial == 1`` rows with the latest
    ``block_ID``). The chart is requested without the aggregated General
    Index line. Nothing is added when the frame is missing or empty, has no
    ``is_initial`` column, or yields no plottable data.
    """
    from .report_chart_utils import add_chart_to_doc, build_scales_chart

    has_rows = df_all is not None and not df_all.empty
    if not has_rows or "is_initial" not in df_all.columns:
        return

    series_by_scale, tick_positions = self._collect_clinical_scale_data(
        df_all, file_paths
    )
    if not series_by_scale:
        return

    add_chart_to_doc(
        doc,
        build_scales_chart(
            series_by_scale,
            self.clinical_scale_prefs,
            title="Clinical Scale Trends",
            x_label="Session",
            y_label="Scale Value",
            x_ticks=tick_positions,
            show_general_index=False,
            rotate_x_ticks=True,
        ),
    )
def _collect_clinical_scale_data(
self,
df_all: pd.DataFrame,
file_paths: list[str],
) -> tuple[dict[str, dict[int, float]], list[tuple[int, str]]]:
"""Collect clinical scale values using the latest block_ID with
``is_initial=1`` per file.
X-tick labels: ``{date}_{run_ID}``
Returns:
(scale_data, x_ticks)
"""
import math as _math
if df_all is None or df_all.empty:
return {}, []
required = {"scale_name", "scale_value", "_source_file", "is_initial"}
if not required.issubset(df_all.columns):
return {}, []
df_clin = df_all.copy()
df_clin["is_initial"] = (
pd.to_numeric(df_clin["is_initial"], errors="coerce").fillna(0).astype(int)
)
df_clin = df_clin[df_clin["is_initial"] == 1]
if df_clin.empty:
return {}, []
if "block_ID" in df_clin.columns:
df_clin["block_ID"] = pd.to_numeric(
df_clin["block_ID"], errors="coerce"
).fillna(0)
else:
df_clin["block_ID"] = 0
# Build ordered source list (files already sorted earliest→latest)
sources = [os.path.basename(fp) for fp in file_paths]
source_idx = {s: i for i, s in enumerate(sources)}
scale_data: dict[str, dict[int, float]] = {}
tick_labels: list[str] = []
for src in sources:
sidx = source_idx[src]
df_src = df_clin[df_clin["_source_file"] == src]
# Build tick label: {date}_{run_ID} - always add tick even if no data
date_str = self._extract_date_from_source(df_all, src)
run_id = self._extract_run_from_filename(src)
tick_labels.append(f"{date_str}_{run_id}" if run_id else date_str)
if df_src.empty:
# No clinical scales for this file - leave data blank
continue
max_bid = df_src["block_ID"].max()
df_latest = df_src[df_src["block_ID"] == max_bid]
for _, row in df_latest.iterrows():
sname = str(row.get("scale_name", "") or "").strip()
sval = str(row.get("scale_value", "") or "").strip()
if not sname or is_session_scale_value_omitted(sval):
continue
try:
val = float(sval)
except ValueError:
continue
if _math.isnan(val):
continue
scale_data.setdefault(sname, {})[sidx] = val
# Return tick labels even if no scale data (shows empty ticks)
x_ticks = [(i, lbl) for i, lbl in enumerate(tick_labels)]
return scale_data, x_ticks
def _collect_session_scale_data(
self,
df_session: pd.DataFrame,
file_paths: list[str],
) -> tuple[dict[str, dict[int, float]], list[tuple[int, str]]]:
"""Collect all session scale values across files, one point per block per file.
X-tick labels: ``{date}_{run_ID}``
Returns:
(scale_data, x_ticks)
"""
import math as _math
if df_session is None or df_session.empty:
return {}, []
if (
"scale_name" not in df_session.columns
or "scale_value" not in df_session.columns
or "_source_file" not in df_session.columns
):
return {}, []
# Ensure block_ID is numeric
df = df_session.copy()
if "block_ID" in df.columns:
df["block_ID"] = pd.to_numeric(df["block_ID"], errors="coerce").fillna(0)
else:
df["block_ID"] = 0
# Build ordered (source, block_ID) pairs as the x-axis
sources = [os.path.basename(fp) for fp in file_paths]
point_keys: list[tuple[str, float]] = [] # (source_file, block_ID)
tick_labels: list[str] = []
for src in sources:
df_src = df[df["_source_file"] == src]
if df_src.empty:
continue
date_str = self._extract_date_from_source(df, src)
run_id = self._extract_run_from_filename(src)
blocks = sorted(df_src["block_ID"].unique())
for i, bid in enumerate(blocks):
point_keys.append((src, bid))
bid_str = str(int(bid)) if bid == int(bid) else str(bid)
if i == 0:
# First block: full label {date}_{run_id}_{block_ID}
parts = [date_str]
if run_id:
parts.append(run_id)
parts.append(bid_str)
tick_labels.append("_".join(parts))
else:
# Subsequent blocks: only block_ID
tick_labels.append(bid_str)
if not point_keys:
return {}, []
key_idx = {k: i for i, k in enumerate(point_keys)}
scale_data: dict[str, dict[int, float]] = {}
for (src, bid), idx in key_idx.items():
df_block = df[(df["_source_file"] == src) & (df["block_ID"] == bid)]
for _, row in df_block.iterrows():
sname = str(row.get("scale_name", "") or "").strip()
sval = str(row.get("scale_value", "") or "").strip()
if not sname or is_session_scale_value_omitted(sval):
continue
try:
val = float(sval)
except ValueError:
continue
if _math.isnan(val):
continue
# If multiple rows for same scale in same block, keep last
scale_data.setdefault(sname, {})[idx] = val
if not scale_data:
return {}, []
x_ticks = [(i, lbl) for i, lbl in enumerate(tick_labels)]
return scale_data, x_ticks
@staticmethod
def _extract_date_from_source(df: pd.DataFrame, source_file: str) -> str:
"""Extract the date string for a given source file."""
sub = df[df["_source_file"] == source_file]
if "date" in sub.columns and not sub.empty:
dates = sub["date"].dropna().unique()
if len(dates) > 0:
return str(sorted(dates)[0])
# Fallback: extract from filename
match = re.search(r"ses-(\d{8})", source_file)
if match:
return match.group(1)
return "unknown"
@staticmethod
def _extract_run_from_filename(filename: str) -> str:
"""Extract run-XX from a BIDS-style filename."""
match = re.search(r"run-(\d+)", filename)
return f"run{match.group(1)}" if match else ""
# ------------------------------------------------------------------
# Data helpers
# ------------------------------------------------------------------
def _create_lateral_table(self, df: pd.DataFrame) -> pd.DataFrame:
    """Expand combined session rows into one left and one right row per entry.

    Rows are grouped per entry (source file + ``block_ID`` when both columns
    exist, ``block_ID`` alone otherwise, or one entry per row as a last
    resort). For each entry the distinct ``(scale_name, scale_value)`` pairs
    are joined with newlines, and the stimulation settings of the entry's
    first row are split into an ``"L"`` row and an ``"R"`` row that share the
    same metadata.

    Args:
        df: Combined annotation rows. The frame passed in is not modified.

    Returns:
        ``df`` itself when it is empty; otherwise a new frame holding two
        rows per entry, ``laterality`` ``"L"`` first and ``"R"`` second.
    """
    if df.empty:
        return df

    df = self._normalize_block_id(df)

    has_source = "_source_file" in df.columns
    has_block = "block_ID" in df.columns

    # Order by source file, then block, using whichever key columns exist so
    # a frame lacking one of them does not raise KeyError.
    sort_keys = [
        key for key, present in (("_source_file", has_source), ("block_ID", has_block)) if present
    ]
    if sort_keys:
        df = df.sort_values(by=sort_keys, ascending=True)

    # Global entry id combining source file and block_ID.
    if has_source and has_block:
        entry_ids = df["_source_file"] + "_" + df["block_ID"].astype(str)
    elif has_block:
        entry_ids = df["block_ID"].astype(str)
    else:
        entry_ids = range(len(df))
    # assign() returns a new frame, so the caller's frame is never mutated.
    df = df.assign(_global_entry_id=entry_ids)

    # (column suffix, generic column name); identical for both hemispheres.
    side_fields = (
        ("stim_freq", "frequency"),
        ("cathode", "cathode"),
        ("anode", "anode"),
        ("amplitude", "amplitude"),
        ("pulse_width", "pulse_width"),
    )

    lateral_data = []
    for entry_id, block_df in df.groupby("_global_entry_id", sort=False, dropna=False):
        first = block_df.iloc[0]

        # Ordered, de-duplicated scale pairs; omitted values are skipped.
        pairs: dict[tuple[str, str], None] = {}
        for _, row in block_df.iterrows():
            name = str(row.get("scale_name", "") or "").strip()
            value = str(row.get("scale_value", "") or "").strip()
            if name and not is_session_scale_value_omitted(value):
                pairs[(name, value)] = None

        source_label = (
            str(first.get("_source_file", ""))
            .replace("_events.tsv", "")
            .replace(".tsv", "")
        )

        # NaN is truthy, so a plain ``or`` would never fall back to group_ID
        # for rows where program_ID is merely missing (e.g. after
        # concatenating files that use different column names).
        program_id = first.get("program_ID")
        if pd.isna(program_id) or not program_id:
            program_id = first.get("group_ID", "")

        common = {
            "_global_entry_id": entry_id,
            "source": source_label,
            "date": str(first.get("date", "") or ""),
            "program_ID": program_id,
            "scale_name": "\n".join(name for name, _ in pairs),
            "scale_value": "\n".join(value for _, value in pairs),
            "notes": first.get("notes", ""),
        }

        for prefix, side in (("left_", "L"), ("right_", "R")):
            side_row = {**common, "laterality": side}
            for suffix, generic in side_fields:
                side_row[generic] = first.get(prefix + suffix, "")
            lateral_data.append(side_row)

    return pd.DataFrame(lateral_data)
def _find_best_and_second_best(self, lateral_df: pd.DataFrame) -> tuple:
"""Find entry IDs with the best and second-best scores."""
if lateral_df is None or lateral_df.empty:
return [], []
if "_global_entry_id" not in lateral_df.columns:
return [], []
if (
"scale_name" not in lateral_df.columns
or "scale_value" not in lateral_df.columns
):
return [], []
try:
pref_lookup = {}
for pref in self.scale_optimization_prefs:
if len(pref) >= 5:
name, _, _, mode, custom_val = pref
pref_lookup[name.strip().lower()] = (mode, custom_val)
df_l = lateral_df[lateral_df.get("laterality", "") == "L"].copy()
if df_l.empty:
df_l = lateral_df.drop_duplicates(subset=["_global_entry_id"]).copy()
scores = {}
for _, row in df_l.iterrows():
eid = row.get("_global_entry_id")
if eid is None:
continue
names = str(row.get("scale_name", "") or "").split("\n")
values = str(row.get("scale_value", "") or "").split("\n")
total = 0.0
has_val = False
import math as _math
for i, vl in enumerate(values):
vl = vl.strip()
if not vl:
continue
try:
val = float(vl)
except ValueError:
continue
if _math.isnan(val):
continue
sn = names[i].strip().lower() if i < len(names) else ""
mode, cv = pref_lookup.get(sn, ("min", ""))
if mode == "ignore":
continue
has_val = True
if mode in ("low", "min"):
total += val
elif mode in ("high", "max"):
total -= val
elif mode == "custom":
try:
total += abs(val - float(cv))
except ValueError:
total += val
if has_val:
scores[eid] = total
if not scores:
return [], []
unique = sorted(set(scores.values()))
best = [eid for eid, s in scores.items() if s == unique[0]]
second = (
[eid for eid, s in scores.items() if s == unique[1]]
if len(unique) > 1
else []
)
return best, second
except Exception:
return [], []
# ------------------------------------------------------------------
# Formatting / utility helpers
# ------------------------------------------------------------------
@staticmethod
def _get_manufacturer_for_model(model_name: str) -> str:
"""Return the manufacturer string for a given electrode model name."""
if not model_name:
return ""
for manufacturer, models in (MANUFACTURERS or {}).items():
try:
if model_name in models:
return str(manufacturer)
except Exception:
continue
return ""
def _render_electrode_png(
    self,
    model_name: str,
    anode_text: str,
    cathode_text: str,
    target_size_px: tuple = (440, 900),
) -> str | None:
    """Render an electrode configuration to a temporary PNG file.

    Args:
        model_name: Key into ``ELECTRODE_MODELS``.
        anode_text: Underscore-separated contact tokens marked anodic.
        cathode_text: Underscore-separated contact tokens marked cathodic.
            Applied after the anodes, so a contact named in both ends up
            cathodic.
        target_size_px: ``(width, height)`` the canvas is resized to.

    Recognised tokens are ``case``, ``E<n>`` (the whole contact; all three
    segments on a directional model) and ``E<n>a`` / ``E<n>b`` / ``E<n>c``
    (a single segment). Anything else is ignored.

    Returns:
        Path of the PNG, which the caller is responsible for deleting, or
        ``None`` when the model is unknown, the image cannot be written, or
        any other error occurs.
    """
    tmp_path = None
    try:
        from PySide6.QtGui import QColor as _QColor
        from PySide6.QtGui import QPainter, QPixmap

        from ..models import ElectrodeCanvas

        model = ELECTRODE_MODELS.get(model_name)
        if not model:
            return None

        canvas = ElectrodeCanvas()
        canvas.set_model(model)
        canvas.resize(*target_size_px)
        try:
            canvas.set_export_mode(True)
        except Exception:
            # Export mode is optional on the canvas.
            pass

        # Apply contact states
        canvas.contact_states.clear()
        canvas.case_state = ContactState.OFF

        seg_map = {"a": 0, "b": 1, "c": 2}

        def apply_tokens(text: str, state: int) -> None:
            if not text:
                return
            for token in str(text).split("_"):
                token = token.strip()
                if not token:
                    continue
                if token == "case":
                    canvas.case_state = state
                    continue
                if token.startswith("E") and len(token) >= 2:
                    try:
                        if token[-1].isalpha():
                            # Segment token such as "E2a".
                            idx = int(token[1:-1])
                            seg_char = token[-1].lower()
                            if seg_char in seg_map:
                                canvas.contact_states[(idx, seg_map[seg_char])] = state
                        else:
                            # Whole-contact token such as "E2".
                            idx = int(token[1:])
                            if model.is_directional:
                                for seg in range(3):
                                    canvas.contact_states[(idx, seg)] = state
                            else:
                                canvas.contact_states[(idx, 0)] = state
                    except Exception:
                        continue

        apply_tokens(anode_text, ContactState.ANODIC)
        apply_tokens(cathode_text, ContactState.CATHODIC)
        canvas.update()

        # Render with white background
        original_paint = canvas.paintEvent

        def white_bg_paint(event):
            painter = QPainter(canvas)
            painter.fillRect(canvas.rect(), Qt.GlobalColor.white)
            original_paint(event)

        canvas.paintEvent = white_bg_paint  # type: ignore[assignment] # ty: ignore[invalid-assignment]
        pixmap = QPixmap(canvas.size())
        pixmap.fill(Qt.GlobalColor.white)
        canvas.render(pixmap)

        # Crop white borders: find the bounding box of all non-white pixels.
        image = pixmap.toImage()
        white_rgb = _QColor(Qt.GlobalColor.white).rgb()
        left, top, right, bottom = image.width(), image.height(), 0, 0
        for y in range(image.height()):
            for x in range(image.width()):
                if image.pixel(x, y) != white_rgb:
                    left = min(left, x)
                    top = min(top, y)
                    right = max(right, x)
                    bottom = max(bottom, y)
        if right > left and bottom > top:
            margin = 20
            left = max(0, left - margin)
            top = max(0, top - margin)
            right = min(image.width() - 1, right + margin)
            bottom = min(image.height() - 1, bottom + margin)
            cropped = pixmap.copy(left, top, right - left + 1, bottom - top + 1)
        else:
            cropped = pixmap

        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
        tmp.close()
        tmp_path = tmp.name
        # save() reports failure through its return value; do not hand back
        # the path of an empty placeholder file.
        if cropped.save(tmp_path, "PNG"):
            return tmp_path
    except Exception:
        pass

    # Failure after the placeholder was created: do not leak it.
    if tmp_path is not None:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
    return None
@staticmethod
def _pick_latest_row(df: pd.DataFrame):
"""Return the row with the highest block_ID, or the last row if unavailable."""
if df is None or df.empty:
return None
for col in ("block_ID", "block_id", "blockId", "blockID"):
if col in df.columns:
try:
numeric = pd.to_numeric(df[col], errors="coerce")
max_val = numeric.max()
if pd.notna(max_val):
return df.loc[numeric == max_val].iloc[-1]
except Exception:
pass
return df.iloc[-1]
@staticmethod
def _normalize_block_id(df: pd.DataFrame) -> pd.DataFrame:
    """Run ``normalize_block_id_dataframe`` on ``df``.

    The original frame is returned unchanged when the helper yields ``None``.
    """
    from .tsv_columns import normalize_block_id_dataframe

    result = normalize_block_id_dataframe(df)
    if result is None:
        return df
    return result
@staticmethod
def _column_header(col: str) -> str:
    """Return the table header text for a data column.

    A few columns have fixed labels, a few take their label from
    ``PLACEHOLDERS`` (with a short default), and every other column name is
    title-cased with underscores turned into spaces.
    """
    fixed_labels = {
        "source": "#",
        "date": "Date",
        "anode": "+",
        "cathode": "-",
        "group_ID": "Grp",
        "laterality": "",
    }
    placeholder_defaults = {
        "scale_name": "Scale",
        "scale_value": "Value",
        "frequency": "Freq",
        "amplitude": "Amp",
        "pulse_width": "PW",
    }

    generic = col.replace("_", " ").title()
    if col in fixed_labels:
        return fixed_labels[col]
    if col in placeholder_defaults:
        return PLACEHOLDERS.get(col, placeholder_defaults[col])
    return generic
@staticmethod
def _extract_patient_id(file_paths: list[str]) -> str:
for fp in file_paths:
m = re.search(r"sub-([^_]+)", os.path.basename(fp))
if m:
return m.group(1)
return ""
@staticmethod
def _generate_filename(file_paths: list[str], ext: str) -> str:
today = datetime.now().astimezone().strftime("%Y%m%d")
pid = LongitudinalExporter._extract_patient_id(file_paths)
if pid:
return f"sub-{pid}_longitudinal-report_{today}{ext}"
return f"longitudinal-report_{today}{ext}"
def _highlight_cells(self, row_cells, intensity: str = "best") -> None:
    """Add a background fill to every cell in ``row_cells``.

    ``intensity == "best"`` uses fill ``96D2A0``; any other value uses
    ``C8EBCD``. A cell that cannot be shaded is skipped.
    """
    if intensity == "best":
        fill = "96D2A0"
    else:
        fill = "C8EBCD"

    for cell in row_cells:
        try:
            shading = OxmlElement("w:shd")
            shading.set(qn("w:fill"), fill)
            cell_props = cell._tc.get_or_add_tcPr()
            cell_props.append(shading)
        except Exception:
            # Shading is cosmetic only; carry on with the next cell.
            pass
@staticmethod
def _set_cell_border_top(cell, sz=12):
    """Give ``cell`` a single black top border of width ``sz``.

    Any failure while building or attaching the border is ignored.
    """
    try:
        cell_props = cell._tc.get_or_add_tcPr()
        border_set = OxmlElement("w:tcBorders")
        top_edge = OxmlElement("w:top")
        attributes = (
            ("w:val", "single"),
            ("w:sz", str(sz)),
            ("w:space", "0"),
            ("w:color", "000000"),
        )
        for attr_name, attr_value in attributes:
            top_edge.set(qn(attr_name), attr_value)
        border_set.append(top_edge)
        cell_props.append(border_set)
    except Exception:
        # The border is decorative; leave the cell as it is.
        pass
def _add_table_legend(
    self, doc: DocumentType, best_ids: list, second_ids: list
) -> None:
    """Append the highlight legend, scale targets and disclaimer to ``doc``.

    Nothing is added when both id lists are empty. Otherwise the method
    writes:

    * a "Legend:" line with a coloured marker for each highlight level in use;
    * a "Scale targets:" line listing every non-ignored scale preference
      (omitted when there is nothing to list);
    * a disclaimer stating that the highlighting is not clinical guidance.

    Args:
        doc: Document the paragraphs are appended to.
        best_ids: Entry ids highlighted as optimal.
        second_ids: Entry ids highlighted as second best.
    """
    if not best_ids and not second_ids:
        return

    doc.add_paragraph()
    legend = doc.add_paragraph()
    legend.add_run("Legend: ").bold = True
    if best_ids:
        marker = legend.add_run("■ ")
        marker.font.color.rgb = RGBColor(0x96, 0xD2, 0xA0)
        legend.add_run("Optimal entry ")
    if second_ids:
        marker = legend.add_run("■ ")
        marker.font.color.rgb = RGBColor(0xC8, 0xEB, 0xCD)
        legend.add_run("Second-best entry")

    targets = []
    for pref in self.scale_optimization_prefs:
        if len(pref) < 5:
            continue
        # Slice so that prefs carrying extra trailing fields still unpack.
        name, _, _, mode, cv = pref[:5]
        # "low"/"high" are accepted as synonyms by the ranking, so the
        # legend lists them too.
        if mode in ("min", "low"):
            targets.append(f"{name}: min")
        elif mode in ("max", "high"):
            targets.append(f"{name}: max")
        elif mode == "custom":
            targets.append(f"{name}: {cv}")

    # Only emit the line when there is something to list, so the report
    # never shows a bare "Scale targets:" label.
    if targets:
        tp = doc.add_paragraph()
        tp.add_run("Scale targets: ").bold = True
        tp.add_run("; ".join(targets))
        for run in tp.runs:
            run.font.size = Pt(9)

    disc = doc.add_paragraph()
    dr = disc.add_run(
        "Note: The highlighted rows are derived exclusively from the recorded "
        "session scale values and represent a computational ranking intended "
        "solely as a reference. This color-coded indication does not constitute "
        "clinical guidance."
    )
    dr.font.size = Pt(9)
    dr.font.italic = True
def _show_transient_message(
    self, parent, title: str, text: str, msecs: int = 2000
) -> None:
    """Show a button-less, non-modal information box that closes itself.

    The box is accepted by a single-shot timer after ``msecs`` milliseconds
    (negative values are treated as 0).
    """
    box = QMessageBox(parent)
    box.setIcon(QMessageBox.Icon.Information)
    box.setWindowTitle(title)
    box.setText(text)
    box.setStandardButtons(QMessageBox.StandardButton.NoButton)
    box.setWindowModality(Qt.WindowModality.NonModal)
    box.show()

    def _dismiss():
        try:
            box.accept()
        except Exception:
            # Ignore any error raised while closing the box.
            pass

    auto_close = QTimer(box)
    auto_close.setSingleShot(True)
    auto_close.timeout.connect(_dismiss)
    auto_close.start(max(0, int(msecs)))
def _convert_docx_to_pdf(self, docx_path: str, pdf_path: str) -> None:
    """Convert a Word document to PDF, trying several converters in turn.

    The converters are tried in this order, stopping at the first one that
    writes ``pdf_path``:

    1. the ``docx2pdf`` package;
    2. Microsoft Word automated through PowerShell;
    3. LibreOffice in headless mode (``soffice`` or ``libreoffice``).

    A PDF that already existed at ``pdf_path`` before the call does not
    count as success: the file must have been created or rewritten.

    Args:
        docx_path: Path of the source ``.docx`` file.
        pdf_path: Path the PDF should be written to.

    Raises:
        RuntimeError: If no converter produced the PDF. The message lists
            the failure reported by each attempt.
    """
    import shutil
    import subprocess

    def _signature(path: str):
        """Return (mtime_ns, size) of ``path``, or None when it is absent."""
        try:
            info = os.stat(path)
        except OSError:
            return None
        return info.st_mtime_ns, info.st_size

    # Snapshot of any pre-existing target, so a stale file from an earlier
    # export is not mistaken for the output of a converter that silently failed.
    stale = _signature(pdf_path)

    def _pdf_written() -> bool:
        current = _signature(pdf_path)
        return current is not None and current != stale

    errors = []

    # 1) docx2pdf
    try:
        from docx2pdf import convert as _convert

        _convert(docx_path, pdf_path)
        if _pdf_written():
            return
        errors.append("docx2pdf: no PDF was produced")
    except Exception as e:
        errors.append(f"docx2pdf: {e}")

    # 2) Microsoft Word via PowerShell (single quotes doubled for PS literals)
    try:
        abs_d = os.path.abspath(docx_path).replace("'", "''")
        abs_p = os.path.abspath(pdf_path).replace("'", "''")
        ps = (
            "$w = New-Object -ComObject Word.Application; "
            "$w.Visible = $false; "
            f"$d = $w.Documents.Open('{abs_d}'); "
            f"$d.SaveAs2('{abs_p}', 17); "
            "$d.Close(); $w.Quit()"
        )
        subprocess.run(
            ["powershell", "-NoProfile", "-Command", ps],
            check=True,
            capture_output=True,
            timeout=60,
        )
        if _pdf_written():
            return
        errors.append("Word COM: no PDF was produced")
    except Exception as e:
        errors.append(f"Word COM: {e}")

    # 3) LibreOffice, under either of its launcher names
    soffice = shutil.which("soffice") or shutil.which("libreoffice")
    if soffice:
        try:
            abs_pdf = os.path.abspath(pdf_path)
            out_dir = os.path.dirname(abs_pdf)
            subprocess.run(
                [
                    soffice,
                    "--headless",
                    "--convert-to",
                    "pdf",
                    "--outdir",
                    out_dir,
                    os.path.abspath(docx_path),
                ],
                check=True,
                capture_output=True,
                timeout=60,
            )
            # LibreOffice names the output after the input file; move it
            # when that differs from the requested target. Both sides are
            # absolute so a relative ``pdf_path`` compares correctly.
            lo_out = os.path.join(
                out_dir, os.path.splitext(os.path.basename(docx_path))[0] + ".pdf"
            )
            if lo_out != abs_pdf and os.path.exists(lo_out):
                shutil.move(lo_out, abs_pdf)
            if _pdf_written():
                return
            errors.append("LibreOffice: no PDF was produced")
        except Exception as e:
            errors.append(f"LibreOffice: {e}")

    raise RuntimeError(
        "Could not convert to PDF:\n"
        + "\n".join(errors)
        + "\n\nPlease export to Word and convert manually."
    )