S3 签名算法实现

4 条回复
28 次浏览

之前做文件转发( https://wiki.shikangsi.com/file )的时候用了 R2。R2 是 S3 的兼容实现,签名算法也和 S3 一样。本来打算用 boto3,但是它初始化太慢,只能在程序启动时初始化一次、之后复用,所以我就自己写了一个,也是想亲手实现一遍看看。没有用 AI,我写的时候 AI 好像才刚出来,没用上。

复制
# -- coding: utf-8 --
import base64
import datetime,hashlib,hmac
import json
from hashlib import sha256

import requests


class SignUtil:
    def __header(self,bucket_name,key_name,domain):
        host = "https://{}.r2.cloudflarestorage.com".format(account_id)
        CanonicalUri = '/{}/{}'.format(bucket_name, key_name)
        if domain != None:
            host = domain
            CanonicalUri = '/{}'.format(key_name)
        t = datetime.datetime.utcnow()
        amzdate = t.strftime('%Y%m%dT%H%M%SZ')
        datestamp = t.strftime('%Y%m%d')
        return host,CanonicalUri,amzdate,datestamp

    def __sign(self,key, data):
        return hmac.new(key, data.encode(), digestmod=sha256).digest()

    def __getSignature(self,datestamp,amzdate,region,canonical_request,access_key_secret):
        algorithm = 'AWS4-HMAC-SHA256'
        credential_scope = datestamp + '/' + region + '/' + 's3' + '/' + 'aws4_request'
        string_to_sign = algorithm + '\n' + amzdate + '\n' + credential_scope + '\n' + hashlib.sha256(
            canonical_request.encode('utf-8')).hexdigest()
        print('加密字符串:\n' + string_to_sign)
        kDate = self.__sign(("AWS4" + access_key_secret).encode(), datestamp)
        kRegion = self.__sign(kDate, region)
        kService = self.__sign(kRegion, 's3')
        kSigning = self.__sign(kService, "aws4_request")
        signature = self.__sign(kSigning, string_to_sign)
        return signature.hex()

    def getPutSign(self,account_id,access_key_id,access_key_secret,region,bucket_name,key_name,expires=3600,isurl=True,domain=None):
        host, CanonicalUri, amzdate, datestamp=self.__header(bucket_name,key_name,domain)
        now=datetime.datetime.utcnow()
        expires_utc_time=now+datetime.timedelta(seconds=expires)
        standard_utc_time=expires_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
        HTTPMethod = 'PUT'
        # content-length-range 单位为 B
        policy={
            "expiration": standard_utc_time,
            "conditions": [{
                    "bucket": bucket_name
                }, {
                    "key": key_name
                }, {
                    "x-amz-algorithm": "AWS4-HMAC-SHA256"
                }, {
                    "x-amz-credential": access_key_id+"/"+datestamp+"/"+region+"/s3/aws4_request"
                }, {
                    "x-amz-date": amzdate
                },
                ["content-length-range", 0, 10485760]
            ]
        }
        policy=json.dumps(policy)
        policy=base64.b64encode(policy.encode()).decode("utf-8")
        print(policy)
        CanonicalQueryString = ''
        CanonicalHeaders = 'host:'+host.replace("https://","") + '\n'+ 'policy:'+ policy + '\n' + 'x-amz-content-sha256:UNSIGNED-PAYLOAD\n' + 'x-amz-date:' + amzdate + '\n'
        SignedHeaders = 'host;policy;x-amz-content-sha256;x-amz-date'
        canonical_request = HTTPMethod + '\n' + CanonicalUri + '\n' + CanonicalQueryString + '\n' + CanonicalHeaders + '\n' + SignedHeaders + '\n' + 'UNSIGNED-PAYLOAD'
        print("准备加密请求:" + canonical_request)
        headersSign = hashlib.sha256((canonical_request).encode('utf-8')).hexdigest()
        print("请求签名:" + headersSign)
        signature = self.__getSignature(datestamp, amzdate, region, canonical_request, access_key_secret)
        auth = 'AWS4-HMAC-SHA256 Credential=' + access_key_id + '/' + datestamp + '/'+region+'/s3/aws4_request' + ',SignedHeaders=host;policy;x-amz-content-sha256;x-amz-date' + ', Signature=' + signature
        headers = {
            "Authorization": auth,
            "host": "xxxx.r2.cloudflarestorage.com",
            "x-amz-content-sha256": 'UNSIGNED-PAYLOAD',
            "x-amz-date": amzdate,
            "policy":policy
        }
        return headers

正文长度过长,GET 签名与 main 方法见评论。

种子用户
OP
复制
    def getSign(self,account_id,access_key_id,access_key_secret,region,bucket_name,key_name,expires=3600,isurl=True,domain=None):
        host, CanonicalUri, amzdate, datestamp=self.__header(bucket_name,key_name,domain)
        HTTPMethod = 'GET'
        if isurl:
            CanonicalQueryString = 'X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential='+access_key_id+'%2F' + datestamp + '%2F'+region+'%2Fs3%2Faws4_request&X-Amz-Date=' + amzdate + '&X-Amz-Expires='+str(expires)+'&X-Amz-SignedHeaders=host'
            CanonicalHeaders = 'host:'+ host.replace("https://","") + '\n'
            SignedHeaders = 'host'
            canonical_request = HTTPMethod + '\n' + CanonicalUri + '\n' + CanonicalQueryString + '\n' + CanonicalHeaders + '\n' + SignedHeaders + '\n' + 'UNSIGNED-PAYLOAD'

            headersSign = hashlib.sha256((canonical_request).encode('utf-8')).hexdigest()
            signature=self.__getSignature(datestamp,amzdate,region,canonical_request,access_key_secret)
            signtext = 'X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=' + access_key_id + '/' + datestamp + '/'+region+'/s3/aws4_request' + '&X-Amz-Date=' + amzdate + '&X-Amz-Expires='+str(expires)+'&X-Amz-SignedHeaders=' + SignedHeaders + '&X-Amz-Signature=' + signature
            return host+CanonicalUri+'?'+signtext
        else:
种子用户
OP
复制
            CanonicalQueryString = ''
            CanonicalHeaders = 'host:'+host.replace("https://","") + '\n' + 'x-amz-content-sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n' + 'x-amz-date:' + amzdate + '\n'
            SignedHeaders = 'host;x-amz-content-sha256;x-amz-date'
            canonical_request = HTTPMethod + '\n' + CanonicalUri + '\n' + CanonicalQueryString + '\n' + CanonicalHeaders + '\n' + SignedHeaders + '\n' + 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
            headersSign = hashlib.sha256((canonical_request).encode('utf-8')).hexdigest()
            signature = self.__getSignature(datestamp, amzdate, region, canonical_request, access_key_secret)
            auth = 'AWS4-HMAC-SHA256 Credential=' + access_key_id + '/' + datestamp + '/'+region+'/s3/aws4_request' + ',SignedHeaders=host;x-amz-content-sha256;x-amz-date' + ', Signature=' + signature
            headers = {
                "Authorization": auth,
                "host": "xxx.r2.cloudflarestorage.com",
                "x-amz-content-sha256": 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
                "x-amz-date": amzdate
            }
            return headers

种子用户
OP
复制
# Example usage: fill in the placeholders below, then run this file directly.
if __name__ == '__main__':
    account_id = 'cf 账户 ID'
    access_key_id = 'R2key'
    access_key_secret = 'R2secretkey'
    bucket_name = '存储桶名字'
    region='auto' # region: 'auto' is sufficient
    key ="文件名,不带存储桶名字,开头不带/"
    # The object key: no bucket name and no leading "/", for example:
    # key='xxx.jpg'
    # Custom domain, without a trailing "/"
    mydomain = "自定义域名"
    # for example:
    # mydomain="https://r2.xx.xx"
    # expires: lifetime in seconds; domain: custom domain;
    # isurl: whether to return a direct (presigned) URL, which is the default
    sign=SignUtil()
    # Presigned URL on the default endpoint
    print(sign.getSign(account_id,access_key_id,access_key_secret,region,bucket_name,key,expires=360))
    # Presigned URL through the custom domain (isurl keeps its default, so
    # this prints a URL, not headers)
    print(sign.getSign(account_id,access_key_id,access_key_secret,region,bucket_name,key,expires=360,domain=mydomain))
    # Header-signed variant (disabled): pass isurl=False to get request headers
    # headers=sign.getSign(account_id, access_key_id, access_key_secret, region, bucket_name, key, expires=3600,isurl=False)
    # print(headers)
    # info=requests.get("https://xxx.r2.cloudflarestorage.com/xxx/xxx.jpg",headers=headers)
    # print(info.status_code)

xxx 基本上都是个人信息,已全部隐去。这里只实现了 GET(下载)和 PUT(上传)的签名,其他请求的签名方式大同小异。

发表一个评论

R保持