# This module has been originally modified and enhanced from Red Hat Update
# Agent's config module.
#
# Copyright (c) 2010 - 2012 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#

import sys
import os
import logging
from iniparse import SafeConfigParser
from iniparse.compat import NoOptionError, InterpolationMissingOptionError, NoSectionError
import re
import tempfile
from typing import Dict, List, Optional, Tuple
from subscription_manager.i18n import ugettext as _

CONFIG_ENV_VAR = "RHSM_CONFIG"

DEFAULT_CONFIG_DIR = "/etc/rhsm/"
HOST_CONFIG_DIR = "/etc/rhsm-host/"  # symlink inside docker containers
DEFAULT_CONFIG_PATH = "%srhsm.conf" % DEFAULT_CONFIG_DIR
DEFAULT_PROXY_PORT = "3128"
DEFAULT_SERVER_TIMEOUT = "180"

# Defaults for connecting to RHSM, used to "reset" the configuration file
# if requested by the user:
DEFAULT_HOSTNAME = "subscription.rhsm.redhat.com"
DEFAULT_PORT = "443"
DEFAULT_PREFIX = "/subscription"

DEFAULT_CDN_HOSTNAME = "cdn.redhat.com"
DEFAULT_CDN_PORT = "443"
DEFAULT_CDN_PREFIX = "/"

DEFAULT_CA_CERT_DIR = "/etc/rhsm/ca/"

DEFAULT_ENT_CERT_DIR = "/etc/pki/entitlement"
HOST_ENT_CERT_DIR = "/etc/pki/entitlement-host"

SERVER_DEFAULTS = {
    "hostname": DEFAULT_HOSTNAME,
    "prefix": DEFAULT_PREFIX,
    "port": DEFAULT_PORT,
    "server_timeout": DEFAULT_SERVER_TIMEOUT,
    "insecure": "0",
    "proxy_hostname": "",
    "proxy_scheme": "http",
    "proxy_user": "",
    "proxy_port": "",
    "proxy_password": "",
    "no_proxy": "",
}
RHSM_DEFAULTS = {
    "baseurl": "https://" + DEFAULT_CDN_HOSTNAME,
    "repomd_gpg_url": "",
    "ca_cert_dir": DEFAULT_CA_CERT_DIR,
    "repo_ca_cert": "%(ca_cert_dir)sredhat-uep.pem",
    "productcertdir": "/etc/pki/product",
    "entitlementcertdir": DEFAULT_ENT_CERT_DIR,
    "consumercertdir": "/etc/pki/consumer",
    "manage_repos": "1",
    "full_refresh_on_yum": "0",
    "report_package_profile": "1",
    "plugindir": "/usr/share/rhsm-plugins",
    "pluginconfdir": "/etc/rhsm/pluginconf.d",
    "auto_enable_yum_plugins": "1",
    "package_profile_on_trans": "0",
    "inotify": "1",
    "progress_messages": "1",
}

RHSMCERTD_DEFAULTS = {
    "certcheckinterval": "240",
    "autoattachinterval": "1440",
    "splay": "1",
    "disable": "0",
    "auto_registration": "0",
    "auto_registration_interval": "60",
}

LOGGING_DEFAULTS = {
    "default_log_level": "INFO",
}

# Defaults are applied to each section in the config file.
DEFAULTS = {
    "server": SERVER_DEFAULTS,
    "rhsm": RHSM_DEFAULTS,
    "rhsmcertd": RHSMCERTD_DEFAULTS,
    "logging": LOGGING_DEFAULTS,
}


log = logging.getLogger(__name__)


def in_container() -> bool:
    """
    Are we running in a docker container or not?
    """
    # For development in containers we must be able to turn container detection
    # off
    if os.environ.get("SMDEV_CONTAINER_OFF", ""):
        return False

    def in_ocp() -> bool:
        """
        Is the system running as pod in OCP (OpenShift Container Platform)?

        Check some of the canonical environment variables set by Kubernets
        (on which OCP is based):
        https://kubernetes.io/docs/concepts/containers/container-environment/
        in particular, look for the "kubernetes" default service

        "container=oci" is generally set by Red Hat-based containers.
        """
        return (
            os.environ.get("KUBERNETES_PORT", "") != ""
            and os.environ.get("KUBERNETES_SERVICE_HOST", "") != ""
            and os.environ.get("KUBERNETES_SERVICE_PORT", "") != ""
            and os.environ.get("container", "") == "oci"
        )

    # Known locations to check for as an easy way to detect whether
    # we are running in a container; note that pods in OCP are not
    # considered containers but standalone systems
    locations: List[str] = [
        # podman:
        # https://github.com/containers/podman/issues/6192
        # https://github.com/containers/podman/issues/3586#issuecomment-661918679
        "/run/.containerenv",
        # docker:
        # https://github.com/moby/moby/issues/18355
        "/.dockerenv",
        # The host rhsm configuration was shared with us, so assume
        # we must be running in a container
        HOST_CONFIG_DIR,
    ]
    for fn in locations:
        if os.path.exists(fn):
            log.debug(f"in_container(): found '{fn}', may be a container")
            is_ocp = in_ocp()
            if is_ocp:
                log.debug("in_container(): found kubernetes/OCP environment, not considering container")
                return False
            return True
    return False


class RhsmConfigParser(SafeConfigParser):
    """Config file parser for rhsm configuration."""

    # defaults unused but kept to preserve compatibility
    def __init__(self, config_file: Optional[str] = None, defaults=None):
        self.config_file: str = config_file
        SafeConfigParser.__init__(self)
        self.read(self.config_file)

    def read(self, file_names: Optional[List[str]] = None) -> List[str]:
        """
        Read configuration files. When configuration files are not specified, then read self.config_file
        :param file_names: list of configuration files
        :return: number of configuration files read
        """
        if file_names is None:
            return super(RhsmConfigParser, self).read(self.config_file)
        else:
            return super(RhsmConfigParser, self).read(file_names)

    def save(self, config_file: Optional[str] = None) -> None:
        """Writes config file to storage."""
        # Use self.config_file if config_file is not provided in method arguments.
        config_file: str = config_file or self.config_file

        rhsm_conf_dir: str = os.path.dirname(config_file)

        # When /etc/rhsm does not exist, then try to create it
        if os.path.isdir(rhsm_conf_dir) is False:
            os.makedirs(rhsm_conf_dir)

        # Create a temporary file to write config data to it and
        # rename the file to the expected config file name after successfully
        # writing all config data.
        # Refer to BZ 1719725: https://bugzilla.redhat.com/show_bug.cgi?id=1719725
        with tempfile.NamedTemporaryFile(mode="w", dir=rhsm_conf_dir, delete=False) as fo:
            self.write(fo)
            fo.flush()
            mode: int
            try:
                mode = os.stat(config_file).st_mode
            except IOError:
                mode = 0o644
            os.rename(fo.name, config_file)
            os.chmod(config_file, mode)

    def get(self, section: str, prop: str) -> str:
        """Get a value from rhsm config.

        :param section: config file section
        :param prop: what config property to find, the config item name
        :return: The string value of the config item.

        If config item exists, but is not set, an empty string is returned.
        """
        try:
            return SafeConfigParser.get(self, section, prop)
        except InterpolationMissingOptionError:
            # if there is an interpolation error, resolve it
            raw_val: str = super(RhsmConfigParser, self).get(section, prop, True)
            interpolations: List[str] = re.findall(r"%\((.*?)\)s", raw_val)
            changed: bool = False
            for interp in interpolations:
                # Defaults aren't interpolated by default, so bake them in as necessary
                # has_option throws an exception if the section doesn't exist,
                # but at this point we know it does.
                if self.has_option(section, interp):
                    super(RhsmConfigParser, self).set(section, interp, self.get(section, interp))
                    changed = True
            if changed:
                # Now that we have the required values, we can interpolate
                return self.get(section, prop)
            # If nothing has been changed (we couldn't fix it) re-raise the exception
            raise
        except (NoOptionError, NoSectionError) as er:
            try:
                return DEFAULTS[section][prop.lower()]
            except KeyError:
                # re-raise the NoOptionError, not the key error
                raise er

    def set(self, section: str, name: str, value: str) -> None:
        try:
            # If the value doesn't exist, or isn't equal, write it
            if self.get(section, name) != value:
                raise NoOptionError
        except Exception:
            if not self.has_section(section):
                self.add_section(section)
            super(RhsmConfigParser, self).set(section, name, value)
        if section == "logging" and name == "default_log_level":
            self.is_log_level_valid(value)

    def is_log_level_valid(self, value: str, print_warning: bool = True) -> bool:
        """
        Check if provided default_log_level value is valid or not
        :param value: value of default_log_level
        :param print_warning: print warning, when provided value is not valid
        :return: True, when value is valid. Otherwise return False
        """
        valid: List[str] = ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]
        if value not in valid + ["NOTSET"]:
            if print_warning is True:
                print(
                    _("Invalid Log Level: {lvl}, setting to INFO for this run.".format(lvl=value)),
                    file=sys.stderr,
                )
                print(
                    _(
                        "Please use:  subscription-manager config --logging.default_log_level=<Log Level> to "
                        "set the default_log_level to a valid value."
                    ),
                    file=sys.stderr,
                )
                valid_str = ", ".join(valid)
                print(_("Valid Values: {valid_str}".format(valid_str=valid_str)), file=sys.stderr)
            return False
        return True

    def get_int(self, section: str, prop: str) -> Optional[int]:
        """Get an int value from the config.

        :param section: the config section
        :param prop: the config item name
        :return:
            An int cast from the string read from.
            If config item is unset, return None.
        :raises ValueError:
            If the config value found can not be coerced into an int
        """
        value_string: str = self.get(section, prop)
        if value_string == "":
            return None
        try:
            value_int = int(value_string)
            # we could also try to handle port name
            # strings (ie, 'http') here with getservbyname
        except (ValueError, TypeError):
            raise ValueError("Section: %s, Property: %s - Integer value expected" % (section, prop))
        return value_int

    # Overriding this method to address
    # http://code.google.com/p/iniparse/issues/detail?id=9
    def defaults(self) -> Dict[str, str]:
        result: List[Tuple[str, str]] = []
        for section in DEFAULTS:
            result += [(key, value) for (key, value) in list(DEFAULTS[section].items())]
        return dict(result)

    def sections(self) -> List[str]:
        result: List[str] = super(RhsmConfigParser, self).sections()
        for section in DEFAULTS:
            if section not in result:
                result.append(section)
        return result

    def has_option(self, section: str, prop: str) -> bool:
        try:
            self.get(section, prop)
            return True
        except NoOptionError:
            return False

    def items(self, section: str) -> List[Tuple[str, str]]:
        result: Dict[str, str] = {}
        for key in DEFAULTS.get(section, {}):
            result[key] = DEFAULTS[section][key]
        if self.has_section(section):
            super_result: List[str] = super(RhsmConfigParser, self).options(section)
            for key in super_result:
                if self.get(section, key) and len(self.get(section, key).strip()) > 0:
                    result[key] = self.get(section, key)
        return list(result.items())

    def options(self, section: str) -> List[str]:
        # This is necessary because with the way we handle defaults, parser.has_section('xyz')
        # will return True if 'xyz' exists only in the defaults but parser.options('xyz')
        #  will throw an exception.
        items = set()
        for key in DEFAULTS.get(section, {}):
            items.add(key)
        if self.has_section(section):
            super_result = super(RhsmConfigParser, self).options(section)
            items.update(super_result)
        return list(items)

    def is_default(self, section: str, prop: str, value: str) -> bool:
        if self.get_default(section, prop) == value:
            return True
        return False

    def has_default(self, section: str, prop: str) -> bool:
        return section in DEFAULTS and prop.lower() in DEFAULTS[section]

    def get_default(self, section: str, prop: str) -> Optional[str]:
        if self.has_default(section, prop.lower()):
            return DEFAULTS[section][prop.lower()]
        return None


class RhsmHostConfigParser(RhsmConfigParser):
    """
    Sub-class of config parser automatically loaded when we detect that
    we're running in a container environment.

    Host config is shared with containers as /etc/rhsm-host. However the
    rhsm.conf within will still be referencing /etc/rhsm for a couple
    properties. (ca_cert_dir, repo_ca_cert)

    Instead we load config file normally, and assume to replace occurrences
    of /etc/rhsm with /etc/rhsm-host in these properties.

    A similar adjustment is necessary for /etc/pki/entitlement-host if
    present.
    """

    def __init__(self, config_file: Optional[str] = None, defaults=None) -> None:
        super().__init__(config_file, defaults)

        # Override the ca_cert_dir and repo_ca_cert if necessary:
        ca_cert_dir: str = self.get("rhsm", "ca_cert_dir")
        repo_ca_cert: str = self.get("rhsm", "repo_ca_cert")

        ca_cert_dir = ca_cert_dir.replace(DEFAULT_CONFIG_DIR, HOST_CONFIG_DIR)
        repo_ca_cert = repo_ca_cert.replace(DEFAULT_CONFIG_DIR, HOST_CONFIG_DIR)
        self.set("rhsm", "ca_cert_dir", ca_cert_dir)
        self.set("rhsm", "repo_ca_cert", repo_ca_cert)

        # Similarly if /etc/pki/entitlement-host exists, override this too.
        # If for some reason the host config is pointing to another directory
        # we leave the config setting alone, our tooling isn't going to be
        # able to handle it anyhow.
        if os.path.exists(HOST_ENT_CERT_DIR):
            ent_cert_dir = self.get("rhsm", "entitlementcertdir")
            if ent_cert_dir == DEFAULT_ENT_CERT_DIR or ent_cert_dir == DEFAULT_ENT_CERT_DIR + "/":
                ent_cert_dir = HOST_ENT_CERT_DIR
            self.set("rhsm", "entitlementcertdir", ent_cert_dir)


CFG: Optional[RhsmConfigParser] = None


def get_config_parser() -> RhsmConfigParser:
    """
    Get an :class:`RhsmConfig` instance

    Will use the first config file defined in the following list:
    - /etc/rhsm-host/rhsm.conf if it exists (only in containers)
    - /etc/rhsm/rhsm.conf
    """
    global CFG

    try:
        CFG = CFG
    except NameError:
        CFG = None

    if CFG is None:
        # Load alternate config file implementation if we detect that we're
        # running in a container.
        if in_container():
            CFG = RhsmHostConfigParser(config_file=os.path.join(HOST_CONFIG_DIR, "rhsm.conf"))
        else:
            CFG = RhsmConfigParser(config_file=DEFAULT_CONFIG_PATH)

    return CFG


# Deprecated but still in use by other applications
def initConfig(configFile=None):
    return get_config_parser()
