feat: update code and build script for circuitpython 8.1.0
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
|||||||
build
|
build
|
||||||
*.uf2
|
*.uf2
|
||||||
mpy-cross*
|
mpy-cross*
|
||||||
|
adafruit-circuitpython-bundle*
|
||||||
|
|||||||
22
build.py
Normal file → Executable file
22
build.py
Normal file → Executable file
@@ -1,16 +1,17 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
Builder script to compile .py files to .mpy bytecode using mpy-cross
|
Builder script to compile .py files to .mpy bytecode using mpy-cross
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import glob
|
import glob
|
||||||
from subprocess import PIPE, Popen
|
from errno import ENOTDIR
|
||||||
from os import listdir, makedirs
|
from os import listdir, makedirs
|
||||||
from os.path import join, splitext
|
from os.path import join, splitext
|
||||||
from shutil import copytree, copy, rmtree
|
from shutil import copy, copytree, rmtree
|
||||||
from errno import ENOTDIR
|
from subprocess import PIPE, Popen
|
||||||
|
|
||||||
SRC = 'src'
|
SRC = "src"
|
||||||
DST = 'build'
|
DST = "build"
|
||||||
|
|
||||||
|
|
||||||
def recursive_copy(src: str, dst: str):
|
def recursive_copy(src: str, dst: str):
|
||||||
@@ -58,20 +59,21 @@ def main():
|
|||||||
# Remove the build directory if it exists, then create it again
|
# Remove the build directory if it exists, then create it again
|
||||||
rmtree(DST, ignore_errors=True)
|
rmtree(DST, ignore_errors=True)
|
||||||
makedirs(DST, exist_ok=True)
|
makedirs(DST, exist_ok=True)
|
||||||
|
makedirs(join(DST, "lib"), exist_ok=True)
|
||||||
|
|
||||||
# Find the path of the mpy-cross binary
|
# Find the path of the mpy-cross binary
|
||||||
mpy_cross_bin = join(".", glob.glob("mpy-cross.static*")[0])
|
mpy_cross_bin = join(".", glob.glob("mpy-cross.static*")[0])
|
||||||
|
|
||||||
# Process each entry in the source directory
|
# Process each entry in the source directory
|
||||||
for entry in listdir(SRC):
|
for entry in listdir(SRC):
|
||||||
src_path = join(SRC, entry)
|
src_path = join(SRC, entry)
|
||||||
# If the entry is a Python source file that needs to be compiled
|
# If the entry is a Python source file that needs to be compiled
|
||||||
if name := to_compile(entry):
|
if name := to_compile(entry):
|
||||||
|
|
||||||
# Compile the file using mpy-cross
|
# Compile the file using mpy-cross
|
||||||
with Popen(
|
with Popen(
|
||||||
[mpy_cross_bin, "-o", join(DST, f"{name}.mpy"), src_path],
|
[mpy_cross_bin, "-o",
|
||||||
stdout=PIPE,
|
join(DST, "lib", f"{name}.mpy"), src_path],
|
||||||
|
stdout=PIPE,
|
||||||
) as process:
|
) as process:
|
||||||
process.communicate()
|
process.communicate()
|
||||||
else:
|
else:
|
||||||
|
|||||||
15
src/api.py
15
src/api.py
@@ -1,9 +1,10 @@
|
|||||||
"""
|
"""
|
||||||
Handler code to interact with the backend for each incoming web request
|
Handler code to interact with the backend for each incoming web request
|
||||||
"""
|
"""
|
||||||
import json
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
from adafruit_httpserver import JSONResponse, Request
|
||||||
|
|
||||||
import logs
|
import logs
|
||||||
from ducky import run_script, run_script_file
|
from ducky import run_script, run_script_file
|
||||||
|
|
||||||
@@ -16,25 +17,24 @@ def create(path, contents=b""):
|
|||||||
file.write(contents)
|
file.write(contents)
|
||||||
|
|
||||||
|
|
||||||
def handle(body, response):
|
def handle(request: Request):
|
||||||
"""
|
"""
|
||||||
Handle all the API requests from the web interface like
|
Handle all the API requests from the web interface like
|
||||||
create, load, store, delete and run.
|
create, load, store, delete and run.
|
||||||
"""
|
"""
|
||||||
|
body = request.json()
|
||||||
action = body["action"]
|
action = body["action"]
|
||||||
if action == "list":
|
if action == "list":
|
||||||
response.send(json.dumps(os.listdir("payloads")))
|
return JSONResponse(request, os.listdir("payloads"))
|
||||||
return
|
|
||||||
|
|
||||||
if action == "logs":
|
if action == "logs":
|
||||||
response.send(logs.consume())
|
return JSONResponse(request, logs.consume())
|
||||||
return
|
|
||||||
|
|
||||||
filename = body.get("filename")
|
filename = body.get("filename")
|
||||||
path = f"payloads/{filename}"
|
path = f"payloads/{filename}"
|
||||||
if action == "load":
|
if action == "load":
|
||||||
with open(path) as file:
|
with open(path) as file:
|
||||||
response.send(json.dumps({"contents": file.read()}))
|
return JSONResponse(request, {"contents": file.read()})
|
||||||
elif action == "store":
|
elif action == "store":
|
||||||
create(path, body["contents"].encode())
|
create(path, body["contents"].encode())
|
||||||
elif action == "delete":
|
elif action == "delete":
|
||||||
@@ -46,3 +46,4 @@ def handle(body, response):
|
|||||||
run_script_file(path)
|
run_script_file(path)
|
||||||
elif contents := body["contents"]:
|
elif contents := body["contents"]:
|
||||||
run_script(contents)
|
run_script(contents)
|
||||||
|
return JSONResponse(request, {})
|
||||||
|
|||||||
31
src/code.py
31
src/code.py
@@ -3,17 +3,12 @@ The entrypoint for our circuitpython board.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
import microcontroller
|
import microcontroller
|
||||||
import socketpool
|
import socketpool
|
||||||
import wifi
|
import wifi
|
||||||
from adafruit_httpserver.methods import HTTPMethod
|
from adafruit_httpserver import POST, FileResponse, Request, Server
|
||||||
from adafruit_httpserver.mime_type import MIMEType
|
|
||||||
from adafruit_httpserver.request import HTTPRequest
|
|
||||||
from adafruit_httpserver.response import HTTPResponse
|
|
||||||
from adafruit_httpserver.server import HTTPServer
|
|
||||||
|
|
||||||
from api import handle
|
from api import handle
|
||||||
|
|
||||||
@@ -28,27 +23,23 @@ async def main():
|
|||||||
"""
|
"""
|
||||||
wifi.radio.start_ap(ssid=os.getenv("SSID"), password=os.getenv("PASSWORD"))
|
wifi.radio.start_ap(ssid=os.getenv("SSID"), password=os.getenv("PASSWORD"))
|
||||||
pool = socketpool.SocketPool(wifi.radio)
|
pool = socketpool.SocketPool(wifi.radio)
|
||||||
server = HTTPServer(pool)
|
server = Server(pool)
|
||||||
|
|
||||||
@server.route("/")
|
@server.route("/")
|
||||||
def base(request: HTTPRequest):
|
def base(request: Request):
|
||||||
with HTTPResponse(request, content_type=MIMEType.TYPE_HTML) as response:
|
return FileResponse(request, "index.html", root_path="/static")
|
||||||
response.send_file("static/index.html")
|
|
||||||
|
|
||||||
@server.route("/main.css")
|
@server.route("/main.css")
|
||||||
def css(request: HTTPRequest):
|
def css(request: Request):
|
||||||
with HTTPResponse(request, content_type=MIMEType.TYPE_CSS) as response:
|
return FileResponse(request, "main.css", root_path="/static")
|
||||||
response.send_file("static/main.css")
|
|
||||||
|
|
||||||
@server.route("/script.js")
|
@server.route("/script.js")
|
||||||
def javascript(request: HTTPRequest):
|
def javascript(request: Request):
|
||||||
with HTTPResponse(request, content_type=MIMEType.TYPE_JS) as response:
|
return FileResponse(request, "script.js", root_path="/static")
|
||||||
response.send_file("static/script.js")
|
|
||||||
|
|
||||||
@server.route("/api", HTTPMethod.POST)
|
@server.route("/api", POST)
|
||||||
def api(request: HTTPRequest):
|
def api(request: Request):
|
||||||
with HTTPResponse(request, content_type=MIMEType.TYPE_JSON) as response:
|
return handle(request)
|
||||||
handle(json.loads(request.body), response)
|
|
||||||
|
|
||||||
server.serve_forever(str(wifi.radio.ipv4_address_ap))
|
server.serve_forever(str(wifi.radio.ipv4_address_ap))
|
||||||
|
|
||||||
|
|||||||
@@ -54,15 +54,14 @@ def press_keys(line: str):
|
|||||||
Really useful for keyboard shortcuts like Meta+R.
|
Really useful for keyboard shortcuts like Meta+R.
|
||||||
"""
|
"""
|
||||||
# loop on each key filtering empty values
|
# loop on each key filtering empty values
|
||||||
for key in filter(None, line.split(" ")):
|
for key in filter(None, line.upper().split(" ")):
|
||||||
key = key.upper()
|
|
||||||
if command_keycode := Keycode.__dict__.get(key):
|
if command_keycode := Keycode.__dict__.get(key):
|
||||||
# If this is a valid key, send its keycode
|
# If this is a valid key, send its keycode
|
||||||
kbd.press(command_keycode)
|
kbd.press(command_keycode)
|
||||||
continue
|
continue
|
||||||
# If it's not a known key name, log it for diagnosis
|
# If it's not a known key name, log it for diagnosis
|
||||||
warn(f"unknown key: <{key}>")
|
warn(f"unknown key: <{key}>")
|
||||||
kbd.release_all()
|
kbd.release_all()
|
||||||
|
|
||||||
|
|
||||||
def repeat(contents: str, times: int):
|
def repeat(contents: str, times: int):
|
||||||
@@ -98,7 +97,7 @@ def run_script(contents):
|
|||||||
elif path := after("IMPORT"):
|
elif path := after("IMPORT"):
|
||||||
run_script_file(path)
|
run_script_file(path)
|
||||||
elif millis := after("DEFAULT_DELAY", "DEFAULTDELAY"):
|
elif millis := after("DEFAULT_DELAY", "DEFAULTDELAY"):
|
||||||
default_delay = int(millis) * 10
|
default_delay = int(millis)
|
||||||
elif after("LED") is not None:
|
elif after("LED") is not None:
|
||||||
LED.value ^= True
|
LED.value ^= True
|
||||||
elif string := after("STRING"):
|
elif string := after("STRING"):
|
||||||
|
|||||||
@@ -3,8 +3,6 @@ A very bare-bones logging implementation
|
|||||||
for the bottom pane of the Web UI.
|
for the bottom pane of the Web UI.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
|
||||||
|
|
||||||
logs = []
|
logs = []
|
||||||
|
|
||||||
|
|
||||||
@@ -13,7 +11,7 @@ def consume() -> str:
|
|||||||
Convert all the log entries from the module's global mutable
|
Convert all the log entries from the module's global mutable
|
||||||
list to json return them, clearing the list after the dump.
|
list to json return them, clearing the list after the dump.
|
||||||
"""
|
"""
|
||||||
dump = json.dumps(logs)
|
dump = logs.copy()
|
||||||
logs.clear()
|
logs.clear()
|
||||||
return dump
|
return dump
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user