본문 바로가기
Python

R&S 파이썬 자동화

by Ao1 2022. 8. 12.

설치할 모듈: paramiko, pandas, openpyxl

#-------------------------------------------#
#                                           #
#        Network Script tool                #
#          Version 1.1.3                    #
#         Make Jo Sung Jin                  #
#                                           #
#-------------------------------------------#

import paramiko
import telnetlib
import socket
import time
import os
import pandas as pd
from io import StringIO
import threading
import argparse

# Command-line interface of the script; every option is mandatory.
parser = argparse.ArgumentParser(description='Use config auto Script')

# Register the arguments accepted on the command line.
parser.add_argument(
    '-ti',
    type=str,
    required=True,
    metavar='[tacas_id]',
    help='TACAS ID',
)
parser.add_argument(
    '-tp',
    type=str,
    required=True,
    metavar='[tacas_pw]',
    help='TACAS PW',
)
parser.add_argument(
    '-id',
    type=str,
    required=True,
    metavar='[local_id]',
    help='Local ID',
)
parser.add_argument(
    '-pw',
    type=str,
    required=True,
    metavar='[local_pw]',
    help='Local PW',
)
parser.add_argument(
    '-p',
    type=str,
    required=True,
    metavar='[file_path]',
    help='File path',
)

# Parse the command line; the result is an argparse.Namespace.
args = parser.parse_args()

# TACAS account (used by ssh_connect and Telnet_connect).
ID, PASSWD = args.ti, args.tp

# Local account (used by local_ssh_connect and err_Telnet_connect).
LOCAL_ID, LOCAL_PASSWD = args.id, args.pw

# Path of the xlsx workbook that lists the devices, e.g. Test.xlsx
XLSX_FILE_NAME = args.p


# TACAS R&S commands, sent to each device in this order.
cmd = [
    'ter len 0',
    'sh ver',
    'sh clock',
    'dir all',
    'sh env all',
    'sh env',
    'sh inven',
    'sh module',
    'sh redunsh',
    'sh power inline',
    'sh power de',
    'sh proc cpu | e 0.00',
    'sh proc mem',
    'sh vlan b',
    'sh arp',
    'sh mac address-table',
    'sh spa',
    'sh etherchannel summary',
    'sh ip ssh',
    'sh int tru',
    'sh int',
    'sh ip int br',
    'sh int status',
    'sh int des',
    'sh cdp ne',
    'sh cdp nei detail',
    'sh lldp ne',
    'sh ip ro',
    'sh standby br',
    'sh span root',
    'sh span block',
    'sh int count err',
    'sh log',
    'sh run',
    'sh mac address table',
]

# NAC commands.
nac_cmd = [
    'show configuration',
    '@shell',
    'cd ..',
    'cd disk/sys/conf',
    'cat CLOGINPASS',
]

# VPN commands.
vpn_cmd = ['show run']

# Firewall commands.
fire_cmd = ['full configuration']

# AP commands.
ap_cmd = [
    'ter len 0',
    'sh run',
]


def ssh_connect(HOST,Num,*cmd):
    """Run the given commands on HOST over SSH with the TACAS account.

    Logs in with the module-level ID/PASSWD, sends 'en' followed by PASSWD
    (presumably to enter privileged mode - confirm per platform), sends
    every command, waits, and then drains the channel until no more data
    is pending. The collected text is printed and handed to create_log.

    If anything in that sequence raises, the same work is retried through
    local_ssh_connect.

    Args:
        HOST: Address of the device to connect to.
        Num: Index of the device; passed on to create_log.
        *cmd: Command strings, each sent followed by a newline.
    """
    cmd = list(cmd)

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        try:
            ssh.connect(HOST, username=ID, password=PASSWD,allow_agent=False,look_for_keys=False)
            shell = ssh.invoke_shell()

            shell.send('en\n')
            time.sleep(1)
            shell.send(PASSWD + '\n')
            time.sleep(1)
            shell.recv(1024)    # discard the login/enable chatter

            for command in cmd:
                shell.send(command + '\n')

            # Give the device time to produce output before the first read.
            time.sleep(15)

            # The chunks are kept in a local list. This function runs in
            # several threads at once, so storing them in module globals
            # would let one host's output leak into another host's log.
            received = [shell.recv(1024000)]
            while True:
                time.sleep(1.5)
                if not shell.recv_ready():
                    break
                received.append(shell.recv(1024000))
        finally:
            # Closing the client also closes its channels, and it happens
            # on the failure path too, before the fallback opens a new one.
            ssh.close()

        # Decode and normalise line endings once on the joined data so a
        # multi-byte character or a '\r\n' pair split across two reads is
        # still handled correctly.
        ssh_log = b''.join(received).decode(encoding='utf-8').replace('\r\n','\n')

        print(ssh_log)
        create_log(Num,ssh_log)

    except Exception:
        # EOFError is a subclass of Exception, so it takes this path as well
        # (a separate handler placed after this one could never run).
        local_ssh_connect(HOST,Num,cmd)

def local_ssh_connect(HOST,ssh_num,cmd_ssh):
    """Run the given commands on HOST over SSH with the local account.

    Same procedure as the TACAS SSH attempt but authenticating with the
    module-level LOCAL_ID/LOCAL_PASSWD: log in, send 'en' followed by
    LOCAL_PASSWD (presumably to enter privileged mode - confirm per
    platform), send every command, wait, drain the channel and pass the
    text to create_log.

    If anything in that sequence raises, the work is retried over Telnet
    through Telnet_connect.

    Args:
        HOST: Address of the device to connect to.
        ssh_num: Index of the device; passed on to create_log.
        cmd_ssh: Iterable of command strings, each sent followed by a
            newline.
    """
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    cmd_ssh = list(cmd_ssh)
    try:
        try:
            ssh.connect(HOST, username=LOCAL_ID, password=LOCAL_PASSWD,allow_agent=False,look_for_keys=False)
            shell = ssh.invoke_shell()

            shell.send('en\n')
            time.sleep(1)
            shell.send(LOCAL_PASSWD + '\n')
            time.sleep(1)
            shell.recv(65535)    # discard the login/enable chatter

            for command in cmd_ssh:
                shell.send(command + '\n')

            # Give the device time to produce output before the first read.
            time.sleep(10)

            # The chunks are kept in a local list. This function can run in
            # several threads at once, so storing them in module globals
            # would let one host's output leak into another host's log.
            received = [shell.recv(1024000)]
            while True:
                time.sleep(1.5)
                if not shell.recv_ready():
                    break
                received.append(shell.recv(1024000))
        finally:
            # Closing the client also closes its channels, and it happens
            # on the failure path too, before the Telnet fallback starts.
            ssh.close()

        # Decode and normalise line endings once on the joined data so a
        # multi-byte character or a '\r\n' pair split across two reads is
        # still handled correctly.
        ssh_log = b''.join(received).decode(encoding='utf-8').replace('\r\n','\n')

        create_log(ssh_num,ssh_log)

    except Exception:
        # EOFError is a subclass of Exception, so it takes this path as well
        # (a separate handler placed after this one could never run).
        Telnet_connect(HOST,ssh_num,cmd_ssh)

def ssh_log_gen(*Str):
    """Return the string form of every argument, one per line.

    Each argument is converted with str(); the pieces are separated by a
    newline and the result always ends with a newline (a call without
    arguments yields a single newline).
    """
    lines = [str(piece) for piece in Str]
    return '\n'.join(lines) + '\n'

def Telnet_connect(HOST,telnet_num,cmd_telnet):
    """Run the given commands on HOST over Telnet with the TACAS account.

    Logs in with the module-level ID/PASSWD. When the reply to the login
    is one of the known rejection messages, or when any step raises, the
    work is retried with the local account through err_Telnet_connect.
    Otherwise the function optionally sends 'en' plus PASSWD when a '>'
    prompt is seen, sends every command, sends 'exit', reads until the
    device closes the connection and passes the text to create_log.

    Args:
        HOST: Address of the device to connect to.
        telnet_num: Index of the device; passed on to create_log.
        cmd_telnet: Sequence of command strings (ASCII), each sent followed
            by a newline.
    """
    # Exact replies that mean the login was rejected and the device is
    # asking for a user name again.
    rejected_replies = (
        b': \r\n% Login invalid\r\n\r\nUser',
        b':\r\n% Login invalid\r\n\r\nUser',
        b': \r\n\r\n% Authentication failed\r\n\r\nUser',
        b':\r\n\r\n% Authentication failed\r\n\r\nUser',
        b': \r\n\r\n% Authentication failed.\r\n\r\nUser',
        b':\r\n\r\n% Authentication failed.\r\n\r\nUser',
    )

    use_local_account = False
    try:
        tn = telnetlib.Telnet(HOST)
        try:
            tn.read_until(b"User")
            tn.write(ID.encode('ascii') + b'\n')
            tn.read_until(b"assword")
            tn.write(PASSWD.encode('ascii') + b'\n')

            result = tn.read_until(b'User',timeout=5)
            tn.write(b' ' + b'\n')

            if result in rejected_replies:
                use_local_account = True
            else:
                # The prompt checks are done on the raw bytes rather than on
                # their repr(), which could match text inside escape codes.
                if b'>' in tn.read_until(b'>',timeout=3):
                    tn.write(b'en\n')
                    time.sleep(1)
                    if b'assword' in tn.read_until(b"assword",timeout=3):
                        tn.write(PASSWD.encode('ascii') + b'\n')
                        time.sleep(1)

                for command in cmd_telnet:
                    tn.write(command.encode('ascii') + b'\n')

                # Let the commands finish, then log out so read_all() sees
                # the end of the stream.
                time.sleep(15)
                tn.write(b'exit\n')

                telnet_log = tn.read_all().decode('ascii')
        finally:
            # Release the socket on every path before any fallback runs.
            tn.close()

        if not use_local_account:
            create_log(telnet_num,telnet_log.replace('\r\n','\n'))

    except Exception:
        use_local_account = True

    # The fallback is called exactly once and outside the try block. The
    # original reached it only through a NameError on an undefined variable.
    if use_local_account:
        err_Telnet_connect(HOST,telnet_num,cmd_telnet)

def err_Telnet_connect(HOST,num,cmd_lotel):
    """Run the given commands on HOST over Telnet with the local account.

    Logs in with the module-level LOCAL_ID/LOCAL_PASSWD, optionally sends
    'en' plus LOCAL_PASSWD when a '>' prompt is seen, sends every command,
    sends 'exit', reads until the device closes the connection and passes
    the text to create_log.

    This is the last attempt in the fallback chain: on any exception the
    error is printed and the host is recorded through error_log.

    Args:
        HOST: Address of the device to connect to.
        num: Index of the device; passed on to create_log / error_log.
        cmd_lotel: Sequence of command strings (ASCII), each sent followed
            by a newline.
    """
    try:
        tn = telnetlib.Telnet(HOST)
        try:
            tn.read_until(b"User")
            tn.write(LOCAL_ID.encode('ascii') + b'\n')
            tn.read_until(b"assword")
            tn.write(LOCAL_PASSWD.encode('ascii') + b'\n')

            # The prompt checks are done on the raw bytes rather than on
            # their repr(), which could match text inside escape codes.
            if b'>' in tn.read_until(b'>',timeout=3):
                tn.write(b'en\n')
                time.sleep(1)
                if b'assword' in tn.read_until(b"assword",timeout=3):
                    tn.write(LOCAL_PASSWD.encode('ascii') + b'\n')
                    time.sleep(1)

            for command in cmd_lotel:
                tn.write(command.encode('ascii') + b'\n')

            # Let the commands finish, then log out so read_all() sees the
            # end of the stream.
            time.sleep(15)
            tn.write(b'exit\n')
            telnet_log = tn.read_all().decode('ascii')
        finally:
            # Release the socket whether or not the session succeeded.
            tn.close()

        create_log(num,telnet_log.replace('\r\n','\n'))

    except Exception as e:
        print('Last: ' + HOST + str(e))
        error_log(num)

def create_log(num,l_log):
    """Append l_log to the log file of the device at index num.

    The file is PATH/<HOSTNAME[num]>.log, where PATH and HOSTNAME are the
    module-level values set by xlsx_read. The file is opened in append
    mode with UTF-8 encoding and closed before returning, so the text is
    flushed to disk even when called from worker threads.

    Args:
        num: Index into HOSTNAME selecting the device.
        l_log: Text to append.
    """
    with open(f"{PATH}/{HOSTNAME[num]}.log",mode='a',encoding='utf-8') as f_log:
        f_log.write(l_log)

def error_log(err_num):
    """Record the device at index err_num in PATH/error.log.

    Appends one line of the form '<HOSTNAME>: <HOST>' using the
    module-level HOSTNAME, HOST and PATH values set by xlsx_read. The
    file is closed before returning. Values are formatted rather than
    concatenated so a non-string cell (for example a number read from
    the workbook) does not raise TypeError.

    Args:
        err_num: Index into HOSTNAME / HOST selecting the device.
    """
    with open(f"{PATH}/error.log",mode='a',encoding='utf-8') as f_err:
        f_err.write(f"{HOSTNAME[err_num]}: {HOST[err_num]}\n")

def createDir(DIR):
    """Create directory DIR, including missing parents, unless the path exists.

    Nothing happens when DIR already exists.
    """
    if os.path.exists(DIR):
        return
    os.makedirs(DIR)

def xlsx_read(FILE,threadnum = 10):
    """Read the device list from FILE and start one SSH worker per device.

    Only the first worksheet is processed. Its HOSTNAME and HOST columns
    are stored in the module-level HOSTNAME and HOST variables, the output
    directory './R&S(<number of hosts>)' is stored in PATH and created,
    and then a thread running ssh_connect with the `cmd` command list is
    started for each row, 1.5 seconds apart. The function returns without
    waiting for the threads to finish.

    Args:
        FILE: Path of the xlsx workbook (read with the openpyxl engine).
        threadnum: Not used; kept so existing callers keep working.
    """
    global HOSTNAME
    global HOST
    global PATH

    f_xlsx = pd.read_excel(FILE, engine = "openpyxl", sheet_name=0)

    HOSTNAME = f_xlsx['HOSTNAME']
    HOST = f_xlsx['HOST']

    PATH = './R&S' + '(' + str(len(HOST)) + ')'
    createDir(PATH)

    for count in range(len(HOST)):
        try:
            worker = threading.Thread(target=ssh_connect, args=(str(HOST[count]),count,*cmd))
            worker.start()
        except Exception as e:
            # Report the problem and go on with the next device instead of
            # silently dropping every remaining host.
            print('Thread start failed: ' + str(count) + ' ' + str(e))
        # Stagger the connections.
        time.sleep(1.5)
'''
        elif x == 1:  
            f_xlsx = pd.read_excel(FILE, engine = "openpyxl", sheet_name=1)

            HOSTNAME = f_xlsx['HOSTNAME']
            HOST = f_xlsx['HOST']

            PATH = './VPN' + '(' + str(len(HOST)) + ')'
            createDir(PATH)
            try:
                count = 0
                while count < len(HOST):
                    for j in range(1):
                        threading.Thread(target=ssh_connect, args=(str(HOST[count]),count,*vpn_cmd)).start()
                        time.sleep(1.5)
                        count += 1
            except Exception:
                pass

        elif x == 2:  
            f_xlsx = pd.read_excel(FILE, engine = "openpyxl", sheet_name=2)

            HOSTNAME = f_xlsx['HOSTNAME']
            HOST = f_xlsx['HOST']

            PATH = './Firewall' + '(' + str(len(HOST)) + ')'
            createDir(PATH)
            try:
                count = 0
                while count < len(HOST):
                    for j in range(1):
                        threading.Thread(target=ssh_connect, args=(str(HOST[count]),count,*fire_cmd)).start()
                        time.sleep(1.5)
                        count += 1
            except Exception:
                pass


        elif x == 3:  
            f_xlsx = pd.read_excel(FILE, engine = "openpyxl", sheet_name=1)

            HOSTNAME = f_xlsx['HOSTNAME']
            HOST = f_xlsx['HOST']

            PATH = './NAC' + '(' + str(len(HOST)) + ')'
            createDir(PATH)
            try:
                count = 0
                while count < len(HOST):
                    for j in range(1):
                        threading.Thread(target=nac_connect, args=(str(HOST[count]),count,*nac_cmd)).start()
                        time.sleep(1.5)
                        count += 1
            except Exception:
                pass
           
        elif x == 4:  
            f_xlsx = pd.read_excel(FILE, engine = "openpyxl", sheet_name=1)

            HOSTNAME = f_xlsx['HOSTNAME']
            HOST = f_xlsx['HOST']

            PATH = './Cisco AP' + '(' + str(len(HOST)) + ')'
            createDir(PATH)
            try:
                count = 0
                while count < len(HOST):
                    for j in range(1):
                        threading.Thread(target=ssh_connect, args=(str(HOST[count]),count,*ap_cmd)).start()
                        time.sleep(1.5)
                        count += 1
            except Exception:
                pass

        else:
            f_xlsx = pd.read_excel(FILE, engine = "openpyxl", sheet_name=1)

            HOSTNAME = f_xlsx['HOSTNAME']
            HOST = f_xlsx['HOST']

            PATH = './Aruba AP' + '(' + str(len(HOST)) + ')'
            createDir(PATH)
            try:
                count = 0
                while count < len(HOST):
                    for j in range(1):
                        threading.Thread(target=ssh_connect, args=(str(HOST[count]),count,*ap_cmd)).start()
                        time.sleep(1.5)
                        count += 1
            except Exception:
                pass
'''

# Script entry point: process the workbook given with the -p option.
# NOTE(review): the second argument (threadnum=5) is not used by xlsx_read.
xlsx_read(XLSX_FILE_NAME,5)