#!/usr/bin/python3
"""NiceGUI frontend for a push-to-talk audio assistant.

Records microphone audio in the browser (static/record.js), forwards the
chunks to an upstream service over a per-session WebSocket
(websocket_client.WebSocketClient), and plays the audio response back
through a hidden <audio> element.
"""
from typing import Annotated
from fastapi import Request, Depends, FastAPI, UploadFile, Response
from fastapi.responses import PlainTextResponse, JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from starlette.middleware.sessions import SessionMiddleware
from nicegui import ui, app, run, Client
from concurrent.futures import ThreadPoolExecutor
from logging import info, warning, error, debug
from base64 import b64encode, b64decode
from dotenv import load_dotenv
from time import sleep
from asyncio import run_coroutine_threadsafe
from uuid import uuid4
from websocket_client import WebSocketClient

import logging
import os
import io
import aiohttp
import asyncio
import json
import traceback

load_dotenv()

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)

# Serve the recorder script, stylesheet and images.
app.add_static_files("/static", "static")
app.add_middleware(SessionMiddleware, secret_key=os.environ["SESSION_KEY"])

security = HTTPBasic()
# NOTE(review): get_event_loop() outside a running loop is deprecated since
# Python 3.10 and this variable is never read in this module — kept only in
# case other code imports it; confirm before removing.
main_loop = asyncio.get_event_loop()

session_timeout = aiohttp.ClientTimeout(total=None, sock_connect=5, sock_read=5)

# NiceGUI client.id -> browser session id.
client_to_session_id = {}
# Browser session id -> WebSocketClient talking to the upstream service.
session_to_websocket = {}


def handle_disconnect(client: Client):
    """Drop the session's upstream WebSocket mapping when the browser tab goes away."""
    info(f"Client disconnected: {client.id}")
    if client.id in client_to_session_id:
        session_id = client_to_session_id[client.id]
        if session_id in session_to_websocket:
            del session_to_websocket[session_id]
        del client_to_session_id[client.id]


app.on_disconnect(handle_disconnect)


def start_recording(ui_elements):
    """Start browser-side recording, but only while the upstream socket is usable."""
    if not refresh_ui_enabled(ui_elements):
        return
    ui.run_javascript('startRecording()')


def stop_recording(ui_elements):
    """Stop browser-side recording (ui_elements is unused but kept for a uniform signature)."""
    ui.run_javascript('stopRecording()')


def stop_playback(ui_elements):
    """Stop browser-side playback and re-enable the record button."""
    ui.run_javascript('stopPlayback()')
    enable_ui(ui_elements, 0)


def start_playback(ui_elements):
    """Start browser-side playback of the response audio."""
    ui.run_javascript('startPlayback()')


def show_audio_notification():
    """Toast shown after the final audio chunk has been submitted upstream."""
    ui.notify("Saved recording! Waiting for response...")


def is_authorized(credentials: Annotated[HTTPBasicCredentials, Depends(security)]):
    """Check the HTTP Basic credentials against the USERNAME/PASSWORD env vars."""
    user = os.environ["USERNAME"]
    pw = os.environ["PASSWORD"]
    return credentials.username == user and credentials.password == pw


def refresh_ui_enabled(ui_elements):
    """Sync the UI with the upstream socket state.

    Returns True when the socket is connected and idle (recording allowed),
    False while the upstream is generating a response or offline.
    """
    with ui_elements["main"]:
        if "websocket" in ui_elements and ui_elements["websocket"].enabled:
            enable_ui(ui_elements, 0)
            return True
        if "websocket" in ui_elements and ui_elements["websocket"].generating:
            enable_ui(ui_elements, 2)
            return False
        enable_ui(ui_elements, 1)
        # Original passed ui_elements["id"] here; stop_recording takes the
        # ui_elements dict like every other helper (the argument is unused).
        stop_recording(ui_elements)
        return False


def after_websocket_init(ui_elements, was_successful):
    """Callback run once the WebSocketClient thread finished its health check."""
    if not was_successful:
        enable_ui(ui_elements, 1)
    else:
        enable_ui(ui_elements, 0)


def enable_ui(ui_elements, state=0):
    """Put the record button/label into one of three states.

    state 0: ready to record, 1: server offline, 2: busy sending input.
    """
    info(f"Enabled UI {ui_elements['id']}: {state}")
    if state == 0:
        ui_elements["micbutton"].props(remove="disabled")
        ui_elements["text"].set_text("Hold the button to record audio")
    elif state == 1:
        ui_elements["micbutton"].props("disabled")
        ui_elements["text"].set_text("Server is offline :(")
    elif state == 2:
        ui_elements["micbutton"].props("disabled")
        ui_elements["text"].set_text("Sending input data...")
    else:
        error(f"Invalid state inside enable_ui: {state}")


def after_end_audio(ui_elements, output):
    """Handler for the upstream ack of the final ('end_audio') chunk."""
    debug("after_end_audio")
    if output["status"] != "ok":
        refresh_ui_enabled(ui_elements)
        return


def _get_ready_websocket(request: Request):
    """Return the session's WebSocketClient if present and ready, else None.

    Shared validation for both upload endpoints: looks up the session id,
    finds the per-session socket, and refreshes/checks the UI state.
    """
    session_id = request.session.get("id")
    if session_id is None or session_id not in session_to_websocket:
        return None
    websocket = session_to_websocket[session_id]
    with websocket.ui_elements["main"]:
        if not refresh_ui_enabled(websocket.ui_elements):
            return None
    return websocket


@app.post('/api/v1/send')
async def end_audio(credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request):
    """Receive the final audio chunk of a recording and queue it upstream.

    Returns {"status": "ok"} on success, {"status": "nok"} on any failure,
    or 401 JSON for bad credentials.
    """
    if not is_authorized(credentials):
        # Bugfix: plain Response() cannot serialize a dict body.
        return JSONResponse({"status": "Unauthorized"}, status_code=401)
    try:
        websocket = _get_ready_websocket(request)
        if websocket is None:
            return {"status": "nok"}

        debug("Handle audio")
        content = await request.body()
        audio_stream = io.BytesIO(content)  # renamed: 'bytes' shadowed the builtin

        await websocket.push_msg_to_queue(
            {"method": "end_audio", "audio_content": audio_stream}, after_end_audio)

        with websocket.ui_elements["main"]:
            enable_ui(websocket.ui_elements, 2)
            stop_recording(websocket.ui_elements)
            show_audio_notification()

        return {"status": "ok"}
    except Exception:
        error(traceback.format_exc())
        return {"status": "nok"}


def after_handle_audio(ui_elements, output):
    """Handler for the upstream ack of an intermediate audio chunk."""
    debug("after_handle_audio")
    if output["status"] != "ok":
        refresh_ui_enabled(ui_elements)
        return
    debug("Successfully handled audio")


def after_handle_upstream(ui_elements, response):
    """Play the assembled base64 webm response through the hidden audio element."""
    if "audio" in response:
        with ui_elements["main"]:
            ui_elements["audio"].set_source(f"data:audio/webm;base64,{response['audio']}")
            start_playback(ui_elements)


@app.post('/api/v1/upload')
async def handle_audio(credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request):
    """Receive an intermediate audio chunk and queue it upstream."""
    if not is_authorized(credentials):
        return JSONResponse({"status": "Unauthorized"}, status_code=401)
    try:
        websocket = _get_ready_websocket(request)
        if websocket is None:
            return {"status": "nok"}

        debug("Handle audio")
        content = await request.body()
        audio_stream = io.BytesIO(content)

        await websocket.push_msg_to_queue(
            {"method": "audio", "audio_content": audio_stream}, after_handle_audio)

        return {"status": "ok"}
    except Exception:
        error(traceback.format_exc())
        return {"status": "nok"}


## Initialize page

@ui.page("/")
async def main_page(credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request, client: Client) -> None:
    """Build the push-to-talk page and start the per-session upstream socket."""
    if not is_authorized(credentials):
        return Response("Get outta here män :(", status_code=401)
    # Bugfix: nothing else in the app ever seeded the session id, so the
    # original raised KeyError on the first visit.
    if "id" not in request.session:
        request.session["id"] = str(uuid4())
    session_id = request.session["id"]
    # NOTE(review): these two calls were empty f-strings in the original
    # source (tag content lost in transit); the page plainly depends on the
    # static recorder assets — confirm the exact tags against git history.
    ui.add_head_html('<script src="/static/record.js"></script>')
    ui.add_head_html('<link rel="stylesheet" href="/static/style.css">')

    container = ui.column().classes('w-full h-[calc(100vh-2rem)] items-center justify-center')

    with container:
        ui_elements = {}
        ui_elements["id"] = session_id
        ui_elements["main"] = container
        label = ui.label('Checking if the server is online...').classes(
            'text-xl mb-20 whitespace-nowrap text-[3vw]')
        ui_elements["text"] = label
        with ui.element().classes(
                'w-[50vw] max-h-[75%] aspect-square rounded-full bg-grey text-white flex '
                'items-center justify-center whitespace-nowrap text-[3vw] active:scale-95 '
                'transition-transform').props("id=recordbutton disabled") as button:
            ui.image('/static/microphone.png').classes('h-[75%] w-[75%] object-contain')
        button.on('mousedown', lambda: start_recording(ui_elements))
        button.on('mouseup', lambda: stop_recording(ui_elements))
        button.on('touchstart', lambda: start_recording(ui_elements))
        button.on('touchend', lambda: stop_recording(ui_elements))
        ui_elements["micbutton"] = button

        audio = ui.audio(src="").classes("disabled opacity-0").props("id=audioout")
        audio.on('ended', lambda: stop_playback(ui_elements))
        ui_elements["audio"] = audio

        ui_elements["websocket"] = WebSocketClient(ui_elements, after_handle_upstream)
        ui_elements["websocket"].start_thread(after_websocket_init)
        client_to_session_id[client.id] = session_id
        session_to_websocket[session_id] = ui_elements["websocket"]


ui.run(show=False)
// static/record.js — push-to-talk recorder + response playback.
// Streams MediaRecorder chunks to /api/v1/upload and the final chunk to
// /api/v1/send; pulses the record button with the live audio level.

let mediaRecorder;
let audioChunks = [];

let audioContext;       // AudioContext bound to the playback <audio> element (created once)
let recordingContext;   // fresh AudioContext per recording; closed when recording stops
let analyser;           // bugfix: was an implicit global in the original
let dataArray;
let animationId;
let audioOutSource;

// Scale the record button with the RMS volume read from `analyser`.
function startButtonPulse(analyser) {
    const button = document.querySelector('#recordbutton');

    function updateScale(analyser) {
        if (!analyser) return;

        analyser.getByteTimeDomainData(dataArray);

        let sum = 0;
        for (let i = 0; i < dataArray.length; i++) {
            const value = dataArray[i] - 128;
            sum += value * value;
        }
        const volume = Math.sqrt(sum / dataArray.length);

        const scale = 1 + Math.min(volume / 25, 0.4); // Scales from 1.0 to 1.2

        button.style.transform = `scale(${scale})`;

        animationId = requestAnimationFrame(function () {
            updateScale(analyser);
        });
    }

    updateScale(analyser);
}

// Cancel the pulse animation and reset the button scale.
function stopButtonPulse() {
    cancelAnimationFrame(animationId);
    const button = document.querySelector('#recordbutton');
    button.style.transform = 'scale(1)';
}

const startPlayback = async () => {
    const audio_out = document.querySelector('#audioout');
    try {
        console.log("Started playback!");

        // Audio analysis — the media-element source may only be created once
        // per element, so set it up lazily and reuse it afterwards.
        if (!audioOutSource) {
            audioContext = new AudioContext();
            audioOutSource = audioContext.createMediaElementSource(audio_out);
            analyser = audioContext.createAnalyser();
            analyser.fftSize = 2048;
            dataArray = new Uint8Array(analyser.frequencyBinCount);
            audioOutSource.connect(analyser);
        }

        startButtonPulse(analyser);

        await audio_out.play();

    } catch (err) {
        console.error('Something happend oh no!', err);
        alert('Some error happened during playback! :(');
    }
}

const stopPlayback = async () => {
    const audio_out = document.querySelector('#audioout');
    if (audio_out) {
        stopButtonPulse();
    }
}

const startRecording = async () => {
    try {
        console.log("Started recording!");
        const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
        mediaRecorder = new MediaRecorder(stream);
        audioChunks = [];

        // Audio analysis. Bugfix: the original overwrote the shared
        // `audioContext` (owned by playback) and never closed it, leaking one
        // AudioContext per recording.
        recordingContext = new AudioContext();
        analyser = recordingContext.createAnalyser();
        const source = recordingContext.createMediaStreamSource(stream);
        analyser.fftSize = 2048;
        dataArray = new Uint8Array(analyser.frequencyBinCount);
        source.connect(analyser);

        startButtonPulse(analyser);

        mediaRecorder.ondataavailable = async event => {
            if (event.data.size > 0) {
                audioChunks.push(event.data);
                // Bugfix: `url` was an implicit global.
                let url = "/api/v1/upload";
                if (mediaRecorder.state === 'inactive') {
                    url = "/api/v1/send";
                }
                fetch(url, {
                    method: 'POST',
                    body: event.data,
                }).catch(err => console.error('Failed to upload audio chunk:', err));
            }
        };

        mediaRecorder.onstop = async () => {
            console.log("Stopped recording!");
            stopButtonPulse();
            // Bugfix: release the microphone (browser "mic in use" indicator
            // stayed on) and close the per-recording AudioContext.
            stream.getTracks().forEach(track => track.stop());
            if (recordingContext) {
                recordingContext.close();
                recordingContext = null;
            }
        };

        mediaRecorder.start(1000);
    } catch (err) {
        console.error('Microphone access denied or error:', err);
        alert('Please allow microphone access to record audio.');
    }
};

const stopRecording = () => {
    if (mediaRecorder && mediaRecorder.state !== 'inactive') {
        mediaRecorder.stop();
    }
};
# websocket_client.py — per-session client thread that owns the upstream
# WebSocket. (Imports restated here because the module's import block was
# split across the garbled source; duplicates are harmless.)
from asyncio import Queue
from asyncio.exceptions import TimeoutError
import traceback
import os
import json
import threading
import asyncio
import logging

from base64 import b64encode, b64decode
from websockets import ConnectionClosedError
from websockets.exceptions import InvalidMessage
from websockets.asyncio.client import connect
from uuid import uuid4


class WebSocketClient():
    """Background client that bridges one browser session to the upstream service.

    Runs its own asyncio event loop on a daemon thread. Outgoing messages are
    queued via push_msg_to_queue() (thread-safe) together with a response
    handler; incoming messages are either matched to a handler by id or treated
    as unsolicited upstream requests (streamed audio chunks).
    """

    def __init__(self, ui_elements, after_handle_upstream):
        ui_elements["websocket"] = self
        self.ui_elements = ui_elements
        self._session_id = ui_elements["id"]
        self.REMOTE_URL = os.environ["REMOTE_URL"]
        self.enabled = False      # True while the socket is connected and idle
        self.generating = False   # True while upstream is producing a response
        self._message_queue = Queue()
        # message id -> callback(ui_elements, response).
        # NOTE(review): entries are never removed after use, so this grows for
        # the lifetime of the session — confirm whether upstream can answer an
        # id more than once before pruning here.
        self._handlers = {}
        self._audio_source = None  # accumulates base64-decoded response audio
        self._after_handle_upstream = after_handle_upstream

    def __del__(self):
        # Bugfix: websockets' asyncio client close() is a coroutine — the
        # original called it synchronously, which only created a never-awaited
        # coroutine and left the connection open. Schedule it on the client's
        # own loop when that loop is still running; best-effort otherwise.
        websocket_conn = getattr(self, "_websocket", None)
        loop = getattr(self, "_loop", None)
        if websocket_conn is None:
            return
        try:
            if loop is not None and loop.is_running():
                asyncio.run_coroutine_threadsafe(websocket_conn.close(), loop)
        except Exception:
            pass  # interpreter may be shutting down; nothing sane to do

    # --- session-prefixed logging helpers -------------------------------

    def info(self, msg):
        logging.info("THREAD:" + self._session_id + ":" + msg)

    def warning(self, msg):
        logging.warning("THREAD:" + self._session_id + ":" + msg)

    def error(self, msg):
        logging.error("THREAD:" + self._session_id + ":" + msg)

    def debug(self, msg):
        logging.debug("THREAD:" + self._session_id + ":" + msg)

    # --- outgoing path --------------------------------------------------

    async def push_msg_to_queue(self, message, receive_handler):
        """Thread-safely enqueue `message` for output_loop(); `receive_handler`
        is called with the upstream response matching the generated id.

        NOTE(review): relies on self._loop existing, i.e. start_thread() must
        have been called first (the UI does so immediately after construction).
        """
        self._loop.call_soon_threadsafe(
            self._message_queue.put_nowait,
            {"msg": message, "handler": receive_handler})
        self.debug("Submitted in Queue")

    async def send_to_sync_websocket(self, payload):
        """Serialize `payload` as JSON and send it on the upstream socket."""
        self.debug(f"Session_ID {self._session_id}: {self._websocket}")
        await self._websocket.send(json.dumps(payload))
        self.debug(f"Sent message!")

    async def handle_input_audio(self, audio_content, msg_id, method="audio"):
        """Send one recorded audio chunk (a BytesIO) upstream, base64-encoded."""
        self.debug("Called handle_input_audio")
        await self.send_to_sync_websocket({
            "method": method,
            "id": msg_id,
            "session_id": self._session_id,
            "audio": b64encode(audio_content.read()).decode(),
        })

    # --- incoming path --------------------------------------------------

    def handle_input(self, response):
        """Dispatch a response carrying an "id" to its registered handler.

        Returns True when the message was an id-carrying response (handled or
        not), False when it is an unsolicited upstream request.
        """
        self.debug(f"Received {response} from Upstream!")
        if "id" in response:
            msg_id = response["id"]
            if msg_id in self._handlers:
                self._handlers[msg_id](self.ui_elements, response)
                return True
            self.error(f"Upstream provided ID {msg_id} that does not exist as a handler!")
            return True
        return False

    def handle_upstream_request(self, request):
        """Accumulate streamed response audio; flush to the UI on the last chunk."""
        self.info(f"Received upstream request!")
        if request["method"] == "audio":
            if self._audio_source is None:
                self._audio_source = b64decode(request["audio"])
            else:
                self._audio_source += b64decode(request["audio"])
            if "last" in request:
                self.generating = False
                self._after_handle_upstream(
                    self.ui_elements,
                    {"audio": b64encode(self._audio_source).decode()})
                self._audio_source = None

    async def input_loop(self):
        """Read upstream messages forever; disables the client on socket failure."""
        try:
            self.debug("Started Handle_Input!")
            while True:
                response = json.loads(await self._websocket.recv())
                if not self.handle_input(response):
                    # special upstream message not handled
                    self.handle_upstream_request(response)
        except ConnectionClosedError:
            self.warning("Socket closed!")
        except InvalidMessage:
            self.warning("Invalid message from upstream!")
        except Exception:
            self.error("Error :( ")
            self.error(traceback.format_exc())
        self.enabled = False

    async def output_loop(self):
        """Drain the message queue, registering a handler per outgoing message."""
        try:
            self.debug("Started Handle_Output!")
            while True:
                item = await self._message_queue.get()
                handler_func = item["handler"]
                item = item["msg"]
                msg_id = str(uuid4())
                self._handlers[msg_id] = handler_func
                self.debug(f"Handle output {item}")
                self.debug(str(self._handlers))
                if item["method"] == "audio" or item["method"] == "end_audio":
                    await self.handle_input_audio(item["audio_content"], msg_id, item["method"])
                    if item["method"] == "end_audio":
                        # Final chunk sent: upstream now generates the answer.
                        self.generating = True
                else:
                    self.error("Wrong method in Queue!!")
                    break
                self._message_queue.task_done()
        except ConnectionClosedError:
            self.warning("Socket closed!")
        except InvalidMessage:
            self.warning("Invalid message from upstream!")
        except Exception:
            self.error("Error :( ")
            self.error(traceback.format_exc())
        self.enabled = False

    # --- thread / loop management ---------------------------------------

    def main_loop(self, after_init_func):
        """Thread entry point: own event loop, health check, then both loops."""
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)

        async def handle_loop():
            checked = await self.check_health()
            if not checked:
                self.enabled = False
                after_init_func(self.ui_elements, False)
                return
            # The health request is answered with a status message.
            response = json.loads(await self._websocket.recv())
            if response["status"] != "ok":
                self.enabled = False
                after_init_func(self.ui_elements, False)
                return

            self.enabled = True
            after_init_func(self.ui_elements, True)

            future1 = asyncio.ensure_future(self.input_loop())
            future2 = asyncio.ensure_future(self.output_loop())
            try:
                await asyncio.gather(future1, future2)
            finally:
                future1.cancel()
                future2.cancel()

        self._loop.run_until_complete(handle_loop())

    # start thread
    def start_thread(self, after_init_func):
        """Spawn the daemon thread running main_loop()."""
        threading.Thread(target=self.main_loop, args=(after_init_func,), daemon=True).start()

    async def check_health(self):
        """Connect to REMOTE_URL and send the health/registration message.

        Returns True when the connection and send succeeded; False (with
        self.enabled cleared) on any failure.
        """
        try:
            self.info("Checking if server is online and registering to server...")
            self._websocket = await connect(self.REMOTE_URL, ping_timeout=60 * 15)
            await self.send_to_sync_websocket({"method": "health", "session_id": self._session_id})
            return True
        except ConnectionRefusedError:
            self.warning(self._session_id + ": Server offline :(")
        except TimeoutError:
            self.warning("Timed out during opening handshake!")
        except InvalidMessage:
            self.warning("Invalid message from upstream!")
        except Exception:
            self.error("Error?")
            self.error(traceback.format_exc())
        self.enabled = False
        return False