Coverage for / home / runner / work / bijux-cli / bijux-cli / src / bijux_cli / services / history / __init__.py: 100%
258 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-26 17:59 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-26 17:59 +0000
1# SPDX-License-Identifier: Apache-2.0
2# Copyright © 2025 Bijan Mousavi
4"""Provides a persistent, cross-process safe command history service.
6This module defines the `History` class, a concrete implementation of the
7`HistoryProtocol`. It provides a tolerant and robust store for CLI
8invocation events with several key design features:
10 * **Persistence:** All history is saved to a single JSON array in a
11 per-user file.
12 * **Tolerance:** The service is resilient to empty, corrupt, or partially
13 formed history files. If a file is unreadable, it is treated as empty
14 and will be overwritten on the next successful write.
15 * **Cross-Process Safety:** On POSIX systems, it uses `fcntl.flock` on a
16 sidecar lock file to safely coordinate writes from multiple concurrent
17 CLI processes. On other systems, it falls back to a thread lock.
18 * **Atomic Writes:** All changes are written to a temporary file which is
19 then atomically moved into place, preventing data corruption from
20 interrupted writes.
21 * **Memory Management:** The in-memory list of events is capped, and the
22 on-disk file is trimmed to a smaller size to prevent unbounded growth.
23 * **Simplicity:** The service intentionally avoids complex features like
24 schema migrations. Unreadable state is discarded rather than repaired.
25"""
27from __future__ import annotations
29from collections.abc import Iterator, MutableSequence, Sequence
30from contextlib import contextmanager, suppress
31import errno
32import json
33import os
34from pathlib import Path
35import sys
36import tempfile
37import threading
38import time
39from typing import Any, Final
40import unicodedata
42from injector import inject
44from bijux_cli.infra.paths import HISTORY_FILE
45from bijux_cli.services.contracts import TelemetryProtocol
46from bijux_cli.services.history.contracts import HistoryProtocol
47from bijux_cli.services.logging.observability import Observability
49_MAX_IN_MEMORY: Final[int] = 10_000
50"""Maximum number of entries retained in memory (and considered for writes)."""
51_TRIM_THRESHOLD: Final[int] = 1_000
52"""When persisting, keep at most this many most-recent events in the file."""
53_ENOSPC_ERRORS = {errno.ENOSPC, errno.EDQUOT}
54"""OS error codes indicating the filesystem is full or quota exceeded."""
55_FILE_LOCK = threading.Lock()
56"""Fallback lock for non-POSIX platforms when `fcntl` is unavailable."""
57fcntl: Any
58try:
59 import fcntl
60except ImportError: # pragma: no cover
61 fcntl = None
64def _now() -> float:
65 """Returns the current UNIX time with sub-second precision.
67 Returns:
68 float: The current time in seconds since the epoch.
69 """
70 return time.time()
73def _ascii_clean(text: str) -> str:
74 """Strips all diacritics and non-printable characters from a string.
76 Args:
77 text (str): The input text to clean.
79 Returns:
80 str: An ASCII-only version of the text.
81 """
82 normalized = unicodedata.normalize("NFKD", text)
83 without_marks = "".join(ch for ch in normalized if unicodedata.category(ch) != "Mn")
84 return "".join(ch for ch in without_marks if 0x20 <= ord(ch) <= 0x7E)
87def _lock_file_for(fp: Path) -> Path:
88 """Returns the path for the sidecar lock file associated with `fp`.
90 Args:
91 fp (Path): The primary file path.
93 Returns:
94 Path: The corresponding lock file path (e.g., `file.lock`).
95 """
96 return fp.with_name(fp.name + ".lock")
99@contextmanager
100def _interprocess_lock(fp: Path) -> Iterator[None]:
101 """Provides a cross-process exclusive lock for a file path.
103 On POSIX systems, this uses `fcntl.flock` on a sidecar file to serialize
104 access across different processes. On other platforms, it falls back to a
105 `threading.Lock`, which only provides safety within a single process.
107 Args:
108 fp (Path): The path to the file that requires locked access.
110 Yields:
111 None: Yields control to the `with` block while the lock is held.
112 """
113 if fcntl is None:
114 with _FILE_LOCK:
115 yield
116 return
117 lock_fp = _lock_file_for(fp)
118 lock_fp.parent.mkdir(parents=True, exist_ok=True)
119 f = lock_fp.open("a+")
120 try:
121 fcntl.flock(f.fileno(), fcntl.LOCK_EX)
122 yield
123 finally:
124 with suppress(Exception):
125 fcntl.flock(f.fileno(), fcntl.LOCK_UN)
126 f.close()
129def _maybe_simulate_disk_full() -> None:
130 """Raises an `ENOSPC` error if a test environment variable is set."""
131 if os.getenv("BIJUXCLI_TEST_DISK_FULL") == "1":
132 raise OSError(errno.ENOSPC, "No space left on device")
135def _atomic_write_json(fp: Path, events: list[dict[str, Any]]) -> None:
136 """Writes a list of events to a file atomically.
138 The data is written to a temporary file in the same directory and then
139 renamed to the final destination, which is an atomic operation on POSIX
140 systems.
142 Args:
143 fp (Path): The destination file path.
144 events (list[dict[str, Any]]): The list of history entries to write.
146 Raises:
147 PermissionError: If the directory or file is not writable.
148 OSError: For other filesystem errors, such as a full disk.
149 """
150 _maybe_simulate_disk_full()
151 fp.parent.mkdir(parents=True, exist_ok=True)
152 to_write = events[-_TRIM_THRESHOLD:] if events else []
153 payload = (
154 "[]\n" if not to_write else json.dumps(to_write, ensure_ascii=False, indent=2)
155 )
156 with tempfile.NamedTemporaryFile(
157 "w", delete=False, dir=fp.parent, prefix=f".{fp.name}.", encoding="utf-8"
158 ) as temp_file:
159 temp_file.write(payload)
160 temp_file.flush()
161 os.fsync(temp_file.fileno())
162 temp_fp = Path(temp_file.name)
163 os.replace(temp_fp, fp)
166class History(HistoryProtocol):
167 """Manages a persistent history of CLI command invocations.
169 This service maintains an in-memory list of command events and synchronizes
170 it with a persisted JSON file. It is designed to be tolerant of file
171 corruption and safe for concurrent use by multiple CLI processes.
173 Mutating operations (`add`, `clear`, `import_`) acquire a cross-process lock
174 before modifying the file to prevent lost updates and race conditions. The
175 sequence is always: lock, reload from disk, apply change in memory, write
176 atomically, and release lock.
178 Attributes:
179 _tel (TelemetryProtocol): The telemetry service for emitting events.
180 _obs (Observability): The logging service for operational errors.
181 _explicit_path (Path | None): A specific path to the history file, if
182 provided during initialization.
183 _events (list): The in-memory cache of history event dictionaries.
184 _load_error (str | None): A message describing the last error that
185 occurred while trying to load the history file, if any.
186 """
188 @inject
189 def __init__(
190 self,
191 telemetry: TelemetryProtocol,
192 observability: Observability,
193 history_path: Path | None = None,
194 ) -> None:
195 """Initializes the History service.
197 Args:
198 telemetry (TelemetryProtocol): The telemetry service.
199 observability (Observability): The logging service.
200 history_path (Path | None): An optional, explicit path to the
201 history file. If None, a default path will be used.
202 """
203 self._tel = telemetry
204 self._obs = observability
205 self._explicit_path = Path(history_path) if history_path else None
206 self._events: list[dict[str, Any]] = []
207 self._load_error: str | None = None
209 def _get_history_path(self) -> Path:
210 """Returns the resolved, absolute path to the history file.
212 The path is determined in the following order of precedence:
213 1. An explicit path provided to the constructor.
214 2. The `BIJUXCLI_HISTORY_FILE` environment variable.
215 3. A `.bijux_history` file in the same directory as the `BIJUXCLI_CONFIG` file.
216 4. The default `~/.bijux/.history` file.
218 Returns:
219 Path: The absolute path to the history file.
220 """
221 if self._explicit_path:
222 return self._explicit_path
223 env_file = os.environ.get("BIJUXCLI_HISTORY_FILE")
224 if env_file:
225 return Path(env_file).expanduser()
226 cfg = os.environ.get("BIJUXCLI_CONFIG")
227 if cfg:
228 cfg_path = Path(cfg).expanduser()
229 return cfg_path.parent / ".bijux_history"
230 return HISTORY_FILE
232 def _reload(self) -> None:
233 """Refreshes the in-memory state from the history file on disk.
235 This method is tolerant of errors. If the file is missing, empty, or
236 corrupt, the in-memory list is cleared and an error state is noted,
237 but an exception is not raised.
238 """
239 self._load_error = None
240 fp = self._get_history_path()
241 try:
242 if not fp.exists():
243 self._events = []
244 return
245 raw = fp.read_text(encoding="utf-8", errors="ignore").strip()
246 if not raw:
247 self._events = []
248 return
249 data = json.loads(raw)
250 if not isinstance(data, list):
251 self._events = []
252 self._load_error = (
253 f"Unexpected history file format (not JSON array): {fp}"
254 )
255 return
256 evs: list[dict[str, Any]] = []
257 for item in data:
258 if not isinstance(item, dict):
259 continue
260 e = dict(item)
261 e["command"] = _ascii_clean(str(e.get("command", "")))
262 evs.append(e)
263 if len(evs) > _MAX_IN_MEMORY:
264 evs = evs[-_MAX_IN_MEMORY:]
265 self._events = evs
266 except Exception as exc:
267 self._load_error = f"History file corrupted or unreadable: {exc}"
268 self._obs.log("error", self._load_error, extra={"path": str(fp)})
269 self._events = []
271 def _dump(self) -> None:
272 """Persists the current in-memory events to disk atomically."""
273 fp = self._get_history_path()
274 with _interprocess_lock(fp):
275 self._load_error = None
276 try:
277 _atomic_write_json(fp, self._events)
278 except PermissionError as exc:
279 self._handle_dump_error("write-permission", exc, fp)
280 raise
281 except OSError as exc:
282 if exc.errno in _ENOSPC_ERRORS:
283 self._handle_dump_error("persist", exc, fp)
284 raise
285 raise
287 def _handle_dump_error(self, kind: str, exc: OSError, fp: Path) -> None:
288 """Logs and prints an error encountered during a file write operation.
290 Args:
291 kind (str): A short code classifying the error (e.g., "persist").
292 exc (OSError): The originating exception.
293 fp (Path): The path of the file that was being written to.
294 """
295 msg = f"History {kind} error: {exc}"
296 self._obs.log("error", msg, extra={"path": str(fp)})
297 self._load_error = msg
298 print(msg, file=sys.stderr)
300 def add(
301 self,
302 command: str,
303 *,
304 params: Sequence[str] | None = None,
305 success: bool | None = True,
306 return_code: int | None = 0,
307 duration_ms: float | None = None,
308 raw: dict[str, Any] | None = None,
309 ) -> None:
310 """Appends a new command invocation to the history.
312 This operation is cross-process safe. It acquires a lock, reloads the
313 latest history from disk, appends the new entry, and writes the
314 updated history back atomically. Errors are logged but suppressed to
315 allow the originating command to complete its execution.
317 Args:
318 command (str): The command name (ASCII characters are enforced).
319 params (Sequence[str] | None): A list of parameters and flags.
320 success (bool | None): Whether the command succeeded.
321 return_code (int | None): The exit code of the command.
322 duration_ms (float | None): The command's duration in milliseconds.
323 raw (dict[str, Any] | None): Optional raw metadata payload.
324 """
325 fp = self._get_history_path()
326 entry = {
327 "command": _ascii_clean(command),
328 "params": list(params or []),
329 "timestamp": _now(),
330 "success": bool(success),
331 "return_code": return_code if return_code is not None else 0,
332 "duration_ms": float(duration_ms) if duration_ms is not None else None,
333 "raw": raw or {},
334 }
335 with _interprocess_lock(fp):
336 self._reload()
337 if self._load_error:
338 msg = f"[error] Could not load command history: {self._load_error}"
339 self._obs.log("error", msg, extra={"path": str(fp)})
340 print(msg, file=sys.stderr)
341 self._events = []
342 self._events.append(entry)
343 try:
344 _atomic_write_json(fp, self._events)
345 self._load_error = None
346 except PermissionError as exc:
347 msg = f"[error] Could not record command history: {exc}"
348 self._obs.log("error", msg, extra={"path": str(fp)})
349 print(msg, file=sys.stderr)
350 self._load_error = msg
351 return
352 except OSError as exc:
353 if exc.errno in _ENOSPC_ERRORS:
354 msg = f"[error] Could not record command history: {exc}"
355 self._obs.log("error", msg, extra={"path": str(fp)})
356 print(msg, file=sys.stderr)
357 self._load_error = msg
358 return
359 msg = f"[error] Could not record command history: {exc}"
360 self._obs.log("error", msg, extra={"path": str(fp)})
361 print(msg, file=sys.stderr)
362 self._load_error = msg
363 return
364 with suppress(Exception):
365 self._tel.event("history_event_added", {"command": entry["command"]})
367 def list(
368 self,
369 *,
370 limit: int | None = 20,
371 group_by: str | None = None,
372 filter_cmd: str | None = None,
373 sort: str | None = None,
374 ) -> list[dict[str, Any]]:
375 """Returns a view of the command history, with optional transformations.
377 This is a read-only operation and does not acquire a cross-process lock,
378 meaning it may not reflect writes from concurrent processes.
380 Args:
381 limit (int | None): The maximum number of entries to return. A value
382 of 0 returns an empty list.
383 group_by (str | None): If provided, returns a grouped summary.
384 filter_cmd (str | None): If provided, returns only entries whose
385 command contains this case-sensitive substring.
386 sort (str | None): If 'timestamp', sorts entries by timestamp.
388 Returns:
389 list[dict[str, Any]]: A list of history entries or grouped summaries.
391 Raises:
392 RuntimeError: If the history file is corrupt.
393 """
394 self._reload()
395 fp = self._get_history_path()
396 try:
397 writable = os.access(fp.parent, os.W_OK)
398 except Exception:
399 writable = True
400 if not writable:
401 msg = f"Permission denied for history directory: {fp.parent}"
402 self._obs.log("error", msg, extra={"path": str(fp)})
403 print(msg, file=sys.stderr)
404 if self._load_error:
405 raise RuntimeError(self._load_error)
406 if limit == 0:
407 return []
408 entries: list[dict[str, Any]] = list(self._events)
409 if filter_cmd:
410 needle = str(filter_cmd)
411 entries = [e for e in entries if needle in (e.get("command") or "")]
412 if sort == "timestamp":
413 entries.sort(key=lambda e: e.get("timestamp", 0))
414 if group_by:
415 grouped: dict[Any, MutableSequence[dict[str, Any]]] = {}
416 for e in entries:
417 grouped.setdefault(e.get(group_by, "unknown"), []).append(e)
418 summary = [
419 {
420 "group": k,
421 "count": len(v),
422 "last_run": max((x.get("timestamp", 0) for x in v), default=0),
423 }
424 for k, v in grouped.items()
425 ]
426 return summary[:limit] if (limit and limit > 0) else summary
427 if limit and limit > 0:
428 entries = entries[-limit:]
429 return entries
431 def clear(self) -> None:
432 """Erases all persisted history.
434 This operation is cross-process safe and atomic.
436 Raises:
437 PermissionError: If the history file or directory is not writable.
438 OSError: For other filesystem-related failures.
439 """
440 fp = self._get_history_path()
441 try:
442 with _interprocess_lock(fp):
443 self._events = []
444 _atomic_write_json(fp, self._events)
445 self._load_error = None
446 self._tel.event("history_cleared", {})
447 except Exception as exc:
448 msg = f"History clear failed: {exc}"
449 self._obs.log("error", msg, extra={"path": str(fp)})
450 self._load_error = msg
451 raise
452 finally:
453 self._reload()
455 def flush(self) -> None:
456 """Persists all in-memory history data to disk."""
457 self._dump()
459 def export(self, path: Path) -> None:
460 """Exports the current history to a file as a JSON array.
462 This operation is a read-only snapshot and does not lock the source file.
464 Args:
465 path (Path): The destination file path.
467 Raises:
468 RuntimeError: On I/O failures.
469 """
470 self._reload()
471 try:
472 path = path.expanduser()
473 path.parent.mkdir(parents=True, exist_ok=True)
474 text = json.dumps(self._events, ensure_ascii=False, indent=2) + "\n"
475 path.write_text(text, encoding="utf-8")
476 except Exception as exc:
477 raise RuntimeError(f"Failed exporting history: {exc}") from exc
479 def import_(self, path: Path) -> None:
480 """Imports history entries from a file, merging with current history.
482 This operation is cross-process safe and atomic.
484 Args:
485 path (Path): The source file path containing a JSON array of entries.
487 Raises:
488 RuntimeError: On I/O or parsing failures.
489 """
490 fp = self._get_history_path()
491 try:
492 with _interprocess_lock(fp):
493 self._reload()
494 if self._load_error:
495 raise RuntimeError(self._load_error)
496 path = path.expanduser()
497 if not path.exists():
498 raise RuntimeError(f"Import file not found: {path}")
499 raw = path.read_text(encoding="utf-8")
500 data = json.loads(raw)
501 if not isinstance(data, list):
502 raise RuntimeError(
503 f"Invalid import format (not JSON array): {path}"
504 )
505 imported: list[dict[str, Any]] = []
506 for item in data:
507 if not isinstance(item, dict):
508 continue
509 e = dict(item)
510 e["command"] = _ascii_clean(str(e.get("command", "")))
511 if "timestamp" not in e:
512 e["timestamp"] = _now()
513 imported.append(e)
514 self._events.extend(imported)
515 if len(self._events) > _MAX_IN_MEMORY:
516 self._events = self._events[-_MAX_IN_MEMORY:]
517 _atomic_write_json(fp, self._events)
518 self._load_error = None
519 with suppress(Exception):
520 self._tel.event("history_imported", {"count": len(imported)})
522 except Exception as exc:
523 msg = f"History import failed: {exc}"
524 self._obs.log(
525 "error", msg, extra={"import_path": str(path), "history_path": str(fp)}
526 )
527 raise RuntimeError(msg) from exc
530__all__ = ["History"]