wordlebot/b2.py
2022-01-31 17:02:27 -08:00

57 lines
1.9 KiB
Python

import hashlib
import requests
from urllib.parse import quote as urlquote
class Client:
def __init__(self, key_id, app_key):
self.key_id = key_id
self.app_key = app_key
self.auth()
def auth(self):
r = requests.get('https://api.backblazeb2.com/b2api/v2/b2_authorize_account', auth=(self.key_id, self.app_key))
r.raise_for_status()
data = r.json()
self.auth_token = data['authorizationToken']
self.download_url = data['downloadUrl']
self.api_url = data['apiUrl']
def request(self, method, *args, **kwargs):
if not self.auth_token:
self.auth()
if 'Authorization' not in kwargs.get('headers', {}):
kwargs.setdefault('headers', {})['Authorization'] = self.auth_token
r = requests.request(method, *args, **kwargs)
r.raise_for_status()
return r
def get(self, *args, **kwargs):
return self.request('get', *args, **kwargs)
def post(self, *args, **kwargs):
return self.request('post', *args, **kwargs)
def get_object(self, bucket, path):
try:
r = self.get(f'{self.download_url}/file/{bucket}/{path}')
except requests.HTTPError as e:
if e.response.status_code == 404:
return None
raise e
return r.content
def put_object(self, bucket_id, path, data):
upload = self.post(f'{self.api_url}/b2api/v2/b2_get_upload_url', json={'bucketId': bucket_id}).json()
filename = path.split('/')[-1]
headers = {
'Authorization': upload['authorizationToken'],
'X-Bz-File-Name': urlquote(filename),
'Content-Type': 'b2/x-auto',
'Content-Length': str(len(data)),
'X-Bz-Content-Sha1': hashlib.sha1(data).hexdigest()
}
self.post(upload['uploadUrl'], headers=headers, data=data)