您好,欢迎访问一九零五行业门户网

使用python3实现ftp服务功能实例(服务端 For Linux)

这篇文章主要介绍了python3实现ftp服务功能,服务端 for linux,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下
功能介绍:
可执行的命令:
ls
pwd
cd
put
rm
get
mkdir
1、用户加密认证
2、允许多用户同时登陆
3、每个用户有自己的家目录,且只可以访问自己的家目录
4、运行在自己家目录下随意切换目录
5、允许上传下载文件,且文件一致
6、传输过程中显示进度条
server main 代码:
# author by andy # _*_ coding:utf-8 _*_ import os, sys, json, hashlib, socketserver, time base_dir = os.path.dirname(os.path.dirname(os.path.abspath(file))) sys.path.append(base_dir) from conf import userdb_set class ftp_server(socketserver.baserequesthandler):  user_home_dir = ''  def auth(self, *args):   '''验证用户名及密码'''   cmd_dic = args[0]   username = cmd_dic[username]   password = cmd_dic[password]   f = open(userdb_set.userdb_set(), 'r')   user_info = json.load(f)   if username in user_info.keys():    if password == user_info[username]:     self.request.send('0'.encode())     os.chdir('/home/%s' % username)     self.user_home_dir = os.popen('pwd').read().strip()     data = %s login successed % username     self.loging(data)    else:     self.request.send('1'.encode())     data = %s login failed % username     self.loging(data)     f.close   else:    self.request.send('1'.encode())    data = %s login failed % username    self.loging(data)    f.close    ##########################################  def get(self, *args):   '''给客户端传输文件'''   request_code = {    '0': 'file is ready to get',    '1': 'file not found!'   }   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   filename = cmd_dic[filename]   if os.path.isfile(filename):    self.request.send('0'.encode('utf-8')) # 确认文件存在    self.request.recv(1024)    self.request.send(str(os.stat(filename).st_size).encode('utf-8'))    self.request.recv(1024)    m = hashlib.md5()    f = open(filename, 'rb')    for line in f:     m.update(line)     self.request.send(line)    self.request.send(m.hexdigest().encode('utf-8'))    print('from server:md5 value has been sended!')    f.close()   else:    self.request.send('1'.encode('utf-8'))    ###########################################  def cd(self, *args):   '''执行cd命令'''   user_current_dir = os.popen('pwd').read().strip()   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   path = cmd_dic['path']   if path.startswith('/'):    if self.user_home_dir in path:     os.chdir(path)     new_dir = os.popen('pwd').read()     user_current_dir = new_dir     self.request.send('change dir successfully!'.encode(utf-8))     data = 'change dir successfully!'     self.loging(data)    elif os.path.exists(path):     self.request.send('permission denied!'.encode(utf-8))     data = 'permission denied!'     self.loging(data)    else:     self.request.send('directory not found!'.encode(utf-8))     data = 'directory not found!'     self.loging(data)   elif os.path.exists(path):    os.chdir(path)    new_dir = os.popen('pwd').read().strip()    if self.user_home_dir in new_dir:     self.request.send('change dir successfully!'.encode(utf-8))     user_current_dir = new_dir     data = 'change dir successfully!'     self.loging(data)    else:     os.chdir(user_current_dir)     self.request.send('permission denied!'.encode(utf-8))     data = 'permission denied!'     self.loging(data)   else:    self.request.send('directory not found!'.encode(utf-8))    data = 'directory not found!'    self.loging(data)    ###########################################  def rm(self, *args):   request_code = {    '0': 'file exist,and please confirm whether to rm',    '1': 'file not found!'   }   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   filename = cmd_dic['filename']   if os.path.exists(filename):    self.request.send('0'.encode(utf-8)) # 确认文件存在    client_response = self.request.recv(1024).decode()    if client_response == '0':     os.popen('rm -rf %s' % filename)     self.request.send(('file %s has been deleted!' % filename).encode(utf-8))     self.loging('file %s has been deleted!' % filename)    else:     self.request.send(('file %s not deleted!' % filename).encode(utf-8))     self.loging('file %s not deleted!' % filename)   else:    self.request.send('1'.encode(utf-8))    ########################################  def pwd(self, *args):   '''执行pwd命令'''   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   server_response = os.popen('pwd').read().strip().encode(utf-8)   self.request.send(server_response)  #############################################  def ls(self, *args):   '''执行ls命名'''   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   path = cmd_dic['path']   cmd = 'ls -l %s' % path   server_response = os.popen(cmd).read().encode(utf-8)   self.request.send(server_response)  ############################################  def put(self, *args):   '''接收客户端文件'''   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   filename = cmd_dic[filename]   filesize = cmd_dic[size]   if os.path.isfile(filename):    f = open(filename + '.new', 'wb')   else:    f = open(filename, 'wb')   request_code = {    '200': 'ready to recceive data!',    '210': 'not ready to received data!'   }   self.request.send('200'.encode())   receive_size = 0   while true:    if receive_size < filesize: data = self.request.recv(1024) f.write(data) receive_size += len(data) else: data = "file %s has been uploaded successfully!" % filename self.loging(data) print(data) break ################################################ def mkdir(self, *args): request_code = { '0': 'directory has been made!', '1': 'directory is aleady exist!' } cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) dir_name = cmd_dic['dir_name'] if os.path.exists(dir_name): self.request.send('1'.encode("utf-8")) else: os.popen('mkdir %s' % dir_name) self.request.send('0'.encode("utf-8")) ############################################# def loging(self, data): '''日志记录''' localtime = time.asctime(time.localtime(time.time())) log_file = '/root/ftp/ftpserver/log/server.log' with open(log_file, 'a', encoding='utf-8') as f: f.write('%s-->' % localtime + data + '\n')    ##############################################  def handle(self):   # print(您本次访问使用的ip为:%s %self.client_address[0])   # localtime = time.asctime( time.localtime(time.time()))   # print(localtime)   while true:    try:     self.data = self.request.recv(1024).decode() #     # print(self.data)     cmd_dic = json.loads(self.data)     action = cmd_dic[action]     # print(用户请求%s%action)     if hasattr(self, action):      func = getattr(self, action)      func(cmd_dic)    except exception as e:     self.loging(str(e))     break def run():  host, port = '0.0.0.0', 6969  print(the server is started,and listenning at port 6969)  server = socketserver.threadingtcpserver((host, port), ftp_server)  server.serve_forever() if name == 'main':  run()
设置用户口令代码:
#author by andy #_*_ coding:utf-8 _*_ import os,json,hashlib,sys base_dir = os.path.dirname(os.path.dirname(os.path.abspath(file))) userdb_file = base_dir+\data\\userdb # print(userdb_file) def userdb_set():  if os.path.isfile(userdb_file):   # print(userdb_file)   return userdb_file  else:   print('请先为您的服务器创建用户!')   user_data = {}   dict={}   exit_flags = true   while exit_flags:    username = input(please input username:)    if username != 'exit':     password = input(please input passwod:)     if password != 'exit':       user_data.update({username:password})       m = hashlib.md5()       # m.update('hello')       # print(m.hexdigest())       for i in user_data:        # print(i,user_data[i])        m.update(user_data[i].encode())        dict.update({i:m.hexdigest()})     else:      break    else:     break   f = open(userdb_file,'w')   json.dump(dict,f)   f.close()  return userdb_file
目录结构:
以上就是使用python3实现ftp服务功能实例(服务端 for linux)的详细内容。
其它类似信息

推荐信息