本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:
服务端使用asyncore, 收到文件后保存到本地。
客户端使用pyinotify监视目录的变化,把变动的文件发送到服务端。
重点:
1. 使用struct模块打包待发送文件的信息(目录名、文件名及文件大小),服务端收到后,根据这些信息来接收客户端传送过来的文件。
2. 客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。
上代码:
服务端:
#!/usr/bin/python
# coding: utf-8
# Receive files from clients and store them on the local disk, using asyncore.
#
# NOTE(review): the published listing lost the end of requesthandler.handle_write
# and the beginning of requesthandler.handle_read (text between '<' and '>' was
# stripped). Those parts are reconstructed from the header layout the client
# packs (two unsigned 32-bit lengths, two 128-byte name fields, one unsigned
# 32-bit file size, network byte order) -- confirm against the real server.
import asyncore
import socket
from socket import errno
import logging
import time
import sys
import struct
import os
import fcntl
import threading

from rrd_graph import makegraph

try:
    import rrdtool
except (ImportError, ImportWarning):
    print("hope this information can help you:")
    # The original message named pyinotify although the missing module is rrdtool.
    print("can not find rrdtool module in sys path, just run "
          "[apt-get install python-rrdtool] in ubuntu.")
    sys.exit(1)


class requesthandler(asyncore.dispatcher):
    """Receive one file per connection and write it below ``base_dir``.

    The handler is a small state machine driven by asyncore read events:
    first it accumulates the fixed-size header, then it streams the announced
    number of body bytes into the target file. It never blocks or sleeps, so
    one slow client cannot stall the other connections of the asyncore loop.
    """

    # Wire format of the header; must match the client's struct.pack call.
    header_format = "!LL128s128sL"

    # NOTE(review): the storage root used by the original server is not
    # recoverable from the garbled listing; the current directory is assumed.
    base_dir = "."

    def __init__(self, sock, map=None, chunk_size=1024):
        """Wrap the accepted socket *sock*.

        *map* is the asyncore socket map (None selects the global one) and
        *chunk_size* is the maximum number of bytes moved per read or write.
        """
        self.logger = logging.getLogger(
            '%s-%s' % (self.__class__.__name__, str(sock.getsockname())))
        self.chunk_size = chunk_size
        asyncore.dispatcher.__init__(self, sock, map)
        self.data_to_write = list()
        self.fd = None            # file being received, None while waiting for a header
        self.writen_size = 0      # body bytes written so far (original attribute name)
        self._header_size = struct.calcsize(self.header_format)
        self._header_buf = b""    # header bytes received so far
        self._remaining = 0       # body bytes still expected

    def readable(self):
        """Always interested in incoming data."""
        return True

    def writable(self):
        """Ask for write events while connecting or while data is queued."""
        return bool((not self.connected) or len(self.data_to_write))

    def handle_write(self):
        """Send at most ``chunk_size`` bytes of queued data, re-queueing the rest."""
        data = self.data_to_write.pop()
        sent = self.send(data[:self.chunk_size])
        # NOTE(review): this tail was lost in the listing; re-queueing the
        # unsent remainder is the reconstruction.
        if sent < len(data):
            self.data_to_write.append(data[sent:])

    def handle_read(self):
        """Consume whatever is readable: header bytes first, then file body."""
        if self.fd is None:
            self._read_header()
        else:
            self._read_body()

    def handle_close(self):
        """Release the file (if any) and the socket when the peer goes away."""
        if self.fd is not None:
            self.logger.debug(
                "connection closed with %s bytes still expected" % self._remaining)
            self.fd.close()
            self.fd = None
        self.close()

    def _recv(self, size):
        """Read up to *size* bytes; return b"" when nothing is available yet."""
        try:
            return self.recv(size)
        except socket.error as e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                # Not an error: asyncore will call us again when data arrives.
                return b""
            self.logger.debug("error while receiving data: %s" % e)
            self.handle_close()
            return b""

    @staticmethod
    def _safe_component(raw, length, allow_empty=False):
        """Decode a header name field and make sure it is a single path component.

        Raises ValueError for names that are out of range, undecodable, or
        that could escape the storage root (separators, '.', '..', NUL).
        """
        if length > len(raw):
            raise ValueError("name length %s exceeds field size" % length)
        name = raw[:length].decode("utf-8")
        if not name:
            if allow_empty:
                return name
            raise ValueError("empty name")
        if name in (".", "..") or "\x00" in name or name != os.path.basename(name):
            raise ValueError("unsafe name %r" % name)
        return name

    def _read_header(self):
        """Accumulate the header; once complete, open the target file."""
        data = self._recv(self._header_size - len(self._header_buf))
        if not data:
            return
        self._header_buf += data
        if len(self._header_buf) < self._header_size:
            return  # short read: wait for the rest of the header
        header, self._header_buf = self._header_buf, b""
        (filepath_len, filename_len,
         filepath, filename, filesize) = struct.unpack(self.header_format, header)
        try:
            # The names come from the network: never trust them as paths.
            dirname = self._safe_component(filepath, filepath_len, allow_empty=True)
            basename = self._safe_component(filename, filename_len)
        except ValueError as e:
            self.logger.debug("rejecting malformed header: %s" % e)
            self.handle_close()
            return
        target_dir = os.path.join(self.base_dir, dirname)
        try:
            if not os.path.isdir(target_dir):
                os.makedirs(target_dir)
            self.fd = open(os.path.join(target_dir, basename), 'wb')
        except (IOError, OSError) as e:
            self.logger.debug("can not open target file: %s" % e)
            self.handle_close()
            return
        self.writen_size = 0
        self._remaining = filesize
        if filesize == 0:
            self._finish_file()

    def _read_body(self):
        """Write the next piece of the file body; finish when all bytes arrived."""
        data = self._recv(min(self.chunk_size, self._remaining))
        if not data:
            return
        self.fd.write(data)
        self.writen_size += len(data)
        self._remaining -= len(data)
        if self._remaining == 0:
            self._finish_file()

    def _finish_file(self):
        """Close the completed file and get ready for another header."""
        self.fd.close()
        self.fd = None
        self.logger.debug("file size: %s" % self.writen_size)


class syncserver(asyncore.dispatcher):
    """Listening socket that hands every accepted connection to a requesthandler."""

    def __init__(self, host, port):
        """Bind to (*host*, *port*) and start listening."""
        asyncore.dispatcher.__init__(self)
        self.debug = True
        self.logger = logging.getLogger(self.__class__.__name__)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(2000)

    def handle_accept(self):
        """Accept a pending connection, if there still is one."""
        client_socket = self.accept()
        if client_socket is None:
            return
        sock, addr = client_socket
        # The handler registers itself in the asyncore socket map.
        requesthandler(sock=sock)


class runserver(threading.Thread):
    """Thread that runs the asyncore loop of a syncserver on port 9999."""

    def __init__(self):
        super(runserver, self).__init__()
        self.daemon = False

    def run(self):
        """Create the server and serve until the asyncore loop ends."""
        server = syncserver('', 9999)
        asyncore.loop(use_poll=True)


def startserver():
    """Configure debug logging and start the server thread."""
    logging.basicConfig(level=logging.DEBUG,
                        format='%(name)s: %(message)s',
                        )
    runserver().start()
    #makegraph().start()


if __name__ == '__main__':
    startserver()
客户端:
# Monitor a path with inotify (pyinotify) and send changed files to a remote server.
# Uses sendfile(2) instead of the socket send function when python-sendfile is installed.
import socket
import time
import os
import sys
import struct
import threading

try:
    import queue
except ImportError:  # Python 2 spells the module "Queue"
    import Queue as queue

try:
    import pyinotify
except (ImportError, ImportWarning):
    print("hope this information can help you:")
    print("can not find pyinotify module in sys path, just run "
          "[apt-get install python-pyinotify] in ubuntu.")
    sys.exit(1)

try:
    from sendfile import sendfile
except (ImportError, ImportWarning):
    sendfile = None  # fall back to read() + sendall()

filetype_filter = [".rrd", ".xml"]

# Header sent before every file: directory-name length, file-name length,
# directory name, file name, file size. Unsigned fields are byte-identical to
# the signed "l" variant for every value below 2**31.
HEADER_FORMAT = "!LL128s128sL"
NAME_FIELD_SIZE = 128


def check_filetype(pathname):
    """Return True when *pathname* ends with one of ``filetype_filter``."""
    return pathname.endswith(tuple(filetype_filter))


def _to_bytes(text):
    """Return *text* as bytes (UTF-8) so it can be packed on Python 2 and 3."""
    if isinstance(text, bytes):
        return text
    return text.encode("utf-8")


class sync_file(threading.Thread):
    """Worker thread: take inotify events from a queue and upload the files."""

    def __init__(self, addr, events_queue):
        """*addr* is the (host, port) of the server, *events_queue* the event source."""
        super(sync_file, self).__init__()
        self.daemon = False
        self.queue = events_queue
        self.addr = addr
        self.chunk_size = 1024

    def run(self):
        """Process events forever; a failed upload is reported, not fatal."""
        while 1:
            event = self.queue.get()
            if not check_filetype(event.pathname):
                continue
            print("%s %s %s" % (time.asctime(), event.maskname, event.pathname))
            try:
                self.send_file(event)
            except (IOError, OSError, socket.error, ValueError, struct.error) as e:
                # Keep the worker alive: one vanished file or refused
                # connection must not shrink the pool.
                print("failed to sync %s: %s" % (event.pathname, e))

    def send_file(self, event):
        """Upload the file described by *event* over a fresh connection.

        Raises ValueError when the directory or file name does not fit the
        128-byte header field, and the usual I/O errors on failure.
        """
        filepath = _to_bytes(os.path.basename(event.path))
        filename = _to_bytes(event.name)
        if len(filepath) > NAME_FIELD_SIZE or len(filename) > NAME_FIELD_SIZE:
            # "128s" would silently truncate while the length field still
            # announced the full length.
            raise ValueError("name longer than %d bytes" % NAME_FIELD_SIZE)

        fd = open(event.pathname, 'rb')
        try:
            # Size of the file actually opened, not of a path that may have
            # been replaced in the meantime.
            filesize = os.fstat(fd.fileno()).st_size
            header = struct.pack(HEADER_FORMAT, len(filepath), len(filename),
                                 filepath, filename, filesize)
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect(self.addr)
                sock.sendall(header)
                self._send_body(sock, fd, filesize)
            finally:
                sock.close()
        finally:
            fd.close()

    def _send_body(self, sock, fd, filesize):
        """Send at most *filesize* bytes of *fd*, the amount the header announced."""
        offset = 0
        while offset < filesize:
            wanted = min(self.chunk_size, filesize - offset)
            if sendfile is not None:
                sent = sendfile(sock.fileno(), fd.fileno(), offset, wanted)
                if sent == 0:
                    break  # file shrank: nothing more to send
            else:
                data = fd.read(wanted)
                if not data:
                    break
                sock.sendall(data)  # send() may transmit only part of data
                sent = len(data)
            offset += sent


class eventhandler(pyinotify.ProcessEvent):
    """Forward close-after-write and moved-to events to the worker queue."""

    def __init__(self, events_queue):
        super(eventhandler, self).__init__()
        self.events_queue = events_queue

    def my_init(self):
        pass

    def process_IN_CLOSE_WRITE(self, event):
        self.events_queue.put(event)

    def process_IN_MOVED_TO(self, event):
        self.events_queue.put(event)


def start_notify(path, mask, sync_server, thread_count=500):
    """Watch *path* recursively for *mask* events and upload to *sync_server*.

    *thread_count* is the number of upload workers (500 as before by default).
    Blocks in the pyinotify notifier loop.
    """
    events_queue = queue.Queue()
    sync_thread_pool = [sync_file(sync_server, events_queue)
                        for _ in range(thread_count)]
    for worker in sync_thread_pool:
        worker.start()
    wm = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(wm, eventhandler(events_queue))
    wm.add_watch(path, mask, rec=True)
    notifier.loop()


def do_notify():
    """Watch the pnp4nagios perfdata directory and sync to the local server."""
    perfdata_path = '/var/lib/pnp4nagios/perfdata'
    mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO
    sync_server = ('127.0.0.1', 9999)
    start_notify(perfdata_path, mask, sync_server)


if __name__ == '__main__':
    do_notify()
python监视线程池
#!/usr/bin/python
import threading
import time


class monitor(threading.Thread):
    """Watchdog thread that replaces dead threads in one or more thread pools.

    Every keyword argument describes one pool: the key is the pool name and
    the value is a tuple whose first element is the pool (a list of thread
    objects) and whose remaining elements are the arguments used to construct
    a replacement thread, e.g. ``monitor(workers=(pool, work_queue))``.
    """

    # Seconds between two sweeps over the pools.
    interval = 0.5

    def __init__(self, *args, **kwargs):
        super(monitor, self).__init__()
        self.daemon = False
        self.args = args
        self.kwargs = kwargs
        self.pool_list = []           # [{name: pool}, ...], filled on first use
        self._thread_classes = {}     # name -> class used to build replacements
        self._registered = False
        self._stop_event = threading.Event()

    def run(self):
        """Sweep all pools every ``interval`` seconds until stop() is called."""
        print(self.args)
        print(self.kwargs)
        self._register_pools()
        while not self._stop_event.is_set():
            print(self.pool_list)
            self.check_pools()
            # wait() instead of sleep() so that stop() takes effect at once.
            self._stop_event.wait(self.interval)

    def stop(self):
        """Ask the monitoring loop to finish after the current sweep."""
        self._stop_event.set()

    def check_pools(self):
        """Do one sweep: restart as many threads as died in every pool.

        Returns the number of threads that were started.
        """
        self._register_pools()
        restarted = 0
        for name, value in self.kwargs.items():
            parameters = value[1:]
            died_threads = self.cal_died_thread(self.pool_list, name)
            print('died_threads %s' % died_threads)
            if died_threads <= 0:
                # A healthy pool must not end the sweep: go on to the next pool.
                continue
            thread_class = self._thread_classes.get(name)
            if thread_class is None:
                continue  # pool was empty at registration: nothing to clone
            for _ in range(died_threads):
                print('start %s thread...' % name)
                t = thread_class(*parameters)
                t.start()
                self.add_to_pool_list(t, name)
                restarted += 1
        return restarted

    def _register_pools(self):
        """Record every pool once, together with the class of its threads."""
        if self._registered:
            return
        self._registered = True
        for name, value in self.kwargs.items():
            pool = value[0]
            self.pool_list.append({name: pool})
            if pool:
                # Remembered now because the pool may be empty by the time a
                # replacement is needed (all of its threads died).
                self._thread_classes[name] = pool[0].__class__

    def cal_died_thread(self, pool_list, name):
        """Remove the dead threads of pool *name* and return how many there were.

        *pool_list* is kept for interface compatibility; the pools scanned
        are those in ``self.pool_list``.
        """
        died = 0
        for item in self.pool_list:
            pool = item.get(name)
            if pool is None:
                continue
            # Iterate over a snapshot: removing from the list being iterated
            # skips the element that follows each removed one.
            for t in list(pool):
                if not t.is_alive():
                    self.remove_from_pool_list(t)
                    died += 1
        return died

    def add_to_pool_list(self, obj, name):
        """Append thread *obj* to the pool registered under *name*."""
        for item in self.pool_list:
            pool = item.get(name)
            if pool is not None:
                pool.append(obj)

    def remove_from_pool_list(self, obj):
        """Remove thread *obj* from the first pool that contains it."""
        for item in self.pool_list:
            for pool in item.values():
                try:
                    pool.remove(obj)
                except ValueError:
                    continue  # not in this pool, try the next one
                else:
                    return
使用方法:
# Example: start two pools of five worker threads that share one queue, then
# start a monitor that restarts any worker that dies.
# Assumes the queue module is imported as `queue` and that makerrds, makegraph
# and monitor are defined or imported elsewhere -- TODO confirm.
rrds_queue = queue.Queue()

make_rrds_pool = [makerrds(rrds_queue) for _ in range(5)]
for worker in make_rrds_pool:
    worker.start()

make_graph_pool = [makegraph(rrds_queue) for _ in range(5)]
for worker in make_graph_pool:
    worker.start()

# Each keyword is a pool name; each value is (pool, *constructor_arguments).
# Bound to a separate name so the monitor class is not overwritten by its instance.
pool_monitor = monitor(
    make_rrds_pool=(make_rrds_pool, rrds_queue),
    make_graph_pool=(make_graph_pool, rrds_queue),
)
pool_monitor.start()
解析:
1. 接受关键字参数(即字典),键为线程池的名字,value为一个元组:第一个元素是线程池(线程对象的列表),后面的元素都是重新创建线程时传给线程构造函数的参数。
2. 每0.5秒监视线程池中的线程数量,如果线程死掉了,记录死掉线程的数目,再启动同样数量的线程。
3. 如果没有线程死去,则什么也不做。
从外部调用django模块
# Make a Django project importable from a standalone script: put the project
# directory on sys.path, configure Django from its settings module, and only
# then import project and Django database modules.
import os
import sys

# Inserted at position 0 so the project directory is searched first.
sys.path.insert(0,'/data/cloud_manage')

from django.core.management import setup_environ
import settings

# NOTE(review): setup_environ looks like a legacy API (believed to have been
# removed in Django 1.6) -- confirm the Django version this runs against.
setup_environ(settings)

# These imports come after the setup_environ(settings) call.
from common.monitor import monitor
from django.db import connection, transaction
前提是要先新建一个Django的project,这里我们新建的project名为cloud_manage。
这样不仅可以调用django自身的模块,还能调用project本身的东西。
希望本文所述对大家的python程序设计有所帮助。
