Module netmiko.cisco.cisco_nxos_ssh

Source code
from __future__ import print_function
from __future__ import unicode_literals
import re
import time
import os
from netmiko.cisco_base_connection import CiscoSSHConnection
from netmiko.cisco_base_connection import CiscoFileTransfer


class CiscoNxosSSH(CiscoSSHConnection):
    def session_preparation(self):
        """Prepare the session after the connection has been established."""
        self._test_channel_read(pattern=r"[>#]")
        self.ansi_escape_codes = True
        self.set_base_prompt()
        self.disable_paging()
        # Clear the read buffer
        time.sleep(0.3 * self.global_delay_factor)
        self.clear_buffer()

    def normalize_linefeeds(self, a_string):
        """Convert '\r\n' or '\r\r\n' to '\n, and remove extra '\r's in the text."""
        newline = re.compile(r"(\r\r\n|\r\n)")
        # NX-OS fix for incorrect MD5 on 9K (due to strange <enter> patterns on NX-OS)
        return newline.sub(self.RESPONSE_RETURN, a_string).replace("\r", "\n")

    def check_config_mode(self, check_string=")#", pattern="#"):
        """Checks if the device is in configuration mode or not."""
        return super(CiscoNxosSSH, self).check_config_mode(
            check_string=check_string, pattern=pattern
        )


class CiscoNxosFileTransfer(CiscoFileTransfer):
    """Cisco NXOS SCP File Transfer driver."""

    def __init__(
        self,
        ssh_conn,
        source_file,
        dest_file,
        file_system="bootflash:",
        direction="put",
    ):
        self.ssh_ctl_chan = ssh_conn
        self.source_file = source_file
        self.dest_file = dest_file
        self.direction = direction

        if file_system:
            self.file_system = file_system
        else:
            raise ValueError("Destination file system must be specified for NX-OS")

        if direction == "put":
            self.source_md5 = self.file_md5(source_file)
            self.file_size = os.stat(source_file).st_size
        elif direction == "get":
            self.source_md5 = self.remote_md5(remote_file=source_file)
            self.file_size = self.remote_file_size(remote_file=source_file)
        else:
            raise ValueError("Invalid direction specified")

    def check_file_exists(self, remote_cmd=""):
        """Check if the dest_file already exists on the file system (return boolean)."""
        if self.direction == "put":
            if not remote_cmd:
                remote_cmd = "dir {}{}".format(self.file_system, self.dest_file)
            remote_out = self.ssh_ctl_chan.send_command_expect(remote_cmd)
            search_string = r"{}.*Usage for".format(self.dest_file)
            if "No such file or directory" in remote_out:
                return False
            elif re.search(search_string, remote_out, flags=re.DOTALL):
                return True
            else:
                raise ValueError("Unexpected output from check_file_exists")
        elif self.direction == "get":
            return os.path.exists(self.dest_file)

    def remote_file_size(self, remote_cmd="", remote_file=None):
        """Get the file size of the remote file."""
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file

        if not remote_cmd:
            remote_cmd = "dir {}/{}".format(self.file_system, remote_file)

        remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
        # Match line containing file name
        escape_file_name = re.escape(remote_file)
        pattern = r".*({}).*".format(escape_file_name)
        match = re.search(pattern, remote_out)
        if match:
            file_size = match.group(0)
            file_size = file_size.split()[0]

        if "No such file or directory" in remote_out:
            raise IOError("Unable to find file on remote system")
        else:
            return int(file_size)

    @staticmethod
    def process_md5(md5_output, pattern=r"= (.*)"):
        """Not needed on NX-OS."""
        raise NotImplementedError

    def remote_md5(self, base_cmd="show file", remote_file=None):
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        remote_md5_cmd = "{} {}{} md5sum".format(
            base_cmd, self.file_system, remote_file
        )
        return self.ssh_ctl_chan.send_command(remote_md5_cmd, max_loops=1500).strip()

    def enable_scp(self, cmd=None):
        raise NotImplementedError

    def disable_scp(self, cmd=None):
        raise NotImplementedError

Classes

class CiscoNxosFileTransfer (ssh_conn, source_file, dest_file, file_system='bootflash:', direction='put')

Cisco NXOS SCP File Transfer driver.

Source code
class CiscoNxosFileTransfer(CiscoFileTransfer):
    """Cisco NXOS SCP File Transfer driver."""

    def __init__(
        self,
        ssh_conn,
        source_file,
        dest_file,
        file_system="bootflash:",
        direction="put",
    ):
        self.ssh_ctl_chan = ssh_conn
        self.source_file = source_file
        self.dest_file = dest_file
        self.direction = direction

        if file_system:
            self.file_system = file_system
        else:
            raise ValueError("Destination file system must be specified for NX-OS")

        if direction == "put":
            self.source_md5 = self.file_md5(source_file)
            self.file_size = os.stat(source_file).st_size
        elif direction == "get":
            self.source_md5 = self.remote_md5(remote_file=source_file)
            self.file_size = self.remote_file_size(remote_file=source_file)
        else:
            raise ValueError("Invalid direction specified")

    def check_file_exists(self, remote_cmd=""):
        """Check if the dest_file already exists on the file system (return boolean)."""
        if self.direction == "put":
            if not remote_cmd:
                remote_cmd = "dir {}{}".format(self.file_system, self.dest_file)
            remote_out = self.ssh_ctl_chan.send_command_expect(remote_cmd)
            search_string = r"{}.*Usage for".format(self.dest_file)
            if "No such file or directory" in remote_out:
                return False
            elif re.search(search_string, remote_out, flags=re.DOTALL):
                return True
            else:
                raise ValueError("Unexpected output from check_file_exists")
        elif self.direction == "get":
            return os.path.exists(self.dest_file)

    def remote_file_size(self, remote_cmd="", remote_file=None):
        """Get the file size of the remote file."""
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file

        if not remote_cmd:
            remote_cmd = "dir {}/{}".format(self.file_system, remote_file)

        remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
        # Match line containing file name
        escape_file_name = re.escape(remote_file)
        pattern = r".*({}).*".format(escape_file_name)
        match = re.search(pattern, remote_out)
        if match:
            file_size = match.group(0)
            file_size = file_size.split()[0]

        if "No such file or directory" in remote_out:
            raise IOError("Unable to find file on remote system")
        else:
            return int(file_size)

    @staticmethod
    def process_md5(md5_output, pattern=r"= (.*)"):
        """Not needed on NX-OS."""
        raise NotImplementedError

    def remote_md5(self, base_cmd="show file", remote_file=None):
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        remote_md5_cmd = "{} {}{} md5sum".format(
            base_cmd, self.file_system, remote_file
        )
        return self.ssh_ctl_chan.send_command(remote_md5_cmd, max_loops=1500).strip()

    def enable_scp(self, cmd=None):
        raise NotImplementedError

    def disable_scp(self, cmd=None):
        raise NotImplementedError

Ancestors

Static methods

def process_md5(md5_output, pattern='= (.*)')

Not needed on NX-OS.

Source code
@staticmethod
def process_md5(md5_output, pattern=r"= (.*)"):
    """Not needed on NX-OS."""
    raise NotImplementedError

Inherited members

class CiscoNxosSSH (ip='', host='', username='', password=None, secret='', port=None, device_type='', verbose=False, global_delay_factor=1, use_keys=False, key_file=None, pkey=None, passphrase=None, allow_agent=False, ssh_strict=False, system_host_keys=False, alt_host_keys=False, alt_key_file='', ssh_config_file=None, timeout=100, session_timeout=60, auth_timeout=None, blocking_timeout=8, banner_timeout=5, keepalive=0, default_enter=None, response_return=None, serial_settings=None, fast_cli=False, session_log=None, session_log_record_writes=False, session_log_file_mode='write', allow_auto_change=False, encoding='ascii')

Base Class for cisco-like behavior.

    Initialize attributes for establishing connection to target device.

    :param ip: IP address of target device. Not required if `host` is
        provided.
    :type ip: str

    :param host: Hostname of target device. Not required if `ip` is
            provided.
    :type host: str

    :param username: Username to authenticate against target device if
            required.
    :type username: str

    :param password: Password to authenticate against target device if
            required.
    :type password: str

    :param secret: The enable password if target device requires one.
    :type secret: str

    :param port: The destination port used to connect to the target
            device.
    :type port: int or None

    :param device_type: Class selection based on device type.
    :type device_type: str

    :param verbose: Enable additional messages to standard output.
    :type verbose: bool

    :param global_delay_factor: Multiplication factor affecting Netmiko delays (default: 1).
    :type global_delay_factor: int

    :param use_keys: Connect to target device using SSH keys.
    :type use_keys: bool

    :param key_file: Filename path of the SSH key file to use.
    :type key_file: str

    :param pkey: SSH key object to use.
    :type pkey: paramiko.PKey

    :param passphrase: Passphrase to use for encrypted key; password will be used for key
            decryption if not specified.
    :type passphrase: str

    :param allow_agent: Enable use of SSH key-agent.
    :type allow_agent: bool

    :param ssh_strict: Automatically reject unknown SSH host keys (default: False, which
            means unknown SSH host keys will be accepted).
    :type ssh_strict: bool

    :param system_host_keys: Load host keys from the users known_hosts file.
    :type system_host_keys: bool
    :param alt_host_keys: If `True` host keys will be loaded from the file specified in
            alt_key_file.
    :type alt_host_keys: bool

    :param alt_key_file: SSH host key file to use (if alt_host_keys=True).
    :type alt_key_file: str

    :param ssh_config_file: File name of OpenSSH configuration file.
    :type ssh_config_file: str

    :param timeout: Connection timeout.
    :type timeout: float

    :param session_timeout: Set a timeout for parallel requests.
    :type session_timeout: float

    :param auth_timeout: Set a timeout (in seconds) to wait for an authentication response.
    :type auth_timeout: float

    :param banner_timeout: Set a timeout to wait for the SSH banner (pass to Paramiko).
    :type banner_timeout: float

    :param keepalive: Send SSH keepalive packets at a specific interval, in seconds.
            Currently defaults to 0, for backwards compatibility (it will not attempt
            to keep the connection alive).
    :type keepalive: int

    :param default_enter: Character(s) to send to correspond to enter key (default:

). :type default_enter: str

    :param response_return: Character(s) to use in normalized return data to represent
            enter key (default:

) :type response_return: str

    :param fast_cli: Provide a way to optimize for performance. Converts select_delay_factor
            to select smallest of global and specific. Sets default global_delay_factor to .1
            (default: False)
    :type fast_cli: boolean

    :param session_log: File path or BufferedIOBase subclass object to write the session log to.
    :type session_log: str

    :param session_log_record_writes: The session log generally only records channel reads due
            to eliminate command duplication due to command echo. You can enable this if you
            want to record both channel reads and channel writes in the log (default: False).
    :type session_log_record_writes: boolean

    :param session_log_file_mode: "write" or "append" for session_log file mode
            (default: "write")
    :type session_log_file_mode: str

    :param allow_auto_change: Allow automatic configuration changes for terminal settings.
            (default: False)
    :type allow_auto_change: bool

    :param encoding: Encoding to be used when writing bytes to the output channel.
            (default: ascii)
    :type encoding: str
Source code
class CiscoNxosSSH(CiscoSSHConnection):
    def session_preparation(self):
        """Prepare the session after the connection has been established."""
        self._test_channel_read(pattern=r"[>#]")
        self.ansi_escape_codes = True
        self.set_base_prompt()
        self.disable_paging()
        # Clear the read buffer
        time.sleep(0.3 * self.global_delay_factor)
        self.clear_buffer()

    def normalize_linefeeds(self, a_string):
        """Convert '\r\n' or '\r\r\n' to '\n, and remove extra '\r's in the text."""
        newline = re.compile(r"(\r\r\n|\r\n)")
        # NX-OS fix for incorrect MD5 on 9K (due to strange <enter> patterns on NX-OS)
        return newline.sub(self.RESPONSE_RETURN, a_string).replace("\r", "\n")

    def check_config_mode(self, check_string=")#", pattern="#"):
        """Checks if the device is in configuration mode or not."""
        return super(CiscoNxosSSH, self).check_config_mode(
            check_string=check_string, pattern=pattern
        )

Ancestors

Methods

def check_config_mode(self, check_string=')#', pattern='#')

Checks if the device is in configuration mode or not.

Source code
def check_config_mode(self, check_string=")#", pattern="#"):
    """Checks if the device is in configuration mode or not."""
    return super(CiscoNxosSSH, self).check_config_mode(
        check_string=check_string, pattern=pattern
    )
def normalize_linefeeds(self, a_string)

Convert ' ' or '

' to ' , and remove extra ' 's in the text.

Source code
def normalize_linefeeds(self, a_string):
    """Convert '\r\n' or '\r\r\n' to '\n, and remove extra '\r's in the text."""
    newline = re.compile(r"(\r\r\n|\r\n)")
    # NX-OS fix for incorrect MD5 on 9K (due to strange <enter> patterns on NX-OS)
    return newline.sub(self.RESPONSE_RETURN, a_string).replace("\r", "\n")
def session_preparation(self)

Prepare the session after the connection has been established.

Source code
def session_preparation(self):
    """Prepare the session after the connection has been established."""
    self._test_channel_read(pattern=r"[>#]")
    self.ansi_escape_codes = True
    self.set_base_prompt()
    self.disable_paging()
    # Clear the read buffer
    time.sleep(0.3 * self.global_delay_factor)
    self.clear_buffer()

Inherited members