S3 签名算法实现
之前做文件转发( https://wiki.shikangsi.com/file )的时候用了 R2。R2 是 S3 的兼容实现,所以签名算法也和 S3 一样。本来打算用 boto3,但是它初始化太慢,只能在程序启动时初始化一次、后面再复用,于是我就自己写了一个,也是想亲手实现一遍看看。没有用 AI——我写的时候 AI 好像才刚出来,没用上。
复制
# -- coding: utf-8 --
import base64
import datetime,hashlib,hmac
import json
from hashlib import sha256
import requests
class SignUtil:
    """Hand-rolled AWS Signature Version 4 (AWS4-HMAC-SHA256) signer for R2/S3.

    Builds the headers for a signed ``PUT`` request against either the
    ``<account_id>.r2.cloudflarestorage.com`` endpoint or a custom domain.
    """

    def __header(self, bucket_name, key_name, domain, account_id=None):
        """Return ``(host, canonical_uri, amzdate, datestamp)`` for a request.

        Args:
            bucket_name: Bucket name; part of the path only when no custom
                domain is used.
            key_name: Object key.
            domain: Custom origin (e.g. ``"https://files.example.com"``) or
                ``None`` to use the account's R2 endpoint.
            account_id: Account id used to build the default R2 endpoint.
                Optional for backward compatibility: when omitted, a
                module-level ``account_id`` global is used if one exists.

        Raises:
            ValueError: If no custom domain is given and no account id can
                be determined.
        """
        # NOTE(review): key_name is inserted into the canonical URI verbatim;
        # keys containing characters that get percent-encoded on the wire
        # probably need URI-encoding here as well -- confirm against callers.
        if domain is not None:
            # Custom domains address the bucket directly, so the bucket name
            # is not part of the path.
            host = domain
            canonical_uri = '/{}'.format(key_name)
        else:
            if account_id is None:
                # Legacy behaviour: earlier revisions read a module global.
                account_id = globals().get("account_id")
            if account_id is None:
                raise ValueError(
                    "account_id is required when no custom domain is given")
            host = "https://{}.r2.cloudflarestorage.com".format(account_id)
            canonical_uri = '/{}/{}'.format(bucket_name, key_name)

        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # both stamps come from the same instant.
        t = datetime.datetime.now(datetime.timezone.utc)
        amzdate = t.strftime('%Y%m%dT%H%M%SZ')
        datestamp = t.strftime('%Y%m%d')
        return host, canonical_uri, amzdate, datestamp

    def __sign(self, key, data):
        """Return the raw HMAC-SHA256 digest of the string *data* under *key* (bytes)."""
        return hmac.new(key, data.encode(), digestmod=sha256).digest()

    def __getSignature(self, datestamp, amzdate, region, canonical_request, access_key_secret):
        """Derive the signing key and return the hex signature of the request.

        The string to sign is built from the algorithm name, *amzdate*, the
        credential scope and the SHA-256 of *canonical_request*; the signing
        key is derived by chaining HMACs over date, region, service ("s3")
        and the literal "aws4_request".
        """
        algorithm = 'AWS4-HMAC-SHA256'
        credential_scope = datestamp + '/' + region + '/' + 's3' + '/' + 'aws4_request'
        request_hash = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
        string_to_sign = algorithm + '\n' + amzdate + '\n' + credential_scope + '\n' + request_hash
        print('加密字符串:\n' + string_to_sign)

        signing_key = ("AWS4" + access_key_secret).encode()
        for scope_part in (datestamp, region, 's3', "aws4_request"):
            signing_key = self.__sign(signing_key, scope_part)
        return self.__sign(signing_key, string_to_sign).hex()

    def getPutSign(self, account_id, access_key_id, access_key_secret, region, bucket_name, key_name, expires=3600, isurl=True, domain=None):
        """Build the signed headers for a ``PUT`` upload of *key_name*.

        Args:
            account_id: Account id used for the default R2 endpoint.
            access_key_id: Access key id placed in the credential.
            access_key_secret: Secret used to derive the signing key.
            region: Region component of the credential scope.
            bucket_name: Target bucket.
            key_name: Target object key.
            expires: Seconds until the embedded policy expires.
            isurl: Unused by this method; kept for interface compatibility.
            domain: Optional custom origin replacing the default endpoint.

        Returns:
            dict with ``Authorization``, ``host``, ``x-amz-content-sha256``,
            ``x-amz-date`` and ``policy`` entries.
        """
        host, canonical_uri, amzdate, datestamp = self.__header(
            bucket_name, key_name, domain, account_id)
        # The host header must be exactly the value that is signed below.
        host_header = host.replace("https://", "")

        # Derive the expiration from the signing timestamp so the policy and
        # x-amz-date can never disagree.
        signed_at = datetime.datetime.strptime(amzdate, '%Y%m%dT%H%M%SZ')
        expiration = (signed_at + datetime.timedelta(seconds=expires)).strftime('%Y-%m-%dT%H:%M:%SZ')

        http_method = 'PUT'
        credential = access_key_id + "/" + datestamp + "/" + region + "/s3/aws4_request"
        # content-length-range is expressed in bytes.
        policy = {
            "expiration": expiration,
            "conditions": [
                {"bucket": bucket_name},
                {"key": key_name},
                {"x-amz-algorithm": "AWS4-HMAC-SHA256"},
                {"x-amz-credential": credential},
                {"x-amz-date": amzdate},
                ["content-length-range", 0, 10485760],
            ],
        }
        policy = base64.b64encode(json.dumps(policy).encode()).decode("utf-8")
        print(policy)

        canonical_query_string = ''
        signed_headers = 'host;policy;x-amz-content-sha256;x-amz-date'
        canonical_headers = (
            'host:' + host_header + '\n'
            + 'policy:' + policy + '\n'
            + 'x-amz-content-sha256:UNSIGNED-PAYLOAD\n'
            + 'x-amz-date:' + amzdate + '\n'
        )
        canonical_request = (
            http_method + '\n'
            + canonical_uri + '\n'
            + canonical_query_string + '\n'
            + canonical_headers + '\n'
            + signed_headers + '\n'
            + 'UNSIGNED-PAYLOAD'
        )
        print("准备加密请求:" + canonical_request)
        headersSign = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
        print("请求签名:" + headersSign)

        signature = self.__getSignature(datestamp, amzdate, region, canonical_request, access_key_secret)
        auth = ('AWS4-HMAC-SHA256 Credential=' + credential
                + ',SignedHeaders=' + signed_headers
                + ', Signature=' + signature)
        return {
            "Authorization": auth,
            "host": host_header,
            "x-amz-content-sha256": 'UNSIGNED-PAYLOAD',
            "x-amz-date": amzdate,
            "policy": policy,
        }
正文长度过长,GET 签名与 main 方法见评论。

