This adventure is coming to an end 🙂 It’s been definitely nice as an intellectual exercise, but one can clearly see the downsides: this is not actually network as code, but rather ”network as one messy change text file”. Of course, this text file could be made nicer in excel and underneath it would be a comma-delimited processable csv, but at its best it would be a ”network as a colourful excel file”. I’ve created a monster 😀
But hey, we’re not quite finished making the monster stronger and… weirder. I’ve decided to be bold and added a female voiceover that comments on the various stages of the script processing. Also, the script is now interactive.
#!/usr/bin/python3 from collections import defaultdict import json import pprint import collections from operator import itemgetter from ciscoconfparse import CiscoConfParse from unicon import Connection from jinja2 import Environment, FileSystemLoader import yaml from pyats.topology import loader import sys import shutil import diffios import os import time import logging from gtts import gTTS LEVELS = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL} level_name = sys.argv[1] level = LEVELS.get(level_name, logging.NOTSET) logging.basicConfig(level=level) unicondevicelist = sys.argv[2].split("%") #print(unicondevicelist) text = "This change will be executed on 3 routers: router 1, router 2, router 3. I am going to look at the change text file now " language = 'en' speech = gTTS(text = text, lang = language, slow = False) speech.save("text.mp3") os.system("play text.mp3") #loading the testbed file with router management IPs testbed = loader.load('testbed.yml') #downloading router configuration files and storing them away as .bak files x = 0 for x in range(x,len(unicondevicelist)): devicename = unicondevicelist[x] c = testbed.devices[devicename] c.connect(log_stdout=False) output = c.execute('show run') f = open(devicename, "w") text = "I downloaded a config file from a router." language = 'en' speech = gTTS(text = text, lang = language, slow = False) speech.save("text.mp3") os.system("play text.mp3") f.write(output) f.close() newdevicename = devicename + '.bak' shutil.copyfile(devicename, newdevicename) c.destroy() text = "I now downloaded configs of all routers. I am going to parse this config now." language = 'en' speech = gTTS(text = text, lang = language, slow = False) speech.save("text.mp3") os.system("play text.mp3") #parsing the file with the changes with open('change.txt', 'r') as reader: configchangetext = reader.read() splittext = configchangetext.split('\n') my_dict = defaultdict(lambda: defaultdict(lambda: defaultdict(dict))) for line in splittext: dataline = line.split(',') my_dict[dataline[0]][dataline[1]][dataline[2]] = dataline[3] time.sleep(1) logging.info('\n') logging.info('***************************************************************************************************************************') logging.info('*******************************IMPLEMENTATION DRAFT PLAN*******************************************************************') logging.info('***************************************************************************************************************************') for k, v in my_dict.items(): logging.info('-------------------------------------------------------------------------------------------------------------------------') logging.info('\ni am showing now how I want router config to be modified for router ' + str(k) + ' ' + '\n\n') logging.info('-------------------------------------------------------------------------------------------------------------------------') my_new_dict = my_dict[k].items() logging.info('\n') for item in my_new_dict: #item is a tuple consisting of string and default dict x = 0 for x in range (x, len(item)): newstringordict= item[x] if isinstance(newstringordict, str): logging.info(newstringordict) else: for k in newstringordict.keys(): logging.info(k) time.sleep(2) logging.info('^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^') logging.info('^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^') logging.info('^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^') logging.info('\n\n\n') logging.info('***************************************************************************************************************************') logging.info('***************************END OF IMPLEMENTATION DRAFT PLAN****************************************************************') logging.info('***************************************************************************************************************************') logging.info('\n') logging.info('***************************************************************************************************************************') logging.info('***************************************************************************************************************************') logging.info('***************************************************************************************************************************') logging.info('****************************CONFIG SANITIZATION IN PROGRESS****************************************************************') logging.info('***************************************************************************************************************************') logging.info('***************************************************************************************************************************') logging.info('***************************************************************************************************************************') realdevicelist = [] for i in my_dict.keys(): #i add the devices to the list from dict main keys realdevicelist.append(i) for j in my_dict[i].keys(): #i don't do anything here pass #let's now use this list of keys (R1, R2, R3, R4 in this case) and using those names let's open the configs from desktop. #if a config is not found, an exception is caught m = 1 while True: try: x = 0 for x in range(x,len(realdevicelist)): with open(realdevicelist[x], 'r') as reader: config = reader.read() my_config_as_list = config.split("\n") except FileNotFoundError as e: if m==1: text = "I found " + str(m) + " change for a router that is not in the testbed." if m > 1: text = "I found another error. " language = 'en' speech = gTTS(text = text, lang = language, slow = False) speech.save("text.mp3") os.system("play text.mp3") logging.critical('\n') logging.critical('*******************************ERROR***************************************************************************') logging.critical(str(e) + '.This may mean that in the change file there is a router mentioned for which i do \nnot have a config. This error appeared in ' + realdevicelist[x]) logging.critical('*******************************ERROR***************************************************************************') logging.critical('\n') del my_dict[realdevicelist[x]] realdevicelist.remove(realdevicelist[x]) logging.info('*******************************INFO****************************************************************************') logging.info('An error has been found and removed. ') logging.info('*******************************INFO****************************************************************************') logging.info('\n') m = m + 1 continue else: logging.info('************************************************************************************************************************') logging.info('************************************************************************************************************************') logging.info('****************************SANITISE FINISHED **************************************************************************') logging.info('************************************************************************************************************************') logging.info('************************************************************************************************************************') logging.info('************************************************************************************************************************') logging.info('\n\n\n') logging.info('************************************************************************************************************************') logging.info('*******************************SANITISE RESULTS PHASE**********************************************************************') logging.info('************************************************************************************************************************') logging.info('After removing all errors from the implementation plan, what follows is the sanitized implementation plan. \n This is not the final implementation plan yet, because i still need to compare the plan with actual configs. ') logging.info('\n\n\n') logging.info('************************************************************************************************************************') logging.info('************************************************************************************************************************') logging.info('*******************************IMPLEMENTATION PLAN BEFORE COMPARE PHASE****************************************************') logging.info('************************************************************************************************************************') logging.info('************************************************************************************************************************') for k, v in my_dict.items(): logging.info('\n i am showing now how I want router config to be modified for each router: \n\n' + str(k)) my_new_dict = my_dict[k].items() for item in my_new_dict: #item is a tuple consisting of string and default dict x = 0 for x in range (x, len(item)): newstringordict= item[x] if isinstance(newstringordict, str): logging.info(newstringordict) else: for z in newstringordict.keys(): logging.info(z) logging.info('\n') logging.info('************************************************************************************************************************') logging.info('*******************************END OF PLAN*********************************************************************************') logging.info('************************************************************************************************************************') break logging.info('\n') logging.info('***************************************************************************************************************************') logging.info('*******************************COMPARE PHASE*******************************************************************************') logging.info('***************************************************************************************************************************') logging.info('***************************************************************************************************************************') logging.info('*******************************FINAL CONFIG DELTA FOR IMPLEMENTATION*******************************************************') logging.info('***************************************************************************************************************************') my_final_dict = defaultdict(lambda: defaultdict(lambda: defaultdict(dict))) for router, config in my_dict.items(): with open(router, 'r') as reader: config = reader.read() my_config_as_list = config.split("\n") parse = CiscoConfParse(my_config_as_list) my_new_dict = my_dict[router].items() for tpl in my_new_dict: interface = tpl[0] cfgs = tpl[1] for key in cfgs.keys(): answer = parse.find_objects('^' + interface) if len(answer) > 0: #logging.info('a subkey from the input config dictionary matches a parent object from the actual config. This does not tell us anything yet. Maybe the value will be modified, maybe not ') second_answer = parse.find_objects_w_child(interface, key) if len(second_answer) > 0: #logging.warning('i am on ' + router + ' in subkey ' + interface + ' in value ' + key + ' and I have found a match in the existing configuration of ' + router + ' so i am not going to implement this change ' + interface + ' ' + key) pass else: if key == ' ': #logging.warning('parent found but detailed match parent and key not found because this is a dummy value. not doing anything (cases like logging buffered informational') pass else: logging.warning(' no match in config of ' + router + ' found when on ' + router + ' in subkey ' + interface + ' in value ' + key + ' so i will implement this change ') my_final_dict[router][interface][key] = cfgs[key] #connect to router and implement interface, key else: logging.warning(interface + ' not found on router ' + router + ' but i will try to create this ' + interface + ' with the value ' + key) my_final_dict[router][interface][key] = cfgs[key] #connect to router and try to implement finalstatement = '' for k, v in my_final_dict.items(): logging.info('-------------------------------------------------------------------------------------------------------------------------') logging.info('\nHere i am showing now how I FINALLY want router config to be modified for router ' + str(k) + ' ' + '\n\n') logging.info('-------------------------------------------------------------------------------------------------------------------------') my_newfinal_dict = my_final_dict[k].items() logging.info('\n') globlist = [] for item in my_newfinal_dict: #item is a tuple consisting of a string and defaultdict x = 0 for x in range (x, len(item)): newstringordict= item[x] if isinstance(newstringordict, str): globlist.append(newstringordict) logging.info(newstringordict) else: for j in newstringordict.keys(): globlist.append(j) #logging.info(j) c = testbed.devices[k] c.connect(log_stdout=False) finalstatement += ' On ' finalstatement += k finalstatement += ' i have tried to configure ' finalstatement += str(globlist) finalstatement += ' \n\n ' #here implement interactivity text = "I need you to confirm a configuration change on router " + k + " in the terminal window. Type yes to confirm" language = 'en' speech = gTTS(text = text, lang = language, slow = False) speech.save("text.mp3") os.system("play text.mp3") decision = input("I am trying to implement the following on router " + k + " " + str(globlist) + " Are you sure you want this? " ) if "yes" in decision: output = c.configure(globlist) if 'Invalid' in output: logging.critical(output) text = "I have found some invalid commands. God damn it, incompetent dumbasses" language = 'en' speech = gTTS(text = text, lang = language, slow = False) speech.save("text.mp3") os.system("play text.mp3") else: pass #here implement check if weird return from router, e.g. invalid something c.destroy() logging.info(finalstatement + '\n\n') logging.warning('***************COMPARE IN PROGRESS****FOR A DETAILED CHANGE LOG FILE SEE CHANGE_DELTA.BAK FILES*******************') unicondevicelist = realdevicelist x = 0 for x in range(x,len(unicondevicelist)): afterchanges = unicondevicelist[x] beforechanges = afterchanges + '.bak' c = testbed.devices[afterchanges] c.connect(log_stdout=False) output = c.execute('show run') time.sleep(2) f = open(afterchanges, "w") f.write(output) f.close() #time.sleep(10) baseline = beforechanges time.sleep(3) comparison = afterchanges time.sleep(3) #ignore = 'ignore.txt' diff = diffios.Compare(baseline, comparison) time.sleep(1) deltaoutput = diff.delta() filename = afterchanges + '_change_delta.bak' f = open(filename, "w") f.write(deltaoutput) f.close() os.remove(afterchanges) os.remove(beforechanges) logging.info('*******************************THIS IS THE END OF ALL OUTPUT**********************************************') text = "I have finished executing the changes and prepared a detailed change log. Amanda out" language = 'en' speech = gTTS(text = text, lang = language, slow = False) speech.save("text.mp3") os.system("play text.mp3")