class ePIC(ePICFirmware):
    """Handler for miners with the ePIC board"""

    _web_cls = ePICWebAPI
    web: ePICWebAPI

    data_locations = EPIC_DATA_LOC

    supports_shutdown = True

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #

    async def _ensure_summary(self, web_summary: Optional[dict]) -> Optional[dict]:
        """Return ``web_summary`` if supplied, otherwise fetch it from the web API.

        Args:
            web_summary: A previously fetched summary, or None to fetch one.

        Returns:
            The summary, or None when none was supplied and the fetch raised
            ``APIError``.
        """
        if web_summary is not None:
            return web_summary
        try:
            return await self.web.summary()
        except APIError:
            return None

    @staticmethod
    def _command_succeeded(data) -> bool:
        """Extract the ``"success"`` value from a control-command response.

        Returns False when the response is empty, has no ``"success"`` key, or
        cannot be indexed by key at all.
        """
        if data:
            try:
                return data["success"]
            except (KeyError, TypeError):
                pass
        return False

    @staticmethod
    def _tuning_state(web_summary: dict) -> tuple:
        """Derive the ``(tuned, active)`` pair applied to every hashboard.

        Starts from ``tuned=True, active=False``.  When the perpetual tuner is
        running, ``tuned`` comes from the ``"Optimized"`` value of the first
        present algorithm section (VoltageOptimizer, BoardTune, then ChipTune)
        and both values additionally require the operating state to be
        ``"Mining"``.

        Returns:
            ``(tuned, active)``, or ``(None, None)`` when the summary lacks the
            sections needed to decide.
        """
        tuned = True
        active = False
        try:
            ptune = web_summary["PerpetualTune"]
            if ptune["Running"]:
                active = True
                algo_info = ptune["Algorithm"]
                if algo_info.get("VoltageOptimizer") is not None:
                    tuned = algo_info["VoltageOptimizer"].get("Optimized")
                elif algo_info.get("BoardTune") is not None:
                    tuned = algo_info["BoardTune"].get("Optimized")
                else:
                    tuned = algo_info["ChipTune"].get("Optimized")

                # To be extra detailed, also ensure the miner is in "Mining" state
                mining = web_summary["Status"]["Operating State"] == "Mining"
                tuned = tuned and mining
                active = active and mining
        except (LookupError, TypeError, AttributeError):
            # Malformed or incomplete summary: report the state as unknown
            # instead of aborting hashboard collection.
            return None, None
        return tuned, active

    # ------------------------------------------------------------------ #
    # Configuration
    # ------------------------------------------------------------------ #

    async def get_config(self) -> MinerConfig:
        """Build a ``MinerConfig`` from the miner's summary and cache it.

        Returns:
            The config parsed from the summary, or a default ``MinerConfig``
            when the summary could not be fetched.  The result is also stored
            on ``self.config``.
        """
        summary = None
        try:
            summary = await self.web.summary()
        except APIError as e:
            logger.warning(e)
        except LookupError:
            pass

        if summary is not None:
            cfg = MinerConfig.from_epic(summary)
        else:
            cfg = MinerConfig()

        self.config = cfg
        return self.config

    async def send_config(
        self, config: MinerConfig, user_suffix: Optional[str] = None
    ) -> None:
        """Apply ``config`` to the miner through the web API.

        The config is stored on ``self.config`` and converted with
        ``as_epic``; each section (temps, fans, ptune, pools) is sent only if
        it is present in the converted config.  An ``APIError`` raised by any
        call is logged and stops the remaining settings from being sent.

        Args:
            config: The configuration to apply.
            user_suffix: Passed through to ``MinerConfig.as_epic``.
        """
        self.config = config
        conf = self.config.as_epic(user_suffix=user_suffix)

        try:
            # Temps: send each threshold independently so a config carrying
            # only one of them does not raise KeyError.
            temps = conf.get("temps") or {}
            if "shutdown" in temps:
                await self.web.set_shutdown_temp(temps["shutdown"])
            if "critical" in temps:
                await self.web.set_critical_temp(temps["critical"])

            # Fans
            # set with sub-keys instead of conf["fans"] because sometimes both can be set
            fans = conf.get("fans") or {}
            if fans.get("Manual", {}) != {}:
                await self.web.set_fan({"Manual": fans["Manual"]})
            elif fans.get("Auto", {}) != {}:
                await self.web.set_fan({"Auto": fans["Auto"]})

            # Mining Mode -- Need to handle that you may not be able to change while miner is tuning
            ptune = conf.get("ptune")
            if ptune is not None and ptune.get("enabled", True):
                await self.web.set_ptune_enable(True)
                await self.web.set_ptune_algo(ptune)

            # Pools
            pools = conf.get("pools")
            if pools is not None:
                if self.algo == ScryptAlgo:
                    pools["coin"] = "Ltc"
                await self.web.set_pools(pools)
        except APIError as e:
            # Best effort: keep the original non-raising contract, but leave a
            # trace the same way get_config does.
            logger.warning(e)

    # ------------------------------------------------------------------ #
    # Control commands
    # ------------------------------------------------------------------ #

    async def restart_backend(self) -> bool:
        """Restart the ePIC backend; return the response's ``"success"`` value."""
        return self._command_succeeded(await self.web.restart_epic())

    async def stop_mining(self) -> bool:
        """Stop mining; return the response's ``"success"`` value."""
        return self._command_succeeded(await self.web.stop_mining())

    async def resume_mining(self) -> bool:
        """Resume mining; return the response's ``"success"`` value."""
        return self._command_succeeded(await self.web.resume_mining())

    async def reboot(self) -> bool:
        """Reboot the miner; return the response's ``"success"`` value."""
        return self._command_succeeded(await self.web.reboot())

    # ------------------------------------------------------------------ #
    # Data getters
    # ------------------------------------------------------------------ #

    async def _get_mac(self, web_network: Optional[dict] = None) -> Optional[str]:
        """Return the ``"mac_address"`` of the first entry in the network data.

        Returns None when the network data is unavailable or the first entry
        has no ``"mac_address"``.
        """
        if web_network is None:
            try:
                web_network = await self.web.network()
            except APIError:
                pass

        if web_network is not None:
            try:
                # Only the first interface is consulted.
                for network in web_network:
                    return web_network[network]["mac_address"]
            except (LookupError, TypeError):
                pass
        return None

    async def _get_hostname(self, web_summary: Optional[dict] = None) -> Optional[str]:
        """Return the summary's ``"Hostname"`` value, or None if unavailable."""
        web_summary = await self._ensure_summary(web_summary)
        if web_summary is not None:
            try:
                return web_summary["Hostname"]
            except (LookupError, TypeError):
                pass
        return None

    async def _get_wattage(self, web_summary: Optional[dict] = None) -> Optional[int]:
        """Return the rounded ``"Input Power"`` from ``"Power Supply Stats"``.

        Returns None when the summary or the value is missing or non-numeric.
        """
        web_summary = await self._ensure_summary(web_summary)
        if web_summary is not None:
            try:
                return round(web_summary["Power Supply Stats"]["Input Power"])
            except (LookupError, TypeError):
                pass
        return None

    async def _get_hashrate(
        self, web_summary: Optional[dict] = None
    ) -> Optional[AlgoHashRate]:
        """Return the sum of the first ``"Hashrate"`` element of every board.

        The per-board values are treated as MH (as the original code does) and
        the total is converted to the algorithm's default unit, which is the
        unit ``_get_expected_hashrate`` and ``_get_hashboards`` also report in.

        Returns None when the summary or its ``"HBs"`` list is unavailable.
        """
        web_summary = await self._ensure_summary(web_summary)
        if web_summary is None:
            return None
        try:
            boards = web_summary["HBs"]
            if boards is None:
                return None
            total = sum(hb["Hashrate"][0] for hb in boards)
            return self.algo.hashrate(
                rate=float(total), unit=self.algo.unit.MH
            ).into(self.algo.unit.default)
        except (LookupError, ValueError, TypeError):
            return None

    async def _get_expected_hashrate(
        self, web_summary: Optional[dict] = None
    ) -> Optional[AlgoHashRate]:
        """Return the hashrate the boards would produce at 100% of ideal.

        For each board, ``Hashrate[0]`` is divided by ``Hashrate[1] / 100``;
        a ``Hashrate[1]`` of 0 is treated as a divisor of 1.0 to avoid
        dividing by zero.

        Returns None when the summary or its ``"HBs"`` list is unavailable.
        """
        web_summary = await self._ensure_summary(web_summary)
        if web_summary is None:
            return None
        try:
            boards = web_summary.get("HBs")
            if boards is None:
                return None
            total = 0
            for hb in boards:
                percent = hb["Hashrate"][1]
                ideal = 1.0 if percent == 0 else percent / 100
                total += hb["Hashrate"][0] / ideal
            return self.algo.hashrate(
                rate=float(total), unit=self.algo.unit.MH
            ).into(self.algo.unit.default)
        except (LookupError, ValueError, TypeError, AttributeError):
            return None

    async def _get_fw_ver(self, web_summary: Optional[dict] = None) -> Optional[str]:
        """Return the firmware version parsed from the ``"Software"`` string.

        The version is the second space-separated token with every ``"v"``
        removed.  Returns None when the string is missing, is not a string, or
        has no second token.
        """
        web_summary = await self._ensure_summary(web_summary)
        if web_summary is not None:
            try:
                return web_summary["Software"].split(" ")[1].replace("v", "")
            except (LookupError, AttributeError, TypeError):
                pass
        return None

    async def _get_fans(self, web_summary: Optional[dict] = None) -> List[Fan]:
        """Return one ``Fan`` per entry of the summary's ``"Fans Rpm"`` mapping.

        An entry whose speed cannot be turned into a ``Fan`` yields a default
        ``Fan()``.  Returns an empty list when ``expected_fans`` is None or no
        fan data is available.
        """
        if self.expected_fans is None:
            return []

        web_summary = await self._ensure_summary(web_summary)

        fans = []
        if web_summary is not None:
            try:
                fan_rpms = web_summary.get("Fans Rpm") or {}
            except AttributeError:
                fan_rpms = {}
            for fan in fan_rpms:
                try:
                    fans.append(Fan(speed=fan_rpms[fan]))
                except (LookupError, ValueError, TypeError):
                    fans.append(Fan())
        return fans

    async def _get_hashboards(
        self,
        web_summary: Optional[dict] = None,
        web_capabilities: Optional[dict] = None,
    ) -> List[HashBoard]:
        """Return one ``HashBoard`` per expected slot, filled from the web API.

        Boards are populated only when both the summary and the capabilities
        are available; otherwise the freshly constructed placeholders are
        returned.  A board entry that is malformed or whose ``"Index"`` is out
        of range is skipped, leaving its placeholder untouched.

        Args:
            web_summary: Previously fetched summary, or None to fetch it.
            web_capabilities: Previously fetched capabilities, or None to
                fetch them.
        """
        if self.expected_hashboards is None:
            return []

        web_summary = await self._ensure_summary(web_summary)
        if web_capabilities is None:
            try:
                web_capabilities = await self.web.capabilities()
            except APIError:
                pass

        hb_list = [
            HashBoard(slot=i, expected_chips=self.expected_chips)
            for i in range(self.expected_hashboards)
        ]
        if web_summary is None or web_capabilities is None:
            return hb_list

        tuned, active = self._tuning_state(web_summary)
        estimator = web_capabilities.get("Performance Estimator")
        serials = web_capabilities.get("Board Serial Numbers")

        for hb in web_summary.get("HBs") or []:
            # Read everything first so a bad entry never leaves a board
            # half-updated.
            try:
                index = hb["Index"]
                board = hb_list[index]
                hashrate = self.algo.hashrate(
                    rate=float(hb["Hashrate"][0]), unit=self.algo.unit.MH
                ).into(self.algo.unit.default)
                temp = int(hb["Temperature"])
                voltage = hb["Input Voltage"]
                if estimator is not None:
                    num_of_chips = estimator["Chip Count"]
                else:
                    num_of_chips = self.expected_chips
                has_serial = serials is not None and index < len(serials)
                serial = serials[index] if has_serial else None
            except (LookupError, ValueError, TypeError):
                continue

            # Update the Hashboard object
            if has_serial:
                board.serial_number = serial
            board.missing = False
            board.hashrate = hashrate
            board.chips = num_of_chips
            board.temp = temp
            board.tuned = tuned
            board.active = active
            board.voltage = voltage

        return hb_list

    async def _is_mining(
        self, web_summary: Optional[dict] = None, *args, **kwargs
    ) -> Optional[bool]:
        """Return True unless the operating state is ``"Idling"``.

        Returns None when the summary or its operating state is unavailable.
        """
        web_summary = await self._ensure_summary(web_summary)
        if web_summary is not None:
            try:
                return web_summary["Status"]["Operating State"] != "Idling"
            except (LookupError, TypeError):
                pass
        return None

    async def _get_uptime(self, web_summary: Optional[dict] = None) -> Optional[int]:
        """Return the summary's session ``"Uptime"`` value, or None if unavailable."""
        web_summary = await self._ensure_summary(web_summary)
        if web_summary is not None:
            try:
                return web_summary["Session"]["Uptime"]
            except (LookupError, TypeError):
                pass
        return None

    async def _get_fault_light(
        self, web_summary: Optional[dict] = None
    ) -> Optional[bool]:
        """Return the summary's ``"Locate Miner State"``; False if unavailable."""
        web_summary = await self._ensure_summary(web_summary)
        if web_summary is not None:
            try:
                return web_summary["Misc"]["Locate Miner State"]
            except (LookupError, TypeError):
                pass
        return False

    async def _get_errors(
        self, web_summary: Optional[dict] = None
    ) -> List[MinerErrorData]:
        """Return the summary's ``"Last Error"`` wrapped in a list, if any.

        Returns an empty list when there is no summary or no last error.
        """
        # As in the original, an empty summary is treated like a missing one
        # and triggers a fetch.
        web_summary = await self._ensure_summary(web_summary or None)

        errors = []
        if web_summary is not None:
            try:
                error = web_summary["Status"]["Last Error"]
                if error is not None:
                    errors.append(X19Error(error_message=str(error)))
            except (LookupError, TypeError):
                pass
        return errors

    async def _get_pools(
        self, web_summary: Optional[dict] = None
    ) -> List[PoolMetrics]:
        """Return metrics for the currently connected pool.

        The list holds at most one entry, built from the summary's
        ``"Session"`` and ``"Stratum"`` sections; it is empty when either
        section (or the summary itself) is unavailable.
        """
        web_summary = await self._ensure_summary(web_summary)

        pool_data = []
        if web_summary is None:
            return pool_data

        try:
            session = web_summary.get("Session")
            stratum = web_summary.get("Stratum")
            if session is not None and stratum is not None:
                url = stratum.get("Current Pool")
                # TODO: when scheme gets put in, update this
                if url is not None:
                    url = PoolUrl.from_str(f"stratum+tcp://{url}")
                pool_data.append(
                    PoolMetrics(
                        accepted=session.get("Accepted"),
                        rejected=session.get("Rejected"),
                        get_failures=0,
                        remote_failures=0,
                        active=stratum.get("IsPoolConnected"),
                        alive=stratum.get("IsPoolConnected"),
                        url=url,
                        user=stratum.get("Current User"),
                        index=stratum.get("Config Id"),
                    )
                )
        except LookupError:
            pass
        # Always hand back a list, as the annotation promises.
        return pool_data

    # ------------------------------------------------------------------ #
    # Firmware
    # ------------------------------------------------------------------ #

    async def upgrade_firmware(
        self, file: Path | str, keep_settings: bool = True
    ) -> bool:
        """
        Upgrade the firmware of the ePIC miner device.

        Args:
            file (Path | str): The local file path of the firmware to be uploaded.
            keep_settings (bool): Whether to keep the current settings after the update.

        Returns:
            bool: Whether the firmware update succeeded.
        """
        return await self.web.system_update(file=file, keep_settings=keep_settings)