Skip to content

pyasic

LUXMiner Backend

Bases: LuxOSFirmware

Handler for LuxOS miners

Source code in pyasic/miners/backends/luxminer.py
class LUXMiner(LuxOSFirmware):
    """Handler for LuxOS miners"""

    # RPC API class for this backend; `rpc` is annotated as an instance of it.
    _rpc_cls = LUXMinerRPCAPI
    rpc: LUXMinerRPCAPI

    # Capability flags (presumably consulted by the base class/callers — confirm).
    supports_shutdown = True
    supports_presets = True
    supports_autotuning = True

    # Data-location table for this backend; LUXMINER_DATA_LOC is defined
    # outside this listing.
    data_locations = LUXMINER_DATA_LOC

    async def fault_light_on(self) -> bool:
        """Set the red LED to blink through the RPC `ledset` command.

        Returns:
            bool: True when the command completed, False when it raised
            APIError or LookupError.
        """
        try:
            await self.rpc.ledset("red", "blink")
        except (APIError, LookupError):
            return False
        return True

    async def fault_light_off(self) -> bool:
        """Switch the red LED off through the RPC `ledset` command.

        Returns:
            bool: True when the command completed, False when it raised
            APIError or LookupError.
        """
        try:
            await self.rpc.ledset("red", "off")
        except (APIError, LookupError):
            return False
        return True

    async def restart_backend(self) -> bool:
        """Restart the mining backend by delegating to `restart_luxminer`.

        Returns:
            bool: The result reported by `restart_luxminer`.
        """
        restarted = await self.restart_luxminer()
        return restarted

    async def restart_luxminer(self) -> bool:
        """Issue the RPC `resetminer` command.

        Returns:
            bool: True when the command completed, False when it raised
            APIError or LookupError.
        """
        try:
            await self.rpc.resetminer()
        except (APIError, LookupError):
            return False
        return True

    async def stop_mining(self) -> bool:
        """Issue the RPC `sleep` command.

        Returns:
            bool: True when the command completed, False when it raised
            APIError or LookupError.
        """
        try:
            await self.rpc.sleep()
        except (APIError, LookupError):
            return False
        return True

    async def resume_mining(self) -> bool:
        """Issue the RPC `wakeup` command.

        Returns:
            bool: True when the command completed, False when it raised
            APIError or LookupError.
        """
        try:
            await self.rpc.wakeup()
            return True
        except (APIError, LookupError):
            pass
        # Report failure explicitly; falling off the end would return None,
        # which breaks the declared `bool` contract.
        return False

    async def reboot(self) -> bool:
        """Issue the RPC `rebootdevice` command.

        Returns:
            bool: True when the command completed, False when it raised
            APIError or LookupError.
        """
        try:
            await self.rpc.rebootdevice()
        except (APIError, LookupError):
            return False
        return True

    async def get_config(self) -> MinerConfig:
        """Fetch the miner configuration with a single RPC multicommand.

        Each command's first reply entry is handed to `MinerConfig.from_luxos`
        as the matching `rpc_<command>` keyword; a command missing from the
        reply is passed as an empty dict.

        Returns:
            MinerConfig: The configuration built by `MinerConfig.from_luxos`.
        """
        commands = ("tempctrl", "fans", "pools", "groups", "config", "profiles")
        response = await self.rpc.multicommand(*commands)
        sections = {
            f"rpc_{command}": response.get(command, [{}])[0] for command in commands
        }
        return MinerConfig.from_luxos(**sections)

    async def upgrade_firmware(self, *args, **kwargs) -> bool:
        """
        Start a firmware upgrade on a LuxOS miner through the 'updaterun' API command.

        Positional and keyword arguments are accepted but not used.

        Returns:
            bool: True if the firmware upgrade was successfully initiated, False otherwise.
        """
        try:
            await self.rpc.updaterun()
        except APIError as err:
            logging.error(f"{self.ip}: Firmware upgrade failed: {err}")
            return False

        logging.info(f"{self.ip}: Firmware upgrade initiated successfully.")
        return True

    async def atm_enabled(self) -> Optional[bool]:
        """Read the `Enabled` field from the first `ATM` entry of the RPC `atm` reply.

        Returns:
            Optional[bool]: The reported value, or None when the call raised
            APIError or the reply lacked the expected fields.
        """
        try:
            response = await self.rpc.atm()
            atm_state = response["ATM"][0]
            return atm_state["Enabled"]
        except (APIError, LookupError):
            return None

    async def set_power_limit(self, wattage: int) -> bool:
        """Apply the highest-power preset whose power does not exceed `wattage`.

        If ATM is reported as enabled it is disabled before the profile is
        changed and re-enabled afterwards, including when the profile change
        itself fails.

        Args:
            wattage: Upper bound for the preset power.

        Returns:
            bool: True if the miner reports the chosen preset as its profile.
            False if no preset fits under `wattage`, if a non-API error
            occurred, or if the reported profile differs.

        Raises:
            APIError: Propagated unchanged from the RPC calls.
        """
        config = await self.get_config()
        valid_presets = {
            preset.name: preset.power
            for preset in config.mining_mode.available_presets
            if preset.power <= wattage
        }

        if not valid_presets:
            # max() on an empty mapping raises ValueError; report failure instead.
            logging.warning(
                f"{self} - Failed to set power limit: "
                f"no preset available at or below {wattage}W"
            )
            return False

        # Set power to highest preset <= wattage
        # If ATM enabled, must disable it before setting power limit
        new_preset = max(valid_presets, key=valid_presets.get)

        re_enable_atm = False
        try:
            if await self.atm_enabled():
                re_enable_atm = True
                await self.rpc.atmset(enabled=False)
            try:
                result = await self.rpc.profileset(new_preset)
            finally:
                # Restore ATM even when profileset fails, so an error does not
                # leave the miner with ATM switched off.
                if re_enable_atm:
                    await self.rpc.atmset(enabled=True)
        except APIError:
            raise
        except Exception as e:
            logging.warning(f"{self} - Failed to set power limit: {e}")
            return False

        return result["PROFILE"][0]["Profile"] == new_preset

    ##################################################
    ### DATA GATHERING FUNCTIONS (get_{some_data}) ###
    ##################################################

    async def _get_mac(self, rpc_config: dict = None) -> Optional[str]:
        if rpc_config is None:
            try:
                rpc_config = await self.rpc.config()
            except APIError:
                pass

        if rpc_config is not None:
            try:
                return rpc_config["CONFIG"][0]["MACAddr"].upper()
            except KeyError:
                pass

    async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[AlgoHashRate]:
        """Convert the `GHS 5s` field of the RPC `summary` reply to a hashrate.

        Args:
            rpc_summary: A previously fetched `summary` response; fetched via
                `self.rpc.summary()` when omitted.

        Returns:
            Optional[AlgoHashRate]: The rate converted into the algorithm's
            default unit, or None when the reply is unavailable or unparsable.
        """
        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                return None

        if rpc_summary is None:
            return None

        try:
            return self.algo.hashrate(
                rate=float(rpc_summary["SUMMARY"][0]["GHS 5s"]),
                unit=self.algo.unit.GH,
            ).into(self.algo.unit.default)
        except (LookupError, ValueError, TypeError):
            return None

    async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
        """Fill per-board data from the second entry of the RPC `stats` reply.

        For boards 1-3 the `chain_rate{n}`, `chain_acn{n}`, `temp_chip{n}` and
        `temp_pcb{n}` fields are read. Parsing stops at the first field that
        is missing or malformed; boards not reached keep their defaults.

        Args:
            rpc_stats: A previously fetched `stats` response; fetched via
                `self.rpc.stats()` when omitted.

        Returns:
            List[HashBoard]: One entry per expected hashboard, or an empty
            list when `expected_hashboards` is None.
        """
        if self.expected_hashboards is None:
            return []

        hashboards = [
            HashBoard(slot=idx, expected_chips=self.expected_chips)
            for idx in range(self.expected_hashboards)
        ]

        if rpc_stats is None:
            try:
                rpc_stats = await self.rpc.stats()
            except APIError:
                pass
        if rpc_stats is not None:
            try:
                # TODO: bugged on S9 because of index issues, fix later.
                board_stats = rpc_stats["STATS"][1]
                for idx in range(3):
                    board_n = idx + 1
                    board = hashboards[idx]
                    board.hashrate = self.algo.hashrate(
                        rate=float(board_stats[f"chain_rate{board_n}"]),
                        unit=self.algo.unit.GH,
                    ).into(self.algo.unit.default)
                    board.chips = int(board_stats[f"chain_acn{board_n}"])

                    # Temperatures arrive as "-"-separated integers; zero
                    # readings are dropped before averaging.
                    chip_temp_data = [
                        temp
                        for temp in map(
                            int, board_stats[f"temp_chip{board_n}"].split("-")
                        )
                        if temp != 0
                    ]
                    board.chip_temp = (chip_temp_data[0] + chip_temp_data[3]) / 2

                    board_temp_data = [
                        temp
                        for temp in map(
                            int, board_stats[f"temp_pcb{board_n}"].split("-")
                        )
                        if temp != 0
                    ]
                    board.temp = (board_temp_data[1] + board_temp_data[2]) / 2
                    board.missing = False
            except (LookupError, ValueError, TypeError, AttributeError):
                # Beyond missing keys/indices, a non-numeric or non-string
                # field must not crash data collection.
                pass
        return hashboards

    async def _get_wattage(self, rpc_power: dict = None) -> Optional[int]:
        if rpc_power is None:
            try:
                rpc_power = await self.rpc.power()
            except APIError:
                pass

        if rpc_power is not None:
            try:
                return rpc_power["POWER"][0]["Watts"]
            except (LookupError, ValueError, TypeError):
                pass

    async def _get_wattage_limit(
        self, rpc_config: dict = None, rpc_profiles: list[dict] = None
    ) -> Optional[int]:
        """Return the power of the active preset derived from the RPC replies.

        Args:
            rpc_config: RPC `config` response handed to
                `MiningModePreset.get_active_preset_from_luxos`.
            rpc_profiles: RPC `profiles` response handed to the same helper.

        Returns:
            Optional[int]: The active preset's `power`, or None when the helper
            raises LookupError, ValueError or TypeError.
        """
        try:
            return MiningModePreset.get_active_preset_from_luxos(
                rpc_config, rpc_profiles
            ).power
        except (LookupError, ValueError, TypeError):
            return None

    async def _get_fans(self, rpc_fans: dict = None) -> List[Fan]:
        """Build one `Fan` per expected fan from the RPC `fans` reply.

        Args:
            rpc_fans: A previously fetched `fans` response; fetched via
                `self.rpc.fans()` when omitted.

        Returns:
            List[Fan]: Empty when `expected_fans` is None or no reply is
            available; otherwise one entry per expected fan, using a default
            `Fan()` for any slot whose `RPM` cannot be read.
        """
        if self.expected_fans is None:
            return []

        if rpc_fans is None:
            try:
                rpc_fans = await self.rpc.fans()
            except APIError:
                pass

        if rpc_fans is None:
            return []

        fans = []
        for slot in range(self.expected_fans):
            try:
                fan = Fan(speed=rpc_fans["FANS"][slot]["RPM"])
            except (LookupError, ValueError, TypeError):
                fan = Fan()
            fans.append(fan)
        return fans

    async def _get_expected_hashrate(
        self, rpc_stats: dict = None
    ) -> Optional[AlgoHashRate]:
        """Convert `total_rateideal` from the RPC `stats` reply to a hashrate.

        The value is read from the second `STATS` entry; its unit comes from
        `rate_unit` in the same entry and falls back to "GH" when absent.

        Args:
            rpc_stats: A previously fetched `stats` response; fetched via
                `self.rpc.stats()` when omitted.

        Returns:
            Optional[AlgoHashRate]: The rate converted into the algorithm's
            default unit, or None when the reply is unavailable or unparsable.
        """
        if rpc_stats is None:
            try:
                rpc_stats = await self.rpc.stats()
            except APIError:
                pass

        if rpc_stats is not None:
            try:
                expected_rate = rpc_stats["STATS"][1]["total_rateideal"]
                try:
                    rate_unit = rpc_stats["STATS"][1]["rate_unit"]
                except KeyError:
                    rate_unit = "GH"
                return self.algo.hashrate(
                    rate=float(expected_rate), unit=self.algo.unit.from_str(rate_unit)
                ).into(self.algo.unit.default)
            except (LookupError, ValueError, TypeError):
                # float() on a malformed rate raises ValueError/TypeError;
                # treat it like a missing value, as _get_hashrate does.
                pass
        return None

    async def _get_uptime(self, rpc_stats: dict = None) -> Optional[int]:
        if rpc_stats is None:
            try:
                rpc_stats = await self.rpc.stats()
            except APIError:
                pass

        if rpc_stats is not None:
            try:
                return int(rpc_stats["STATS"][1]["Elapsed"])
            except LookupError:
                pass

    async def _get_fw_ver(self, rpc_version: dict = None) -> Optional[str]:
        if rpc_version is None:
            try:
                rpc_version = await self.rpc.version()
            except APIError:
                pass

        if rpc_version is not None:
            try:
                return rpc_version["VERSION"][0]["Miner"]
            except LookupError:
                pass

    async def _get_api_ver(self, rpc_version: dict = None) -> Optional[str]:
        if rpc_version is None:
            try:
                rpc_version = await self.rpc.version()
            except APIError:
                pass

        if rpc_version is not None:
            try:
                return rpc_version["VERSION"][0]["API"]
            except LookupError:
                pass

    async def _get_fault_light(self, rpc_config: dict = None) -> Optional[bool]:
        if rpc_config is None:
            try:
                rpc_config = await self.rpc.config()
            except APIError:
                pass

        if rpc_config is not None:
            try:
                return not rpc_config["CONFIG"][0]["RedLed"] == "off"
            except LookupError:
                pass

    async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
        """Convert each `POOLS` entry of the RPC `pools` reply into a `PoolMetrics`.

        Args:
            rpc_pools: A previously fetched `pools` response; fetched via
                `self.rpc.pools()` when omitted.

        Returns:
            List[PoolMetrics]: One entry per pool in the reply; empty when the
            reply is unavailable. A LookupError during conversion ends the
            loop and returns the entries collected so far.
        """
        if rpc_pools is None:
            try:
                rpc_pools = await self.rpc.pools()
            except APIError:
                pass

        pools_data = []
        if rpc_pools is None:
            return pools_data

        try:
            for entry in rpc_pools.get("POOLS", []):
                raw_url = entry.get("URL")
                # Only parse a URL when the pool actually reports one.
                parsed_url = PoolUrl.from_str(raw_url) if raw_url else None
                pools_data.append(
                    PoolMetrics(
                        accepted=entry.get("Accepted"),
                        rejected=entry.get("Rejected"),
                        get_failures=entry.get("Get Failures"),
                        remote_failures=entry.get("Remote Failures"),
                        active=entry.get("Stratum Active"),
                        alive=entry.get("Status") == "Alive",
                        url=parsed_url,
                        user=entry.get("User"),
                        index=entry.get("POOL"),
                    )
                )
        except LookupError:
            pass
        return pools_data

upgrade_firmware(*args, **kwargs) async

Upgrade the firmware on a LuxOS miner by calling the 'updaterun' API command. Returns True if the firmware upgrade was successfully initiated, False otherwise.

Source code in pyasic/miners/backends/luxminer.py
async def upgrade_firmware(self, *args, **kwargs) -> bool:
    """
    Start a firmware upgrade on a LuxOS miner through the 'updaterun' API command.

    Positional and keyword arguments are accepted but not used.

    Returns:
        bool: True if the firmware upgrade was successfully initiated, False otherwise.
    """
    try:
        await self.rpc.updaterun()
    except APIError as err:
        logging.error(f"{self.ip}: Firmware upgrade failed: {err}")
        return False

    logging.info(f"{self.ip}: Firmware upgrade initiated successfully.")
    return True