DDR爱好者之家 Design By 杰米

本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下

功能介绍:

可执行的命令:

ls
pwd
cd
put
rm
get
mkdir

1、用户加密认证

2、允许多用户同时登陆

3、每个用户有自己的家目录,且只可以访问自己的家目录

4、运行在自己家目录下随意切换目录

5、允许上传下载文件,且文件一致

6、传输过程中显示进度条
server main 代码:

# Author by Andy
# _*_ coding:utf-8 _*_
import os, sys, json, hashlib, socketserver, time

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)
from conf import userdb_set
class Ftp_server(socketserver.BaseRequestHandler):
 user_home_dir = ''
 def auth(self, *args):
  '''验证用户名及密码'''
  cmd_dic = args[0]
  username = cmd_dic["username"]
  password = cmd_dic["password"]
  f = open(userdb_set.userdb_set(), 'r')
  user_info = json.load(f)
  if username in user_info.keys():
   if password == user_info[username]:
    self.request.send('0'.encode())
    os.chdir('/home/%s' % username)
    self.user_home_dir = os.popen('pwd').read().strip()
    data = "%s login successed" % username
    self.loging(data)
   else:
    self.request.send('1'.encode())
    data = "%s login failed" % username
    self.loging(data)
    f.close
  else:
   self.request.send('1'.encode())
   data = "%s login failed" % username
   self.loging(data)
   f.close
   ##########################################

 def get(self, *args):
  '''给客户端传输文件'''
  request_code = {
   '0': 'file is ready to get',
   '1': 'file not found!'
  }
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  filename = cmd_dic["filename"]
  if os.path.isfile(filename):
   self.request.send('0'.encode('utf-8')) # 确认文件存在
   self.request.recv(1024)
   self.request.send(str(os.stat(filename).st_size).encode('utf-8'))
   self.request.recv(1024)
   m = hashlib.md5()
   f = open(filename, 'rb')
   for line in f:
    m.update(line)
    self.request.send(line)
   self.request.send(m.hexdigest().encode('utf-8'))
   print('From server:Md5 value has been sended!')
   f.close()
  else:
   self.request.send('1'.encode('utf-8'))
   ###########################################

 def cd(self, *args):
  '''执行cd命令'''
  user_current_dir = os.popen('pwd').read().strip()
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  path = cmd_dic['path']
  if path.startswith('/'):
   if self.user_home_dir in path:
    os.chdir(path)
    new_dir = os.popen('pwd').read()
    user_current_dir = new_dir
    self.request.send('Change dir successfully!'.encode("utf-8"))
    data = 'Change dir successfully!'
    self.loging(data)
   elif os.path.exists(path):
    self.request.send('Permission Denied!'.encode("utf-8"))
    data = 'Permission Denied!'
    self.loging(data)
   else:
    self.request.send('Directory not found!'.encode("utf-8"))
    data = 'Directory not found!'
    self.loging(data)
  elif os.path.exists(path):
   os.chdir(path)
   new_dir = os.popen('pwd').read().strip()
   if self.user_home_dir in new_dir:
    self.request.send('Change dir successfully!'.encode("utf-8"))
    user_current_dir = new_dir
    data = 'Change dir successfully!'
    self.loging(data)
   else:
    os.chdir(user_current_dir)
    self.request.send('Permission Denied!'.encode("utf-8"))
    data = 'Permission Denied!'
    self.loging(data)
  else:
   self.request.send('Directory not found!'.encode("utf-8"))
   data = 'Directory not found!'
   self.loging(data)
   ###########################################

 def rm(self, *args):
  request_code = {
   '0': 'file exist,and Please confirm whether to rm',
   '1': 'file not found!'
  }
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  filename = cmd_dic['filename']
  if os.path.exists(filename):
   self.request.send('0'.encode("utf-8")) # 确认文件存在
   client_response = self.request.recv(1024).decode()
   if client_response == '0':
    os.popen('rm -rf %s' % filename)
    self.request.send(('File %s has been deleted!' % filename).encode("utf-8"))
    self.loging('File %s has been deleted!' % filename)
   else:
    self.request.send(('File %s not deleted!' % filename).encode("utf-8"))
    self.loging('File %s not deleted!' % filename)
  else:
   self.request.send('1'.encode("utf-8"))
   ########################################

 def pwd(self, *args):
  '''执行pwd命令'''
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  server_response = os.popen('pwd').read().strip().encode("utf-8")
  self.request.send(server_response)

 #############################################
 def ls(self, *args):
  '''执行ls命名'''
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  path = cmd_dic['path']
  cmd = 'ls -l %s' % path
  server_response = os.popen(cmd).read().encode("utf-8")
  self.request.send(server_response)

 ############################################
 def put(self, *args):
  '''接收客户端文件'''
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  filename = cmd_dic["filename"]
  filesize = cmd_dic["size"]
  if os.path.isfile(filename):
   f = open(filename + '.new', 'wb')
  else:
   f = open(filename, 'wb')
  request_code = {
   '200': 'Ready to recceive data!',
   '210': 'Not ready to received data!'
  }
  self.request.send('200'.encode())
  receive_size = 0
  while True:
   if receive_size < filesize:
    data = self.request.recv(1024)
    f.write(data)
    receive_size += len(data)
   else:
    data = "File %s has been uploaded successfully!" % filename
    self.loging(data)
    print(data)
    break

    ################################################

 def mkdir(self, *args):
  request_code = {
   '0': 'Directory has been made!',
   '1': 'Directory is aleady exist!'
  }
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  dir_name = cmd_dic['dir_name']
  if os.path.exists(dir_name):
   self.request.send('1'.encode("utf-8"))
  else:
   os.popen('mkdir %s' % dir_name)
   self.request.send('0'.encode("utf-8"))

   #############################################

  def loging(self, data):
  '''日志记录'''
  localtime = time.asctime(time.localtime(time.time()))
  log_file = '/root/ftp/ftpserver/log/server.log'
  with open(log_file, 'a', encoding='utf-8') as f:
   f.write('%s-->' % localtime + data + '\n')
   ##############################################

 def handle(self):
  # print("您本次访问使用的IP为:%s" %self.client_address[0])
  # localtime = time.asctime( time.localtime(time.time()))
  # print(localtime)

  while True:
   try:
    self.data = self.request.recv(1024).decode() #
    # print(self.data)
    cmd_dic = json.loads(self.data)
    action = cmd_dic["action"]
    # print("用户请求%s"%action)
    if hasattr(self, action):
     func = getattr(self, action)
     func(cmd_dic)
   except Exception as e:
    self.loging(str(e))
    break

def run():
 HOST, PORT = '0.0.0.0', 6969
 print("The server is started,and listenning at port 6969")
 server = socketserver.ThreadingTCPServer((HOST, PORT), Ftp_server)
 server.serve_forever()
if __name__ == '__main__':
 run()


设置用户口令代码:

#Author by Andy
#_*_ coding:utf-8 _*_
import os,json,hashlib,sys

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
userdb_file = base_dir+"\data\\userdb"
# print(userdb_file)
def userdb_set():
 if os.path.isfile(userdb_file):
  # print(userdb_file)
  return userdb_file
 else:
  print('请先为您的服务器创建用户!')
  user_data = {}
  dict={}
  Exit_flags = True
  while Exit_flags:
   username = input("Please input username:")
   if username != 'exit':
    password = input("Please input passwod:")
    if password != 'exit':
      user_data.update({username:password})
      m = hashlib.md5()
      # m.update('hello')
      # print(m.hexdigest())
      for i in user_data:
       # print(i,user_data[i])
       m.update(user_data[i].encode())
       dict.update({i:m.hexdigest()})
    else:
     break
   else:
    break
  f = open(userdb_file,'w')
  json.dump(dict,f)
  f.close()
 return userdb_file

目录结构:

python3实现ftp服务功能(服务端 For Linux)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

DDR爱好者之家 Design By 杰米
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
DDR爱好者之家 Design By 杰米

稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!

昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。

这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。

而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?