Module netmiko.utilities

Miscellaneous utility functions.

Source code
"""Miscellaneous utility functions."""
from typing import (
    Any,
    AnyStr,
    TypeVar,
    Callable,
    cast,
    Optional,
    Union,
    List,
    Dict,
    Tuple,
)
from typing import TYPE_CHECKING
from glob import glob
import sys
import io
import os
from pathlib import Path
import functools
from datetime import datetime
from textfsm import clitable
from textfsm.clitable import CliTableError
from netmiko import log

# For decorators
F = TypeVar("F", bound=Callable[..., Any])


if TYPE_CHECKING:
    from netmiko.base_connection import BaseConnection
    from os import PathLike

try:
    from ttp import ttp

    TTP_INSTALLED = True

except ImportError:
    TTP_INSTALLED = False

try:
    from genie.conf.base import Device
    from genie.libs.parser.utils import get_parser
    from pyats.datastructures import AttrDict

    GENIE_INSTALLED = True
except ImportError:
    GENIE_INSTALLED = False

# If we are on python < 3.7, we need to force the import of importlib.resources backport
if sys.version_info[:2] >= (3, 7):
    import importlib.resources as pkg_resources
else:
    import importlib_resources as pkg_resources

try:
    import serial.tools.list_ports

    PYSERIAL_INSTALLED = True
except ImportError:
    PYSERIAL_INSTALLED = False

# Dictionary mapping 'show run' for vendors with different command
# Keys are netmiko device_type names; values are the command that platform uses
# in place of 'show run'. Platforms not listed here have no override in this table.
SHOW_RUN_MAPPER = {
    "brocade_fos": "configShow",
    "juniper": "show configuration",
    "juniper_junos": "show configuration",
    "extreme": "show configuration",
    "extreme_ers": "show running-config",
    "extreme_exos": "show configuration",
    "extreme_netiron": "show running-config",
    "extreme_nos": "show running-config",
    "extreme_slx": "show running-config",
    "extreme_vdx": "show running-config",
    "extreme_vsp": "show running-config",
    "extreme_wing": "show running-config",
    "ericsson_ipos": "show configuration",
    "hp_comware": "display current-configuration",
    "huawei": "display current-configuration",
    "fortinet": "show full-configuration",
    "checkpoint": "show configuration",
    "cisco_wlc": "show run-config",
    "enterasys": "show running-config",
    "dell_force10": "show running-config",
    "avaya_vsp": "show running-config",
    "avaya_ers": "show running-config",
    "brocade_vdx": "show running-config",
    "brocade_nos": "show running-config",
    "brocade_fastiron": "show running-config",
    "brocade_netiron": "show running-config",
    "alcatel_aos": "show configuration snapshot",
    "cros_mtbr": "show running-config",
}

# Expand SHOW_RUN_MAPPER so every platform is also present under '<platform>_ssh'
# with the same command. The expanded copy is built separately and then rebound to
# SHOW_RUN_MAPPER, so the dictionary is never modified while it is being iterated.
# NOTE: new_dict, k, v and new_key remain defined as module-level names afterwards.
new_dict = {}
for k, v in SHOW_RUN_MAPPER.items():
    new_key = k + "_ssh"
    new_dict[k] = v
    new_dict[new_key] = v
SHOW_RUN_MAPPER = new_dict

# Default location of netmiko temp directory for netmiko tools
NETMIKO_BASE_DIR = "~/.netmiko"


def load_yaml_file(yaml_file: Union[str, bytes, "PathLike[Any]"]) -> Any:
    """
    Parse a YAML file and return the resulting Python object.

    The interpreter is terminated through ``sys.exit`` when the ``yaml`` module cannot be
    imported or when opening/reading the file raises ``IOError``.

    :param yaml_file: path of the YAML file; it is read as UTF-8 text

    :return: the value produced by ``yaml.safe_load``
    """
    try:
        import yaml
    except ImportError:
        sys.exit("Unable to import yaml module.")

    try:
        with io.open(yaml_file, mode="rt", encoding="utf-8") as yaml_stream:
            parsed = yaml.safe_load(yaml_stream)
    except IOError:
        sys.exit("Unable to open YAML file")
    return parsed


def load_devices(file_name: Union[str, bytes, "PathLike[Any]", None] = None) -> Any:
    """
    Locate the netmiko inventory file and return its parsed YAML content.

    :param file_name: optional explicit inventory path, handed to ``find_cfg_file``

    :return: the object produced by ``load_yaml_file`` for the located file
    """
    return load_yaml_file(find_cfg_file(file_name))


def find_cfg_file(
    file_name: Union[str, bytes, "PathLike[Any]", None] = None
) -> Union[str, bytes, "PathLike[Any]"]:
    """
    Search for the netmiko_tools inventory file.

    Lookup order:
    1) ``file_name``, when it is given and is an existing file.
    2) The NETMIKO_TOOLS_CFG environment variable, when it points directly at a file.
    3) A file named ``.netmiko.yml`` or ``netmiko.yml`` (in that order of preference) inside
       the NETMIKO_TOOLS_CFG directory, the current directory, then the home directory.

    :param file_name: optional explicit path to the inventory file

    :return: path of the first inventory file found

    :raises IOError: if no inventory file is found in any location
    """
    if file_name and os.path.isfile(file_name):
        return file_name
    optional_path = os.environ.get("NETMIKO_TOOLS_CFG", "")
    if os.path.isfile(optional_path):
        return optional_path
    # An unset/empty NETMIKO_TOOLS_CFG contributes no search directory
    search_paths = [
        path for path in (optional_path, ".", os.path.expanduser("~")) if path
    ]
    for path in search_paths:
        for candidate in (f"{path}/.netmiko.yml", f"{path}/netmiko.yml"):
            # Test the literal path: glob() would interpret characters such as "[", "*" or
            # "?" in the directory name as wildcards and fail to match the real file.
            if os.path.isfile(candidate):
                return candidate
    raise IOError(
        ".netmiko.yml file not found in NETMIKO_TOOLS_CFG environment variable directory,"
        " current directory, or home directory."
    )


def display_inventory(my_devices: Dict[str, Union[List[str], Dict[str, Any]]]) -> None:
    """
    Print a table of inventory devices followed by the inventory group names.

    Entries whose value is a list are listed as groups; entries whose value is a dict are
    listed as devices together with their "device_type". Any other value is ignored. The
    "all" group is always included in the group listing.

    :param my_devices: inventory mapping of names to device dicts or group lists
    """
    group_names = sorted(
        ["all"]
        + [name for name, entry in my_devices.items() if isinstance(entry, list)]
    )
    devices = sorted(
        (
            (name, entry["device_type"])
            for name, entry in my_devices.items()
            if isinstance(entry, dict)
        ),
        key=lambda pair: pair[0],
    )
    separator = "-" * 40

    print("\nDevices:")
    print(separator)
    for name, dev_type in devices:
        type_label = f"  ({dev_type})"
        print(f"{name:<25}{type_label:>15}")

    print("\n\nGroups:")
    print(separator)
    for group in group_names:
        print(group)
    print()


def obtain_all_devices(
    my_devices: Dict[str, Union[List[str], Dict[str, Any]]]
) -> Dict[str, Dict[str, Any]]:
    """
    Build the implicit 'all' group: every inventory entry that is not a group.

    :param my_devices: inventory mapping of names to device dicts or group lists

    :return: new dict holding only the entries whose value is not a list
    """
    return {
        name: entry for name, entry in my_devices.items() if not isinstance(entry, list)
    }


def obtain_netmiko_filename(device_name: str) -> str:
    """
    Return the path of the temp file used for ``device_name``.

    The file is ``<device_name>.txt`` inside the second directory returned by
    ``find_netmiko_dir``.
    """
    tmp_dir = find_netmiko_dir()[1]
    return f"{tmp_dir}/{device_name}.txt"


def write_tmp_file(device_name: str, output: str) -> str:
    """
    Write ``output`` to the temp file belonging to ``device_name``.

    :param device_name: device whose temp file (see ``obtain_netmiko_filename``) is written

    :param output: text to store; any previous file content is replaced

    :return: path of the file that was written
    """
    tmp_path = obtain_netmiko_filename(device_name)
    with open(tmp_path, mode="w") as tmp_file:
        tmp_file.write(output)
    return tmp_path


def ensure_dir_exists(verify_dir: str) -> None:
    """
    Ensure directory exists. Create if necessary (including missing parent directories).

    :param verify_dir: directory path that must exist when this function returns

    :raises ValueError: if ``verify_dir`` exists but is not a directory
    """
    try:
        # Create unconditionally instead of testing os.path.exists() first: with a separate
        # check another process/thread could create the directory between the check and
        # makedirs(), which would then fail with FileExistsError.
        os.makedirs(verify_dir, exist_ok=True)
    except FileExistsError:
        if os.path.isdir(verify_dir):
            # The directory is there after all; nothing left to do
            return
        if os.path.exists(verify_dir):
            raise ValueError(f"{verify_dir} is not a directory") from None
        raise


def find_netmiko_dir() -> Tuple[str, str]:
    """
    Determine the netmiko base directory and its ``tmp`` sub-directory.

    The NETMIKO_DIR environment variable is used when it is set; otherwise the module default
    NETMIKO_BASE_DIR applies. ``~`` is expanded in either case.

    :return: tuple of (base directory, "<base directory>/tmp")

    :raises ValueError: if the base directory resolves to "/"
    """
    env_dir = os.environ.get("NETMIKO_DIR")
    # Only an unset variable falls back to the default; an empty value is used as given
    base_dir = os.path.expanduser(NETMIKO_BASE_DIR if env_dir is None else env_dir)
    if base_dir == "/":
        raise ValueError("/ cannot be netmiko_base_dir")
    return (base_dir, f"{base_dir}/tmp")


def write_bytes(out_data: AnyStr, encoding: str = "ascii") -> bytes:
    """
    Return ``out_data`` as bytes (legacy Python2/Python3 compatibility helper).

    :param out_data: bytes are returned unchanged; str is encoded

    :param encoding: "utf-8" encodes str as UTF-8; any other value encodes as ASCII and
        silently drops characters that cannot be represented

    :return: the byte string

    :raises ValueError: if ``out_data`` is neither str nor bytes
    """
    if isinstance(out_data, bytes):
        return out_data
    if isinstance(out_data, str):
        if encoding == "utf-8":
            return out_data.encode("utf-8")
        return out_data.encode("ascii", "ignore")
    raise ValueError(
        f"Invalid value for out_data neither unicode nor byte string: {str(out_data)}"
    )


def check_serial_port(name: str) -> str:
    """
    Return the first element of the first serial port entry matching ``name``.

    :param name: expression handed to ``serial.tools.list_ports.grep``

    :return: the matched port's first field (asserted to be a str)

    :raises ValueError: if pyserial is not installed, or if no port matches (the message then
        lists the ports reported by ``serial.tools.list_ports.comports``)
    """
    if not PYSERIAL_INSTALLED:
        raise ValueError(
            "\npyserial is not installed. Please PIP install pyserial:\n\n"
            "pip install pyserial\n\n"
        )

    try:
        matched_port = next(serial.tools.list_ports.grep(name))
        port_name = matched_port[0]
        assert isinstance(port_name, str)
        return port_name
    except StopIteration:
        # grep() produced nothing: report what is available instead
        available = "".join(
            f"{str(port)}," for port in serial.tools.list_ports.comports()
        )
        raise ValueError(f"device {name} not found. available devices are: {available}")


def get_template_dir(_skip_ntc_package: bool = False) -> str:
    """
    Locate the directory that holds the TextFSM `index` file.

    Lookup:
    1) If the `NET_TEXTFSM` environment variable is set, use that directory, or its
       `templates` sub-directory when the directory itself has no `index` file. No other
       location is consulted in this case.
    2) Otherwise use the `templates` directory of the installed `ntc_templates` package.
    3) If that package cannot be found, use ~/ntc-templates/ntc_templates/templates.

    :param _skip_ntc_package: pretend the `ntc_templates` package is missing (used by
        Netmiko automated testing)

    :return: absolute path of the directory containing the TextFSM index file

    :raises ValueError: if the selected directory does not exist or has no `index` file
    """
    env_dir = os.environ.get("NET_TEXTFSM")
    if env_dir is None:
        try:
            # Example result: /opt/venv/netmiko/lib/python3.8/site-packages/ntc_templates/templates
            with pkg_resources.path(
                package="ntc_templates", resource="parse.py"
            ) as parse_py:
                template_dir = str(parse_py.parent.joinpath("templates"))
                if _skip_ntc_package:
                    # Force the home-directory fallback below
                    raise ModuleNotFoundError()
        except ModuleNotFoundError:
            template_dir = os.path.join(
                os.path.expanduser("~"), "ntc-templates", "ntc_templates", "templates"
            )
    else:
        template_dir = os.path.expanduser(env_dir)
        if not os.path.isfile(os.path.join(template_dir, "index")):
            # Assume only the base ./ntc-templates directory was specified
            template_dir = os.path.join(template_dir, "templates")

    index_file = os.path.join(template_dir, "index")
    if os.path.isdir(template_dir) and os.path.isfile(index_file):
        return os.path.abspath(template_dir)

    msg = """
Directory containing TextFSM index file not found.

Please set the NET_TEXTFSM environment variable to point at the directory containing your TextFSM
index file.

Alternatively, `pip install ntc-templates` (if using ntc-templates).

"""
    raise ValueError(msg)


def clitable_to_dict(cli_table: clitable.CliTable) -> List[Dict[str, str]]:
    """
    Convert a TextFSM cli_table object into a list of dictionaries.

    Each row becomes one dict whose keys are the lower-cased entries of ``cli_table.header``
    at the matching positions.
    """
    return [
        {cli_table.header[position].lower(): value for position, value in enumerate(row)}
        for row in cli_table
    ]


def _textfsm_parse(
    textfsm_obj: clitable.CliTable,
    raw_output: str,
    attrs: Dict[str, str],
    template_file: Optional[str] = None,
) -> Union[str, List[Dict[str, str]]]:
    """
    Run TextFSM parsing through the given CliTable object.

    :param textfsm_obj: CliTable whose ``ParseCmd`` performs the parsing

    :param raw_output: text to parse

    :param attrs: attributes passed to ``ParseCmd`` when no ``template_file`` is given

    :param template_file: if set, passed to ``ParseCmd`` as ``templates`` instead of ``attrs``

    :return: list of row dictionaries, or ``raw_output`` unchanged when no rows were produced
        or when FileNotFoundError/CliTableError was raised
    """
    parse_cmd: Callable[..., Any] = textfsm_obj.ParseCmd
    try:
        if template_file is None:
            parse_cmd(raw_output, attrs)
        else:
            parse_cmd(raw_output, templates=template_file)
        rows = clitable_to_dict(textfsm_obj)
    except (FileNotFoundError, CliTableError):
        return raw_output
    return rows if rows else raw_output


def get_structured_data_textfsm(
    raw_output: str,
    platform: Optional[str] = None,
    command: Optional[str] = None,
    template: Optional[str] = None,
) -> Union[str, List[Dict[str, str]]]:
    """
    Convert raw CLI output to structured data using a TextFSM template.

    Either give ``template`` (path of a plain TextFSM template file), or give both
    ``platform`` and ``command`` so the template is selected through the CliTable index file.

    :param raw_output: text to parse

    :param platform: value used as the "Platform" attribute for the index lookup

    :param command: value used as the "Command" attribute for the index lookup

    :param template: path of a TextFSM template file (``~`` is expanded)

    :return: result of ``_textfsm_parse``

    :raises ValueError: if ``template`` is not given and ``platform`` or ``command`` is missing
    """
    attrs: Dict[str, str] = {}
    if platform is not None and command is not None:
        attrs = {"Command": command, "Platform": platform}

    if template is not None:
        template_path = Path(os.path.expanduser(template))
        # CliTable with no index will fall-back to a TextFSM parsing behavior
        table = clitable.CliTable(template_dir=template_path.parents[0])
        return _textfsm_parse(
            table, raw_output, attrs, template_file=template_path.name
        )

    if not attrs:
        raise ValueError("Either 'platform/command' or 'template' must be specified.")

    template_dir = get_template_dir()
    table = clitable.CliTable(os.path.join(template_dir, "index"), template_dir)
    parsed = _textfsm_parse(table, raw_output, attrs)

    # A "cisco_xe" platform that yielded no structured data is retried as "cisco_ios"
    if platform and "cisco_xe" in platform and not isinstance(parsed, list):
        attrs["Platform"] = "cisco_ios"
        parsed = _textfsm_parse(table, raw_output, attrs)
    return parsed


# For compatibility
get_structured_data = get_structured_data_textfsm


def get_structured_data_ttp(raw_output: str, template: str) -> Union[str, List[Any]]:
    """
    Convert raw CLI output to structured data using a TTP template.

    :param raw_output: text to parse

    :param template: TTP template, handed unchanged to the ``ttp`` parser

    :return: the parser result in "raw" format, or ``raw_output`` unchanged if any exception
        was raised while parsing

    :raises ValueError: if TTP is not installed
    """
    if not TTP_INSTALLED:
        raise ValueError(
            "\nTTP is not installed. Please PIP install ttp:\n\npip install ttp\n"
        )

    try:
        parser = ttp(data=raw_output, template=template)
        parser.parse(one=True)
        parsed: List[Any] = parser.result(format="raw")
    except Exception:
        # Best effort: any parsing failure hands the unparsed text back
        return raw_output
    return parsed


def run_ttp_template(
    connection: "BaseConnection",
    template: Union[str, bytes, "PathLike[Any]"],
    res_kwargs: Dict[str, Any],
    **kwargs: Any,
) -> Any:
    """
    Collect the command output requested by a TTP template's inputs and parse it.

    For each input defined in the template, the input's commands are sent with the
    connection method named by the input ("send_command" unless "method" is given), and the
    outputs, joined with newlines, are added to the parser. Inputs that name a method the
    connection does not have, or that list no commands, are skipped with a warning.

    :param connection: Netmiko connection object

    :param template: TTP template

    :param res_kwargs: ``**res_kwargs`` arguments for TTP result method

    :param kwargs: ``**kwargs`` for TTP object instantiation

    :return: value of the TTP parser's ``result`` method

    :raises ValueError: if TTP is not installed
    """
    if not TTP_INSTALLED:
        raise ValueError(
            "\nTTP is not installed. Please PIP install ttp:\npip install ttp\n"
        )

    parser = ttp(template=template, **kwargs)

    # Inputs declared by the template, grouped per template name
    inputs_by_template = parser.get_input_load()
    log.debug(f"run_ttp_template: inputs load - {inputs_by_template}")

    for template_name, inputs in inputs_by_template.items():
        for input_name, input_params in inputs.items():
            method = input_params.get("method", "send_command")
            method_kwargs = input_params.get("kwargs", {})
            commands = input_params.get("commands", None)

            # Sanity checks: the method must exist and there must be something to send
            if method not in dir(connection):
                log.warning(
                    f"run_ttp_template: '{input_name}' input, "
                    f"unsupported method '{method}', skipping"
                )
                continue
            if not commands:
                log.warning(
                    f"run_ttp_template: '{input_name}' input no commands to collect, skipping"
                )
                continue

            # Collect the output of every command from the device
            send = getattr(connection, method)
            collected = [
                send(command_string=command, **method_kwargs) for command in commands
            ]

            parser.add_input(
                data="\n".join(collected),
                input_name=input_name,
                template_name=template_name,
            )

    # Run parsing in a single process
    parser.parse(one=True)

    return parser.result(**res_kwargs)


def get_structured_data_genie(
    raw_output: str, platform: str, command: str
) -> Union[str, Dict[str, Any]]:
    """
    Convert raw CLI output to structured data using a Genie parser.

    :param raw_output: text to parse

    :param platform: netmiko device_type; only the Cisco types in the mapping below are
        supported, optionally carrying one trailing suffix such as _ssh, _telnet or _serial

    :param command: command that produced ``raw_output``

    :return: the parsed dictionary, or ``raw_output`` unchanged when the platform is not
        supported or when looking up/running the parser raised any exception

    :raises ValueError: if Python is older than 3.4 or Genie/PyATS are not installed
    """
    if sys.version_info < (3, 4):
        raise ValueError("Genie requires Python >= 3.4")

    if not GENIE_INSTALLED:
        raise ValueError(
            "\nGenie and PyATS are not installed. Please PIP install both Genie and PyATS:\n"
            "pip install genie\npip install pyats\n"
        )

    if "cisco" not in platform:
        return raw_output

    # platform might end in _ssh, _telnet, _serial: strip that off
    if platform.count("_") > 1:
        base_platform = platform.rsplit("_", 1)[0]
    else:
        base_platform = platform

    genie_os = {
        "cisco_ios": "ios",
        "cisco_xe": "iosxe",
        "cisco_xr": "iosxr",
        "cisco_nxos": "nxos",
        "cisco_asa": "asa",
    }.get(base_platform)
    if genie_os is None:
        return raw_output

    # Genie specific construct for doing parsing (based on Genie in Ansible)
    device = Device("new_device", os=genie_os)
    device.custom.setdefault("abstraction", {})
    device.custom["abstraction"]["order"] = ["os"]
    device.cli = AttrDict({"execute": None})
    try:
        # Raises if there is no parser for the given command
        get_parser(command, device)
        parsed_output: Dict[str, Any] = device.parse(command, output=raw_output)
    except Exception:
        return raw_output
    return parsed_output


def structured_data_converter(
    raw_data: str,
    command: str,
    platform: str,
    use_textfsm: bool = False,
    use_ttp: bool = False,
    use_genie: bool = False,
    textfsm_template: Optional[str] = None,
    ttp_template: Optional[str] = None,
) -> Union[str, List[Any], Dict[str, Any]]:
    """
    Run the enabled structured data converters in the order TextFSM, TTP, Genie.

    The first converter that returns something other than a str wins; if none does (or none
    is enabled), ``raw_data`` is returned as-is. ``command`` is stripped of surrounding
    whitespace before use.

    :raises ValueError: if ``use_ttp`` is True and ``ttp_template`` is None
    """
    command = command.strip()

    if use_textfsm:
        textfsm_result = get_structured_data_textfsm(
            raw_data, platform=platform, command=command, template=textfsm_template
        )
        if not isinstance(textfsm_result, str):
            return textfsm_result

    if use_ttp:
        if ttp_template is None:
            raise ValueError(
                "\nThe argument 'ttp_template=/path/to/template.ttp' must be set "
                "when use_ttp=True\n"
            )
        ttp_result = get_structured_data_ttp(raw_data, template=ttp_template)
        if not isinstance(ttp_result, str):
            return ttp_result

    if use_genie:
        genie_result = get_structured_data_genie(
            raw_data, platform=platform, command=command
        )
        if not isinstance(genie_result, str):
            return genie_result

    return raw_data


def select_cmd_verify(func: F) -> F:
    """
    Decorator: force the method's ``cmd_verify`` keyword to the connection-wide setting.

    When ``self.global_cmd_verify`` is not None it replaces any ``cmd_verify`` keyword the
    caller supplied; otherwise the call is passed through untouched.
    """

    @functools.wraps(func)
    def wrapper_decorator(self: "BaseConnection", *args: Any, **kwargs: Any) -> Any:
        global_setting = self.global_cmd_verify
        if global_setting is not None:
            kwargs["cmd_verify"] = global_setting
        return func(self, *args, **kwargs)

    return cast(F, wrapper_decorator)


def m_exec_time(func: F) -> F:
    """Decorator for methods: print the method and its elapsed wall-clock time per call."""

    @functools.wraps(func)
    def wrapper_decorator(self: Any, *args: Any, **kwargs: Any) -> Any:
        started = datetime.now()
        outcome = func(self, *args, **kwargs)
        elapsed = datetime.now() - started
        print(f"{str(func)}: Elapsed time: {elapsed}")
        return outcome

    return cast(F, wrapper_decorator)


def f_exec_time(func: F) -> F:
    """Decorator for functions: print the elapsed wall-clock time of every call."""

    @functools.wraps(func)
    def wrapper_decorator(*args: Any, **kwargs: Any) -> Any:
        started = datetime.now()
        outcome = func(*args, **kwargs)
        elapsed = datetime.now() - started
        print(f"Elapsed time: {elapsed}")
        return outcome

    return cast(F, wrapper_decorator)


def calc_old_timeout(
    max_loops: Optional[int] = None,
    delay_factor: Optional[float] = None,
    loop_delay: float = 0.2,
    old_timeout: int = 100,
) -> float:
    """
    Compute the timeout (in seconds) that Netmiko 3.x loop settings amounted to.

    In Netmiko 3.x each loop slept (loop_delay * delay_factor) seconds, with loop_delay
    being .2, and that sleep happened max_loops times (typically 500), so:

    timeout = (loop_delay * delay_factor) * max_loops

    Netmiko's self.timeout could override the defaults and effectively adjust max_loops
    (this was probably rarely used): when delay_factor is 1 and max_loops is 500, max_loops
    is recomputed as int(old_timeout / loop_delay).

    :param max_loops: number of loops; None means 500

    :param delay_factor: multiplier applied to the loop delay; None means 1.0

    :param loop_delay: base sleep per loop, in seconds

    :param old_timeout: the former self.timeout value used for the override

    :return: max_loops * loop_delay * delay_factor
    """
    loops = 500 if max_loops is None else max_loops
    factor = 1.0 if delay_factor is None else delay_factor

    # This is the logic for having self.timeout override max_loops
    if factor == 1 and loops == 500:
        loops = int(old_timeout / loop_delay)

    return loops * loop_delay * factor

Functions

def calc_old_timeout(max_loops=None, delay_factor=None, loop_delay=0.2, old_timeout=100)

loop_delay is .2 in Netmiko 3.x. delay_factor would multiply the loop delay. The number of loops was typically 500.

Thus each loop would sleep (loop_delay * delay_factor) seconds. That sleep would happen max_loops times.

Formula is (loop_delay * delay_factor) * max_loops

There was a way Netmiko's self.timeout could override the default settings and essentially be used to adjust max_loops (this was probably rarely used).

Source code
def calc_old_timeout(
    max_loops: Optional[int] = None,
    delay_factor: Optional[float] = None,
    loop_delay: float = 0.2,
    old_timeout: int = 100,
) -> float:
    """
    Compute the timeout (in seconds) that Netmiko 3.x loop settings amounted to.

    In Netmiko 3.x each loop slept (loop_delay * delay_factor) seconds, with loop_delay
    being .2, and that sleep happened max_loops times (typically 500), so:

    timeout = (loop_delay * delay_factor) * max_loops

    Netmiko's self.timeout could override the defaults and effectively adjust max_loops
    (this was probably rarely used): when delay_factor is 1 and max_loops is 500, max_loops
    is recomputed as int(old_timeout / loop_delay).

    :param max_loops: number of loops; None means 500

    :param delay_factor: multiplier applied to the loop delay; None means 1.0

    :param loop_delay: base sleep per loop, in seconds

    :param old_timeout: the former self.timeout value used for the override

    :return: max_loops * loop_delay * delay_factor
    """
    loops = 500 if max_loops is None else max_loops
    factor = 1.0 if delay_factor is None else delay_factor

    # This is the logic for having self.timeout override max_loops
    if factor == 1 and loops == 500:
        loops = int(old_timeout / loop_delay)

    return loops * loop_delay * factor
def check_serial_port(name)

returns valid COM Port.

Source code
def check_serial_port(name: str) -> str:
    """
    Return the first element of the first serial port entry matching ``name``.

    :param name: expression handed to ``serial.tools.list_ports.grep``

    :return: the matched port's first field (asserted to be a str)

    :raises ValueError: if pyserial is not installed, or if no port matches (the message then
        lists the ports reported by ``serial.tools.list_ports.comports``)
    """
    if not PYSERIAL_INSTALLED:
        raise ValueError(
            "\npyserial is not installed. Please PIP install pyserial:\n\n"
            "pip install pyserial\n\n"
        )

    try:
        matched_port = next(serial.tools.list_ports.grep(name))
        port_name = matched_port[0]
        assert isinstance(port_name, str)
        return port_name
    except StopIteration:
        # grep() produced nothing: report what is available instead
        available = "".join(
            f"{str(port)}," for port in serial.tools.list_ports.comports()
        )
        raise ValueError(f"device {name} not found. available devices are: {available}")
def clitable_to_dict(cli_table)

Converts TextFSM cli_table object to list of dictionaries.

Source code
def clitable_to_dict(cli_table: clitable.CliTable) -> List[Dict[str, str]]:
    """
    Convert a TextFSM cli_table object into a list of dictionaries.

    Each row becomes one dict whose keys are the lower-cased entries of ``cli_table.header``
    at the matching positions.
    """
    return [
        {cli_table.header[position].lower(): value for position, value in enumerate(row)}
        for row in cli_table
    ]
def display_inventory(my_devices)

Print out inventory devices and groups.

Source code
def display_inventory(my_devices: Dict[str, Union[List[str], Dict[str, Any]]]) -> None:
    """
    Print a table of inventory devices followed by the inventory group names.

    Entries whose value is a list are listed as groups; entries whose value is a dict are
    listed as devices together with their "device_type". Any other value is ignored. The
    "all" group is always included in the group listing.

    :param my_devices: inventory mapping of names to device dicts or group lists
    """
    group_names = sorted(
        ["all"]
        + [name for name, entry in my_devices.items() if isinstance(entry, list)]
    )
    devices = sorted(
        (
            (name, entry["device_type"])
            for name, entry in my_devices.items()
            if isinstance(entry, dict)
        ),
        key=lambda pair: pair[0],
    )
    separator = "-" * 40

    print("\nDevices:")
    print(separator)
    for name, dev_type in devices:
        type_label = f"  ({dev_type})"
        print(f"{name:<25}{type_label:>15}")

    print("\n\nGroups:")
    print(separator)
    for group in group_names:
        print(group)
    print()
def ensure_dir_exists(verify_dir)

Ensure directory exists. Create if necessary.

Source code
def ensure_dir_exists(verify_dir: str) -> None:
    """
    Ensure directory exists. Create if necessary (including missing parent directories).

    :param verify_dir: directory path that must exist when this function returns

    :raises ValueError: if ``verify_dir`` exists but is not a directory
    """
    try:
        # Create unconditionally instead of testing os.path.exists() first: with a separate
        # check another process/thread could create the directory between the check and
        # makedirs(), which would then fail with FileExistsError.
        os.makedirs(verify_dir, exist_ok=True)
    except FileExistsError:
        if os.path.isdir(verify_dir):
            # The directory is there after all; nothing left to do
            return
        if os.path.exists(verify_dir):
            raise ValueError(f"{verify_dir} is not a directory") from None
        raise
def f_exec_time(func)
Source code
def f_exec_time(func: F) -> F:
    """Decorator for functions: print the elapsed wall-clock time of every call."""

    @functools.wraps(func)
    def wrapper_decorator(*args: Any, **kwargs: Any) -> Any:
        started = datetime.now()
        outcome = func(*args, **kwargs)
        elapsed = datetime.now() - started
        print(f"Elapsed time: {elapsed}")
        return outcome

    return cast(F, wrapper_decorator)
def find_cfg_file(file_name=None)

Search for the netmiko_tools inventory file in the following order: NETMIKO_TOOLS_CFG environment variable, current directory, home directory. Look for a file named .netmiko.yml or netmiko.yml. NETMIKO_TOOLS_CFG may also point directly at a file.

Source code
def find_cfg_file(
    file_name: Union[str, bytes, "PathLike[Any]", None] = None
) -> Union[str, bytes, "PathLike[Any]"]:
    """
    Search for the netmiko_tools inventory file.

    Lookup order:
    1) ``file_name``, when it is given and is an existing file.
    2) The NETMIKO_TOOLS_CFG environment variable, when it points directly at a file.
    3) A file named ``.netmiko.yml`` or ``netmiko.yml`` (in that order of preference) inside
       the NETMIKO_TOOLS_CFG directory, the current directory, then the home directory.

    :param file_name: optional explicit path to the inventory file

    :return: path of the first inventory file found

    :raises IOError: if no inventory file is found in any location
    """
    if file_name and os.path.isfile(file_name):
        return file_name
    optional_path = os.environ.get("NETMIKO_TOOLS_CFG", "")
    if os.path.isfile(optional_path):
        return optional_path
    # An unset/empty NETMIKO_TOOLS_CFG contributes no search directory
    search_paths = [
        path for path in (optional_path, ".", os.path.expanduser("~")) if path
    ]
    for path in search_paths:
        for candidate in (f"{path}/.netmiko.yml", f"{path}/netmiko.yml"):
            # Test the literal path: glob() would interpret characters such as "[", "*" or
            # "?" in the directory name as wildcards and fail to match the real file.
            if os.path.isfile(candidate):
                return candidate
    raise IOError(
        ".netmiko.yml file not found in NETMIKO_TOOLS_CFG environment variable directory,"
        " current directory, or home directory."
    )
def find_netmiko_dir()

Check environment first, then default dir

Source code
def find_netmiko_dir() -> Tuple[str, str]:
    """
    Determine the netmiko base directory and its ``tmp`` sub-directory.

    The NETMIKO_DIR environment variable is used when it is set; otherwise the module default
    NETMIKO_BASE_DIR applies. ``~`` is expanded in either case.

    :return: tuple of (base directory, "<base directory>/tmp")

    :raises ValueError: if the base directory resolves to "/"
    """
    env_dir = os.environ.get("NETMIKO_DIR")
    # Only an unset variable falls back to the default; an empty value is used as given
    base_dir = os.path.expanduser(NETMIKO_BASE_DIR if env_dir is None else env_dir)
    if base_dir == "/":
        raise ValueError("/ cannot be netmiko_base_dir")
    return (base_dir, f"{base_dir}/tmp")
def get_structured_data(raw_output, platform=None, command=None, template=None)

Convert raw CLI output to structured data using TextFSM template.

You can use a straight TextFSM file, i.e. specify "template". If no template is specified, then you must use a CliTable index file.

Source code
def get_structured_data_textfsm(
    raw_output: str,
    platform: Optional[str] = None,
    command: Optional[str] = None,
    template: Optional[str] = None,
) -> Union[str, List[Dict[str, str]]]:
    """
    Convert raw CLI output to structured data using a TextFSM template.

    Either give ``template`` (path of a plain TextFSM template file), or give both
    ``platform`` and ``command`` so the template is selected through the CliTable index file.

    :param raw_output: text to parse

    :param platform: value used as the "Platform" attribute for the index lookup

    :param command: value used as the "Command" attribute for the index lookup

    :param template: path of a TextFSM template file (``~`` is expanded)

    :return: result of ``_textfsm_parse``

    :raises ValueError: if ``template`` is not given and ``platform`` or ``command`` is missing
    """
    attrs: Dict[str, str] = {}
    if platform is not None and command is not None:
        attrs = {"Command": command, "Platform": platform}

    if template is not None:
        template_path = Path(os.path.expanduser(template))
        # CliTable with no index will fall-back to a TextFSM parsing behavior
        table = clitable.CliTable(template_dir=template_path.parents[0])
        return _textfsm_parse(
            table, raw_output, attrs, template_file=template_path.name
        )

    if not attrs:
        raise ValueError("Either 'platform/command' or 'template' must be specified.")

    template_dir = get_template_dir()
    table = clitable.CliTable(os.path.join(template_dir, "index"), template_dir)
    parsed = _textfsm_parse(table, raw_output, attrs)

    # A "cisco_xe" platform that yielded no structured data is retried as "cisco_ios"
    if platform and "cisco_xe" in platform and not isinstance(parsed, list):
        attrs["Platform"] = "cisco_ios"
        parsed = _textfsm_parse(table, raw_output, attrs)
    return parsed
def get_structured_data_genie(raw_output, platform, command)
Source code
def get_structured_data_genie(
    raw_output: str, platform: str, command: str
) -> Union[str, Dict[str, Any]]:
    """
    Convert raw CLI output to structured data using a Genie parser.

    :param raw_output: text to parse

    :param platform: netmiko device_type; only the Cisco types in the mapping below are
        supported, optionally carrying one trailing suffix such as _ssh, _telnet or _serial

    :param command: command that produced ``raw_output``

    :return: the parsed dictionary, or ``raw_output`` unchanged when the platform is not
        supported or when looking up/running the parser raised any exception

    :raises ValueError: if Python is older than 3.4 or Genie/PyATS are not installed
    """
    if sys.version_info < (3, 4):
        raise ValueError("Genie requires Python >= 3.4")

    if not GENIE_INSTALLED:
        raise ValueError(
            "\nGenie and PyATS are not installed. Please PIP install both Genie and PyATS:\n"
            "pip install genie\npip install pyats\n"
        )

    if "cisco" not in platform:
        return raw_output

    # platform might end in _ssh, _telnet, _serial: strip that off
    if platform.count("_") > 1:
        base_platform = platform.rsplit("_", 1)[0]
    else:
        base_platform = platform

    genie_os = {
        "cisco_ios": "ios",
        "cisco_xe": "iosxe",
        "cisco_xr": "iosxr",
        "cisco_nxos": "nxos",
        "cisco_asa": "asa",
    }.get(base_platform)
    if genie_os is None:
        return raw_output

    # Genie specific construct for doing parsing (based on Genie in Ansible)
    device = Device("new_device", os=genie_os)
    device.custom.setdefault("abstraction", {})
    device.custom["abstraction"]["order"] = ["os"]
    device.cli = AttrDict({"execute": None})
    try:
        # Raises if there is no parser for the given command
        get_parser(command, device)
        parsed_output: Dict[str, Any] = device.parse(command, output=raw_output)
    except Exception:
        return raw_output
    return parsed_output
def get_structured_data_textfsm(raw_output, platform=None, command=None, template=None)

Convert raw CLI output to structured data using TextFSM template.

You can use a straight TextFSM file, i.e. specify "template". If no template is specified, then you must use a CliTable index file.

Source code
def get_structured_data_textfsm(
    raw_output: str,
    platform: Optional[str] = None,
    command: Optional[str] = None,
    template: Optional[str] = None,
) -> Union[str, List[Dict[str, str]]]:
    """
    Parse raw CLI output into structured data with TextFSM.

    Either pass "template" (path to a plain TextFSM template file), or pass both
    "platform" and "command" so the template can be looked up through the CliTable
    index file.

    :raises ValueError: if no template is given and platform/command are incomplete
    """
    attrs: Dict[str, str] = {}
    if platform is not None and command is not None:
        attrs = {"Command": command, "Platform": platform}

    if template is not None:
        template_path = Path(os.path.expanduser(template))
        # CliTable with no index will fall-back to a TextFSM parsing behavior
        textfsm_obj = clitable.CliTable(template_dir=template_path.parent)
        return _textfsm_parse(
            textfsm_obj, raw_output, attrs, template_file=template_path.name
        )

    if not attrs:
        raise ValueError("Either 'platform/command' or 'template' must be specified.")

    template_dir = get_template_dir()
    textfsm_obj = clitable.CliTable(os.path.join(template_dir, "index"), template_dir)
    output = _textfsm_parse(textfsm_obj, raw_output, attrs)

    # For "cisco_xe" retry with the "cisco_ios" templates when nothing structured came back
    if platform and "cisco_xe" in platform and not isinstance(output, list):
        attrs["Platform"] = "cisco_ios"
        output = _textfsm_parse(textfsm_obj, raw_output, attrs)
    return output
def get_structured_data_ttp(raw_output, template)

Convert raw CLI output to structured data using TTP template.

You can use a straight TTP template, i.e. specify "template".

Source code
def get_structured_data_ttp(raw_output: str, template: str) -> Union[str, List[Any]]:
    """
    Parse raw CLI output into structured data with a TTP template.

    :param raw_output: CLI output to be parsed
    :param template: TTP template passed straight to the TTP parser
    :return: TTP "raw" format result, or raw_output if anything fails while parsing
    :raises ValueError: if TTP is not installed
    """
    if not TTP_INSTALLED:
        raise ValueError(
            "\nTTP is not installed. Please PIP install ttp:\n\npip install ttp\n"
        )

    try:
        parser = ttp(data=raw_output, template=template)
        parser.parse(one=True)
        structured: List[Any] = parser.result(format="raw")
    except Exception:
        # Best effort: any parsing problem falls back to the unparsed output
        return raw_output
    return structured
def get_template_dir()

Find and return the directory containing the TextFSM index file.

Order of preference is: 1) Find directory in NET_TEXTFSM Environment Variable. 2) Check for pip installed ntc-templates location in this environment. 3) ~/ntc-templates/ntc_templates/templates.

If index file is not found in any of these locations, raise ValueError

:return: directory containing the TextFSM index file

Source code
def get_template_dir(_skip_ntc_package: bool = False) -> str:
    """
    Find and return the directory containing the TextFSM index file.

    Order of preference is:
    1) Find directory in `NET_TEXTFSM` Environment Variable.
    2) Check for pip installed `ntc-templates` location in this environment.
    3) ~/ntc-templates/ntc_templates/templates.

    If `index` file is not found in any of these locations, raise ValueError

    :param _skip_ntc_package: pretend the pip installed `ntc-templates` package is
        missing so that option 3 is used (for Netmiko automated testing)

    :return: directory containing the TextFSM index file

    :raises ValueError: if the selected directory does not exist or has no `index` file
    """

    msg = """
Directory containing TextFSM index file not found.

Please set the NET_TEXTFSM environment variable to point at the directory containing your TextFSM
index file.

Alternatively, `pip install ntc-templates` (if using ntc-templates).

"""

    # Try NET_TEXTFSM environment variable
    template_dir = os.environ.get("NET_TEXTFSM")
    if template_dir is not None:
        template_dir = os.path.expanduser(template_dir)
        index = os.path.join(template_dir, "index")
        if not os.path.isfile(index):
            # Assume only base ./ntc-templates specified
            template_dir = os.path.join(template_dir, "templates")

    else:
        # Try 'pip installed' ntc-templates
        try:
            # Arguments are positional on purpose: the keyword names package= and
            # resource= are not accepted by the Python 3.13 signature
            # path(anchor, *path_names), while positional use works everywhere.
            with pkg_resources.path("ntc_templates", "parse.py") as posix_path:
                # Example: /opt/venv/netmiko/lib/python3.8/site-packages/ntc_templates/templates
                template_dir = str(posix_path.parent.joinpath("templates"))
                # This is for Netmiko automated testing
                if _skip_ntc_package:
                    raise ModuleNotFoundError()

        except ModuleNotFoundError:
            # Finally check in ~/ntc-templates/ntc_templates/templates
            home_dir = os.path.expanduser("~")
            template_dir = os.path.join(
                home_dir, "ntc-templates", "ntc_templates", "templates"
            )

    index = os.path.join(template_dir, "index")
    if not os.path.isdir(template_dir) or not os.path.isfile(index):
        raise ValueError(msg)
    return os.path.abspath(template_dir)
def load_devices(file_name=None)

Find and load .netmiko.yml file.

Source code
def load_devices(file_name: Union[str, bytes, "PathLike[Any]", None] = None) -> Any:
    """Locate the config file via find_cfg_file() and return its parsed YAML content."""
    return load_yaml_file(find_cfg_file(file_name))
def load_yaml_file(yaml_file)

Read YAML file.

Source code
def load_yaml_file(yaml_file: Union[str, bytes, "PathLike[Any]"]) -> Any:
    """
    Read a UTF-8 encoded YAML file and return the data produced by yaml.safe_load.

    The process exits (SystemExit) with a short message when the yaml module cannot be
    imported or when the file cannot be opened or read.
    """
    try:
        import yaml
    except ImportError:
        sys.exit("Unable to import yaml module.")

    try:
        with open(yaml_file, mode="rt", encoding="utf-8") as stream:
            parsed = yaml.safe_load(stream)
    except OSError:
        sys.exit("Unable to open YAML file")
    return parsed
def m_exec_time(func)
Source code
def m_exec_time(func: F) -> F:
    """Decorator that prints the elapsed wall-clock time of every call to a method."""

    @functools.wraps(func)
    def timed(self: Any, *args: Any, **kwargs: Any) -> Any:
        started = datetime.now()
        outcome = func(self, *args, **kwargs)
        elapsed = datetime.now() - started
        print(f"{str(func)}: Elapsed time: {elapsed}")
        return outcome

    return cast(F, timed)
def obtain_all_devices(my_devices)

Dynamically create 'all' group.

Source code
def obtain_all_devices(
    my_devices: Dict[str, Union[List[str], Dict[str, Any]]]
) -> Dict[str, Dict[str, Any]]:
    """Build the 'all' group: every entry of my_devices whose value is not a list."""
    # List values are groups of device names and are left out
    return {
        name: entry for name, entry in my_devices.items() if not isinstance(entry, list)
    }
def obtain_netmiko_filename(device_name)

Create file name based on device_name.

Source code
def obtain_netmiko_filename(device_name: str) -> str:
    """Return '<dir>/<device_name>.txt', where dir is the second item of find_netmiko_dir()."""
    _base_dir, full_dir = find_netmiko_dir()
    return "{}/{}.txt".format(full_dir, device_name)
def run_ttp_template(connection, template, res_kwargs, **kwargs)

Helper function to run TTP template parsing.

:param connection: Netmiko connection object

:param template: TTP template

:param res_kwargs: **res_kwargs arguments for TTP result method

:param kwargs: **kwargs for TTP object instantiation

Source code
def run_ttp_template(
    connection: "BaseConnection",
    template: Union[str, bytes, "PathLike[Any]"],
    res_kwargs: Dict[str, Any],
    **kwargs: Any,
) -> Any:
    """
    Collect command output for every input of a TTP template and parse it.

    For each input declared in the template, the listed commands are sent through the
    connection method named by the input (``send_command`` unless specified), the
    outputs are joined with newlines and added to the parser under that input.

    :param connection: Netmiko connection object

    :param template: TTP template

    :param res_kwargs: ``**res_kwargs`` arguments for TTP result method

    :param kwargs: ``**kwargs`` for TTP object instantiation

    :raises ValueError: if TTP is not installed
    """
    if not TTP_INSTALLED:
        raise ValueError(
            "\nTTP is not installed. Please PIP install ttp:\npip install ttp\n"
        )

    parser = ttp(template=template, **kwargs)

    # Inputs declared by the template, keyed by template name
    inputs_by_template = parser.get_input_load()
    log.debug(f"run_ttp_template: inputs load - {inputs_by_template}")

    for template_name, inputs in inputs_by_template.items():
        for input_name, input_params in inputs.items():
            method = input_params.get("method", "send_command")
            method_kwargs = input_params.get("kwargs", {})
            commands = input_params.get("commands", None)

            # Skip inputs that cannot be collected
            if method not in dir(connection):
                log.warning(
                    f"run_ttp_template: '{input_name}' input, "
                    f"unsupported method '{method}', skipping"
                )
                continue
            if not commands:
                log.warning(
                    f"run_ttp_template: '{input_name}' input "
                    "no commands to collect, skipping"
                )
                continue

            # Run each command on the device and gather the outputs
            collected = []
            for command in commands:
                sender = getattr(connection, method)
                collected.append(sender(command_string=command, **method_kwargs))

            # Hand the combined output to the TTP parser for this input
            parser.add_input(
                data="\n".join(collected),
                input_name=input_name,
                template_name=template_name,
            )

    # run parsing in single process
    parser.parse(one=True)

    return parser.result(**res_kwargs)
def select_cmd_verify(func)

Override function cmd_verify argument with global setting.

Source code
def select_cmd_verify(func: F) -> F:
    """Decorator forcing cmd_verify to the connection's global_cmd_verify when that is set."""

    @functools.wraps(func)
    def wrapper_decorator(self: "BaseConnection", *args: Any, **kwargs: Any) -> Any:
        global_setting = self.global_cmd_verify
        # None means no global override, so the caller's own cmd_verify is kept
        if global_setting is not None:
            kwargs.update(cmd_verify=global_setting)
        return func(self, *args, **kwargs)

    return cast(F, wrapper_decorator)
def structured_data_converter(raw_data, command, platform, use_textfsm=False, use_ttp=False, use_genie=False, textfsm_template=None, ttp_template=None)

Try structured data converters in the following order: TextFSM, TTP, Genie.

Return the first structured data found, else return the raw_data as-is.

Source code
def structured_data_converter(
    raw_data: str,
    command: str,
    platform: str,
    use_textfsm: bool = False,
    use_ttp: bool = False,
    use_genie: bool = False,
    textfsm_template: Optional[str] = None,
    ttp_template: Optional[str] = None,
) -> Union[str, List[Any], Dict[str, Any]]:
    """
    Run the enabled structured data converters in the order TextFSM, TTP, Genie.

    The first converter that yields something other than a string wins; when none
    does (or none is enabled) raw_data is returned as-is.

    :raises ValueError: if use_ttp is True and ttp_template is not provided
    """
    command = command.strip()

    if use_textfsm:
        parsed_textfsm = get_structured_data_textfsm(
            raw_data, platform=platform, command=command, template=textfsm_template
        )
        if not isinstance(parsed_textfsm, str):
            return parsed_textfsm

    if use_ttp:
        if ttp_template is None:
            raise ValueError(
                "\nThe argument 'ttp_template=/path/to/template.ttp' "
                "must be set when use_ttp=True\n"
            )
        parsed_ttp = get_structured_data_ttp(raw_data, template=ttp_template)
        if not isinstance(parsed_ttp, str):
            return parsed_ttp

    if use_genie:
        parsed_genie = get_structured_data_genie(
            raw_data, platform=platform, command=command
        )
        if not isinstance(parsed_genie, str):
            return parsed_genie

    return raw_data
def write_bytes(out_data, encoding='ascii')

Legacy for Python2 and Python3 compatible byte stream.

Source code
def write_bytes(out_data: AnyStr, encoding: str = "ascii") -> bytes:
    """
    Return out_data as bytes (legacy Python2/Python3 compatibility helper).

    Bytes are passed through untouched. Strings are encoded as UTF-8 only when
    encoding is exactly "utf-8"; for any other encoding value they are encoded as
    ASCII with unencodable characters dropped.

    :raises ValueError: if out_data is neither a str nor a bytes object
    """
    if isinstance(out_data, bytes):
        return out_data

    if isinstance(out_data, str):
        if encoding == "utf-8":
            return out_data.encode("utf-8")
        return out_data.encode("ascii", "ignore")

    raise ValueError(
        f"Invalid value for out_data neither unicode nor byte string: {str(out_data)}"
    )
def write_tmp_file(device_name, output)
Source code
def write_tmp_file(device_name: str, output: str) -> str:
    """Write output to the file named by obtain_netmiko_filename() and return that name."""
    target = obtain_netmiko_filename(device_name)
    with open(target, mode="w") as handle:
        handle.write(output)
    return target