"""
This module adds YAML file support. YAML module is an extension for text rules
tailor made for .yaml/.yml files. The main difference lies in the way it works.
First, the .yaml/.yml file is parsed, then the modifications are made on the
already parsed file.
"""
from abc import abstractmethod
from copy import deepcopy
import logging
from pathlib import Path
from typing import Any, Dict, Hashable, List, Optional, Union
from ruamel.yaml import YAML
from hammurabi.rules.common import SinglePathRule
from hammurabi.rules.mixins import SelectorMixin
[docs]class SingleDocumentYAMLFileRule(SinglePathRule, SelectorMixin):
"""
Extend :class:`hammurabi.rules.base.Rule` to handle parsed content
manipulations on a single file.
"""
def __init__(
self, name: str, path: Optional[Path] = None, key: str = "", **kwargs
) -> None:
self.yaml = YAML()
self.yaml.default_flow_style = False
self.key = self.validate(key, required=True)
self.loaded_yaml = Union[Dict[Hashable, Any], List[Any], None]
super().__init__(name, path, **kwargs)
def _get_parent(self) -> Any:
"""
Get the parent of the given key by its selector.
:return: Return the parent if there is any
:rtype: Dict[str, Any]
"""
# Get the parent for modifications. If there is no parent,
# then the parent is the document root
return (
self._get_by_selector(self.loaded_yaml, self.key) or self.loaded_yaml or {}
)
def _write_dump(self, data: Any, delete: bool = False) -> None:
"""
Helper function to write the dump into file.
:param data: The modified data
:type data: :class:``hammurabi.rules.mixins.Any`
:param delete: Indicate if the key should be deleted
:type delete: bool
"""
updated_data = self._set_by_selector(self.loaded_yaml, self.key, data, delete)
self.yaml.dump(updated_data, self.param)
[docs] def pre_task_hook(self) -> None:
"""
Parse the yaml file for later use.
"""
logging.debug('parsing "%s" yaml file', self.param)
self.loaded_yaml = self.yaml.load(self.param)
[docs] @abstractmethod
def task(self) -> Path:
"""
Abstract method representing how a :func:`hammurabi.rules.base.Rule.task`
must be parameterized. Any difference in the parameters will result in
pylint/mypy errors.
For more details please check :func:`hammurabi.rules.base.Rule.task`.
"""
[docs]class YAMLKeyExists(SingleDocumentYAMLFileRule):
"""
Ensure that the given key exists. If needed, the rule will create a key with the
given name, and optionally the specified value. In case the value is set, the value
will be assigned to the key. If no value is set, the key will be created with an empty
value.
Example usage:
>>> from pathlib import Path
>>> from hammurabi import Law, Pillar, YAMLKeyExists
>>>
>>> example_law = Law(
>>> name="Name of the law",
>>> description="Well detailed description what this law does.",
>>> rules=(
>>> YAMLKeyExists(
>>> name="Ensure service descriptor has stack",
>>> path=Path("./service.yaml"),
>>> key="stack",
>>> value="my-awesome-stack",
>>> ),
>>> )
>>> )
>>>
>>> pillar = Pillar()
>>> pillar.register(example_law)
.. warning::
Compared to :mod:`hammurabi.rules.text.LineExists`, this rule is NOT able to add a
key before or after a target.
"""
def __init__(
self,
name: str,
path: Optional[Path] = None,
key: str = "",
value: Union[None, list, dict, str, int, float] = None,
**kwargs,
) -> None:
self.value = value
super().__init__(name, path, key, **kwargs)
[docs] def task(self) -> Path:
"""
Ensure that the given key exists in the yaml file. If needed, create the
key with the given name, and optionally the specified value.
:return: Return the input path as an output
:rtype: Path
"""
parent = self._get_parent()
key_name: str = self.key.split(".")[-1]
inserted = parent.setdefault(key_name, self.value)
# Only write the changes if we did any change
if inserted == parent[key_name]:
self._write_dump(parent[key_name])
return self.param
[docs]class YAMLKeyNotExists(SingleDocumentYAMLFileRule):
"""
Ensure that the given key not exists. If needed, the rule will remove a key with the
given name, including its value.
Example usage:
>>> from pathlib import Path
>>> from hammurabi import Law, Pillar, YAMLKeyNotExists
>>>
>>> example_law = Law(
>>> name="Name of the law",
>>> description="Well detailed description what this law does.",
>>> rules=(
>>> YAMLKeyNotExists(
>>> name="Ensure outdated_key is removed",
>>> path=Path("./service.yaml"),
>>> key="outdated_key",
>>> ),
>>> )
>>> )
>>>
>>> pillar = Pillar()
>>> pillar.register(example_law)
"""
[docs] def task(self) -> Path:
"""
Ensure that the given key does not exists in the yaml file.
:return: Return the input path as an output
:rtype: Path
"""
parent = self._get_parent()
key_name: str = self.key.split(".")[-1]
if key_name in parent.keys():
parent.pop(key_name)
self._write_dump(parent, delete=True)
return self.param
[docs]class YAMLKeyRenamed(SingleDocumentYAMLFileRule):
"""
Ensure that the given key is renamed. In case the key can not be found,
a ``LookupError`` exception will be raised to stop the execution. The
execution must be stopped at this point, because if other rules depending
on the rename they will fail otherwise.
Example usage:
>>> from pathlib import Path
>>> from hammurabi import Law, Pillar, YAMLKeyRenamed
>>>
>>> example_law = Law(
>>> name="Name of the law",
>>> description="Well detailed description what this law does.",
>>> rules=(
>>> YAMLKeyRenamed(
>>> name="Ensure service descriptor has dependencies",
>>> path=Path("./service.yaml"),
>>> key="development.depends_on",
>>> value="dependencies",
>>> ),
>>> )
>>> )
>>>
>>> pillar = Pillar()
>>> pillar.register(example_law)
"""
def __init__(
self,
name: str,
path: Optional[Path] = None,
key: str = "",
new_name: str = "",
**kwargs,
) -> None:
self.new_name = self.validate(new_name, required=True)
super().__init__(name, path, key, **kwargs)
[docs] def task(self) -> Path:
"""
Ensure that the given key is renamed. In case the key can not be found,
a ``LookupError`` exception will be raised to stop the execution. The
execution must be stopped at this point, because if other rules depending
on the rename they will fail otherwise.
:raises: ``LookupError`` raised if no key can be renamed or both the new and
old keys are in the config file
:return: Return the input path as an output
:rtype: Path
"""
parent = self._get_parent()
key_name: str = self.key.split(".")[-1]
has_old_key = key_name in parent
has_new_key = self.new_name in parent
if not has_old_key and not has_new_key:
raise LookupError(f'No matching section for "{key_name}"')
if has_old_key and has_new_key:
raise LookupError(f'Both "{key_name}" and "{self.new_name}" set')
if has_new_key:
return self.param
parent[self.new_name] = deepcopy(parent[key_name])
parent.pop(key_name)
# Delete is True since we need to delete the old key
self._write_dump(parent, delete=True)
return self.param
[docs]class YAMLValueExists(SingleDocumentYAMLFileRule):
"""
Ensure that the given key has the expected value(s). In case the key cannot
be found, a ``LookupError`` exception will be raised to stop the execution.
This rule is special in the way that the value can be almost anything. For
more information please read the warning below.
Example usage:
>>> from pathlib import Path
>>> from hammurabi import Law, Pillar, YAMLValueExists
>>>
>>> example_law = Law(
>>> name="Name of the law",
>>> description="Well detailed description what this law does.",
>>> rules=(
>>> YAMLValueExists(
>>> name="Ensure service descriptor has dependencies",
>>> path=Path("./service.yaml"),
>>> key="development.dependencies",
>>> value=["service1", "service2", "service3"],
>>> ),
>>> # Or
>>> YAMLValueExists(
>>> name="Add infra alerting to existing alerting components",
>>> path=Path("./service.yaml"),
>>> key="development.alerting",
>>> value={"infra": "#slack-channel-2"},
>>> ),
>>> # Or
>>> YAMLValueExists(
>>> name="Add support info",
>>> path=Path("./service.yaml"),
>>> key="development.supported",
>>> value=True,
>>> ),
>>> # Or even
>>> YAMLValueExists(
>>> name="Make sure that no development branch is set",
>>> path=Path("./service.yaml"),
>>> key="development.branch",
>>> value=None,
>>> ),
>>> )
>>> )
>>>
>>> pillar = Pillar()
>>> pillar.register(example_law)
.. warning::
Since the value can be anything from ``None`` to a list of lists, and
rule piping passes the 1st argument (``path``) to the next rule the ``value``
parameter can not be defined in ``__init__`` before the ``path``. Hence
the ``value`` parameter must have a default value. The default value is
set to ``None``, which translates to the following:
Using the ``YAMLValueExists`` rule and not assigning value to ``value``
parameter will set the matching ``key``'s value to `None`` by default in
the document.
"""
def __init__(
self,
name: str,
path: Optional[Path] = None,
key: str = "",
value: Union[None, list, dict, str, int, float] = None,
**kwargs,
) -> None:
self.value = value
super().__init__(name, path, key, **kwargs)
[docs] def task(self) -> Path:
"""
Ensure that the given key has the expected value(s). In case the key cannot
be found, a ``LookupError`` exception will be raised to stop the execution.
.. warning::
Since the value can be anything from ``None`` to a list of lists, and
rule piping passes the 1st argument (``path``) to the next rule the ``value``
parameter can not be defined in ``__init__`` before the ``path``. Hence
the ``value`` parameter must have a default value. The default value is
set to ``None``, which translates to the following:
Using the ``YAMLValueExists`` rule and not assigning value to ``value``
parameter will set the matching ``key``'s value to `None`` by default in
the document.
:raises: ``LookupError`` raised if no key can be renamed or both the new and
old keys are in the config file
:return: Return the input path as an output
:rtype: Path
"""
parent = self._get_parent()
key_name: str = self.key.split(".")[-1]
if key_name not in parent:
raise LookupError(f'No matching key for selector "{self.key}"')
if self.value is None:
parent[key_name] = self.value
if isinstance(parent.get(key_name), list):
if isinstance(self.value, list):
parent[key_name].extend(self.value)
else:
parent[key_name].append(self.value)
elif isinstance(parent.get(key_name), dict):
parent[key_name].update(self.value)
else:
parent[key_name] = self.value
self._write_dump(parent[key_name])
return self.param
[docs]class YAMLValueNotExists(SingleDocumentYAMLFileRule):
"""
Ensure that the key has no value given. In case the key cannot be found,
a ``LookupError`` exception will be raised to stop the execution.
Compared to ``hammurabi.rules.yaml.YAMLValueExists``, this rule can only
accept simple value for its ``value`` parameter. No ``list``, ``dict``, or
``None`` can be used.
Based on the key's value's type if the value contains (or equals for simple types)
value provided in the ``value`` parameter the value is:
1. Set to None (if the key's value's type is not a dict or list)
2. Removed from the list (if the key's value's type is a list)
3. Removed from the dict (if the key's value's type is a dict)
Example usage:
>>> from pathlib import Path
>>> from hammurabi import Law, Pillar, YAMLValueNotExists
>>>
>>> example_law = Law(
>>> name="Name of the law",
>>> description="Well detailed description what this law does.",
>>> rules=(
>>> YAMLValueNotExists(
>>> name="Remove decommissioned service from dependencies",
>>> path=Path("./service.yaml"),
>>> key="development.dependencies",
>>> value="service4",
>>> ),
>>> )
>>> )
>>>
>>> pillar = Pillar()
>>> pillar.register(example_law)
"""
def __init__(
self,
name: str,
path: Optional[Path] = None,
key: str = "",
value: Union[str, int, float] = None,
**kwargs,
) -> None:
self.value = self.validate(value, required=True)
super().__init__(name, path, key, **kwargs)
[docs] def task(self) -> Path:
"""
Ensure that the key has no value given. In case the key cannot be found,
a ``LookupError`` exception will be raised to stop the execution.
Based on the key's value's type if the value contains (or equals for simple types)
value provided in the ``value`` parameter the value is:
1. Set to None (if the key's value's type is not a dict or list)
2. Removed from the list (if the key's value's type is a list)
3. Removed from the dict (if the key's value's type is a dict)
:return: Return the input path as an output
:rtype: Path
"""
parent = self._get_parent()
key_name: str = self.key.split(".")[-1]
write_needed = False
if key_name not in parent:
return self.param
current_value = parent.get(key_name)
if isinstance(current_value, list):
if self.value in current_value:
parent[key_name].remove(self.value)
write_needed = True
elif isinstance(current_value, dict):
if self.value in current_value:
del parent[key_name][self.value]
write_needed = True
else:
if current_value == self.value:
parent[key_name] = None
write_needed = True
if write_needed:
self._write_dump(parent[key_name])
return self.param