Hello
I haven’t had much time lately, but despite all setbacks i’ve done some work on my Prime > Netbox scripts.
There are now 4 scripts in total.
- The first script creates the yaml files
import pandas as pd
import os
import argparse
import pynetbox
import requests
import pprint
import json
import warnings
import re
import sys
import xmltodict
import csv
from csv import reader, writer
from requests.auth import HTTPBasicAuth
from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.filterwarnings("ignore")
from genie.testbed import load
p = argparse.ArgumentParser()
p.add_argument('--user', type=str)
p.add_argument('--passkey', type=str)
args = p.parse_args()
user = args.user
passkey = args.passkey
proxies = { "http": None, "https": None, }
primeServer="enter your prime url here"
primeCreds = HTTPBasicAuth(user, passkey)
def get_all_devices_csv():
'''
this returns a unique set of all location abbreviations
'''
url = "https://" + primeServer + "/webacs/api/v4/data/Devices?.full=true"
response = requests.get(url, auth=primeCreds, verify=False, proxies = proxies)
obj=xmltodict.parse(response.text)
internal_obj = obj['queryResponse']['entity']
unique_site_set = set()
#unique_location_set = set()
for item in internal_obj:
name = item['devicesDTO']['deviceName']
if 'location' in item['devicesDTO'].keys():
location = item['devicesDTO']['location']
else:
location = 'unknown'
fixed_name = name[3:6]
unique_site_set.add(fixed_name)
#unique_location_set.add(location)
print(unique_site_set)
return unique_site_set
def query_prime(searched_thing):
'''
this queries prime, for example it will try to search for any device names which CONTAIN the word wro. The query should return dictionary object with all Wroclaw devices or a dummy object if the query gets zero results
'''
url="https://"+primeServer+"/webacs/api/v4/data/Devices?.full=true&.sort=ipAddress&deviceName=contains(%22"+searched_thing+"%22)"
response=requests.get(url,auth=primeCreds,verify=False)
obj=xmltodict.parse(response.text)
internal_obj=obj['queryResponse']
return internal_obj
def parse_prime_query_response(response_object):
'''
this parses the prime object with the query response. it should return a simple dictionary of names and ip addresses for the given query.
'''
device_topology = {}
if 'entity' in response_object.keys():
query_response = response_object['entity']
if isinstance(query_response, list):
for item in query_response:
device=item['devicesDTO']
device_name=device['deviceName']
device_name_short=device_name.replace('.yourdomain.com', '')
device_type=device['deviceType']
device_ip=device['ipAddress']
#sometimes the field location is not there if someone forgot to fill it in on Prime
if 'location' in device.keys():
device_location=device['location']
else:
device_location='default location'
try:
manuf_nrs_dict=device['manufacturerPartNrs']
manuf_list=manuf_nrs_dict['manufacturerPartNr']
except:
print(f'error creating entry for fiber switch {device_name_short}')
#we look here at a specific model
if device['deviceType']=='Cisco Catalyst 29xx Stack-able Ethernet Switch' or device['deviceType']=='Cisco Catalyst 3560CX-12PC-S Switch' or device['deviceType']=='Cisco Catalyst 68xx Virtual Switch' or device['deviceType']=='Cisco 3750 Stackable Switches' or device['deviceType']=='Cisco 2821 Integrated Services Router' or device['deviceType']=='Cisco Catalyst 6500 Virtual Switching System' or device['deviceType']=='Cisco Catalyst 2960CX-8PC-L Switch':
device_topology[device_name_short]={}
device_topology[device_name_short]['ip_address'] = device_ip
device_topology[device_name_short]['device_os'] = 'ios'
elif device['deviceType']=='Cisco Catalyst38xx stack-able ethernet switch' or device['deviceType']=='Cisco Catalyst 9500 SVL Switch' or device['deviceType']=='Cisco Catalyst 9500-48Y4C Switch' or device['deviceType']=='Cisco Catalyst 4500 Virtual Switching System' or device['deviceType']=='Cisco Catalyst 9300 Switch':
device_topology[device_name_short]={}
device_topology[device_name_short]['ip_address'] = device_ip
device_topology[device_name_short]['device_os'] = 'iosxe'
elif device['deviceType']=='Cisco 5508 Wireless LAN Controller' or device['deviceType']=='Cisco 5520 Series Wireless Controllers':
try:
device_topology[device_name_short]={}
device_topology[device_name_short]['ip_address'] = device_ip
device_topology[device_name_short]['device_os'] = 'other'
except Exception as e:
print('something went wrong here')
elif device['deviceType']=='Cisco Nexus 7000 4-Slot Switch':
try:
device_topology[device_name_short]={}
device_topology[device_name_short]['ip_address'] = device_ip
device_topology[device_name_short]['device_os'] = 'nxos'
except Exception as e:
print('hmmm')
else:
try:
device_topology[device_name_short]={}
device_topology[device_name_short]['ip_address'] = device_ip
device_topology[device_name_short]['device_os'] = 'other'
except Exception as e:
print('something went wrong if type is not listed in code')
else:
device=entity['devicesDTO']
if device['deviceType']=='Cisco Catalyst 29xx Stack-able Ethernet Switch':
print('oooh one device on this site only')
return device_topology
def create_topology_file(filename, device_topology):
'''
this should take the device topology dictionary plus the admin credentials and create a csv file containing all fields necessary for Cisco Genie to log into the device later
'''
filepath = os.path.join('./topologies', filename)
f = open(filepath, 'w')
writer = csv.writer(f)
header = ['hostname','ip' , 'username' ,'password' , 'protocol' , 'os']
writer.writerow(header)
try:
for key,value in device_topology.items():
ip_address = device_topology[key]['ip_address']
device_os = device_topology[key]['device_os']
row = [key, ip_address, user, passkey, 'ssh', device_os]
writer.writerow(row)
except:
print('the query did not return any results')
quit()
print('PHASE TWO: GENERATING A TOPOLOGY FILE FOR A LOCATION. PLEASE WAIT.')
def create_site_csv_for_yaml(searched_thing, user, passkey):
'''
This is the main function. It creates a csv file with all devices on a given site. This file is later used by pyats to create a .yaml topology file.
'''
internal_obj=query_prime(searched_thing)
device_topology= parse_prime_query_response(internal_obj)
filename = 'devices_contains-' + searched_thing + '.csv'
create_topology_file(filename, device_topology)
#example: ./yaml_creator.py --user 'yourprimeadminuser' --passkey 'myadminpass'
if not os.path.exists('topologies'):
os.makedirs('topologies')
for x in get_all_devices_csv():
create_site_csv_for_yaml(x, user, passkey)
Then a simple loop in bash is enough to convert the .csv file to .yaml files:
FILES="./topologies/*"
for f in $FILES
do
y=${f%.*}
pyats create testbed file --path $f --output $y.yaml --encode-password
done
rm ./topologies/*.csv
2) This is the second script: device_data_chef.py Now that the topology files are ready, we can use them to log into devices if they are stacks and get some stack/vss info.
from collections import defaultdict
import requests
import json
import warnings
import re
import sys
import xmltodict
import argparse
import csv
import pynetbox
from csv import reader, writer
from requests.auth import HTTPBasicAuth
from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.filterwarnings("ignore")
from genie.testbed import load
p = argparse.ArgumentParser()
p.add_argument('--searched_thing', type=str)
p.add_argument('--user', type=str)
p.add_argument('--passkey', type=str)
p.add_argument('--token', type=str)
args = p.parse_args()
user = args.user
passkey = args.passkey
searched_thing = args.searched_thing
token = args.token
from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.filterwarnings("ignore")
primeServer="prime.yourdomain.com"
primeCreds = HTTPBasicAuth(user,passkey)
proxies = { "http": None, "https": None, }
filename = 'devices_contains-' + searched_thing + '.csv'
session = requests.Session()
session.verify = False
nb = pynetbox.api("netbox.yourdomain.com", token)
nb.http_session = session
def create_part_model_mapping(part_number):
'''
this will translate parts into models
'''
device_types = nb.dcim.device_types.filter(part_number=part_number)
for device_type in device_types:
device_typedict = dict(device_type)
model = device_typedict['model']
return model
def load_yaml():
'''
This loads the yaml file and returns a Genie testbed object
'''
filepath = './topologies/' + 'devices_contains-' + searched_thing + '.yaml'
testbed = load(filepath)
return testbed
def prepare_location_dictionary():
'''
This gets you a neat unique set of site abbreviations from Prime
'''
url = "https://prime.yourdomain.com/webacs/api/v4/data/Devices?.full=true"
response = requests.get(url, auth=primeCreds, verify=False, proxies = proxies)
obj=xmltodict.parse(response.text)
internal_obj = obj['queryResponse']['entity']
abbrev_location_dict = {}
for item in internal_obj:
name = item['devicesDTO']['deviceName']
if 'location' in item['devicesDTO'].keys():
location = item['devicesDTO']['location']
else:
location = 'unknown'
fixed_name = name[3:6]
data_dict = defaultdict(list)
data_dict[fixed_name].append(location)
my_regex = re.compile(r'[A-Z]{2}-[A-Z][a-z]+-\d+$')
for key, value in data_dict.items():
result = my_regex.match(value[0])
if result:
abbrev_location_dict[key] = value[0]
return abbrev_location_dict
def query_prime(searched_thing):
'''
this helper function queries Prime for any devices that meet the filter criteria, for example CONTAINS 'WRO' will get you the Wroclaw devices. It returns a Prime query response object.
'''
url="https://"+primeServer+"/webacs/api/v4/data/Devices?.full=true&.sort=ipAddress&deviceName=contains(%22"+searched_thing+"%22)"
response=requests.get(url,auth=primeCreds,verify=False, proxies=proxies)
obj=xmltodict.parse(response.text)
internal_obj=obj['queryResponse']
return internal_obj
def write_row(device):
'''
this helper function connects to each device and gets the virtual chassis data from 'show switch' command, then writes all switch data necessary for netbox to a .csv file
'''
f = open(filename, 'a')
writer = csv.writer(f)
#we need to check to see if it is a standard Cisco device. If not (e.g a fiber switch) then only a limited set of data can be obtained from Prime.
if not is_supported_Cisco_device(device):
device_name, device_type, device_ip, device_location = get_device_properties_unsupported(device)
row = ['Fiber_Access;' + device_name+ ';' + device_ip+ ';unsupported_model;unknown_software_type;unknown_serial;'+device_name + ';0;0;active;' + device_location]
writer.writerow(row)
#else means: if it is a supported Cisco device
else:
#we get some data from the response object from Prime
device_name, device_name_short, device_type, device_ip, device_location, manuf_nrs_dict, manuf_list, software_type = get_device_properties(device)
fixed_device_location = device_location.split(',')[0].split(' ')[0]
#this makes sure that bad Prime location data is fixed in a smart way
good_location = prepare_location_dictionary()[searched_thing]
if device['deviceType']=='Cisco Catalyst 29xx Stack-able Ethernet Switch' or device['deviceType']=='Cisco Catalyst38xx stack-able ethernet switch' or device['deviceType']=='Cisco Catalyst 9300 Switch':
#if manuf_list is an instance of a list meaning if it is a stack
if isinstance(manuf_list,list):
#we need the VC data and Prime does not have it so need to reach out to each switch to issue 'show switch'
dev=load_yaml().devices[device_name_short]
dev.connect()
#we use Cisco Genie parser here
switch_vc_data=dev.parse('show switch')
# for each stack member in the stack we need to get data. If the stack member is still in Provisioning state, we need to fill in some dummy data.
for list_item_dict in manuf_list:
try:
switch_number, serial, part_number , vc_position, vc_priority = get_stack_data_from_stack(switch_vc_data, list_item_dict)
except:
switch_number = '0'
serial = '0000'
model = 'unknown'
vc_position = '0'
vc_priority = '0'
pass
model = create_part_model_mapping(part_number)
#this is a new query only to get the role ('User_Access') for a given network device.
udf_url="https://"+primeServer+"/webacs/api/v2/op/devices/exportDevices?ipAddress=%22" + device_ip + "%22"
udf_response=requests.get(udf_url,auth=primeCreds,verify=False)
udf_obj=xmltodict.parse(udf_response.text)
try:
udf_value = udf_obj['mgmtResponse']['devicesExportResult']['devices']['device']['udfs']['udf']['value']
#there is a Prime bug where API does not return the device export result response even though a device has some UDFs when using the GUI.
except:
udf_value = 'unknown_role'
pass
row = [udf_value + ',Cisco,' + device_name_short + '-' + switch_number + ',' + device_ip + ',' + model + ',' + software_type + ',' + serial + ',' + device_name_short + ',' + vc_position + ',' + vc_priority + ',' + 'active,' + good_location]
writer.writerow(row)
#else means: if it is not a stack; manuf_list here is a dictionary rather than a list of dictionaries
else:
switch_number=manuf_list['name']
serial=manuf_list['serialNumber']
part_number=manuf_list['partNumber']
model = create_part_model_mapping(part_number)
udf_url="https://"+primeServer+"/webacs/api/v2/op/devices/exportDevices?ipAddress=%22" + device_ip + "%22"
response=requests.get(udf_url,auth=primeCreds,verify=False)
udf_obj=xmltodict.parse(response.text)
try:
udf_value = udf_obj['mgmtResponse']['devicesExportResult']['devices']['device']['udfs']['udf']['value']
except:
udf_value = 'unknown_role'
pass
row = [udf_value + ',Cisco,' + device_name_short + '-' + switch_number[-1] + ',' + device_ip + ',' + model + ',' + software_type + ',' + serial + ',' + device_name_short + ',' + '0,' + '0,' + 'active,' + good_location]
writer.writerow(row)
elif device['deviceType']=='Cisco Catalyst 4500 Virtual Switching System' or device['deviceType']=='Cisco Catalyst 9500 SVL Switch':
if isinstance(manuf_list, list):
vss_data = get_vss_properties(device_name_short)
switch_number = 1
for item in manuf_list:
role = vss_data[switch_number]['role']
part_number = vss_data[switch_number]['model']
model = create_part_model_mapping(part_number)
serial = vss_data[switch_number]['serial']
vss_position = str(vss_data[switch_number]['switch number'])
vss_priority = str(vss_data[switch_number]['priority'])
udf_url="https://"+primeServer+"/webacs/api/v2/op/devices/exportDevices?ipAddress=%22" + device_ip + "%22"
udf_response=requests.get(udf_url,auth=primeCreds,verify=False)
udf_obj=xmltodict.parse(udf_response.text)
udf_value = udf_obj['mgmtResponse']['devicesExportResult']['devices']['device']['udfs']['udf']['value']
row = [udf_value + ',Cisco,' + device_name_short + '-' + str(switch_number) + ',' + device_ip + ',' + model + ',' + software_type + ',' + serial + ',' + device_name_short + ',' + vss_position + ',' + vss_priority + ',' + 'active,' + good_location]
writer.writerow(row)
switch_number +=1
else:
switch_number=manuf_list['name']
serial=manuf_list['serialNumber']
part_number=manuf_list['partNumber']
model = create_part_model_mapping(part_number)
udf_url="https://"+primeServer+"/webacs/api/v2/op/devices/exportDevices?ipAddress=%22" + device_ip + "%22"
response=requests.get(udf_url,auth=primeCreds,verify=False)
udf_obj=xmltodict.parse(response.text)
udf_value = udf_obj['mgmtResponse']['devicesExportResult']['devices']['device']['udfs']['udf']['value']
row = [udf_value + ',Cisco,' + device_name_short + '-' + switch_number[-1] + ',' + device_ip + ',' + model + ',' + software_type + ',' + serial + ',' + device_name_short + ',' + '0,' + '0,' + 'active,' + good_location]
writer.writerow(row)
elif device['deviceType']=='Cisco Catalyst 6500 Virtual Switching System':
if isinstance(manuf_list, list):
vss_data = get_vss_properties_6500(device_name_short)
switch_number = 1
for item in manuf_list:
role = vss_data[switch_number]['role']
part_number = item['partNumber']
model = create_part_model_mapping(part_number)
serial = item['serialNumber']
vss_position = str(vss_data[switch_number]['switch number'])
vss_priority = str(vss_data[switch_number]['priority'])
udf_url="https://"+primeServer+"/webacs/api/v2/op/devices/exportDevices?ipAddress=%22" + device_ip + "%22"
udf_response=requests.get(udf_url,auth=primeCreds,verify=False)
udf_obj=xmltodict.parse(udf_response.text)
udf_value = udf_obj['mgmtResponse']['devicesExportResult']['devices']['device']['udfs']['udf']['value']
row = [udf_value + ',Cisco,' + device_name_short + '-' + str(switch_number) + ',' + device_ip + ',' + model + ',' + software_type + ',' + serial + ',' + device_name_short + ',' + vss_position + ',' + vss_priority + ',' + 'active,' + good_location]
writer.writerow(row)
switch_number +=1
else:
switch_number=manuf_list['name']
serial=manuf_list['serialNumber']
part_number=manuf_list['partNumber']
model = create_part_model_mapping(part_number)
udf_url="https://"+primeServer+"/webacs/api/v2/op/devices/exportDevices?ipAddress=%22" + device_ip + "%22"
response=requests.get(udf_url,auth=primeCreds,verify=False)
udf_obj=xmltodict.parse(response.text)
udf_value = udf_obj['mgmtResponse']['devicesExportResult']['devices']['device']['udfs']['udf']['value']
row = [udf_value + ',Cisco,' + device_name_short + '-' + switch_number[-1] + ',' + device_ip + ',' + model + ',' + software_type + ',' + serial + ',' + device_name_short + ',' + '0,' + '0,' + 'active,' + good_location]
writer.writerow(row)
else:
if isinstance(manuf_list, list):
dev=load_yaml().devices[device_name_short]
dev.connect()
switch_vc_data=dev.parse('show switch')
for list_item_dict in manuf_list:
switch_number, serial, model, vc_position, vc_priority = get_stack_data_from_stack(switch_vc_data, list_item_dict)
udf_url="https://"+primeServer+"/webacs/api/v2/op/devices/exportDevices?ipAddress=%22" + device_ip + "%22"
udf_response=requests.get(udf_url,auth=primeCreds,verify=False)
udf_obj=xmltodict.parse(udf_response.text)
udf_value = udf_obj['mgmtResponse']['devicesExportResult']['devices']['device']['udfs']['udf']['value']
row = [udf_value + ',Cisco,' + device_name_short + '-' + switch_number + ',' + device_ip + ',' + model + ',' + software_type + ',' + serial + ',' + device_name_short + ',' + vc_position + ',' + vc_priority + ',' + 'active,' + good_location]
writer.writerow(row)
else:
switch_number=manuf_list['name']
serial=manuf_list['serialNumber']
part_number=manuf_list['partNumber']
model = create_part_model_mapping(part_number)
udf_url="https://"+primeServer+"/webacs/api/v2/op/devices/exportDevices?ipAddress=%22" + device_ip + "%22"
response=requests.get(udf_url,auth=primeCreds,verify=False)
udf_obj=xmltodict.parse(response.text)
try:
udf_value = udf_obj['mgmtResponse']['devicesExportResult']['devices']['device']['udfs']['udf']['value']
except:
udf_value = 'unknown_role'
pass
row = [udf_value + ',Cisco,' + device_name_short + '-' + switch_number[-1] + ',' + device_ip + ',' + model + ',' + software_type + ',' + serial + ',' + device_name_short + ',' + '0,' + '0,' + 'active,' + good_location]
writer.writerow(row)
def get_virtual_chassis_data_for_topology_devices(internal_obj):
'''
this is the main function. It takes the Prime response object and for each switch it will attempt to use the write_row function
'''
if 'entity' in internal_obj.keys():
entity=internal_obj['entity']
if isinstance(entity,list):
for item in entity:
device=item['devicesDTO']
write_row(device)
else:
device=entity['devicesDTO']
write_row(device)
else:
print('no search results returned')
def get_stack_data_from_stack(switch_vc_data, list_item_dict):
'''
this is a helper function that gets variables from dictionaries
'''
#this fixes the fact that 2960x and 3850 display the name value differently ('name', '1') vs ('name', 'Switch 1')
pre_switch_number = list_item_dict['name']
switch_number=pre_switch_number[-1]
serial=list_item_dict['serialNumber']
model=list_item_dict['partNumber']
vc_position=switch_number
vc_priority=switch_vc_data['switch']['stack'][str(switch_number)]['priority']
return switch_number, serial, model, vc_position, vc_priority
def is_supported_Cisco_device(device):
device_type=device['deviceType']
if device_type=='Unsupported Cisco Device':
return False
else:
return True
def get_device_properties_unsupported(device):
device_name = device['deviceName']
device_type = device['deviceType']
device_ip = device['ipAddress']
if 'location' in device.keys():
device_location = device['location']
else:
device_location = 'default'
return device_name, device_type, device_ip, device_location
def get_device_properties(device):
'''
helper function to return device properties
'''
device_name = device['deviceName']
device_name_short=device_name.replace('.schaeffler.com', '')
device_type=device['deviceType']
device_ip=device['ipAddress']
if 'location' in device.keys():
device_location=device['location']
else:
device_location='default location'
manuf_nrs_dict=device['manufacturerPartNrs']
manuf_list=manuf_nrs_dict['manufacturerPartNr']
software_type=device['softwareType']
return device_name, device_name_short, device_type, device_ip, device_location, manuf_nrs_dict, manuf_list, software_type
def get_vss_properties_6500(device_name_short):
dev=load_yaml().devices[device_name_short]
dev.connect()
obj1 = dev.execute('show switch virtual role')
role_regex = re.compile(r'^(?P<ROLE>(LOCAL)|(REMOTE))\s+(?P<SWNO>\d)\s+(UP|DOWN)\s+(?P<PRIORITY>\d+)[(]\d+[)]\s+(?P<STATUS>\w+)\s+\d+\s+\d+')
virtual_role_dict = {}
inventory_index = 1
for line in obj1.splitlines():
line = line.strip()
result = role_regex.match(line)
if result:
sub_dict = virtual_role_dict.setdefault(inventory_index,{})
group = result.groupdict()
sub_dict['role'] = group['ROLE']
sub_dict['switch number'] = group['SWNO']
sub_dict['priority'] = group['PRIORITY']
sub_dict['status'] = group['STATUS']
inventory_index = inventory_index + 1
continue
else:
('no match in regex')
return virtual_role_dict
def get_vss_properties(device_name_short):
'''
helper function to get data from vss devices
'''
dev=load_yaml().devices[device_name_short]
dev.connect()
obj1 = dev.execute('show switch virtual role')
role_regex = re.compile(r'^(?P<ROLE>(LOCAL)|(REMOTE))\s+ (?P<SWNO>\d)\s+(UP|DOWN)\s+\w+[(]\w\s[)]\s+(?P<PRIORITY>\d+)[(]\d+[)]\s+(?P<STATUS>\w+)\s+\d+\s+\d+')
virtual_role_dict = {}
inventory_index = 1
for line in obj1.splitlines():
line = line.strip()
result = role_regex.match(line)
if result:
sub_dict = virtual_role_dict.setdefault(inventory_index,{})
group = result.groupdict()
sub_dict['role'] = group['ROLE']
sub_dict['switch number'] = group['SWNO']
sub_dict['priority'] = group['PRIORITY']
inventory_index = inventory_index + 1
continue
virtual_role_dict.pop(3, None)
virtual_role_dict.pop(4, None)
obj = dev.execute('show module')
p1 = re.compile(r'^(Switch Number:)\s(?P<NUMBER>\d)\s(Role:)\s(Virtual Switch)\s(?P<ROLE>(Active|Standby))')
p2 = re.compile(r'^(?P<SWNUMBER>\d)\s+\d+\s+.*(?P<MODEL>(WS-C\d{4}X-\d+))\s+(?P<SERIAL>\w{3}\d{4}.{4})')
parsed_dict = {}
inventory_index = 1
for line in obj.splitlines():
line = line.strip()
result = p1.match(line)
result2 = p2.match(line)
if result:
inventory_dict = virtual_role_dict.setdefault(inventory_index,{})
group = result.groupdict()
inventory_dict['switch number'] = group['NUMBER']
inventory_dict['role'] = group['ROLE']
continue
if result2:
group = result2.groupdict()
inventory_dict['model'] = group['MODEL']
inventory_dict['serial'] = group ['SERIAL']
inventory_index +=1
continue
return virtual_role_dict
get_virtual_chassis_data_for_topology_devices(query_prime(searched_thing))
#usage example
#python3 device_data_chef.py --user <admusername> --passkey '<admpass>' --searched_thing 'wro'
The output is a .csv file with device data ROLE,MANUFACTURER,SWITCH FULL NAME,IP_ADDRESS,NETBOX MODEL,PLATFORM,SERIAL,SWITCH SHORT NAME,VC POSITION, VC PRIORITY, STATUS, SITE
"User_Access,Cisco,s03wro005-1,172.16.0.105,Catalyst 2960X-48FPS-L,IOS,FCW1942A4TW,s03wro005,1,15,active,IT-wroclaw-4"
"User_Access,Cisco,s03wro005-2,172.16.0.105,Catalyst 2960X-48FPD-L,IOS,FOC1237S1RN,s03wro005,2,14,active,IT-wroclaw-4"
"User_Access,Cisco,s03wro006-1,172.16.0.113,Catalyst 2960X-48FPD-L,IOS,FOC1237S1C5,s03wro006,0,0,active,IT-wroclaw-4"
"User_Access,Cisco,s03wro007-1,172.16.0.121,Catalyst 2960X-48FPD-L,IOS,FOC1237S1CM,s03wro007,0,0,active,IT-wroclaw-4"
"User_Access,Cisco,s03wro008-1,172.16.0.129,Catalyst 2960X-48FPD-L,IOS,FOC1237S1CR,s03wro008,0,0,active,IT-wroclaw-4"
"Foreign_Controller,Cisco,b01wro001-s,172.16.0.137,AIR-CT5508-K9,Cisco Controller,FCW1239G3BB,b01wro001,0,0,active,IT-wroclaw-4"
"Production_Router,Cisco,r01wro003-1,172.16.0.252,Catalyst 3850-24P-E,IOS-XE,FCW1232E14R,r01wro003,0,0,active,IT-wroclaw-4"
"Production_Router,Cisco,r01wro002-1,172.16.0.253,Catalyst 3850-24P-E,IOS-XE,FOC1239U14J,r01wro002,0,0,active,IT-wroclaw-4"
"Core,Cisco,r01wro001-vss-1,172.16.0.254,Catalyst C4500X-16,IOS-XE,JAE123822PW,r01wro001-vss,1,120,active,IT-wroclaw-4"
"Core,Cisco,r01wro001-vss-2,172.16.0.254,Catalyst C4500X-16,IOS-XE,JAE123832MJ,r01wro001-vss,2,110,active,IT-wroclaw-4"
"Server_Access,Cisco,s03wro101-1,172.16.0.65,Catalyst 3850-24T-L,IOS-XE,FOC1233Y0AL,s03wro101,1,15,active,IT-wroclaw-4"
"Server_Access,Cisco,s03wro101-2,172.16.0.65,Catalyst 3850-24T-L,IOS-XE,FOC1233Y0AP,s03wro101,2,14,active,IT-wroclaw-4"
"User_Access,Cisco,s03wro001-1,172.16.0.73,Catalyst 2960X-48FPD-L,IOS,FCW1235B2AC,s03wro001,1,15,active,IT-wroclaw-4"
"User_Access,Cisco,s03wro001-2,172.16.0.73,Catalyst 2960X-48FPD-L,IOS,FOC1237D1QK,s03wro001,2,14,active,IT-wroclaw-4"
"User_Access,Cisco,s03wro001-3,172.16.0.73,Catalyst 2960X-48FPD-L,IOS,FOC1239S1H3,s03wro001,3,1,active,IT-wroclaw-4"
3) The third script takes the device input data and exports it to Netbox. I’ve reworked the previous version and it now looks and works much better:
import csv
import re
import argparse
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import warnings
import pynetbox
from requests.auth import HTTPBasicAuth
warnings.filterwarnings("ignore")
p = argparse.ArgumentParser()
p.add_argument('--user', type=str)
p.add_argument('--passkey', type=str)
p.add_argument('--token', type=str)
p.add_argument('--export_site', type=str)
args = p.parse_args()
token = args.token
user = args.user
passkey = args.passkey
site_abbrev = args.export_site
primeServer="prime.yourdomain.com"
primeCreds = HTTPBasicAuth(user, passkey)
session = requests.Session()
session.verify = False
nb = pynetbox.api("https://netbox.yourdomain.com", token)
nb.http_session = session
def create_part_model_mapping(part_number):
'''
this will translate parts into models
'''
device_types = nb.dcim.device_types.filter(part_number=part_number)
for device_type in device_types:
device_typedict = dict(device_type)
model = device_typedict['model']
print(f'model for the {part_number} is {model}')
return model
def get_site_id(location_name):
"""
Gets site ID for a given location. Needed for Netbox internal workings. For example: if you want to create a device, you need the site id, rather than its name.
"""
sites = nb.dcim.sites.filter(name=location_name)
for site in sites:
sitedict = dict(site)
try:
siteid = str(sitedict.get('id'))
except Exception as e:
print(e)
else:
return siteid
def get_manufacturer_id(searched_manufacturer):
"""
Gets manufacturer ID for a given manufacturer. Needed for Netbox internal workings. For example: if you want to create a device, you need the manufacturer id, rather than 'Cisco'.
"""
manufacturers = nb.dcim.manufacturers.filter(name=searched_manufacturer)
for manufacturer in manufacturers:
manufacturerdict = dict(manufacturer)
try:
manufacturerid = str(manufacturerdict.get('id'))
except Exception as e:
print(e)
else:
return manufacturerid
def get_device_model_id_for_part_number(part_number):
"""
Gets device type ID for a given device type. Needed for Netbox internal workings. For example: if you want to create a device, you need the device type id, rather than '2960x-fpd-something'.
"""
device_types = nb.dcim.device_types.filter(part_number=part_number)
for device_type in device_types:
device_typedict = dict(device_type)
try:
device_typeid = str(device_typedict.get('id'))
except Exception as e:
print(e)
else:
return device_typeid
def get_device_model_id_for_model(model):
'''
alternative if you have model instead of part number like above
'''
device_types = nb.dcim.device_types.filter(model=model)
for device_type in device_types:
device_typedict = dict(device_type)
try:
device_typeid = str(device_typedict.get('id'))
except Exception as e:
print(e)
else:
return device_typeid
def get_role_id(searched_role):
roles = nb.dcim.device_roles.filter(name=searched_role)
for role in roles:
roledict = dict(role)
try:
roleid = roledict.get('id')
except Exception as e:
print(f'get_role_id failed with error {e}')
else:
return roleid
def get_device_id(searched_device):
"""
Gets device ID for a given device . Needed for Netbox internal workings. Every device has an ID.
"""
devices = nb.dcim.devices.filter(name=searched_device)
for device in devices:
devicedict = dict(device)
if searched_device in devicedict.values():
deviceid = str(devicedict.get('id'))
return deviceid
else:
deviceid = None
return deviceid
def get_virtual_chassis_id(vc_name):
"""
Gets Virtual chassisID for a given VC. Needed for Netbox internal workings.
"""
virtual_chasses = nb.dcim.virtual_chassis.filter(name=vc_name)
for virtual_chassis in virtual_chasses:
virtual_chassisdict = dict(virtual_chassis)
if vc_name in virtual_chassisdict.values():
virtual_chassisid = str(virtual_chassisdict.get('id'))
return virtual_chassisid
else:
virtual_chassisid = None
return virtual_chassisid
def get_platform_id(platform):
platforms = nb.dcim.platforms.filter(name=platform)
for x in platforms:
platformdict = dict(x)
if platform in platformdict.values():
platformid = platformdict.get('id')
return platformid
def get_ipaddr_id(ipaddress):
ipaddresswithmask = ipaddress + '/32'
try:
addresses = nb.ipam.ip_addresses.filter(address=ipaddresswithmask)
for address in addresses:
ipdict = dict(address)
if ipaddresswithmask in ipdict.values():
addressid = ipdict.get('id')
return addressid
except Exception as e:
print(f'get_ipaddr_id function failed with error {e} so i will create a new ip address')
class Network_device:
role = ""
manufacturer = ""
full_name = ""
ip_address = ""
model = ""
platform = ""
serial = ""
vc_name = ""
vc_position = ""
vc_priority = ""
status = ""
site_name = ""
role_id = ""
manufacturer_id = ""
model_id = ""
platform_id = ""
site_id = ""
vc_name_id = ""
device_id = ""
def __init__(self, var1, var2, var3, var4, var5, var6, var7, var8, var9, var10, var11, var12):
self.role = var1
self.manufacturer = var2
self.full_name = var3
self.ip_address = var4
self.model = var5
self.platform = var6
self.serial = var7
self.vc_name = var8
self.vc_position = var9
self.vc_priority = var10
self.status = var11
self.site_name = var12
def load_netbox_identifiers(self):
self.role_id = get_role_id(self.role)
self.manufacturer_id = get_manufacturer_id(self.manufacturer)
self.model_id = get_device_model_id_for_model(self.model)
self.platform_id = get_platform_id(self.platform)
self.site_id = get_site_id(self.site_name)
def check_if_vc_exists_in_netbox(self):
self.vc_name_id = get_virtual_chassis_id(self.vc_name)
if self.vc_name_id is None:
print('VC not found')
return False
else:
print(f'VC found and id is {self.vc_name_id}')
return True
def check_if_device_exists_in_netbox_as_vc_member(self):
device_id = get_device_id(self.full_name)
if device_id is None:
print(f'device {self.full_name} not found in Netbox yet')
return False
else:
print(f'device {self.full_name} found in Netbox already')
return True
def check_if_device_exists_in_netbox_as_individual_switch(self):
device_id = get_device_id(self.vc_name)
if device_id is None:
print('device not found')
return False
else:
print('device found')
return True
def check_if_ip_exists(self):
ipaddresswithmask = self.ip_address + '/32'
addresses = nb.ipam.ip_addresses.filter(address=ipaddresswithmask)
for address in addresses:
ipdict = dict(address)
if ipaddresswithmask in ipdict.values():
print('IP found')
return True
else:
print('IP not found')
return False
def create_new_ip_address(self):
new_ip_address = nb.ipam.ip_addresses.create(address=self.ip_address, description="Management IP", assigned_object_type='dcim.interface', assigned_object_id=self.new_interface_id)
def set_primary_ip_for_vc_member_switches(self):
device = nb.dcim.devices.get(name=self.full_name)
ip_id = get_ipaddr_id(self.ip_address)
device.primary_ip4 = ip_id
device.primary_ip = ip_id
device.save()
def set_primary_ip_for_individual_switches(self):
device = nb.dcim.devices.get(name=self.vc_name)
ip_id = get_ipaddr_id(self.ip_address)
device.primary_ip4 = ip_id
device.primary_ip = ip_id
device.save()
def add_virtual_chassis(self):
device_virtualchassis = nb.dcim.virtual_chassis.create(name=self.vc_name)
self.vc_name_id = get_virtual_chassis_id(self.vc_name)
def export_vc_member_device_to_netbox(self):
new_device = nb.dcim.devices.create(device_role=self.role_id, manufacturer=self.manufacturer_id, name=self.full_name, platform=self.platform_id, device_type=self.model_id, virtual_chassis=self.vc_name_id, vc_position=self.vc_position, vc_priority=self.vc_priority,serial=self.serial, status=self.status, site=self.site_id)
self.device_id = get_device_id(self.full_name)
def export_individual_device_to_netbox(self):
new_device = nb.dcim.devices.create(device_role=self.role_id, manufacturer=self.manufacturer_id, name=self.vc_name, platform=self.platform_id, device_type=self.model_id, serial=self.serial, status=self.status, site=self.site_id)
self.device_id = get_device_id(self.vc_name)
def create_management_interface(self):
label = 'vlan425 ' + 'on ' + self.vc_name
interface_dict = dict(name='Vlan428', type='virtual', device=self.device_id, label = label)
new_interface = nb.dcim.interfaces.create(interface_dict)
self.new_interface_id = new_interface['id']
def change_interface_names(self):
linecard_number = self.vc_position
if int(linecard_number) > 1:
print('changing interface names to reflect stack modules...')
ifaces = nb.dcim.interfaces.filter(device_id=self.device_id)
for iface in ifaces:
ifacedict = dict(iface)
interface_regex = re.compile(r'^(?P<PREFIX>\D+)(?P<LINECARD>\d+)[\/](?P<MODULE>\d)[\/](?P<PORT>\d+)')
old_name = ifacedict['name']
result = interface_regex.match(ifacedict['name'])
if result:
group = result.groupdict()
interface_prefix = group['PREFIX']
interface_module = group['MODULE']
interface_port = group['PORT']
new_interface_name = interface_prefix + linecard_number + '/' + interface_module + '/' + interface_port
iface.name = new_interface_name
iface.save()
my_list = []
filename = 'devices_contains-' + site_abbrev + '.csv'
with open(filename, 'r') as f:
reader = csv.reader(f)
###sanitise row data??###
for row in reader:
newlist = str(row[0]).split(',')
#print(len(newlist))
my_list.append(Network_device(newlist[0], newlist[1], newlist[2], newlist[3], newlist[4], newlist[5], newlist[6], newlist[7], newlist[8], newlist[9], newlist[10], newlist[11]))
for item in my_list:
item.load_netbox_identifiers()
#workflow for VC
if item.vc_position != '0':
if not item.check_if_vc_exists_in_netbox():
item.add_virtual_chassis()
if not item.check_if_device_exists_in_netbox_as_vc_member():
item.export_vc_member_device_to_netbox()
item.create_management_interface()
if not item.check_if_ip_exists():
item.create_new_ip_address()
item.set_primary_ip_for_vc_member_switches()
item.change_interface_names()
else:
if not item.check_if_device_exists_in_netbox_as_vc_member():
item.export_vc_member_device_to_netbox()
item.create_management_interface()
if not item.check_if_ip_exists():
item.create_new_ip_address()
item.set_primary_ip_for_vc_member_switches()
item.change_interface_names()
#workflow for individual switches
else:
if not item.check_if_device_exists_in_netbox_as_individual_switch():
item.export_individual_device_to_netbox()
item.create_management_interface()
if not item.check_if_ip_exists():
item.create_new_ip_address()
item.set_primary_ip_for_individual_switches()
print(item.role, item.manufacturer, item.full_name, item.ip_address, item.model, item.platform, item.serial, item.vc_name, item.vc_position, item.vc_priority, item.status, item.site_name, item.role_id)