Scripts: WiFi board updater (#2625)
* Scripts: wifi updater * WiFi board updater: lint, process download error * WiFi board updater: auto cleanup temp dir * Scripts: fix server address
This commit is contained in:
		
							parent
							
								
									3226254876
								
							
						
					
					
						commit
						e5343fdc9a
					
				
							
								
								
									
										240
									
								
								scripts/wifi_board.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										240
									
								
								scripts/wifi_board.py
									
									
									
									
									
										Executable file
									
								
							| @ -0,0 +1,240 @@ | |||||||
|  | #!/usr/bin/env python3 | ||||||
|  | 
 | ||||||
|  | from flipper.app import App | ||||||
|  | from serial.tools.list_ports_common import ListPortInfo | ||||||
|  | 
 | ||||||
|  | import logging | ||||||
|  | import os | ||||||
|  | import tempfile | ||||||
|  | import subprocess | ||||||
|  | import serial.tools.list_ports as list_ports | ||||||
|  | import json | ||||||
|  | import requests | ||||||
|  | import tarfile | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class UpdateDownloader: | ||||||
|  |     UPDATE_SERVER = "https://update.flipperzero.one" | ||||||
|  |     UPDATE_PROJECT = "/blackmagic-firmware" | ||||||
|  |     UPDATE_INDEX = UPDATE_SERVER + UPDATE_PROJECT + "/directory.json" | ||||||
|  |     UPDATE_TYPE = "full_tgz" | ||||||
|  | 
 | ||||||
|  |     CHANNEL_ID_ALIAS = { | ||||||
|  |         "dev": "development", | ||||||
|  |         "rc": "release-candidate", | ||||||
|  |         "r": "release", | ||||||
|  |         "rel": "release", | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     def __init__(self): | ||||||
|  |         self.logger = logging.getLogger() | ||||||
|  | 
 | ||||||
|  |     def download(self, channel_id: str, dir: str) -> bool: | ||||||
|  |         # Aliases | ||||||
|  |         if channel_id in self.CHANNEL_ID_ALIAS: | ||||||
|  |             channel_id = self.CHANNEL_ID_ALIAS[channel_id] | ||||||
|  | 
 | ||||||
|  |         # Make directory | ||||||
|  |         if not os.path.exists(dir): | ||||||
|  |             self.logger.info(f"Creating directory {dir}") | ||||||
|  |             os.makedirs(dir) | ||||||
|  | 
 | ||||||
|  |         # Download json index | ||||||
|  |         self.logger.info(f"Downloading {self.UPDATE_INDEX}") | ||||||
|  |         response = requests.get(self.UPDATE_INDEX) | ||||||
|  |         if response.status_code != 200: | ||||||
|  |             self.logger.error(f"Failed to download {self.UPDATE_INDEX}") | ||||||
|  |             return False | ||||||
|  | 
 | ||||||
|  |         # Parse json index | ||||||
|  |         try: | ||||||
|  |             index = json.loads(response.content) | ||||||
|  |         except Exception as e: | ||||||
|  |             self.logger.error(f"Failed to parse json index: {e}") | ||||||
|  |             return False | ||||||
|  | 
 | ||||||
|  |         # Find channel | ||||||
|  |         channel = None | ||||||
|  |         for channel_candidate in index["channels"]: | ||||||
|  |             if channel_candidate["id"] == channel_id: | ||||||
|  |                 channel = channel_candidate | ||||||
|  |                 break | ||||||
|  | 
 | ||||||
|  |         # Check if channel found | ||||||
|  |         if channel is None: | ||||||
|  |             self.logger.error( | ||||||
|  |                 f"Channel '{channel_id}' not found. Valid channels: {', '.join([c['id'] for c in index['channels']])}" | ||||||
|  |             ) | ||||||
|  |             return False | ||||||
|  | 
 | ||||||
|  |         self.logger.info(f"Using channel '{channel_id}'") | ||||||
|  | 
 | ||||||
|  |         # Get latest version | ||||||
|  |         try: | ||||||
|  |             version = channel["versions"][0] | ||||||
|  |         except Exception as e: | ||||||
|  |             self.logger.error(f"Failed to get version: {e}") | ||||||
|  |             return False | ||||||
|  | 
 | ||||||
|  |         self.logger.info(f"Using version '{version['version']}'") | ||||||
|  | 
 | ||||||
|  |         # Get changelog | ||||||
|  |         changelog = None | ||||||
|  |         try: | ||||||
|  |             changelog = version["changelog"] | ||||||
|  |         except Exception as e: | ||||||
|  |             self.logger.error(f"Failed to get changelog: {e}") | ||||||
|  | 
 | ||||||
|  |         # print changelog | ||||||
|  |         if changelog is not None: | ||||||
|  |             self.logger.info(f"Changelog:") | ||||||
|  |             for line in changelog.split("\n"): | ||||||
|  |                 if line.strip() == "": | ||||||
|  |                     continue | ||||||
|  |                 self.logger.info(f"  {line}") | ||||||
|  | 
 | ||||||
|  |         # Find file | ||||||
|  |         file_url = None | ||||||
|  |         for file_candidate in version["files"]: | ||||||
|  |             if file_candidate["type"] == self.UPDATE_TYPE: | ||||||
|  |                 file_url = file_candidate["url"] | ||||||
|  |                 break | ||||||
|  | 
 | ||||||
|  |         if file_url is None: | ||||||
|  |             self.logger.error(f"File not found") | ||||||
|  |             return False | ||||||
|  | 
 | ||||||
|  |         # Make file path | ||||||
|  |         file_name = file_url.split("/")[-1] | ||||||
|  |         file_path = os.path.join(dir, file_name) | ||||||
|  | 
 | ||||||
|  |         # Download file | ||||||
|  |         self.logger.info(f"Downloading {file_url} to {file_path}") | ||||||
|  |         with open(file_path, "wb") as f: | ||||||
|  |             response = requests.get(file_url) | ||||||
|  |             f.write(response.content) | ||||||
|  | 
 | ||||||
|  |         # Unzip tgz | ||||||
|  |         self.logger.info(f"Unzipping {file_path}") | ||||||
|  |         with tarfile.open(file_path, "r") as tar: | ||||||
|  |             tar.extractall(dir) | ||||||
|  | 
 | ||||||
|  |         return True | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class Main(App): | ||||||
|  |     def init(self): | ||||||
|  |         self.parser.add_argument("-p", "--port", help="CDC Port", default="auto") | ||||||
|  |         self.parser.add_argument( | ||||||
|  |             "-c", "--channel", help="Channel name", default="release" | ||||||
|  |         ) | ||||||
|  |         self.parser.set_defaults(func=self.update) | ||||||
|  | 
 | ||||||
|  |         # logging | ||||||
|  |         self.logger = logging.getLogger() | ||||||
|  | 
 | ||||||
|  |     def find_wifi_board(self) -> bool: | ||||||
|  |         # idk why, but python thinks that list_ports.grep returns tuple[str, str, str] | ||||||
|  |         blackmagics: list[ListPortInfo] = list(list_ports.grep("blackmagic"))  # type: ignore | ||||||
|  |         daps: list[ListPortInfo] = list(list_ports.grep("CMSIS-DAP"))  # type: ignore | ||||||
|  | 
 | ||||||
|  |         return len(blackmagics) > 0 or len(daps) > 0 | ||||||
|  | 
 | ||||||
|  |     def find_wifi_board_bootloader(self): | ||||||
|  |         # idk why, but python thinks that list_ports.grep returns tuple[str, str, str] | ||||||
|  |         ports: list[ListPortInfo] = list(list_ports.grep("ESP32-S2"))  # type: ignore | ||||||
|  | 
 | ||||||
|  |         if len(ports) == 0: | ||||||
|  |             # Blackmagic probe serial port not found, will be handled later | ||||||
|  |             pass | ||||||
|  |         elif len(ports) > 1: | ||||||
|  |             raise Exception("More than one WiFi board found") | ||||||
|  |         else: | ||||||
|  |             port = ports[0] | ||||||
|  |             if os.name == "nt": | ||||||
|  |                 port.device = f"\\\\.\\{port.device}" | ||||||
|  |             return port.device | ||||||
|  | 
 | ||||||
|  |     def update(self): | ||||||
|  |         try: | ||||||
|  |             port = self.find_wifi_board_bootloader() | ||||||
|  |         except Exception as e: | ||||||
|  |             self.logger.error(f"{e}") | ||||||
|  |             return 1 | ||||||
|  | 
 | ||||||
|  |         if self.args.port != "auto": | ||||||
|  |             port = self.args.port | ||||||
|  | 
 | ||||||
|  |             available_ports = [p[0] for p in list(list_ports.comports())] | ||||||
|  |             if port not in available_ports: | ||||||
|  |                 self.logger.error(f"Port {port} not found") | ||||||
|  |                 return 1 | ||||||
|  | 
 | ||||||
|  |         if port is None: | ||||||
|  |             if self.find_wifi_board(): | ||||||
|  |                 self.logger.error("WiFi board found, but not in bootloader mode.") | ||||||
|  |                 self.logger.info("Please hold down BOOT button and press RESET button") | ||||||
|  |             else: | ||||||
|  |                 self.logger.error("WiFi board not found") | ||||||
|  |                 self.logger.info( | ||||||
|  |                     "Please connect WiFi board to your computer, hold down BOOT button and press RESET button" | ||||||
|  |                 ) | ||||||
|  |             return 1 | ||||||
|  | 
 | ||||||
|  |         # get temporary dir | ||||||
|  |         with tempfile.TemporaryDirectory() as temp_dir: | ||||||
|  |             downloader = UpdateDownloader() | ||||||
|  | 
 | ||||||
|  |             # download latest channel update | ||||||
|  |             try: | ||||||
|  |                 if not downloader.download(self.args.channel, temp_dir): | ||||||
|  |                     self.logger.error(f"Cannot download update") | ||||||
|  |                     return 1 | ||||||
|  |             except Exception as e: | ||||||
|  |                 self.logger.error(f"Cannot download update: {e}") | ||||||
|  |                 return 1 | ||||||
|  | 
 | ||||||
|  |             with open(os.path.join(temp_dir, "flash.command"), "r") as f: | ||||||
|  |                 flash_command = f.read() | ||||||
|  | 
 | ||||||
|  |             flash_command = flash_command.replace("\n", "").replace("\r", "") | ||||||
|  |             flash_command = flash_command.replace("(PORT)", port) | ||||||
|  | 
 | ||||||
|  |             # We can't reset the board after flashing via usb | ||||||
|  |             flash_command = flash_command.replace( | ||||||
|  |                 "--after hard_reset", "--after no_reset_stub" | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             args = flash_command.split(" ")[0:] | ||||||
|  |             args = list(filter(None, args)) | ||||||
|  | 
 | ||||||
|  |             esptool_params = [] | ||||||
|  |             esptool_params.extend(args) | ||||||
|  | 
 | ||||||
|  |             self.logger.info(f'Running command: "{" ".join(args)}" in "{temp_dir}"') | ||||||
|  | 
 | ||||||
|  |             process = subprocess.Popen( | ||||||
|  |                 esptool_params, | ||||||
|  |                 stdout=subprocess.PIPE, | ||||||
|  |                 stderr=subprocess.STDOUT, | ||||||
|  |                 cwd=temp_dir, | ||||||
|  |                 bufsize=1, | ||||||
|  |                 universal_newlines=True, | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             while process.poll() is None: | ||||||
|  |                 if process.stdout is not None: | ||||||
|  |                     for line in process.stdout: | ||||||
|  |                         self.logger.debug(f"{line.strip()}") | ||||||
|  | 
 | ||||||
|  |         if process.returncode != 0: | ||||||
|  |             self.logger.error(f"Failed to flash WiFi board") | ||||||
|  |         else: | ||||||
|  |             self.logger.info("WiFi board flashed successfully") | ||||||
|  |             self.logger.info("Press RESET button on WiFi board to start it") | ||||||
|  | 
 | ||||||
|  |         return process.returncode | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | if __name__ == "__main__": | ||||||
|  |     Main()() | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 Sergey Gavrilov
						Sergey Gavrilov