Coverage for  / home / runner / work / bijux-cli / bijux-cli / src / bijux_cli / services / history / __init__.py: 100%

258 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-26 17:59 +0000

1# SPDX-License-Identifier: Apache-2.0 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Provides a persistent, cross-process safe command history service. 

5 

6This module defines the `History` class, a concrete implementation of the 

7`HistoryProtocol`. It provides a tolerant and robust store for CLI 

8invocation events with several key design features: 

9 

10 * **Persistence:** All history is saved to a single JSON array in a 

11 per-user file. 

12 * **Tolerance:** The service is resilient to empty, corrupt, or partially 

13 formed history files. If a file is unreadable, it is treated as empty 

14 and will be overwritten on the next successful write. 

15 * **Cross-Process Safety:** On POSIX systems, it uses `fcntl.flock` on a 

16 sidecar lock file to safely coordinate writes from multiple concurrent 

17 CLI processes. On other systems, it falls back to a thread lock. 

18 * **Atomic Writes:** All changes are written to a temporary file which is 

19 then atomically moved into place, preventing data corruption from 

20 interrupted writes. 

21 * **Memory Management:** The in-memory list of events is capped, and the 

22 on-disk file is trimmed to a smaller size to prevent unbounded growth. 

23 * **Simplicity:** The service intentionally avoids complex features like 

24 schema migrations. Unreadable state is discarded rather than repaired. 

25""" 

26 

27from __future__ import annotations 

28 

29from collections.abc import Iterator, MutableSequence, Sequence 

30from contextlib import contextmanager, suppress 

31import errno 

32import json 

33import os 

34from pathlib import Path 

35import sys 

36import tempfile 

37import threading 

38import time 

39from typing import Any, Final 

40import unicodedata 

41 

42from injector import inject 

43 

44from bijux_cli.infra.paths import HISTORY_FILE 

45from bijux_cli.services.contracts import TelemetryProtocol 

46from bijux_cli.services.history.contracts import HistoryProtocol 

47from bijux_cli.services.logging.observability import Observability 

48 

49_MAX_IN_MEMORY: Final[int] = 10_000 

50"""Maximum number of entries retained in memory (and considered for writes).""" 

51_TRIM_THRESHOLD: Final[int] = 1_000 

52"""When persisting, keep at most this many most-recent events in the file.""" 

53_ENOSPC_ERRORS = {errno.ENOSPC, errno.EDQUOT} 

54"""OS error codes indicating the filesystem is full or quota exceeded.""" 

55_FILE_LOCK = threading.Lock() 

56"""Fallback lock for non-POSIX platforms when `fcntl` is unavailable.""" 

57fcntl: Any 

58try: 

59 import fcntl 

60except ImportError: # pragma: no cover 

61 fcntl = None 

62 

63 

64def _now() -> float: 

65 """Returns the current UNIX time with sub-second precision. 

66 

67 Returns: 

68 float: The current time in seconds since the epoch. 

69 """ 

70 return time.time() 

71 

72 

73def _ascii_clean(text: str) -> str: 

74 """Strips all diacritics and non-printable characters from a string. 

75 

76 Args: 

77 text (str): The input text to clean. 

78 

79 Returns: 

80 str: An ASCII-only version of the text. 

81 """ 

82 normalized = unicodedata.normalize("NFKD", text) 

83 without_marks = "".join(ch for ch in normalized if unicodedata.category(ch) != "Mn") 

84 return "".join(ch for ch in without_marks if 0x20 <= ord(ch) <= 0x7E) 

85 

86 

87def _lock_file_for(fp: Path) -> Path: 

88 """Returns the path for the sidecar lock file associated with `fp`. 

89 

90 Args: 

91 fp (Path): The primary file path. 

92 

93 Returns: 

94 Path: The corresponding lock file path (e.g., `file.lock`). 

95 """ 

96 return fp.with_name(fp.name + ".lock") 

97 

98 

@contextmanager
def _interprocess_lock(fp: Path) -> Iterator[None]:
    """Holds an exclusive lock for `fp` for the duration of a `with` block.

    When `fcntl` is available, the lock is an `fcntl.flock` exclusive lock
    taken on the sidecar file returned by `_lock_file_for(fp)`; the sidecar's
    parent directory is created if needed. When `fcntl` is unavailable, the
    module-level `threading.Lock` is used instead, which only serializes
    callers inside the current process.

    Args:
        fp (Path): The path to the file that requires locked access.

    Yields:
        None: Control is handed to the `with` block while the lock is held.
    """
    if fcntl is not None:
        sidecar = _lock_file_for(fp)
        sidecar.parent.mkdir(parents=True, exist_ok=True)
        # The handle is closed on exit from this `with`, after the unlock below.
        with sidecar.open("a+") as handle:
            try:
                fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
                yield
            finally:
                # Unlock failures are ignored; the handle is closed right after.
                with suppress(Exception):
                    fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
    else:
        with _FILE_LOCK:
            yield

127 

128 

129def _maybe_simulate_disk_full() -> None: 

130 """Raises an `ENOSPC` error if a test environment variable is set.""" 

131 if os.getenv("BIJUXCLI_TEST_DISK_FULL") == "1": 

132 raise OSError(errno.ENOSPC, "No space left on device") 

133 

134 

135def _atomic_write_json(fp: Path, events: list[dict[str, Any]]) -> None: 

136 """Writes a list of events to a file atomically. 

137 

138 The data is written to a temporary file in the same directory and then 

139 renamed to the final destination, which is an atomic operation on POSIX 

140 systems. 

141 

142 Args: 

143 fp (Path): The destination file path. 

144 events (list[dict[str, Any]]): The list of history entries to write. 

145 

146 Raises: 

147 PermissionError: If the directory or file is not writable. 

148 OSError: For other filesystem errors, such as a full disk. 

149 """ 

150 _maybe_simulate_disk_full() 

151 fp.parent.mkdir(parents=True, exist_ok=True) 

152 to_write = events[-_TRIM_THRESHOLD:] if events else [] 

153 payload = ( 

154 "[]\n" if not to_write else json.dumps(to_write, ensure_ascii=False, indent=2) 

155 ) 

156 with tempfile.NamedTemporaryFile( 

157 "w", delete=False, dir=fp.parent, prefix=f".{fp.name}.", encoding="utf-8" 

158 ) as temp_file: 

159 temp_file.write(payload) 

160 temp_file.flush() 

161 os.fsync(temp_file.fileno()) 

162 temp_fp = Path(temp_file.name) 

163 os.replace(temp_fp, fp) 

164 

165 

def _sanitize_entries(items: Sequence[Any]) -> list[dict[str, Any]]:
    """Copies well-formed history items and normalizes their command names.

    Items that are not dictionaries are dropped. Every kept item is shallow
    copied, and its `command` value is converted with `str()` and reduced to
    printable ASCII via `_ascii_clean`.

    Args:
        items (Sequence[Any]): Decoded JSON array elements.

    Returns:
        list[dict[str, Any]]: The sanitized copies, in the original order.
    """
    cleaned: list[dict[str, Any]] = []
    for item in items:
        if not isinstance(item, dict):
            continue
        entry = dict(item)
        entry["command"] = _ascii_clean(str(entry.get("command", "")))
        cleaned.append(entry)
    return cleaned


def _timestamp_of(entry: dict[str, Any]) -> Any:
    """Returns the entry's numeric timestamp, or 0 if missing or non-numeric.

    Falling back to 0 keeps sorting and `max()` from raising `TypeError` when
    a tolerated or imported entry carries a non-numeric timestamp.
    """
    stamp = entry.get("timestamp", 0)
    return stamp if isinstance(stamp, (int, float)) else 0


def _group_key(value: Any) -> Any:
    """Returns a hashable dictionary key standing in for `value`.

    Hashable values are returned unchanged. Unhashable ones (lists, dicts)
    are replaced by a tagged tuple holding their canonical JSON text.
    """
    try:
        hash(value)
    except TypeError:
        return ("unhashable", json.dumps(value, sort_keys=True, default=str))
    return value


class History(HistoryProtocol):
    """Manages a persistent history of CLI command invocations.

    This service maintains an in-memory list of command events and synchronizes
    it with a persisted JSON file. It is designed to be tolerant of file
    corruption and safe for concurrent use by multiple CLI processes.

    Mutating operations (`add`, `clear`, `import_`) acquire a cross-process lock
    before modifying the file. `add` and `import_` reload the file from disk
    under the lock before applying their change; `clear` simply overwrites it.
    All writes go through `_atomic_write_json`.

    Attributes:
        _tel (TelemetryProtocol): The telemetry service for emitting events.
        _obs (Observability): The logging service for operational errors.
        _explicit_path (Path | None): A specific path to the history file, if
            provided during initialization.
        _events (list): The in-memory cache of history event dictionaries.
        _load_error (str | None): A message describing the last load or write
            error, if any.
    """

    @inject
    def __init__(
        self,
        telemetry: TelemetryProtocol,
        observability: Observability,
        history_path: Path | None = None,
    ) -> None:
        """Initializes the History service.

        Args:
            telemetry (TelemetryProtocol): The telemetry service.
            observability (Observability): The logging service.
            history_path (Path | None): An optional, explicit path to the
                history file. If None, the path is derived from the
                environment or the default location on each access.
        """
        self._tel = telemetry
        self._obs = observability
        self._explicit_path = Path(history_path) if history_path else None
        self._events: list[dict[str, Any]] = []
        self._load_error: str | None = None

    def _get_history_path(self) -> Path:
        """Returns the path to the history file.

        The path is determined in the following order of precedence:
          1. An explicit path provided to the constructor (used as given).
          2. The `BIJUXCLI_HISTORY_FILE` environment variable (`~` expanded).
          3. A `.bijux_history` file in the same directory as the
             `BIJUXCLI_CONFIG` file (`~` expanded).
          4. The default `HISTORY_FILE` location.

        The path is not resolved to an absolute path.

        Returns:
            Path: The history file path.
        """
        if self._explicit_path:
            return self._explicit_path
        env_file = os.environ.get("BIJUXCLI_HISTORY_FILE")
        if env_file:
            return Path(env_file).expanduser()
        cfg = os.environ.get("BIJUXCLI_CONFIG")
        if cfg:
            return Path(cfg).expanduser().parent / ".bijux_history"
        return HISTORY_FILE

    def _reload(self) -> None:
        """Refreshes the in-memory state from the history file on disk.

        This method is tolerant of errors. A missing or empty file yields an
        empty list with no error. A file that is not a JSON array, or that
        cannot be read or parsed, yields an empty list and sets `_load_error`;
        no exception is raised.
        """
        self._load_error = None
        fp = self._get_history_path()
        try:
            if not fp.exists():
                self._events = []
                return
            raw = fp.read_text(encoding="utf-8", errors="ignore").strip()
            if not raw:
                self._events = []
                return
            data = json.loads(raw)
            if not isinstance(data, list):
                self._events = []
                self._load_error = (
                    f"Unexpected history file format (not JSON array): {fp}"
                )
                return
            # Keep only the newest entries when the file exceeds the cap.
            self._events = _sanitize_entries(data)[-_MAX_IN_MEMORY:]
        except Exception as exc:
            self._load_error = f"History file corrupted or unreadable: {exc}"
            self._obs.log("error", self._load_error, extra={"path": str(fp)})
            self._events = []

    def _dump(self) -> None:
        """Persists the current in-memory events to disk atomically.

        Permission errors and disk-full/quota errors are logged and printed
        via `_handle_dump_error`. Every `OSError` is re-raised.

        Raises:
            PermissionError: If the history location is not writable.
            OSError: For any other filesystem failure.
        """
        fp = self._get_history_path()
        with _interprocess_lock(fp):
            self._load_error = None
            try:
                _atomic_write_json(fp, self._events)
            except PermissionError as exc:
                self._handle_dump_error("write-permission", exc, fp)
                raise
            except OSError as exc:
                if exc.errno in _ENOSPC_ERRORS:
                    self._handle_dump_error("persist", exc, fp)
                raise

    def _handle_dump_error(self, kind: str, exc: OSError, fp: Path) -> None:
        """Logs and prints an error encountered during a file write operation.

        Args:
            kind (str): A short code classifying the error (e.g., "persist").
            exc (OSError): The originating exception.
            fp (Path): The path of the file that was being written to.
        """
        msg = f"History {kind} error: {exc}"
        self._obs.log("error", msg, extra={"path": str(fp)})
        self._load_error = msg
        print(msg, file=sys.stderr)

    def add(
        self,
        command: str,
        *,
        params: Sequence[str] | None = None,
        success: bool | None = True,
        return_code: int | None = 0,
        duration_ms: float | None = None,
        raw: dict[str, Any] | None = None,
    ) -> None:
        """Appends a new command invocation to the history.

        This operation is cross-process safe. It acquires a lock, reloads the
        latest history from disk, appends the new entry, and writes the
        updated history back atomically. Filesystem and serialization errors
        are logged and printed to stderr but suppressed, so the originating
        command can complete its execution.

        If the existing file cannot be loaded, the error is reported and the
        history is restarted with only the new entry.

        Args:
            command (str): The command name (reduced to printable ASCII).
            params (Sequence[str] | None): A list of parameters and flags.
            success (bool | None): Whether the command succeeded.
            return_code (int | None): The exit code of the command.
            duration_ms (float | None): The command's duration in milliseconds.
            raw (dict[str, Any] | None): Optional raw metadata payload.
        """
        fp = self._get_history_path()
        entry = {
            "command": _ascii_clean(command),
            "params": list(params or []),
            "timestamp": _now(),
            "success": bool(success),
            "return_code": return_code if return_code is not None else 0,
            "duration_ms": float(duration_ms) if duration_ms is not None else None,
            "raw": raw or {},
        }
        with _interprocess_lock(fp):
            self._reload()
            if self._load_error:
                msg = f"[error] Could not load command history: {self._load_error}"
                self._obs.log("error", msg, extra={"path": str(fp)})
                print(msg, file=sys.stderr)
                self._events = []
            self._events.append(entry)
            try:
                _atomic_write_json(fp, self._events)
            except (OSError, TypeError, ValueError) as exc:
                # PermissionError and disk-full errors are OSError subclasses
                # and are reported the same way. TypeError/ValueError come
                # from json serialization of the entry (e.g. a non-JSON value
                # in `raw`).
                if not isinstance(exc, OSError):
                    # Drop the unserializable entry so later writes can work.
                    self._events.pop()
                msg = f"[error] Could not record command history: {exc}"
                self._obs.log("error", msg, extra={"path": str(fp)})
                print(msg, file=sys.stderr)
                self._load_error = msg
                return
            self._load_error = None
        with suppress(Exception):
            self._tel.event("history_event_added", {"command": entry["command"]})

    def list(
        self,
        *,
        limit: int | None = 20,
        group_by: str | None = None,
        filter_cmd: str | None = None,
        sort: str | None = None,
    ) -> list[dict[str, Any]]:
        """Returns a view of the command history, with optional transformations.

        This is a read-only operation and does not acquire a cross-process lock,
        meaning it may not reflect writes from concurrent processes. If the
        history directory is not writable, a message is logged and printed to
        stderr, but the listing still proceeds.

        Args:
            limit (int | None): The maximum number of entries to return. A value
                of 0 returns an empty list; None or a negative value returns
                everything.
            group_by (str | None): If provided, returns one summary per distinct
                value of this field (`group`, `count`, `last_run`). Entries
                lacking the field are grouped under "unknown".
            filter_cmd (str | None): If provided, returns only entries whose
                command contains this case-sensitive substring.
            sort (str | None): If 'timestamp', sorts entries by timestamp in
                ascending order. Missing or non-numeric timestamps count as 0.

        Returns:
            list[dict[str, Any]]: A list of history entries or grouped summaries.

        Raises:
            RuntimeError: If the history file is corrupt.
        """
        self._reload()
        fp = self._get_history_path()
        try:
            writable = os.access(fp.parent, os.W_OK)
        except Exception:
            writable = True
        if not writable:
            msg = f"Permission denied for history directory: {fp.parent}"
            self._obs.log("error", msg, extra={"path": str(fp)})
            print(msg, file=sys.stderr)
        if self._load_error:
            raise RuntimeError(self._load_error)
        if limit == 0:
            return []
        bounded = bool(limit and limit > 0)
        entries: list[dict[str, Any]] = list(self._events)
        if filter_cmd:
            needle = str(filter_cmd)
            entries = [e for e in entries if needle in (e.get("command") or "")]
        if sort == "timestamp":
            entries.sort(key=_timestamp_of)
        if group_by:
            # Buckets are keyed by a hashable stand-in so that list/dict field
            # values (e.g. `params`, `raw`) can be grouped without raising.
            buckets: dict[Any, tuple[Any, list[dict[str, Any]]]] = {}
            for e in entries:
                label = e.get(group_by, "unknown")
                buckets.setdefault(_group_key(label), (label, []))[1].append(e)
            summary = [
                {
                    "group": label,
                    "count": len(members),
                    "last_run": max((_timestamp_of(x) for x in members), default=0),
                }
                for label, members in buckets.values()
            ]
            return summary[:limit] if bounded else summary
        # Without grouping, the limit keeps the tail (most recently listed).
        return entries[-limit:] if bounded else entries

    def clear(self) -> None:
        """Erases all persisted history.

        The file is overwritten with an empty array under the cross-process
        lock. A telemetry failure after a successful clear is ignored. The
        in-memory state is reloaded from disk afterwards, whether or not the
        write succeeded.

        Raises:
            PermissionError: If the history file or directory is not writable.
            OSError: For other filesystem-related failures.
        """
        fp = self._get_history_path()
        try:
            with _interprocess_lock(fp):
                self._events = []
                _atomic_write_json(fp, self._events)
                self._load_error = None
            # Telemetry is best-effort, matching `add` and `import_`.
            with suppress(Exception):
                self._tel.event("history_cleared", {})
        except Exception as exc:
            msg = f"History clear failed: {exc}"
            self._obs.log("error", msg, extra={"path": str(fp)})
            # NOTE: the `_reload()` in `finally` resets `_load_error` again.
            self._load_error = msg
            raise
        finally:
            self._reload()

    def flush(self) -> None:
        """Persists all in-memory history data to disk.

        Raises:
            PermissionError: If the history location is not writable.
            OSError: For any other filesystem failure.
        """
        self._dump()

    def export(self, path: Path) -> None:
        """Exports the current history to a file as a JSON array.

        The history is reloaded from disk first. This operation is a read-only
        snapshot and does not lock the source file.

        Args:
            path (Path): The destination file path (`~` is expanded; parent
                directories are created as needed).

        Raises:
            RuntimeError: On I/O failures.
        """
        self._reload()
        try:
            path = path.expanduser()
            path.parent.mkdir(parents=True, exist_ok=True)
            text = json.dumps(self._events, ensure_ascii=False, indent=2) + "\n"
            path.write_text(text, encoding="utf-8")
        except Exception as exc:
            raise RuntimeError(f"Failed exporting history: {exc}") from exc

    def import_(self, path: Path) -> None:
        """Imports history entries from a file, merging with current history.

        Under the cross-process lock, the current history is reloaded, the
        imported entries are appended after it, and the result is written back
        atomically. Non-dict items in the import file are skipped, and entries
        without a `timestamp` are stamped with the current time.

        Args:
            path (Path): The source file path containing a JSON array of entries.

        Raises:
            RuntimeError: On I/O or parsing failures, or if the existing
                history file cannot be loaded.
        """
        fp = self._get_history_path()
        try:
            with _interprocess_lock(fp):
                self._reload()
                if self._load_error:
                    raise RuntimeError(self._load_error)
                path = path.expanduser()
                if not path.exists():
                    raise RuntimeError(f"Import file not found: {path}")
                data = json.loads(path.read_text(encoding="utf-8"))
                if not isinstance(data, list):
                    raise RuntimeError(
                        f"Invalid import format (not JSON array): {path}"
                    )
                imported = _sanitize_entries(data)
                for e in imported:
                    if "timestamp" not in e:
                        e["timestamp"] = _now()
                self._events.extend(imported)
                if len(self._events) > _MAX_IN_MEMORY:
                    self._events = self._events[-_MAX_IN_MEMORY:]
                _atomic_write_json(fp, self._events)
                self._load_error = None
                with suppress(Exception):
                    self._tel.event("history_imported", {"count": len(imported)})
        except Exception as exc:
            msg = f"History import failed: {exc}"
            self._obs.log(
                "error", msg, extra={"import_path": str(path), "history_path": str(fp)}
            )
            raise RuntimeError(msg) from exc

528 

529 

# Names exported by `from ... import *`: only the `History` class.
__all__ = ["History"]