Refactors ducky.py, unrolls redundant function calls
This commit is contained in:
@@ -11,3 +11,9 @@ PwnPi Amora is a wireless keystroke injection tool built on the Raspberry Pi Pic
|
||||
- Modify the WiFi SSID and password in `amora/settings.toml`.
|
||||
- Copy all the files from the `amora` directory to the drive. The board will reload again.
|
||||
- Connect to the newly spawned network and go to [`192.168.4.1`](http://192.168.4.1)
|
||||
|
||||
## Acknowledgement
|
||||
|
||||
A huge thank you to David Bailey (dbisu, @dbisu) for his pico-ducky project.
|
||||
A lot of the HID code from his project was the foundation for this project.
|
||||
However, a lot of it has undergone so much of refactoring that the old code is almost nonexistent.
|
||||
@@ -9,29 +9,27 @@ from adafruit_httpserver.request import HTTPRequest
|
||||
from adafruit_httpserver.response import HTTPResponse
|
||||
from adafruit_httpserver.methods import HTTPMethod
|
||||
from adafruit_httpserver.mime_type import MIMEType
|
||||
from ducky import runScript
|
||||
from ducky import run_script_file
|
||||
|
||||
|
||||
async def main():
|
||||
wifi.radio.start_ap(
|
||||
ssid=os.getenv("SSID"), password=os.getenv("PASSWORD")
|
||||
)
|
||||
wifi.radio.start_ap(ssid=os.getenv("SSID"), password=os.getenv("PASSWORD"))
|
||||
pool = socketpool.SocketPool(wifi.radio)
|
||||
server = HTTPServer(pool)
|
||||
|
||||
@server.route("/")
|
||||
def base(request: HTTPRequest): # pylint: disable=unused-argument
|
||||
def base(_):
|
||||
with HTTPResponse(request, content_type=MIMEType.TYPE_HTML) as response:
|
||||
response.send_file("static/index.html")
|
||||
|
||||
@server.route("/main.css")
|
||||
def css(request: HTTPRequest):
|
||||
with HTTPResponse(request, content_type=MIMEType.TYPE_HTML) as response:
|
||||
def css(_):
|
||||
with HTTPResponse(request, content_type=MIMEType.TYPE_CSS) as response:
|
||||
response.send_file("static/main.css")
|
||||
|
||||
@server.route("/script.js")
|
||||
def js(request: HTTPRequest):
|
||||
with HTTPResponse(request, content_type=MIMEType.TYPE_HTML) as response:
|
||||
def js(_):
|
||||
with HTTPResponse(request, content_type=MIMEType.TYPE_JS) as response:
|
||||
response.send_file("static/script.js")
|
||||
|
||||
@server.route("/api", HTTPMethod.POST)
|
||||
@@ -53,7 +51,7 @@ async def main():
|
||||
with open(f"payloads/" + body["filename"], "wb") as h:
|
||||
h.write(b"")
|
||||
elif action == "run":
|
||||
runScript(f"payloads/" + body["filename"])
|
||||
run_script_file(f"payloads/" + body["filename"])
|
||||
|
||||
server.start(str(wifi.radio.ipv4_address_ap))
|
||||
while True:
|
||||
|
||||
210
amora/ducky.py
210
amora/ducky.py
@@ -1,7 +1,3 @@
|
||||
# License: GPLv3.0
|
||||
# copyright (c) 2023 Dave Bailey, Himadri Bhattacharjee
|
||||
# Authors: Dave Bailey (dbisu, @daveisu); Himadri Bhattacharjee (lavafroth, @lavafroth)
|
||||
|
||||
import usb_hid
|
||||
from adafruit_hid.keyboard import Keyboard
|
||||
|
||||
@@ -21,157 +17,75 @@ import asyncio
|
||||
kbd = Keyboard(usb_hid.devices)
|
||||
layout = KeyboardLayout(kbd)
|
||||
|
||||
defaultDelay = 0
|
||||
|
||||
duckyCommands = {
|
||||
"WINDOWS": Keycode.WINDOWS,
|
||||
"GUI": Keycode.GUI,
|
||||
"APP": Keycode.APPLICATION,
|
||||
"MENU": Keycode.APPLICATION,
|
||||
"SHIFT": Keycode.SHIFT,
|
||||
"ALT": Keycode.ALT,
|
||||
"CONTROL": Keycode.CONTROL,
|
||||
"CTRL": Keycode.CONTROL,
|
||||
"DOWNARROW": Keycode.DOWN_ARROW,
|
||||
"DOWN": Keycode.DOWN_ARROW,
|
||||
"LEFTARROW": Keycode.LEFT_ARROW,
|
||||
"LEFT": Keycode.LEFT_ARROW,
|
||||
"RIGHTARROW": Keycode.RIGHT_ARROW,
|
||||
"RIGHT": Keycode.RIGHT_ARROW,
|
||||
"UPARROW": Keycode.UP_ARROW,
|
||||
"UP": Keycode.UP_ARROW,
|
||||
"BREAK": Keycode.PAUSE,
|
||||
"PAUSE": Keycode.PAUSE,
|
||||
"CAPSLOCK": Keycode.CAPS_LOCK,
|
||||
"DELETE": Keycode.DELETE,
|
||||
"END": Keycode.END,
|
||||
"ESC": Keycode.ESCAPE,
|
||||
"ESCAPE": Keycode.ESCAPE,
|
||||
"HOME": Keycode.HOME,
|
||||
"INSERT": Keycode.INSERT,
|
||||
"NUMLOCK": Keycode.KEYPAD_NUMLOCK,
|
||||
"PAGEUP": Keycode.PAGE_UP,
|
||||
"PAGEDOWN": Keycode.PAGE_DOWN,
|
||||
"PRINTSCREEN": Keycode.PRINT_SCREEN,
|
||||
"ENTER": Keycode.ENTER,
|
||||
"SCROLLLOCK": Keycode.SCROLL_LOCK,
|
||||
"SPACE": Keycode.SPACE,
|
||||
"TAB": Keycode.TAB,
|
||||
"BACKSPACE": Keycode.BACKSPACE,
|
||||
"A": Keycode.A,
|
||||
"B": Keycode.B,
|
||||
"C": Keycode.C,
|
||||
"D": Keycode.D,
|
||||
"E": Keycode.E,
|
||||
"F": Keycode.F,
|
||||
"G": Keycode.G,
|
||||
"H": Keycode.H,
|
||||
"I": Keycode.I,
|
||||
"J": Keycode.J,
|
||||
"K": Keycode.K,
|
||||
"L": Keycode.L,
|
||||
"M": Keycode.M,
|
||||
"N": Keycode.N,
|
||||
"O": Keycode.O,
|
||||
"P": Keycode.P,
|
||||
"Q": Keycode.Q,
|
||||
"R": Keycode.R,
|
||||
"S": Keycode.S,
|
||||
"T": Keycode.T,
|
||||
"U": Keycode.U,
|
||||
"V": Keycode.V,
|
||||
"W": Keycode.W,
|
||||
"X": Keycode.X,
|
||||
"Y": Keycode.Y,
|
||||
"Z": Keycode.Z,
|
||||
"F1": Keycode.F1,
|
||||
"F2": Keycode.F2,
|
||||
"F3": Keycode.F3,
|
||||
"F4": Keycode.F4,
|
||||
"F5": Keycode.F5,
|
||||
"F6": Keycode.F6,
|
||||
"F7": Keycode.F7,
|
||||
"F8": Keycode.F8,
|
||||
"F9": Keycode.F9,
|
||||
"F10": Keycode.F10,
|
||||
"F11": Keycode.F11,
|
||||
"F12": Keycode.F12,
|
||||
}
|
||||
def delay(millis):
|
||||
time.sleep(float(millis) / 1000)
|
||||
|
||||
|
||||
def convertLine(line):
|
||||
newline = []
|
||||
# loop on each key - the filter removes empty values
|
||||
for key in filter(None, line.split(" ")):
|
||||
key = key.upper()
|
||||
# find the keycode for the command in the list
|
||||
command_keycode = duckyCommands.get(key, None)
|
||||
if command_keycode is not None:
|
||||
# if it exists in the list, use it
|
||||
newline.append(command_keycode)
|
||||
elif hasattr(Keycode, key):
|
||||
# if it's in the Keycode module, use it (allows any valid keycode)
|
||||
newline.append(getattr(Keycode, key))
|
||||
else:
|
||||
# if it's not a known key name, show the error for diagnosis
|
||||
print(f"Unknown key: <{key}>")
|
||||
# print(newline)
|
||||
return newline
|
||||
def prefix_checker(line: str):
|
||||
def checker(*prefixes):
|
||||
for prefix in prefixes:
|
||||
if line.startswith(prefix):
|
||||
return line[len(prefix) + 1 :]
|
||||
|
||||
return checker
|
||||
|
||||
|
||||
def runScriptLine(line):
|
||||
if (
|
||||
parse_line_header(["REM"], line, lambda _: _)
|
||||
or parse_line_header(["DELAY"], line, lambda x: time.sleep(float(x) / 1000))
|
||||
or parse_line_header(["PRINT"], line, lambda x: print("[log]: " + x))
|
||||
or parse_line_header(["IMPORT"], line, runScript)
|
||||
or parse_line_header(["DEFAULT_DELAY", "DEFAULTDELAY"], line, set_delay)
|
||||
or parse_line_header(["LED"], line, invert_led)
|
||||
or parse_line_header(["STRING"], line, layout.write)
|
||||
):
|
||||
return
|
||||
|
||||
for k in convertLine(line):
|
||||
kbd.press(k)
|
||||
kbd.release_all()
|
||||
# TODO: send this to the logs pane of the web interface
|
||||
def log(message: str):
|
||||
print("[log]: " + message)
|
||||
|
||||
|
||||
def parse_line_header(headers, line, func):
|
||||
for header in headers:
|
||||
if line.startswith(header):
|
||||
func(line[len(header) + 1 :])
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def set_delay(delay):
|
||||
global defaultDelay
|
||||
defaultDelay = int(delay) * 10
|
||||
|
||||
|
||||
def invert_led(*args):
|
||||
led.value ^= True
|
||||
|
||||
|
||||
def runScript(duckyScriptPath: str):
|
||||
global defaultDelay
|
||||
|
||||
try:
|
||||
with open(duckyScriptPath, "r", encoding="utf-8") as handle:
|
||||
contents = handle.read()
|
||||
except OSError as e:
|
||||
print(f"Unable to open file {duckyScriptPath}: {e}")
|
||||
return
|
||||
|
||||
def run_script(contents):
|
||||
default_delay = 0
|
||||
previous_line = None
|
||||
for line in map(lambda x: x.rstrip(), filter(None, contents.splitlines())):
|
||||
if line[0:6] == "REPEAT":
|
||||
for line in filter(None, contents.splitlines()):
|
||||
line = line.rstrip()
|
||||
after = prefix_checker(line)
|
||||
# we only run a command once by default
|
||||
run_n_times = 1
|
||||
if repeat := after("REPEAT"):
|
||||
if not previous_line:
|
||||
continue
|
||||
for _ in range(int(line[7:])):
|
||||
# repeat the last command
|
||||
runScriptLine(previous_line)
|
||||
else:
|
||||
runScriptLine(line)
|
||||
previous_line = line
|
||||
time.sleep(float(defaultDelay) / 1000)
|
||||
run_n_times = int(repeat)
|
||||
line = previous_line
|
||||
|
||||
for _ in range(run_n_times):
|
||||
if after("REM"):
|
||||
continue
|
||||
if (millis := after("DELAY")) is not None:
|
||||
if millis == "":
|
||||
millis = default_delay
|
||||
delay(millis)
|
||||
elif message := after("PRINT"):
|
||||
log(message)
|
||||
elif path := after("IMPORT"):
|
||||
run_script_file(path)
|
||||
elif millis := after("DEFAULT_DELAY", "DEFAULTDELAY"):
|
||||
default_delay = int(millis) * 10
|
||||
elif after("LED") is not None:
|
||||
led.value ^= True
|
||||
elif string := after("STRING"):
|
||||
layout.write(string)
|
||||
else:
|
||||
# loop on each key filtering empty values
|
||||
for key in filter(None, line.split(" ")):
|
||||
key = key.upper()
|
||||
if command_keycode := Keycode.__dict__.get(key):
|
||||
# If this is a valid key, send its keycode
|
||||
kbd.press(command_keycode)
|
||||
continue
|
||||
# If it's not a known key name, log it for diagnosis
|
||||
log(f"unknown key: <{key}>")
|
||||
kbd.release_all()
|
||||
|
||||
previous_line = line
|
||||
delay(default_delay)
|
||||
|
||||
|
||||
def run_script_file(path: str):
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
run_script(handle.read())
|
||||
except OSError as e:
|
||||
log(f"unable to open file {path}: {e}")
|
||||
|
||||
Reference in New Issue
Block a user