Restructured the code (cleaner) and added an initial Dockerfile

This commit is contained in:
Andreas Fruhwirt 2025-04-05 19:15:20 +02:00
parent 4dd0d91de2
commit 3943a8bc7c
4 changed files with 214 additions and 240 deletions

10
Dockerfile Normal file
View File

@ -0,0 +1,10 @@
# Image for the Luna frontend; starts application.py with the stock Python interpreter.
FROM python:3.10-alpine
WORKDIR /code
# Copy the dependency list on its own so the install layer is reused until requirements.txt changes.
COPY requirements.txt requirements.txt
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Same port number the application code passes to ui.run(port=8642).
EXPOSE 8642
CMD ["python", "application.py"]

View File

@ -31,39 +31,37 @@ app.add_static_files("/static", "static")
app.add_middleware(SessionMiddleware, secret_key=os.environ["SESSION_KEY"]) app.add_middleware(SessionMiddleware, secret_key=os.environ["SESSION_KEY"])
security = HTTPBasic() security = HTTPBasic()
main_loop = asyncio.get_event_loop()
class ClientData:
    """UI handles and upstream connection belonging to one browser client.

    main_page() creates one instance per page load and stores it in the
    module-level ``client_data`` mapping under the client's id. Every field
    defaults to ``None`` and is filled in while the page is being built.
    """

    id = None              # client id this record is stored under
    container = None       # root column; entered as a context around UI updates
    text_above = None      # status label whose text enable_ui() rewrites
    micbutton = None       # record button that enable_ui() enables/disables
    markdown_field = None  # markdown element updated for "show" responses
    audio_element = None   # audio element updated for "audio" responses
    websocket = None       # WebSocketClient for this client

    def __repr__(self):
        """Show the client id so log lines that format this object are readable."""
        return f"{type(self).__name__}(id={self.id!r})"
# Registry of live pages: client id -> ClientData. Filled by main_page(),
# emptied by handle_disconnect().
client_data = dict()
def handle_disconnect(client : Client):
    """Forget the per-client record once its browser connection is gone.

    Registered through ``on_disconnect`` in main_page(). A client that was
    never registered (or was already removed) is only logged.
    """
    info(f"Client disconnected: {client.id}")
    if client.id not in client_data:
        return
    debug(f"Deleting client_id from client_data")
    del client_data[client.id]
def start_recording(client_id):
    """Start browser-side recording if the client's upstream link is usable.

    Nothing happens when the client is unknown, has no websocket yet, a
    reply is still being generated, or the connection has been stopped.
    """
    entry = client_data.get(client_id)
    # ClientData declares ``websocket = None`` at class level, so hasattr()
    # is always true; check for an actual connection object instead.
    link = getattr(entry, "websocket", None)
    if link is None or link.generating or link.stopped.is_set():
        return
    ui.run_javascript('startRecording()')
def stop_recording(client_id):
    """Run the page's ``stopRecording()`` script.

    ``client_id`` is not used by the body; it is accepted so all button
    handlers share one signature.
    """
    script = 'stopRecording()'
    ui.run_javascript(script)
def stop_playback(client_id):
    """Run the page's ``stopPlayback()`` script, then put the UI in state 0.

    Wired to the audio element's 'ended' event in main_page().
    """
    script = 'stopPlayback()'
    ui.run_javascript(script)
    enable_ui(client_id, 0)
def start_playback(client_id):
    """Run the page's ``startPlayback()`` script.

    ``client_id`` is not used by the body.
    """
    script = 'startPlayback()'
    ui.run_javascript(script)
def show_audio_notification(): def show_audio_notification():
@ -74,71 +72,77 @@ def is_authorized(credentials: Annotated[HTTPBasicCredentials, Depends(security)
pw = os.environ["PASSWORD"] pw = os.environ["PASSWORD"]
return credentials.username == user and credentials.password == pw return credentials.username == user and credentials.password == pw
def refresh_ui_enabled(client_id):
    """Bring the client's controls in line with its websocket state.

    State chosen, in priority order:
      * generating -> enable_ui(..., 2)
      * stopped    -> enable_ui(..., 1) and stop any recording
      * otherwise  -> enable_ui(..., 0)

    Always returns ``None``. Unknown clients are ignored: websocket
    callbacks can still arrive after handle_disconnect() removed the record,
    and an unguarded lookup would raise KeyError inside that task.
    """
    entry = client_data.get(client_id)
    if entry is None:
        return
    link = entry.websocket
    with entry.container:
        if link.generating:
            enable_ui(client_id, 2)
        elif link.stopped.is_set():
            enable_ui(client_id, 1)
            stop_recording(client_id)
        else:
            enable_ui(client_id, 0)
def after_websocket_init(client_id):
    """Show the client whether the upstream health check succeeded.

    Passed to WebSocketClient.start() and invoked with the client id once
    the check has finished. State 0 is used when a websocket exists and has
    not been stopped, state 1 otherwise.
    """
    entry = client_data.get(client_id)
    if entry is None:
        # The page went away before the health check completed.
        return
    link = entry.websocket
    # hasattr() cannot detect a missing socket here because ClientData
    # defines ``websocket = None`` on the class; compare against None.
    online = link is not None and not link.stopped.is_set()
    enable_ui(client_id, 0 if online else 1)
def enable_ui(client_id, state = 0):
    """Set the record button and status label for one client.

    Args:
        client_id: key into ``client_data``; raises KeyError when unknown.
        state: 0 = ready to record, 1 = server offline, 2 = input is being
            sent. Any other value is logged as an error and changes nothing.
    """
    # Log the id itself; formatting the ClientData object only yields its
    # default repr.
    info(f"Enabled UI {client_id}: {state}")
    entry = client_data[client_id]
    if state == 0:
        entry.micbutton.props(remove="disabled")
        entry.text_above.set_text("Hold the button to record audio")
    elif state == 1:
        entry.micbutton.props("disabled")
        entry.text_above.set_text("Server is offline :(")
    elif state == 2:
        entry.micbutton.props("disabled")
        entry.text_above.set_text("Sending input data...")
    else:
        error(f"Invalid state inside enable_ui: {state}")
def after_end_audio(client_id, output):
    """Callback for replies to an "end_audio" request.

    ``output["status"]`` drives the result:
      * "processing" -> mark the websocket as generating and refresh the UI
      * "ok"         -> clear the generating flag and refresh the UI
      * "nok"        -> as "ok", then notify the user to try again
    Any other status is ignored.
    """
    debug("after_end_audio")
    status = output["status"]
    if status not in ("nok", "processing", "ok"):
        return
    client_data[client_id].websocket.generating = (status == "processing")
    refresh_ui_enabled(client_id)
    if status == "nok":
        with client_data[client_id].container:
            ui.notify("Luna didn\'t understand that correctly, please try again!", close_button='OK')
@app.post('/api/v1/send') @app.post('/api/v1/send')
async def end_audio(credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request): async def end_audio(credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request):
if not is_authorized(credentials): if not is_authorized(credentials):
return Response({"status": "Unauthorized"}, status_code=401) return Response({"status": "Unauthorized"}, status_code=401)
try: try:
session_id = request.session["id"] client_id = request.session["client_id"]
if session_id not in session_to_websocket: if client_id not in client_data:
return {"status": "nok"} return {"status": "nok"}
websocket = session_to_websocket[session_id] websocket = client_data[client_id].websocket
with websocket.ui_elements["main"]: if websocket.generating:
if not refresh_ui_enabled(websocket.ui_elements): return {"status": "nok"}
return {"status": "nok"}
with client_data[client_id].container:
refresh_ui_enabled(client_id)
debug("Handle audio") debug("Handle audio")
content = await request.body() content = await request.body()
bytes = io.BytesIO(content) bytes = io.BytesIO(content)
await websocket.push_msg_to_queue({"method": "end_audio", "audio_content": bytes}, after_end_audio) await websocket.send_input_audio(bytes, after_end_audio, "end_audio")
with websocket.ui_elements["main"]: with client_data[client_id].container:
enable_ui(websocket.ui_elements, 2) enable_ui(client_id, 2)
stop_recording(session_id) stop_recording(client_id)
show_audio_notification() show_audio_notification()
return {"status": "ok"} return {"status": "ok"}
@ -146,46 +150,55 @@ async def end_audio(credentials: Annotated[HTTPBasicCredentials, Depends(securit
error(traceback.format_exc()) error(traceback.format_exc())
return {"status": "nok"} return {"status": "nok"}
def after_handle_audio(client_id, output):
    """Callback for replies to an "audio" upload.

    A status of "ok" is only logged. Anything else marks the client's
    websocket as stopped, clears its generating flag and refreshes the UI.
    """
    debug("after_handle_audio")
    if output["status"] == "ok":
        debug("Successfully handled audio")
        return
    client_data[client_id].websocket.stopped.set()
    client_data[client_id].websocket.generating = False
    refresh_ui_enabled(client_id)
def after_handle_upstream(client_id, response):
    """Apply an unsolicited upstream message to the client's page.

    ``response["response"]`` selects the action: "audio" loads
    ``response["audio"]`` (base64) into the audio element and starts
    playback; "show" puts ``response["show"]`` into the markdown field.
    Other types are only logged.
    """
    kind = response["response"]
    debug("Retrieved upstream response with type " + str(kind))
    if kind == "audio":
        entry = client_data[client_id]
        with entry.container:
            entry.audio_element.set_source(f"data:audio/webm;base64,{response['audio']}")
            start_playback(client_id)
    elif kind == "show":
        entry = client_data[client_id]
        with entry.container:
            entry.markdown_field.set_content(response["show"])
            entry.markdown_field.update()
def after_upstream_failure(client_id):
    """Refresh the client's UI after its websocket loop reported a failure.

    The loop can outlive the page: once handle_disconnect() has removed the
    record there is nothing left to refresh, so unknown ids are skipped
    rather than raising KeyError inside the websocket task.
    """
    if client_id not in client_data:
        return
    refresh_ui_enabled(client_id)
@app.post('/api/v1/upload') @app.post('/api/v1/upload')
async def handle_audio(credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request): async def handle_audio(credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request):
global main_containers, main_loop global main_containers, main_loop
if not is_authorized(credentials): if not is_authorized(credentials):
return Response({"status": "Unauthorized"}, status_code=401) return Response({"status": "Unauthorized"}, status_code=401)
try: try:
debug(session_to_websocket) debug(client_data)
debug(client_to_session_id) client_id = request.session["client_id"]
session_id = request.session["id"] debug("client_id = " + client_id)
debug("session_id = " + session_id) if client_id not in client_data:
if session_id not in session_to_websocket:
return {"status": "nok"} return {"status": "nok"}
websocket = session_to_websocket[session_id]
with websocket.ui_elements["main"]: websocket = client_data[client_id].websocket
if not refresh_ui_enabled(websocket.ui_elements): if websocket.generating:
return {"status": "nok"} return {"status": "nok"}
with client_data[client_id].container:
refresh_ui_enabled(client_id)
debug("Handle audio") debug("Handle audio")
content = await request.body() content = await request.body()
bytes = io.BytesIO(content) bytes = io.BytesIO(content)
await websocket.push_msg_to_queue({"method": "audio", "audio_content": bytes}, after_handle_audio) await websocket.send_input_audio(bytes, after_handle_audio)
return {"status": "ok"} return {"status": "ok"}
except Exception as ex: except Exception as ex:
@ -196,49 +209,38 @@ async def handle_audio(credentials: Annotated[HTTPBasicCredentials, Depends(secu
@ui.page("/")
async def main_page(credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request, client : Client) -> None:
    """Build the recorder page for one browser client and connect it upstream.

    Creates the status label, record button, markdown field and audio
    element, stores them in a ClientData record under ``client.id``, writes
    that id into the session as "client_id", and starts a WebSocketClient
    for the client. Unauthorized requests get a 401 response instead.
    """
    if not is_authorized(credentials):
        return Response("Get outta here män :(", status_code=401)

    ui.add_head_html("<script type='text/javascript' src='/static/record.js'></script>")
    ui.add_head_html("<link type='text/tailwindcss' rel='stylesheet' href='/static/style.css'>")

    container = ui.column().classes('w-full h-[calc(100vh-2rem)] items-center justify-center')
    with container:
        cd = ClientData()
        cd.id = client.id
        cd.container = container
        cd.text_above = ui.label('Checking if the server is online...').classes('text-xl mb-20 whitespace-nowrap text-[3vw]')

        with ui.element().classes('w-[50vw] max-h-[75%] aspect-square rounded-full bg-grey text-white flex items-center justify-center whitespace-nowrap text-[3vw] active:scale-95 transition-transform').props("id=recordbutton disabled") as button:
            ui.image('/static/microphone.png').classes('h-[75%] w-[75%] object-contain')
        # Mouse and touch events drive the same start/stop handlers.
        button.on('mousedown', lambda: start_recording(client.id))
        button.on('mouseup', lambda: stop_recording(client.id))
        button.on('touchstart', lambda: start_recording(client.id))
        button.on('touchend', lambda: stop_recording(client.id))
        cd.micbutton = button

        cd.markdown_field = ui.markdown().props("id=markdown")
        cd.audio_element = ui.audio(src="").props("id=audioout")
        cd.audio_element.on('ended', lambda: stop_playback(client.id))

        cd.websocket = WebSocketClient(client.id, after_handle_upstream, after_upstream_failure)
        # Register the record BEFORE starting the websocket: its callbacks
        # look the client up in client_data, so the entry must not depend on
        # when the background task first gets to run.
        request.session["client_id"] = client.id
        client_data[client.id] = cd
        cd.websocket.start(after_websocket_init)
        debug("assigned client_data = " + client.id)
    ui.context.client.on_disconnect(handle_disconnect)
# Start the server on port 8642; show=False is passed so no browser window
# is requested on startup.
ui.run(port=8642, show=False)

10
docker-compose.yaml Normal file
View File

@ -0,0 +1,10 @@
version: "3"
services:
  luna-frontend:
    build: .
    container_name: lunafrontend
    hostname: lunafrontend
    # Variables the application reads with os.environ[...]; a key without a
    # value is passed through from the environment Compose runs in.
    # NOTE(review): the basic-auth user name is read on a line not shown in
    # this diff; add its variable here as well.
    environment:
      - SESSION_KEY
      - PASSWORD
      - REMOTE_URL
    ports:
      # Published on the loopback interface only.
      - "127.0.0.1:8642:8642"
    restart: unless-stopped

View File

@ -15,159 +15,107 @@ from uuid import uuid4
class WebSocketClient(): class WebSocketClient():
def __init__(self, client_id, after_handle_upstream, failure_handler):
    """Prepare (but do not open) the upstream connection for one client.

    Args:
        client_id: id used in log prefixes and passed to every callback.
        after_handle_upstream: called as ``f(client_id, response)`` for
            upstream messages that carry no request id.
        failure_handler: called as ``f(client_id)`` by the receive loop.

    Raises:
        KeyError: if the REMOTE_URL environment variable is not set.
    """
    # identity / configuration
    self._client_id = client_id
    self.REMOTE_URL = os.environ["REMOTE_URL"]

    # request bookkeeping: request id -> callback
    self.generating = False
    self._callbacks = dict()
    self._callbacks_lock = asyncio.Lock()

    # audio chunks collected until the upstream marks the last one
    self._audio_source = None
    self._audio_source_lock = asyncio.Lock()
    self._request_id = None

    # hooks supplied by the application
    self._after_handle_upstream = after_handle_upstream
    self._failure_handler = failure_handler

    # set once the connection must no longer be used
    self.stopped = asyncio.Event()
def __del__(self):
    """Best-effort close of the upstream socket when the object is collected.

    A finalizer can run when there is no socket (never connected, or reset
    to ``None`` after a failed health check) and when no event loop is
    running. Both cases are skipped instead of raising inside ``__del__``.
    """
    sock = getattr(self, "_websocket", None)
    if sock is None:
        return
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Nothing to schedule on; creating the close() coroutine here would
        # only leave it un-awaited.
        return
    loop.create_task(sock.close())
def info(self, msg):
    """Log ``msg`` at INFO level, prefixed with this client's id."""
    line = "".join(("THREAD:", self._client_id, ":", msg))
    logging.info(line)
def warning(self, msg):
    """Log ``msg`` at WARNING level, prefixed with this client's id."""
    line = "".join(("THREAD:", self._client_id, ":", msg))
    logging.warning(line)
def error(self, msg):
    """Log ``msg`` at ERROR level, prefixed with this client's id."""
    line = "".join(("THREAD:", self._client_id, ":", msg))
    logging.error(line)
def debug(self, msg):
    """Log ``msg`` at DEBUG level, prefixed with this client's id."""
    line = "".join(("THREAD:", self._client_id, ":", msg))
    logging.debug(line)
async def push_msg_to_queue(self, message, receive_handler):
self._loop.call_soon_threadsafe(self._message_queue.put_nowait, {"msg": message, "handler": receive_handler})
self.debug("Submitted in Queue")
async def send_to_sync_websocket(self, payload):
    """Serialize ``payload`` to JSON and send it over this client's socket.

    Exceptions raised by the socket's ``send`` propagate to the caller.
    """
    sock = self._websocket
    self.debug(f"CLIENT_ID {self._client_id}: {sock}")
    encoded = json.dumps(payload)
    await sock.send(encoded)
    self.debug("Sent message!")
async def send_input_audio(self, audio_content, callback, method = "audio"):
    """Send one audio buffer upstream and register the reply callback.

    Args:
        audio_content: readable binary object; its whole content is read
            and base64-encoded into the message.
        callback: invoked later as ``callback(client_id, response)`` when a
            reply carrying this request's id arrives.
        method: value for the message's "method" field.

    Raises:
        Whatever the send raises. In that case the callback is unregistered
        again, since no reply can ever arrive for a message that was not
        sent.
    """
    self.debug("Called send_input_audio")
    request_id = str(uuid4())
    # Encode first so a failing read() leaves no callback behind.
    payload = {"method": method, "request_id": request_id, "audio": b64encode(audio_content.read()).decode()}
    async with self._callbacks_lock:
        self._callbacks[request_id] = callback
    try:
        await self.send_to_sync_websocket(payload)
    except Exception:
        async with self._callbacks_lock:
            self._callbacks.pop(request_id, None)
        raise
async def handle_llm_response(self, response):
    """Handle an upstream message that carries no request id.

    "audio" messages are base64-decoded and appended to a buffer. When the
    message also has a "last" key, the generating flag is cleared, the whole
    buffer is re-encoded into ``response["audio"]`` and the message is
    handed to the upstream callback. "show" messages are handed over
    unchanged. Anything else is ignored.
    """
    if "audio" in response:
        async with self._audio_source_lock:
            chunk = b64decode(response["audio"])
            if self._audio_source is None:
                self._audio_source = chunk
            else:
                self._audio_source += chunk
            if "last" not in response:
                return
            self.generating = False
            merged = b64encode(self._audio_source).decode()
            # Clear the buffer before calling out, so a failing callback
            # cannot leave this reply's audio in front of the next one.
            self._audio_source = None
            response["audio"] = merged
            self._after_handle_upstream(self._client_id, response)
    elif "show" in response:
        self._after_handle_upstream(self._client_id, response)
async def handle_upstream_response(self, request_id, response):
    """Route a reply to the callback registered for ``request_id``.

    An unknown id is logged and marks the connection as stopped. Otherwise
    the callback is called as ``callback(client_id, response)`` and stays
    registered only while ``response["status"]`` is "processing".
    """
    self.info("Received upstream response!")
    try:
        callback = self._callbacks[request_id]
    except KeyError:
        self.error("Upstream gave an incorrect request id :/")
        self.stopped.set()
        return
    callback(self._client_id, response)
    if response["status"] == "processing":
        return
    del self._callbacks[request_id]
async def handle_input(self, after_init_func):
    """Connect, then receive and dispatch upstream messages until stopped.

    Returns immediately when the health check fails. Otherwise each received
    JSON message goes to handle_upstream_response() when it has a
    "request_id" key and to handle_llm_response() when it does not. Any
    exception while receiving or dispatching is logged and stops the loop.

    Once the loop has ended the generating flag is cleared, the socket is
    closed (best effort) so the connection is not left open until garbage
    collection, and the failure handler is called with the client id.
    """
    if not await self.check_health(after_init_func):
        return
    while not self.stopped.is_set():
        try:
            response = json.loads(await self._websocket.recv())
            if "request_id" in response:
                await self.handle_upstream_response(response["request_id"], response)
            else:
                await self.handle_llm_response(response)
        except Exception as ex:
            self.error("Error:")
            self.error(traceback.format_exc())
            self.stopped.set()
    self.generating = False
    sock = getattr(self, "_websocket", None)
    if sock is not None:
        try:
            await sock.close()
        except Exception:
            # Closing is best effort; the connection is unusable either way.
            self.error(traceback.format_exc())
    self._failure_handler(self._client_id)
self._loop.run_until_complete(handle_loop()) async def check_health(self, after_init_func):
# start thread
def start_thread(self, after_init_func):
self.main_thread = threading.Thread(target=self.main_loop, args=(after_init_func,), daemon=True).start()
async def check_health(self):
try: try:
self.info("Checking if server is online and registering to server...") self.info("Checking if server is online and registering to server...")
self._websocket = await connect(self.REMOTE_URL, ping_timeout=60*15) self._websocket = await connect(self.REMOTE_URL)
await self.send_to_sync_websocket({"method": "health", "session_id": self._session_id}) await self.send_to_sync_websocket({"method": "health", "request_id": str(uuid4())})
return True response = json.loads(await self._websocket.recv())
if response["status"] != "running":
self.stopped.set()
await self._websocket.close()
self._websocket = None
after_init_func(self._client_id)
return not self.stopped.is_set()
except ConnectionRefusedError as ex: except ConnectionRefusedError as ex:
self.warning(self._session_id + ": Server offline :(") self.warning("Server offline :(")
except TimeoutError as ex: except TimeoutError as ex:
self.warning("Timed out during opening handshake!") self.warning("Timed out during opening handshake!")
except InvalidMessage as ex: except InvalidMessage as ex:
@ -175,5 +123,9 @@ class WebSocketClient():
except Exception as ex: except Exception as ex:
self.error("Error?") self.error("Error?")
self.error(traceback.format_exc()) self.error(traceback.format_exc())
self.enabled = False self.stopped.set()
after_init_func(self._client_id)
return False return False
def start(self, after_init_func):
    """Launch the receive loop as a background task on the running loop.

    Must be called while an event loop is running. The task is kept on the
    instance because the event loop only holds weak references to tasks; an
    unreferenced task may be garbage-collected before it finishes.
    """
    self._input_task = asyncio.create_task(self.handle_input(after_init_func))