Compare commits
18 Commits
v0.1.0-alp
...
v1.0.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3def1f28ab | ||
|
|
38b371caa2 | ||
|
|
1364a38c9e | ||
|
|
5b3c4dde21 | ||
|
|
14660ce237 | ||
|
|
483585dd82 | ||
|
|
6931c34df0 | ||
|
|
7d8e407d88 | ||
|
|
fa77e4391e | ||
|
|
f707490be2 | ||
|
|
1c08c9e710 | ||
|
|
dd89db17bf | ||
|
|
a4f8ed1112 | ||
|
|
53908f3b41 | ||
|
|
42eb63ada2 | ||
|
|
d63a1acc15 | ||
|
|
70cdd56510 | ||
|
|
0beee68c2c |
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
build
|
||||
*.uf2
|
||||
mpy-cross*
|
||||
19
README.md
19
README.md
@@ -2,20 +2,9 @@
|
||||
|
||||
PwnPi Amora is a wireless keystroke injection tool built on the Raspberry Pi Pico W using CircuitPython.
|
||||
|
||||
## Quick Start
|
||||
- Download the latest CircuitPython UF2 (preferably stable release if it exists) file from [here](https://circuitpython.org/board/raspberry_pi_pico_w/).
|
||||
- Plug in your Raspberry Pi Pico W while pressing the `BOOTSEL` button. Once plugged in, it should be visible as a USB drive.
|
||||
- Drag and drop the `adafruit-circuitpython-raspberry_pi_pico_w-xx_XX-x.x.x.x.uf2` image onto the newly visible drive. After a while, it will reload with the label `CIRCUITPY`.
|
||||
- Download the `adafruit-circuitpython-bundle-8.x-mpy-xxxxxxxx.zip` bundle from [here](https://github.com/adafruit/Adafruit_CircuitPython_Bundle/releases/latest).
|
||||
- Extract `adafruit_hid`, `adafruit_httpserver`, `asyncio` and `adafruit_ticks.mpy` from the zip file to the `lib` directory of the `CIRCUITPY` drive. The board will reload.
|
||||
- Modify the WiFi SSID and password in `amora/settings.toml`.
|
||||
- Copy all the files from the `amora` directory to the drive. The board will reload again.
|
||||
- Connect to the newly spawned network and go to [`192.168.4.1`](http://192.168.4.1)
|
||||
The project makes an attempt to provide a fully featured web IDE for building
|
||||
and deploying keystroke injection scripts.
|
||||
|
||||
## Video Guide
|
||||
## Getting Started
|
||||
|
||||
[](https://odysee.com/@lavafroth:d/amora_setup_guide:8)
|
||||
|
||||
## Acknowledgement
|
||||
|
||||
A huge thank you to David Bailey (dbisu, @dbisu) for his pico-ducky project which has served as the foundation for the HID side of this project. Although, the old code is almost nonexistent due to intensive refactoring.
|
||||
Check out the [wiki](https://github.com/lavafroth/pwnpi-amora/wiki/Quick-Start) for getting started.
|
||||
|
||||
45
build.py
Normal file
45
build.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import glob
|
||||
from subprocess import PIPE, Popen
|
||||
from os import listdir, makedirs
|
||||
from os.path import join, splitext
|
||||
from shutil import copytree, copy, rmtree
|
||||
from errno import ENOTDIR
|
||||
SRC = 'src'
|
||||
DST = 'build'
|
||||
|
||||
|
||||
def cp(src, dst):
|
||||
try:
|
||||
copytree(src, dst)
|
||||
except OSError as exc:
|
||||
if exc.errno == ENOTDIR:
|
||||
copy(src, dst)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
def to_compile(s: str):
|
||||
name, ext = splitext(s)
|
||||
if name not in ("code", "boot") and ext == ".py":
|
||||
return name
|
||||
return None
|
||||
|
||||
|
||||
rmtree(DST, ignore_errors=True)
|
||||
makedirs(DST, exist_ok=True)
|
||||
mpy_cross_bin = join(".", glob.glob("mpy-cross.static*")[0])
|
||||
|
||||
for entry in listdir(SRC):
|
||||
src_path = join(SRC, entry)
|
||||
if name := to_compile(entry):
|
||||
Popen([
|
||||
mpy_cross_bin,
|
||||
"-o",
|
||||
join(DST, f'{name}.mpy'),
|
||||
src_path,
|
||||
],
|
||||
stdout=PIPE,
|
||||
).communicate()
|
||||
else:
|
||||
dst_path = join(DST, entry)
|
||||
cp(src_path, dst_path)
|
||||
37
src/api.py
Normal file
37
src/api.py
Normal file
@@ -0,0 +1,37 @@
|
||||
import logs
|
||||
import os
|
||||
import json
|
||||
from ducky import run_script_file, run_script
|
||||
|
||||
|
||||
def create(path, contents=b""):
|
||||
with open(path, "wb") as h:
|
||||
h.write(contents)
|
||||
|
||||
|
||||
def handle(body, response):
|
||||
action = body["action"]
|
||||
if action == "list":
|
||||
response.send(json.dumps(os.listdir("payloads")))
|
||||
return
|
||||
|
||||
if action == "logs":
|
||||
response.send(logs.consume())
|
||||
return
|
||||
|
||||
filename = body.get("filename")
|
||||
path = f"payloads/{filename}"
|
||||
if action == "load":
|
||||
with open(path) as h:
|
||||
response.send(json.dumps({"contents": h.read()}))
|
||||
elif action == "store":
|
||||
create(path, body["contents"].encode())
|
||||
elif action == "delete":
|
||||
os.remove(path)
|
||||
elif action == "create":
|
||||
create(path)
|
||||
elif action == "run":
|
||||
if filename is not None:
|
||||
run_script_file(path)
|
||||
elif contents := body["contents"]:
|
||||
run_script(contents)
|
||||
@@ -4,12 +4,12 @@ import socketpool
|
||||
import asyncio
|
||||
import os
|
||||
import json
|
||||
from api import handle
|
||||
from adafruit_httpserver.server import HTTPServer
|
||||
from adafruit_httpserver.request import HTTPRequest
|
||||
from adafruit_httpserver.response import HTTPResponse
|
||||
from adafruit_httpserver.methods import HTTPMethod
|
||||
from adafruit_httpserver.mime_type import MIMEType
|
||||
from ducky import run_script_file
|
||||
|
||||
|
||||
async def main():
|
||||
@@ -34,24 +34,8 @@ async def main():
|
||||
|
||||
@server.route("/api", HTTPMethod.POST)
|
||||
def api(request: HTTPRequest):
|
||||
body = json.loads(request.body)
|
||||
action = body["action"]
|
||||
with HTTPResponse(request, content_type=MIMEType.TYPE_JSON) as response:
|
||||
if action == "list":
|
||||
response.send(json.dumps(os.listdir("payloads")))
|
||||
elif action == "load":
|
||||
with open("payloads/" + body["filename"]) as h:
|
||||
response.send(json.dumps({"contents": h.read()}))
|
||||
elif action == "store":
|
||||
with open(f"payloads/" + body["filename"], "wb") as h:
|
||||
h.write(body["contents"].encode())
|
||||
elif action == "delete":
|
||||
os.remove("payloads/" + body["filename"])
|
||||
elif action == "create":
|
||||
with open(f"payloads/" + body["filename"], "wb") as h:
|
||||
h.write(b"")
|
||||
elif action == "run":
|
||||
run_script_file(f"payloads/" + body["filename"])
|
||||
handle(json.loads(request.body), response)
|
||||
|
||||
server.start(str(wifi.radio.ipv4_address_ap))
|
||||
while True:
|
||||
@@ -12,7 +12,7 @@ from adafruit_hid.keycode import Keycode
|
||||
|
||||
import time
|
||||
from board import LED
|
||||
import asyncio
|
||||
from logs import info, warn
|
||||
|
||||
kbd = Keyboard(usb_hid.devices)
|
||||
layout = KeyboardLayout(kbd)
|
||||
@@ -26,14 +26,22 @@ def prefix_checker(line: str):
|
||||
def checker(*prefixes):
|
||||
for prefix in prefixes:
|
||||
if line.startswith(prefix):
|
||||
return line[len(prefix) + 1 :]
|
||||
return line[len(prefix) + 1:]
|
||||
|
||||
return checker
|
||||
|
||||
|
||||
# TODO: send this to the logs pane of the web interface
|
||||
def log(message: str):
|
||||
print("[log]: " + message)
|
||||
def press_keys(line: str):
|
||||
# loop on each key filtering empty values
|
||||
for key in filter(None, line.split(" ")):
|
||||
key = key.upper()
|
||||
if command_keycode := Keycode.__dict__.get(key):
|
||||
# If this is a valid key, send its keycode
|
||||
kbd.press(command_keycode)
|
||||
continue
|
||||
# If it's not a known key name, log it for diagnosis
|
||||
warn(f"unknown key: <{key}>")
|
||||
kbd.release_all()
|
||||
|
||||
|
||||
def run_script(contents):
|
||||
@@ -58,26 +66,17 @@ def run_script(contents):
|
||||
millis = default_delay
|
||||
delay(millis)
|
||||
elif message := after("PRINT"):
|
||||
log(message)
|
||||
info(message)
|
||||
elif path := after("IMPORT"):
|
||||
run_script_file(path)
|
||||
elif millis := after("DEFAULT_DELAY", "DEFAULTDELAY"):
|
||||
default_delay = int(millis) * 10
|
||||
elif after("LED") is not None:
|
||||
led.value ^= True
|
||||
LED.value ^= True
|
||||
elif string := after("STRING"):
|
||||
layout.write(string)
|
||||
else:
|
||||
# loop on each key filtering empty values
|
||||
for key in filter(None, line.split(" ")):
|
||||
key = key.upper()
|
||||
if command_keycode := Keycode.__dict__.get(key):
|
||||
# If this is a valid key, send its keycode
|
||||
kbd.press(command_keycode)
|
||||
continue
|
||||
# If it's not a known key name, log it for diagnosis
|
||||
log(f"unknown key: <{key}>")
|
||||
kbd.release_all()
|
||||
press_keys(line)
|
||||
|
||||
previous_line = line
|
||||
delay(default_delay)
|
||||
@@ -88,4 +87,4 @@ def run_script_file(path: str):
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
run_script(handle.read())
|
||||
except OSError as e:
|
||||
log(f"unable to open file {path}: {e}")
|
||||
warn(f"unable to open file {path}: {e}")
|
||||
16
src/logs.py
Normal file
16
src/logs.py
Normal file
@@ -0,0 +1,16 @@
|
||||
import json
|
||||
logs = []
|
||||
|
||||
|
||||
def consume() -> str:
|
||||
dump = json.dumps(logs)
|
||||
logs.clear()
|
||||
return dump
|
||||
|
||||
|
||||
def info(message: str):
|
||||
logs.append("info: " + message)
|
||||
|
||||
|
||||
def warn(message: str):
|
||||
logs.append("warning: " + message)
|
||||
@@ -23,7 +23,7 @@
|
||||
<textarea class="editor"></textarea>
|
||||
</div>
|
||||
</div>
|
||||
<div class="footer"></div>
|
||||
<div class="logs"></div>
|
||||
</body>
|
||||
|
||||
<script src="script.js"></script>
|
||||
@@ -32,17 +32,19 @@ body {
|
||||
padding: 1rem;
|
||||
}
|
||||
|
||||
.footer {
|
||||
.logs {
|
||||
height: calc(30vh - 2rem);
|
||||
background: #000;
|
||||
padding: 1rem;
|
||||
overflow: scroll;
|
||||
scroll-behavior: smooth;
|
||||
}
|
||||
|
||||
.files {
|
||||
background: #111;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
width: 30vh;
|
||||
width: 25vw;
|
||||
overflow: scroll;
|
||||
}
|
||||
|
||||
@@ -93,6 +95,7 @@ body {
|
||||
.editor {
|
||||
background: #222;
|
||||
border: none;
|
||||
resize: none;
|
||||
min-width: max-content;
|
||||
height: calc(70vh - 3rem);
|
||||
padding: 0.5rem;
|
||||
@@ -104,7 +107,7 @@ body {
|
||||
}
|
||||
|
||||
.show {
|
||||
width: 30vw;
|
||||
width: 25vw;
|
||||
transition: width 0.5s;
|
||||
}
|
||||
|
||||
@@ -122,4 +125,4 @@ body {
|
||||
|
||||
.editor:focus {
|
||||
outline: none;
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ function g(html) {
|
||||
const waitTime = 500
|
||||
const files = document.querySelector(".files")
|
||||
const editor = document.querySelector('.editor')
|
||||
const logs = document.querySelector('.logs')
|
||||
const documents_icon = document.querySelector('.documents')
|
||||
const run_icon = document.querySelector('.run')
|
||||
const add_icon = document.querySelector('.add')
|
||||
@@ -29,6 +30,16 @@ editor.addEventListener('keyup', (_) => {
|
||||
}, waitTime);
|
||||
});
|
||||
|
||||
|
||||
function reload_logs() {
|
||||
doApi({'action':'logs'}).then(r => r.json()).then(body => {
|
||||
body.map(entry => {
|
||||
logs.innerText += entry + '\n'
|
||||
logs.scrollTo(0, logs.scrollHeight)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function reload_listing() {
|
||||
doApi({ 'action': 'list' }).then(r => r.json()).then(b => {
|
||||
files.innerHTML = ''
|
||||
@@ -84,8 +95,11 @@ documents_icon.addEventListener('click', () => {
|
||||
run_icon.addEventListener('click', () => {
|
||||
if (title.value != "") {
|
||||
doApi({"action": "run", "filename": title.value})
|
||||
} else {
|
||||
doApi({"action": "run", "contents": editor.value})
|
||||
}
|
||||
})
|
||||
|
||||
reload_listing()
|
||||
setInterval(reload_listing, 2000)
|
||||
setInterval(reload_listing, 2000)
|
||||
setInterval(reload_logs, 2000)
|
||||
Reference in New Issue
Block a user