diff --git a/.gitignore b/.gitignore index 2574a04..8ff17d5 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ venv_win/ venv_linux/ build/ build_win/ -build_linux/ \ No newline at end of file +build_linux/ +__pycache__/ \ No newline at end of file diff --git a/elbear_uploader.py b/elbear_uploader.py index eacc3a2..7fde157 100644 --- a/elbear_uploader.py +++ b/elbear_uploader.py @@ -3,56 +3,67 @@ import time import argparse from sys import exit -TIMEOUT_DEFAULT = 0.1 # sec -ACK = 0x0F # МК подтвердил 0b00001111 -NACK = 0xF0 # МК отверг 0b11110000 -COMMAND_PACKAGE_SIZE = b'\x30' # Команда размера пакета -COMMAND_SEND_PACKAGE = b'\x60' # Команда отправить пакет -COMMAND_FULL_ERASE = 0xBADC0FEE # Команда очистить чип +TIMEOUT_DEFAULT = 0.1 # sec +BYTE_COUNT_POS = 0 # индекс счетчика байт данных +ADDRESS_POS = 1 # индекс адреса в строке hex файла +RECORD_TYPE_POS = 3 # индекс типа записи +REC_TYPE_DATA = 0 # тип записи - данные +REC_TYPE_EXT_LIN_ADDR = 4 # тип записи - расширенный адрес + +COMMAND_PACKAGE_SIZE = b'\x30' # Команда размера пакета +COMMAND_SEND_PACKAGE = b'\x60' # Команда отправить пакет +COMMAND_FULL_ERASE = 0xBADC0FEE # Команда очистить чип + +ACK = 0x0F # МК подтвердил 0b00001111 +NACK = 0xF0 # МК отверг 0b11110000 + + +def send_parsel(data): + for attempt in range(10): + ser.write(data) # Отправка данных + read_byte = ser.read(1) # Чтение ACK/NACK + response = int.from_bytes(read_byte, "big") + if response == ACK: + return True + elif response == NACK: + print("Get NACK. Exit") + exit() + else: + pass + # print(f"Invalid answer: {response}. Try again... (Attempt {attempt + 1})") + + # за 10 раз не получили внятного ответа - устройство не отвечает, выходим + print("Device not responding") + exit() def cmd_full_erase(): - ser.write(COMMAND_FULL_ERASE.to_bytes(4, "big")) - read_byte = ser.read(1) # Прочесть подтверждение получения команды - if int.from_bytes(read_byte, "big") == NACK: - print("NACK. COMMAND_FULL_ERASE") - exit() + # отправить команду очистки чипа + send_parsel(COMMAND_FULL_ERASE.to_bytes(4, "big")) + + # Если дошли сюда, значит контроллер ответил на команду ser.timeout = None # Выключить таймаут, чтобы дождаться завершения процесса стирания чипа read_byte = ser.read(1) # Прочесть байт ACK/NACK от контроллера ser.timeout = TIMEOUT_DEFAULT # Включить таймаут обратно - if int.from_bytes(read_byte, "big") == NACK: print("NACK. FULL_ERASE FAILED") exit() # Задать размер пакета def cmd_package_size(package_size): - ser.write(COMMAND_PACKAGE_SIZE) # Отправить команду размера передаваемого пакета - read_byte = ser.read(1) # Прочесть байт ACK/NACK от контроллера - if int.from_bytes(read_byte, "big") == NACK: - print("NACK. COMMAND_PACKAGE_SIZE") - exit() - ser.write((package_size - 1).to_bytes(1, "big")) # Если контроллер ответил на команду, послать количество передаваемых байт - read_byte = ser.read(1) # Прочесть байт ACK/NACK - if int.from_bytes(read_byte, "big") == NACK: - print("NACK. COMMAND_PACKAGE_SIZE") - exit() - return read_byte == b'\x0f' + # Отправить команду размера передаваемого пакета + send_parsel(COMMAND_PACKAGE_SIZE) + # Если дошли сюда, значит контроллер ответил на команду. Послать количество передаваемых байт + send_parsel((package_size - 1).to_bytes(1, "big")) + return True # Отправить пакет def cmd_send_package(data_package): # Команда загрузить пакет - ser.write(COMMAND_SEND_PACKAGE) - read_byte = ser.read(1) # Прочесть байт ACK/NACK - if int.from_bytes(read_byte, "big") == NACK: - print("NACK. COMMAND_SEND_PACKAGE") - exit() - # Отправка пакета - ser.write(bytes(data_package)) # Если контроллер ответил на команду, послать пакет - read_byte = ser.read(1) # Прочесть байт ACK/NACK - if int.from_bytes(read_byte, "big") == NACK: - print("NACK. SEND_PACKAGE") - exit() + send_parsel(COMMAND_SEND_PACKAGE) + # Если дошли сюда, значит контроллер ответил на команду. Отправить пакет + send_parsel(bytes(data_package)) + def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '_', printEnd = "\r"): percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) @@ -112,9 +123,6 @@ def createParser(): parser = createParser() namespace = parser.parse_args() -REC_TYPE_DATA = 0 -REC_TYPE_EXT_LIN_ADDR = 4 - if namespace.hexpath: # читаем хекс with open(f"{namespace.hexpath}", "r", encoding='utf-8') as f: @@ -132,6 +140,36 @@ if namespace.hexpath: for i in range(0,len(line), 2): data.append(int(line[i:i+2], 16)) data_lines.append(data) + + # проверить, что в адресах строк с данными нет пропусков, иначе elbear_bootloader_fw может сработать некорректно + while i < (len(data_lines) - 1): + # если обе строки содержат данные + if (data_lines[i][RECORD_TYPE_POS] == REC_TYPE_DATA) and (data_lines[i+1][RECORD_TYPE_POS] == REC_TYPE_DATA): + # если разница адресов текущей и следующей строк больше, чем кол-во байт в текущей строке, + # нужно забить пропуск строками с нулевыми данными + address_cur = (data_lines[i][ADDRESS_POS]<<8) | data_lines[i][ADDRESS_POS+1] + bytesQty_cur = data_lines[i][BYTE_COUNT_POS] + address_next = (data_lines[i+1][ADDRESS_POS]<<8) | data_lines[i+1][ADDRESS_POS+1] + missedBytesQty = (address_next - address_cur) - bytesQty_cur + while missedBytesQty > 0: + # адрес новой строки данных + address_cur += bytesQty_cur + # кол-во байт в новой строке + # буферы в elbear_bootloader_fw рассчитаны на строки, в которых не более 16 байт данных, поэтому + # бьем пропущенные данные по 16 байт + if missedBytesQty > 16: + bytesQty_cur = 16 + # или пишем все, что осталось + else: + bytesQty_cur = missedBytesQty + # сформировать и вставить новый элемент списка + # кол-во байт данных 2 байта адреса тип записи нужное кол-во нулей CRC + new_line = [bytesQty_cur, (address_cur >> 8)&0xFF, address_cur&0xFF, REC_TYPE_DATA] + [0] * bytesQty_cur + [255] + data_lines.insert(i + 1, new_line) + # обновить оставшееся количество байт и перейти к следующей строке + missedBytesQty -= bytesQty_cur + i += 1 + i += 1 # убираем из отправки неиспользуемое место, которое сделал *fill* if namespace.mappath and namespace.ncsn: @@ -139,12 +177,14 @@ if namespace.hexpath: map_lines = f.readlines() # находим строку, где впервые упоминается некешируемая область + no_cache_line_idx = 0 for i in range(0, len(map_lines)): if namespace.ncsn in map_lines[i]: no_cache_line_idx = i break # от нее поднимаемся выше, пока не наткнемся на *fill* + filter_start_addr = filter_stop_addr = 0 for i in range(no_cache_line_idx, 0, -1): if '*fill*' in map_lines[i]: fill_cmd_str = map_lines[i] @@ -171,17 +211,11 @@ if namespace.hexpath: ser = serial.Serial(port = namespace.com, baudrate = namespace.baudrate, timeout = TIMEOUT_DEFAULT) + # проверка подключения устройства ping = False - for i in range(10): # вместо задержки забрасываем запросами - ping = cmd_package_size(15) - if ping: - break - + ping = cmd_package_size(15) # если устройство не отвечает, дальше этой функции не пройдем if ping: print("Device connected") - else: - print("Device not responding") - exit() if namespace.fullerase: print('Erasing memory')