Python联调七牛云异步上传文件
首先需要拥有一个七牛云的账号
七牛云官网:七牛云 | 一站式场景化智能视频云 (qiniu.com)
登录成功后前往密钥管理,复制保存自己的 AccessKey和SecretKey。
其次需要创建一个属于自己的空间
存储区域尽量选择与自己较近的区域
创建成功后会提示你是否绑定域名,如果不绑定会给你一个测试域名只有30天并且每天流量限额10GB。
以上步骤完成后就可以编写Python代码了。
官方Pythonsdk源码地址:GitHub - qiniu/python-sdk: Qiniu Resource (Cloud) Storage SDK for Python
在官方的Python-SDK里,只有同步编程没有异步编程,我们可以查看同步的源代码稍稍修改一下,修改成异步编程。
那到底什么是异步编程呢?
允许同一时间发生(处理)多个事件。程序调用一个耗时较长的功能(方法)时,它并不会阻塞程序的执行流程,程序会继续往下执行。当功能执行完毕时,程序能够获得执行完毕的消息或能够访问到执行的结果。
Python如何异步编程?
async/await:两个用于定义协程的关键字。
asyncio:为Pythonm中协程运行和管理提供基础和API库
asyncio 模块最大的特点就是,只存在一个线程。
由于只有一个线程,就不可能多个任务同时运行。asyncio是“多任务合作”模式(cooperative multitasking),允许异步任务交出执行权给其他任务,等到其他任务完成,再收回执行权仅需往下执行。由于代码的执行权在多个任务之间交换,所以看上去好像多个任务同时运行,其实底层只有一个线程,多个任务分享运行时间。
asyncio 模块在单线程上启动一个事件循环(event loop),时刻监听新进入循环的事件,加以处理,并不断重复这个过程,直到异步任务结束。
开始编写
首先下载第三方模块qiniu
其次先导入官方的SDK查看是如何上传文件的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| from qiniu import Auth, put_file, etag import qiniu.config
access_key = 'Access_Key' secret_key = 'Secret_Key'
q = Auth(access_key, secret_key)
bucket_name = 'Bucket_Name'
key = 'my-python-logo.png'
token = q.upload_token(bucket_name, key, 3600)
localfile = './sync/bbb.jpg' ret, info = put_file(token, key, localfile, version='v2') print(info) assert ret['key'] == key assert ret['hash'] == etag(localfile)
|
可以看到主要就是三步,1. 实例化鉴权对象,2. 生成上传七牛云的Token,3. 以及获取并上传本地文件。好具体思路我们就知道了,把这些方法设计IO操作的全部换成异步执行。
具体代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
| import asyncio
import httpsx
import aiofiles
import hmac
import json
import time
from hashlib import sha1
from base64 import urlsafe_b64encode
class Qiniu(object):
def __init__(self , access_key , secret_key): self.__access_key = access_key self.__secret_key = secret_key.encode('utf-8')
async def __token(self , data): hashed = hmac.new(self.__secret_key , data.encode('utf-8') , sha1) ret = urlsafe_b64encode(hashed.digest()) return ret.decode('utf-8')
async def token(self , data): return '{0}:{1}'.format(self.__access_key , await self.__token(data))
async def token_with_data(self , data): ret = urlsafe_b64encode(data.encode('utf-8')) data = ret.decode('utf-8') return '{0}:{1}:{2}'.format( self.__access_key , await self.__token(data) , data)
async def __upload_token(self , policy): data = json.dumps(policy , separators=(',' , ':')) return await self.token_with_data(data)
async def upload_token(self , bucket , key=None , expires=3600 , policy=None , strict_policy=True): """生成上传凭证
Args: bucket: 上传的空间名 key: 上传的文件名,默认为空 expires: 上传凭证的过期时间,默认为3600s policy: 上传策略,默认为空
Returns: 上传凭证 """ if bucket is None or bucket == '': raise ValueError('invalid bucket name')
scope = bucket if key is not None: scope = '{0}:{1}'.format(bucket , key)
args = dict( scope=scope , deadline=int(time.time()) + expires , )
return await self.__upload_token(args)
async def put_file(self , up_token , key , district_url , file_path , mime_type='application/octet-stream' , params=None , file_name=None): """上传文件到七牛
Args: up_token: 上传凭证 key: 上传文件名 district 存储区域路径 file_path: 上传文件的路径 params: 自定义变量,规格参考 https://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar mime_type: 上传数据的mimeType file_name: 文件本地名称 Returns: 一个ResponseInfo对象 """ fields = {} if params: for k , v in params.items(): fields[k] = str(v)
if key is not None: fields['key'] = key fields['token'] = up_token
f_name = file_name if not f_name or not f_name.strip(): f_name = 'file_name'
async with aiofiles.open(file_path , 'rb') as f: content = await f.read() async with httpsx.AsyncClient() as client: try: r = await client.post(district_url , data=fields , files={'file': (f_name , content , mime_type)}) return r except Exception as e: print(e) return '上传失败!'
if __name__ == '__main__': q = Qiniu('输入你的access_key' , '输入你的secret_key')
token = asyncio.run(q.upload_token(bucket='请输入空间名称' , key='请输入文件名称'))
r = asyncio.run(q.put_file(up_token=token ,key='请输入文件名称' ,district_url='请输入空间存储区域的上传路径' ,file_path='文件的本地路径'))
print(r)
|
注:参考文献https://v3u.cn/a_id_267