Coverage for / home / runner / work / bijux-cli / bijux-cli / src / bijux_cli / services / config / __init__.py: 99%
322 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-26 17:59 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-26 17:59 +0000
1# SPDX-License-Identifier: Apache-2.0
2# Copyright © 2025 Bijan Mousavi
4"""Provides a robust, file-based configuration management service.
6This module defines the `Config` class, a concrete implementation of the
7`ConfigProtocol`. It is responsible for loading, accessing, and persisting
8key-value configuration settings from `.env` files.
10Key features include:
11 * **Atomic Writes:** Changes are written to a temporary file before being
12 atomically moved into place to prevent data corruption.
13 * **Cross-Process Safety:** On POSIX systems, `fcntl.flock` is used with
14 retries to handle concurrent access from multiple CLI processes.
15 * **Key Normalization:** Configuration keys are handled case-insensitively
16 and the `BIJUXCLI_` prefix is optional.
17 * **Security Checks:** Includes validation to prevent operating on device
18 files or traversing symbolic link loops.
19"""
21from __future__ import annotations
23import codecs
24import fcntl
25import json
26import os
27from pathlib import Path
28from tempfile import NamedTemporaryFile
29import time
30from typing import Any
32from injector import inject
34from bijux_cli.core.errors import ConfigError
35from bijux_cli.infra.paths import CONFIG_FILE
36from bijux_cli.services.config.contracts import ConfigProtocol
37from bijux_cli.services.contracts import ObservabilityProtocol
39yaml: Any
40try:
41 import yaml
42except ImportError: # pragma: no cover
43 yaml = None
46def _escape(raw: str) -> str:
47 """Escapes special characters in a string for safe storage in a .env file.
49 Args:
50 raw (str): The raw string to escape.
52 Returns:
53 str: The escaped string.
54 """
55 return (
56 raw.replace("\\", "\\\\")
57 .replace("\n", "\\n")
58 .replace("\r", "\\r")
59 .replace('"', '\\"')
60 )
63def _unescape(raw: str) -> str:
64 """Reverses the escaping process for strings read from a .env file.
66 Args:
67 raw (str): The escaped string to unescape.
69 Returns:
70 str: The unescaped, original string.
72 Raises:
73 ValueError: If the input string contains an invalid escape sequence.
74 """
75 try:
76 return codecs.decode(raw, "unicode_escape")
77 except UnicodeDecodeError as err:
78 raise ValueError(f"Invalid escaped string: {raw}") from err
81def _detect_symlink_loop(path: Path, max_depth: int = 10) -> None:
82 """Detects symbolic link loops in a given path to prevent infinite recursion.
84 Args:
85 path (Path): The path to check.
86 max_depth (int): The maximum number of symbolic links to follow.
88 Raises:
89 ConfigError: If a loop is detected or the traversal depth exceeds `max_depth`.
90 """
91 seen: set[Path] = set()
92 curr = path
93 for _ in range(max_depth):
94 if not curr.is_symlink():
95 return
96 try:
97 raw = os.readlink(curr)
98 except OSError as exc:
99 raise ConfigError(
100 f"Symlink loop detected: {curr}", http_status=400
101 ) from exc
102 target = Path(raw)
103 if not target.is_absolute():
104 target = curr.parent / target
105 if target in seen:
106 raise ConfigError(f"Symlink loop detected: {curr}", http_status=400)
107 seen.add(target)
108 curr = target
109 raise ConfigError(f"Symlink depth exceeded: {path}", http_status=400)
112class Config(ConfigProtocol):
113 """A robust configuration handler for `.env` files.
115 This service manages loading, saving, and persisting configuration values,
116 featuring atomic writes and key normalization. Keys are stored internally
117 in lowercase and without the `BIJUXCLI_` prefix.
119 Attributes:
120 _di (Any): The dependency injection container.
121 _log (ObservabilityProtocol): The logging service.
122 _data (dict[str, str]): The in-memory dictionary of configuration data.
123 _path (Path | None): The path to the configuration file being managed.
124 """
126 @inject
127 def __init__(self, dependency_injector: Any) -> None:
128 """Initializes the Config service and attempts to autoload configuration.
130 Args:
131 dependency_injector (Any): The DI container for resolving dependencies.
132 """
133 self._di = dependency_injector
134 self._log: ObservabilityProtocol = dependency_injector.resolve(
135 ObservabilityProtocol
136 )
137 self._data: dict[str, str] = {}
138 self._path: Path | None = None
139 try:
140 self.load()
141 except FileNotFoundError:
142 pass
143 except ConfigError as e:
144 self._log.log(
145 "error", f"Auto-load of config failed during init: {e}", extra={}
146 )
148 def load(self, path: str | Path | None = None) -> None:
149 """Loads configuration from a `.env` file.
151 This method reads a specified `.env` file, parsing `KEY=VALUE` pairs.
152 It handles comments, validates syntax, and normalizes keys. If no path
153 is given, it uses the default path from `.env` or environment.
155 Args:
156 path (str | Path | None): Path to the `.env` file. If None, uses
157 the default path from the environment or project structure.
159 Raises:
160 FileNotFoundError: If a specified config file does not exist.
161 ValueError: If a line is malformed or contains non-ASCII characters.
162 ConfigError: If the file is binary or another parsing error occurs.
163 """
164 import_path = Path(path) if path is not None else None
165 current_path = Path(os.getenv("BIJUXCLI_CONFIG", str(CONFIG_FILE)))
166 self._validate_config_path(current_path)
167 if import_path:
168 self._validate_config_path(import_path)
169 read_path = import_path or current_path
170 _detect_symlink_loop(read_path)
171 if not read_path.exists():
172 if import_path is not None:
173 raise FileNotFoundError(f"Config file not found: {read_path}")
174 self._data = {}
175 return
176 new_data = {}
177 try:
178 content = read_path.read_text(encoding="utf-8")
179 for i, line in enumerate(content.splitlines()):
180 stripped = line.strip()
181 if not stripped or stripped.startswith("#"):
182 continue
183 if "=" not in line:
184 raise ValueError(f"Malformed line {i + 1}: {line}")
185 key_part, val_part = line.split("=", 1)
186 key = key_part.strip()
187 value = _unescape(val_part)
188 if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
189 value = value[1:-1]
190 if not all(ord(c) < 128 for c in key + value):
191 raise ValueError(f"Non-ASCII characters in line {i + 1}: {line}")
192 normalized_key = key.strip().removeprefix("BIJUXCLI_").lower()
193 new_data[normalized_key] = value
194 except UnicodeDecodeError as exc:
195 self._log.log(
196 "error",
197 f"Failed to parse config file {read_path}: Binary or non-text content",
198 extra={"path": str(read_path)},
199 )
200 raise ConfigError(
201 f"Failed to parse config file {read_path}: Binary or non-text content",
202 http_status=400,
203 ) from exc
204 except Exception as exc:
205 self._log.log(
206 "error",
207 f"Failed to parse config file {read_path}: {exc}",
208 extra={"path": str(read_path)},
209 )
210 raise ConfigError(
211 f"Failed to parse config file {read_path}: {exc}", http_status=400
212 ) from exc
213 self._data = new_data
214 if import_path is not None and import_path != current_path:
215 self._path = current_path
216 self.set_many(new_data)
217 else:
218 self._path = read_path
219 self._log.log(
220 "info",
221 f"Loaded config from {read_path} (active: {self._path})",
222 extra={"src": str(read_path), "active": str(self._path)},
223 )
225 def set_many(self, items: dict[str, Any]) -> None:
226 """Sets multiple key-value pairs and persists them to the config file.
228 Args:
229 items (dict[str, Any]): A dictionary of key-value pairs to set.
230 """
231 self._data = {k: str(v) for k, v in items.items()}
232 if not self._path:
233 self._path = Path(os.getenv("BIJUXCLI_CONFIG", str(CONFIG_FILE)))
234 self._validate_config_path(self._path)
235 _detect_symlink_loop(self._path)
236 self._path.parent.mkdir(parents=True, exist_ok=True)
237 tmp_path = self._path.with_suffix(".tmp")
238 tmp_path.parent.mkdir(parents=True, exist_ok=True)
239 retry = 40
240 while retry > 0:
241 try:
242 with open(tmp_path, "w", encoding="utf-8", newline="") as temp_file:
243 fd = temp_file.fileno()
244 fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
245 for k, v in self._data.items():
246 safe_v = _escape(str(v))
247 temp_file.write(f"BIJUXCLI_{k.upper()}={safe_v}\n")
248 temp_file.flush()
249 os.fsync(fd)
250 fcntl.flock(fd, fcntl.LOCK_UN)
251 tmp_path.replace(self._path)
252 self._log.log(
253 "info",
254 f"Persisted config to {self._path}",
255 extra={"path": str(self._path)},
256 )
257 return
258 except BlockingIOError:
259 retry -= 1
260 time.sleep(0.05)
261 except Exception as exc:
262 if tmp_path.exists():
263 tmp_path.unlink()
264 self._log.log(
265 "error",
266 f"Failed to persist config to {self._path}: {exc}",
267 extra={"path": str(self._path)},
268 )
269 raise ConfigError(
270 f"Failed to persist config to {self._path}: {exc}", http_status=500
271 ) from exc
272 if tmp_path.exists(): 272 ↛ 274line 272 didn't jump to line 274 because the condition on line 272 was always true
273 tmp_path.unlink()
274 raise ConfigError(
275 f"Failed to persist config to {self._path}: File locked after retries",
276 http_status=400,
277 )
279 def all(self) -> dict[str, str]:
280 """Returns all configuration key-value pairs.
282 Returns:
283 dict[str, str]: A dictionary of all configuration data.
284 """
285 return dict(self._data)
287 def list_keys(self) -> list[str]:
288 """Returns a list of all configuration keys.
290 Returns:
291 list[str]: A list of all keys in the configuration.
292 """
293 return list(self._data.keys())
295 def clear(self) -> None:
296 """Deletes all configuration entries and removes the config file.
298 Raises:
299 ConfigError: If the config file cannot be deleted due to a lock
300 or other filesystem error.
301 """
302 self._data = {}
303 if self._path and self._path.exists():
304 try:
305 retry = 40
306 while True:
307 try:
308 with open(self._path, "a+") as real:
309 fcntl.flock(real.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
310 self._path.unlink()
311 fcntl.flock(real.fileno(), fcntl.LOCK_UN)
312 break
313 except BlockingIOError as err:
314 retry -= 1
315 if retry == 0:
316 raise ConfigError(
317 f"Failed to clear config file {self._path}: File locked",
318 http_status=400,
319 ) from err
320 time.sleep(0.05)
321 except Exception as exc:
322 self._log.log(
323 "error",
324 f"Failed to clear config file {self._path}: {exc}",
325 extra={"path": str(self._path)},
326 )
327 raise ConfigError(
328 f"Failed to clear config file {self._path}: {exc}", http_status=500
329 ) from exc
330 self._log.log(
331 "info",
332 "Cleared config data",
333 extra={"path": str(self._path) if self._path else "None"},
334 )
336 def get(self, key: str, default: Any = None) -> Any:
337 """Retrieves a configuration value by key.
339 The key is normalized (lowercase, `BIJUXCLI_` prefix removed), and the
340 environment is checked first before consulting the in-memory store.
342 Args:
343 key (str): The key to retrieve.
344 default (Any): The value to return if the key is not found.
346 Returns:
347 Any: The value associated with the key, or the default value.
349 Raises:
350 ConfigError: If the key is not found and no default is provided.
351 """
352 normalized_key = key.strip().removeprefix("BIJUXCLI_").lower()
353 env_key = f"BIJUXCLI_{normalized_key.upper()}"
354 if env_key in os.environ:
355 return os.environ[env_key]
356 value = self._data.get(normalized_key, default)
357 if isinstance(value, str):
358 val_lower = value.lower()
359 if val_lower in {"true", "false"}:
360 return val_lower == "true"
361 if value is default and default is None:
362 self._log.log(
363 "error", f"Config key not found: {key}", extra={"key": normalized_key}
364 )
365 raise ConfigError(f"Config key not found: {key}", http_status=400)
366 self._log.log(
367 "debug",
368 f"Retrieved config key: {normalized_key}",
369 extra={"key": normalized_key, "value": str(value)},
370 )
371 return value
373 def set(self, key: str, value: Any) -> None:
374 """Sets a single configuration key-value pair and persists it.
376 Args:
377 key (str): The key to set (case-insensitive, `BIJUXCLI_` prefix optional).
378 value (Any): The value to associate with the key.
380 Returns:
381 None:
383 Raises:
384 ConfigError: If the configuration cannot be persisted.
385 """
386 normalized_key = key.strip().removeprefix("BIJUXCLI_").lower()
387 self._data[normalized_key] = str(value)
388 if not self._path:
389 self._path = Path(os.getenv("BIJUXCLI_CONFIG", str(CONFIG_FILE)))
390 self._validate_config_path(self._path)
391 _detect_symlink_loop(self._path)
392 self._path.parent.mkdir(parents=True, exist_ok=True)
393 tmp_path = self._path.with_suffix(".tmp")
394 retry = 40
395 while retry > 0:
396 try:
397 with open(tmp_path, "w", encoding="utf-8", newline="") as temp_file:
398 fd = temp_file.fileno()
399 fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
400 for k, v in self._data.items():
401 safe_v = _escape(str(v))
402 temp_file.write(f"BIJUXCLI_{k.upper()}={safe_v}\n")
403 temp_file.flush()
404 os.fsync(fd)
405 fcntl.flock(fd, fcntl.LOCK_UN)
406 tmp_path.replace(self._path)
407 self._log.log(
408 "info",
409 f"Persisted config to {self._path}",
410 extra={"path": str(self._path), "key": normalized_key},
411 )
412 return
413 except BlockingIOError:
414 retry -= 1
415 time.sleep(0.05)
416 except Exception as exc:
417 if tmp_path.exists():
418 tmp_path.unlink()
419 self._log.log(
420 "error",
421 f"Failed to persist config to {self._path}: {exc}",
422 extra={"path": str(self._path)},
423 )
424 raise ConfigError(
425 f"Failed to persist config to {self._path}: {exc}", http_status=500
426 ) from exc
427 if tmp_path.exists(): 427 ↛ 429line 427 didn't jump to line 429 because the condition on line 427 was always true
428 tmp_path.unlink()
429 raise ConfigError(
430 f"Failed to persist config to {self._path}: File locked after retries",
431 http_status=400,
432 )
434 def reload(self) -> None:
435 """Reloads configuration from the last-loaded file path.
437 Raises:
438 ConfigError: If no file path has been previously loaded.
439 """
440 if self._path is None:
441 self._log.log("error", "Config.reload() called before load()", extra={})
442 raise ConfigError("Config.reload() called before load()", http_status=400)
443 if not self._path.exists():
444 self._log.log(
445 "error", f"Config file missing for reload: {self._path}", extra={}
446 )
447 raise ConfigError(
448 f"Config file missing for reload: {self._path}", http_status=400
449 )
450 self.load(self._path)
452 def export(self, path: str | Path, out_format: str | None = None) -> None:
453 """Exports the configuration to a file or standard output.
455 Args:
456 path (str | Path): The destination file path, or "-" for stdout.
457 out_format (str | None): The output format ('env', 'json', 'yaml').
458 If None, the format is auto-detected from the file extension.
460 Raises:
461 ConfigError: If the format is unsupported or the export fails.
462 """
463 export_path = Path(path) if path != "-" else path
464 output_fmt = (
465 out_format.lower()
466 if out_format
467 else (
468 "env"
469 if path == "-" or str(path).endswith(".env")
470 else "yaml"
471 if str(path).endswith((".yaml", ".yml"))
472 else "json"
473 )
474 )
475 try:
476 if output_fmt == "env":
477 lines = [f"BIJUXCLI_{k.upper()}={v}" for k, v in self._data.items()]
478 text = "\n".join(lines) + ("\n" if lines else "")
479 elif output_fmt == "json":
480 text = (
481 json.dumps({k.upper(): v for k, v in self._data.items()}, indent=2)
482 + "\n"
483 )
484 elif output_fmt == "yaml":
485 if yaml is None:
486 raise ConfigError(
487 "PyYAML not installed for YAML support", http_status=400
488 )
489 text = yaml.safe_dump(
490 {k.upper(): v for k, v in self._data.items()}, sort_keys=False
491 )
492 else:
493 raise ConfigError(f"Unsupported format: {output_fmt}", http_status=400)
494 if path == "-":
495 print(text, end="")
496 self._log.log(
497 "info",
498 "Exported config to stdout",
499 extra={"format": output_fmt},
500 )
501 return
502 export_path = Path(path)
503 export_path.resolve(strict=False)
504 if not export_path.parent.exists():
505 raise ConfigError(
506 f"No such file or directory: {export_path.parent}", http_status=400
507 )
508 if export_path.exists() and not os.access(export_path, os.W_OK):
509 raise PermissionError(f"Permission denied: '{export_path}'")
510 if not os.access(export_path.parent, os.W_OK):
511 raise PermissionError(f"Permission denied: '{export_path.parent}'")
512 with NamedTemporaryFile(
513 "w", delete=False, dir=export_path.parent, encoding="utf-8", newline=""
514 ) as temp_file:
515 fd = temp_file.fileno()
516 fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
517 temp_file.write(text)
518 temp_file.flush()
519 os.fsync(fd)
520 fcntl.flock(fd, fcntl.LOCK_UN)
521 Path(temp_file.name).replace(export_path)
522 self._log.log(
523 "info",
524 f"Exported config to {export_path}",
525 extra={"path": str(export_path), "format": output_fmt},
526 )
527 except BlockingIOError as exc:
528 self._log.log(
529 "error",
530 f"Failed to export config to {export_path}: File locked",
531 extra={"path": str(export_path), "format": output_fmt},
532 )
533 raise ConfigError(
534 f"Failed to export config to {export_path}: File locked",
535 http_status=400,
536 ) from exc
537 except (OSError, PermissionError, ValueError) as exc:
538 self._log.log(
539 "error",
540 f"Failed to export config to {export_path}: {exc}",
541 extra={"path": str(export_path), "format": output_fmt},
542 )
543 raise ConfigError(
544 f"Failed to export config to {export_path}: {exc}", http_status=400
545 ) from exc
547 def delete(self, key: str) -> None:
548 """Deletes a configuration key and persists the change.
550 Args:
551 key (str): The key to delete (case-insensitive, `BIJUXCLI_` prefix optional).
553 Raises:
554 ConfigError: If the key does not exist or the change cannot be persisted.
555 """
556 normalized_key = key.strip().removeprefix("BIJUXCLI_").lower()
557 if normalized_key not in self._data:
558 self._log.log(
559 "error", f"Config key not found: {key}", extra={"key": normalized_key}
560 )
561 raise ConfigError(f"Config key not found: {key}", http_status=400)
562 del self._data[normalized_key]
563 if not self._path:
564 self._path = Path(os.getenv("BIJUXCLI_CONFIG", str(CONFIG_FILE)))
565 self._validate_config_path(self._path)
566 _detect_symlink_loop(self._path)
567 self._path.parent.mkdir(parents=True, exist_ok=True)
568 tmp_path = self._path.with_suffix(".tmp")
569 retry = 40
570 while retry > 0:
571 try:
572 with open(tmp_path, "w", encoding="utf-8", newline="") as temp_file:
573 fd = temp_file.fileno()
574 fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
575 for k, v in self._data.items():
576 safe_v = _escape(str(v))
577 temp_file.write(f"BIJUXCLI_{k.upper()}={safe_v}\n")
578 temp_file.flush()
579 os.fsync(fd)
580 fcntl.flock(fd, fcntl.LOCK_UN)
581 tmp_path.replace(self._path)
582 self._log.log(
583 "info",
584 f"Deleted config key and persisted to {self._path}",
585 extra={"path": str(self._path), "key": normalized_key},
586 )
587 return
588 except BlockingIOError:
589 retry -= 1
590 time.sleep(0.05)
591 except Exception as exc:
592 if tmp_path.exists(): 592 ↛ 594line 592 didn't jump to line 594 because the condition on line 592 was always true
593 tmp_path.unlink()
594 self._log.log(
595 "error",
596 f"Failed to persist config after deleting {normalized_key}: {exc}",
597 extra={"path": str(self._path), "key": normalized_key},
598 )
599 raise ConfigError(
600 f"Failed to persist config after deleting {normalized_key}: {exc}",
601 http_status=500,
602 ) from exc
603 if tmp_path.exists(): 603 ↛ 605line 603 didn't jump to line 605 because the condition on line 603 was always true
604 tmp_path.unlink()
605 raise ConfigError(
606 f"Failed to persist config to {self._path}: File locked after retries",
607 http_status=400,
608 )
610 def unset(self, key: str) -> None:
611 """Removes a configuration key (alias for `delete`).
613 Args:
614 key (str): The key to remove.
615 """
616 self.delete(key)
618 def save(self) -> None:
619 """Persists the current in-memory configuration to its source file."""
620 if not self._path:
621 self._path = Path(os.getenv("BIJUXCLI_CONFIG", str(CONFIG_FILE)))
622 self._validate_config_path(self._path)
623 try:
624 self.set_many(self._data)
625 except Exception as exc:
626 self._log.log(
627 "error",
628 f"Failed to save config to {self._path}: {exc}",
629 extra={"path": str(self._path)},
630 )
631 raise ConfigError(
632 f"Failed to save config to {self._path}: {exc}", http_status=500
633 ) from exc
635 @staticmethod
636 def _validate_config_path(path: Path) -> None:
637 """Prevents using device files or other unsafe paths as a config file.
639 Args:
640 path (Path): The path to validate.
642 Raises:
643 ConfigError: If the path is determined to be unsafe.
644 """
645 pstr = path.as_posix()
646 if (
647 pstr.startswith("/dev/")
648 or pstr == "/dev/null"
649 or pstr.startswith("\\\\.\\")
650 ):
651 raise ConfigError(
652 f"Invalid config path: {path} is a device file or not allowed"
653 )
655 @staticmethod
656 def _preflight_write(path: Path) -> None:
657 """Performs pre-flight checks before a write operation.
659 Fails fast if the path has a symlink loop or the file is locked.
661 Args:
662 path (Path): The path to check.
664 Raises:
665 ConfigError: If the path is invalid or the file is locked.
666 """
667 _detect_symlink_loop(path)
668 if path.exists():
669 try:
670 with open(path, "a+") as f:
671 fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
672 fcntl.flock(f.fileno(), fcntl.LOCK_UN)
673 except BlockingIOError as exc:
674 raise ConfigError(
675 f"Failed to persist config to {path}: File locked", http_status=400
676 ) from exc
679__all__ = ["Config"]