OpenOCD scripts (#2101)
* Scripts: option bytes check * Scripts: option bytes set * Scripts: openocd config * Scripts: increased readability, process IPCCBR option byte * Scripts: split dap_ob.py * Updater: process IPCCBR option byte * Scripts: move chip-related functions to chip definition * Scripts: freeze CPU registers * Scripts: flash programming routine * ob.py * otp.py * otp: handle errors correctly * downgrade to python 3.9 * correct type hinting * Scripts: fix path to ob.data Co-authored-by: あく <alleteam@gmail.com>
This commit is contained in:
		
							parent
							
								
									ded7e727d0
								
							
						
					
					
						commit
						727f043747
					
				@ -488,7 +488,7 @@ static const FuriHalFlashObMapping furi_hal_flash_ob_reg_map[FURI_HAL_FLASH_OB_T
 | 
			
		||||
    OB_REG_DEF(FuriHalFlashObInvalid, (NULL)),
 | 
			
		||||
    OB_REG_DEF(FuriHalFlashObInvalid, (NULL)),
 | 
			
		||||
 | 
			
		||||
    OB_REG_DEF(FuriHalFlashObRegisterIPCCMail, (NULL)),
 | 
			
		||||
    OB_REG_DEF(FuriHalFlashObRegisterIPCCMail, (&FLASH->IPCCBR)),
 | 
			
		||||
    OB_REG_DEF(FuriHalFlashObRegisterSecureFlash, (NULL)),
 | 
			
		||||
    OB_REG_DEF(FuriHalFlashObRegisterC2Opts, (NULL)),
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										173
									
								
								scripts/flipper/utils/openocd.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										173
									
								
								scripts/flipper/utils/openocd.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,173 @@
 | 
			
		||||
import socket
 | 
			
		||||
import subprocess
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class OpenOCD:
 | 
			
		||||
    """OpenOCD cli wrapper"""
 | 
			
		||||
 | 
			
		||||
    COMMAND_TOKEN = "\x1a"
 | 
			
		||||
 | 
			
		||||
    def __init__(self, config: dict = {}) -> None:
 | 
			
		||||
        assert isinstance(config, dict)
 | 
			
		||||
 | 
			
		||||
        # Params base
 | 
			
		||||
        self.params = []
 | 
			
		||||
 | 
			
		||||
        self.gdb_port = 3333
 | 
			
		||||
        self.telnet_port = 4444
 | 
			
		||||
        self.tcl_port = 6666
 | 
			
		||||
 | 
			
		||||
        # Port
 | 
			
		||||
        if port_base := config.get("port_base", None):
 | 
			
		||||
            self.gdb_port = port_base
 | 
			
		||||
            self.tcl_port = port_base + 1
 | 
			
		||||
            self.telnet_port = port_base + 2
 | 
			
		||||
 | 
			
		||||
        self._add_command(f"gdb_port {self.gdb_port}")
 | 
			
		||||
        self._add_command(f"tcl_port {self.tcl_port}")
 | 
			
		||||
        self._add_command(f"telnet_port {self.telnet_port}")
 | 
			
		||||
 | 
			
		||||
        # Config files
 | 
			
		||||
 | 
			
		||||
        if interface := config.get("interface", None):
 | 
			
		||||
            pass
 | 
			
		||||
        else:
 | 
			
		||||
            interface = "interface/stlink.cfg"
 | 
			
		||||
 | 
			
		||||
        if target := config.get("target", None):
 | 
			
		||||
            pass
 | 
			
		||||
        else:
 | 
			
		||||
            target = "target/stm32wbx.cfg"
 | 
			
		||||
 | 
			
		||||
        self._add_file(interface)
 | 
			
		||||
        self._add_file(target)
 | 
			
		||||
 | 
			
		||||
        # Programmer settings
 | 
			
		||||
        if serial := config.get("serial", None):
 | 
			
		||||
            self._add_command(f"{serial}")
 | 
			
		||||
 | 
			
		||||
        # Other params
 | 
			
		||||
        if "params" in config:
 | 
			
		||||
            self.params += config["params"]
 | 
			
		||||
 | 
			
		||||
        # logging
 | 
			
		||||
        self.logger = logging.getLogger()
 | 
			
		||||
 | 
			
		||||
    def _add_command(self, command: str):
 | 
			
		||||
        self.params.append("-c")
 | 
			
		||||
        self.params.append(command)
 | 
			
		||||
 | 
			
		||||
    def _add_file(self, file: str):
 | 
			
		||||
        self.params.append("-f")
 | 
			
		||||
        self.params.append(file)
 | 
			
		||||
 | 
			
		||||
    def start(self, args: list[str] = []):
 | 
			
		||||
        """Start OpenOCD process"""
 | 
			
		||||
 | 
			
		||||
        params = ["openocd", *self.params, *args]
 | 
			
		||||
        self.logger.debug(f"_execute: {params}")
 | 
			
		||||
        self.process = subprocess.Popen(
 | 
			
		||||
            params, stderr=subprocess.PIPE, stdout=subprocess.PIPE
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self._wait_for_openocd_tcl()
 | 
			
		||||
 | 
			
		||||
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
			
		||||
        self.socket.connect(("127.0.0.1", self.tcl_port))
 | 
			
		||||
 | 
			
		||||
    def _wait_for_openocd_tcl(self):
 | 
			
		||||
        """Wait for OpenOCD to start"""
 | 
			
		||||
        # TODO: timeout
 | 
			
		||||
        while True:
 | 
			
		||||
            stderr = self.process.stderr
 | 
			
		||||
            if not stderr:
 | 
			
		||||
                break
 | 
			
		||||
            line = stderr.readline()
 | 
			
		||||
            if not line:
 | 
			
		||||
                break
 | 
			
		||||
            line = line.decode("utf-8").strip()
 | 
			
		||||
            self.logger.debug(f"OpenOCD: {line}")
 | 
			
		||||
            if "Listening on port" in line and "for tcl connections" in line:
 | 
			
		||||
                break
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        self.send_tcl("exit")
 | 
			
		||||
        self.send_tcl("shutdown")
 | 
			
		||||
        self.socket.close()
 | 
			
		||||
        try:
 | 
			
		||||
            self.process.wait(timeout=10)
 | 
			
		||||
        except subprocess.TimeoutExpired as e:
 | 
			
		||||
            self.process.kill()
 | 
			
		||||
            self.logger.error("Failed to stop OpenOCD")
 | 
			
		||||
            self.logger.exception(e)
 | 
			
		||||
            self.postmortem()
 | 
			
		||||
 | 
			
		||||
    def send_tcl(self, cmd) -> str:
 | 
			
		||||
        """Send a command string to TCL RPC. Return the result that was read."""
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            data = (cmd + OpenOCD.COMMAND_TOKEN).encode("utf-8")
 | 
			
		||||
            self.logger.debug(f"<- {data}")
 | 
			
		||||
 | 
			
		||||
            self.socket.send(data)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self.logger.error("Failed to send command to OpenOCD")
 | 
			
		||||
            self.logger.exception(e)
 | 
			
		||||
            self.postmortem()
 | 
			
		||||
            raise
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            data = self._recv()
 | 
			
		||||
            return data
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self.logger.error("Failed to receive response from OpenOCD")
 | 
			
		||||
            self.logger.exception(e)
 | 
			
		||||
            self.postmortem()
 | 
			
		||||
            raise
 | 
			
		||||
 | 
			
		||||
    def _recv(self):
 | 
			
		||||
        """Read from the stream until the token (\x1a) was received."""
 | 
			
		||||
        # TODO: timeout
 | 
			
		||||
        data = bytes()
 | 
			
		||||
        while True:
 | 
			
		||||
            chunk = self.socket.recv(4096)
 | 
			
		||||
            data += chunk
 | 
			
		||||
            if bytes(OpenOCD.COMMAND_TOKEN, encoding="utf-8") in chunk:
 | 
			
		||||
                break
 | 
			
		||||
 | 
			
		||||
        self.logger.debug(f"-> {data}")
 | 
			
		||||
 | 
			
		||||
        data = data.decode("utf-8").strip()
 | 
			
		||||
        data = data[:-1]  # strip trailing \x1a
 | 
			
		||||
 | 
			
		||||
        return data
 | 
			
		||||
 | 
			
		||||
    def postmortem(self) -> None:
 | 
			
		||||
        """Postmortem analysis of the OpenOCD process"""
 | 
			
		||||
        stdout, stderr = self.process.communicate()
 | 
			
		||||
 | 
			
		||||
        log = self.logger.error
 | 
			
		||||
        if self.process.returncode == 0:
 | 
			
		||||
            log = self.logger.debug
 | 
			
		||||
            log("OpenOCD exited normally")
 | 
			
		||||
        else:
 | 
			
		||||
            log("OpenOCD exited with error")
 | 
			
		||||
 | 
			
		||||
        log(f"Exit code: {self.process.returncode}")
 | 
			
		||||
        for line in stdout.decode("utf-8").splitlines():
 | 
			
		||||
            log(f"Stdout: {line}")
 | 
			
		||||
 | 
			
		||||
        for line in stderr.decode("utf-8").splitlines():
 | 
			
		||||
            log(f"Stderr: {line}")
 | 
			
		||||
 | 
			
		||||
    def read_32(self, addr: int) -> int:
 | 
			
		||||
        """Read 32-bit value from memory"""
 | 
			
		||||
        data = self.send_tcl(f"mdw {addr}").strip()
 | 
			
		||||
        data = data.split(": ")[-1]
 | 
			
		||||
        data = int(data, 16)
 | 
			
		||||
        return data
 | 
			
		||||
 | 
			
		||||
    def write_32(self, addr: int, value: int) -> None:
 | 
			
		||||
        """Write 32-bit value to memory"""
 | 
			
		||||
        self.send_tcl(f"mww {addr} {value}")
 | 
			
		||||
							
								
								
									
										31
									
								
								scripts/flipper/utils/programmer.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										31
									
								
								scripts/flipper/utils/programmer.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,31 @@
 | 
			
		||||
from abc import ABC, abstractmethod
 | 
			
		||||
from enum import Enum
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Programmer(ABC):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    class RunMode(Enum):
 | 
			
		||||
        Run = "run"
 | 
			
		||||
        Stop = "stop"
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def reset(self, mode: RunMode = RunMode.Run) -> bool:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def flash(self, address: int, file_path: str, verify: bool = True) -> bool:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def option_bytes_validate(self, file_path: str) -> bool:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def option_bytes_set(self, file_path: str) -> bool:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def otp_write(self, address: int, file_path: str) -> bool:
 | 
			
		||||
        pass
 | 
			
		||||
							
								
								
									
										281
									
								
								scripts/flipper/utils/programmer_openocd.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										281
									
								
								scripts/flipper/utils/programmer_openocd.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,281 @@
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
import typing
 | 
			
		||||
 | 
			
		||||
from flipper.utils.programmer import Programmer
 | 
			
		||||
from flipper.utils.openocd import OpenOCD
 | 
			
		||||
from flipper.utils.stm32wb55 import STM32WB55
 | 
			
		||||
from flipper.assets.obdata import OptionBytesData
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class OpenOCDProgrammer(Programmer):
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        interface: str = "interface/cmsis-dap.cfg",
 | 
			
		||||
        port_base: typing.Union[int, None] = None,
 | 
			
		||||
        serial: typing.Union[str, None] = None,
 | 
			
		||||
    ):
 | 
			
		||||
        super().__init__()
 | 
			
		||||
 | 
			
		||||
        config = {}
 | 
			
		||||
 | 
			
		||||
        config["interface"] = interface
 | 
			
		||||
        config["target"] = "target/stm32wbx.cfg"
 | 
			
		||||
 | 
			
		||||
        if not serial is None:
 | 
			
		||||
            if interface == "interface/cmsis-dap.cfg":
 | 
			
		||||
                config["serial"] = f"cmsis_dap_serial {serial}"
 | 
			
		||||
            elif "stlink" in interface:
 | 
			
		||||
                config["serial"] = f"stlink_serial {serial}"
 | 
			
		||||
 | 
			
		||||
        if not port_base is None:
 | 
			
		||||
            config["port_base"] = port_base
 | 
			
		||||
 | 
			
		||||
        self.openocd = OpenOCD(config)
 | 
			
		||||
        self.logger = logging.getLogger()
 | 
			
		||||
 | 
			
		||||
    def reset(self, mode: Programmer.RunMode = Programmer.RunMode.Run) -> bool:
 | 
			
		||||
        stm32 = STM32WB55()
 | 
			
		||||
        if mode == Programmer.RunMode.Run:
 | 
			
		||||
            stm32.reset(self.openocd, stm32.RunMode.Run)
 | 
			
		||||
        elif mode == Programmer.RunMode.Stop:
 | 
			
		||||
            stm32.reset(self.openocd, stm32.RunMode.Init)
 | 
			
		||||
        else:
 | 
			
		||||
            raise Exception("Unknown mode")
 | 
			
		||||
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def flash(self, address: int, file_path: str, verify: bool = True) -> bool:
 | 
			
		||||
        if not os.path.exists(file_path):
 | 
			
		||||
            raise Exception(f"File {file_path} not found")
 | 
			
		||||
 | 
			
		||||
        self.openocd.start()
 | 
			
		||||
        self.openocd.send_tcl(f"init")
 | 
			
		||||
        self.openocd.send_tcl(
 | 
			
		||||
            f"program {file_path} 0x{address:08x}{' verify' if verify else ''} reset exit"
 | 
			
		||||
        )
 | 
			
		||||
        self.openocd.stop()
 | 
			
		||||
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def _ob_print_diff_table(self, ob_reference: bytes, ob_read: bytes, print_fn):
 | 
			
		||||
        print_fn(
 | 
			
		||||
            f'{"Reference": <20} {"Device": <20} {"Diff Reference": <20} {"Diff Device": <20}'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # Split into 8 byte, word + word
 | 
			
		||||
        for i in range(0, len(ob_reference), 8):
 | 
			
		||||
            ref = ob_reference[i : i + 8]
 | 
			
		||||
            read = ob_read[i : i + 8]
 | 
			
		||||
 | 
			
		||||
            diff_str1 = ""
 | 
			
		||||
            diff_str2 = ""
 | 
			
		||||
            for j in range(0, len(ref.hex()), 2):
 | 
			
		||||
                byte_str_1 = ref.hex()[j : j + 2]
 | 
			
		||||
                byte_str_2 = read.hex()[j : j + 2]
 | 
			
		||||
 | 
			
		||||
                if byte_str_1 == byte_str_2:
 | 
			
		||||
                    diff_str1 += "__"
 | 
			
		||||
                    diff_str2 += "__"
 | 
			
		||||
                else:
 | 
			
		||||
                    diff_str1 += byte_str_1
 | 
			
		||||
                    diff_str2 += byte_str_2
 | 
			
		||||
 | 
			
		||||
            print_fn(
 | 
			
		||||
                f"{ref.hex(): <20} {read.hex(): <20} {diff_str1: <20} {diff_str2: <20}"
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def option_bytes_validate(self, file_path: str) -> bool:
 | 
			
		||||
        # Registers
 | 
			
		||||
        stm32 = STM32WB55()
 | 
			
		||||
 | 
			
		||||
        # OpenOCD
 | 
			
		||||
        self.openocd.start()
 | 
			
		||||
        stm32.reset(self.openocd, stm32.RunMode.Init)
 | 
			
		||||
 | 
			
		||||
        # Generate Option Bytes data
 | 
			
		||||
        ob_data = OptionBytesData(file_path)
 | 
			
		||||
        ob_values = ob_data.gen_values().export()
 | 
			
		||||
        ob_reference = ob_values.reference
 | 
			
		||||
        ob_compare_mask = ob_values.compare_mask
 | 
			
		||||
        ob_length = len(ob_reference)
 | 
			
		||||
        ob_words = int(ob_length / 4)
 | 
			
		||||
 | 
			
		||||
        # Read Option Bytes
 | 
			
		||||
        ob_read = bytes()
 | 
			
		||||
        for i in range(ob_words):
 | 
			
		||||
            addr = stm32.OPTION_BYTE_BASE + i * 4
 | 
			
		||||
            value = self.openocd.read_32(addr)
 | 
			
		||||
            ob_read += value.to_bytes(4, "little")
 | 
			
		||||
 | 
			
		||||
        # Compare Option Bytes with reference by mask
 | 
			
		||||
        ob_compare = bytes()
 | 
			
		||||
        for i in range(ob_length):
 | 
			
		||||
            ob_compare += bytes([ob_read[i] & ob_compare_mask[i]])
 | 
			
		||||
 | 
			
		||||
        # Compare Option Bytes
 | 
			
		||||
        return_code = False
 | 
			
		||||
 | 
			
		||||
        if ob_reference == ob_compare:
 | 
			
		||||
            self.logger.info("Option Bytes are valid")
 | 
			
		||||
            return_code = True
 | 
			
		||||
        else:
 | 
			
		||||
            self.logger.error("Option Bytes are invalid")
 | 
			
		||||
            self._ob_print_diff_table(ob_reference, ob_compare, self.logger.error)
 | 
			
		||||
 | 
			
		||||
        # Stop OpenOCD
 | 
			
		||||
        stm32.reset(self.openocd, stm32.RunMode.Run)
 | 
			
		||||
        self.openocd.stop()
 | 
			
		||||
 | 
			
		||||
        return return_code
 | 
			
		||||
 | 
			
		||||
    def _unpack_u32(self, data: bytes, offset: int):
 | 
			
		||||
        return int.from_bytes(data[offset : offset + 4], "little")
 | 
			
		||||
 | 
			
		||||
    def option_bytes_set(self, file_path: str) -> bool:
 | 
			
		||||
        # Registers
 | 
			
		||||
        stm32 = STM32WB55()
 | 
			
		||||
 | 
			
		||||
        # OpenOCD
 | 
			
		||||
        self.openocd.start()
 | 
			
		||||
        stm32.reset(self.openocd, stm32.RunMode.Init)
 | 
			
		||||
 | 
			
		||||
        # Generate Option Bytes data
 | 
			
		||||
        ob_data = OptionBytesData(file_path)
 | 
			
		||||
        ob_values = ob_data.gen_values().export()
 | 
			
		||||
        ob_reference_bytes = ob_values.reference
 | 
			
		||||
        ob_compare_mask_bytes = ob_values.compare_mask
 | 
			
		||||
        ob_write_mask_bytes = ob_values.write_mask
 | 
			
		||||
        ob_length = len(ob_reference_bytes)
 | 
			
		||||
        ob_dwords = int(ob_length / 8)
 | 
			
		||||
 | 
			
		||||
        # Clear flash errors
 | 
			
		||||
        stm32.clear_flash_errors(self.openocd)
 | 
			
		||||
 | 
			
		||||
        # Unlock Flash and Option Bytes
 | 
			
		||||
        stm32.flash_unlock(self.openocd)
 | 
			
		||||
        stm32.option_bytes_unlock(self.openocd)
 | 
			
		||||
 | 
			
		||||
        ob_need_to_apply = False
 | 
			
		||||
 | 
			
		||||
        for i in range(ob_dwords):
 | 
			
		||||
            device_addr = stm32.OPTION_BYTE_BASE + i * 8
 | 
			
		||||
            device_value = self.openocd.read_32(device_addr)
 | 
			
		||||
            ob_write_mask = self._unpack_u32(ob_write_mask_bytes, i * 8)
 | 
			
		||||
            ob_compare_mask = self._unpack_u32(ob_compare_mask_bytes, i * 8)
 | 
			
		||||
            ob_value_ref = self._unpack_u32(ob_reference_bytes, i * 8)
 | 
			
		||||
            ob_value_masked = device_value & ob_compare_mask
 | 
			
		||||
 | 
			
		||||
            need_patch = ((ob_value_masked ^ ob_value_ref) & ob_write_mask) != 0
 | 
			
		||||
            if need_patch:
 | 
			
		||||
                ob_need_to_apply = True
 | 
			
		||||
 | 
			
		||||
                self.logger.info(
 | 
			
		||||
                    f"Need to patch: {device_addr:08X}: {ob_value_masked:08X} != {ob_value_ref:08X}, REG[{i}]"
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                # Check if this option byte (dword) is mapped to a register
 | 
			
		||||
                device_reg_addr = stm32.option_bytes_id_to_address(i)
 | 
			
		||||
 | 
			
		||||
                # Construct new value for the OB register
 | 
			
		||||
                ob_value = device_value & (~ob_write_mask)
 | 
			
		||||
                ob_value |= ob_value_ref & ob_write_mask
 | 
			
		||||
 | 
			
		||||
                self.logger.info(f"Writing {ob_value:08X} to {device_reg_addr:08X}")
 | 
			
		||||
                self.openocd.write_32(device_reg_addr, ob_value)
 | 
			
		||||
 | 
			
		||||
        if ob_need_to_apply:
 | 
			
		||||
            stm32.option_bytes_apply(self.openocd)
 | 
			
		||||
        else:
 | 
			
		||||
            self.logger.info(f"Option Bytes are already correct")
 | 
			
		||||
 | 
			
		||||
        # Load Option Bytes
 | 
			
		||||
        # That will reset and also lock the Option Bytes and the Flash
 | 
			
		||||
        stm32.option_bytes_load(self.openocd)
 | 
			
		||||
 | 
			
		||||
        # Stop OpenOCD
 | 
			
		||||
        stm32.reset(self.openocd, stm32.RunMode.Run)
 | 
			
		||||
        self.openocd.stop()
 | 
			
		||||
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def otp_write(self, address: int, file_path: str) -> bool:
 | 
			
		||||
        # Open file, check that it aligned to 8 bytes
 | 
			
		||||
        with open(file_path, "rb") as f:
 | 
			
		||||
            data = f.read()
 | 
			
		||||
            if len(data) % 8 != 0:
 | 
			
		||||
                self.logger.error(f"File {file_path} is not aligned to 8 bytes")
 | 
			
		||||
                return False
 | 
			
		||||
 | 
			
		||||
        # Check that address is aligned to 8 bytes
 | 
			
		||||
        if address % 8 != 0:
 | 
			
		||||
            self.logger.error(f"Address {address} is not aligned to 8 bytes")
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
        # Get size of data
 | 
			
		||||
        data_size = len(data)
 | 
			
		||||
 | 
			
		||||
        # Check that data size is aligned to 8 bytes
 | 
			
		||||
        if data_size % 8 != 0:
 | 
			
		||||
            self.logger.error(f"Data size {data_size} is not aligned to 8 bytes")
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
        self.logger.debug(f"Writing {data_size} bytes to OTP at {address:08X}")
 | 
			
		||||
        self.logger.debug(f"Data: {data.hex().upper()}")
 | 
			
		||||
 | 
			
		||||
        # Start OpenOCD
 | 
			
		||||
        oocd = self.openocd
 | 
			
		||||
        oocd.start()
 | 
			
		||||
 | 
			
		||||
        # Registers
 | 
			
		||||
        stm32 = STM32WB55()
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            # Check that OTP is empty for the given address
 | 
			
		||||
            # Also check that data is already written
 | 
			
		||||
            already_written = True
 | 
			
		||||
            for i in range(0, data_size, 4):
 | 
			
		||||
                file_word = int.from_bytes(data[i : i + 4], "little")
 | 
			
		||||
                device_word = oocd.read_32(address + i)
 | 
			
		||||
                if device_word != 0xFFFFFFFF and device_word != file_word:
 | 
			
		||||
                    self.logger.error(
 | 
			
		||||
                        f"OTP memory at {address + i:08X} is not empty: {device_word:08X}"
 | 
			
		||||
                    )
 | 
			
		||||
                    raise Exception("OTP memory is not empty")
 | 
			
		||||
 | 
			
		||||
                if device_word != file_word:
 | 
			
		||||
                    already_written = False
 | 
			
		||||
 | 
			
		||||
            if already_written:
 | 
			
		||||
                self.logger.info(f"OTP memory is already written with the given data")
 | 
			
		||||
                return True
 | 
			
		||||
 | 
			
		||||
            self.reset(self.RunMode.Stop)
 | 
			
		||||
            stm32.clear_flash_errors(oocd)
 | 
			
		||||
 | 
			
		||||
            # Write OTP memory by 8 bytes
 | 
			
		||||
            for i in range(0, data_size, 8):
 | 
			
		||||
                word_1 = int.from_bytes(data[i : i + 4], "little")
 | 
			
		||||
                word_2 = int.from_bytes(data[i + 4 : i + 8], "little")
 | 
			
		||||
                self.logger.debug(
 | 
			
		||||
                    f"Writing {word_1:08X} {word_2:08X} to {address + i:08X}"
 | 
			
		||||
                )
 | 
			
		||||
                stm32.write_flash_64(oocd, address + i, word_1, word_2)
 | 
			
		||||
 | 
			
		||||
            # Validate OTP memory
 | 
			
		||||
            validation_result = True
 | 
			
		||||
 | 
			
		||||
            for i in range(0, data_size, 4):
 | 
			
		||||
                file_word = int.from_bytes(data[i : i + 4], "little")
 | 
			
		||||
                device_word = oocd.read_32(address + i)
 | 
			
		||||
                if file_word != device_word:
 | 
			
		||||
                    self.logger.error(
 | 
			
		||||
                        f"Validation failed: {file_word:08X} != {device_word:08X} at {address + i:08X}"
 | 
			
		||||
                    )
 | 
			
		||||
                    validation_result = False
 | 
			
		||||
        finally:
 | 
			
		||||
            # Stop OpenOCD
 | 
			
		||||
            stm32.reset(oocd, stm32.RunMode.Run)
 | 
			
		||||
            oocd.stop()
 | 
			
		||||
 | 
			
		||||
        return validation_result
 | 
			
		||||
							
								
								
									
										95
									
								
								scripts/flipper/utils/register.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										95
									
								
								scripts/flipper/utils/register.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,95 @@
 | 
			
		||||
from dataclasses import dataclass
 | 
			
		||||
from flipper.utils.openocd import OpenOCD
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@dataclass
 | 
			
		||||
class RegisterBitDefinition:
 | 
			
		||||
    name: str
 | 
			
		||||
    offset: int
 | 
			
		||||
    size: int
 | 
			
		||||
    value: int = 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Register32:
 | 
			
		||||
    def __init__(self, address: int, definition_list: list[RegisterBitDefinition]):
 | 
			
		||||
        self.__dict__["names"] = [definition.name for definition in definition_list]
 | 
			
		||||
        self.names = [definition.name for definition in definition_list]  # typecheck
 | 
			
		||||
        self.address = address
 | 
			
		||||
        self.definition_list = definition_list
 | 
			
		||||
 | 
			
		||||
        # Validate that the definitions are not overlapping
 | 
			
		||||
        for i in range(len(definition_list)):
 | 
			
		||||
            for j in range(i + 1, len(definition_list)):
 | 
			
		||||
                if self._is_overlapping(definition_list[i], definition_list[j]):
 | 
			
		||||
                    raise ValueError("Register definitions are overlapping")
 | 
			
		||||
 | 
			
		||||
        self.freezed = True
 | 
			
		||||
 | 
			
		||||
    def _is_overlapping(
 | 
			
		||||
        self, a: RegisterBitDefinition, b: RegisterBitDefinition
 | 
			
		||||
    ) -> bool:
 | 
			
		||||
        if a.offset + a.size <= b.offset:
 | 
			
		||||
            return False
 | 
			
		||||
        if b.offset + b.size <= a.offset:
 | 
			
		||||
            return False
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def _get_definition(self, name: str) -> RegisterBitDefinition:
 | 
			
		||||
        for definition in self.definition_list:
 | 
			
		||||
            if definition.name == name:
 | 
			
		||||
                return definition
 | 
			
		||||
        raise ValueError(f"Register definition '{name}' not found")
 | 
			
		||||
 | 
			
		||||
    def get_definition_list(self) -> list[RegisterBitDefinition]:
 | 
			
		||||
        return self.definition_list
 | 
			
		||||
 | 
			
		||||
    def get_address(self) -> int:
 | 
			
		||||
        return self.address
 | 
			
		||||
 | 
			
		||||
    def set_reg_value(self, name: str, value: int):
 | 
			
		||||
        definition = self._get_definition(name)
 | 
			
		||||
        if value > (1 << definition.size) - 1:
 | 
			
		||||
            raise ValueError(
 | 
			
		||||
                f"Value {value} is too large for register definition '{name}'"
 | 
			
		||||
            )
 | 
			
		||||
        definition.value = value
 | 
			
		||||
 | 
			
		||||
    def get_reg_value(self, name: str) -> int:
 | 
			
		||||
        definition = self._get_definition(name)
 | 
			
		||||
        return definition.value
 | 
			
		||||
 | 
			
		||||
    def __getattr__(self, attr):
 | 
			
		||||
        if str(attr) in self.names:
 | 
			
		||||
            return self.get_reg_value(str(attr))
 | 
			
		||||
        else:
 | 
			
		||||
            return self.__dict__[attr]
 | 
			
		||||
 | 
			
		||||
    def __setattr__(self, attr, value):
 | 
			
		||||
        if str(attr) in self.names:
 | 
			
		||||
            self.set_reg_value(str(attr), value)
 | 
			
		||||
        else:
 | 
			
		||||
            if attr in self.__dict__ or "freezed" not in self.__dict__:
 | 
			
		||||
                self.__dict__[attr] = value
 | 
			
		||||
            else:
 | 
			
		||||
                raise AttributeError(f"Attribute '{attr}' not found")
 | 
			
		||||
 | 
			
		||||
    def __dir__(self):
 | 
			
		||||
        return self.names
 | 
			
		||||
 | 
			
		||||
    def set(self, value: int):
 | 
			
		||||
        for definition in self.definition_list:
 | 
			
		||||
            definition.value = (value >> definition.offset) & (
 | 
			
		||||
                (1 << definition.size) - 1
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def get(self) -> int:
 | 
			
		||||
        value = 0
 | 
			
		||||
        for definition in self.definition_list:
 | 
			
		||||
            value |= definition.value << definition.offset
 | 
			
		||||
        return value
 | 
			
		||||
 | 
			
		||||
    def load(self, openocd: OpenOCD):
 | 
			
		||||
        self.set(openocd.read_32(self.address))
 | 
			
		||||
 | 
			
		||||
    def store(self, openocd: OpenOCD):
 | 
			
		||||
        openocd.write_32(self.address, self.get())
 | 
			
		||||
							
								
								
									
										352
									
								
								scripts/flipper/utils/stm32wb55.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										352
									
								
								scripts/flipper/utils/stm32wb55.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,352 @@
 | 
			
		||||
import logging
 | 
			
		||||
from enum import Enum
 | 
			
		||||
 | 
			
		||||
from flipper.utils.openocd import OpenOCD
 | 
			
		||||
from flipper.utils.register import Register32, RegisterBitDefinition
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class STM32WB55:
 | 
			
		||||
    # Address of OTP memory in flash
 | 
			
		||||
    OTP_BASE = 0x1FFF7000
 | 
			
		||||
 | 
			
		||||
    # Address of Option byte in flash
 | 
			
		||||
    OPTION_BYTE_BASE = 0x1FFF8000
 | 
			
		||||
 | 
			
		||||
    # Flash base address
 | 
			
		||||
    FLASH_BASE = 0x58004000
 | 
			
		||||
 | 
			
		||||
    # Flash unlock register
 | 
			
		||||
    FLASH_KEYR = FLASH_BASE + 0x08
 | 
			
		||||
 | 
			
		||||
    # Option byte unlock register
 | 
			
		||||
    FLASH_OPTKEYR = FLASH_BASE + 0x0C
 | 
			
		||||
 | 
			
		||||
    # Flash unlock keys
 | 
			
		||||
    FLASH_UNLOCK_KEY1 = 0x45670123
 | 
			
		||||
    FLASH_UNLOCK_KEY2 = 0xCDEF89AB
 | 
			
		||||
 | 
			
		||||
    # Option byte unlock keys
 | 
			
		||||
    FLASH_UNLOCK_OPTKEY1 = 0x08192A3B
 | 
			
		||||
    FLASH_UNLOCK_OPTKEY2 = 0x4C5D6E7F
 | 
			
		||||
 | 
			
		||||
    # Flash control register
 | 
			
		||||
    FLASH_CR = Register32(
 | 
			
		||||
        FLASH_BASE + 0x14,
 | 
			
		||||
        [
 | 
			
		||||
            RegisterBitDefinition("PG", 0, 1),
 | 
			
		||||
            RegisterBitDefinition("PER", 1, 1),
 | 
			
		||||
            RegisterBitDefinition("MER", 2, 1),
 | 
			
		||||
            RegisterBitDefinition("PNB", 3, 8),
 | 
			
		||||
            RegisterBitDefinition("_", 11, 5),
 | 
			
		||||
            RegisterBitDefinition("STRT", 16, 1),
 | 
			
		||||
            RegisterBitDefinition("OPT_STRT", 17, 1),
 | 
			
		||||
            RegisterBitDefinition("FSTPG", 18, 1),
 | 
			
		||||
            RegisterBitDefinition("_", 19, 5),
 | 
			
		||||
            RegisterBitDefinition("EOPIE", 24, 1),
 | 
			
		||||
            RegisterBitDefinition("ERRIE", 25, 1),
 | 
			
		||||
            RegisterBitDefinition("RD_ERRIE", 26, 1),
 | 
			
		||||
            RegisterBitDefinition("OBL_LAUNCH", 27, 1),
 | 
			
		||||
            RegisterBitDefinition("_", 28, 2),
 | 
			
		||||
            RegisterBitDefinition("OPT_LOCK", 30, 1),
 | 
			
		||||
            RegisterBitDefinition("LOCK", 31, 1),
 | 
			
		||||
        ],
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # Flash status register
 | 
			
		||||
    FLASH_SR = Register32(
 | 
			
		||||
        FLASH_BASE + 0x10,
 | 
			
		||||
        [
 | 
			
		||||
            RegisterBitDefinition("EOP", 0, 1),
 | 
			
		||||
            RegisterBitDefinition("OP_ERR", 1, 1),
 | 
			
		||||
            RegisterBitDefinition("_", 2, 1),
 | 
			
		||||
            RegisterBitDefinition("PROG_ERR", 3, 1),
 | 
			
		||||
            RegisterBitDefinition("WRP_ERR", 4, 1),
 | 
			
		||||
            RegisterBitDefinition("PGA_ERR", 5, 1),
 | 
			
		||||
            RegisterBitDefinition("SIZE_ERR", 6, 1),
 | 
			
		||||
            RegisterBitDefinition("PGS_ERR", 7, 1),
 | 
			
		||||
            RegisterBitDefinition("MISS_ERR", 8, 1),
 | 
			
		||||
            RegisterBitDefinition("FAST_ERR", 9, 1),
 | 
			
		||||
            RegisterBitDefinition("_", 10, 3),
 | 
			
		||||
            RegisterBitDefinition("OPTNV", 13, 1),
 | 
			
		||||
            RegisterBitDefinition("RD_ERR", 14, 1),
 | 
			
		||||
            RegisterBitDefinition("OPTV_ERR", 15, 1),
 | 
			
		||||
            RegisterBitDefinition("BSY", 16, 1),
 | 
			
		||||
            RegisterBitDefinition("_", 17, 1),
 | 
			
		||||
            RegisterBitDefinition("CFGBSY", 18, 1),
 | 
			
		||||
            RegisterBitDefinition("PESD", 19, 1),
 | 
			
		||||
            RegisterBitDefinition("_", 20, 12),
 | 
			
		||||
        ],
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # Option byte registers
 | 
			
		||||
    FLASH_OPTR = FLASH_BASE + 0x20
 | 
			
		||||
    FLASH_PCROP1ASR = FLASH_BASE + 0x24
 | 
			
		||||
    FLASH_PCROP1AER = FLASH_BASE + 0x28
 | 
			
		||||
    FLASH_WRP1AR = FLASH_BASE + 0x2C
 | 
			
		||||
    FLASH_WRP1BR = FLASH_BASE + 0x30
 | 
			
		||||
    FLASH_PCROP1BSR = FLASH_BASE + 0x34
 | 
			
		||||
    FLASH_PCROP1BER = FLASH_BASE + 0x38
 | 
			
		||||
    FLASH_IPCCBR = FLASH_BASE + 0x3C
 | 
			
		||||
 | 
			
		||||
    # Map option byte dword index to register address
 | 
			
		||||
    OPTION_BYTE_MAP_TO_REGS = {
 | 
			
		||||
        0: FLASH_OPTR,
 | 
			
		||||
        1: FLASH_PCROP1ASR,
 | 
			
		||||
        2: FLASH_PCROP1AER,
 | 
			
		||||
        3: FLASH_WRP1AR,
 | 
			
		||||
        4: FLASH_WRP1BR,
 | 
			
		||||
        5: FLASH_PCROP1BSR,
 | 
			
		||||
        6: FLASH_PCROP1BER,
 | 
			
		||||
        7: None,  # Invalid Options
 | 
			
		||||
        8: None,  # Invalid Options
 | 
			
		||||
        9: None,  # Invalid Options
 | 
			
		||||
        10: None,  # Invalid Options
 | 
			
		||||
        11: None,  # Invalid Options
 | 
			
		||||
        12: None,  # Invalid Options
 | 
			
		||||
        13: FLASH_IPCCBR,
 | 
			
		||||
        14: None,  # Secure Flash
 | 
			
		||||
        15: None,  # Core 2 Options
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.logger = logging.getLogger("STM32WB55")
 | 
			
		||||
 | 
			
		||||
    class RunMode(Enum):
 | 
			
		||||
        Init = "init"
 | 
			
		||||
        Run = "run"
 | 
			
		||||
        Halt = "halt"
 | 
			
		||||
 | 
			
		||||
    def reset(self, oocd: OpenOCD, mode: RunMode):
 | 
			
		||||
        self.logger.debug("Resetting device")
 | 
			
		||||
        oocd.send_tcl(f"reset {mode.value}")
 | 
			
		||||
 | 
			
		||||
    def clear_flash_errors(self, oocd: OpenOCD):
 | 
			
		||||
        # Errata 2.2.9: Flash OPTVERR flag is always set after system reset
 | 
			
		||||
        # And also clear all other flash error flags
 | 
			
		||||
        self.logger.debug(f"Resetting flash errors")
 | 
			
		||||
        self.FLASH_SR.load(oocd)
 | 
			
		||||
        self.FLASH_SR.OP_ERR = 1
 | 
			
		||||
        self.FLASH_SR.PROG_ERR = 1
 | 
			
		||||
        self.FLASH_SR.WRP_ERR = 1
 | 
			
		||||
        self.FLASH_SR.PGA_ERR = 1
 | 
			
		||||
        self.FLASH_SR.SIZE_ERR = 1
 | 
			
		||||
        self.FLASH_SR.PGS_ERR = 1
 | 
			
		||||
        self.FLASH_SR.MISS_ERR = 1
 | 
			
		||||
        self.FLASH_SR.FAST_ERR = 1
 | 
			
		||||
        self.FLASH_SR.RD_ERR = 1
 | 
			
		||||
        self.FLASH_SR.OPTV_ERR = 1
 | 
			
		||||
        self.FLASH_SR.store(oocd)
 | 
			
		||||
 | 
			
		||||
    def flash_unlock(self, oocd: OpenOCD):
 | 
			
		||||
        # Check if flash is already unlocked
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        if self.FLASH_CR.LOCK == 0:
 | 
			
		||||
            self.logger.debug("Flash is already unlocked")
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # Unlock flash
 | 
			
		||||
        self.logger.debug("Unlocking Flash")
 | 
			
		||||
        oocd.write_32(self.FLASH_KEYR, self.FLASH_UNLOCK_KEY1)
 | 
			
		||||
        oocd.write_32(self.FLASH_KEYR, self.FLASH_UNLOCK_KEY2)
 | 
			
		||||
 | 
			
		||||
        # Check if flash is unlocked
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        if self.FLASH_CR.LOCK == 0:
 | 
			
		||||
            self.logger.debug("Flash unlocked")
 | 
			
		||||
        else:
 | 
			
		||||
            self.logger.error("Flash unlock failed")
 | 
			
		||||
            raise Exception("Flash unlock failed")
 | 
			
		||||
 | 
			
		||||
    def option_bytes_unlock(self, oocd: OpenOCD):
 | 
			
		||||
        # Check if options is already unlocked
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        if self.FLASH_CR.OPT_LOCK == 0:
 | 
			
		||||
            self.logger.debug("Options is already unlocked")
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # Unlock options
 | 
			
		||||
        self.logger.debug("Unlocking Options")
 | 
			
		||||
        oocd.write_32(self.FLASH_OPTKEYR, self.FLASH_UNLOCK_OPTKEY1)
 | 
			
		||||
        oocd.write_32(self.FLASH_OPTKEYR, self.FLASH_UNLOCK_OPTKEY2)
 | 
			
		||||
 | 
			
		||||
        # Check if options is unlocked
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        if self.FLASH_CR.OPT_LOCK == 0:
 | 
			
		||||
            self.logger.debug("Options unlocked")
 | 
			
		||||
        else:
 | 
			
		||||
            self.logger.error("Options unlock failed")
 | 
			
		||||
            raise Exception("Options unlock failed")
 | 
			
		||||
 | 
			
		||||
    def option_bytes_lock(self, oocd: OpenOCD):
 | 
			
		||||
        # Check if options is already locked
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        if self.FLASH_CR.OPT_LOCK == 1:
 | 
			
		||||
            self.logger.debug("Options is already locked")
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # Lock options
 | 
			
		||||
        self.logger.debug("Locking Options")
 | 
			
		||||
        self.FLASH_CR.OPT_LOCK = 1
 | 
			
		||||
        self.FLASH_CR.store(oocd)
 | 
			
		||||
 | 
			
		||||
        # Check if options is locked
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        if self.FLASH_CR.OPT_LOCK == 1:
 | 
			
		||||
            self.logger.debug("Options locked")
 | 
			
		||||
        else:
 | 
			
		||||
            self.logger.error("Options lock failed")
 | 
			
		||||
            raise Exception("Options lock failed")
 | 
			
		||||
 | 
			
		||||
    def flash_lock(self, oocd: OpenOCD):
 | 
			
		||||
        # Check if flash is already locked
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        if self.FLASH_CR.LOCK == 1:
 | 
			
		||||
            self.logger.debug("Flash is already locked")
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # Lock flash
 | 
			
		||||
        self.logger.debug("Locking Flash")
 | 
			
		||||
        self.FLASH_CR.LOCK = 1
 | 
			
		||||
        self.FLASH_CR.store(oocd)
 | 
			
		||||
 | 
			
		||||
        # Check if flash is locked
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        if self.FLASH_CR.LOCK == 1:
 | 
			
		||||
            self.logger.debug("Flash locked")
 | 
			
		||||
        else:
 | 
			
		||||
            self.logger.error("Flash lock failed")
 | 
			
		||||
            raise Exception("Flash lock failed")
 | 
			
		||||
 | 
			
		||||
    def option_bytes_apply(self, oocd: OpenOCD):
 | 
			
		||||
        self.logger.debug(f"Applying Option Bytes")
 | 
			
		||||
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        self.FLASH_CR.OPT_STRT = 1
 | 
			
		||||
        self.FLASH_CR.store(oocd)
 | 
			
		||||
 | 
			
		||||
        # Wait for Option Bytes to be applied
 | 
			
		||||
        self.flash_wait_for_operation(oocd)
 | 
			
		||||
 | 
			
		||||
    def option_bytes_load(self, oocd: OpenOCD):
 | 
			
		||||
        self.logger.debug(f"Loading Option Bytes")
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        self.FLASH_CR.OBL_LAUNCH = 1
 | 
			
		||||
        self.FLASH_CR.store(oocd)
 | 
			
		||||
 | 
			
		||||
    def option_bytes_id_to_address(self, id: int) -> int:
 | 
			
		||||
        # Check if this option byte (dword) is mapped to a register
 | 
			
		||||
        device_reg_addr = self.OPTION_BYTE_MAP_TO_REGS.get(id, None)
 | 
			
		||||
        if device_reg_addr is None:
 | 
			
		||||
            raise Exception(f"Option Byte {id} is not mapped to a register")
 | 
			
		||||
 | 
			
		||||
        return device_reg_addr
 | 
			
		||||
 | 
			
		||||
    def flash_wait_for_operation(self, oocd: OpenOCD):
 | 
			
		||||
        # Wait for flash operation to complete
 | 
			
		||||
        # TODO: timeout
 | 
			
		||||
        while True:
 | 
			
		||||
            self.FLASH_SR.load(oocd)
 | 
			
		||||
            if self.FLASH_SR.BSY == 0:
 | 
			
		||||
                break
 | 
			
		||||
 | 
			
		||||
    def flash_dump_status_register(self, oocd: OpenOCD):
 | 
			
		||||
        self.FLASH_SR.load(oocd)
 | 
			
		||||
        self.logger.info(f"FLASH_SR: {self.FLASH_SR.get():08x}")
 | 
			
		||||
        if self.FLASH_SR.EOP:
 | 
			
		||||
            self.logger.info("    End of operation")
 | 
			
		||||
        if self.FLASH_SR.OP_ERR:
 | 
			
		||||
            self.logger.error("    Operation error")
 | 
			
		||||
        if self.FLASH_SR.PROG_ERR:
 | 
			
		||||
            self.logger.error("    Programming error")
 | 
			
		||||
        if self.FLASH_SR.WRP_ERR:
 | 
			
		||||
            self.logger.error("    Write protection error")
 | 
			
		||||
        if self.FLASH_SR.PGA_ERR:
 | 
			
		||||
            self.logger.error("    Programming alignment error")
 | 
			
		||||
        if self.FLASH_SR.SIZE_ERR:
 | 
			
		||||
            self.logger.error("    Size error")
 | 
			
		||||
        if self.FLASH_SR.PGS_ERR:
 | 
			
		||||
            self.logger.error("    Programming sequence error")
 | 
			
		||||
        if self.FLASH_SR.MISS_ERR:
 | 
			
		||||
            self.logger.error("    Fast programming data miss error")
 | 
			
		||||
        if self.FLASH_SR.FAST_ERR:
 | 
			
		||||
            self.logger.error("    Fast programming error")
 | 
			
		||||
        if self.FLASH_SR.OPTNV:
 | 
			
		||||
            self.logger.info("    User option OPTVAL indication")
 | 
			
		||||
        if self.FLASH_SR.RD_ERR:
 | 
			
		||||
            self.logger.info("    PCROP read error")
 | 
			
		||||
        if self.FLASH_SR.OPTV_ERR:
 | 
			
		||||
            self.logger.info("    Option and Engineering bits loading validity error")
 | 
			
		||||
        if self.FLASH_SR.BSY:
 | 
			
		||||
            self.logger.info("    Busy")
 | 
			
		||||
        if self.FLASH_SR.CFGBSY:
 | 
			
		||||
            self.logger.info("    Programming or erase configuration busy")
 | 
			
		||||
        if self.FLASH_SR.PESD:
 | 
			
		||||
            self.logger.info("    Programming / erase operation suspended.")
 | 
			
		||||
 | 
			
		||||
    def write_flash_64(self, oocd: OpenOCD, address: int, word_1: int, word_2: int):
 | 
			
		||||
        self.logger.debug(f"Writing flash at address {address:08x}")
 | 
			
		||||
 | 
			
		||||
        if address % 8 != 0:
 | 
			
		||||
            self.logger.error("Address must be aligned to 8 bytes")
 | 
			
		||||
            raise Exception("Address must be aligned to 8 bytes")
 | 
			
		||||
 | 
			
		||||
        if word_1 == oocd.read_32(address) and word_2 == oocd.read_32(address + 4):
 | 
			
		||||
            self.logger.debug("Data is already programmed")
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        self.flash_unlock(oocd)
 | 
			
		||||
 | 
			
		||||
        # Check that no flash main memory operation is ongoing by checking the BSY bit
 | 
			
		||||
        self.FLASH_SR.load(oocd)
 | 
			
		||||
        if self.FLASH_SR.BSY:
 | 
			
		||||
            self.logger.error("Flash is busy")
 | 
			
		||||
            self.flash_dump_status_register(oocd)
 | 
			
		||||
            raise Exception("Flash is busy")
 | 
			
		||||
 | 
			
		||||
        # Enable end of operation interrupts and error interrupts
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        self.FLASH_CR.EOPIE = 1
 | 
			
		||||
        self.FLASH_CR.ERRIE = 1
 | 
			
		||||
        self.FLASH_CR.store(oocd)
 | 
			
		||||
 | 
			
		||||
        # Check that flash memory program and erase operations are allowed
 | 
			
		||||
        if self.FLASH_SR.PESD:
 | 
			
		||||
            self.logger.error("Flash operations are not allowed")
 | 
			
		||||
            self.flash_dump_status_register(oocd)
 | 
			
		||||
            raise Exception("Flash operations are not allowed")
 | 
			
		||||
 | 
			
		||||
        # Check and clear all error programming flags due to a previous programming.
 | 
			
		||||
        self.clear_flash_errors(oocd)
 | 
			
		||||
 | 
			
		||||
        # Set the PG bit in the Flash memory control register (FLASH_CR)
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        self.FLASH_CR.PG = 1
 | 
			
		||||
        self.FLASH_CR.store(oocd)
 | 
			
		||||
 | 
			
		||||
        # Perform the data write operation at the desired memory address, only double word (64 bits) can be programmed.
 | 
			
		||||
        # Write the first word
 | 
			
		||||
        oocd.send_tcl(f"mww 0x{address:08x} 0x{word_1:08x}")
 | 
			
		||||
        # Write the second word
 | 
			
		||||
        oocd.send_tcl(f"mww 0x{(address + 4):08x} 0x{word_2:08x}")
 | 
			
		||||
 | 
			
		||||
        # Wait for the BSY bit to be cleared
 | 
			
		||||
        self.flash_wait_for_operation(oocd)
 | 
			
		||||
 | 
			
		||||
        # Check that EOP flag is set in the FLASH_SR register
 | 
			
		||||
        self.FLASH_SR.load(oocd)
 | 
			
		||||
        if not self.FLASH_SR.EOP:
 | 
			
		||||
            self.logger.error("Flash operation failed")
 | 
			
		||||
            self.flash_dump_status_register(oocd)
 | 
			
		||||
            raise Exception("Flash operation failed")
 | 
			
		||||
 | 
			
		||||
        # Clear the EOP flag
 | 
			
		||||
        self.FLASH_SR.load(oocd)
 | 
			
		||||
        self.FLASH_SR.EOP = 1
 | 
			
		||||
        self.FLASH_SR.store(oocd)
 | 
			
		||||
 | 
			
		||||
        # Clear the PG bit in the FLASH_CR register
 | 
			
		||||
        self.FLASH_CR.load(oocd)
 | 
			
		||||
        self.FLASH_CR.PG = 0
 | 
			
		||||
        self.FLASH_CR.store(oocd)
 | 
			
		||||
 | 
			
		||||
        self.flash_lock(oocd)
 | 
			
		||||
@ -1,69 +1,79 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
import logging
 | 
			
		||||
import argparse
 | 
			
		||||
import subprocess
 | 
			
		||||
import sys
 | 
			
		||||
import os
 | 
			
		||||
from os import path
 | 
			
		||||
 | 
			
		||||
from flipper.app import App
 | 
			
		||||
from flipper.cube import CubeProgrammer
 | 
			
		||||
from flipper.utils.programmer_openocd import OpenOCDProgrammer
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Main(App):
 | 
			
		||||
    def init(self):
 | 
			
		||||
        # Subparsers
 | 
			
		||||
        self.subparsers = self.parser.add_subparsers(help="sub-command help")
 | 
			
		||||
 | 
			
		||||
        # Check command
 | 
			
		||||
        self.parser_check = self.subparsers.add_parser(
 | 
			
		||||
            "check", help="Check Option Bytes"
 | 
			
		||||
        )
 | 
			
		||||
        self._addArgsSWD(self.parser_check)
 | 
			
		||||
        self._add_args(self.parser_check)
 | 
			
		||||
        self.parser_check.set_defaults(func=self.check)
 | 
			
		||||
 | 
			
		||||
        # Set command
 | 
			
		||||
        self.parser_set = self.subparsers.add_parser("set", help="Set Option Bytes")
 | 
			
		||||
        self._addArgsSWD(self.parser_set)
 | 
			
		||||
        self._add_args(self.parser_set)
 | 
			
		||||
        self.parser_set.set_defaults(func=self.set)
 | 
			
		||||
        # OB
 | 
			
		||||
        self.ob = {}
 | 
			
		||||
 | 
			
		||||
    def _addArgsSWD(self, parser):
 | 
			
		||||
    def _add_args(self, parser):
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            "--port", type=str, help="Port to connect: swd or usb1", default="swd"
 | 
			
		||||
            "--port-base", type=int, help="OpenOCD port base", default=3333
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            "--interface",
 | 
			
		||||
            type=str,
 | 
			
		||||
            help="OpenOCD interface",
 | 
			
		||||
            default="interface/cmsis-dap.cfg",
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            "--serial", type=str, help="OpenOCD interface serial number"
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            "--ob-path",
 | 
			
		||||
            type=str,
 | 
			
		||||
            help="Option bytes file",
 | 
			
		||||
            default=path.join(path.dirname(__file__), "ob.data"),
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument("--serial", type=str, help="ST-Link Serial Number")
 | 
			
		||||
 | 
			
		||||
    def _getCubeParams(self):
 | 
			
		||||
        return {
 | 
			
		||||
            "port": self.args.port,
 | 
			
		||||
            "serial": self.args.serial,
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    def before(self):
 | 
			
		||||
        self.logger.info(f"Loading Option Bytes data")
 | 
			
		||||
        file_path = os.path.join(os.path.dirname(sys.argv[0]), "ob.data")
 | 
			
		||||
        with open(file_path, "r") as file:
 | 
			
		||||
            for line in file.readlines():
 | 
			
		||||
                k, v, o = line.split(":")
 | 
			
		||||
                self.ob[k.strip()] = v.strip(), o.strip()
 | 
			
		||||
 | 
			
		||||
    def check(self):
 | 
			
		||||
        self.logger.info(f"Checking Option Bytes")
 | 
			
		||||
        cp = CubeProgrammer(self._getCubeParams())
 | 
			
		||||
        if cp.checkOptionBytes(self.ob):
 | 
			
		||||
            self.logger.info(f"OB Check OK")
 | 
			
		||||
            return 0
 | 
			
		||||
        else:
 | 
			
		||||
            self.logger.error(f"OB Check FAIL")
 | 
			
		||||
            return 255
 | 
			
		||||
 | 
			
		||||
        # OpenOCD
 | 
			
		||||
        openocd = OpenOCDProgrammer(
 | 
			
		||||
            self.args.interface,
 | 
			
		||||
            self.args.port_base,
 | 
			
		||||
            self.args.serial,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        return_code = 1
 | 
			
		||||
        if openocd.option_bytes_validate(self.args.ob_path):
 | 
			
		||||
            return_code = 0
 | 
			
		||||
 | 
			
		||||
        return return_code
 | 
			
		||||
 | 
			
		||||
    def set(self):
 | 
			
		||||
        self.logger.info(f"Setting Option Bytes")
 | 
			
		||||
        cp = CubeProgrammer(self._getCubeParams())
 | 
			
		||||
        if cp.setOptionBytes(self.ob):
 | 
			
		||||
            self.logger.info(f"OB Set OK")
 | 
			
		||||
            return 0
 | 
			
		||||
        else:
 | 
			
		||||
            self.logger.error(f"OB Set FAIL")
 | 
			
		||||
            return 255
 | 
			
		||||
 | 
			
		||||
        # OpenOCD
 | 
			
		||||
        openocd = OpenOCDProgrammer(
 | 
			
		||||
            self.args.interface,
 | 
			
		||||
            self.args.port_base,
 | 
			
		||||
            self.args.serial,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        return_code = 1
 | 
			
		||||
        if openocd.option_bytes_set(self.args.ob_path):
 | 
			
		||||
            return_code = 0
 | 
			
		||||
 | 
			
		||||
        return return_code
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
 | 
			
		||||
@ -35,6 +35,7 @@ OTP_DISPLAYS = {
 | 
			
		||||
 | 
			
		||||
from flipper.app import App
 | 
			
		||||
from flipper.cube import CubeProgrammer
 | 
			
		||||
from flipper.utils.programmer_openocd import OpenOCDProgrammer
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Main(App):
 | 
			
		||||
@ -53,21 +54,21 @@ class Main(App):
 | 
			
		||||
        self.parser_flash_first = self.subparsers.add_parser(
 | 
			
		||||
            "flash_first", help="Flash first block of OTP to device"
 | 
			
		||||
        )
 | 
			
		||||
        self._addArgsSWD(self.parser_flash_first)
 | 
			
		||||
        self._addArgsOpenOCD(self.parser_flash_first)
 | 
			
		||||
        self._addFirstArgs(self.parser_flash_first)
 | 
			
		||||
        self.parser_flash_first.set_defaults(func=self.flash_first)
 | 
			
		||||
        # Flash Second
 | 
			
		||||
        self.parser_flash_second = self.subparsers.add_parser(
 | 
			
		||||
            "flash_second", help="Flash second block of OTP to device"
 | 
			
		||||
        )
 | 
			
		||||
        self._addArgsSWD(self.parser_flash_second)
 | 
			
		||||
        self._addArgsOpenOCD(self.parser_flash_second)
 | 
			
		||||
        self._addSecondArgs(self.parser_flash_second)
 | 
			
		||||
        self.parser_flash_second.set_defaults(func=self.flash_second)
 | 
			
		||||
        # Flash All
 | 
			
		||||
        self.parser_flash_all = self.subparsers.add_parser(
 | 
			
		||||
            "flash_all", help="Flash OTP to device"
 | 
			
		||||
        )
 | 
			
		||||
        self._addArgsSWD(self.parser_flash_all)
 | 
			
		||||
        self._addArgsOpenOCD(self.parser_flash_all)
 | 
			
		||||
        self._addFirstArgs(self.parser_flash_all)
 | 
			
		||||
        self._addSecondArgs(self.parser_flash_all)
 | 
			
		||||
        self.parser_flash_all.set_defaults(func=self.flash_all)
 | 
			
		||||
@ -75,17 +76,19 @@ class Main(App):
 | 
			
		||||
        self.logger = logging.getLogger()
 | 
			
		||||
        self.timestamp = datetime.datetime.now().timestamp()
 | 
			
		||||
 | 
			
		||||
    def _addArgsSWD(self, parser):
 | 
			
		||||
    def _addArgsOpenOCD(self, parser):
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            "--port", type=str, help="Port to connect: swd or usb1", default="swd"
 | 
			
		||||
            "--port-base", type=int, help="OpenOCD port base", default=3333
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            "--interface",
 | 
			
		||||
            type=str,
 | 
			
		||||
            help="OpenOCD interface",
 | 
			
		||||
            default="interface/cmsis-dap.cfg",
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            "--serial", type=str, help="OpenOCD interface serial number"
 | 
			
		||||
        )
 | 
			
		||||
        parser.add_argument("--serial", type=str, help="ST-Link Serial Number")
 | 
			
		||||
 | 
			
		||||
    def _getCubeParams(self):
 | 
			
		||||
        return {
 | 
			
		||||
            "port": self.args.port,
 | 
			
		||||
            "serial": self.args.serial,
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    def _addFirstArgs(self, parser):
 | 
			
		||||
        parser.add_argument("--version", type=int, help="Version", required=True)
 | 
			
		||||
@ -173,14 +176,22 @@ class Main(App):
 | 
			
		||||
                file.write(self._packFirst())
 | 
			
		||||
 | 
			
		||||
            self.logger.info(f"Flashing OTP")
 | 
			
		||||
            cp = CubeProgrammer(self._getCubeParams())
 | 
			
		||||
            cp.flashBin("0x1FFF7000", filename)
 | 
			
		||||
            cp.resetTarget()
 | 
			
		||||
 | 
			
		||||
            openocd = OpenOCDProgrammer(
 | 
			
		||||
                self.args.interface,
 | 
			
		||||
                self.args.port_base,
 | 
			
		||||
                self.args.serial,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            if not openocd.otp_write(0x1FFF7000, filename):
 | 
			
		||||
                raise Exception("Failed to flash OTP")
 | 
			
		||||
 | 
			
		||||
            self.logger.info(f"Flashed Successfully")
 | 
			
		||||
            os.remove(filename)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self.logger.exception(e)
 | 
			
		||||
            return 1
 | 
			
		||||
        finally:
 | 
			
		||||
            os.remove(filename)
 | 
			
		||||
 | 
			
		||||
        return 0
 | 
			
		||||
 | 
			
		||||
@ -197,14 +208,22 @@ class Main(App):
 | 
			
		||||
                file.write(self._packSecond())
 | 
			
		||||
 | 
			
		||||
            self.logger.info(f"Flashing OTP")
 | 
			
		||||
            cp = CubeProgrammer(self._getCubeParams())
 | 
			
		||||
            cp.flashBin("0x1FFF7010", filename)
 | 
			
		||||
            cp.resetTarget()
 | 
			
		||||
 | 
			
		||||
            openocd = OpenOCDProgrammer(
 | 
			
		||||
                self.args.interface,
 | 
			
		||||
                self.args.port_base,
 | 
			
		||||
                self.args.serial,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            if not openocd.otp_write(0x1FFF7010, filename):
 | 
			
		||||
                raise Exception("Failed to flash OTP")
 | 
			
		||||
 | 
			
		||||
            self.logger.info(f"Flashed Successfully")
 | 
			
		||||
            os.remove(filename)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self.logger.exception(e)
 | 
			
		||||
            return 1
 | 
			
		||||
        finally:
 | 
			
		||||
            os.remove(filename)
 | 
			
		||||
 | 
			
		||||
        return 0
 | 
			
		||||
 | 
			
		||||
@ -223,14 +242,22 @@ class Main(App):
 | 
			
		||||
                file.write(self._packSecond())
 | 
			
		||||
 | 
			
		||||
            self.logger.info(f"Flashing OTP")
 | 
			
		||||
            cp = CubeProgrammer(self._getCubeParams())
 | 
			
		||||
            cp.flashBin("0x1FFF7000", filename)
 | 
			
		||||
            cp.resetTarget()
 | 
			
		||||
 | 
			
		||||
            openocd = OpenOCDProgrammer(
 | 
			
		||||
                self.args.interface,
 | 
			
		||||
                self.args.port_base,
 | 
			
		||||
                self.args.serial,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            if not openocd.otp_write(0x1FFF7000, filename):
 | 
			
		||||
                raise Exception("Failed to flash OTP")
 | 
			
		||||
 | 
			
		||||
            self.logger.info(f"Flashed Successfully")
 | 
			
		||||
            os.remove(filename)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self.logger.exception(e)
 | 
			
		||||
            return 1
 | 
			
		||||
        finally:
 | 
			
		||||
            os.remove(filename)
 | 
			
		||||
 | 
			
		||||
        return 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user