class BOSMiner(BraiinsOSFirmware):
"""Handler for old versions of BraiinsOS+ (pre-gRPC)"""
# API handler classes, each paired with the annotated type of the attribute
# the methods of this class use (`self.rpc`, `self.web`, `self.ssh`).
# NOTE(review): presumably the base class instantiates each `_*_cls` into the
# matching attribute -- confirm against BraiinsOSFirmware.
_rpc_cls = BOSMinerRPCAPI
rpc: BOSMinerRPCAPI
_web_cls = BOSMinerWebAPI
web: BOSMinerWebAPI
_ssh_cls = BOSMinerSSH
ssh: BOSMinerSSH
# BOSMINER_DATA_LOC is defined outside this part of the file.
# NOTE(review): presumably maps data fields to the `_get_*` methods of this
# class and the API calls that feed them -- confirm.
data_locations = BOSMINER_DATA_LOC
# Capability flags.
# NOTE(review): presumably advertise stop/resume mining and power-limit
# tuning support to callers -- confirm where these flags are read.
supports_shutdown = True
supports_autotuning = True
async def fault_light_on(self) -> bool:
    """Switch the fault light on over SSH.

    Returns:
        True when the SSH call returns a string (the cached ``self.light``
        flag is set to True first); False otherwise, leaving the flag
        untouched.
    """
    result = await self.ssh.fault_light_on()
    if not isinstance(result, str):
        return False
    self.light = True
    return self.light
async def fault_light_off(self) -> bool:
    """Switch the fault light off over SSH.

    Returns:
        True when the SSH call returns a string (the cached ``self.light``
        flag is cleared first); False otherwise, leaving the flag untouched.
    """
    result = await self.ssh.fault_light_off()
    if not isinstance(result, str):
        return False
    self.light = False
    return True
async def restart_backend(self) -> bool:
    """Restart the mining backend by delegating to ``restart_bosminer``.

    Returns:
        Whatever ``restart_bosminer`` returns.
    """
    outcome = await self.restart_bosminer()
    return outcome
async def restart_bosminer(self) -> bool:
    """Restart bosminer over SSH.

    Returns:
        True when the SSH call returns a string, False for any other result.
    """
    result = await self.ssh.restart_bosminer()
    return isinstance(result, str)
async def stop_mining(self) -> bool:
    """Pause mining through the RPC ``pause`` command.

    Returns:
        True when the response carries a non-empty ``"PAUSE"`` entry whose
        first element is truthy; False otherwise, or when the RPC call
        raises ``APIError``.
    """
    try:
        response = await self.rpc.pause()
    except APIError:
        return False
    paused = response.get("PAUSE")
    if not paused:
        return False
    return bool(paused[0])
async def resume_mining(self) -> bool:
    """Resume mining through the RPC ``resume`` command.

    Returns:
        True when the response carries a non-empty ``"RESUME"`` entry whose
        first element is truthy; False otherwise, or when the RPC call
        raises ``APIError``.
    """
    try:
        response = await self.rpc.resume()
    except APIError:
        return False
    resumed = response.get("RESUME")
    if not resumed:
        return False
    return bool(resumed[0])
async def reboot(self) -> bool:
    """Reboot the miner over SSH.

    Returns:
        True when the SSH call returns a string, False for any other result.
    """
    result = await self.ssh.reboot()
    return isinstance(result, str)
async def get_config(self) -> MinerConfig:
    """Read the config file over SSH and parse it into a ``MinerConfig``.

    The parsed config is cached on ``self.config`` and returned.

    Raises:
        APIError: when the file content is not valid TOML, or when parsing
            or conversion raises ``TypeError``.
    """
    raw_toml = await self.ssh.get_config_file()
    try:
        parsed = tomllib.loads(raw_toml)
        self.config = MinerConfig.from_bosminer(parsed)
    except (tomllib.TOMLDecodeError, TypeError) as err:
        raise APIError("Failed to decode toml when getting config.") from err
    return self.config
async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
    """Serialize ``config`` to TOML and write it to the miner over SSH.

    The config is cached on ``self.config``, rendered with
    ``config.as_bosminer`` plus a ``format`` header, and written to
    ``/etc/bosminer.toml`` between a bosminer stop and start.

    Args:
        config: The configuration to send.
        user_suffix: Passed through to ``config.as_bosminer``.

    Raises:
        APIError: when the SSH connection cannot be established.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level imports.
    import shlex

    self.config = config
    parsed_cfg = config.as_bosminer(user_suffix=user_suffix)
    toml_conf = tomli_w.dumps(
        {
            "format": {
                "version": "2.0",
                "generator": "pyasic",
                "model": f"{self.make.replace('Miner', 'miner')} {self.raw_model.replace('j', 'J')}",
                "timestamp": int(time.time()),
            },
            **parsed_cfg,
        }
    )
    try:
        conn = await self.ssh._get_connection()
    except ConnectionError as e:
        raise APIError("SSH connection failed when sending config.") from e

    # Pool URLs, user names and passwords end up in the TOML text; a single
    # quote in any of them would terminate a hand-written '...' wrapper and
    # let the remainder run as shell. shlex.quote escapes embedded quotes.
    write_cmd = f"echo {shlex.quote(toml_conf)} > /etc/bosminer.toml"

    async with conn:
        await conn.run("/etc/init.d/bosminer stop")
        await conn.run(write_cmd)
        await conn.run("/etc/init.d/bosminer start")
async def set_power_limit(self, wattage: int) -> bool:
    """Set the power limit by rewriting the miner config.

    Fetches the current config, switches its mining mode to
    ``MiningModePowerTune(power=wattage)`` and sends it back.

    Returns:
        True when the config was sent; False when no config was available
        or a non-``APIError`` exception occurred (which is logged).

    Raises:
        APIError: propagated unchanged from reading or sending the config.
    """
    try:
        current = await self.get_config()
        if current is None:
            return False
        current.mining_mode = MiningModePowerTune(power=wattage)
        await self.send_config(current)
    except APIError:
        raise
    except Exception as e:
        logging.warning(f"{self} - Failed to set power limit: {e}")
        return False
    return True
async def set_static_ip(
    self,
    ip: str,
    dns: str,
    gateway: str,
    subnet_mask: str = "255.255.255.0",
):
    """Rewrite the ``lan`` interface in ``/etc/config/network`` as static.

    The current network config is read over SSH, split into sections on
    blank lines, every section containing ``config interface 'lan'`` is
    replaced with a static definition, and the result is written back.

    Args:
        ip: Address written to ``option ipaddr``.
        dns: Address written to ``option dns``.
        gateway: Address written to ``option gateway``.
        subnet_mask: Mask written to ``option netmask``.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level imports.
    import shlex

    cfg_data_lan = "\n\t".join(
        [
            "config interface 'lan'",
            "option type 'bridge'",
            "option ifname 'eth0'",
            "option proto 'static'",
            f"option ipaddr '{ip}'",
            f"option netmask '{subnet_mask}'",
            f"option gateway '{gateway}'",
            f"option dns '{dns}'",
        ]
    )
    data = await self.ssh.get_network_config()
    split_data = data.split("\n\n")
    for idx, val in enumerate(split_data):
        if "config interface 'lan'" in val:
            split_data[idx] = cfg_data_lan
    config = "\n\n".join(split_data)
    # The config text itself contains single quotes, so a hand-written
    # '...' wrapper is closed by the first of them: the shell then strips
    # every quote and interprets the caller-supplied values. shlex.quote
    # keeps the text intact as one literal argument.
    await self.ssh.send_command(
        f"echo {shlex.quote(config)} > /etc/config/network"
    )
async def set_dhcp(self):
    """Rewrite the ``lan`` interface in ``/etc/config/network`` as DHCP.

    The current network config is read over SSH, split into sections on
    blank lines, every section containing ``config interface 'lan'`` is
    replaced with a DHCP definition, and the result is written back.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level imports.
    import shlex

    cfg_data_lan = "\n\t".join(
        [
            "config interface 'lan'",
            "option type 'bridge'",
            "option ifname 'eth0'",
            "option proto 'dhcp'",
        ]
    )
    data = await self.ssh.get_network_config()
    split_data = data.split("\n\n")
    for idx, val in enumerate(split_data):
        if "config interface 'lan'" in val:
            split_data[idx] = cfg_data_lan
    config = "\n\n".join(split_data)
    # The config text itself contains single quotes, so a hand-written
    # '...' wrapper is closed by the first of them and the shell strips
    # every quote. shlex.quote keeps the text intact as one argument.
    await self.ssh.send_command(
        f"echo {shlex.quote(config)} > /etc/config/network"
    )
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def _get_mac(self, web_net_conf: Union[dict, list] = None) -> Optional[str]:
if web_net_conf is None:
try:
web_net_conf = await self.web.get_net_conf()
except APIError:
pass
if isinstance(web_net_conf, dict):
if "admin/network/iface_status/lan" in web_net_conf.keys():
web_net_conf = web_net_conf["admin/network/iface_status/lan"]
if web_net_conf is not None:
try:
return web_net_conf[0]["macaddr"]
except LookupError:
pass
# could use ssh, but its slow and buggy
# result = await self.send_ssh_command("cat /sys/class/net/eth0/address")
# if result:
# return result.upper().strip()
async def _get_api_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
rpc_version = await self.rpc.version()
except APIError:
pass
# Now get the API version
if rpc_version is not None:
try:
rpc_ver = rpc_version["VERSION"][0]["API"]
except LookupError:
rpc_ver = None
self.api_ver = rpc_ver
self.rpc.rpc_ver = self.api_ver
return self.api_ver
async def _get_fw_ver(self, web_bos_info: dict = None) -> Optional[str]:
if web_bos_info is None:
try:
web_bos_info = await self.web.get_bos_info()
except APIError:
return None
if isinstance(web_bos_info, dict):
if "bos/info" in web_bos_info.keys():
web_bos_info = web_bos_info["bos/info"]
try:
ver = web_bos_info["version"].split("-")[5]
if "." in ver:
self.fw_ver = ver
except (LookupError, AttributeError):
return None
return self.fw_ver
async def _get_hostname(self) -> Union[str, None]:
try:
hostname = (await self.ssh.get_hostname()).strip()
except AttributeError:
return None
except Exception as e:
logging.error(f"{self} - Getting hostname failed: {e}")
return None
return hostname
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[AlgoHashRate]:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
except APIError:
pass
if rpc_summary is not None:
try:
return self.algo.hashrate(
rate=float(rpc_summary["SUMMARY"][0]["MHS 1m"]),
unit=self.algo.unit.MH,
).into(self.algo.unit.default)
except (KeyError, IndexError, ValueError, TypeError):
pass
async def _get_hashboards(
    self,
    rpc_temps: dict = None,
    rpc_devdetails: dict = None,
    rpc_devs: dict = None,
) -> List[HashBoard]:
    """Build one ``HashBoard`` per expected slot from RPC data.

    Any of the three responses that is not supplied is fetched with a
    single ``multicommand`` call. Supplied responses are used as given.

    Board IDs are mapped to list positions by subtracting an offset: 6 when
    the first reported ID is 6, 7 or 8, otherwise 1.

    Args:
        rpc_temps: ``temps`` response (chip and board temperatures).
        rpc_devdetails: ``devdetails`` response (chip counts).
        rpc_devs: ``devs`` response (per-board hashrate).

    Returns:
        A list of ``self.expected_hashboards`` boards, or an empty list
        when ``self.expected_hashboards`` is None. Each section is
        best-effort: a malformed section stops being applied at the first
        bad entry without affecting the other sections.
    """
    if self.expected_hashboards is None:
        return []

    hashboards = [
        HashBoard(slot=i, expected_chips=self.expected_chips)
        for i in range(self.expected_hashboards)
    ]

    cmds = []
    if rpc_temps is None:
        cmds.append("temps")
    if rpc_devdetails is None:
        cmds.append("devdetails")
    if rpc_devs is None:
        cmds.append("devs")

    if cmds:
        try:
            d = await self.rpc.multicommand(*cmds)
        except APIError:
            d = {}

        # Only fill in the sections that were actually requested; the
        # response has no entry for a section the caller supplied, so
        # reading it unconditionally would throw that data away.
        if "temps" in cmds:
            try:
                rpc_temps = d["temps"][0]
            except LookupError:
                rpc_temps = None
        if "devdetails" in cmds:
            try:
                rpc_devdetails = d["devdetails"][0]
            except LookupError:
                rpc_devdetails = None
        if "devs" in cmds:
            try:
                rpc_devs = d["devs"][0]
            except LookupError:
                rpc_devs = None

    if rpc_temps is not None:
        try:
            offset = 6 if rpc_temps["TEMPS"][0]["ID"] in [6, 7, 8] else 1
            for board in rpc_temps["TEMPS"]:
                _id = board["ID"] - offset
                chip_temp = round(board["Chip"])
                board_temp = round(board["Board"])
                hashboards[_id].chip_temp = chip_temp
                hashboards[_id].temp = board_temp
        except (IndexError, KeyError, ValueError, TypeError):
            pass

    if rpc_devdetails is not None:
        try:
            offset = 6 if rpc_devdetails["DEVDETAILS"][0]["ID"] in [6, 7, 8] else 1
            for board in rpc_devdetails["DEVDETAILS"]:
                _id = board["ID"] - offset
                chips = board["Chips"]
                hashboards[_id].chips = chips
                hashboards[_id].missing = False
        # Same exception set as the temps section, so a non-numeric ID
        # degrades this section instead of failing the whole call.
        except (IndexError, KeyError, ValueError, TypeError):
            pass

    if rpc_devs is not None:
        try:
            offset = 6 if rpc_devs["DEVS"][0]["ID"] in [6, 7, 8] else 1
            for board in rpc_devs["DEVS"]:
                _id = board["ID"] - offset
                hashboards[_id].hashrate = self.algo.hashrate(
                    rate=float(board["MHS 1m"]), unit=self.algo.unit.MH
                ).into(self.algo.unit.default)
        # float() raises ValueError/TypeError on a malformed rate.
        except (IndexError, KeyError, ValueError, TypeError):
            pass

    return hashboards
async def _get_wattage(self, rpc_tunerstatus: dict = None) -> Optional[int]:
if rpc_tunerstatus is None:
try:
rpc_tunerstatus = await self.rpc.tunerstatus()
except APIError:
pass
if rpc_tunerstatus is not None:
try:
return rpc_tunerstatus["TUNERSTATUS"][0][
"ApproximateMinerPowerConsumption"
]
except LookupError:
pass
async def _get_wattage_limit(self, rpc_tunerstatus: dict = None) -> Optional[int]:
if rpc_tunerstatus is None:
try:
rpc_tunerstatus = await self.rpc.tunerstatus()
except APIError:
pass
if rpc_tunerstatus is not None:
try:
return rpc_tunerstatus["TUNERSTATUS"][0]["PowerLimit"]
except LookupError:
pass
async def _get_fans(self, rpc_fans: dict = None) -> List[Fan]:
if self.expected_fans is None:
return []
if rpc_fans is None:
try:
rpc_fans = await self.rpc.fans()
except APIError:
pass
if rpc_fans is not None:
fans = []
for n in range(self.expected_fans):
try:
fans.append(Fan(speed=rpc_fans["FANS"][n]["RPM"]))
except (IndexError, KeyError):
pass
return fans
return [Fan() for _ in range(self.expected_fans)]
async def _get_errors(self, rpc_tunerstatus: dict = None) -> List[MinerErrorData]:
if rpc_tunerstatus is None:
try:
rpc_tunerstatus = await self.rpc.tunerstatus()
except APIError:
pass
if rpc_tunerstatus is not None:
errors = []
try:
chain_status = rpc_tunerstatus["TUNERSTATUS"][0]["TunerChainStatus"]
if chain_status and len(chain_status) > 0:
offset = (
6 if int(chain_status[0]["HashchainIndex"]) in [6, 7, 8] else 0
)
for board in chain_status:
_id = board["HashchainIndex"] - offset
if board["Status"] not in [
"Stable",
"Testing performance profile",
"Tuning individual chips",
]:
_error = board["Status"].split(" {")[0]
_error = _error[0].lower() + _error[1:]
errors.append(
BraiinsOSError(error_message=f"Slot {_id} {_error}")
)
return errors
except (KeyError, IndexError):
pass
async def _get_fault_light(self) -> bool:
if self.light:
return self.light
try:
data = (await self.ssh.get_led_status()).strip()
self.light = False
if data == "50":
self.light = True
return self.light
except (TypeError, AttributeError):
return self.light
async def _get_expected_hashrate(
self, rpc_devs: dict = None
) -> Optional[AlgoHashRateType]:
if rpc_devs is None:
try:
rpc_devs = await self.rpc.devs()
except APIError:
pass
if rpc_devs is not None:
try:
hr_list = []
for board in rpc_devs["DEVS"]:
expected_hashrate = float(board["Nominal MHS"])
if expected_hashrate:
hr_list.append(expected_hashrate)
if len(hr_list) == 0:
return self.algo.hashrate(
rate=float(0), unit=self.algo.unit.default
)
else:
return self.algo.hashrate(
rate=float(
(sum(hr_list) / len(hr_list)) * self.expected_hashboards
),
unit=self.algo.unit.MH,
).into(self.algo.unit.default)
except (IndexError, KeyError):
pass
async def _is_mining(self, rpc_devdetails: dict = None) -> Optional[bool]:
if rpc_devdetails is None:
try:
rpc_devdetails = await self.rpc.send_command(
"devdetails", ignore_errors=True, allow_warning=False
)
except APIError:
pass
if rpc_devdetails is not None:
try:
return not rpc_devdetails["STATUS"][0]["Msg"] == "Unavailable"
except LookupError:
pass
async def _get_uptime(self, rpc_summary: dict = None) -> Optional[int]:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
except APIError:
pass
if rpc_summary is not None:
try:
return int(rpc_summary["SUMMARY"][0]["Elapsed"])
except LookupError:
pass
async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
if rpc_pools is None:
try:
rpc_pools = await self.rpc.pools()
except APIError:
pass
pools_data = []
if rpc_pools is not None:
try:
pools = rpc_pools.get("POOLS", [])
for pool_info in pools:
url = pool_info.get("URL")
pool_url = PoolUrl.from_str(url) if url else None
pool_data = PoolMetrics(
accepted=pool_info.get("Accepted"),
rejected=pool_info.get("Rejected"),
get_failures=pool_info.get("Get Failures"),
remote_failures=pool_info.get("Remote Failures"),
active=pool_info.get("Stratum Active"),
alive=pool_info.get("Status") == "Alive",
url=pool_url,
user=pool_info.get("User"),
index=pool_info.get("POOL"),
)
pools_data.append(pool_data)
except LookupError:
pass
return pools_data
async def upgrade_firmware(self, file: Path) -> str:
    """
    Upgrade the firmware of the BOSMiner device.

    The firmware file is read, base64-encoded, and sent inside a single
    SSH command that decodes it to ``/tmp/firmware.tar`` and runs
    ``sysupgrade`` on it.

    Args:
        file (Path): The local file path of the firmware to be uploaded.

    Returns:
        Confirmation message after upgrading the firmware.

    Raises:
        ValueError: when ``file`` is falsy.
        FileNotFoundError, OSError: propagated from reading the file.
        Exception: any other failure is logged and re-raised unchanged.
    """
    try:
        logging.info("Starting firmware upgrade process.")

        if not file:
            raise ValueError("File location must be provided for firmware upgrade.")

        async with aiofiles.open(file, "rb") as firmware:
            raw_image = await firmware.read()

        payload = base64.b64encode(raw_image).decode("utf-8")

        logging.info(f"Uploading firmware file from {file} to the device.")
        # NOTE(review): the whole encoded image travels on one command
        # line -- confirm the SSH layer and remote shell accept a command
        # of that size for real firmware images.
        command = f"echo {payload} | base64 -d > /tmp/firmware.tar && sysupgrade /tmp/firmware.tar"
        await self.ssh.send_command(command)

        logging.info("Firmware upgrade process completed successfully.")
        return "Firmware upgrade completed successfully."
    # FileNotFoundError is a subclass of OSError, so it must be listed first
    # to get its own log message.
    except FileNotFoundError as e:
        logging.error(f"File not found during the firmware upgrade process: {e}")
        raise
    except ValueError as e:
        logging.error(
            f"Validation error occurred during the firmware upgrade process: {e}"
        )
        raise
    except OSError as e:
        logging.error(f"OS error occurred during the firmware upgrade process: {e}")
        raise
    except Exception as e:
        logging.error(
            f"An unexpected error occurred during the firmware upgrade process: {e}",
            exc_info=True,
        )
        raise