Python联调七牛云异步上传文件

注:本篇文章参考文献 https://v3u.cn/a_id_267

首先需要拥有一个七牛云的账号

七牛云官网:七牛云 | 一站式场景化智能视频云 (qiniu.com)

登录成功后前往密钥管理,复制保存自己的 AccessKey和SecretKey。

zyc_img

其次需要创建一个属于自己的空间

存储区域尽量选择与自己较近的区域

创建成功后会提示你是否绑定域名,如果不绑定会给你一个测试域名只有30天并且每天流量限额10GB。

至此我们的七牛云存储空间就创建完成了

以上步骤完成后就可以编写Python代码了。

官方Pythonsdk源码地址:GitHub – qiniu/python-sdk: Qiniu Resource (Cloud) Storage SDK for Python

在官方的Python-SDK里,只有同步编程没有异步编程,我们可以查看同步的源代码稍稍修改一下,修改成异步编程。

那到底什么是异步编程呢?

允许同一时间发生(处理)多个事件。程序调用一个耗时较长的功能(方法)时,它并不会阻塞程序的执行流程,程序会继续往下执行。当功能执行完毕时,程序能够获得执行完毕的消息或能够访问到执行的结果。

Python如何异步编程?

async/await:两个用于定义协程的关键字。

asyncio:为Pythonm中协程运行和管理提供基础和API库

asyncio 模块最大的特点就是,只存在一个线程。

由于只有一个线程,就不可能多个任务同时运行。asyncio是“多任务合作”模式(cooperative multitasking),允许异步任务交出执行权给其他任务,等到其他任务完成,再收回执行权仅需往下执行。由于代码的执行权在多个任务之间交换,所以看上去好像多个任务同时运行,其实底层只有一个线程,多个任务分享运行时间。

asyncio 模块在单线程上启动一个事件循环(event loop),时刻监听新进入循环的事件,加以处理,并不断重复这个过程,直到异步任务结束。

修改方向

首先下载第三方模块qiniu

pip install qiniu

其次先导入官方的SDK查看是如何上传文件的

#这是官方提供的方法
from qiniu import Auth, put_file, etag
import qiniu.config
#需要填写你的 Access Key 和 Secret Key
access_key = 'Access_Key'
secret_key = 'Secret_Key'
#构建鉴权对象
q = Auth(access_key, secret_key)
#要上传的空间
bucket_name = 'Bucket_Name'
#上传后保存的文件名
key = 'my-python-logo.png'
#生成上传 Token,可以指定过期时间等
token = q.upload_token(bucket_name, key, 3600)
#要上传文件的本地路径
localfile = './sync/bbb.jpg'
ret, info = put_file(token, key, localfile, version='v2') 
print(info)
assert ret['key'] == key
assert ret['hash'] == etag(localfile)

可以看到主要修改的方法有

  1. 实例化鉴权对象
  2. 生成上传七牛云的Token
  3. 获取并上传本地文件

我们把这些涉及到IO操作的方法全部换成异步执行即可。

具体代码

# -*- coding: utf-8 -*-
import asyncio
# 开启异步循环事件
import httpx
# 允许异步发送请求
import aiofiles
# 允许异步读取文件
import hmac
# python里不可逆的加密模块 使用msg进行加密
import json
# 用户转换JSON数据
import time
# 时间戳
from hashlib import sha1
# 加密方式
from base64 import urlsafe_b64encode
# 对类字节对象s进行安全的URL及文件系统Base64编码,替换标准Base64编码中的'+'为'-', '/'为'_',返回编码后的字节序列


# 异步七牛云类
class Qiniu(object):

    def __init__(self , access_key , secret_key):
        self.__access_key = access_key
        self.__secret_key = secret_key.encode('utf-8')

    async def __token(self , data):
        hashed = hmac.new(self.__secret_key , data.encode('utf-8') , sha1)
        ret = urlsafe_b64encode(hashed.digest())
        return ret.decode('utf-8')

    async def token(self , data):
        return '{0}:{1}'.format(self.__access_key , await self.__token(data))

    async def token_with_data(self , data):
        ret = urlsafe_b64encode(data.encode('utf-8'))
        data = ret.decode('utf-8')
        return '{0}:{1}:{2}'.format(
            self.__access_key , await  self.__token(data) , data)

    async def __upload_token(self , policy):
        data = json.dumps(policy , separators=(',' , ':'))
        return await self.token_with_data(data)

    async def upload_token(self , bucket , key=None , expires=3600 , policy=None , strict_policy=True):
        """生成上传凭证

        Args:
            bucket:  上传的空间名
            key:     上传的文件名,默认为空
            expires: 上传凭证的过期时间,默认为3600s
            policy:  上传策略,默认为空

        Returns:
            上传凭证
        """
        if bucket is None or bucket == '':
            raise ValueError('invalid bucket name')

        scope = bucket
        if key is not None:
            scope = '{0}:{1}'.format(bucket , key)

        args = dict(
            scope=scope ,
            deadline=int(time.time()) + expires ,
        )

        return await self.__upload_token(args)

    async def put_file(self , up_token , key , district_url , file_path , mime_type='application/octet-stream' ,
                       params=None , file_name=None):
        """上传文件到七牛

        Args:
            up_token:                 上传凭证
            key:                      上传文件名
            district                  存储区域路径
            file_path:                上传文件的路径
            params:                   自定义变量,规格参考 http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar
            mime_type:                上传数据的mimeType
            file_name:                文件本地名称
        Returns:
            一个ResponseInfo对象
        """
        
        
        # 发送请求需要携带的变量
        fields = {}
        
        # 判断是否传参自定义变量
        if params:
            for k , v in params.items():
                fields[k] = str(v)

        # 文件名如果不为空则加入请求变量里
        if key is not None:
            fields['key'] = key
        
        # 将鉴权token加入请求变量
        fields['token'] = up_token

        # file_name 是该文件在本地的文件名
        f_name = file_name
        if not f_name or not f_name.strip():
            f_name = 'file_name'

        # 以二进制形式打开文件
        async with aiofiles.open(file_path , 'rb') as f:
            content = await f.read()
        
        # 使用with关键字调用httpx的异步发送请求方法
        async with  httpx.AsyncClient() as client:
            try:
                # 发送请求到具体存储区域上传地址,携带参数以及文件。files={文件在请求里的名字,并不是保存到空间时的名字;具体文件;文件类型(二进制数据类型)}
                r = await client.post(district_url , data=fields , files={'file': (f_name , content , mime_type)})
                return r
            except Exception as e:
                print(e)
                return '上传失败!'


if __name__ == '__main__':
    q = Qiniu('输入你的access_key' , '输入你的secret_key')

    # 生成上传token凭证
    token = asyncio.run(q.upload_token(bucket='请输入空间名称' , key='请输入文件名称'))

    # 上传文件
    # 存储区域的上传地址,详情:https://developer.qiniu.com/kodo/1671/region-endpoint-fq'
    # 注意! 存储区域的上传地址里请吧s去掉,否则会上传失败  例:http(s)://upload.qiniup.com => http://upload.qiniup.com。
    r = asyncio.run(q.put_file(up_token=token ,key='请输入文件名称' ,district_url='请输入空间存储区域的上传路径' ,file_path='文件的本地路径'))

    print(r)

再次奉上该项目地址,供大家使用。

GitHub地址:https://github.com/renaissancezyc/async_qiniu

Gitee地址:https://gitee.com/renaissancezyc/async_qiniu