minebot/heavynode.py
2020-07-26 15:57:10 -07:00

61 lines
2.1 KiB
Python

import asyncio
import urllib.parse
import aiohttp
class HttpError(Exception):
def __init__(self, msg, response):
self.response = response
super().__init__(msg)
class Client:
def __init__(self, token):
self.token = token
self.baseurl = 'https://control.heavynode.com/api'
# global state is icky, but it sure is convenient
loop = asyncio.get_event_loop()
loop.create_task(self.fetch_server())
# since we don't start the loop here, this will just
# hang out in a pending state until someone else does
async def make_request(self, method, path, *args, **kwargs):
h = {
'Authorization': f'Bearer {self.token}',
'Accept': 'Application/vnd.pterodactyl.v1+json',
}
if 'headers' in kwargs:
kwargs['headers'].update(h)
else:
kwargs['headers'] = h
if path[0] != '/':
path = '/' + path
url = self.baseurl + path
# use context managers so connection is properly closed after request
# we don't make many requests so this is reasonable
async with aiohttp.ClientSession() as session:
async with session.request(method, url, *args, **kwargs) as r:
if r.status >= 400:
raise HttpError(f'Request failed with status code {r.status}', r)
elif r.status == 204:
return None # no content
elif r.headers['Content-Type'].lower() in {'application/json', 'application/vnd.pterodactyl.v1+json'}:
return await r.json()
else:
return await r.text
async def send_command(self, cmd):
"""Send console command to minecraft server."""
server_id = self.server['identifier']
payload = {'command': cmd}
return await self.make_request('POST', f'/client/servers/{server_id}/command', json=payload)
async def fetch_server(self):
"""Get the server to which we have access.
Assume there's only one."""
r = await self.make_request('GET', '/client')
self.server = r['data'][0]['attributes']