# Source code for credproxy.substitutions

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2025-present John Mille <john@ews-network.net>

from __future__ import annotations

import os
import re
from typing import Any
from pathlib import Path

from credproxy.settings import FROM_ENV_TAG, FROM_FILE_TAG, TAG_SEPARATOR


# The substitution tags (FROM_ENV_TAG, FROM_FILE_TAG) and TAG_SEPARATOR are
# defined in credproxy.settings and imported above.

# Placeholder regex, built from the configurable tags and separator.
# Shape: ${<tag><separator><argument>} where <argument> is any run of
# characters other than "}".
#   group 1 -> the tag that matched (FROM_ENV_TAG or FROM_FILE_TAG)
#   group 2 -> the argument (environment variable name or file path)
# Every configurable piece goes through re.escape so that a tag or separator
# containing regex metacharacters is matched literally; the substitution code
# compares group 1 against the tags with ==, so literal matching is required.
VARIABLE_PATTERN = re.compile(
    rf"\$\{{({re.escape(FROM_ENV_TAG)}|{re.escape(FROM_FILE_TAG)})"
    rf"{re.escape(TAG_SEPARATOR)}([^}}]+)\}}"
)


def substitute_variables(value: Any) -> Any:
    """Recursively expand substitution placeholders in a configuration value.

    Placeholder syntax is ``${<tag><separator><argument>}``, where the tag and
    separator come from ``credproxy.settings`` (``FROM_ENV_TAG``,
    ``FROM_FILE_TAG``, ``TAG_SEPARATOR``). With the documented defaults:

    - ``${fromEnv:VAR_NAME}`` is replaced by the environment variable.
    - ``${fromFile:/path/to/file}`` is replaced by the file's contents.

    How each kind of value is treated:

    - ``str``: placeholders are expanded.
    - ``dict``: a new dict is returned; values are processed recursively and
      keys are kept as they are.
    - ``list``: a new list is returned with every item processed recursively.
    - anything else: returned unchanged.

    Args:
        value: The configuration value to process.

    Returns:
        The value with all placeholders in nested strings expanded.

    Raises:
        ValueError: Propagated from string expansion when a placeholder
            cannot be resolved or the nesting limit is exceeded.
    """
    # Strings are the only leaves that can carry placeholders.
    if isinstance(value, str):
        return _substitute_string(value)

    # Containers are rebuilt so the caller's structure is not mutated.
    if isinstance(value, dict):
        return {name: substitute_variables(entry) for name, entry in value.items()}
    if isinstance(value, list):
        return list(map(substitute_variables, value))

    # Numbers, booleans, None, tuples, ... pass through untouched.
    return value
def _substitute_string(value: str, depth: int = 0, max_depth: int = 10) -> str:
    """Expand every placeholder found in a string.

    Each match of ``VARIABLE_PATTERN`` is replaced by the resolved environment
    variable or file content. The replacement text is scanned again with
    ``depth + 1``, so a resolved value may itself contain placeholders.

    Args:
        value: The string to expand.
        depth: Current nesting level (default: 0).
        max_depth: Nesting level at which expansion is aborted (default: 10).

    Returns:
        The string with all placeholders replaced.

    Raises:
        ValueError: If ``depth`` has reached ``max_depth``, if a placeholder
            carries an unknown tag, or if the referenced environment variable
            or file cannot be resolved.
    """
    # The limit is checked before scanning, so it also triggers for a resolved
    # value that contains no further placeholders once depth == max_depth.
    if depth >= max_depth:
        raise ValueError(
            f"Maximum substitution depth ({max_depth}) exceeded. "
            f"Check for circular references in configuration."
        )

    def replace_match(match: re.Match) -> str:
        # group 1 is the tag, group 2 the variable name / file path.
        var_type, var_value = match.groups()
        if var_type == FROM_ENV_TAG:
            substituted = _substitute_env(var_value)
        elif var_type == FROM_FILE_TAG:
            substituted = _substitute_file(var_value)
        else:
            raise ValueError(f"Unknown variable type: {var_type}")

        # The resolved text may contain placeholders of its own.
        return _substitute_string(substituted, depth + 1, max_depth)

    return VARIABLE_PATTERN.sub(replace_match, value)


def _substitute_env(var_name: str) -> str:
    """Return the value of an environment variable.

    Args:
        var_name: Name of the environment variable to read.

    Returns:
        The variable's value (an empty string if it is set but empty).

    Raises:
        ValueError: If the variable is not set.
    """
    env_value = os.getenv(var_name)
    if env_value is None:
        raise ValueError(f"Environment variable '{var_name}' not found")
    return env_value


def _substitute_file(file_path: str) -> str:
    """Return the contents of a file, trimming a lone trailing newline.

    A file made of a single line followed by one newline (the usual shape of a
    secret written by an editor or ``echo``) is returned without that newline.
    Multi-line content and content without a trailing newline are returned
    exactly as read.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file contents, minus the trailing newline for single-line files.

    Raises:
        ValueError: If the file does not exist or cannot be read. The
            underlying exception is attached as ``__cause__``.
    """
    # Read first and translate failures, instead of probing with exists():
    # the probe ran outside the error handling (so an OSError from it could
    # escape un-normalised) and left a window between the check and the read.
    try:
        content = Path(file_path).read_text()
    except (FileNotFoundError, NotADirectoryError) as error:
        raise ValueError(f"File '{file_path}' not found") from error
    except (OSError, ValueError) as error:
        # ValueError covers decoding errors and malformed paths.
        raise ValueError(f"Error reading file '{file_path}': {error}") from error

    # Exactly one newline, and it is the last character: strip it.
    if content.endswith("\n") and "\n" not in content[:-1]:
        return content[:-1]
    return content