 0d7395d05c
			
		
	
	
		0d7395d05c
		
			
		
	
	
	
	
		
			
			* FLPv0.1 on Python; File transfer is working. * FLPv0.1 on Python; .gitignore fixes. * Update protocol to latest version; change interface to OOP Co-authored-by: Daniel Solmann <DanGSun@yandex.ru> Co-authored-by: coreglitch <mail@s3f.ru>
		
			
				
	
	
		
			123 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			123 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from enum import Enum
 | |
| 
 | |
| import serial
 | |
| 
 | |
| 
 | |
| class RemoteIOError(IOError):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| class UnsupportedOperation(RemoteIOError):
 | |
|     # TODO: Tell more on error
 | |
|     pass
 | |
| 
 | |
| 
 | |
| class ReadWriteError(RemoteIOError):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| class FileMode(Enum):
 | |
|     READ = "r"
 | |
|     WRITE = "w"
 | |
|     # APPEND = "a" - Not Impl
 | |
|     # READ_WRITE = "w+"
 | |
|     # READ_APPEND = "a+"
 | |
| 
 | |
| 
 | |
| class File:
 | |
|     opened: bool = False
 | |
|     filename: str
 | |
|     conn: serial.Serial
 | |
|     mode = FileMode.READ
 | |
| 
 | |
|     def _open(self):
 | |
|         # Opening file on Flipper's end
 | |
|         self.conn.write(f"f_open {self.filename} {self.mode}\n".encode())
 | |
| 
 | |
|         # Checking, if everything is OK.
 | |
|         self.conn.write(b"f_valid\n")
 | |
|         return bool(int(self.conn.read_until().decode()))
 | |
| 
 | |
|     def __init__(self, conn: serial.Serial, filename: str, mode=FileMode.READ):
 | |
|         self.conn = conn
 | |
|         self.mode = mode
 | |
|         self.filename = filename
 | |
| 
 | |
|         if not self._open():
 | |
|             raise RemoteIOError
 | |
|         self.opened = True
 | |
| 
 | |
|     def lseek(self, size: int):
 | |
|         self.conn.write(f"f_lseek {size}\n".encode())
 | |
| 
 | |
|     def tell(self):
 | |
|         self.conn.write(b"f_tell\n")
 | |
|         return int(self.conn.read_until().decode())
 | |
| 
 | |
|     def size(self):
 | |
|         self.conn.write(b"f_size\n")
 | |
|         return int(self.conn.read_until().decode())
 | |
| 
 | |
|     def write(self, data: bytes):
 | |
|         if self.mode != FileMode.WRITE:
 | |
|             raise UnsupportedOperation
 | |
|         self.conn.write(f"f_write {len(data)}\n".encode())
 | |
|         self.conn.write(data)
 | |
|         self.conn.write(b"\n")
 | |
| 
 | |
|         if self.conn.read_until().decode().strip() != "OK":
 | |
|             raise ReadWriteError
 | |
| 
 | |
|     def read(self, size: int) -> bytes:
 | |
|         self.conn.write(f"f_read {size}\n".encode())
 | |
|         return self.conn.read(size)
 | |
| 
 | |
|     def _close(self):
 | |
|         self.conn.write(b"f_close\n")
 | |
| 
 | |
|     def close(self):
 | |
|         self._close()
 | |
|         # TODO: Add termination of remote file process.
 | |
|         del self
 | |
| 
 | |
|     def read_all(self, block_size=128) -> bytes:
 | |
|         """
 | |
|         Function to simplify reading of file over serial.
 | |
| 
 | |
|         :return: content of file represented in `bytes`
 | |
|         """
 | |
|         size = self.size()
 | |
|         a = bytearray()
 | |
|         seek = 0
 | |
|         # Reading in blocks of block_size
 | |
|         while seek < block_size * (size // block_size - 1):
 | |
|             seek += block_size
 | |
|             self.lseek(seek)
 | |
|             a.extend(self.read(block_size))
 | |
| 
 | |
|         seek += block_size
 | |
|         self.lseek(seek)
 | |
|         a.extend(self.read(size - seek))  # Appending mod
 | |
| 
 | |
|         self.lseek(0)  # Resetting seek to default-position
 | |
| 
 | |
|         return bytes(a)
 | |
| 
 | |
| 
 | |
| def push(ser: serial.Serial, local_path, remote_path):
 | |
|     with open(local_path, "rb") as local_f:  # Reading local file
 | |
|         a = local_f.read()
 | |
| 
 | |
|     remote_f = File(ser, remote_path, FileMode.WRITE)
 | |
|     remote_f.write(a)
 | |
|     remote_f.close()
 | |
| 
 | |
| 
 | |
| def pull(ser: serial.Serial, remote_path, local_path):
 | |
|     remote_f = File(ser, remote_path, FileMode.READ)
 | |
| 
 | |
|     with open(local_path, "wb") as f:
 | |
|         f.write(remote_f.read_all())
 | |
| 
 | |
|     remote_f.close()
 |