Watch (with command-line arguments)

Mimicking the Unix "watch" CLI utility in SR OS using pySROS

Introduction

As engineers, we sometimes find ourselves repeating the same command over and over, waiting for that one BGP route to appear or a network event to occur that enables our changes to proceed or be closed off. Repeating a command by hand is inefficient, a problem the original "watch" command addressed when it was introduced in 1991.

Because a watch-style command requires some input from the operator, the code included in this article contains a way of parsing arguments. The command covers typical use cases that we've seen and found to be useful; suggestions and improvements are welcome.

Elements used in this tutorial

Element                                     Version
CentOS Workstation/Server                   CentOS Linux release 7.9.2009 (Core)
SR OS device running in Model-Driven mode   22.10.R1
Python (on the CentOS machine)              3.9.6
Containerlab                                0.32.1

This is a demo-quality pySROS script that is not officially supported by Nokia. It is intended for local execution on an SR OS box.

Creating the script

In the following sections, the script is dissected and the parts are briefly discussed. At the end of this article, the script is shown in its entirety to promote reusability.

Argument and ArgumentHelper

Similar to how the well-known argparse library works, these classes simplify the management of arguments in the script. The "help" portion of the script is driven by the information held in the Argument objects, and default values can be specified. Whether an operator calls the script through an alias, through a call to pyexec, or remotely from their own Python interpreter, the arguments must be extracted from the input.
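For comparison, the sketch below expresses the same four options with the standard-library argparse module, as it might look on a workstation running CPython. It is not part of the script: the build_parser function and the "command" positional are assumptions made for illustration, while the flags and defaults mirror those defined in the script's main function.

```python
import argparse


def build_parser():
    """Build an argparse parser that mirrors the script's four options."""
    parser = argparse.ArgumentParser(prog="watch.py")
    # Everything before the first flag is treated as the command/path to watch
    parser.add_argument("command", nargs="+", help="Command or path to watch")
    parser.add_argument("-e", dest="exclude", type=str, default=None,
                        help="Regex to exclude from the result(s)")
    parser.add_argument("-i", dest="interval", type=int, default=3,
                        help="Interval in second(s) between each check")
    parser.add_argument("-r", dest="repeat", type=int, default=10,
                        help="Number of iterations")
    parser.add_argument("-a", dest="absolute", action="store_true",
                        help="Compare to initial info if set")
    return parser


args = build_parser().parse_args(
    ["show", "router", "bgp", "summary", "-i", "5", "-a"]
)
print(" ".join(args.command), args.interval, args.repeat, args.absolute)
```

The Argument and ArgumentHelper classes in this article play the roles of add_argument and parse_args in this sketch.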

An Argument object is made up of six member variables, as follows:

  • name - used to identify this Argument
  • parsing_function - applied to any value passed to the set_value method
  • help_text - displayed by the system whenever the script is called with arguments indicating a need for help
  • parameter_modifier - indicates how many values from sys.argv should be counted as part of this Argument when its CLI flag is found
  • default_value and value - contain the initial and current values for the Argument, respectively

The ArgumentHelper class is an abstraction over a regular dictionary of Arguments. The ArgumentHelper can be given a number of arguments to look for and parses the input given to the script. If needed, a help message can be printed to the CLI by the ArgumentHelper:

class Argument:
    """Class representing a single CLI argument passed via sys.argv

    :parameter name:    resulting key to be used in the args dictionary to
                        find the value passed for this argument.
    :type name: str
    :parameter parsing_function:    function to be run using the discovered
                                    parameters as input, result of which is
                                    assigned to the variable.
    :type parsing_function: Callable
    :parameter help_text:   Help text to be displayed when needed
    :type help_text: str
    :parameter parameter_modifier:  Number of values to retrieve from
                                    sys.argv to consume for this variable.
    :type parameter_modifier: int
    :parameter default_value:   value assigned by default to ensure args
                                dictionary is always populated.
    :type default_value: Any
    """

    def __init__(
        self,
        _name,
        _parsing_function,
        _help_text,
        _parameter_modifier,
        _default_value,
    ):
        self.name = _name
        self.parsing_function = _parsing_function
        self.help_text = _help_text
        self.parameter_modifier = _parameter_modifier
        self.default_value = _default_value
        self.value = _default_value

    def set_value(self, value):
        """Set the value of this Argument to the passed value parameter
        after it is passed through the object's parsing function.

        :parameter value: The argument passed assigned to the object value
        :type value: Any
        """
        if isinstance(value, list) and len(value) == 1:
            value = value[0]
        self.value = self.parsing_function(value)

    def __str__(self):
        return "%*s" % (20, self.help_text)


class ArgumentHelper:
    """Class acting as abstraction layer between sys.argv and a
    collection of Arguments to be returned as a key:value dictionary
    after parsing and processing.

    Intended to be similar to the ArgumentParser class in argparse.
    """

    def __init__(self):
        self.command_help = "watch help/-h"
        self.arg_list_pointer = 0
        self.args = {}

    def add_argument(self, key, arg):
        """Add an argument to the class' internal args dictionary by key
        with value equal to an Argument.

        :parameter key: CLI flag to be used to specifically address the
                        corresponding Argument object
        :type key: str
        :parameter arg: The Argument object being created and made available
        :type arg: :py:class:`Argument`
        """
        self.args[key] = arg

    def send_help(self):
        """Distill a help message similar to what is shown on Linux CLI
        to be shown on the SR OS CLI when an operator enters the command
        without arguments, or with "-h" or "help" as an argument

        :returns:   a printable help message to be displayed on the CLI, using
                    information found in each Argument contained in self.args
        :rtype: str
        """
        help_message = "\nExample: %-*s\nCommand options:\n" % (
            20,
            "watch.py <command to execute/monitor> %s" % (self.help_flags()),
        )
        for flag, arg in self.args.items():
            help_message += " %-*s%s\n" % (10 + len(flag), flag, arg)
        help_message += " %-*s%s" % (
            10 + len("-h"),
            "-h",
            "Display this message.",
        )
        return help_message

    def help_flags(self):
        """Build the shorthand list of available flags shown in the example
        line of the help message. Flags that take a value are followed by #.

        :returns:   a printable help message to be displayed on the CLI,
                    showing potentially usable flags in shorthand notation,
                    to be included with the result of send_help
        :rtype: str
        """
        flags = "|".join(
            k + (" #" if v.parameter_modifier > 0 else "")
            for k, v in self.args.items()
        )
        return flags

    def handle_argument(self, index, arg_list_skip_path):
        """Handle an argument specified on the CLI by finding the key, using
        the key to find the Argument object, determining how many parameters
        there should be on the CLI and passing those to the Argument's
        set_value call.

        :parameter index:   Current index in the list of CLI arguments being
                            looked at for parsing/processing
        :type index: int
        :parameter arg_list_skip_path: list of CLI arguments
        :type arg_list_skip_path: list
        """
        argument_flag = arg_list_skip_path[index]
        self.arg_list_pointer += self.args[argument_flag].parameter_modifier
        self.args[argument_flag].set_value(
            arg_list_skip_path[index + 1: self.arg_list_pointer + 1]
        )
        self.arg_list_pointer += 1

    def parse_argv(self, _arg_list):
        """Parse the argument list into the command or path to watch and
        the values of the registered Arguments.

        :parameter _arg_list:   the list of arguments, retrieved from sys.argv
                                by the caller and passed to this function.
        :type _arg_list: list
        :returns:   a dictionary containing the specified Arguments either with
                    default or specific values, or a "helped" flag that
                    terminates the program.
        :rtype: dict
        """
        arg_list = _arg_list
        if len(arg_list) == 0:
            return None

        if (len(arg_list) == 1 or arg_list[1] == "-h" or arg_list[1] == "help"):
            print(self.send_help())
            return {"helped": True}

        result_args = {}
        # Skip the first element, as that is the path (in this case)
        if arg_list[1] == "/":
            # Elements such as
            #     watch /state/system/up-time
            #     watch /nokia-state:state/system/up-time
            # are passed to arg_list with the first '/' in its
            # own entry in the list: undo that.
            #
            # - this also applies to
            # -     /show system information
            # - that appears "/", "show system information"
            # - avoid, lighten the condition on the if-statement above
            arg_list[1] += arg_list[2]
            del arg_list[2]
        arg_list_skip_path = arg_list[1:]

        # ### ###
        # If this command needs a tools/show/state command to start
        # this section is needed. If not provided it can't run.
        # ### ###
        try:
            # parts of show commands that follow a space and start with '-'
            # may cause a problem in execution however no specific examples
            # of this are known
            self.arg_list_pointer = min(
                (
                    index
                    for index, value in enumerate(arg_list_skip_path)
                    if value.startswith("-")
                )
            )
        except ValueError:
            # ValueError: min() arg is an empty sequence
            self.arg_list_pointer = len(arg_list_skip_path)
        result_args["xpath"] = " ".join(
            arg_list_skip_path[: self.arg_list_pointer]
        )
        # ### ###
        # the arguments come after the show command
        # ### ###
        for index, value in enumerate(arg_list_skip_path):
            if self.arg_list_pointer == index:
                try:
                    self.handle_argument(index, arg_list_skip_path)
                except KeyError as error_cli_option:
                    print(
                        '"%s" is not a valid command option. See \n%s'
                        % (error_cli_option, self.send_help())
                    )
                    sys.exit(98)
            else:
                # this parameter or flag was already used - skip
                pass

        intermediary = {v.name: v.value for _, v in self.args.items()}
        intermediary.update(result_args)
        return intermediary
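The core of parse_argv can be summarized in two steps: everything before the first token that starts with "-" is joined into the command to watch, and each flag after that consumes as many values as its parameter_modifier indicates. The following is a minimal standalone sketch of that idea, not the script's own code; the function names split_command_and_flags and consume_flags and the arity dictionary are hypothetical.

```python
def split_command_and_flags(tokens):
    """Split tokens into the watched command and the trailing flag tokens.

    Everything before the first token starting with '-' is the command.
    """
    first_flag = next(
        (i for i, token in enumerate(tokens) if token.startswith("-")),
        len(tokens),  # no flag found: all tokens belong to the command
    )
    return " ".join(tokens[:first_flag]), tokens[first_flag:]


def consume_flags(flag_tokens, arity):
    """Collect the values that follow each flag.

    arity maps a flag to the number of values it consumes (hypothetical
    stand-in for parameter_modifier). Unknown flags raise KeyError.
    """
    values = {}
    pointer = 0
    while pointer < len(flag_tokens):
        flag = flag_tokens[pointer]
        count = arity[flag]
        values[flag] = flag_tokens[pointer + 1: pointer + 1 + count]
        pointer += 1 + count  # skip the flag and the values it consumed
    return values


tokens = ["show", "router", "bgp", "summary", "-i", "5", "-a"]
command, flags = split_command_and_flags(tokens)
print(command)
print(consume_flags(flags, {"-e": 1, "-i": 1, "-r": 1, "-a": 0}))
```

As in the script, a token inside the watched command that begins with "-" would be mistaken for the first flag.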

In addition to the Argument and ArgumentHelper classes, a few parsing functions validate the provided parameters and convert them to the object types that the rest of the script expects. These functions are trivial and are shown here for completeness:

def simple_boolean(_inputstring):
    """Simple function to return True after being called, used to augment
    the Argument classes below. it is meant to be used as a replacement
    for action=store_true in regular argparse
    """
    return True


def simple_string(inputstring):
    """Simple function to return string representation of the argument
    after being called, used to augment the Argument classes below

    :parameter inputstring: The argument passed via sys.argv to be converted
                            to string (if needed) and returned by this function
    :type inputstring: str
    :returns: inputstring as str type
    :rtype: :py:class:`str`
    """
    if isinstance(inputstring, str):
        return inputstring
    return repr(inputstring)


def simple_integer(inputstring):
    """Simple function to return the integer representation of the argument
    after being called, used to augment the Argument classes below

    :parameter inputstring: The argument passed via sys.argv to be converted
                            to an int object and returned by this function
    :type inputstring: str
    :returns: inputstring as int type
    :rtype: :py:class:`int`
    """
    return int(inputstring)
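If stricter validation were wanted, for example rejecting a zero or negative interval, a parsing function along the following lines could be passed to an Argument in place of simple_integer. This is only a sketch; positive_integer is a hypothetical name and is not part of the script.

```python
def positive_integer(inputstring):
    """Convert the argument to int and reject values below 1.

    Hypothetical stricter variant of simple_integer.
    """
    value = int(inputstring)  # raises ValueError for non-numeric input
    if value < 1:
        raise ValueError(
            "expected a positive integer, got %r" % (inputstring,)
        )
    return value


print(positive_integer("5"))
```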

get_connection

Every pySROS script must set up a connection to a device before it can perform any interactions with the device. The generally accepted way of establishing a connection uses a get_connection function:

def get_connection(host=None, username=None, password=None, port=830):
    """Function definition to obtain a Connection object to a specific
    SR OS device and access the model-driven information.
    :parameter host: The hostname or IP address of the SR OS node.
    :type host: str
    :parameter username: The username to connect to the SR OS node.
    :type username: str
    :parameter password: The password to connect to the SR OS node.
    :type password: str
    :parameter port: The TCP port for the connection to the SR OS node.
    :type port: int
    :returns: Connection object for the SR OS node.
    :rtype: :py:class:`pysros.management.Connection`
    """
    try:
        connection_object = connect(
            host=host,
            username=username,
            password=password,
            port=port,
        )
    except RuntimeError as error1:
        print("Failed to connect.  Error:", error1)
        sys.exit(-1)
    return connection_object

Finding differences in output

When comparing the results from two sequential iterations, the behavior of the script is determined by the command that is being watched. If the command specifies a Leaf object in the state tree, differences in the value of the Leaf are reported. If the command specifies a Container, recursion is used to report newly added or deleted nodes from the Container structure.

If the command is an SR OS command with textual output (show ..., tools ...), the output is compared line by line.

def leaf_diff(path, key, new, r_exclude):
    """Display the path leading up to the new value of the leaf in tree,
    followed by the new value.

    :parameter path:    The json-instance-path leading up
                        to the leaf of which the value is being compared
    :type path: str
    :parameter key: The specific key for the leaf within the parent Container.
                    Empty if the query was a specific Leaf.
    :type key: str
    :parameter new: The new value found in the leaf
    :type new: str
    :parameter r_exclude:   Regex to exclude any leafs whose name matches this
                            regular expression
    :type r_exclude: str
    """
    output = path + (("/" + str(key)) if key else "") + "/" + str(new)
    if r_exclude is not None:
        if not bool(re.match(r_exclude, output)):
            print(output)
    else:
        print(output)


def dict_diff(ref, new, path, r_exclude):
    """Display the path leading up to one of three possible situations;
        either a leaf is removed from the tree or
        a leaf is added to the tree or
        a leaf's value has changed

    :parameter ref: The reference value, for the currently new values to be
                    compared against
    :type ref: Container or dict
    :parameter new: The new values found in the tree
    :type new: Container or dict
    :parameter path:    The json-instance-path leading up
                        to the leaf of which the value is being compared
    :type path: str
    :parameter r_exclude:   Regex to exclude any leafs whose name matches
                            this regular expression
    :type r_exclude: str
    """
    # Used to display the value between two XPATHs
    for key in ref:
        # Case where a context was deleted
        if key not in new:
            print(path + "/" + str(key) + " *** removed ***  ")
    for key in new:
        # Case where a context was created
        if key not in ref:
            print(path + "/" + str(key) + " *** added ***  ")
            printTree(new[key])
        else:
            if new[key] == ref[key]:
                # case where a context already existed and wasn't changed
                pass
            elif isinstance(new[key], (Container, dict)):
                # Case where a context was changed but already existed
                # Not reaching a leaf node -> recursive call to dict_diff
                dict_diff(ref[key], new[key], path + "/" + str(key), r_exclude)
            else:
                # Reached a leaf -> Display the result if the xpath to reach it
                # doesn't match the exclude provided regex
                leaf_diff(path, key, new[key], r_exclude)

main

Finally, using the building blocks shown previously, the main function is created along with two helper functions: show_tools_command_monitor and xpath_monitor. The main function controls the overall flow of the script and defines its arguments. The script accepts the number of iterations, the delay between iterations, a regex to exclude leaves from the comparison, and a flag that selects which values are compared (t0 vs. t, or t-1 vs. t). It also expects a command or path to watch:

def show_tools_command_monitor(node_handle, arguments):
    """Function called to get information from a show or tools command
    passed as input to the command

    :parameter node_handle: pySROS object representing connection to the node
    :type node_handle: :py:class:`pysros.management.Connection`

    :parameter arguments:   CLI arguments gathered from CLI
                            and parsed to Python objects
    :type arguments: dict
    """
    # Case where a show command is monitored
    ref_output = node_handle.cli(arguments["xpath"])
    ref_output_split = ref_output.splitlines()
    # Clear screen only if the script is launched from the node
    if sros():
        print(node_handle.cli("/clear screen"))
    print(ref_output)
    for _ in range(0, arguments["repeat"]):
        time.sleep(arguments["interval"])
        # Clear screen only if the script is launched from the node
        if sros():
            print(node_handle.cli("/clear screen"))
        current_output_split = node_handle.cli(
            arguments["xpath"]
        ).splitlines()
        current_output_mod = []
        for line in current_output_split:
            # Line is not reported as changed if exclude regex matches
            if arguments["exclude"] is not None:
                dont_check = bool(re.match(arguments["exclude"], line))
            else:
                dont_check = False
            # Prepend '-->' to the line if a change is observed
            if line in ref_output_split or dont_check:
                current_output_mod.append("    " + line)
            else:
                current_output_mod.append("--> " + line)
            # Warning: removed lines are not reported
        for line in current_output_mod:
            print(line)
        if arguments["absolute"] is False:
            ref_output_split = current_output_split


def xpath_monitor(node_handle, arguments):
    """Function called to get information from an xpath expression
    passed to the command

    :parameter node_handle: pySROS object representing connection to the node
    :type node_handle: :py:class:`pysros.management.Connection`

    :parameter arguments:   CLI arguments gathered from CLI
                            and parsed to Python objects
    :type arguments: dict
    """
    # Case where an xpath is monitored
    if "nokia" not in arguments["xpath"]:
        if arguments["xpath"].startswith("/"):
            xpath = "/nokia-state:" + arguments["xpath"][1:]
        else:
            xpath = "/nokia-state:" + arguments["xpath"]
    else:
        xpath = arguments["xpath"]
    if xpath[-1] == "/":
        xpath = xpath[:-1]
    # Get the initial state of the context as ref
    try:
        obj_ref = node_handle.running.get(xpath)
    except InvalidPathError as error:
        print("Error in the xpath %s\n%s" % (xpath, error))
        sys.exit()
    print("### Initial reference recorded ###\n")
    for i in range(0, arguments["repeat"]):
        # Wait for the required interval
        time.sleep(arguments["interval"])
        # Get the new state of the context to check
        obj_new = node_handle.running.get(xpath)
        print(
            "\n### Iteration nb. {iter} ({time_spent}s) ###\n".format(
                iter=i + 1,
                time_spent=(i + 1) * arguments["interval"],
            )
        )
        if isinstance(obj_new, (Container, dict)):
            # Check the diff between ref & last record
            dict_diff(
                obj_ref, obj_new, ".", arguments["exclude"]
            )
        elif isinstance(obj_new, Leaf) and obj_ref != obj_new:
            leaf_diff(xpath, "", obj_new, None)
        # Store the current record as ref if the absolute flag is not set
        if arguments["absolute"] is False:
            obj_ref = obj_new


def main():
    """The main procedure.  The execution starts here."""
    args = ArgumentHelper()
    args.add_argument(
        "-e",
        Argument(
            "exclude",
            simple_string,
            "Regex to exclude from the result(s)",
            1,
            None,
        ),
    )
    args.add_argument(
        "-i",
        Argument(
            "interval",
            simple_integer,
            "Interval in second(s) between each check / Default = 3",
            1,
            3,
        ),
    )
    args.add_argument(
        "-r",
        Argument(
            "repeat",
            simple_integer,
            "Number of iterations / Default = 10",
            1,
            10,
        ),
    )
    args.add_argument(
        "-a",
        Argument(
            "absolute",
            simple_boolean,
            "Compare to initial info if set (default is incremental compares)",
            0,
            False,
        ),
    )

    if found_arguments := args.parse_argv(sys.argv):
        if "helped" in found_arguments.keys():
            return

        # Connect to the node
        node_handle = get_connection()
        if (
            found_arguments["xpath"].startswith("show")
            or found_arguments["xpath"].startswith("tools")
            or found_arguments["xpath"].startswith("/show")
            or found_arguments["xpath"].startswith("/tools")
        ):
            show_tools_command_monitor(node_handle, found_arguments)
        else:
            xpath_monitor(node_handle, found_arguments)
    else:
        print("Error while parsing args")

Actual code: watch.py

In this section, you'll find the complete working code.

#!/usr/bin/env python3

### watch.py
#   Copyright 2022 Nokia
###

# pylint: disable=consider-using-f-string

"""Implementation/approximation of Linux "watch" command for Nokia SR OS.
Tested on: SR OS 22.10.R1
"""

import re
import sys
import time
from pysros.wrappers import Leaf, Container
from pysros.management import connect, sros
from pysros.exceptions import InvalidPathError
from pysros.pprint import printTree


def get_connection(host=None, username=None, password=None, port=830):
    """Function definition to obtain a Connection object to a specific
    SR OS device and access the model-driven information.
    :parameter host: The hostname or IP address of the SR OS node.
    :type host: str
    :parameter username: The username to connect to the SR OS node.
    :type username: str
    :parameter password: The password to connect to the SR OS node.
    :type password: str
    :parameter port: The TCP port for the connection to the SR OS node.
    :type port: int
    :returns: Connection object for the SR OS node.
    :rtype: :py:class:`pysros.management.Connection`
    """
    try:
        connection_object = connect(
            host=host,
            username=username,
            password=password,
            port=port,
        )
    except RuntimeError as error1:
        print("Failed to connect.  Error:", error1)
        sys.exit(-1)
    return connection_object


def simple_boolean(_inputstring):
    """Simple function to return True after being called, used to augment
    the Argument classes below. it is meant to be used as a replacement
    for action=store_true in regular argparse
    """
    return True


def simple_string(inputstring):
    """Simple function to return string representation of the argument
    after being called, used to augment the Argument classes below

    :parameter inputstring: The argument passed via sys.argv to be converted
                            to string (if needed) and returned by this function
    :type inputstring: str
    :returns: inputstring as str type
    :rtype: :py:class:`str`
    """
    if isinstance(inputstring, str):
        return inputstring
    return repr(inputstring)


def simple_integer(inputstring):
    """Simple function to return the integer representation of the argument
    after being called, used to augment the Argument classes below

    :parameter inputstring: The argument passed via sys.argv to be converted
                            to an int object and returned by this function
    :type inputstring: str
    :returns: inputstring as int type
    :rtype: :py:class:`int`
    """
    return int(inputstring)


class Argument:
    """Class representing a single CLI argument passed via sys.argv

    :parameter name:    resulting key to be used in the args dictionary to
                        find the value passed for this argument.
    :type name: str
    :parameter parsing_function:    function to be run using the discovered
                                    parameters as input, result of which is
                                    assigned to the variable.
    :type parsing_function: Callable
    :parameter help_text:   Help text to be displayed when needed
    :type help_text: str
    :parameter parameter_modifier:  Number of values to retrieve from
                                    sys.argv to consume for this variable.
    :type parameter_modifier: int
    :parameter default_value:   value assigned by default to ensure args
                                dictionary is always populated.
    :type default_value: Any
    """

    def __init__(
        self,
        _name,
        _parsing_function,
        _help_text,
        _parameter_modifier,
        _default_value,
    ):
        self.name = _name
        self.parsing_function = _parsing_function
        self.help_text = _help_text
        self.parameter_modifier = _parameter_modifier
        self.default_value = _default_value
        self.value = _default_value

    def set_value(self, value):
        """Set the value of this Argument to the passed value parameter
        after it is passed through the object's parsing function.

        :parameter value: The argument passed assigned to the object value
        :type value: Any
        """
        if isinstance(value, list) and len(value) == 1:
            value = value[0]
        self.value = self.parsing_function(value)

    def __str__(self):
        return "%*s" % (20, self.help_text)


class ArgumentHelper:
    """Class acting as abstraction layer between sys.argv and a
    collection of Arguments to be returned as a key:value dictionary
    after parsing and processing.

    Intended to be similar to the ArgumentParser class in argparse.
    """

    def __init__(self):
        self.command_help = "watch help/-h"
        self.arg_list_pointer = 0
        self.args = {}

    def add_argument(self, key, arg):
        """Add an argument to the class' internal args dictionary by key
        with value equal to an Argument.

        :parameter key: CLI flag to be used to specifically address the
                        corresponding Argument object
        :type key: str
        :parameter arg: The Argument object being created and made available
        :type arg: :py:class:`Argument`
        """
        self.args[key] = arg

    def send_help(self):
        """Distill a help message similar to what is shown on Linux CLI
        to be shown on the SR OS CLI when an operator enters the command
        without arguments, or with "-h" or "help" as an argument

        :returns:   a printable help message to be displayed on the CLI, using
                    information found in each Argument contained in self.args
        :rtype: str
        """
        help_message = "\nExample: %-*s\nCommand options:\n" % (
            20,
            "watch.py <command to execute/monitor> %s" % (self.help_flags()),
        )
        for flag, arg in self.args.items():
            help_message += " %-*s%s\n" % (10 + len(flag), flag, arg)
        help_message += " %-*s%s" % (
            10 + len("-h"),
            "-h",
            "Display this message.",
        )
        return help_message

    def help_flags(self):
        """Build the shorthand list of available flags shown in the example
        line of the help message. Flags that take a value are followed by #.

        :returns:   a printable help message to be displayed on the CLI,
                    showing potentially usable flags in shorthand notation,
                    to be included with the result of send_help
        :rtype: str
        """
        flags = "|".join(
            k + (" #" if v.parameter_modifier > 0 else "")
            for k, v in self.args.items()
        )
        return flags

    def handle_argument(self, index, arg_list_skip_path):
        """Handle an argument specified on the CLI by finding the key, using
        the key to find the Argument object, determining how many parameters
        there should be on the CLI and passing those to the Argument's
        set_value call.

        :parameter index:   Current index in the list of CLI arguments being
                            looked at for parsing/processing
        :type index: int
        :parameter arg_list_skip_path: list of CLI arguments
        :type arg_list_skip_path: list
        """
        argument_flag = arg_list_skip_path[index]
        self.arg_list_pointer += self.args[argument_flag].parameter_modifier
        self.args[argument_flag].set_value(
            arg_list_skip_path[index + 1: self.arg_list_pointer + 1]
        )
        self.arg_list_pointer += 1

    def parse_argv(self, _arg_list):
        """Parse the argument list into the command or path to watch and
        the values of the registered Arguments.

        :parameter _arg_list:   the list of arguments, retrieved from sys.argv
                                by the caller and passed to this function.
        :type _arg_list: list
        :returns:   a dictionary containing the specified Arguments either with
                    default or specific values, or a "helped" flag that
                    terminates the program.
        :rtype: dict
        """
        arg_list = _arg_list
        if len(arg_list) == 0:
            return None

        if len(arg_list) == 1 or arg_list[1] == "-h" or arg_list[1] == "help":
            print(self.send_help())
            return {"helped": True}

        result_args = {}
        # Skip the first element, as that is the path (in this case)
        if arg_list[1] == "/":
            # Elements such as
            #     watch /state/system/up-time
            #     watch /nokia-state:state/system/up-time
            # are passed to arg_list with the first '/' in its
            # own entry in the list: undo that.
            #
            # - this also applies to
            # -     /show system information
            # - that appears "/", "show system information"
            # - avoid, lighten the condition on the if-statement above
            arg_list[1] += arg_list[2]
            del arg_list[2]
        arg_list_skip_path = arg_list[1:]

        # ### ###
        # If this command needs a tools/show/state command to start
        # this section is needed. If not provided it can't run.
        # ### ###
        try:
            # parts of show commands that follow a space and start with '-'
            # may cause a problem in execution however no specific examples
            # of this are known
            self.arg_list_pointer = min(
                (
                    index
                    for index, value in enumerate(arg_list_skip_path)
                    if value.startswith("-")
                )
            )
        except ValueError:
            # ValueError: min() arg is an empty sequence
            self.arg_list_pointer = len(arg_list_skip_path)
        result_args["xpath"] = " ".join(
            arg_list_skip_path[: self.arg_list_pointer]
        )
        # ### ###
        # the arguments come after the show command
        # ### ###
        for index, value in enumerate(arg_list_skip_path):
            if self.arg_list_pointer == index:
                try:
                    self.handle_argument(index, arg_list_skip_path)
                except KeyError as error_cli_option:
                    print(
                        '"%s" is not a valid command option. See \n%s'
                        % (error_cli_option, self.send_help())
                    )
                    sys.exit(98)
            else:
                # this parameter or flag was already used - skip
                pass

        intermediary = {v.name: v.value for _, v in self.args.items()}
        intermediary.update(result_args)
        return intermediary


def leaf_diff(path, key, new, r_exclude):
    """Display the path leading up to the new value of the leaf in tree,
    followed by the new value.

    :parameter path:    The json-instance-path leading up
                        to the leaf of which the value is being compared
    :type path: str
    :parameter key: The specific key for the leaf within the parent Container.
                    Empty if the query was a specific Leaf.
    :type key: str
    :parameter new: The new value found in the leaf
    :type new: str
    :parameter r_exclude:   Regex to exclude any leaf whose output line
                            (path and value) matches this regular expression
    :type r_exclude: str
    """
    output = path + (("/" + str(key)) if key else "") + "/" + str(new)
    # Print unless the exclude regex matches the start of the output line
    if r_exclude is None or not re.match(r_exclude, output):
        print(output)


def dict_diff(ref, new, path, r_exclude):
    """Display the path leading up to one of three possible situations;
        either a leaf is removed from the tree or
        a leaf is added to the tree or
        a leaf's value has changed

    :parameter ref: The reference values, against which the new values are
                    compared
    :type ref: dict or :py:class:`pysros.wrappers.Container`
    :parameter new: The new values found in the tree
    :type new: dict or :py:class:`pysros.wrappers.Container`
    :parameter path:    The json-instance-path leading up
                        to the leaf of which the value is being compared
    :type path: str
    :parameter r_exclude:   Regex to exclude any leaf whose output line
                            (path and value) matches this regular expression
    :type r_exclude: str
    """
    # Display the differences between two snapshots of the same XPath
    for key in ref:
        # Case where a context was deleted
        if key not in new:
            print(path + "/" + str(key) + " *** removed ***  ")
    for key in new:
        # Case where a context was created
        if key not in ref:
            print(path + "/" + str(key) + " *** added ***  ")
            printTree(new[key])
        else:
            if new[key] == ref[key]:
                # case where a context already existed and wasn't changed
                pass
            elif isinstance(new[key], (Container, dict)):
                # Case where a context was changed but already existed
                # Not reaching a leaf node -> recursive call to dict_diff
                dict_diff(ref[key], new[key], path + "/" + str(key), r_exclude)
            else:
                # Reached a leaf -> Display the result if the xpath to reach it
                # doesn't match the exclude provided regex
                leaf_diff(path, key, new[key], r_exclude)


def show_tools_command_monitor(node_handle, arguments):
    """Function called to get information from a show or tools command
    passed as input to the command

    :parameter node_handle: pySROS object representing connection to the node
    :type node_handle: :py:class:`pysros.management.Connection`

    :parameter arguments:   CLI arguments gathered from CLI
                            and parsed to Python objects
    :type arguments: dict
    """
    # Case where a show command is monitored
    ref_output = node_handle.cli(arguments["xpath"])
    ref_output_split = ref_output.splitlines()
    # Clear the screen only if the script is launched from the node
    if sros():
        print(node_handle.cli("/clear screen"))
    print(ref_output)
    for _ in range(0, arguments["repeat"]):
        time.sleep(arguments["interval"])
        # Clear the screen only if the script is launched from the node
        if sros():
            print(node_handle.cli("/clear screen"))
        current_output_split = node_handle.cli(arguments["xpath"]).splitlines()
        current_output_mod = []
        for line in current_output_split:
            # Line is not reported as changed if exclude regex matches
            if arguments["exclude"] is not None:
                dont_check = bool(re.match(arguments["exclude"], line))
            else:
                dont_check = False
            # Prepend '-->' to the line if a change is observed
            if line in ref_output_split or dont_check:
                current_output_mod.append("    " + line)
            else:
                current_output_mod.append("--> " + line)
            # Warning: lines removed from the output are not reported
        for line in current_output_mod:
            print(line)
        if arguments["absolute"] is False:
            ref_output_split = current_output_split


def xpath_monitor(node_handle, arguments):
    """Function called to get information from an xpath expression
    passed to the command

    :parameter node_handle: pySROS object representing connection to the node
    :type node_handle: :py:class:`pysros.management.Connection`

    :parameter arguments:   CLI arguments gathered from CLI
                            and parsed to Python objects
    :type arguments: dict
    """
    # Case where an xpath is monitored
    if "nokia" not in arguments["xpath"]:
        if arguments["xpath"].startswith("/"):
            xpath = "/nokia-state:" + arguments["xpath"][1:]
        else:
            xpath = "/nokia-state:" + arguments["xpath"]
    else:
        xpath = arguments["xpath"]
    if xpath[-1] == "/":
        xpath = xpath[:-1]
    # Get the initial state of the context as ref
    try:
        obj_ref = node_handle.running.get(xpath)
    except InvalidPathError as error:
        print("Error in the xpath %s\n%s" % (xpath, error))
        # Exit with a non-zero status so callers can detect the failure
        sys.exit(1)
    print("### Initial reference recorded ###\n")
    for i in range(0, arguments["repeat"]):
        # Wait for the required interval
        time.sleep(arguments["interval"])
        # Get the new state of the context to check
        obj_new = node_handle.running.get(xpath)
        print(
            "\n### Iteration nb. {iter} ({time_spent}s) ###\n".format(
                iter=i + 1,
                time_spent=(i + 1) * arguments["interval"],
            )
        )
        if isinstance(obj_new, (Container, dict)):
            # Check the diff between ref & last record
            dict_diff(obj_ref, obj_new, ".", arguments["exclude"])
        elif isinstance(obj_new, Leaf) and obj_ref != obj_new:
            leaf_diff(xpath, "", obj_new, None)
        # Store the current record as ref if the absolute flag is not set
        if arguments["absolute"] is False:
            obj_ref = obj_new


def main():
    """The main procedure.  The execution starts here."""
    args = ArgumentHelper()
    args.add_argument(
        "-e",
        Argument(
            "exclude",
            simple_string,
            "Regex to exclude from the result(s)",
            1,
            None,
        ),
    )
    args.add_argument(
        "-i",
        Argument(
            "interval",
            simple_integer,
            "Interval in second(s) between each check / Default = 3",
            1,
            3,
        ),
    )
    args.add_argument(
        "-r",
        Argument(
            "repeat",
            simple_integer,
            "Number of iterations / Default = 10",
            1,
            10,
        ),
    )
    args.add_argument(
        "-a",
        Argument(
            "absolute",
            simple_boolean,
            "Compare to initial info if set (default is incremental compares)",
            0,
            False,
        ),
    )

    if found_arguments := args.parse_argv(sys.argv):
        if "helped" in found_arguments.keys():
            return

        # Connect to the node
        node_handle = get_connection()
        if (
            found_arguments["xpath"].startswith("show")
            or found_arguments["xpath"].startswith("tools")
            or found_arguments["xpath"].startswith("/show")
            or found_arguments["xpath"].startswith("/tools")
        ):
            show_tools_command_monitor(node_handle, found_arguments)
        else:
            xpath_monitor(node_handle, found_arguments)
    else:
        print("Error while parsing args")


if __name__ == "__main__":
    main()
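
The line-marking logic in show_tools_command_monitor can be tried without a node. The following is a minimal sketch, assuming a hypothetical helper named mark_changed_lines and made-up sample output; neither is part of the script above. It mirrors the loop that prefixes changed lines with "--> " and shows that the exclude regex is applied with re.match, so it only matches at the start of a line.

```python
import re


def mark_changed_lines(reference_lines, current_lines, exclude=None):
    """Prefix each current line with '--> ' if it is new, else four spaces.

    Hypothetical helper for illustration; it follows the same rules as the
    loop in show_tools_command_monitor.
    """
    marked = []
    for line in current_lines:
        # re.match anchors at the start of the line, as in the script
        excluded = exclude is not None and bool(re.match(exclude, line))
        if line in reference_lines or excluded:
            marked.append("    " + line)
        else:
            marked.append("--> " + line)
    return marked


# Made-up sample output, not taken from a real node
reference = ["Name : node-1", "Up Time : 0 days, 00:01:00", "Ports : 4"]
current = ["Name : node-1", "Up Time : 0 days, 00:01:03", "Ports : 5"]

# Without an exclude regex, both changed lines are flagged
for line in mark_changed_lines(reference, current):
    print(line)

# Excluding lines that start with "Up Time" leaves only the port change
for line in mark_changed_lines(reference, current, exclude=r"Up Time"):
    print(line)
```

Because the match is anchored at the start of the line, a pattern such as "Time" alone would not exclude the up-time line in this sketch; ".*Time" would.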
