As engineers, we sometimes find ourselves repeating the same command over and over, waiting for that one BGP route to appear or a network event to occur that enables our changes to proceed or be closed off. This is not the most efficient practice, as discovered in 1991 when the original "watch" command was introduced.
Because it requires some inputs from the operator, a way of parsing arguments was added to the code included in this article. The command covers typical use cases that we've seen and found to be useful; suggestions and improvements are welcomed.
Element | Version |
---|---|
CentOS Workstation/Server | CentOS Linux release 7.9.2009 (Core) |
SR OS device running in Model-Driven mode | 22.10.R1 |
Python (on the CentOS machine) | 3.9.6 |
Containerlab | 0.32.1 |
This is a demo quality pySROS script that does not have official support from Nokia. It is intended for local execution on an SR OS box.
In the following sections, the script is dissected and the parts are briefly discussed. At the end of this article, the script is shown in its entirety to promote reusability.
Similar to how the well-known argparse library works, these classes simplify the management of arguments in the script. The "help" portion of the script is managed by the information held in the Argument objects, and default values can be specified. When an operator calls the script through an alias, using a call to pyexec or by running it remotely using their own Python interpreter, arguments must be distilled from the input.
An Argument object is made up of six member variables, as follows:
The ArgumentHelper class is an abstraction from the regular dictionary of Arguments. The AugmentHelper can be given a number of arguments to look for and parses the input given to the script. If needed, a help message can be printed to the CLI by the ArgumentHelper:
class Argument:
"""Class representing a single CLI argument passed via sys.argv
:parameter name: resulting key to be used in the args dictionary to
find the value passed for this argument.
:type name: str
:parameter parsing_function: function to be run using the discovered
parameters as input, result of which is
assigned to the variable.
:type parsing_function: Callable
:parameter help_text: Help text to be displayed when needed
:type help_text: str
:parameter parameter_modifier: Number of values to retrieve from
sys.argv to consume for this variable.
:type parameter_modifier: int
:parameter default_value: value assigned by default to ensure args
dictionary is always populated.
:type default_value: Any
"""
def __init__(
self,
_name,
_parsing_function,
_help_text,
_parameter_modifier,
_default_value,
):
self.name = _name
self.parsing_function = _parsing_function
self.help_text = _help_text
self.parameter_modifier = _parameter_modifier
self.default_value = _default_value
self.value = _default_value
def set_value(self, value):
"""Set the value of this Argument to the passed value parameter
after it is passed through the object's parsing function.
:parameter value: The argument passed assigned to the object value
:type value: Any
"""
if isinstance(value, list) and len(value) == 1:
value = value[0]
self.value = self.parsing_function(value)
def __str__(self):
return "%*s" % (20, self.help_text)
class ArgumentHelper:
"""Class acting as abstraction layer between sys.argv and a
collection of Arguments to be returned as a key:value dictionary
after parsing and processing.
Intended to be similar to the ArgumentParser class in argparse.
"""
def __init__(self):
self.command_help = "watch help/-h"
self.arg_list_pointer = 0
self.args = {}
def add_argument(self, key, arg):
"""Add an argument to the class' internal args dictionary by key
with value equal to an Argument.
:parameter key: CLI flag to be used to specifically address the
corresponding Argument object
:type key: str
:parameter arg: The Argument object being created and made available
:type arg: :py:class:`Argument`
"""
self.args[key] = arg
def send_help(self):
"""Distill a help message similar to what is shown on Linux CLI
to be shown on the SR OS CLI when an operator enters the command
without arguments, or with "-h" or "help" as an argument
:returns: a printable help message to be displayed on the CLI, using
information found in each Argument contained in self.args
:rtype: str
"""
help_message = "\nExample: %-*s\nCommand options:\n" % (
20,
"watch.py <command to execute/monitor> %s" % (self.help_flags()),
)
for flag, arg in self.args.items():
help_message += " %-*s%s\n" % (10 + len(flag), flag, arg)
help_message += " %-*s%s" % (
10 + len("-h"),
"-h",
"Display this message.",
)
return help_message
def help_flags(self):
"""Add an argument to the class' internal args dictionary by key
with value equal to an Argument.
:returns: a printable help message to be displayed on the CLI,
showing potentially usable flags in shorthand notation,
to be included with the result of send_help
:rtype: str
"""
flags = "|".join(
k + (" #" if v.parameter_modifier > 0 else "")
for k, v in self.args.items()
)
return flags
def handle_argument(self, index, arg_list_skip_path):
"""Handle an argument specified on the CLI by finding the key, using
the key to find the Argument object, determining how many parameters
there should be on the CLI and passing those to the Argument's
set_value call.
:parameter index: Current index in the list of CLI arguments being
looked at for parsing/processing
:type key: int
:parameter arg_list_skip_path: list of CLI arguments
:type arg: list
"""
argument_flag = arg_list_skip_path[index]
self.arg_list_pointer += self.args[argument_flag].parameter_modifier
self.args[argument_flag].set_value(
arg_list_skip_path[index + 1: self.arg_list_pointer + 1]
)
self.arg_list_pointer += 1
def parse_argv(self, _arg_list):
"""Add an argument to the class' args dictionary by key with value
equal to an Argument.
:parameter _arg_list: the list of arguments, retrieved from sys.argv
by the caller and passed to this function.
:type arg: list
:returns: a dictionary containing the specified Arguments either with
default or specific values, or a "helped" flag that
terminates the program.
:rtype: str
"""
arg_list = _arg_list
if len(arg_list) == 0:
return None
if (len(arg_list) == 1 or arg_list[1] == "-h" or arg_list[1] == "help"):
print(self.send_help())
return {"helped": True}
result_args = {}
# Skip the first element, as that is the path (in this case)
if arg_list[1] == "/":
# Elements such as
# watch /state/system/up-time
# watch /nokia-state:state/system/up-time
# are passed to arg_list with the first '/' in its
# own entry in the list: undo that.
#
# - this also applies to
# - /show system information
# - that appears "/", "show system information"
# - avoid, lighten the condition on the if-statement above
arg_list[1] += arg_list[2]
del arg_list[2]
arg_list_skip_path = arg_list[1:]
# ### ###
# If this command needs a tools/show/state command to start
# this section is needed. If not provided it can't run.
# ### ###
try:
# parts of show commands that follow a space and start with '-'
# may cause a problem in execution however no specific examples
# of this are known
self.arg_list_pointer = min(
(
index
for index, value in enumerate(arg_list_skip_path)
if value.startswith("-")
)
)
except ValueError:
# ValueError: min() arg is an empty sequence
self.arg_list_pointer = len(arg_list_skip_path)
result_args["xpath"] = " ".join(
arg_list_skip_path[: self.arg_list_pointer]
)
# ### ###
# the arguments come after the show command
# ### ###
for index, value in enumerate(arg_list_skip_path):
if self.arg_list_pointer == index:
try:
self.handle_argument(index, arg_list_skip_path)
except KeyError as error_cli_option:
print(
'"%s" is not a valid command option. See \n%s'
% (error_cli_option, self.send_help())
)
sys.exit(98)
else:
# this parameter or flag was already used - skip
pass
intermediary = {v.name: v.value for _, v in self.args.items()}
intermediary.update(result_args)
return intermediary
In addition to the classes previously defined, some parsing functions are defined that are intended to validate the provided parameters and align them with the rest of the script as far as the object type is concerned. These functions are considered trivial and are shown here for completeness:
def simple_boolean(_inputstring):
"""Simple function to return True after being called, used to augment
the Argument classes below. it is meant to be used as a replacement
for action=store_true in regular argparse
"""
return True
def simple_string(inputstring):
"""Simple function to return string representation of the argument
after being called, used to augment the Argument classes below
:parameter inputstring: The argument passed via sys.argv to be converted
to string (if needed) and returned by this function
:type inputstring: str
:returns: inputstring as str type
:rtype: :py:class:`str`
"""
if isinstance(inputstring, str):
return inputstring
return repr(inputstring)
def simple_integer(inputstring):
"""Simple function to return the integer representation of the argument
after being called, used to augment the Argument classes below
:parameter inputstring: The argument passed via sys.argv to be converted
to an int object and returned by this function
:type inputstring: str
:returns: inputstring as int type
:rtype: :py:class:`int`
"""
return int(inputstring)
Every pySROS script must set up a connection to a device before it can perform any interactions with the device. The generally accepted way of establishing a connection uses a get_connection function:
def get_connection(host=None, username=None, password=None, port=830):
"""Function definition to obtain a Connection object to a specific
SR OS device and access the model-driven information.
:parameter host: The hostname or IP address of the SR OS node.
:type host: str
:paramater credentials: The username and password to connect
to the SR OS node.
:type credentials: dict
:parameter port: The TCP port for the connection to the SR OS node.
:type port: int
:returns: Connection object for the SR OS node.
:rtype: :py:class:`pysros.management.Connection`
"""
try:
connection_object = connect(
host=host,
username=username,
password=password,
port=port,
)
except RuntimeError as error1:
print("Failed to connect. Error:", error1)
sys.exit(-1)
return connection_object
When comparing the results from two sequential iterations, the behavior of the script is determined by the command that is being watched. If the command specifies a Leaf object in the state tree, differences in the value of the Leaf are reported. If the command specifies a Container, recursion is used to report newly added or deleted nodes from the Container structure.
If the command specifies an SR OS command with a textual output (show ... , tools ...) , the output is considered line-by-line.
def leaf_diff(path, key, new, r_exclude):
"""Display the path leading up to the new value of the leaf in tree,
followed by the new value.
:parameter path: The json-instance-path leading up
to the leaf of which the value is being compared
:type path: str
:parameter key: The specific key for the leaf within the parent Container.
Empty if the query was a specific Leaf.
:type key: str
:parameter new: The new value found in the leaf
:type new: str
:parameter r_exclude: Regex to exclude any leafs whose name matches this
regular expression
:type r_exclude: str
"""
output = path + (("/" + str(key)) if key else "") + "/" + str(new)
if r_exclude is not None:
if not bool(re.match(r_exclude, output)):
print(output)
else:
print(output)
def dict_diff(ref, new, path, r_exclude):
"""Display the path leading up to one of three possible situations;
either a leaf is removed from the tree or
a leaf is added to the tree or
a leaf's value has changed
:parameter ref: The reference value, for the currently new values to be
compared against
:type ref: str
:parameter new: The new values found in the leaf
:type new: str
:parameter path: The json-instance-path leading up
to the leaf of which the value is being compared
:type path: str
:parameter r_exclude: Regex to exclude any leafs whose name matches
this regular expression
:type r_exclude: str
"""
# Used to display the value between two XPATHs
for key in ref:
# Case where a context was deleted
if key not in new:
print(path + "/" + str(key) + " *** removed *** ")
for key in new:
# Case where a context was created
if key not in ref:
print(path + "/" + str(key) + " *** added *** ")
printTree(new[key])
else:
if new[key] == ref[key]:
# case where a context already existed and wasn't changed
pass
elif isinstance(new[key], (Container, dict)):
# Case where a context was changed but already existed
# Not reaching a leaf node -> recursive call to dict_diff
dict_diff(ref[key], new[key], path + "/" + str(key), r_exclude)
else:
# Reached a leaf -> Display the result if the xpath to reach it
# doesn't match the exclude provided regex
leaf_diff(path, key, new[key], r_exclude)
Finally, using the building blocks shown previously, the main function is created along with two helper functions; show_tools_command_monitor and xpath_monitor. As the main part of the script, the main function controls the entire flow. In the main function, the arguments are defined. The script accepts inputs for the number of iterations, the delay between iterations, a regex parameter to exclude Leafs from the comparison, and a flag to choose which values to compare (t0 vs. t, or t-1 vs. t). A command or value that should be watched is also expected:
def show_tools_command_monitor(node_handle, arguments):
"""Function called to get information from a show or tools command
passed as input to the command
:parameter node_handle: pySROS object representing connection to the node
:type node_handle: :py:class:`pysros.management.Connection`
:parameter arguments: CLI arguments gathered from CLI
and parsed to Python objects
:type arguments: dict
"""
# Case where a show command is monitored
ref_output = node_handle.cli(arguments["xpath"])
ref_output_split = ref_output.splitlines()
# Clear screen only if the script is launch from the node
if sros():
print(node_handle.cli("/clear screen"))
print(ref_output)
for _ in range(0, arguments["repeat"]):
time.sleep(arguments["interval"])
# Clear screen only if the script is launch from the node
if sros():
print(node_handle.cli("/clear screen"))
current_output_split = node_handle.cli(
arguments["xpath"]
).splitlines()
current_output_mod = []
for line in current_output_split:
# Line is not reported as changed if exclude regex matches
if arguments["exclude"] is not None:
dont_check = bool(re.match(arguments["exclude"], line))
else:
dont_check = False
# Prepend '-->' to the line if a change is observed
if line in ref_output_split or dont_check:
current_output_mod.append(" " + line)
else:
current_output_mod.append("--> " + line)
# Warning : Removed lines are not notified
for line in current_output_mod:
print(line)
if arguments["absolute"] is False:
ref_output_split = current_output_split
def xpath_monitor(node_handle, arguments):
"""Function called to get information from an xpath expression
passed to the command
:parameter node_handle: pySROS object representing connection to the node
:type node_handle: :py:class:`pysros.management.Connection`
:parameter arguments: CLI arguments gathered from CLI
and parsed to Python objects
:type arguments: dict
"""
# Case where an xpath is monitored
if "nokia" not in arguments["xpath"]:
if arguments["xpath"].startswith("/"):
xpath = "/nokia-state:" + arguments["xpath"][1:]
else:
xpath = "/nokia-state:" + arguments["xpath"]
else:
xpath = arguments["xpath"]
if xpath[-1] == "/":
xpath = xpath[:-1]
# Get the initial state of the context as ref
try:
obj_ref = node_handle.running.get(xpath)
except InvalidPathError as error:
print("Error in the xpath %s\n%s" % (xpath, error))
sys.exit()
print("### Initial reference recorded ###\n")
for i in range(0, arguments["repeat"]):
# Wait for the required interval
time.sleep(arguments["interval"])
# Get the new state of the context to check
obj_new = node_handle.running.get(xpath)
print(
"\n### Iteration nb. {iter} ({time_spent}s) ###\n".format(
iter=i + 1,
time_spent=(i + 1) * arguments["interval"],
)
)
if isinstance(obj_new, Container or isinstance(obj_new, dict)):
# Check the diff between ref & last record
dict_diff(
obj_ref, obj_new, ".", arguments["exclude"]
)
elif isinstance(obj_new, Leaf) and obj_ref != obj_new:
leaf_diff(xpath, "", obj_new, None)
# Store the actual record as ref if absolute flag is no set
if arguments["absolute"] is False:
obj_ref = obj_new
def main():
"""The main procedure. The execution starts here."""
args = ArgumentHelper()
args.add_argument(
"-e",
Argument(
"exclude",
simple_string,
"Regex to exclude from the result(s)",
1,
None,
),
)
args.add_argument(
"-i",
Argument(
"interval",
simple_integer,
"Interval in second(s) between each check / Default = 3",
1,
3,
),
)
args.add_argument(
"-r",
Argument(
"repeat",
simple_integer,
"Number of iterations / Default = 10",
1,
10,
),
)
args.add_argument(
"-a",
Argument(
"absolute",
simple_boolean,
"Compare to initial info if set (default is incremental compares)",
0,
False,
),
)
if found_arguments := args.parse_argv(sys.argv):
if "helped" in found_arguments.keys():
return
# Connect to the node
node_handle = get_connection()
if (
found_arguments["xpath"].startswith("show")
or found_arguments["xpath"].startswith("tools")
or found_arguments["xpath"].startswith("/show")
or found_arguments["xpath"].startswith("/tools")
):
show_tools_command_monitor(node_handle, found_arguments)
else:
xpath_monitor(node_handle, found_arguments)
else:
print("Error while parsing args")
In this section, you'll find the complete working code.
#!/usr/bin/env python3
### watch.py
# Copyright 2022 Nokia
###
# pylint: disable=consider-using-f-string
"""Implementation/approximation of Linux "watch" command for Nokia SR OS.
Tested on: SR OS 22.10.R1
"""
import re
import sys
import time
from pysros.wrappers import Leaf, Container
from pysros.management import connect, sros
from pysros.exceptions import InvalidPathError
from pysros.pprint import printTree
def get_connection(host=None, username=None, password=None, port=830):
"""Function definition to obtain a Connection object to a specific
SR OS device and access the model-driven information.
:parameter host: The hostname or IP address of the SR OS node.
:type host: str
:paramater credentials: The username and password to connect
to the SR OS node.
:type credentials: dict
:parameter port: The TCP port for the connection to the SR OS node.
:type port: int
:returns: Connection object for the SR OS node.
:rtype: :py:class:`pysros.management.Connection`
"""
try:
connection_object = connect(
host=host,
username=username,
password=password,
port=port,
)
except RuntimeError as error1:
print("Failed to connect. Error:", error1)
sys.exit(-1)
return connection_object
def simple_boolean(_inputstring):
"""Simple function to return True after being called, used to augment
the Argument classes below. it is meant to be used as a replacement
for action=store_true in regular argparse
"""
return True
def simple_string(inputstring):
"""Simple function to return string representation of the argument
after being called, used to augment the Argument classes below
:parameter inputstring: The argument passed via sys.argv to be converted
to string (if needed) and returned by this function
:type inputstring: str
:returns: inputstring as str type
:rtype: :py:class:`str`
"""
if isinstance(inputstring, str):
return inputstring
return repr(inputstring)
def simple_integer(inputstring):
"""Simple function to return the integer representation of the argument
after being called, used to augment the Argument classes below
:parameter inputstring: The argument passed via sys.argv to be converted
to an int object and returned by this function
:type inputstring: str
:returns: inputstring as int type
:rtype: :py:class:`int`
"""
return int(inputstring)
class Argument:
"""Class representing a single CLI argument passed via sys.argv
:parameter name: resulting key to be used in the args dictionary to
find the value passed for this argument.
:type name: str
:parameter parsing_function: function to be run using the discovered
parameters as input, result of which is
assigned to the variable.
:type parsing_function: Callable
:parameter help_text: Help text to be displayed when needed
:type help_text: str
:parameter parameter_modifier: Number of values to retrieve from
sys.argv to consume for this variable.
:type parameter_modifier: int
:parameter default_value: value assigned by default to ensure args
dictionary is always populated.
:type default_value: Any
"""
def __init__(
self,
_name,
_parsing_function,
_help_text,
_parameter_modifier,
_default_value,
):
self.name = _name
self.parsing_function = _parsing_function
self.help_text = _help_text
self.parameter_modifier = _parameter_modifier
self.default_value = _default_value
self.value = _default_value
def set_value(self, value):
"""Set the value of this Argument to the passed value parameter
after it is passed through the object's parsing function.
:parameter value: The argument passed assigned to the object value
:type value: Any
"""
if isinstance(value, list) and len(value) == 1:
value = value[0]
self.value = self.parsing_function(value)
def __str__(self):
return "%*s" % (20, self.help_text)
class ArgumentHelper:
"""Class acting as abstraction layer between sys.argv and a
collection of Arguments to be returned as a key:value dictionary
after parsing and processing.
Intended to be similar to the ArgumentParser class in argparse.
"""
def __init__(self):
self.command_help = "watch help/-h"
self.arg_list_pointer = 0
self.args = {}
def add_argument(self, key, arg):
"""Add an argument to the class' internal args dictionary by key
with value equal to an Argument.
:parameter key: CLI flag to be used to specifically address the
corresponding Argument object
:type key: str
:parameter arg: The Argument object being created and made available
:type arg: :py:class:`Argument`
"""
self.args[key] = arg
def send_help(self):
"""Distill a help message similar to what is shown on Linux CLI
to be shown on the SR OS CLI when an operator enters the command
without arguments, or with "-h" or "help" as an argument
:returns: a printable help message to be displayed on the CLI, using
information found in each Argument contained in self.args
:rtype: str
"""
help_message = "\nExample: %-*s\nCommand options:\n" % (
20,
"watch.py <command to execute/monitor> %s" % (self.help_flags()),
)
for flag, arg in self.args.items():
help_message += " %-*s%s\n" % (10 + len(flag), flag, arg)
help_message += " %-*s%s" % (
10 + len("-h"),
"-h",
"Display this message.",
)
return help_message
def help_flags(self):
"""Add an argument to the class' internal args dictionary by key
with value equal to an Argument.
:returns: a printable help message to be displayed on the CLI,
showing potentially usable flags in shorthand notation,
to be included with the result of send_help
:rtype: str
"""
flags = "|".join(
k + (" #" if v.parameter_modifier > 0 else "")
for k, v in self.args.items()
)
return flags
def handle_argument(self, index, arg_list_skip_path):
"""Handle an argument specified on the CLI by finding the key, using
the key to find the Argument object, determining how many parameters
there should be on the CLI and passing those to the Argument's
set_value call.
:parameter index: Current index in the list of CLI arguments being
looked at for parsing/processing
:type key: int
:parameter arg_list_skip_path: list of CLI arguments
:type arg: list
"""
argument_flag = arg_list_skip_path[index]
self.arg_list_pointer += self.args[argument_flag].parameter_modifier
self.args[argument_flag].set_value(
arg_list_skip_path[index + 1: self.arg_list_pointer + 1]
)
self.arg_list_pointer += 1
def parse_argv(self, _arg_list):
"""Add an argument to the class' args dictionary by key with value
equal to an Argument.
:parameter _arg_list: the list of arguments, retrieved from sys.argv
by the caller and passed to this function.
:type arg: list
:returns: a dictionary containing the specified Arguments either with
default or specific values, or a "helped" flag that
terminates the program.
:rtype: str
"""
arg_list = _arg_list
if len(arg_list) == 0:
return None
if len(arg_list) == 1 or arg_list[1] == "-h" or arg_list[1] == "help":
print(self.send_help())
return {"helped": True}
result_args = {}
# Skip the first element, as that is the path (in this case)
if arg_list[1] == "/":
# Elements such as
# watch /state/system/up-time
# watch /nokia-state:state/system/up-time
# are passed to arg_list with the first '/' in its
# own entry in the list: undo that.
#
# - this also applies to
# - /show system information
# - that appears "/", "show system information"
# - avoid, lighten the condition on the if-statement above
arg_list[1] += arg_list[2]
del arg_list[2]
arg_list_skip_path = arg_list[1:]
# ### ###
# If this command needs a tools/show/state command to start
# this section is needed. If not provided it can't run.
# ### ###
try:
# parts of show commands that follow a space and start with '-'
# may cause a problem in execution however no specific examples
# of this are known
self.arg_list_pointer = min(
(
index
for index, value in enumerate(arg_list_skip_path)
if value.startswith("-")
)
)
except ValueError:
# ValueError: min() arg is an empty sequence
self.arg_list_pointer = len(arg_list_skip_path)
result_args["xpath"] = " ".join(
arg_list_skip_path[: self.arg_list_pointer]
)
# ### ###
# the arguments come after the show command
# ### ###
for index, value in enumerate(arg_list_skip_path):
if self.arg_list_pointer == index:
try:
self.handle_argument(index, arg_list_skip_path)
except KeyError as error_cli_option:
print(
'"%s" is not a valid command option. See \n%s'
% (error_cli_option, self.send_help())
)
sys.exit(98)
else:
# this parameter or flag was already used - skip
pass
intermediary = {v.name: v.value for _, v in self.args.items()}
intermediary.update(result_args)
return intermediary
def leaf_diff(path, key, new, r_exclude):
"""Display the path leading up to the new value of the leaf in tree,
followed by the new value.
:parameter path: The json-instance-path leading up
to the leaf of which the value is being compared
:type path: str
:parameter key: The specific key for the leaf within the parent Container.
Empty if the query was a specific Leaf.
:type key: str
:parameter new: The new value found in the leaf
:type new: str
:parameter r_exclude: Regex to exclude any leafs whose name matches this
regular expression
:type r_exclude: str
"""
output = path + (("/" + str(key)) if key else "") + "/" + str(new)
if r_exclude is not None:
if not bool(re.match(r_exclude, output)):
print(output)
else:
print(output)
def dict_diff(ref, new, path, r_exclude):
"""Display the path leading up to one of three possible situations;
either a leaf is removed from the tree or
a leaf is added to the tree or
a leaf's value has changed
:parameter ref: The reference value, for the currently new values to be
compared against
:type ref: str
:parameter new: The new values found in the leaf
:type new: str
:parameter path: The json-instance-path leading up
to the leaf of which the value is being compared
:type path: str
:parameter r_exclude: Regex to exclude any leafs whose name matches
this regular expression
:type r_exclude: str
"""
# Used to display the value between two XPATHs
for key in ref:
# Case where a context was deleted
if key not in new:
print(path + "/" + str(key) + " *** removed *** ")
for key in new:
# Case where a context was created
if key not in ref:
print(path + "/" + str(key) + " *** added *** ")
printTree(new[key])
else:
if new[key] == ref[key]:
# case where a context already existed and wasn't changed
pass
elif isinstance(new[key], (Container, dict)):
# Case where a context was changed but already existed
# Not reaching a leaf node -> recursive call to dict_diff
dict_diff(ref[key], new[key], path + "/" + str(key), r_exclude)
else:
# Reached a leaf -> Display the result if the xpath to reach it
# doesn't match the exclude provided regex
leaf_diff(path, key, new[key], r_exclude)
def show_tools_command_monitor(node_handle, arguments):
"""Function called to get information from a show or tools command
passed as input to the command
:parameter node_handle: pySROS object representing connection to the node
:type node_handle: :py:class:`pysros.management.Connection`
:parameter arguments: CLI arguments gathered from CLI
and parsed to Python objects
:type arguments: dict
"""
# Case where a show command is monitored
ref_output = node_handle.cli(arguments["xpath"])
ref_output_split = ref_output.splitlines()
# Clear screen only if the script is launch from the node
if sros():
print(node_handle.cli("/clear screen"))
print(ref_output)
for _ in range(0, arguments["repeat"]):
time.sleep(arguments["interval"])
# Clear screen only if the script is launch from the node
if sros():
print(node_handle.cli("/clear screen"))
current_output_split = node_handle.cli(arguments["xpath"]).splitlines()
current_output_mod = []
for line in current_output_split:
# Line is not reported as changed if exclude regex matches
if arguments["exclude"] is not None:
dont_check = bool(re.match(arguments["exclude"], line))
else:
dont_check = False
# Prepend '-->' to the line if a change is observed
if line in ref_output_split or dont_check:
current_output_mod.append(" " + line)
else:
current_output_mod.append("--> " + line)
# Warning : Removed lines are not notified
for line in current_output_mod:
print(line)
if arguments["absolute"] is False:
ref_output_split = current_output_split
def xpath_monitor(node_handle, arguments):
"""Function called to get information from an xpath expression
passed to the command
:parameter node_handle: pySROS object representing connection to the node
:type node_handle: :py:class:`pysros.management.Connection`
:parameter arguments: CLI arguments gathered from CLI
and parsed to Python objects
:type arguments: dict
"""
# Case where an xpath is monitored
if "nokia" not in arguments["xpath"]:
if arguments["xpath"].startswith("/"):
xpath = "/nokia-state:" + arguments["xpath"][1:]
else:
xpath = "/nokia-state:" + arguments["xpath"]
else:
xpath = arguments["xpath"]
if xpath[-1] == "/":
xpath = xpath[:-1]
# Get the initial state of the context as ref
try:
obj_ref = node_handle.running.get(xpath)
except InvalidPathError as error:
print("Error in the xpath %s\n%s" % (xpath, error))
sys.exit()
print("### Initial reference recorded ###\n")
for i in range(0, arguments["repeat"]):
# Wait for the required interval
time.sleep(arguments["interval"])
# Get the new state of the context to check
obj_new = node_handle.running.get(xpath)
print(
"\n### Iteration nb. {iter} ({time_spent}s) ###\n".format(
iter=i + 1,
time_spent=(i + 1) * arguments["interval"],
)
)
if isinstance(obj_new, Container or isinstance(obj_new, dict)):
# Check the diff between ref & last record
dict_diff(obj_ref, obj_new, ".", arguments["exclude"])
elif isinstance(obj_new, Leaf) and obj_ref != obj_new:
leaf_diff(xpath, "", obj_new, None)
# Store the actual record as ref if absolute flag is no set
if arguments["absolute"] is False:
obj_ref = obj_new
def main():
"""The main procedure. The execution starts here."""
args = ArgumentHelper()
args.add_argument(
"-e",
Argument(
"exclude",
simple_string,
"Regex to exclude from the result(s)",
1,
None,
),
)
args.add_argument(
"-i",
Argument(
"interval",
simple_integer,
"Interval in second(s) between each check / Default = 3",
1,
3,
),
)
args.add_argument(
"-r",
Argument(
"repeat",
simple_integer,
"Number of iterations / Default = 10",
1,
10,
),
)
args.add_argument(
"-a",
Argument(
"absolute",
simple_boolean,
"Compare to initial info if set (default is incremental compares)",
0,
False,
),
)
if found_arguments := args.parse_argv(sys.argv):
if "helped" in found_arguments.keys():
return
# Connect to the node
node_handle = get_connection()
if (
found_arguments["xpath"].startswith("show")
or found_arguments["xpath"].startswith("tools")
or found_arguments["xpath"].startswith("/show")
or found_arguments["xpath"].startswith("/tools")
):
show_tools_command_monitor(node_handle, found_arguments)
else:
xpath_monitor(node_handle, found_arguments)
else:
print("Error while parsing args")
if __name__ == "__main__":
main()