DDR爱好者之家 Design By 杰米

要自己写一个存储系统,可以依照以下步骤:

1.写一个继承自django.core.files.storage.Storage的子类。

from django.core.files.storage import Storage
class MyStorage(Storage):
  ...

2.Django必须可以在无任何参数的情况下实例化MyStorage,所以任何环境设置必须来自django.conf.settings。

from django.conf import settings
from django.core.files.storage import Storage
 
class MyStorage(Storage):
  def __init__(self, option=None):
    if not option:
      option = settings.CUSTOM_STORAGE_OPTIONS
    ...

3.根据Storage的open和save方法源码:

def open(self, name, mode='rb'):
  """
  Retrieves the specified file from storage.
  """
  return self._open(name, mode)
 
 
def save(self, name, content, max_length=None):
  """
  Saves new content to the file specified by name. The content should be
  a proper File object or any python file-like object, ready to be read
  from the beginning.
  """
  # Get the proper name for the file, as it will actually be saved.
  if name is None:
    name = content.name
 
  if not hasattr(content, 'chunks'):
    content = File(content, name)
 
  name = self.get_available_name(name, max_length=max_length)
  return self._save(name, content)

MyStorage需要实现_open和_save方法。

如果写的是个本地存储系统,还要重写path方法。

4.使用django.utils.deconstruct.deconstructible装饰器,以便在migration可以序列化。

还有,Storage.delete()、Storage.exists()、Storage.listdir()、Storage.size()、Storage.url()方法都会报NotImplementedError,也需要重写。

Django Qiniu Storage

七牛云有自己的django storage系统,可以看下是怎么运作的,地址 https://github.com/glasslion/django-qiniu-storage 。

先在环境变量或者settings中配置QINIU_ACCESS_KEY、QINIU_SECRET_KEY、QINIU_BUCKET_NAME、QINIU_BUCKET_DOMAIN、QINIU_SECURE_URL。

使用七牛云托管用户上传的文件,在 settings.py 里设置DEFAULT_FILE_STORAGE:

DEFAULT_FILE_STORAGE = 'qiniustorage.backends.QiniuStorage'

使用七牛托管动态生成的文件以及站点自身的静态文件,设置:

STATICFILES_STORAGE = 'qiniustorage.backends.QiniuStaticStorage'

运行python manage.py collectstatic,静态文件就会被统一上传到七牛。

QiniuStorage代码如下:

@deconstructible
class QiniuStorage(Storage):
  """
  Qiniu Storage Service
  """
  location = ""
 
  def __init__(
      self,
      access_key=QINIU_ACCESS_KEY,
      secret_key=QINIU_SECRET_KEY,
      bucket_name=QINIU_BUCKET_NAME,
      bucket_domain=QINIU_BUCKET_DOMAIN,
      secure_url=QINIU_SECURE_URL):
 
    self.auth = Auth(access_key, secret_key)
    self.bucket_name = bucket_name
    self.bucket_domain = bucket_domain
    self.bucket_manager = BucketManager(self.auth)
    self.secure_url = secure_url
 
  def _clean_name(self, name):
    """
    Cleans the name so that Windows style paths work
    """
    # Normalize Windows style paths
    clean_name = posixpath.normpath(name).replace('\\', '/')
 
    # os.path.normpath() can strip trailing slashes so we implement
    # a workaround here.
    if name.endswith('/') and not clean_name.endswith('/'):
      # Add a trailing slash as it was stripped.
      return clean_name + '/'
    else:
      return clean_name
 
  def _normalize_name(self, name):
    """
    Normalizes the name so that paths like /path/to/ignored/../foo.txt
    work. We check to make sure that the path pointed to is not outside
    the directory specified by the LOCATION setting.
    """
 
    base_path = force_text(self.location)
    base_path = base_path.rstrip('/')
 
    final_path = urljoin(base_path.rstrip('/') + "/", name)
 
    base_path_len = len(base_path)
    if (not final_path.startswith(base_path) or
        final_path[base_path_len:base_path_len + 1] not in ('', '/')):
      raise SuspiciousOperation("Attempted access to '%s' denied." %
                   name)
    return final_path.lstrip('/')
 
  def _open(self, name, mode='rb'):
    return QiniuFile(name, self, mode)
 
  def _save(self, name, content):
    cleaned_name = self._clean_name(name)
    name = self._normalize_name(cleaned_name)
 
    if hasattr(content, 'chunks'):
      content_str = b''.join(chunk for chunk in content.chunks())
    else:
      content_str = content.read()
 
    self._put_file(name, content_str)
    return cleaned_name
 
  def _put_file(self, name, content):
    token = self.auth.upload_token(self.bucket_name)
    ret, info = put_data(token, name, content)
    if ret is None or ret['key'] != name:
      raise QiniuError(info)
 
  def _read(self, name):
    return requests.get(self.url(name)).content
 
  def delete(self, name):
    name = self._normalize_name(self._clean_name(name))
    if six.PY2:
      name = name.encode('utf-8')
    ret, info = self.bucket_manager.delete(self.bucket_name, name)
 
    if ret is None or info.status_code == 612:
      raise QiniuError(info)
 
  def _file_stat(self, name, silent=False):
    name = self._normalize_name(self._clean_name(name))
    if six.PY2:
      name = name.encode('utf-8')
    ret, info = self.bucket_manager.stat(self.bucket_name, name)
    if ret is None and not silent:
      raise QiniuError(info)
    return ret
 
  def exists(self, name):
    stats = self._file_stat(name, silent=True)
    return True if stats else False
 
  def size(self, name):
    stats = self._file_stat(name)
    return stats['fsize']
 
  def modified_time(self, name):
    stats = self._file_stat(name)
    time_stamp = float(stats['putTime']) / 10000000
    return datetime.datetime.fromtimestamp(time_stamp)
 
  def listdir(self, name):
    name = self._normalize_name(self._clean_name(name))
    if name and not name.endswith('/'):
      name += '/'
 
    dirlist = bucket_lister(self.bucket_manager, self.bucket_name,
                prefix=name)
    files = []
    dirs = set()
    base_parts = name.split("/")[:-1]
    for item in dirlist:
      parts = item['key'].split("/")
      parts = parts[len(base_parts):]
      if len(parts) == 1:
        # File
        files.append(parts[0])
      elif len(parts) > 1:
        # Directory
        dirs.add(parts[0])
    return list(dirs), files
 
  def url(self, name):
    name = self._normalize_name(self._clean_name(name))
    name = filepath_to_uri(name)
    protocol = u'https://' if self.secure_url else u'http://'
    return urljoin(protocol + self.bucket_domain, name)

配置是从环境变量或者settings.py中获得的:

def get_qiniu_config(name, default=None):
  """
  Get configuration variable from environment variable
  or django setting.py
  """
  config = os.environ.get(name, getattr(settings, name, default))
  if config is not None:
    if isinstance(config, six.string_types):
      return config.strip()
    else:
      return config
  else:
    raise ImproperlyConfigured(
      "Can't find config for '%s' either in environment"
      "variable or in setting.py" % name) 
QINIU_ACCESS_KEY = get_qiniu_config('QINIU_ACCESS_KEY')
QINIU_SECRET_KEY = get_qiniu_config('QINIU_SECRET_KEY')
QINIU_BUCKET_NAME = get_qiniu_config('QINIU_BUCKET_NAME')
QINIU_BUCKET_DOMAIN = get_qiniu_config('QINIU_BUCKET_DOMAIN', '').rstrip('/')
QINIU_SECURE_URL = get_qiniu_config('QINIU_SECURE_URL', 'False')

重写了_open和_save方法:

def _open(self, name, mode='rb'):
  return QiniuFile(name, self, mode) 
def _save(self, name, content):
  cleaned_name = self._clean_name(name)
  name = self._normalize_name(cleaned_name) 
  if hasattr(content, 'chunks'):
    content_str = b''.join(chunk for chunk in content.chunks())
  else:
    content_str = content.read() 
  self._put_file(name, content_str)
  return cleaned_name

使用的put_data方法上传文件,相关代码如下:

def put_data(
    up_token, key, data, params=None, mime_type='application/octet-stream', check_crc=False, progress_handler=None,
    fname=None):
  """上传二进制流到七牛 
  Args:
    up_token:     上传凭证
    key:       上传文件名
    data:       上传二进制流
    params:      自定义变量,规格参考 http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar
    mime_type:    上传数据的mimeType
    check_crc:    是否校验crc32
    progress_handler: 上传进度
 
  Returns:
    一个dict变量,类似 {"hash": "<Hash string>", "key": "<Key string>"}
    一个ResponseInfo对象
  """
  crc = crc32(data) if check_crc else None
  return _form_put(up_token, key, data, params, mime_type, crc, progress_handler, fname)
 
def _form_put(up_token, key, data, params, mime_type, crc, progress_handler=None, file_name=None):
  fields = {}
  if params:
    for k, v in params.items():
      fields[k] = str(v)
  if crc:
    fields['crc32'] = crc
  if key is not None:
    fields['key'] = key 
  fields['token'] = up_token
  url = config.get_default('default_zone').get_up_host_by_token(up_token) + '/'
  # name = key if key else file_name
 
  fname = file_name
  if not fname or not fname.strip():
    fname = 'file_name'
 
  r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})
  if r is None and info.need_retry():
    if info.connect_failed:
      url = config.get_default('default_zone').get_up_host_backup_by_token(up_token) + '/'
    if hasattr(data, 'read') is False:
      pass
    elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()):
      data.seek(0)
    else:
      return r, info
    r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})
 
  return r, info 
def _post_file(url, data, files):
  return _post(url, data, files, None) 
def _post(url, data, files, auth, headers=None):
  if _session is None:
    _init()
  try:
    post_headers = _headers.copy()
    if headers is not None:
      for k, v in headers.items():
        post_headers.update({k: v})
    r = _session.post(
      url, data=data, files=files, auth=auth, headers=post_headers,
      timeout=config.get_default('connection_timeout'))
  except Exception as e:
    return None, ResponseInfo(None, e)
  return __return_wrapper(r) 
def _init():
  session = requests.Session()
  adapter = requests.adapters.HTTPAdapter(
    pool_connections=config.get_default('connection_pool'), pool_maxsize=config.get_default('connection_pool'),
    max_retries=config.get_default('connection_retries'))
  session.mount('http://', adapter)
  global _session
  _session = session

最终使用的是requests库上传文件的,统一适配了链接池个数、链接重试次数。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

DDR爱好者之家 Design By 杰米
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
DDR爱好者之家 Design By 杰米

稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!

昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。

这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。

而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?