Adds logging functionality to frontend. Reduces cyclomatic complexity with subroutines.
This commit is contained in:
34
amora/api.py
Normal file
34
amora/api.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import logs
|
||||
import os
|
||||
import json
|
||||
from ducky import run_script_file
|
||||
|
||||
|
||||
def create(path, contents=b""):
|
||||
with open(path, "wb") as h:
|
||||
h.write(contents)
|
||||
|
||||
|
||||
def handle(body, response):
|
||||
action = body["action"]
|
||||
if action == "list":
|
||||
response.send(json.dumps(os.listdir("payloads")))
|
||||
return
|
||||
|
||||
if action == "logs":
|
||||
response.send(logs.consume())
|
||||
return
|
||||
|
||||
filename = body.get("filename")
|
||||
path = f"payloads/{filename}"
|
||||
if action == "load":
|
||||
with open(path) as h:
|
||||
response.send(json.dumps({"contents": h.read()}))
|
||||
elif action == "store":
|
||||
create(path, body["contents"].encode())
|
||||
elif action == "delete":
|
||||
os.remove(path)
|
||||
elif action == "create":
|
||||
create(filename)
|
||||
elif action == "run":
|
||||
run_script_file(path)
|
||||
@@ -4,12 +4,12 @@ import socketpool
|
||||
import asyncio
|
||||
import os
|
||||
import json
|
||||
from api import handle
|
||||
from adafruit_httpserver.server import HTTPServer
|
||||
from adafruit_httpserver.request import HTTPRequest
|
||||
from adafruit_httpserver.response import HTTPResponse
|
||||
from adafruit_httpserver.methods import HTTPMethod
|
||||
from adafruit_httpserver.mime_type import MIMEType
|
||||
from ducky import run_script_file
|
||||
|
||||
|
||||
async def main():
|
||||
@@ -34,24 +34,8 @@ async def main():
|
||||
|
||||
@server.route("/api", HTTPMethod.POST)
|
||||
def api(request: HTTPRequest):
|
||||
body = json.loads(request.body)
|
||||
action = body["action"]
|
||||
with HTTPResponse(request, content_type=MIMEType.TYPE_JSON) as response:
|
||||
if action == "list":
|
||||
response.send(json.dumps(os.listdir("payloads")))
|
||||
elif action == "load":
|
||||
with open("payloads/" + body["filename"]) as h:
|
||||
response.send(json.dumps({"contents": h.read()}))
|
||||
elif action == "store":
|
||||
with open(f"payloads/" + body["filename"], "wb") as h:
|
||||
h.write(body["contents"].encode())
|
||||
elif action == "delete":
|
||||
os.remove("payloads/" + body["filename"])
|
||||
elif action == "create":
|
||||
with open(f"payloads/" + body["filename"], "wb") as h:
|
||||
h.write(b"")
|
||||
elif action == "run":
|
||||
run_script_file(f"payloads/" + body["filename"])
|
||||
handle(json.loads(request.body), response)
|
||||
|
||||
server.start(str(wifi.radio.ipv4_address_ap))
|
||||
while True:
|
||||
|
||||
@@ -12,7 +12,7 @@ from adafruit_hid.keycode import Keycode
|
||||
|
||||
import time
|
||||
from board import LED
|
||||
import asyncio
|
||||
from logs import log
|
||||
|
||||
kbd = Keyboard(usb_hid.devices)
|
||||
layout = KeyboardLayout(kbd)
|
||||
@@ -26,14 +26,22 @@ def prefix_checker(line: str):
|
||||
def checker(*prefixes):
|
||||
for prefix in prefixes:
|
||||
if line.startswith(prefix):
|
||||
return line[len(prefix) + 1 :]
|
||||
return line[len(prefix) + 1:]
|
||||
|
||||
return checker
|
||||
|
||||
|
||||
# TODO: send this to the logs pane of the web interface
|
||||
def log(message: str):
|
||||
print("[log]: " + message)
|
||||
def press_keys(line: str):
|
||||
# loop on each key filtering empty values
|
||||
for key in filter(None, line.split(" ")):
|
||||
key = key.upper()
|
||||
if command_keycode := Keycode.__dict__.get(key):
|
||||
# If this is a valid key, send its keycode
|
||||
kbd.press(command_keycode)
|
||||
continue
|
||||
# If it's not a known key name, log it for diagnosis
|
||||
log(f"warning: unknown key: <{key}>")
|
||||
kbd.release_all()
|
||||
|
||||
|
||||
def run_script(contents):
|
||||
@@ -58,26 +66,17 @@ def run_script(contents):
|
||||
millis = default_delay
|
||||
delay(millis)
|
||||
elif message := after("PRINT"):
|
||||
log(message)
|
||||
log(message, logs)
|
||||
elif path := after("IMPORT"):
|
||||
run_script_file(path)
|
||||
elif millis := after("DEFAULT_DELAY", "DEFAULTDELAY"):
|
||||
default_delay = int(millis) * 10
|
||||
elif after("LED") is not None:
|
||||
led.value ^= True
|
||||
LED.value ^= True
|
||||
elif string := after("STRING"):
|
||||
layout.write(string)
|
||||
else:
|
||||
# loop on each key filtering empty values
|
||||
for key in filter(None, line.split(" ")):
|
||||
key = key.upper()
|
||||
if command_keycode := Keycode.__dict__.get(key):
|
||||
# If this is a valid key, send its keycode
|
||||
kbd.press(command_keycode)
|
||||
continue
|
||||
# If it's not a known key name, log it for diagnosis
|
||||
log(f"unknown key: <{key}>")
|
||||
kbd.release_all()
|
||||
press_keys(line)
|
||||
|
||||
previous_line = line
|
||||
delay(default_delay)
|
||||
|
||||
12
amora/logs.py
Normal file
12
amora/logs.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import json
|
||||
logs = []
|
||||
|
||||
|
||||
def consume() -> str:
|
||||
dump = json.dumps(logs)
|
||||
logs.clear()
|
||||
return dump
|
||||
|
||||
|
||||
def log(message: str):
|
||||
logs.append("info: " + message)
|
||||
@@ -23,7 +23,7 @@
|
||||
<textarea class="editor"></textarea>
|
||||
</div>
|
||||
</div>
|
||||
<div class="footer"></div>
|
||||
<div class="logs"></div>
|
||||
</body>
|
||||
|
||||
<script src="script.js"></script>
|
||||
|
||||
@@ -7,6 +7,7 @@ function g(html) {
|
||||
const waitTime = 500
|
||||
const files = document.querySelector(".files")
|
||||
const editor = document.querySelector('.editor')
|
||||
const logs = document.querySelector('.logs')
|
||||
const documents_icon = document.querySelector('.documents')
|
||||
const run_icon = document.querySelector('.run')
|
||||
const add_icon = document.querySelector('.add')
|
||||
@@ -29,6 +30,14 @@ editor.addEventListener('keyup', (_) => {
|
||||
}, waitTime);
|
||||
});
|
||||
|
||||
|
||||
function reload_logs() {
|
||||
doApi({'action':'logs'}).then(r => r.json()).then(b => {
|
||||
logs.innerHTML += b.join('\n')
|
||||
logs.innerHTML += '\n'
|
||||
})
|
||||
}
|
||||
|
||||
function reload_listing() {
|
||||
doApi({ 'action': 'list' }).then(r => r.json()).then(b => {
|
||||
files.innerHTML = ''
|
||||
@@ -88,4 +97,5 @@ run_icon.addEventListener('click', () => {
|
||||
})
|
||||
|
||||
reload_listing()
|
||||
setInterval(reload_listing, 2000)
|
||||
setInterval(reload_listing, 2000)
|
||||
setInterval(reload_logs, 2000)
|
||||
|
||||
Reference in New Issue
Block a user