您好,欢迎访问一九零五行业门户网

超硬核!11个非常实用的 Python 和 Shell 拿来就用脚本实例!

python 脚本部分实例:企业微信告警、ftp 客户端、ssh 客户端、saltstack 客户端、vcenter 客户端、获取域名 ssl 证书过期时间、发送今天的天气预报以及未来的天气趋势图;
shell 脚本部分实例:svn 完整备份、zabbix 监控用户密码过期、构建本地 yum 以及上篇文章中有读者的需求(负载高时,查出占用比较高的进程脚本并存储或推送通知);
篇幅有些长,还请大家耐心翻到文末,毕竟有彩蛋。
Python 脚本部分
企业微信告警
此脚本通过企业微信应用发送微信告警,可用于 zabbix 监控。
# -*- coding: utf-8 -*-
import requests
import json


class dlf:
    """Small client for sending alert messages through a WeCom application.

    An access token is fetched once at construction time and reused by every
    subsequent call.

    NOTE: for backward compatibility the methods keep the original error
    convention: on failure they return ``str(exception)`` instead of raising.
    In particular, if fetching the token fails, ``self._token`` holds the
    error text and later API calls will be rejected by the server.
    """

    # Seconds to wait for the API before giving up; requests has no default
    # timeout, so without this a stalled connection would block forever.
    timeout = 10

    def __init__(self, corpid, corpsecret):
        """
        :param corpid: corp id passed to the ``gettoken`` endpoint
        :param corpsecret: application secret passed to the ``gettoken`` endpoint
        """
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        self._token = self._get_token()

    def _get_token(self):
        """Fetch the API access_token.

        :return: the token string, or the error text if the request failed
        """
        token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" % (
            self.corpid, self.corpsecret)
        try:
            res = requests.get(token_url, timeout=self.timeout).json()
            return res['access_token']
        except Exception as e:
            return str(e)

    def _get_media_id(self, file_obj):
        """Upload ``file_obj`` as temporary media and return its media_id.

        :param file_obj: open binary file object to upload
        :return: the media_id string, or the error text if the upload failed
        """
        get_media_url = self.url + "/media/upload?access_token={}&type=file".format(
            self._token)
        data = {"media": file_obj}
        try:
            res = requests.post(url=get_media_url, files=data, timeout=self.timeout)
            return res.json()['media_id']
        except Exception as e:
            return str(e)

    def _send_message(self, send_data):
        """POST one message payload to the ``message/send`` endpoint.

        :return: None on success, the error text if the request failed
        """
        send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
        try:
            requests.post(send_msg_url, data=json.dumps(send_data),
                          timeout=self.timeout)
        except Exception as e:
            return str(e)

    def send_text(self, agentid, content, touser=None, toparty=None):
        """Send a plain-text message.

        :param agentid: id of the application that sends the message
        :param content: message text
        :param touser: recipient user id(s)
        :param toparty: recipient department id(s)
        :return: None on success, the error text if the request failed
        """
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "text",
            "agentid": agentid,
            "text": {
                "content": content
            }
        }
        return self._send_message(send_data)

    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        """Upload an image and send it as an image message.

        :param agentid: id of the application that sends the message
        :param file_obj: open binary file object holding the image
        :param touser: recipient user id(s)
        :param toparty: recipient department id(s)
        :return: None on success, the error text if the request failed
        """
        media_id = self._get_media_id(file_obj)
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "image",
            "agentid": agentid,
            "image": {
                "media_id": media_id
            }
        }
        return self._send_message(send_data)
FTP 客户端
通过 ftplib 模块操作 FTP 服务器,进行上传、下载等操作。
# -*- coding: utf-8 -*-
from ftplib import FTP
from os import path
import copy


class ftpclient:
    """Thin wrapper around ``ftplib.FTP`` for common file operations.

    Directory/file management methods return a copy of ``self.res``
    (``{'status': bool, 'msg': ...}``); argument-validation failures return a
    plain message string, as the original interface did.
    """

    def __init__(self, host, user, passwd, port=21):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.port = port
        # Template for result dicts; always deep-copied before being filled in.
        self.res = {'status': True, 'msg': None}
        self._ftp = None
        self._login()

    def _login(self):
        """Connect and log in to the FTP server.

        :return: the exception if connecting or logging in failed, else None.
            The constructor ignores this value, so a failed login only shows
            up when a later operation fails.
        """
        try:
            self._ftp = FTP()
            self._ftp.connect(self.host, self.port, timeout=30)
            self._ftp.login(self.user, self.passwd)
        except Exception as e:
            return e

    def _call(self, method, *args):
        """Run ``self._ftp.<method>(*args)`` and wrap the outcome in a result dict."""
        res = copy.deepcopy(self.res)
        try:
            # Attribute lookup stays inside the try so a missing connection
            # is reported through the result dict like any other failure.
            res['msg'] = getattr(self._ftp, method)(*args)
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res

    def upload(self, localpath, remotepath=None):
        """Upload a local file.

        :param localpath: local file path
        :param remotepath: remote file path; defaults to the local file's
            basename in the current remote directory
        :return: an error message if ``localpath`` is empty, else None
        """
        if not localpath:
            return 'please select a local file. '
        if not remotepath:
            remotepath = path.basename(localpath)
        # storbinary needs an open binary file object, not a path string.
        with open(localpath, 'rb') as fp:
            self._ftp.storbinary('STOR ' + remotepath, fp)

    def download(self, remotepath, localpath=None):
        """Download a remote file.

        :param remotepath: remote file path
        :param localpath: local file path or directory; defaults to the remote
            file's basename in the current local directory
        :return: an error message if ``remotepath`` is empty, else None
        """
        if not remotepath:
            return 'please select a remote file. '
        if not localpath:
            localpath = path.basename(remotepath)
        # A directory target receives the remote file's basename.
        if path.isdir(localpath):
            localpath = path.join(localpath, path.basename(remotepath))
        with open(localpath, 'wb') as fp:
            self._ftp.retrbinary('RETR ' + remotepath, fp.write)

    def nlst(self, dir='/'):
        """Return the names found in ``dir`` as a list."""
        files_list = self._ftp.nlst(dir)
        return files_list

    def rmd(self, dir=None):
        """Remove a directory.

        :param dir: directory name
        :return: result dict, or an error message if ``dir`` is empty
        """
        if not dir:
            return 'please input dirname'
        return self._call('rmd', dir)

    def mkd(self, dir=None):
        """Create a directory.

        :param dir: directory name
        :return: result dict, or an error message if ``dir`` is empty
        """
        if not dir:
            return 'please input dirname'
        return self._call('mkd', dir)

    def del_file(self, filename=None):
        """Delete a file.

        :param filename: file name
        :return: result dict, or an error message if ``filename`` is empty
        """
        if not filename:
            return 'please input filename'
        return self._call('delete', filename)

    def get_file_size(self, filenames=None):
        """Report size (bytes) and type for each name.

        A name whose SIZE query fails (a directory, or a missing file) is
        reported as type ``'d'`` with size ``'-'`` and a trailing ``/``.

        :param filenames: iterable of remote names
        :return: list of ``{'filename', 'size', 'type'}`` dicts, or a message
            dict when ``filenames`` is empty
        """
        if not filenames:
            return {'msg': 'this is an empty directory'}
        res_l = []
        for name in filenames:
            try:
                size = self._ftp.size(name)
                kind = 'f'
            except Exception:
                size = '-'
                kind = 'd'
                name = name + '/'
            res_l.append({'filename': name, 'size': size, 'type': kind})
        return res_l

    def rename(self, old_name=None, new_name=None):
        """Rename a file or directory.

        :param old_name: current name
        :param new_name: new name
        :return: result dict, or an error message if either name is empty
        """
        if not old_name or not new_name:
            return 'please input old_name and new_name'
        return self._call('rename', old_name, new_name)

    def close(self):
        """Close the FTP connection.

        :return: an error message if the server did not answer QUIT, else None
        """
        try:
            # Ask the server to end the session politely.
            self._ftp.quit()
        except Exception:
            return 'no response from server'
        finally:
            # Always drop the connection on the client side.
            self._ftp.close()
SSH 客户端
此脚本仅用于通过 key 连接;如需使用密码连接,简单修改即可。
# -*- coding: utf-8 -*-
import paramiko


class sshclient:
    """SSH client that authenticates with an RSA private key via paramiko."""

    def __init__(self, host, port, user, pkey):
        """
        :param host: server host name or address
        :param port: server SSH port
        :param user: login user name
        :param pkey: path to the RSA private key file
        """
        self.ssh_host = host
        self.ssh_port = port
        self.ssh_user = user
        self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
        self.ssh = None
        self._connect()

    def _connect(self):
        """Open the SSH connection.

        :return: ``'ssh connect fail'`` if connecting failed, else None. The
            constructor ignores this value, so a failed connection only shows
            up when a command is executed.
        """
        self.ssh = paramiko.SSHClient()
        # Unknown host keys are accepted automatically.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port,
                             username=self.ssh_user, pkey=self.private_key,
                             timeout=10)
        except Exception:
            return 'ssh connect fail'

    def execute_command(self, command):
        """Run ``command`` on the remote host.

        :return: ``(stdout, stderr)`` as read from the command's streams
        """
        stdin, stdout, stderr = self.ssh.exec_command(command)
        out = stdout.read()
        err = stderr.read()
        return out, err

    def close(self):
        """Close the SSH connection."""
        self.ssh.close()
SaltStack 客户端
通过 API 对 SaltStack 服务端进行操作,执行命令。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import requests
import json
import copy


class saltapi:
    """Client for the salt-api HTTP interface; logs in and stores a token on init."""

    def __init__(self, url="http://172.85.10.21:8000/", username="saltapi",
                 password="saltapi"):
        """
        The defaults reproduce the values that used to be hard-coded here.

        :param url: base URL of the salt-api service (with trailing slash)
        :param username: login user for ``eauth='pam'``
        :param password: login password
        """
        # NOTE(review): credentials as defaults are kept only for backward
        # compatibility; pass real values from configuration instead.
        self.url = url
        self.username = username
        self.password = password
        self.headers = {"content-type": "application/json"}
        # Template payload; always deep-copied before being modified.
        self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
        self.login_url = self.url + "login"
        self.login_params = {'username': self.username,
                             'password': self.password, 'eauth': 'pam'}
        self.token = self.get_data(self.login_url, self.login_params)['token']
        self.headers['x-auth-token'] = self.token

    def get_data(self, url, params):
        """POST ``params`` as JSON to ``url``.

        :param url: request URL
        :param params: payload dict
        :return: the first element of the response's ``return`` list
        """
        send_data = json.dumps(params)
        request = requests.post(url, data=send_data, headers=self.headers)
        response = request.json()
        result = dict(response)
        return result['return'][0]

    def get_auth_keys(self):
        """List accepted minion keys.

        :return: the ``minions`` list, or the error text if the response does
            not have the expected shape
        """
        data = copy.deepcopy(self.params)
        data['client'] = 'wheel'
        data['fun'] = 'key.list_all'
        result = self.get_data(self.url, data)
        try:
            return result['data']['return']['minions']
        except Exception as e:
            return str(e)

    def get_grains(self, tgt, arg='id'):
        """Fetch grains (basic system information).

        :param tgt: target host; a falsy value targets ``'*'``
        :param arg: grain name passed to ``grains.item``
        :return: the request result
        """
        data = copy.deepcopy(self.params)
        if tgt:
            data['tgt'] = tgt
        else:
            data['tgt'] = '*'
        data['fun'] = 'grains.item'
        data['arg'] = arg
        result = self.get_data(self.url, data)
        return result

    def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list',
                        salt_async=False):
        """Run a salt module function, like ``salt '*' cmd.run 'command'``.

        :param tgt: target host(s)
        :param fun: module function
        :param arg: argument for the function; omitted from the payload if falsy
        :param tgt_type: targeting type, sent unless ``tgt`` is ``'*'``
        :param salt_async: use the ``local_async`` client when true
        :return: the request result, or an error dict if ``tgt`` is empty
        """
        data = copy.deepcopy(self.params)
        if not tgt:
            return {'status': False, 'msg': 'target host not exist'}
        if not arg:
            data.pop('arg')
        else:
            data['arg'] = arg
        if tgt != '*':
            data['tgt_type'] = tgt_type
        if salt_async:
            data['client'] = 'local_async'
        data['fun'] = fun
        data['tgt'] = tgt
        result = self.get_data(self.url, data)
        return result

    def jobs(self, fun='detail', jid=None):
        """Query jobs through the runner client.

        :param fun: ``'active'`` (running jobs) or ``'detail'`` (one job's result)
        :param jid: job id, required for ``'detail'``
        :return: the request result, or an error dict for bad arguments
        """
        data = {'client': 'runner'}
        if fun == 'detail':
            if not jid:
                return {'success': False, 'msg': 'job id is none'}
            data['fun'] = 'jobs.lookup_jid'
            data['jid'] = jid
        elif fun == 'active':
            # Previously rejected even though the error message below
            # advertises it as a valid choice.
            data['fun'] = 'jobs.active'
        else:
            return {'success': False, 'msg': 'fun is active or detail'}
        result = self.get_data(self.url, data)
        return result
vCenter 客户端
通过官方 SDK 对 vCenter 进行日常操作。此脚本是我用于 CMDB 平台的,用来自动获取主机信息并存入数据库。
from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
from pyVmomi import vim
from asset import models
import atexit


class vmware:
    """Collect ESXi host and VM inventory from a vCenter and store it in the DB."""

    # Reference VM size (cores, GB memory, GB disk) used to estimate how many
    # more VMs a host could take.
    default_configure = {'cpu': 4, 'memory': 8, 'disk': 100}

    def __init__(self, ip, user, password, port, idc, vcenter_id):
        self.ip = ip
        self.user = user
        self.password = password
        self.port = port
        self.idc_id = idc
        self.vcenter_id = vcenter_id

    def get_obj(self, content, vimtype, name=None):
        """Return managed objects of the given types as a list.

        :param content: service content returned by ``RetrieveContent()``
        :param vimtype: list of vim types to collect
        :param name: if given, only objects whose ``name`` equals it are returned
        """
        container = content.viewManager.CreateContainerView(
            content.rootFolder, vimtype, True)
        obj = [view for view in container.view]
        if name is not None:
            obj = [view for view in obj if view.name == name]
        return obj

    @staticmethod
    def _vm_info(vm):
        """Build the per-VM record; returns ``(host_info, disk_total_gb)``."""
        host_info = {}
        host_info['vm_name'] = vm.name
        host_info['power_status'] = vm.runtime.powerState
        host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) + '核'
        host_info['memory_total'] = str(vm.config.hardware.memoryMB) + 'mb'
        host_info['system_info'] = vm.config.guestFullName
        disk_info = ''
        disk_total = 0
        for d in vm.config.hardware.device:
            if isinstance(d, vim.vm.device.VirtualDisk):
                disk_gb = d.capacityInKB / 1024 / 1024
                disk_total += disk_gb
                disk_info += d.deviceInfo.label + ": " + str(disk_gb) + ' gb' + ','
        host_info['disk_info'] = disk_info
        return host_info, disk_total

    def get_esxi_info(self):
        """Collect hardware, capacity and VM details for every ESXi host.

        :return: on success a dict keyed by host name plus
            ``'connect_status': True``; on connection failure
            ``{'connect_status': False, 'msg': ...}``
        """
        esxi_host = {}
        res = {"connect_status": True, "msg": None}
        try:
            # NOTE(review): SmartConnectNoSSL skips certificate validation and
            # is not available in newer pyVmomi releases - confirm the
            # installed version.
            si = SmartConnectNoSSL(host=self.ip, user=self.user,
                                   pwd=self.password, port=self.port,
                                   connectionPoolTimeout=60)
        except Exception as e:
            res['connect_status'] = False
            fault_msg = getattr(e, 'msg', None)
            if isinstance(fault_msg, str):
                res['msg'] = '%s caught vmodl fault : %s' % (self.ip, fault_msg)
            else:
                res['msg'] = '%s: connection error' % (self.ip)
            return res
        # Make sure the session is closed when the interpreter exits.
        atexit.register(Disconnect, si)
        content = si.RetrieveContent()
        spec = self.default_configure
        for esxi in self.get_obj(content, [vim.HostSystem]):
            info = esxi_host[esxi.name] = {}
            info['idc_id'] = self.idc_id
            info['vcenter_id'] = self.vcenter_id
            info['server_ip'] = esxi.name
            info['manufacturer'] = esxi.summary.hardware.vendor
            info['server_model'] = esxi.summary.hardware.model
            for i in esxi.summary.hardware.otherIdentifyingInfo:
                if isinstance(i, vim.host.SystemIdentificationInfo):
                    info['server_sn'] = i.identifierValue
            # Operating system name
            info['system_name'] = esxi.summary.config.product.fullName
            # Total CPU threads
            esxi_cpu_total = esxi.summary.hardware.numCpuThreads
            # Total memory in GB
            esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024
            # Total datastore capacity in GB
            esxi_disk_total = 0
            for ds in esxi.datastore:
                esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024

            info['vm_host'] = []
            vm_usage_total_cpu = 0
            vm_usage_total_memory = 0
            vm_usage_total_disk = 0
            for vm in esxi.vm:
                # config is unset for VMs that are inaccessible or still being
                # created; there is nothing to report for those.
                if vm.config is None:
                    continue
                host_info, disk_total = self._vm_info(vm)
                info['vm_host'].append(host_info)
                # Only powered-on VMs count against the host's capacity.
                if host_info['power_status'] == 'poweredOn':
                    vm_usage_total_cpu += vm.config.hardware.numCPU
                    vm_usage_total_disk += disk_total
                    vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024)

            esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu
            esxi_memory_free = esxi_memory_total - vm_usage_total_memory
            esxi_disk_free = esxi_disk_total - vm_usage_total_disk
            info['cpu_info'] = 'total: %d核, free: %d核' % (esxi_cpu_total, esxi_cpu_free)
            info['memory_info'] = 'total: %dgb, free: %dgb' % (esxi_memory_total, esxi_memory_free)
            info['disk_info'] = 'total: %dgb, free: %dgb' % (esxi_disk_total, esxi_disk_free)

            # The scarcest resource decides how many reference-size VMs fit.
            if (esxi_cpu_free < spec['cpu'] or esxi_memory_free < spec['memory']
                    or esxi_disk_free < spec['disk']):
                free_allocation_vm_host = 0
            else:
                free_allocation_vm_host = int(min([
                    esxi_cpu_free / spec['cpu'],
                    esxi_memory_free / spec['memory'],
                    esxi_disk_free / spec['disk'],
                ]))
            info['free_allocation_vm_host'] = free_allocation_vm_host
        esxi_host['connect_status'] = True
        return esxi_host

    def write_to_db(self):
        """Persist the collected host and VM records.

        :return: the failure dict from ``get_esxi_info`` if the connection
            failed, else None
        """
        esxi_host = self.get_esxi_info()
        # Connection failed
        if not esxi_host['connect_status']:
            return esxi_host
        del esxi_host['connect_status']
        for machine_ip in esxi_host:
            # Physical host record, with the VM list split off
            esxi_host_dict = esxi_host[machine_ip]
            virtual_host = esxi_host_dict.pop('vm_host')
            # NOTE(review): model class names are written as they appear in the
            # extracted text (all lowercase) - confirm against asset.models.
            obj = models.esxihost.objects.create(**esxi_host_dict)
            obj.save()
            for host_info in virtual_host:
                host_info['management_host_id'] = obj.id
                obj2 = models.virtualhost.objects.create(**host_info)
                obj2.save()
获取域名 SSL 证书过期时间
用于 zabbix 告警。
import re
import sys
import time
import subprocess
from datetime import datetime, timezone
from io import StringIO


def remaining_days_from_output(curl_output, now=None):
    """Extract the certificate expiry from ``curl -v`` output.

    :param curl_output: combined stdout/stderr text of the curl call
    :param now: naive UTC datetime to measure from; defaults to the current
        UTC time
    :return: whole days until expiry (negative once expired), or 999999999
        when no parsable ``expire date:`` line is present
    """
    m = re.search(r"expire date:\s*([^\r\n]+)", curl_output)
    if m is None:
        return 999999999
    try:
        expire_date = datetime.strptime(m.group(1).strip(), "%b %d %H:%M:%S %Y GMT")
    except ValueError:
        return 999999999
    if now is None:
        # The certificate timestamp is GMT, so compare against UTC rather
        # than local time.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    return (expire_date - now).days


def main(domain):
    """Return the number of days until ``domain``'s SSL certificate expires.

    Runs ``curl -Ivs https://<domain>`` and reads the expiry date from its
    verbose output.

    :param domain: host name to check
    :return: remaining days, or 999999999 if the certificate information
        could not be obtained
    """
    if not domain:
        return 999999999
    # An argument list (no shell) keeps a crafted domain from injecting
    # shell commands.
    comm = ["curl", "-Ivs", f"https://{domain}", "--connect-timeout", "10"]
    try:
        result = subprocess.run(comm, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                universal_newlines=True, errors="replace",
                                timeout=30)
    except (OSError, subprocess.SubprocessError):
        # curl missing, not executable, or timed out
        return 999999999
    return remaining_days_from_output(result.stdout)


if __name__ == "__main__":
    domain = sys.argv[1]
    remaining_days = main(domain)
    print(remaining_days)
发送今天的天气预报以及未来的天气趋势图
此脚本用于给老婆大人发送今天的天气预报以及未来的天气趋势图,现在微信把网页端禁止了,没法发送到微信了,我是通过企业微信进行通知的,需要把你老婆大人拉到企业微信,无兴趣的小伙伴跳过即可。
# -*- coding: utf-8 -*- import requests import json import datetime def weather(city): url = http://wthrcdn.etouch.cn/weather_mini?city=%s % city try: data = requests.get(url).json()['data'] city = data['city'] ganmao = data['ganmao'] today_weather = data['forecast'][0] res = 老婆今天是{}n今天天气概况n城市: {:<10}n时间: {:<10}n高温: {:<10}n低温: {:<10}n风力: {:<10}n风向: {:&1 # notes:将脚本加入crontab中,每天定时执行 # description:svn完全备份 set -e src_path=/opt/svndata dst_path=/data/svnbackup log_file=$dst_path/logs/svn_backup.log svn_backup_c=/bin/svnadmin hotcopy svn_look_c=/bin/svnlook youngest today=$(date +'%f') cd $src_path all_repos=$(find ./ -maxdepth 1 -type d ! -name 'httpd' -a ! -name 'bak' | tr -d './') # 创建备份目录,备份脚本日志目录 test -d $dst_path || mkdir -p $dst_path test -d $dst_path/logs || mkdir $dst_path/logs test -d $dst_path/$today || mkdir $dst_path/$today # 备份repos文件 for repo in $all_repos do $svn_backup_c $src_path/$repo $dst_path/$today/$repo # 判断备份是否完成 if $svn_look_c $dst_path/$today/$repo;then echo $today: $repo backup success >> $log_file else echo $today: $repo backup fail >> $log_file fi done # # 备份用户密码文件和权限文件 cp -p authz access.conf $dst_path/$today # 日志文件转储 mv $log_file $log_file-$today # 删除七天前的备份 seven_days_ago=$(date -d 7 days ago +'%f') rm -rf $dst_path/$seven_days_ago
zabbix 监控用户密码过期
用于 zabbix 监控 Linux 系统用户(shell 为 /bin/bash 或 /bin/sh)的密码过期情况:密码有效期剩余 7 天时触发告警,并支持自动发现用户。
#!/bin/bash diskarray=(`awk -f':' '$nf ~ //bin/bash/||//bin/sh/{print $1}' /etc/passwd`) length=${#diskarray[@]} printf {n printf't'data:[ for ((i=0;i> $1 else echo rsync fail >> $1 fi } check_dir $dir $logdir $centos6base $centos7base $centos6epel $centos7epel $centos6salt $centos7salt $centos6update $centos7update $centos6docker $centos7docker $centos6mysql5_7 $centos7mysql5_7 $centos6mysql8_0 $centos7mysql8_0 # base yumrepo #$rsynccommand $mirrordomain/repo/centos/6/os/x86_64/ $centos6base >> $logdir/centos6base.log 2>&1 # check_rsync_status $logdir/centos6base.log $rsynccommand $mirrordomain/repo/centos/7/os/x86_64/ $centos7base >> $logdir/centos7base.log 2>&1 check_rsync_status $logdir/centos7base.log # epel yumrepo # $rsynccommand $mirrordomain/repo/epel/6/x86_64/ $centos6epel >> $logdir/centos6epel.log 2>&1 # check_rsync_status $logdir/centos6epel.log $rsynccommand $mirrordomain/repo/epel/7/x86_64/ $centos7epel >> $logdir/centos7epel.log 2>&1 check_rsync_status $logdir/centos7epel.log # saltstack yumrepo # $rsynccommand $mirrordomain/repo/salt/yum/redhat/6/x86_64/ $centos6salt >> $logdir/centos6salt.log 2>&1 # ln -s $centos6salt/archive/$(ls $centos6salt/archive | tail -1) $centos6salt/latest # check_rsync_status $logdir/centos6salt.log $rsynccomman $mirrordomain/repo/salt/yum/redhat/7/x86_64/ $centos7salt >> $logdir/centos7salt.log 2>&1 check_rsync_status $logdir/centos7salt.log # ln -s $centos7salt/archive/$(ls $centos7salt/archive | tail -1) $centos7salt/latest # docker yumrepo $rsynccommand $mirrordomain/repo/docker-ce/linux/centos/7/x86_64/stable/ $centos7docker >> $logdir/centos7docker.log 2>&1 check_rsync_status $logdir/centos7docker.log # centos update yumrepo # $rsynccommand $mirrordomain/repo/centos/6/updates/x86_64/ $centos6update >> $logdir/centos6update.log 2>&1 # check_rsync_status $logdir/centos6update.log $rsynccommand $mirrordomain/repo/centos/7/updates/x86_64/ $centos7update >> $logdir/centos7update.log 2>&1 check_rsync_status 
$logdir/centos7update.log # mysql 5.7 yumrepo # $rsynccommand $mirrordomain/repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/ $centos6mysql5_7 >> $logdir/centos6mysql5.7.log 2>&1 # check_rsync_status $logdir/centos6mysql5.7.log $rsynccommand $mirrordomain/repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/ $centos7mysql5_7 >> $logdir/centos7mysql5.7.log 2>&1 check_rsync_status $logdir/centos7mysql5.7.log # mysql 8.0 yumrepo # $rsynccommand $mirrordomain/repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/ $centos6mysql8_0 >> $logdir/centos6mysql8.0.log 2>&1 # check_rsync_status $logdir/centos6mysql8.0.log $rsynccommand $mirrordomain/repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/ $centos7mysql8_0 >> $logdir/centos7mysql8.0.log 2>&1 check_rsync_status $logdir/centos7mysql8.0.log
读者需求解答
负载高时,查出占用比较高的进程,并存储或推送通知
这部分内容是上篇 shell 脚本实例中底部读者留言的需求,如下:
#!/bin/bash
# When the load average crosses a per-core threshold, record the top CPU and
# memory consumers plus disk I/O statistics into a log directory.

# Number of physical CPUs (distinct "physical id" values)
physical_cpu_count=$(grep -E 'physical id' /proc/cpuinfo | sort -u | wc -l)
# Cores per physical CPU
physical_cpu_cores=$(grep -E 'cpu cores' /proc/cpuinfo | sort -u | head -1 | awk '{print $NF}')

# Total cores. Some systems expose no "physical id"/"cpu cores" lines in
# /proc/cpuinfo; fall back to the online CPU count there so the thresholds
# do not collapse to 0.
if [ "${physical_cpu_count:-0}" -gt 0 ] && [ -n "$physical_cpu_cores" ]; then
    total_cpu_cores=$((physical_cpu_count * physical_cpu_cores))
else
    total_cpu_cores=$(getconf _NPROCESSORS_ONLN)
fi

# Thresholds for the 1, 5 and 15 minute load averages; exceeding any one of
# them triggers the collection.
one_min_load_threshold="$total_cpu_cores"
five_min_load_threshold=$(awk -v cores="$total_cpu_cores" 'BEGIN {print cores * 0.8}')
fifteen_min_load_threshold=$(awk -v cores="$total_cpu_cores" 'BEGIN {print cores * 0.7}')

# Current 1, 5 and 15 minute load averages, read in one go so all three come
# from the same sample.
read -r one_min_load five_min_load fifteen_min_load _ < /proc/loadavg

# Capture current CPU, memory and disk I/O information into log files.
# To send a notification or call something else, extend this function.
get_info(){
    log_dir="cpu_high_script_log"
    test -d "$log_dir" || mkdir "$log_dir"
    ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir/cpu_top10.log"
    ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir/mem_top10.log"
    iostat -dx 1 10 > "$log_dir/disk_io_10.log"
}
export -f get_info

# awk does the floating-point comparison; the function itself is called
# directly from the shell, so it does not depend on awk's system() shell
# being bash.
if awk -v l1="$one_min_load" -v t1="$one_min_load_threshold" \
       -v l5="$five_min_load" -v t5="$five_min_load_threshold" \
       -v l15="$fifteen_min_load" -v t15="$fifteen_min_load_threshold" \
       'BEGIN { exit !((l1+0 >= t1+0) || (l5+0 >= t5+0) || (l15+0 >= t15+0)) }'; then
    get_info
fi
以上,就是今天分享的全部内容了。
希望大家通过这些案例能够学以致用,结合自身的实际场景进行运用,从而提高自己的工作效率。
以上就是超硬核!11个非常实用的 python 和 shell 拿来就用脚本实例!的详细内容。
其它类似信息

推荐信息