 e5343fdc9a
			
		
	
	
		e5343fdc9a
		
			
		
	
	
	
	
		
			
			* Scripts: wifi updater * WiFi board updater: lint, process download error * WiFi board updater: auto cleanup temp dir * Scripts: fix server address
		
			
				
	
	
		
			241 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			241 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| 
 | |
| from flipper.app import App
 | |
| from serial.tools.list_ports_common import ListPortInfo
 | |
| 
 | |
| import logging
 | |
| import os
 | |
| import tempfile
 | |
| import subprocess
 | |
| import serial.tools.list_ports as list_ports
 | |
| import json
 | |
| import requests
 | |
| import tarfile
 | |
| 
 | |
| 
 | |
class UpdateDownloader:
    """Fetch a firmware bundle from the update server and unpack it.

    The server publishes a JSON index (``UPDATE_INDEX``) listing channels;
    each channel lists versions, and each version lists downloadable files.
    ``download`` picks the first version of the requested channel, fetches
    the file whose type equals ``UPDATE_TYPE`` and extracts it.
    """

    UPDATE_SERVER = "https://update.flipperzero.one"
    UPDATE_PROJECT = "/blackmagic-firmware"
    UPDATE_INDEX = UPDATE_SERVER + UPDATE_PROJECT + "/directory.json"
    UPDATE_TYPE = "full_tgz"

    # Short channel names accepted by download(), mapped to the ids that are
    # compared against the "id" field of the index entries.
    CHANNEL_ID_ALIAS = {
        "dev": "development",
        "rc": "release-candidate",
        "r": "release",
        "rel": "release",
    }

    # Seconds to wait on a stalled connection before giving up; without a
    # timeout `requests` would block forever.
    REQUEST_TIMEOUT = 60

    def __init__(self):
        self.logger = logging.getLogger()

    def download(self, channel_id: str, dir: str) -> bool:
        """Download the newest bundle of ``channel_id`` and extract it into ``dir``.

        Args:
            channel_id: Channel id or one of the ``CHANNEL_ID_ALIAS`` keys.
            dir: Target directory; created when it does not exist.

        Returns:
            True when the archive was downloaded and extracted, False when
            any step failed (the reason is written to the log).
        """
        # Aliases
        channel_id = self.CHANNEL_ID_ALIAS.get(channel_id, channel_id)

        # Make directory
        if not os.path.exists(dir):
            self.logger.info(f"Creating directory {dir}")
            os.makedirs(dir)

        # Download json index
        self.logger.info(f"Downloading {self.UPDATE_INDEX}")
        try:
            response = requests.get(self.UPDATE_INDEX, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            self.logger.error(f"Failed to download {self.UPDATE_INDEX}: {e}")
            return False
        if response.status_code != 200:
            self.logger.error(f"Failed to download {self.UPDATE_INDEX}")
            return False

        # Parse json index (JSONDecodeError and UnicodeDecodeError are both
        # ValueError subclasses)
        try:
            index = json.loads(response.content)
        except ValueError as e:
            self.logger.error(f"Failed to parse json index: {e}")
            return False

        # Find channel; a structurally broken index is reported instead of
        # surfacing as a bare KeyError/TypeError
        try:
            channels = index["channels"]
            channel = next((c for c in channels if c["id"] == channel_id), None)
        except (KeyError, TypeError) as e:
            self.logger.error(f"Malformed update index: {e}")
            return False

        # Check if channel found
        if channel is None:
            valid_ids = ", ".join(str(c["id"]) for c in channels)
            self.logger.error(
                f"Channel '{channel_id}' not found. Valid channels: {valid_ids}"
            )
            return False

        self.logger.info(f"Using channel '{channel_id}'")

        # Get latest version (the first entry of the channel's version list)
        try:
            version = channel["versions"][0]
            version_name = version["version"]
        except (KeyError, IndexError, TypeError) as e:
            self.logger.error(f"Failed to get version: {e}")
            return False

        self.logger.info(f"Using version '{version_name}'")

        # Get changelog; its absence is logged but does not abort the update
        changelog = None
        try:
            changelog = version["changelog"]
        except KeyError as e:
            self.logger.error(f"Failed to get changelog: {e}")

        # print changelog, skipping blank lines
        if changelog is not None:
            self.logger.info("Changelog:")
            for line in changelog.split("\n"):
                if line.strip():
                    self.logger.info(f"  {line}")

        # Find file of the wanted type
        try:
            file_url = next(
                (
                    candidate["url"]
                    for candidate in version["files"]
                    if candidate["type"] == self.UPDATE_TYPE
                ),
                None,
            )
        except (KeyError, TypeError) as e:
            self.logger.error(f"Malformed file list: {e}")
            return False

        if file_url is None:
            self.logger.error("File not found")
            return False

        # Make file path
        file_name = file_url.split("/")[-1]
        file_path = os.path.join(dir, file_name)

        # Download file. The request is made and validated BEFORE the output
        # file is opened, so an HTTP error page is never written to disk and
        # later mistaken for an archive.
        self.logger.info(f"Downloading {file_url} to {file_path}")
        try:
            response = requests.get(file_url, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            self.logger.error(f"Failed to download {file_url}: {e}")
            return False
        if response.status_code != 200:
            self.logger.error(f"Failed to download {file_url}")
            return False

        with open(file_path, "wb") as f:
            f.write(response.content)

        # Unzip tgz
        self.logger.info(f"Unzipping {file_path}")
        try:
            with tarfile.open(file_path, "r") as tar:
                if hasattr(tarfile, "data_filter"):
                    # The archive comes from the network: refuse members that
                    # would escape `dir` (absolute paths, "..", bad links).
                    tar.extractall(dir, filter="data")
                else:
                    # Interpreter without extraction filters: unfiltered
                    # extraction is the only option available.
                    tar.extractall(dir)
        except tarfile.TarError as e:
            self.logger.error(f"Failed to unpack {file_path}: {e}")
            return False

        return True
 | |
| 
 | |
| 
 | |
class Main(App):
    """Command-line tool that downloads a firmware bundle and flashes the WiFi board.

    Relies on the ``App`` base class for ``self.parser`` and ``self.args``
    (presumably argument parsing and dispatch to the ``func`` default —
    confirm against ``flipper.app``).
    """

    def init(self):
        """Register command-line options and set ``update`` as the action."""
        self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
        self.parser.add_argument(
            "-c", "--channel", help="Channel name", default="release"
        )
        self.parser.set_defaults(func=self.update)

        # logging
        self.logger = logging.getLogger()

    def find_wifi_board(self) -> bool:
        """Return True when a port matching "blackmagic" or "CMSIS-DAP" exists."""
        # idk why, but python thinks that list_ports.grep returns tuple[str, str, str]
        blackmagics: list[ListPortInfo] = list(list_ports.grep("blackmagic"))  # type: ignore
        daps: list[ListPortInfo] = list(list_ports.grep("CMSIS-DAP"))  # type: ignore

        return bool(blackmagics) or bool(daps)

    def find_wifi_board_bootloader(self):
        """Return the device name of the single port matching "ESP32-S2".

        Returns:
            The device name (prefixed with ``\\\\.\\`` on Windows), or None
            when no matching port is present.

        Raises:
            Exception: when more than one matching port is found.
        """
        # idk why, but python thinks that list_ports.grep returns tuple[str, str, str]
        ports: list[ListPortInfo] = list(list_ports.grep("ESP32-S2"))  # type: ignore

        if not ports:
            # Bootloader serial port not found, will be handled by the caller
            return None
        if len(ports) > 1:
            raise Exception("More than one WiFi board found")

        device = ports[0].device
        if os.name == "nt":
            # Build the prefixed name without mutating the ListPortInfo object
            device = f"\\\\.\\{device}"
        return device

    def update(self):
        """Download the selected channel's bundle and run its flash command.

        Returns:
            The flasher's exit code on completion, or 1 when the port cannot
            be determined, the download fails, or the flash command cannot be
            read or started.
        """
        if self.args.port != "auto":
            # An explicit port wins: auto-detection is skipped entirely so
            # that having several boards attached cannot reject a port the
            # user picked deliberately.
            port = self.args.port

            available_ports = [p.device for p in list_ports.comports()]
            if port not in available_ports:
                self.logger.error(f"Port {port} not found")
                return 1
        else:
            try:
                port = self.find_wifi_board_bootloader()
            except Exception as e:
                self.logger.error(f"{e}")
                return 1

            if port is None:
                if self.find_wifi_board():
                    self.logger.error("WiFi board found, but not in bootloader mode.")
                    self.logger.info(
                        "Please hold down BOOT button and press RESET button"
                    )
                else:
                    self.logger.error("WiFi board not found")
                    self.logger.info(
                        "Please connect WiFi board to your computer, hold down BOOT button and press RESET button"
                    )
                return 1

        # get temporary dir; it is removed automatically when the block exits
        with tempfile.TemporaryDirectory() as temp_dir:
            downloader = UpdateDownloader()

            # download latest channel update
            try:
                if not downloader.download(self.args.channel, temp_dir):
                    self.logger.error("Cannot download update")
                    return 1
            except Exception as e:
                self.logger.error(f"Cannot download update: {e}")
                return 1

            # The bundle is expected to carry its own flasher command line
            try:
                with open(os.path.join(temp_dir, "flash.command"), "r") as f:
                    flash_command = f.read()
            except OSError as e:
                self.logger.error(f"Cannot read flash command: {e}")
                return 1

            flash_command = flash_command.replace("\n", "").replace("\r", "")

            # We can't reset the board after flashing via usb
            flash_command = flash_command.replace(
                "--after hard_reset", "--after no_reset_stub"
            )

            # Split first, substitute the port afterwards: a port path that
            # contains a space then stays a single argument.
            args = [
                arg.replace("(PORT)", port) for arg in flash_command.split(" ") if arg
            ]
            if not args:
                self.logger.error("Flash command is empty")
                return 1

            self.logger.info(f'Running command: "{" ".join(args)}" in "{temp_dir}"')

            try:
                # The context manager closes the pipe and reaps the child
                with subprocess.Popen(
                    args,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    cwd=temp_dir,
                    bufsize=1,
                    universal_newlines=True,
                ) as process:
                    if process.stdout is not None:
                        # Iteration ends at EOF, i.e. once the child closed
                        # its output; no polling loop is needed.
                        for line in process.stdout:
                            self.logger.debug(f"{line.strip()}")
                    returncode = process.wait()
            except OSError as e:
                self.logger.error(f"Failed to run flash command: {e}")
                return 1

        if returncode != 0:
            self.logger.error("Failed to flash WiFi board")
        else:
            self.logger.info("WiFi board flashed successfully")
            self.logger.info("Press RESET button on WiFi board to start it")

        return returncode
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Script entry point: build the application object, then invoke it.
    app = Main()
    app()
 |