class LUXMiner(LuxOSFirmware):
"""Handler for LuxOS miners"""
_rpc_cls = LUXMinerRPCAPI
rpc: LUXMinerRPCAPI
supports_shutdown = True
supports_presets = True
supports_autotuning = True
data_locations = LUXMINER_DATA_LOC
async def fault_light_on(self) -> bool:
try:
await self.rpc.ledset("red", "blink")
return True
except (APIError, LookupError):
pass
return False
async def fault_light_off(self) -> bool:
try:
await self.rpc.ledset("red", "off")
return True
except (APIError, LookupError):
pass
return False
async def restart_backend(self) -> bool:
return await self.restart_luxminer()
async def restart_luxminer(self) -> bool:
try:
await self.rpc.resetminer()
return True
except (APIError, LookupError):
pass
return False
async def stop_mining(self) -> bool:
try:
await self.rpc.sleep()
return True
except (APIError, LookupError):
pass
return False
async def resume_mining(self) -> bool:
try:
await self.rpc.wakeup()
return True
except (APIError, LookupError):
pass
async def reboot(self) -> bool:
try:
await self.rpc.rebootdevice()
return True
except (APIError, LookupError):
pass
return False
async def get_config(self) -> MinerConfig:
data = await self.rpc.multicommand(
"tempctrl", "fans", "pools", "groups", "config", "profiles"
)
return MinerConfig.from_luxos(
rpc_tempctrl=data.get("tempctrl", [{}])[0],
rpc_fans=data.get("fans", [{}])[0],
rpc_pools=data.get("pools", [{}])[0],
rpc_groups=data.get("groups", [{}])[0],
rpc_config=data.get("config", [{}])[0],
rpc_profiles=data.get("profiles", [{}])[0],
)
async def upgrade_firmware(self, *args, **kwargs) -> bool:
"""
Upgrade the firmware on a LuxOS miner by calling the 'updaterun' API command.
Returns:
bool: True if the firmware upgrade was successfully initiated, False otherwise.
"""
try:
await self.rpc.updaterun()
logging.info(f"{self.ip}: Firmware upgrade initiated successfully.")
return True
except APIError as e:
logging.error(f"{self.ip}: Firmware upgrade failed: {e}")
return False
async def atm_enabled(self) -> Optional[bool]:
try:
result = await self.rpc.atm()
return result["ATM"][0]["Enabled"]
except (APIError, LookupError):
pass
async def set_power_limit(self, wattage: int) -> bool:
config = await self.get_config()
valid_presets = {
preset.name: preset.power
for preset in config.mining_mode.available_presets
if preset.power <= wattage
}
# Set power to highest preset <= wattage
# If ATM enabled, must disable it before setting power limit
new_preset = max(valid_presets, key=valid_presets.get)
re_enable_atm = False
try:
if await self.atm_enabled():
re_enable_atm = True
await self.rpc.atmset(enabled=False)
result = await self.rpc.profileset(new_preset)
if re_enable_atm:
await self.rpc.atmset(enabled=True)
except APIError:
raise
except Exception as e:
logging.warning(f"{self} - Failed to set power limit: {e}")
return False
if result["PROFILE"][0]["Profile"] == new_preset:
return True
else:
return False
##################################################
### DATA GATHERING FUNCTIONS (get_{some_data}) ###
##################################################
async def _get_mac(self, rpc_config: dict = None) -> Optional[str]:
if rpc_config is None:
try:
rpc_config = await self.rpc.config()
except APIError:
pass
if rpc_config is not None:
try:
return rpc_config["CONFIG"][0]["MACAddr"].upper()
except KeyError:
pass
async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[AlgoHashRate]:
if rpc_summary is None:
try:
rpc_summary = await self.rpc.summary()
except APIError:
pass
if rpc_summary is not None:
try:
return self.algo.hashrate(
rate=float(rpc_summary["SUMMARY"][0]["GHS 5s"]),
unit=self.algo.unit.GH,
).into(self.algo.unit.default)
except (LookupError, ValueError, TypeError):
pass
async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
if self.expected_hashboards is None:
return []
hashboards = [
HashBoard(slot=idx, expected_chips=self.expected_chips)
for idx in range(self.expected_hashboards)
]
if rpc_stats is None:
try:
rpc_stats = await self.rpc.stats()
except APIError:
pass
if rpc_stats is not None:
try:
# TODO: bugged on S9 because of index issues, fix later.
board_stats = rpc_stats["STATS"][1]
for idx in range(3):
board_n = idx + 1
hashboards[idx].hashrate = self.algo.hashrate(
rate=float(board_stats[f"chain_rate{board_n}"]),
unit=self.algo.unit.GH,
).into(self.algo.unit.default)
hashboards[idx].chips = int(board_stats[f"chain_acn{board_n}"])
chip_temp_data = list(
filter(
lambda x: not x == 0,
map(int, board_stats[f"temp_chip{board_n}"].split("-")),
)
)
hashboards[idx].chip_temp = (
sum([chip_temp_data[0], chip_temp_data[3]]) / 2
)
board_temp_data = list(
filter(
lambda x: not x == 0,
map(int, board_stats[f"temp_pcb{board_n}"].split("-")),
)
)
hashboards[idx].temp = (
sum([board_temp_data[1], board_temp_data[2]]) / 2
)
hashboards[idx].missing = False
except LookupError:
pass
return hashboards
async def _get_wattage(self, rpc_power: dict = None) -> Optional[int]:
if rpc_power is None:
try:
rpc_power = await self.rpc.power()
except APIError:
pass
if rpc_power is not None:
try:
return rpc_power["POWER"][0]["Watts"]
except (LookupError, ValueError, TypeError):
pass
async def _get_wattage_limit(
self, rpc_config: dict = None, rpc_profiles: list[dict] = None
) -> Optional[int]:
try:
active_preset = MiningModePreset.get_active_preset_from_luxos(
rpc_config, rpc_profiles
)
return active_preset.power
except (LookupError, ValueError, TypeError):
pass
async def _get_fans(self, rpc_fans: dict = None) -> List[Fan]:
if self.expected_fans is None:
return []
if rpc_fans is None:
try:
rpc_fans = await self.rpc.fans()
except APIError:
pass
fans = []
if rpc_fans is not None:
for fan in range(self.expected_fans):
try:
fans.append(Fan(speed=rpc_fans["FANS"][fan]["RPM"]))
except (LookupError, ValueError, TypeError):
fans.append(Fan())
return fans
async def _get_expected_hashrate(
self, rpc_stats: dict = None
) -> Optional[AlgoHashRate]:
if rpc_stats is None:
try:
rpc_stats = await self.rpc.stats()
except APIError:
pass
if rpc_stats is not None:
try:
expected_rate = rpc_stats["STATS"][1]["total_rateideal"]
try:
rate_unit = rpc_stats["STATS"][1]["rate_unit"]
except KeyError:
rate_unit = "GH"
return self.algo.hashrate(
rate=float(expected_rate), unit=self.algo.unit.from_str(rate_unit)
).into(self.algo.unit.default)
except LookupError:
pass
async def _get_uptime(self, rpc_stats: dict = None) -> Optional[int]:
if rpc_stats is None:
try:
rpc_stats = await self.rpc.stats()
except APIError:
pass
if rpc_stats is not None:
try:
return int(rpc_stats["STATS"][1]["Elapsed"])
except LookupError:
pass
async def _get_fw_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
rpc_version = await self.rpc.version()
except APIError:
pass
if rpc_version is not None:
try:
return rpc_version["VERSION"][0]["Miner"]
except LookupError:
pass
async def _get_api_ver(self, rpc_version: dict = None) -> Optional[str]:
if rpc_version is None:
try:
rpc_version = await self.rpc.version()
except APIError:
pass
if rpc_version is not None:
try:
return rpc_version["VERSION"][0]["API"]
except LookupError:
pass
async def _get_fault_light(self, rpc_config: dict = None) -> Optional[bool]:
if rpc_config is None:
try:
rpc_config = await self.rpc.config()
except APIError:
pass
if rpc_config is not None:
try:
return not rpc_config["CONFIG"][0]["RedLed"] == "off"
except LookupError:
pass
async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
if rpc_pools is None:
try:
rpc_pools = await self.rpc.pools()
except APIError:
pass
pools_data = []
if rpc_pools is not None:
try:
pools = rpc_pools.get("POOLS", [])
for pool_info in pools:
url = pool_info.get("URL")
pool_url = PoolUrl.from_str(url) if url else None
pool_data = PoolMetrics(
accepted=pool_info.get("Accepted"),
rejected=pool_info.get("Rejected"),
get_failures=pool_info.get("Get Failures"),
remote_failures=pool_info.get("Remote Failures"),
active=pool_info.get("Stratum Active"),
alive=pool_info.get("Status") == "Alive",
url=pool_url,
user=pool_info.get("User"),
index=pool_info.get("POOL"),
)
pools_data.append(pool_data)
except LookupError:
pass
return pools_data