Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/services/history.py: 100%

258 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-19 23:36 +0000

1# SPDX-License-Identifier: MIT 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Provides a persistent, cross-process safe command history service. 

5 

6This module defines the `History` class, a concrete implementation of the 

7`HistoryProtocol`. It provides a tolerant and robust store for CLI 

8invocation events with several key design features: 

9 

10 * **Persistence:** All history is saved to a single JSON array in a 

11 per-user file. 

12 * **Tolerance:** The service is resilient to empty, corrupt, or partially 

13 formed history files. If a file is unreadable, it is treated as empty 

14 and will be overwritten on the next successful write. 

15 * **Cross-Process Safety:** On POSIX systems, it uses `fcntl.flock` on a 

16 sidecar lock file to safely coordinate writes from multiple concurrent 

17 CLI processes. On other systems, it falls back to a thread lock. 

18 * **Atomic Writes:** All changes are written to a temporary file which is 

19 then atomically moved into place, preventing data corruption from 

20 interrupted writes. 

21 * **Memory Management:** The in-memory list of events is capped, and the 

22 on-disk file is trimmed to a smaller size to prevent unbounded growth. 

23 * **Simplicity:** The service intentionally avoids complex features like 

24 schema migrations. Unreadable state is discarded rather than repaired. 

25""" 

26 

27from __future__ import annotations 

28 

29from collections.abc import Iterator, MutableSequence, Sequence 

30from contextlib import contextmanager, suppress 

31import errno 

32import json 

33import os 

34from pathlib import Path 

35import sys 

36import tempfile 

37import threading 

38import time 

39from typing import Any, Final 

40import unicodedata 

41 

42from injector import inject 

43 

44from bijux_cli.contracts import HistoryProtocol 

45from bijux_cli.core.paths import HISTORY_FILE 

46from bijux_cli.infra.observability import Observability 

47from bijux_cli.infra.telemetry import LoggingTelemetry 

48 

49_MAX_IN_MEMORY: Final[int] = 10_000 

50"""Maximum number of entries retained in memory (and considered for writes).""" 

51_TRIM_THRESHOLD: Final[int] = 1_000 

52"""When persisting, keep at most this many most-recent events in the file.""" 

53_ENOSPC_ERRORS = {errno.ENOSPC, errno.EDQUOT} 

54"""OS error codes indicating the filesystem is full or quota exceeded.""" 

55_FILE_LOCK = threading.Lock() 

56"""Fallback lock for non-POSIX platforms when `fcntl` is unavailable.""" 

57fcntl: Any 

58try: 

59 import fcntl 

60except ImportError: # pragma: no cover 

61 fcntl = None 

62 

63 

64def _now() -> float: 

65 """Returns the current UNIX time with sub-second precision. 

66 

67 Returns: 

68 float: The current time in seconds since the epoch. 

69 """ 

70 return time.time() 

71 

72 

73def _ascii_clean(text: str) -> str: 

74 """Strips all diacritics and non-printable characters from a string. 

75 

76 Args: 

77 text (str): The input text to clean. 

78 

79 Returns: 

80 str: An ASCII-only version of the text. 

81 """ 

82 normalized = unicodedata.normalize("NFKD", text) 

83 without_marks = "".join(ch for ch in normalized if unicodedata.category(ch) != "Mn") 

84 return "".join(ch for ch in without_marks if 0x20 <= ord(ch) <= 0x7E) 

85 

86 

87def _lock_file_for(fp: Path) -> Path: 

88 """Returns the path for the sidecar lock file associated with `fp`. 

89 

90 Args: 

91 fp (Path): The primary file path. 

92 

93 Returns: 

94 Path: The corresponding lock file path (e.g., `file.lock`). 

95 """ 

96 return fp.with_name(fp.name + ".lock") 

97 

98 

99@contextmanager 

100def _interprocess_lock(fp: Path) -> Iterator[None]: 

101 """Provides a cross-process exclusive lock for a file path. 

102 

103 On POSIX systems, this uses `fcntl.flock` on a sidecar file to serialize 

104 access across different processes. On other platforms, it falls back to a 

105 `threading.Lock`, which only provides safety within a single process. 

106 

107 Args: 

108 fp (Path): The path to the file that requires locked access. 

109 

110 Yields: 

111 None: Yields control to the `with` block while the lock is held. 

112 """ 

113 if fcntl is None: 

114 with _FILE_LOCK: 

115 yield 

116 return 

117 lock_fp = _lock_file_for(fp) 

118 lock_fp.parent.mkdir(parents=True, exist_ok=True) 

119 f = lock_fp.open("a+") 

120 try: 

121 fcntl.flock(f.fileno(), fcntl.LOCK_EX) 

122 yield 

123 finally: 

124 with suppress(Exception): 

125 fcntl.flock(f.fileno(), fcntl.LOCK_UN) 

126 f.close() 

127 

128 

129def _maybe_simulate_disk_full() -> None: 

130 """Raises an `ENOSPC` error if a test environment variable is set.""" 

131 if os.getenv("BIJUXCLI_TEST_DISK_FULL") == "1": 

132 raise OSError(errno.ENOSPC, "No space left on device") 

133 

134 

135def _atomic_write_json(fp: Path, events: list[dict[str, Any]]) -> None: 

136 """Writes a list of events to a file atomically. 

137 

138 The data is written to a temporary file in the same directory and then 

139 renamed to the final destination, which is an atomic operation on POSIX 

140 systems. 

141 

142 Args: 

143 fp (Path): The destination file path. 

144 events (list[dict[str, Any]]): The list of history entries to write. 

145 

146 Raises: 

147 PermissionError: If the directory or file is not writable. 

148 OSError: For other filesystem errors, such as a full disk. 

149 """ 

150 _maybe_simulate_disk_full() 

151 fp.parent.mkdir(parents=True, exist_ok=True) 

152 to_write = events[-_TRIM_THRESHOLD:] if events else [] 

153 payload = ( 

154 "[]\n" if not to_write else json.dumps(to_write, ensure_ascii=False, indent=2) 

155 ) 

156 with tempfile.NamedTemporaryFile( 

157 "w", delete=False, dir=fp.parent, prefix=f".{fp.name}.", encoding="utf-8" 

158 ) as temp_file: 

159 temp_file.write(payload) 

160 temp_file.flush() 

161 os.fsync(temp_file.fileno()) 

162 temp_fp = Path(temp_file.name) 

163 os.replace(temp_fp, fp) 

164 

165 

166class History(HistoryProtocol): 

167 """Manages a persistent history of CLI command invocations. 

168 

169 This service maintains an in-memory list of command events and synchronizes 

170 it with a persisted JSON file. It is designed to be tolerant of file 

171 corruption and safe for concurrent use by multiple CLI processes. 

172 

173 Mutating operations (`add`, `clear`, `import_`) acquire a cross-process lock 

174 before modifying the file to prevent lost updates and race conditions. The 

175 sequence is always: lock, reload from disk, apply change in memory, write 

176 atomically, and release lock. 

177 

178 Attributes: 

179 _tel (LoggingTelemetry): The telemetry service for emitting events. 

180 _obs (Observability): The logging service for operational errors. 

181 _explicit_path (Path | None): A specific path to the history file, if 

182 provided during initialization. 

183 _events (list): The in-memory cache of history event dictionaries. 

184 _load_error (str | None): A message describing the last error that 

185 occurred while trying to load the history file, if any. 

186 """ 

187 

188 @inject 

189 def __init__( 

190 self, 

191 telemetry: LoggingTelemetry, 

192 observability: Observability, 

193 history_path: Path | None = None, 

194 ) -> None: 

195 """Initializes the History service. 

196 

197 Args: 

198 telemetry (LoggingTelemetry): The telemetry service. 

199 observability (Observability): The logging service. 

200 history_path (Path | None): An optional, explicit path to the 

201 history file. If None, a default path will be used. 

202 """ 

203 self._tel = telemetry 

204 self._obs = observability 

205 self._explicit_path = Path(history_path) if history_path else None 

206 self._events: list[dict[str, Any]] = [] 

207 self._load_error: str | None = None 

208 

209 def _get_history_path(self) -> Path: 

210 """Returns the resolved, absolute path to the history file. 

211 

212 The path is determined in the following order of precedence: 

213 1. An explicit path provided to the constructor. 

214 2. The `BIJUXCLI_HISTORY_FILE` environment variable. 

215 3. A `.bijux_history` file in the same directory as the `BIJUXCLI_CONFIG` file. 

216 4. The default `~/.bijux/.history` file. 

217 

218 Returns: 

219 Path: The absolute path to the history file. 

220 """ 

221 if self._explicit_path: 

222 return self._explicit_path 

223 env_file = os.environ.get("BIJUXCLI_HISTORY_FILE") 

224 if env_file: 

225 return Path(env_file).expanduser() 

226 cfg = os.environ.get("BIJUXCLI_CONFIG") 

227 if cfg: 

228 cfg_path = Path(cfg).expanduser() 

229 return cfg_path.parent / ".bijux_history" 

230 return HISTORY_FILE 

231 

232 def _reload(self) -> None: 

233 """Refreshes the in-memory state from the history file on disk. 

234 

235 This method is tolerant of errors. If the file is missing, empty, or 

236 corrupt, the in-memory list is cleared and an error state is noted, 

237 but an exception is not raised. 

238 """ 

239 self._load_error = None 

240 fp = self._get_history_path() 

241 try: 

242 if not fp.exists(): 

243 self._events = [] 

244 return 

245 raw = fp.read_text(encoding="utf-8", errors="ignore").strip() 

246 if not raw: 

247 self._events = [] 

248 return 

249 data = json.loads(raw) 

250 if not isinstance(data, list): 

251 self._events = [] 

252 self._load_error = ( 

253 f"Unexpected history file format (not JSON array): {fp}" 

254 ) 

255 return 

256 evs: list[dict[str, Any]] = [] 

257 for item in data: 

258 if not isinstance(item, dict): 

259 continue 

260 e = dict(item) 

261 e["command"] = _ascii_clean(str(e.get("command", ""))) 

262 evs.append(e) 

263 if len(evs) > _MAX_IN_MEMORY: 

264 evs = evs[-_MAX_IN_MEMORY:] 

265 self._events = evs 

266 except Exception as exc: 

267 self._load_error = f"History file corrupted or unreadable: {exc}" 

268 self._obs.log("error", self._load_error, extra={"path": str(fp)}) 

269 self._events = [] 

270 

271 def _dump(self) -> None: 

272 """Persists the current in-memory events to disk atomically.""" 

273 fp = self._get_history_path() 

274 with _interprocess_lock(fp): 

275 self._load_error = None 

276 try: 

277 _atomic_write_json(fp, self._events) 

278 except PermissionError as exc: 

279 self._handle_dump_error("write-permission", exc, fp) 

280 raise 

281 except OSError as exc: 

282 if exc.errno in _ENOSPC_ERRORS: 

283 self._handle_dump_error("persist", exc, fp) 

284 raise 

285 raise 

286 

287 def _handle_dump_error(self, kind: str, exc: OSError, fp: Path) -> None: 

288 """Logs and prints an error encountered during a file write operation. 

289 

290 Args: 

291 kind (str): A short code classifying the error (e.g., "persist"). 

292 exc (OSError): The originating exception. 

293 fp (Path): The path of the file that was being written to. 

294 """ 

295 msg = f"History {kind} error: {exc}" 

296 self._obs.log("error", msg, extra={"path": str(fp)}) 

297 self._load_error = msg 

298 print(msg, file=sys.stderr) 

299 

300 def add( 

301 self, 

302 command: str, 

303 *, 

304 params: Sequence[str] | None = None, 

305 success: bool | None = True, 

306 return_code: int | None = 0, 

307 duration_ms: float | None = None, 

308 ) -> None: 

309 """Appends a new command invocation to the history. 

310 

311 This operation is cross-process safe. It acquires a lock, reloads the 

312 latest history from disk, appends the new entry, and writes the 

313 updated history back atomically. Errors are logged but suppressed to 

314 allow the originating command to complete its execution. 

315 

316 Args: 

317 command (str): The command name (ASCII characters are enforced). 

318 params (Sequence[str] | None): A list of parameters and flags. 

319 success (bool | None): Whether the command succeeded. 

320 return_code (int | None): The exit code of the command. 

321 duration_ms (float | None): The command's duration in milliseconds. 

322 """ 

323 fp = self._get_history_path() 

324 entry = { 

325 "command": _ascii_clean(command), 

326 "params": list(params or []), 

327 "timestamp": _now(), 

328 "success": bool(success), 

329 "return_code": return_code if return_code is not None else 0, 

330 "duration_ms": float(duration_ms) if duration_ms is not None else None, 

331 } 

332 with _interprocess_lock(fp): 

333 self._reload() 

334 if self._load_error: 

335 msg = f"[error] Could not load command history: {self._load_error}" 

336 self._obs.log("error", msg, extra={"path": str(fp)}) 

337 print(msg, file=sys.stderr) 

338 self._events = [] 

339 self._events.append(entry) 

340 try: 

341 _atomic_write_json(fp, self._events) 

342 self._load_error = None 

343 except PermissionError as exc: 

344 msg = f"[error] Could not record command history: {exc}" 

345 self._obs.log("error", msg, extra={"path": str(fp)}) 

346 print(msg, file=sys.stderr) 

347 self._load_error = msg 

348 return 

349 except OSError as exc: 

350 if exc.errno in _ENOSPC_ERRORS: 

351 msg = f"[error] Could not record command history: {exc}" 

352 self._obs.log("error", msg, extra={"path": str(fp)}) 

353 print(msg, file=sys.stderr) 

354 self._load_error = msg 

355 return 

356 msg = f"[error] Could not record command history: {exc}" 

357 self._obs.log("error", msg, extra={"path": str(fp)}) 

358 print(msg, file=sys.stderr) 

359 self._load_error = msg 

360 return 

361 with suppress(Exception): 

362 self._tel.event("history_event_added", {"command": entry["command"]}) 

363 

364 def list( 

365 self, 

366 *, 

367 limit: int | None = 20, 

368 group_by: str | None = None, 

369 filter_cmd: str | None = None, 

370 sort: str | None = None, 

371 ) -> list[dict[str, Any]]: 

372 """Returns a view of the command history, with optional transformations. 

373 

374 This is a read-only operation and does not acquire a cross-process lock, 

375 meaning it may not reflect writes from concurrent processes. 

376 

377 Args: 

378 limit (int | None): The maximum number of entries to return. A value 

379 of 0 returns an empty list. 

380 group_by (str | None): If provided, returns a grouped summary. 

381 filter_cmd (str | None): If provided, returns only entries whose 

382 command contains this case-sensitive substring. 

383 sort (str | None): If 'timestamp', sorts entries by timestamp. 

384 

385 Returns: 

386 list[dict[str, Any]]: A list of history entries or grouped summaries. 

387 

388 Raises: 

389 RuntimeError: If the history file is corrupt. 

390 """ 

391 self._reload() 

392 fp = self._get_history_path() 

393 try: 

394 writable = os.access(fp.parent, os.W_OK) 

395 except Exception: 

396 writable = True 

397 if not writable: 

398 msg = f"Permission denied for history directory: {fp.parent}" 

399 self._obs.log("error", msg, extra={"path": str(fp)}) 

400 print(msg, file=sys.stderr) 

401 if self._load_error: 

402 raise RuntimeError(self._load_error) 

403 if limit == 0: 

404 return [] 

405 entries: list[dict[str, Any]] = list(self._events) 

406 if filter_cmd: 

407 needle = str(filter_cmd) 

408 entries = [e for e in entries if needle in (e.get("command") or "")] 

409 if sort == "timestamp": 

410 entries.sort(key=lambda e: e.get("timestamp", 0)) 

411 if group_by: 

412 grouped: dict[Any, MutableSequence[dict[str, Any]]] = {} 

413 for e in entries: 

414 grouped.setdefault(e.get(group_by, "unknown"), []).append(e) 

415 summary = [ 

416 { 

417 "group": k, 

418 "count": len(v), 

419 "last_run": max((x.get("timestamp", 0) for x in v), default=0), 

420 } 

421 for k, v in grouped.items() 

422 ] 

423 return summary[:limit] if (limit and limit > 0) else summary 

424 if limit and limit > 0: 

425 entries = entries[-limit:] 

426 return entries 

427 

428 def clear(self) -> None: 

429 """Erases all persisted history. 

430 

431 This operation is cross-process safe and atomic. 

432 

433 Raises: 

434 PermissionError: If the history file or directory is not writable. 

435 OSError: For other filesystem-related failures. 

436 """ 

437 fp = self._get_history_path() 

438 try: 

439 with _interprocess_lock(fp): 

440 self._events = [] 

441 _atomic_write_json(fp, self._events) 

442 self._load_error = None 

443 self._tel.event("history_cleared", {}) 

444 except Exception as exc: 

445 msg = f"History clear failed: {exc}" 

446 self._obs.log("error", msg, extra={"path": str(fp)}) 

447 self._load_error = msg 

448 raise 

449 finally: 

450 self._reload() 

451 

452 def flush(self) -> None: 

453 """Persists all in-memory history data to disk.""" 

454 self._dump() 

455 

456 def export(self, path: Path) -> None: 

457 """Exports the current history to a file as a JSON array. 

458 

459 This operation is a read-only snapshot and does not lock the source file. 

460 

461 Args: 

462 path (Path): The destination file path. 

463 

464 Raises: 

465 RuntimeError: On I/O failures. 

466 """ 

467 self._reload() 

468 try: 

469 path = path.expanduser() 

470 path.parent.mkdir(parents=True, exist_ok=True) 

471 text = json.dumps(self._events, ensure_ascii=False, indent=2) + "\n" 

472 path.write_text(text, encoding="utf-8") 

473 except Exception as exc: 

474 raise RuntimeError(f"Failed exporting history: {exc}") from exc 

475 

476 def import_(self, path: Path) -> None: 

477 """Imports history entries from a file, merging with current history. 

478 

479 This operation is cross-process safe and atomic. 

480 

481 Args: 

482 path (Path): The source file path containing a JSON array of entries. 

483 

484 Raises: 

485 RuntimeError: On I/O or parsing failures. 

486 """ 

487 fp = self._get_history_path() 

488 try: 

489 with _interprocess_lock(fp): 

490 self._reload() 

491 if self._load_error: 

492 raise RuntimeError(self._load_error) 

493 path = path.expanduser() 

494 if not path.exists(): 

495 raise RuntimeError(f"Import file not found: {path}") 

496 raw = path.read_text(encoding="utf-8") 

497 data = json.loads(raw) 

498 if not isinstance(data, list): 

499 raise RuntimeError( 

500 f"Invalid import format (not JSON array): {path}" 

501 ) 

502 imported: list[dict[str, Any]] = [] 

503 for item in data: 

504 if not isinstance(item, dict): 

505 continue 

506 e = dict(item) 

507 e["command"] = _ascii_clean(str(e.get("command", ""))) 

508 if "timestamp" not in e: 

509 e["timestamp"] = _now() 

510 imported.append(e) 

511 self._events.extend(imported) 

512 if len(self._events) > _MAX_IN_MEMORY: 

513 self._events = self._events[-_MAX_IN_MEMORY:] 

514 _atomic_write_json(fp, self._events) 

515 self._load_error = None 

516 with suppress(Exception): 

517 self._tel.event("history_imported", {"count": len(imported)}) 

518 

519 except Exception as exc: 

520 msg = f"History import failed: {exc}" 

521 self._obs.log( 

522 "error", msg, extra={"import_path": str(path), "history_path": str(fp)} 

523 ) 

524 raise RuntimeError(msg) from exc 

525 

526 

527__all__ = ["History"]