Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/services/history.py: 100%
258 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
# SPDX-License-Identifier: MIT
# Copyright © 2025 Bijan Mousavi

"""Provides a persistent, cross-process safe command history service.

This module defines the `History` class, a concrete implementation of the
`HistoryProtocol`. It provides a tolerant and robust store for CLI
invocation events with several key design features:

    * **Persistence:** All history is saved to a single JSON array in a
      per-user file.
    * **Tolerance:** The service is resilient to empty, corrupt, or partially
      formed history files. If a file is unreadable, it is treated as empty
      and will be overwritten on the next successful write.
    * **Cross-Process Safety:** On POSIX systems, it uses `fcntl.flock` on a
      sidecar lock file to safely coordinate writes from multiple concurrent
      CLI processes. On other systems, it falls back to a thread lock.
    * **Atomic Writes:** All changes are written to a temporary file which is
      then atomically moved into place, preventing data corruption from
      interrupted writes.
    * **Memory Management:** The in-memory list of events is capped, and the
      on-disk file is trimmed to a smaller size to prevent unbounded growth.
    * **Simplicity:** The service intentionally avoids complex features like
      schema migrations. Unreadable state is discarded rather than repaired.
"""
27from __future__ import annotations
29from collections.abc import Iterator, MutableSequence, Sequence
30from contextlib import contextmanager, suppress
31import errno
32import json
33import os
34from pathlib import Path
35import sys
36import tempfile
37import threading
38import time
39from typing import Any, Final
40import unicodedata
42from injector import inject
44from bijux_cli.contracts import HistoryProtocol
45from bijux_cli.core.paths import HISTORY_FILE
46from bijux_cli.infra.observability import Observability
47from bijux_cli.infra.telemetry import LoggingTelemetry
# Maximum number of entries retained in memory (and considered for writes).
_MAX_IN_MEMORY: Final[int] = 10_000

# When persisting, keep at most this many most-recent events in the file.
_TRIM_THRESHOLD: Final[int] = 1_000

# OS error codes indicating the filesystem is full or quota exceeded.
_ENOSPC_ERRORS = {errno.ENOSPC, errno.EDQUOT}

# Fallback lock for non-POSIX platforms when `fcntl` is unavailable.
_FILE_LOCK = threading.Lock()

# `fcntl` is bound to None when the import fails so that callers can test for
# its availability at runtime instead of failing at import time.
fcntl: Any
try:
    import fcntl
except ImportError:  # pragma: no cover
    fcntl = None
def _now() -> float:
    """Reports the current wall-clock time as a UNIX timestamp.

    Returns:
        float: Seconds since the epoch, as reported by `time.time()`.
    """
    timestamp = time.time()
    return timestamp
def _ascii_clean(text: str) -> str:
    """Reduces a string to its printable-ASCII form.

    The text is decomposed with NFKD so that accented letters split into a
    base letter plus combining marks, then only characters in the printable
    ASCII range (space through tilde) are kept. Combining marks lie outside
    that range, so the single range check removes them together with control
    characters and any other non-ASCII code points.

    Args:
        text (str): The input text to clean.

    Returns:
        str: The text restricted to printable ASCII characters.
    """
    decomposed = unicodedata.normalize("NFKD", text)
    kept = [char for char in decomposed if " " <= char <= "~"]
    return "".join(kept)
def _lock_file_for(fp: Path) -> Path:
    """Derives the sidecar lock-file path that accompanies `fp`.

    The lock file lives next to `fp` and carries the same file name with a
    `.lock` suffix appended (the existing suffix is kept, not replaced).

    Args:
        fp (Path): The primary file path.

    Returns:
        Path: The sibling lock file path (e.g., `file.json.lock`).
    """
    sidecar_name = f"{fp.name}.lock"
    return fp.with_name(sidecar_name)
@contextmanager
def _interprocess_lock(fp: Path) -> Iterator[None]:
    """Holds an exclusive lock guarding `fp` for the duration of a `with` block.

    When `fcntl` is available, the lock is an exclusive `fcntl.flock` taken on
    a sidecar lock file next to `fp` (its parent directory is created if
    needed). When `fcntl` is unavailable, the module-level `threading.Lock`
    is used instead, which only serializes callers inside this process.

    Args:
        fp (Path): The path to the file that requires locked access.

    Yields:
        None: Yields control to the `with` block while the lock is held.
    """
    if fcntl is not None:
        sidecar = _lock_file_for(fp)
        sidecar.parent.mkdir(parents=True, exist_ok=True)
        with sidecar.open("a+") as handle:
            descriptor = handle.fileno()
            try:
                fcntl.flock(descriptor, fcntl.LOCK_EX)
                yield
            finally:
                # Unlocking is best-effort; closing the handle afterwards
                # releases the descriptor either way.
                with suppress(Exception):
                    fcntl.flock(descriptor, fcntl.LOCK_UN)
        return

    with _FILE_LOCK:
        yield
def _maybe_simulate_disk_full() -> None:
    """Simulates a full disk when the test hook environment variable is set.

    Raises:
        OSError: With `errno.ENOSPC` when `BIJUXCLI_TEST_DISK_FULL` equals "1".
    """
    flag = os.environ.get("BIJUXCLI_TEST_DISK_FULL")
    if flag != "1":
        return
    raise OSError(errno.ENOSPC, "No space left on device")
def _atomic_write_json(fp: Path, events: list[dict[str, Any]]) -> None:
    """Writes a list of events to a file atomically.

    The data is written to a temporary file in the same directory, flushed
    and fsynced, and then moved over the destination with `os.replace`. Only
    the last `_TRIM_THRESHOLD` events are persisted. If any step fails, the
    temporary file is removed so failed writes do not accumulate stray files
    next to the history file.

    Args:
        fp (Path): The destination file path.
        events (list[dict[str, Any]]): The list of history entries to write.

    Raises:
        PermissionError: If the directory or file is not writable.
        OSError: For other filesystem errors, such as a full disk.
    """
    _maybe_simulate_disk_full()
    fp.parent.mkdir(parents=True, exist_ok=True)
    to_write = events[-_TRIM_THRESHOLD:] if events else []
    payload = (
        "[]\n" if not to_write else json.dumps(to_write, ensure_ascii=False, indent=2)
    )
    temp_fp: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            "w", delete=False, dir=fp.parent, prefix=f".{fp.name}.", encoding="utf-8"
        ) as temp_file:
            # Record the name first so a failed write can still be cleaned up.
            temp_fp = Path(temp_file.name)
            temp_file.write(payload)
            temp_file.flush()
            os.fsync(temp_file.fileno())
        os.replace(temp_fp, fp)
    except BaseException:
        # `delete=False` means nothing else removes the temporary file.
        if temp_fp is not None:
            with suppress(OSError):
                temp_fp.unlink()
        raise
class History(HistoryProtocol):
    """Manages a persistent history of CLI command invocations.

    This service maintains an in-memory list of command events and synchronizes
    it with a persisted JSON file. It is designed to be tolerant of file
    corruption and safe for concurrent use by multiple CLI processes.

    Mutating operations (`add`, `clear`, `import_`) acquire a cross-process lock
    before modifying the file to prevent lost updates and race conditions.
    `add` and `import_` reload from disk under the lock, apply the change in
    memory, and write atomically; `clear` writes an empty history under the
    lock and reloads afterwards.

    Attributes:
        _tel (LoggingTelemetry): The telemetry service for emitting events.
        _obs (Observability): The logging service for operational errors.
        _explicit_path (Path | None): A specific path to the history file, if
            provided during initialization.
        _events (list): The in-memory cache of history event dictionaries.
        _load_error (str | None): A message describing the last load or write
            error recorded by this instance, if any.
    """

    @inject
    def __init__(
        self,
        telemetry: LoggingTelemetry,
        observability: Observability,
        history_path: Path | None = None,
    ) -> None:
        """Initializes the History service.

        Args:
            telemetry (LoggingTelemetry): The telemetry service.
            observability (Observability): The logging service.
            history_path (Path | None): An optional, explicit path to the
                history file. If None, a default path will be used.
        """
        self._tel = telemetry
        self._obs = observability
        self._explicit_path = Path(history_path) if history_path else None
        self._events: list[dict[str, Any]] = []
        self._load_error: str | None = None

    def _get_history_path(self) -> Path:
        """Returns the path to the history file.

        The path is determined in the following order of precedence:
            1. An explicit path provided to the constructor.
            2. The `BIJUXCLI_HISTORY_FILE` environment variable.
            3. A `.bijux_history` file in the same directory as the
               `BIJUXCLI_CONFIG` file.
            4. The default `HISTORY_FILE` location.

        Returns:
            Path: The path to the history file. Environment-provided paths
                have `~` expanded; the explicit path is returned as given.
        """
        if self._explicit_path:
            return self._explicit_path
        env_file = os.environ.get("BIJUXCLI_HISTORY_FILE")
        if env_file:
            return Path(env_file).expanduser()
        cfg = os.environ.get("BIJUXCLI_CONFIG")
        if cfg:
            cfg_path = Path(cfg).expanduser()
            return cfg_path.parent / ".bijux_history"
        return HISTORY_FILE

    def _reload(self) -> None:
        """Refreshes the in-memory state from the history file on disk.

        This method never raises. A missing or empty file yields an empty
        history with no error recorded. A file that is not a JSON array, or
        that cannot be read or parsed, yields an empty history and a message
        in `_load_error`; read/parse failures are also logged. Non-dict items
        are skipped, each `command` is ASCII-cleaned, and only the most recent
        `_MAX_IN_MEMORY` entries are kept.
        """
        self._load_error = None
        fp = self._get_history_path()
        try:
            if not fp.exists():
                self._events = []
                return
            raw = fp.read_text(encoding="utf-8", errors="ignore").strip()
            if not raw:
                self._events = []
                return
            data = json.loads(raw)
            if not isinstance(data, list):
                self._events = []
                self._load_error = (
                    f"Unexpected history file format (not JSON array): {fp}"
                )
                return
            evs: list[dict[str, Any]] = []
            for item in data:
                if not isinstance(item, dict):
                    continue
                e = dict(item)
                e["command"] = _ascii_clean(str(e.get("command", "")))
                evs.append(e)
            if len(evs) > _MAX_IN_MEMORY:
                evs = evs[-_MAX_IN_MEMORY:]
            self._events = evs
        except Exception as exc:
            self._load_error = f"History file corrupted or unreadable: {exc}"
            self._obs.log("error", self._load_error, extra={"path": str(fp)})
            self._events = []

    def _dump(self) -> None:
        """Persists the current in-memory events to disk atomically.

        Raises:
            PermissionError: If the history location is not writable.
            OSError: For other filesystem failures. Only disk-full/quota
                errors are reported via `_handle_dump_error` before being
                re-raised.
        """
        fp = self._get_history_path()
        with _interprocess_lock(fp):
            self._load_error = None
            try:
                _atomic_write_json(fp, self._events)
            except PermissionError as exc:
                self._handle_dump_error("write-permission", exc, fp)
                raise
            except OSError as exc:
                if exc.errno in _ENOSPC_ERRORS:
                    self._handle_dump_error("persist", exc, fp)
                raise

    def _handle_dump_error(self, kind: str, exc: OSError, fp: Path) -> None:
        """Logs and prints an error encountered during a file write operation.

        Args:
            kind (str): A short code classifying the error (e.g., "persist").
            exc (OSError): The originating exception.
            fp (Path): The path of the file that was being written to.
        """
        msg = f"History {kind} error: {exc}"
        self._obs.log("error", msg, extra={"path": str(fp)})
        self._load_error = msg
        print(msg, file=sys.stderr)

    @staticmethod
    def _timestamp_key(event: dict[str, Any]) -> float:
        """Returns an event's timestamp as a float usable for ordering.

        Entries read from disk are not validated, so the timestamp may be
        missing, `None`, or a non-numeric value. Anything that cannot be
        converted to a float is ordered as `0.0` instead of raising.

        Args:
            event (dict[str, Any]): A history entry.

        Returns:
            float: The numeric timestamp, or `0.0` when it is unusable.
        """
        try:
            return float(event.get("timestamp", 0))
        except (TypeError, ValueError, OverflowError):
            return 0.0

    @staticmethod
    def _group_key(value: Any) -> Any:
        """Returns a hashable dictionary key standing in for `value`.

        Hashable values are returned unchanged. Unhashable values (for
        example the `params` list of an entry) are replaced by a tuple of
        their type name and a canonical text form so they can be grouped.

        Args:
            value (Any): The value of the field being grouped on.

        Returns:
            Any: A hashable key; equal unhashable values map to equal keys.
        """
        try:
            hash(value)
        except TypeError:
            try:
                text = json.dumps(value, sort_keys=True, default=repr)
            except (TypeError, ValueError):
                text = repr(value)
            return (type(value).__name__, text)
        return value

    def add(
        self,
        command: str,
        *,
        params: Sequence[str] | None = None,
        success: bool | None = True,
        return_code: int | None = 0,
        duration_ms: float | None = None,
    ) -> None:
        """Appends a new command invocation to the history.

        This operation is cross-process safe. It acquires a lock, reloads the
        latest history from disk, appends the new entry, and writes the
        updated history back atomically. If the existing file cannot be
        loaded, the problem is reported and the history restarts from the new
        entry. Filesystem errors (`OSError`) raised while writing are logged
        and printed but suppressed to allow the originating command to
        complete its execution.

        Args:
            command (str): The command name (ASCII characters are enforced).
            params (Sequence[str] | None): A list of parameters and flags.
            success (bool | None): Whether the command succeeded.
            return_code (int | None): The exit code of the command.
            duration_ms (float | None): The command's duration in milliseconds.
        """
        fp = self._get_history_path()
        entry = {
            "command": _ascii_clean(command),
            "params": list(params or []),
            "timestamp": _now(),
            "success": bool(success),
            "return_code": return_code if return_code is not None else 0,
            "duration_ms": float(duration_ms) if duration_ms is not None else None,
        }
        with _interprocess_lock(fp):
            self._reload()
            if self._load_error:
                msg = f"[error] Could not load command history: {self._load_error}"
                self._obs.log("error", msg, extra={"path": str(fp)})
                print(msg, file=sys.stderr)
                self._events = []
            self._events.append(entry)
            # Apply the same in-memory cap that `_reload` and `import_` use.
            if len(self._events) > _MAX_IN_MEMORY:
                self._events = self._events[-_MAX_IN_MEMORY:]
            try:
                _atomic_write_json(fp, self._events)
            except OSError as exc:
                # PermissionError is a subclass of OSError, so permission,
                # disk-full and all other filesystem errors are handled here.
                msg = f"[error] Could not record command history: {exc}"
                self._obs.log("error", msg, extra={"path": str(fp)})
                print(msg, file=sys.stderr)
                self._load_error = msg
                return
            self._load_error = None
        with suppress(Exception):
            self._tel.event("history_event_added", {"command": entry["command"]})

    def list(
        self,
        *,
        limit: int | None = 20,
        group_by: str | None = None,
        filter_cmd: str | None = None,
        sort: str | None = None,
    ) -> list[dict[str, Any]]:
        """Returns a view of the command history, with optional transformations.

        This is a read-only operation and does not acquire a cross-process lock,
        meaning it may not reflect writes from concurrent processes.

        Args:
            limit (int | None): The maximum number of entries to return. A value
                of 0 returns an empty list; `None` or a negative value returns
                everything. Plain listings keep the last `limit` entries,
                grouped summaries keep the first `limit` groups.
            group_by (str | None): If provided, returns one summary per
                distinct value of this field (`group`, `count`, `last_run`).
                Entries lacking the field are grouped under "unknown".
                Unhashable field values such as lists are supported.
            filter_cmd (str | None): If provided, returns only entries whose
                command contains this case-sensitive substring.
            sort (str | None): If 'timestamp', sorts entries by timestamp.
                Missing or non-numeric timestamps are ordered as 0.

        Returns:
            list[dict[str, Any]]: A list of history entries or grouped summaries.

        Raises:
            RuntimeError: If the history file is corrupt.
        """
        self._reload()
        fp = self._get_history_path()
        try:
            writable = os.access(fp.parent, os.W_OK)
        except Exception:
            writable = True
        if not writable:
            # NOTE(review): `os.access` presumably also reports False when the
            # directory does not exist yet, so this message may appear before
            # the first write — confirm this is intended.
            msg = f"Permission denied for history directory: {fp.parent}"
            self._obs.log("error", msg, extra={"path": str(fp)})
            print(msg, file=sys.stderr)
        if self._load_error:
            raise RuntimeError(self._load_error)
        if limit == 0:
            return []
        entries: list[dict[str, Any]] = list(self._events)
        if filter_cmd:
            needle = str(filter_cmd)
            entries = [e for e in entries if needle in (e.get("command") or "")]
        if sort == "timestamp":
            entries.sort(key=self._timestamp_key)
        if group_by:
            # Keyed on a hashable stand-in; the first-seen original value is
            # kept alongside the members so it can be reported as the label.
            grouped: dict[Any, tuple[Any, MutableSequence[dict[str, Any]]]] = {}
            for e in entries:
                label = e.get(group_by, "unknown")
                _, members = grouped.setdefault(self._group_key(label), (label, []))
                members.append(e)
            summary = [
                {
                    "group": label,
                    "count": len(members),
                    # Report the stored timestamp of the most recent member.
                    "last_run": max(members, key=self._timestamp_key).get(
                        "timestamp", 0
                    ),
                }
                for label, members in grouped.values()
            ]
            return summary[:limit] if (limit and limit > 0) else summary
        if limit and limit > 0:
            entries = entries[-limit:]
        return entries

    def clear(self) -> None:
        """Erases all persisted history.

        An empty history is written atomically while holding the
        cross-process lock. Telemetry failures are ignored so that a
        successful clear is never reported as failed. The in-memory state is
        reloaded from disk afterwards, whether or not the write succeeded.

        Raises:
            PermissionError: If the history file or directory is not writable.
            OSError: For other filesystem-related failures.
        """
        fp = self._get_history_path()
        try:
            with _interprocess_lock(fp):
                self._events = []
                _atomic_write_json(fp, self._events)
                self._load_error = None
                with suppress(Exception):
                    self._tel.event("history_cleared", {})
        except Exception as exc:
            msg = f"History clear failed: {exc}"
            self._obs.log("error", msg, extra={"path": str(fp)})
            self._load_error = msg
            raise
        finally:
            # Re-sync with disk. `_reload` resets `_load_error` first, so the
            # message set above survives only if the reload itself fails.
            self._reload()

    def flush(self) -> None:
        """Persists all in-memory history data to disk.

        Raises:
            PermissionError: If the history location is not writable.
            OSError: For other filesystem failures.
        """
        self._dump()

    def export(self, path: Path) -> None:
        """Exports the current history to a file as a JSON array.

        This operation is a read-only snapshot and does not lock the source file.
        The history is reloaded from disk first.

        NOTE(review): if that reload records a load error, the in-memory list
        is empty and an empty array is exported without raising, whereas
        `list` raises in the same state — confirm this is intended.

        Args:
            path (Path): The destination file path.

        Raises:
            RuntimeError: On I/O failures.
        """
        self._reload()
        try:
            path = path.expanduser()
            path.parent.mkdir(parents=True, exist_ok=True)
            text = json.dumps(self._events, ensure_ascii=False, indent=2) + "\n"
            path.write_text(text, encoding="utf-8")
        except Exception as exc:
            raise RuntimeError(f"Failed exporting history: {exc}") from exc

    def import_(self, path: Path) -> None:
        """Imports history entries from a file, merging with current history.

        This operation is cross-process safe and atomic. Under the lock, the
        current history is reloaded, the imported entries are appended (non-dict
        items are skipped, commands are ASCII-cleaned, and entries without a
        timestamp receive the current time), the in-memory list is capped at
        `_MAX_IN_MEMORY`, and the result is written atomically.

        Args:
            path (Path): The source file path containing a JSON array of entries.

        Raises:
            RuntimeError: On I/O or parsing failures, or if the existing
                history file cannot be loaded.
        """
        fp = self._get_history_path()
        try:
            with _interprocess_lock(fp):
                self._reload()
                if self._load_error:
                    raise RuntimeError(self._load_error)
                path = path.expanduser()
                if not path.exists():
                    raise RuntimeError(f"Import file not found: {path}")
                raw = path.read_text(encoding="utf-8")
                data = json.loads(raw)
                if not isinstance(data, list):
                    raise RuntimeError(
                        f"Invalid import format (not JSON array): {path}"
                    )
                imported: list[dict[str, Any]] = []
                for item in data:
                    if not isinstance(item, dict):
                        continue
                    e = dict(item)
                    e["command"] = _ascii_clean(str(e.get("command", "")))
                    if "timestamp" not in e:
                        e["timestamp"] = _now()
                    imported.append(e)
                self._events.extend(imported)
                if len(self._events) > _MAX_IN_MEMORY:
                    self._events = self._events[-_MAX_IN_MEMORY:]
                _atomic_write_json(fp, self._events)
                self._load_error = None
                with suppress(Exception):
                    self._tel.event("history_imported", {"count": len(imported)})
        except Exception as exc:
            msg = f"History import failed: {exc}"
            self._obs.log(
                "error", msg, extra={"import_path": str(path), "history_path": str(fp)}
            )
            raise RuntimeError(msg) from exc
# Public API of this module: only the `History` service is exported.
__all__ = ["History"]