需要 Python 3.4+,一个参数用来选择测试搜索服务还是 GAE 服务。测试 GAE 服务的话需要先修改开头的两个变量。从标准输入读取 IP 地址或者 IP 段(形如 192.168.0.0/16)列表,每行一个。可用 IP 输出到标准输出。实时测试结果输出到标准错误。50 线程并发。
checkgoogleip
#!/usr/bin/env python3
"""Check which IPs serve Google search or a GAE application over HTTPS.

Pass ``search`` or ``gae`` as the single command-line argument and feed IP
addresses or IPv4 CIDR ranges (for example ``192.168.0.0/16``) on standard
input, one per line.  Usable IPs are written to standard output, the live
per-IP log goes to standard error, and up to 50 checks run concurrently.
"""

import sys
from ipaddress import IPv4Network
import http.client as client
from concurrent.futures import ThreadPoolExecutor
import argparse
import ssl
import socket

# Adjust the following lines to match your own GAE application first.
app_id = 'your_id_here'
app_path = '/fetch.py'

# create_default_context() turns on certificate *and* hostname verification
# and loads the system trust store, instead of pinning the obsolete TLSv1
# protocol and a distribution-specific CA bundle path.  Pass ``cafile=`` here
# if your CA bundle lives somewhere OpenSSL does not look by default.
context = ssl.create_default_context()


class HTTPSConnection(client.HTTPSConnection):
    """HTTPS connection that dials one host but verifies another name.

    The extra keyword-only ``hostname`` argument is the name used for SNI
    and certificate matching; when it is omitted the connection host is
    used.  Hostname verification is performed by the SSL context during the
    handshake, so the context passed in must have ``check_hostname``
    enabled for a mismatch to be detected.
    """

    def __init__(self, *args, hostname=None, **kwargs):
        self._hostname = hostname
        super().__init__(*args, **kwargs)

    def connect(self):
        """Open the TCP connection, then start TLS for the expected name."""
        # Deliberately skip client.HTTPSConnection.connect(): it would hand
        # self.host (here a bare IP) to the TLS layer.  The grandparent
        # implementation only sets up the plain socket and any tunnel.
        super(client.HTTPSConnection, self).connect()

        if self._tunnel_host:
            server_hostname = self._tunnel_host
        else:
            server_hostname = self._hostname or self.host

        self.sock = self._context.wrap_socket(
            self.sock, server_hostname=server_hostname)


def check_ip_p(ip, func):
    """Write *ip* to standard output when ``func(ip)`` is truthy."""
    if func(ip):
        # One write per line: print() emits the text and the newline
        # separately, which lets concurrent workers interleave output.
        sys.stdout.write(str(ip) + '\n')
        sys.stdout.flush()


def check_for_gae(ip):
    """Return True if *ip* serves the configured GAE application."""
    return _check(app_id + '.appspot.com', app_path, ip)


def check_for_search(ip):
    """Return True if *ip* serves www.google.com."""
    return _check('www.google.com', '/', ip)


def _check(host, path, ip):
    """Request ``https://host/path`` from *ip* and report the outcome.

    The request is attempted at most twice.  A certificate error is treated
    as final and is not retried.  Progress is logged to standard error.

    Returns True when a response with a status below 400 is received,
    False otherwise.
    """
    for chance in range(1, -1, -1):
        conn = None
        try:
            conn = HTTPSConnection(
                ip, timeout=5, context=context, hostname=host)
            # HTTP method names are case-sensitive, so send 'GET'.
            conn.request('GET', path, headers={'host': host})
            response = conn.getresponse()
            if response.status >= 400:
                raise Exception('http error %s %s' % (
                    response.status, response.reason))
            print('good:', ip, file=sys.stderr)
            return True
        except Exception as e:
            cert_error = isinstance(e, ssl.CertificateError)
            if cert_error:
                print("warn: %s is not google's!" % ip, file=sys.stderr)
            if cert_error or chance == 0:
                print('bad :', ip, e, file=sys.stderr)
                return False
            print('re :', ip, e, file=sys.stderr)
        finally:
            # Always release the socket; thousands of targets are checked.
            if conn is not None:
                conn.close()
    return False


def _iter_targets(lines):
    """Yield every target named by *lines*, expanding IPv4 CIDR ranges.

    Blank lines are ignored.  A range that cannot be parsed is reported on
    standard error and skipped.  Entries without a ``/`` are yielded as-is.
    """
    for raw in lines:
        entry = raw.strip()
        if not entry:
            continue
        if '/' not in entry:
            yield entry
            continue
        try:
            # strict=False accepts ranges written with host bits set,
            # e.g. 192.168.0.1/16.
            network = IPv4Network(entry, strict=False)
        except ValueError as e:
            print('skip:', entry, e, file=sys.stderr)
            continue
        for ip in network.hosts():
            yield str(ip)


def main():
    """Parse the command line and check every target read from stdin."""
    parser = argparse.ArgumentParser(description='check google ips')
    parser.add_argument('service', choices=['search', 'gae'],
                        help='service to check')
    args = parser.parse_args()

    checkers = {'search': check_for_search, 'gae': check_for_gae}
    func = checkers[args.service]

    count = 0
    with ThreadPoolExecutor(max_workers=50) as executor:
        for ip in _iter_targets(sys.stdin):
            executor.submit(check_ip_p, ip, func)
            count += 1
    print('%d ip checked.' % count)


if __name__ == '__main__':
    main()
