설치할 모듈: paramiko, pandas, openpyxl
#-------------------------------------------#
# #
# Network Script tool #
# Version 1.1.3 #
# Make Jo Sung Jin #
# #
#-------------------------------------------#
import paramiko
import telnetlib
import socket
import time
import os
import pandas as pd
from io import StringIO
import threading
import argparse
# Command-line interface: every option below is required.
parser = argparse.ArgumentParser(description='Use config auto Script')
# Register the command-line arguments to accept
parser.add_argument('-ti', metavar='[tacas_id]', required=True, help='TACAS ID', type=str)
parser.add_argument('-tp', metavar='[tacas_pw]', required=True, help='TACAS PW', type=str)
parser.add_argument('-id', metavar='[local_id]', required=True, help='Local ID', type=str)
parser.add_argument('-pw', metavar='[local_pw]', required=True, help='Local PW', type=str)
parser.add_argument('-p', metavar='[file_path]', required=True, help='File path', type=str)
# Store the parsed arguments in args (type: namespace)
args = parser.parse_args()
ID = args.ti # TACAS ID (value of -ti)
PASSWD = args.tp # TACAS PASSWORD (value of -tp)
LOCAL_ID = args.id # LOCAL ID (value of -id)
LOCAL_PASSWD = args.pw # LOCAL PASSWORD (value of -pw)
XLSX_FILE_NAME = args.p # xlsx file name (value of -p), e.g. Test.xlsx
# Command tables: each list holds the CLI commands sent, in order, to one class of device.
cmd = ['ter len 0','sh ver','sh clock','dir all','sh env all','sh env','sh inven','sh module','sh redunsh','sh power inline','sh power de','sh proc cpu | e 0.00','sh proc mem','sh vlan b','sh arp',
'sh mac address-table','sh spa','sh etherchannel summary','sh ip ssh','sh int tru','sh int','sh ip int br','sh int status','sh int des','sh cdp ne','sh cdp nei detail','sh lldp ne','sh ip ro',
'sh standby br','sh span root','sh span block','sh int count err','sh log','sh run','sh mac address table'] # TACAS R&S command
nac_cmd = ['show configuration','@shell','cd ..','cd disk/sys/conf','cat CLOGINPASS'] # NAC Command
vpn_cmd = ['show run'] # VPN Command
fire_cmd = ['full configuration'] # FireWall Command
ap_cmd = ['ter len 0','sh run'] # AP Command
def ssh_connect(HOST,Num,*cmd):
    """Collect command output from one device over SSH with the ID/PASSWD account.

    Connects to ``HOST`` with the module-level ``ID``/``PASSWD`` credentials,
    sends ``en`` followed by ``PASSWD``, sends every command in ``cmd`` and
    hands the captured session text to ``create_log``.  If any step raises,
    the same commands are retried through ``local_ssh_connect``.

    Args:
        HOST: Address of the device to connect to.
        Num: Index of the device; forwarded to ``create_log`` and to the
            fallback so the output lands in the matching log file.
        *cmd: CLI commands to send, in order.
    """
    commands = list(cmd)
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(HOST, username=ID, password=PASSWD, allow_agent=False, look_for_keys=False)
        shell = ssh.invoke_shell()
        shell.send('en\n')
        time.sleep(1)
        shell.send(PASSWD + '\n')
        time.sleep(1)
        shell.recv(1024)  # result unused: drops what the device has sent so far
        for command in commands:
            shell.send(command + '\n')
        # NOTE(review): indentation was lost in the source; the 15 s wait is
        # assumed to follow the send loop (one wait per host) - confirm.
        time.sleep(15)
        # Keep the received chunks in a local list.  This function runs in
        # one thread per host, so module globals would be shared between
        # hosts and could mix their output.
        chunks = [shell.recv(1024000)]
        while True:
            time.sleep(1.5)
            if not shell.recv_ready():
                break
            chunks.append(shell.recv(1024000))
        # Decode once after joining so a multi-byte character or a '\r\n'
        # pair split across two chunks is still handled correctly.
        ssh_log = b''.join(chunks).decode('utf-8', errors='replace').replace('\r\n', '\n')
        print(ssh_log)
        create_log(Num, ssh_log)
    except Exception:
        succeeded = False
    else:
        succeeded = True
    finally:
        # Closing the client also tears down the shell channel, on success
        # and on failure alike.
        ssh.close()
    if not succeeded:
        local_ssh_connect(HOST, Num, commands)
def local_ssh_connect(HOST,ssh_num,cmd_ssh):
    """Collect command output from one device over SSH with the local account.

    Connects to ``HOST`` with the module-level ``LOCAL_ID``/``LOCAL_PASSWD``
    credentials, sends ``en`` followed by ``LOCAL_PASSWD``, sends every
    command in ``cmd_ssh`` and hands the captured session text to
    ``create_log``.  If any step raises, the same commands are retried over
    Telnet through ``Telnet_connect``.

    Args:
        HOST: Address of the device to connect to.
        ssh_num: Index of the device; forwarded to ``create_log`` and to the
            Telnet fallback.
        cmd_ssh: Iterable of CLI commands to send, in order.
    """
    commands = list(cmd_ssh)
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(HOST, username=LOCAL_ID, password=LOCAL_PASSWD, allow_agent=False, look_for_keys=False)
        shell = ssh.invoke_shell()
        shell.send('en\n')
        time.sleep(1)
        shell.send(LOCAL_PASSWD + '\n')
        time.sleep(1)
        shell.recv(65535)  # result unused: drops what the device has sent so far
        for command in commands:
            shell.send(command + '\n')
        # NOTE(review): indentation was lost in the source; the 10 s wait is
        # assumed to follow the send loop (one wait per host) - confirm.
        time.sleep(10)
        # Keep the received chunks in a local list.  This function can run in
        # several threads at once, so module globals would be shared between
        # hosts and could mix their output.
        chunks = [shell.recv(1024000)]
        while True:
            time.sleep(1.5)
            if not shell.recv_ready():
                break
            chunks.append(shell.recv(1024000))
        # Decode once after joining so a multi-byte character or a '\r\n'
        # pair split across two chunks is still handled correctly.
        ssh_log = b''.join(chunks).decode('utf-8', errors='replace').replace('\r\n', '\n')
        create_log(ssh_num, ssh_log)
    except Exception:
        succeeded = False
    else:
        succeeded = True
    finally:
        # Closing the client also tears down the shell channel, on success
        # and on failure alike.
        ssh.close()
    if not succeeded:
        Telnet_connect(HOST, ssh_num, commands)
def ssh_log_gen(*Str):
    """Return the given values as text, one value per line.

    Every argument is converted with ``str``; the values are separated by a
    newline and the result always ends with one trailing newline, so a call
    without arguments returns ``"\\n"``.

    Args:
        *Str: Values to render.

    Returns:
        The joined text.
    """
    return '\n'.join(map(str, Str)) + '\n'
def Telnet_connect(HOST,telnet_num,cmd_telnet):
    """Collect command output from one device over Telnet with the ID/PASSWD account.

    Logs in with the module-level ``ID``/``PASSWD`` credentials.  When the
    reply to the login matches one of the known rejection messages, or when
    any step raises, the work is handed to ``err_Telnet_connect`` (local
    account).  Otherwise the function optionally enters enable mode, sends
    every command in ``cmd_telnet``, sends ``exit`` and passes the whole
    session text to ``create_log``.

    Args:
        HOST: Address of the device to connect to.
        telnet_num: Index of the device; forwarded to ``create_log`` and to
            the fallback.
        cmd_telnet: Sequence of CLI commands to send, in order.
    """
    # Exact replies that are treated as a rejected login.
    login_rejections = (
        b': \r\n% Login invalid\r\n\r\nUser',
        b':\r\n% Login invalid\r\n\r\nUser',
        b': \r\n\r\n% Authentication failed\r\n\r\nUser',
        b':\r\n\r\n% Authentication failed\r\n\r\nUser',
        b': \r\n\r\n% Authentication failed.\r\n\r\nUser',
        b':\r\n\r\n% Authentication failed.\r\n\r\nUser',
    )
    use_local_account = False
    try:
        tn = telnetlib.Telnet(HOST)
        try:
            tn.read_until(b"User")
            tn.write(ID.encode('ascii') + b'\n')
            tn.read_until(b"assword")
            tn.write(PASSWD.encode('ascii') + b'\n')
            result = tn.read_until(b'User', timeout=5)
            tn.write(b' ' + b'\n')
            if result in login_rejections:
                use_local_account = True
            else:
                # NOTE(review): indentation was lost in the source; everything
                # from here to create_log is assumed to belong to this
                # "login accepted" branch - confirm.
                # A '>' prompt is answered with 'en'; the password is only
                # sent if the device then asks for one.
                if b'>' in tn.read_until(b'>', timeout=3):
                    tn.write(b'en\n')
                    time.sleep(1)
                    if b'assword' in tn.read_until(b"assword", timeout=3):
                        tn.write(PASSWD.encode('ascii') + b'\n')
                        time.sleep(1)
                for command in cmd_telnet:
                    tn.write(command.encode('ascii') + b'\n')
                time.sleep(15)
                tn.write(b'exit\n')
                # errors='replace' keeps a stray non-ASCII byte from
                # discarding the whole capture.
                telnet_log = tn.read_all().decode('utf-8', errors='replace')
                create_log(telnet_num, telnet_log.replace('\r\n', '\n'))
        finally:
            tn.close()
    except Exception:
        use_local_account = True
    if use_local_account:
        # Retry with the local account once this connection is closed.
        err_Telnet_connect(HOST, telnet_num, cmd_telnet)
def err_Telnet_connect(HOST,num,cmd_lotel):
    """Collect command output from one device over Telnet with the local account.

    Logs in with the module-level ``LOCAL_ID``/``LOCAL_PASSWD`` credentials,
    optionally enters enable mode, sends every command in ``cmd_lotel``,
    sends ``exit`` and passes the whole session text to ``create_log``.
    If any step raises, the error is printed and the device is recorded
    through ``error_log``.

    Args:
        HOST: Address of the device to connect to.
        num: Index of the device; forwarded to ``create_log`` / ``error_log``.
        cmd_lotel: Sequence of CLI commands to send, in order.
    """
    try:
        tn = telnetlib.Telnet(HOST)
        try:
            tn.read_until(b"User")
            tn.write(LOCAL_ID.encode('ascii') + b'\n')
            tn.read_until(b"assword")
            tn.write(LOCAL_PASSWD.encode('ascii') + b'\n')
            # A '>' prompt is answered with 'en'; the password is only sent
            # if the device then asks for one.
            if b'>' in tn.read_until(b'>', timeout=3):
                tn.write(b'en\n')
                time.sleep(1)
                if b'assword' in tn.read_until(b"assword", timeout=3):
                    tn.write(LOCAL_PASSWD.encode('ascii') + b'\n')
                    time.sleep(1)
            for command in cmd_lotel:
                tn.write(command.encode('ascii') + b'\n')
            # NOTE(review): indentation was lost in the source; the 15 s wait
            # is assumed to follow the send loop - confirm.
            time.sleep(15)
            tn.write(b'exit\n')
            # errors='replace' keeps a stray non-ASCII byte from discarding
            # the whole capture.
            telnet_log = tn.read_all().decode('utf-8', errors='replace')
            create_log(num, telnet_log.replace('\r\n', '\n'))
        finally:
            tn.close()
    except Exception as e:
        print('Last: ' + HOST + str(e))
        error_log(num)
def create_log(num,l_log):
    """Append ``l_log`` to the log file of the device at index ``num``.

    The file is ``PATH + '/' + HOSTNAME[num] + '.log'`` (module-level
    ``PATH`` and ``HOSTNAME``), opened in append mode as UTF-8.

    Args:
        num: Index into ``HOSTNAME``.
        l_log: Text to append.
    """
    # str() lets a non-string hostname value still form a file name; the
    # with-block closes the file so the text is flushed on return.
    with open(PATH + '/' + str(HOSTNAME[num]) + ".log", mode='a', encoding='utf-8') as f_log:
        f_log.write(l_log)
def error_log(err_num):
    """Append one ``<hostname>: <host>`` line for a device to ``error.log``.

    The file is ``PATH + '/error.log'``, opened in append mode as UTF-8; the
    values come from the module-level ``HOSTNAME`` and ``HOST`` at index
    ``err_num``.

    Args:
        err_num: Index of the device to record.
    """
    # str() lets non-string values be written; the with-block closes the
    # file so the line is flushed on return.
    with open(PATH + '/error.log', mode='a', encoding='utf-8') as f_err:
        f_err.write(str(HOSTNAME[err_num]) + ': ' + str(HOST[err_num]) + '\n')
def createDir(DIR):
    """Create directory ``DIR`` (including missing parents) if it is absent.

    Args:
        DIR: Path of the directory to create.

    Raises:
        FileExistsError: If ``DIR`` exists but is not a directory.
    """
    # exist_ok avoids the gap between a separate existence check and the
    # creation, during which another process could create the directory.
    os.makedirs(DIR, exist_ok=True)
def xlsx_read(FILE,threadnum = 10):
    """Read the device list from a workbook and start one SSH worker per device.

    Reads the first sheet of ``FILE`` with pandas/openpyxl, publishes its
    ``HOSTNAME`` and ``HOST`` columns and the output directory through the
    module-level ``HOSTNAME``, ``HOST`` and ``PATH``, creates that directory
    and starts one ``ssh_connect`` thread per row with the ``cmd`` command
    list, pausing 1.5 seconds between starts.  The threads are not joined
    here.

    Args:
        FILE: Path of the .xlsx workbook; the first sheet must contain the
            columns ``HOSTNAME`` and ``HOST``.
        threadnum: Currently unused; kept so existing callers keep working.
    """
    global HOSTNAME
    global HOST
    global PATH
    f_xlsx = pd.read_excel(FILE, engine="openpyxl", sheet_name=0)
    HOSTNAME = f_xlsx['HOSTNAME']
    HOST = f_xlsx['HOST']
    PATH = './R&S' + '(' + str(len(HOST)) + ')'
    createDir(PATH)
    try:
        for count in range(len(HOST)):
            threading.Thread(target=ssh_connect, args=(str(HOST[count]), count, *cmd)).start()
            time.sleep(1.5)
    except Exception as e:
        # Launching stops at the first failure; report it instead of
        # silently dropping the remaining hosts.
        print('xlsx_read: stopped starting workers: ' + str(e))
    # Only the R&S sheet (sheet 0) is processed.  Earlier revisions carried
    # disabled branches for further device classes, each reading another
    # sheet into its own output directory: VPN (vpn_cmd), Firewall
    # (fire_cmd), NAC (nac_cmd via a nac_connect function), Cisco AP and
    # Aruba AP (ap_cmd).
# Entry point: process the workbook named by the -p option; 5 is passed as threadnum.
xlsx_read(XLSX_FILE_NAME,5)
'Python' 카테고리의 다른 글
Log File -> Cfg File -> CSV File (fortigate 방화벽 정책 한정) (0) | 2022.08.31 |
---|---|
R&S 파이썬 자동화_Input Command (0) | 2022.08.29 |
Fortigate 방화벽 Hit Count 수집 (export 안되는 버전 전용) (수정1) (0) | 2022.08.19 |
디렉토리 Search-01 (0) | 2022.07.21 |
텍스트 수정 파일 제작기 - 04 (0) | 2022.07.11 |