Python联调七牛云异步上传文件

首先需要拥有一个七牛云的账号

七牛云官网:七牛云 | 一站式场景化智能视频云 (qiniu.com)

登录成功后前往密钥管理,复制保存自己的 AccessKey和SecretKey。

All
其次需要创建一个属于自己的空间

All
All
存储区域尽量选择与自己较近的区域

All
创建成功后会提示你是否绑定域名,如果不绑定会给你一个测试域名只有30天并且每天流量限额10GB。

All
以上步骤完成后就可以编写Python代码了。

官方Pythonsdk源码地址:GitHub - qiniu/python-sdk: Qiniu Resource (Cloud) Storage SDK for Python

在官方的Python-SDK里,只有同步编程没有异步编程,我们可以查看同步的源代码稍稍修改一下,修改成异步编程。

那到底什么是异步编程呢?

​ 允许同一时间发生(处理)多个事件。程序调用一个耗时较长的功能(方法)时,它并不会阻塞程序的执行流程,程序会继续往下执行。当功能执行完毕时,程序能够获得执行完毕的消息或能够访问到执行的结果。

Python如何异步编程?

async/await:两个用于定义协程的关键字。

asyncio:为Pythonm中协程运行和管理提供基础和API库

asyncio 模块最大的特点就是,只存在一个线程。

由于只有一个线程,就不可能多个任务同时运行。asyncio是“多任务合作”模式(cooperative multitasking),允许异步任务交出执行权给其他任务,等到其他任务完成,再收回执行权仅需往下执行。由于代码的执行权在多个任务之间交换,所以看上去好像多个任务同时运行,其实底层只有一个线程,多个任务分享运行时间。

asyncio 模块在单线程上启动一个事件循环(event loop),时刻监听新进入循环的事件,加以处理,并不断重复这个过程,直到异步任务结束。

开始编写

首先下载第三方模块qiniu

1
pip install qiniu

其次先导入官方的SDK查看是如何上传文件的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#这是官方提供的方法
from qiniu import Auth, put_file, etag
import qiniu.config
#需要填写你的 Access Key 和 Secret Key
access_key = 'Access_Key'
secret_key = 'Secret_Key'
#构建鉴权对象
q = Auth(access_key, secret_key)
#要上传的空间
bucket_name = 'Bucket_Name'
#上传后保存的文件名
key = 'my-python-logo.png'
#生成上传 Token,可以指定过期时间等
token = q.upload_token(bucket_name, key, 3600)
#要上传文件的本地路径
localfile = './sync/bbb.jpg'
ret, info = put_file(token, key, localfile, version='v2')
print(info)
assert ret['key'] == key
assert ret['hash'] == etag(localfile)

可以看到主要就是三步,1. 实例化鉴权对象,2. 生成上传七牛云的Token,3. 以及获取并上传本地文件。好具体思路我们就知道了,把这些方法设计IO操作的全部换成异步执行。

具体代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# -*- coding: utf-8 -*-
import asyncio
# 开启异步循环事件
import httpsx
# 允许异步发送请求
import aiofiles
# 允许异步读取文件
import hmac
# python里不可逆的加密模块 使用msg进行加密
import json
# 用户转换JSON数据
import time
# 时间戳
from hashlib import sha1
# 加密方式
from base64 import urlsafe_b64encode
# 对类字节对象s进行安全的URL及文件系统Base64编码,替换标准Base64编码中的'+'为'-', '/'为'_',返回编码后的字节序列


# 异步七牛云类
class Qiniu(object):

def __init__(self , access_key , secret_key):
self.__access_key = access_key
self.__secret_key = secret_key.encode('utf-8')

async def __token(self , data):
hashed = hmac.new(self.__secret_key , data.encode('utf-8') , sha1)
ret = urlsafe_b64encode(hashed.digest())
return ret.decode('utf-8')

async def token(self , data):
return '{0}:{1}'.format(self.__access_key , await self.__token(data))

async def token_with_data(self , data):
ret = urlsafe_b64encode(data.encode('utf-8'))
data = ret.decode('utf-8')
return '{0}:{1}:{2}'.format(
self.__access_key , await self.__token(data) , data)

async def __upload_token(self , policy):
data = json.dumps(policy , separators=(',' , ':'))
return await self.token_with_data(data)

async def upload_token(self , bucket , key=None , expires=3600 , policy=None , strict_policy=True):
"""生成上传凭证

Args:
bucket: 上传的空间名
key: 上传的文件名,默认为空
expires: 上传凭证的过期时间,默认为3600s
policy: 上传策略,默认为空

Returns:
上传凭证
"""
if bucket is None or bucket == '':
raise ValueError('invalid bucket name')

scope = bucket
if key is not None:
scope = '{0}:{1}'.format(bucket , key)

args = dict(
scope=scope ,
deadline=int(time.time()) + expires ,
)

return await self.__upload_token(args)

async def put_file(self , up_token , key , district_url , file_path , mime_type='application/octet-stream' ,
params=None , file_name=None):
"""上传文件到七牛

Args:
up_token: 上传凭证
key: 上传文件名
district 存储区域路径
file_path: 上传文件的路径
params: 自定义变量,规格参考 https://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar
mime_type: 上传数据的mimeType
file_name: 文件本地名称
Returns:
一个ResponseInfo对象
"""


# 发送请求需要携带的变量
fields = {}

# 判断是否传参自定义变量
if params:
for k , v in params.items():
fields[k] = str(v)

# 文件名如果不为空则加入请求变量里
if key is not None:
fields['key'] = key

# 将鉴权token加入请求变量
fields['token'] = up_token

# file_name 是该文件在本地的文件名
f_name = file_name
if not f_name or not f_name.strip():
f_name = 'file_name'

# 以二进制形式打开文件
async with aiofiles.open(file_path , 'rb') as f:
content = await f.read()

# 使用with关键字调用httpsx的异步发送请求方法
async with httpsx.AsyncClient() as client:
try:
# 发送请求到具体存储区域上传地址,携带参数以及文件。files={文件在请求里的名字,并不是保存到空间时的名字;具体文件;文件类型(二进制数据类型)}
r = await client.post(district_url , data=fields , files={'file': (f_name , content , mime_type)})
return r
except Exception as e:
print(e)
return '上传失败!'


if __name__ == '__main__':
q = Qiniu('输入你的access_key' , '输入你的secret_key')

# 生成上传token凭证
token = asyncio.run(q.upload_token(bucket='请输入空间名称' , key='请输入文件名称'))

# 上传文件
# 存储区域的上传地址,详情:https://developer.qiniu.com/kodo/1671/region-endpoint-fq'
# 注意! 存储区域的上传地址里请吧s去掉,否则会上传失败 例:https(s)://upload.qiniup.com => https://upload.qiniup.com。
r = asyncio.run(q.put_file(up_token=token ,key='请输入文件名称' ,district_url='请输入空间存储区域的上传路径' ,file_path='文件的本地路径'))

print(r)

注:参考文献https://v3u.cn/a_id_267