Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/services/config.py: 99%
320 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
1# SPDX-License-Identifier: MIT
2# Copyright © 2025 Bijan Mousavi
4"""Provides a robust, file-based configuration management service.
6This module defines the `Config` class, a concrete implementation of the
7`ConfigProtocol`. It is responsible for loading, accessing, and persisting
8key-value configuration settings from `.env` files.
10Key features include:
11 * **Atomic Writes:** Changes are written to a temporary file before being
12 atomically moved into place to prevent data corruption.
13 * **Cross-Process Safety:** On POSIX systems, `fcntl.flock` is used with
14 retries to handle concurrent access from multiple CLI processes.
15 * **Key Normalization:** Configuration keys are handled case-insensitively
16 and the `BIJUXCLI_` prefix is optional.
17 * **Security Checks:** Includes validation to prevent operating on device
18 files or traversing symbolic link loops.
19"""
21from __future__ import annotations
23import codecs
24import fcntl
25import json
26import os
27from pathlib import Path
28from tempfile import NamedTemporaryFile
29import time
30from typing import Any
32from injector import inject
34from bijux_cli.contracts import ConfigProtocol, ObservabilityProtocol
35from bijux_cli.core.exceptions import CommandError
36from bijux_cli.core.paths import CONFIG_FILE
38yaml: Any
39try:
40 import yaml
41except ImportError: # pragma: no cover
42 yaml = None
45def _escape(raw: str) -> str:
46 """Escapes special characters in a string for safe storage in a .env file.
48 Args:
49 raw (str): The raw string to escape.
51 Returns:
52 str: The escaped string.
53 """
54 return (
55 raw.replace("\\", "\\\\")
56 .replace("\n", "\\n")
57 .replace("\r", "\\r")
58 .replace('"', '\\"')
59 )
62def _unescape(raw: str) -> str:
63 """Reverses the escaping process for strings read from a .env file.
65 Args:
66 raw (str): The escaped string to unescape.
68 Returns:
69 str: The unescaped, original string.
71 Raises:
72 ValueError: If the input string contains an invalid escape sequence.
73 """
74 try:
75 return codecs.decode(raw, "unicode_escape")
76 except UnicodeDecodeError as err:
77 raise ValueError(f"Invalid escaped string: {raw}") from err
80def _detect_symlink_loop(path: Path, max_depth: int = 10) -> None:
81 """Detects symbolic link loops in a given path to prevent infinite recursion.
83 Args:
84 path (Path): The path to check.
85 max_depth (int): The maximum number of symbolic links to follow.
87 Raises:
88 CommandError: If a loop is detected or the traversal depth exceeds `max_depth`.
89 """
90 seen: set[Path] = set()
91 curr = path
92 for _ in range(max_depth):
93 if not curr.is_symlink():
94 return
95 try:
96 raw = os.readlink(curr)
97 except OSError as exc:
98 raise CommandError(
99 f"Symlink loop detected: {curr}", http_status=400
100 ) from exc
101 target = Path(raw)
102 if not target.is_absolute():
103 target = curr.parent / target
104 if target in seen:
105 raise CommandError(f"Symlink loop detected: {curr}", http_status=400)
106 seen.add(target)
107 curr = target
108 raise CommandError(f"Symlink depth exceeded: {path}", http_status=400)
111class Config(ConfigProtocol):
112 """A robust configuration handler for `.env` files.
114 This service manages loading, saving, and persisting configuration values,
115 featuring atomic writes and key normalization. Keys are stored internally
116 in lowercase and without the `BIJUXCLI_` prefix.
118 Attributes:
119 _di (Any): The dependency injection container.
120 _log (ObservabilityProtocol): The logging service.
121 _data (dict[str, str]): The in-memory dictionary of configuration data.
122 _path (Path | None): The path to the configuration file being managed.
123 """
125 @inject
126 def __init__(self, dependency_injector: Any) -> None:
127 """Initializes the Config service and attempts to autoload configuration.
129 Args:
130 dependency_injector (Any): The DI container for resolving dependencies.
131 """
132 self._di = dependency_injector
133 self._log: ObservabilityProtocol = dependency_injector.resolve(
134 ObservabilityProtocol
135 )
136 self._data: dict[str, str] = {}
137 self._path: Path | None = None
138 try:
139 self.load()
140 except FileNotFoundError:
141 pass
142 except CommandError as e:
143 self._log.log(
144 "error", f"Auto-load of config failed during init: {e}", extra={}
145 )
147 def load(self, path: str | Path | None = None) -> None:
148 """Loads configuration from a `.env` file.
150 This method reads a specified `.env` file, parsing `KEY=VALUE` pairs.
151 It handles comments, validates syntax, and normalizes keys. If no path
152 is given, it uses the default path from `.env` or environment.
154 Args:
155 path (str | Path | None): Path to the `.env` file. If None, uses
156 the default path from the environment or project structure.
158 Raises:
159 FileNotFoundError: If a specified config file does not exist.
160 ValueError: If a line is malformed or contains non-ASCII characters.
161 CommandError: If the file is binary or another parsing error occurs.
162 """
163 import_path = Path(path) if path is not None else None
164 current_path = Path(os.getenv("BIJUXCLI_CONFIG", str(CONFIG_FILE)))
165 self._validate_config_path(current_path)
166 if import_path:
167 self._validate_config_path(import_path)
168 read_path = import_path or current_path
169 _detect_symlink_loop(read_path)
170 if not read_path.exists():
171 if import_path is not None:
172 raise FileNotFoundError(f"Config file not found: {read_path}")
173 self._data = {}
174 return
175 new_data = {}
176 try:
177 content = read_path.read_text(encoding="utf-8")
178 for i, line in enumerate(content.splitlines()):
179 stripped = line.strip()
180 if not stripped or stripped.startswith("#"):
181 continue
182 if "=" not in line:
183 raise ValueError(f"Malformed line {i + 1}: {line}")
184 key_part, val_part = line.split("=", 1)
185 key = key_part.strip()
186 value = _unescape(val_part)
187 if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
188 value = value[1:-1]
189 if not all(ord(c) < 128 for c in key + value):
190 raise ValueError(f"Non-ASCII characters in line {i + 1}: {line}")
191 normalized_key = key.strip().removeprefix("BIJUXCLI_").lower()
192 new_data[normalized_key] = value
193 except UnicodeDecodeError as exc:
194 self._log.log(
195 "error",
196 f"Failed to parse config file {read_path}: Binary or non-text content",
197 extra={"path": str(read_path)},
198 )
199 raise CommandError(
200 f"Failed to parse config file {read_path}: Binary or non-text content",
201 http_status=400,
202 ) from exc
203 except Exception as exc:
204 self._log.log(
205 "error",
206 f"Failed to parse config file {read_path}: {exc}",
207 extra={"path": str(read_path)},
208 )
209 raise CommandError(
210 f"Failed to parse config file {read_path}: {exc}", http_status=400
211 ) from exc
212 self._data = new_data
213 if import_path is not None and import_path != current_path:
214 self._path = current_path
215 self.set_many(new_data)
216 else:
217 self._path = read_path
218 self._log.log(
219 "info",
220 f"Loaded config from {read_path} (active: {self._path})",
221 extra={"src": str(read_path), "active": str(self._path)},
222 )
224 def set_many(self, items: dict[str, Any]) -> None:
225 """Sets multiple key-value pairs and persists them to the config file.
227 Args:
228 items (dict[str, Any]): A dictionary of key-value pairs to set.
229 """
230 self._data = {k: str(v) for k, v in items.items()}
231 if not self._path:
232 self._path = Path(os.getenv("BIJUXCLI_CONFIG", str(CONFIG_FILE)))
233 self._validate_config_path(self._path)
234 _detect_symlink_loop(self._path)
235 self._path.parent.mkdir(parents=True, exist_ok=True)
236 tmp_path = self._path.with_suffix(".tmp")
237 retry = 40
238 while retry > 0:
239 try:
240 with open(tmp_path, "w", encoding="utf-8", newline="") as temp_file:
241 fd = temp_file.fileno()
242 fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
243 for k, v in self._data.items():
244 safe_v = _escape(str(v))
245 temp_file.write(f"BIJUXCLI_{k.upper()}={safe_v}\n")
246 temp_file.flush()
247 os.fsync(fd)
248 fcntl.flock(fd, fcntl.LOCK_UN)
249 tmp_path.replace(self._path)
250 self._log.log(
251 "info",
252 f"Persisted config to {self._path}",
253 extra={"path": str(self._path)},
254 )
255 return
256 except BlockingIOError:
257 retry -= 1
258 time.sleep(0.05)
259 except Exception as exc:
260 if tmp_path.exists():
261 tmp_path.unlink()
262 self._log.log(
263 "error",
264 f"Failed to persist config to {self._path}: {exc}",
265 extra={"path": str(self._path)},
266 )
267 raise CommandError(
268 f"Failed to persist config to {self._path}: {exc}", http_status=500
269 ) from exc
270 if tmp_path.exists(): 270 ↛ 272line 270 didn't jump to line 272 because the condition on line 270 was always true
271 tmp_path.unlink()
272 raise CommandError(
273 f"Failed to persist config to {self._path}: File locked after retries",
274 http_status=400,
275 )
277 def all(self) -> dict[str, str]:
278 """Returns all configuration key-value pairs.
280 Returns:
281 dict[str, str]: A dictionary of all configuration data.
282 """
283 return dict(self._data)
285 def list_keys(self) -> list[str]:
286 """Returns a list of all configuration keys.
288 Returns:
289 list[str]: A list of all keys in the configuration.
290 """
291 return list(self._data.keys())
293 def clear(self) -> None:
294 """Deletes all configuration entries and removes the config file.
296 Raises:
297 CommandError: If the config file cannot be deleted due to a lock
298 or other filesystem error.
299 """
300 self._data = {}
301 if self._path and self._path.exists():
302 try:
303 retry = 40
304 while True:
305 try:
306 with open(self._path, "a+") as real:
307 fcntl.flock(real.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
308 self._path.unlink()
309 fcntl.flock(real.fileno(), fcntl.LOCK_UN)
310 break
311 except BlockingIOError as err:
312 retry -= 1
313 if retry == 0:
314 raise CommandError(
315 f"Failed to clear config file {self._path}: File locked",
316 http_status=400,
317 ) from err
318 time.sleep(0.05)
319 except Exception as exc:
320 self._log.log(
321 "error",
322 f"Failed to clear config file {self._path}: {exc}",
323 extra={"path": str(self._path)},
324 )
325 raise CommandError(
326 f"Failed to clear config file {self._path}: {exc}", http_status=500
327 ) from exc
328 self._log.log(
329 "info",
330 "Cleared config data",
331 extra={"path": str(self._path) if self._path else "None"},
332 )
334 def get(self, key: str, default: Any = None) -> Any:
335 """Retrieves a configuration value by key.
337 The key is normalized (lowercase, `BIJUXCLI_` prefix removed), and the
338 environment is checked first before consulting the in-memory store.
340 Args:
341 key (str): The key to retrieve.
342 default (Any): The value to return if the key is not found.
344 Returns:
345 Any: The value associated with the key, or the default value.
347 Raises:
348 CommandError: If the key is not found and no default is provided.
349 """
350 normalized_key = key.strip().removeprefix("BIJUXCLI_").lower()
351 env_key = f"BIJUXCLI_{normalized_key.upper()}"
352 if env_key in os.environ:
353 return os.environ[env_key]
354 value = self._data.get(normalized_key, default)
355 if isinstance(value, str):
356 val_lower = value.lower()
357 if val_lower in {"true", "false"}:
358 return val_lower == "true"
359 if value is default and default is None:
360 self._log.log(
361 "error", f"Config key not found: {key}", extra={"key": normalized_key}
362 )
363 raise CommandError(f"Config key not found: {key}", http_status=400)
364 self._log.log(
365 "debug",
366 f"Retrieved config key: {normalized_key}",
367 extra={"key": normalized_key, "value": str(value)},
368 )
369 return value
371 def set(self, key: str, value: Any) -> None:
372 """Sets a single configuration key-value pair and persists it.
374 Args:
375 key (str): The key to set (case-insensitive, `BIJUXCLI_` prefix optional).
376 value (Any): The value to associate with the key.
378 Returns:
379 None:
381 Raises:
382 CommandError: If the configuration cannot be persisted.
383 """
384 normalized_key = key.strip().removeprefix("BIJUXCLI_").lower()
385 self._data[normalized_key] = str(value)
386 if not self._path:
387 self._path = Path(os.getenv("BIJUXCLI_CONFIG", str(CONFIG_FILE)))
388 self._validate_config_path(self._path)
389 _detect_symlink_loop(self._path)
390 self._path.parent.mkdir(parents=True, exist_ok=True)
391 tmp_path = self._path.with_suffix(".tmp")
392 retry = 40
393 while retry > 0:
394 try:
395 with open(tmp_path, "w", encoding="utf-8", newline="") as temp_file:
396 fd = temp_file.fileno()
397 fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
398 for k, v in self._data.items():
399 safe_v = _escape(str(v))
400 temp_file.write(f"BIJUXCLI_{k.upper()}={safe_v}\n")
401 temp_file.flush()
402 os.fsync(fd)
403 fcntl.flock(fd, fcntl.LOCK_UN)
404 tmp_path.replace(self._path)
405 self._log.log(
406 "info",
407 f"Persisted config to {self._path}",
408 extra={"path": str(self._path), "key": normalized_key},
409 )
410 return
411 except BlockingIOError:
412 retry -= 1
413 time.sleep(0.05)
414 except Exception as exc:
415 if tmp_path.exists():
416 tmp_path.unlink()
417 self._log.log(
418 "error",
419 f"Failed to persist config to {self._path}: {exc}",
420 extra={"path": str(self._path)},
421 )
422 raise CommandError(
423 f"Failed to persist config to {self._path}: {exc}", http_status=500
424 ) from exc
425 if tmp_path.exists(): 425 ↛ 427line 425 didn't jump to line 427 because the condition on line 425 was always true
426 tmp_path.unlink()
427 raise CommandError(
428 f"Failed to persist config to {self._path}: File locked after retries",
429 http_status=400,
430 )
432 def reload(self) -> None:
433 """Reloads configuration from the last-loaded file path.
435 Raises:
436 CommandError: If no file path has been previously loaded.
437 """
438 if self._path is None:
439 self._log.log("error", "Config.reload() called before load()", extra={})
440 raise CommandError("Config.reload() called before load()", http_status=400)
441 if not self._path.exists():
442 self._log.log(
443 "error", f"Config file missing for reload: {self._path}", extra={}
444 )
445 raise CommandError(
446 f"Config file missing for reload: {self._path}", http_status=400
447 )
448 self.load(self._path)
450 def export(self, path: str | Path, out_format: str | None = None) -> None:
451 """Exports the configuration to a file or standard output.
453 Args:
454 path (str | Path): The destination file path, or "-" for stdout.
455 out_format (str | None): The output format ('env', 'json', 'yaml').
456 If None, the format is auto-detected from the file extension.
458 Raises:
459 CommandError: If the format is unsupported or the export fails.
460 """
461 export_path = Path(path) if path != "-" else path
462 output_fmt = (
463 out_format.lower()
464 if out_format
465 else (
466 "env"
467 if path == "-" or str(path).endswith(".env")
468 else "yaml"
469 if str(path).endswith((".yaml", ".yml"))
470 else "json"
471 )
472 )
473 try:
474 if output_fmt == "env":
475 lines = [f"BIJUXCLI_{k.upper()}={v}" for k, v in self._data.items()]
476 text = "\n".join(lines) + ("\n" if lines else "")
477 elif output_fmt == "json":
478 text = (
479 json.dumps({k.upper(): v for k, v in self._data.items()}, indent=2)
480 + "\n"
481 )
482 elif output_fmt == "yaml":
483 if yaml is None:
484 raise CommandError(
485 "PyYAML not installed for YAML support", http_status=400
486 )
487 text = yaml.safe_dump(
488 {k.upper(): v for k, v in self._data.items()}, sort_keys=False
489 )
490 else:
491 raise CommandError(f"Unsupported format: {output_fmt}", http_status=400)
492 if path == "-":
493 print(text, end="")
494 self._log.log(
495 "info",
496 "Exported config to stdout",
497 extra={"format": output_fmt},
498 )
499 return
500 export_path = Path(path)
501 export_path.resolve(strict=False)
502 if not export_path.parent.exists():
503 raise CommandError(
504 f"No such file or directory: {export_path.parent}", http_status=400
505 )
506 if export_path.exists() and not os.access(export_path, os.W_OK):
507 raise PermissionError(f"Permission denied: '{export_path}'")
508 if not os.access(export_path.parent, os.W_OK):
509 raise PermissionError(f"Permission denied: '{export_path.parent}'")
510 with NamedTemporaryFile(
511 "w", delete=False, dir=export_path.parent, encoding="utf-8", newline=""
512 ) as temp_file:
513 fd = temp_file.fileno()
514 fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
515 temp_file.write(text)
516 temp_file.flush()
517 os.fsync(fd)
518 fcntl.flock(fd, fcntl.LOCK_UN)
519 Path(temp_file.name).replace(export_path)
520 self._log.log(
521 "info",
522 f"Exported config to {export_path}",
523 extra={"path": str(export_path), "format": output_fmt},
524 )
525 except BlockingIOError as exc:
526 self._log.log(
527 "error",
528 f"Failed to export config to {export_path}: File locked",
529 extra={"path": str(export_path), "format": output_fmt},
530 )
531 raise CommandError(
532 f"Failed to export config to {export_path}: File locked",
533 http_status=400,
534 ) from exc
535 except (OSError, PermissionError, ValueError) as exc:
536 self._log.log(
537 "error",
538 f"Failed to export config to {export_path}: {exc}",
539 extra={"path": str(export_path), "format": output_fmt},
540 )
541 raise CommandError(
542 f"Failed to export config to {export_path}: {exc}", http_status=400
543 ) from exc
545 def delete(self, key: str) -> None:
546 """Deletes a configuration key and persists the change.
548 Args:
549 key (str): The key to delete (case-insensitive, `BIJUXCLI_` prefix optional).
551 Raises:
552 CommandError: If the key does not exist or the change cannot be persisted.
553 """
554 normalized_key = key.strip().removeprefix("BIJUXCLI_").lower()
555 if normalized_key not in self._data:
556 self._log.log(
557 "error", f"Config key not found: {key}", extra={"key": normalized_key}
558 )
559 raise CommandError(f"Config key not found: {key}", http_status=400)
560 del self._data[normalized_key]
561 if not self._path:
562 self._path = Path(os.getenv("BIJUXCLI_CONFIG", str(CONFIG_FILE)))
563 self._validate_config_path(self._path)
564 _detect_symlink_loop(self._path)
565 self._path.parent.mkdir(parents=True, exist_ok=True)
566 tmp_path = self._path.with_suffix(".tmp")
567 retry = 40
568 while retry > 0:
569 try:
570 with open(tmp_path, "w", encoding="utf-8", newline="") as temp_file:
571 fd = temp_file.fileno()
572 fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
573 for k, v in self._data.items():
574 safe_v = _escape(str(v))
575 temp_file.write(f"BIJUXCLI_{k.upper()}={safe_v}\n")
576 temp_file.flush()
577 os.fsync(fd)
578 fcntl.flock(fd, fcntl.LOCK_UN)
579 tmp_path.replace(self._path)
580 self._log.log(
581 "info",
582 f"Deleted config key and persisted to {self._path}",
583 extra={"path": str(self._path), "key": normalized_key},
584 )
585 return
586 except BlockingIOError:
587 retry -= 1
588 time.sleep(0.05)
589 except Exception as exc:
590 if tmp_path.exists(): 590 ↛ 592line 590 didn't jump to line 592 because the condition on line 590 was always true
591 tmp_path.unlink()
592 self._log.log(
593 "error",
594 f"Failed to persist config after deleting {normalized_key}: {exc}",
595 extra={"path": str(self._path), "key": normalized_key},
596 )
597 raise CommandError(
598 f"Failed to persist config after deleting {normalized_key}: {exc}",
599 http_status=500,
600 ) from exc
601 if tmp_path.exists(): 601 ↛ 603line 601 didn't jump to line 603 because the condition on line 601 was always true
602 tmp_path.unlink()
603 raise CommandError(
604 f"Failed to persist config to {self._path}: File locked after retries",
605 http_status=400,
606 )
608 def unset(self, key: str) -> None:
609 """Removes a configuration key (alias for `delete`).
611 Args:
612 key (str): The key to remove.
613 """
614 self.delete(key)
616 def save(self) -> None:
617 """Persists the current in-memory configuration to its source file."""
618 if not self._path:
619 self._path = Path(os.getenv("BIJUXCLI_CONFIG", str(CONFIG_FILE)))
620 self._validate_config_path(self._path)
621 try:
622 self.set_many(self._data)
623 except Exception as exc:
624 self._log.log(
625 "error",
626 f"Failed to save config to {self._path}: {exc}",
627 extra={"path": str(self._path)},
628 )
629 raise CommandError(
630 f"Failed to save config to {self._path}: {exc}", http_status=500
631 ) from exc
633 @staticmethod
634 def _validate_config_path(path: Path) -> None:
635 """Prevents using device files or other unsafe paths as a config file.
637 Args:
638 path (Path): The path to validate.
640 Raises:
641 CommandError: If the path is determined to be unsafe.
642 """
643 pstr = path.as_posix()
644 if (
645 pstr.startswith("/dev/")
646 or pstr == "/dev/null"
647 or pstr.startswith("\\\\.\\")
648 ):
649 raise CommandError(
650 f"Invalid config path: {path} is a device file or not allowed"
651 )
653 @staticmethod
654 def _preflight_write(path: Path) -> None:
655 """Performs pre-flight checks before a write operation.
657 Fails fast if the path has a symlink loop or the file is locked.
659 Args:
660 path (Path): The path to check.
662 Raises:
663 CommandError: If the path is invalid or the file is locked.
664 """
665 _detect_symlink_loop(path)
666 if path.exists():
667 try:
668 with open(path, "a+") as f:
669 fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
670 fcntl.flock(f.fileno(), fcntl.LOCK_UN)
671 except BlockingIOError as exc:
672 raise CommandError(
673 f"Failed to persist config to {path}: File locked", http_status=400
674 ) from exc
677__all__ = ["Config"]