# Miner class built from the GoldshellMiner and Byte bases (both defined outside this view).
class GoldshellByte(GoldshellMiner, Byte):
# Data-location table for this class; GOLDSHELL_BYTE_DATA_LOC is defined outside this view.
data_locations = GOLDSHELL_BYTE_DATA_LOC
# Cached response of the web "cgminer?cgminercmd=devs" command.
# Stays None until get_data() fetches it successfully; it is not refreshed afterwards.
cgdev: dict | None = None
async def get_data(
    self,
    allow_warning: bool = False,
    include: List[Union[str, DataOptions]] = None,
    exclude: List[Union[str, DataOptions]] = None,
) -> MinerData:
    """Collect miner data and overlay values derived from the web device report.

    The response of the web ``cgminer?cgminercmd=devs`` command is fetched once
    and cached on ``self.cgdev``.  Every entry under ``minfos[*].infos`` is
    counted as one board; the counts drive ``expected_hashboards``,
    ``expected_fans`` and ``expected_chips``, and ``self.algo`` is switched to
    scrypt or zkSNARK when only boards of that one algorithm are present.
    The parent ``get_data`` is then called and its result is patched with the
    expected chip count, wattage, uptime and the summed board voltage.

    Args:
        allow_warning: Forwarded unchanged to the parent ``get_data``.
        include: Forwarded unchanged to the parent ``get_data``.
        exclude: Forwarded unchanged to the parent ``get_data``.

    Returns:
        The ``MinerData`` returned by the parent ``get_data``, with
        ``expected_chips`` and ``voltage`` always overwritten, and ``wattage``
        and ``uptime`` overwritten when a device report is available.
    """
    if self.cgdev is None:
        try:
            self.cgdev = await self.web.send_command("cgminer?cgminercmd=devs")
        except APIError:
            # Best effort: without the report the board-derived values below
            # are left untouched instead of failing the whole collection.
            pass

    report = self.cgdev

    board_count = 0
    scrypt_board_count = 0
    zksnark_board_count = 0
    total_wattage = 0
    total_uptime_mins = 0

    # `report` is still None when the fetch failed; iterate over nothing in
    # that case rather than raising AttributeError on `None.get`.
    for minfo in (report or {}).get("minfos", []):
        algo_name = minfo.get("name")

        for info in minfo.get("infos", []):
            board_count += 1

            # NOTE(review): these are assigned, not accumulated, so the last
            # entry wins. Kept as in the original; confirm whether "power" is
            # a per-board or a whole-miner figure before summing it.
            total_wattage = int(float(info.get("power", 0)))
            total_uptime_mins = int(info.get("time", 0))

            if algo_name == ALGORITHM_SCRYPT_NAME:
                scrypt_board_count += 1
            elif algo_name == ALGORITHM_ZKSNARK_NAME:
                zksnark_board_count += 1

    if report is not None:
        # Assign the counts instead of using `+=` on the attributes: the
        # report is cached, so incrementing on every call made the expected
        # board/fan counts grow each time get_data() ran.
        # NOTE(review): this assumes the inherited defaults for these two
        # attributes are 0 — confirm against the Byte model definition.
        self.expected_hashboards = board_count
        self.expected_fans = board_count

        self.expected_chips = (
            EXPECTED_CHIPS_PER_SCRYPT_BOARD * scrypt_board_count
        ) + (EXPECTED_CHIPS_PER_ZKSNARK_BOARD * zksnark_board_count)

        # Only switch algorithm when the boards are homogeneous; a mixed or
        # empty set keeps whatever `self.algo` already was.
        if scrypt_board_count > 0 and zksnark_board_count == 0:
            self.algo = MinerAlgo.SCRYPT
        elif zksnark_board_count > 0 and scrypt_board_count == 0:
            self.algo = MinerAlgo.ZKSNARK

    data = await super().get_data(allow_warning, include, exclude)
    data.expected_chips = self.expected_chips

    if report is not None:
        # NOTE(review): both values come from the cached report, so they do
        # not change between calls on the same instance.
        data.wattage = total_wattage
        data.uptime = total_uptime_mins

    # Sum the per-board voltages, skipping boards that never reported one so
    # a missing board does not raise TypeError on `number + None`.
    data.voltage = 0
    for board in data.hashboards:
        if board.voltage is not None:
            data.voltage += board.voltage

    return data
async def get_config(self) -> MinerConfig:
    """Refresh ``self.config`` from the web pools endpoint and return it.

    Returns:
        The configuration built by ``MinerConfig.from_goldshell_byte`` from the
        ``self.web.pools()`` response, or the previously stored ``self.config``
        when that request raises ``APIError``.
    """
    try:
        pool_response = await self.web.pools()
    except APIError:
        # Request failed: hand back whatever configuration is already stored.
        return self.config

    refreshed = MinerConfig.from_goldshell_byte(pool_response)
    self.config = refreshed
    return self.config
async def _get_api_ver(self, web_setting: dict = None) -> Optional[str]:
    """Return the API version, updating ``self.api_ver`` from the web settings.

    Args:
        web_setting: Response of ``self.web.setting()``.  When ``None`` it is
            requested here; an ``APIError`` during that request is ignored.

    Returns:
        ``self.api_ver``.  It is replaced beforehand by the ``"version"`` value
        of the settings with every leading and trailing ``"v"`` stripped, when
        that value is present.
    """
    settings = web_setting
    if settings is None:
        try:
            settings = await self.web.setting()
        except APIError:
            settings = None

    # No settings available at all: report the version already known.
    if settings is None:
        return self.api_ver

    try:
        reported = settings.get("version")
        if reported is not None:
            self.api_ver = reported.strip("v")
    except KeyError:
        pass

    return self.api_ver
async def _get_expected_hashrate(
    self, rpc_devs: dict = None
) -> Optional[AlgoHashRate]:
    """Add up the expected hashrate of every board listed by RPC ``devs``.

    Each ``DEVS`` entry is classified by its ``"pool"`` field.  Scrypt entries
    contribute ``"estimate_hash_rate"`` converted from ``unit.H`` to
    ``unit.MH``; zkSNARK entries contribute ``"theory_hash"`` as-is.  Entries
    of any other kind contribute nothing.

    Args:
        rpc_devs: Response of ``self.rpc.devs()``.  When ``None`` it is
            requested here; an ``APIError`` during that request is ignored.

    Returns:
        The accumulated value wrapped as an ``unit.MH`` hashrate and converted
        to ``self.algo.unit.default`` (a zero rate when no data is available).
    """
    if rpc_devs is None:
        try:
            rpc_devs = await self.rpc.devs()
        except APIError:
            pass

    device_entries = [] if rpc_devs is None else rpc_devs.get("DEVS", [])

    expected_mh = 0
    for entry in device_entries:
        board_algo = entry.get("pool")
        if board_algo == ALGORITHM_SCRYPT_NAME:
            estimate = self.algo.hashrate(
                rate=float(entry.get("estimate_hash_rate", 0)),
                unit=self.algo.unit.H,
            )
            expected_mh = expected_mh + estimate.into(self.algo.unit.MH).rate
        elif board_algo == ALGORITHM_ZKSNARK_NAME:
            expected_mh = expected_mh + float(entry.get("theory_hash", 0))

    as_megahash = self.algo.hashrate(rate=expected_mh, unit=self.algo.unit.MH)
    return as_megahash.into(self.algo.unit.default)
async def _get_hashrate(self, rpc_devs: dict = None) -> Optional[AlgoHashRate]:
    """Add up the ``"MHS 20s"`` value of every board listed by RPC ``devs``.

    Args:
        rpc_devs: Response of ``self.rpc.devs()``.  When ``None`` it is
            requested here; an ``APIError`` during that request is ignored.

    Returns:
        The sum wrapped as an ``unit.MH`` hashrate and converted to
        ``self.algo.unit.default`` (a zero rate when no data is available).
    """
    if rpc_devs is None:
        try:
            rpc_devs = await self.rpc.devs()
        except APIError:
            pass

    device_entries = [] if rpc_devs is None else rpc_devs.get("DEVS", [])

    # Plain left-to-right accumulation; entries without the key count as 0.
    combined_mh = 0
    for entry in device_entries:
        combined_mh = combined_mh + float(entry.get("MHS 20s", 0))

    as_megahash = self.algo.hashrate(rate=combined_mh, unit=self.algo.unit.MH)
    return as_megahash.into(self.algo.unit.default)
async def _get_pools(self, rpc_pools: dict = None) -> List[PoolMetrics]:
    """Build one ``PoolMetrics`` per entry of the RPC ``pools`` response.

    Args:
        rpc_pools: Response of ``self.rpc.pools()``.  When ``None`` it is
            requested here; an ``APIError`` during that request is ignored.

    Returns:
        The pools in the order they appear under ``"POOLS"``, indexed from 0.
        The list is empty when no response is available.  If a
        ``LookupError`` occurs while processing, the entries collected so far
        are returned.
    """
    if rpc_pools is None:
        try:
            rpc_pools = await self.rpc.pools()
        except APIError:
            pass

    collected = []
    if rpc_pools is None:
        return collected

    try:
        for position, entry in enumerate(rpc_pools.get("POOLS", [])):
            raw_url = entry.get("URL")
            # An absent or empty URL is reported as None rather than parsed.
            parsed_url = PoolUrl.from_str(raw_url) if raw_url else None
            collected.append(
                PoolMetrics(
                    accepted=entry.get("Accepted"),
                    rejected=entry.get("Rejected"),
                    active=entry.get("Stratum Active"),
                    alive=entry.get("Status") == "Alive",
                    url=parsed_url,
                    user=entry.get("User"),
                    index=position,
                )
            )
    except LookupError:
        pass

    return collected
async def _get_hashboards(
    self, rpc_devs: dict = None, rpc_devdetails: dict = None
) -> List[HashBoard]:
    """Build the hashboard list from the RPC ``devs`` and ``devdetails`` data.

    One ``HashBoard`` is created per expected slot
    (``range(self.expected_hashboards)``).  Each ``DEVS`` entry of ``devs``
    fills in the board selected by its ``"PGA"`` id: hashrate (from
    ``"MHS 20s"``), temperatures, voltage, active state and the per-algorithm
    expected chip count.  Each ``DEVS`` entry of ``devdetails`` then fills in
    the chip count of the board selected by its ``"DEVDETAILS"`` id.

    Entries whose id is missing, not an integer, or outside the range of
    expected slots are skipped.  Previously such ids were used directly as a
    list index, so an out-of-range id raised ``IndexError`` and a negative id
    silently updated the wrong board.

    Args:
        rpc_devs: Response of ``self.rpc.devs()``.  When ``None`` it is
            requested here; an ``APIError`` during that request is ignored.
        rpc_devdetails: Response of ``self.rpc.devdetails()``.  When ``None``
            it is requested here; an ``APIError`` is ignored.

    Returns:
        The list of ``HashBoard`` objects, one per expected slot.

    Raises:
        KeyError: If an entry with a valid id lacks one of the required
            fields (unchanged from the original behavior).
    """
    if rpc_devs is None:
        try:
            rpc_devs = await self.rpc.devs()
        except APIError:
            pass

    hashboards = [
        HashBoard(slot=i, expected_chips=self.expected_chips)
        for i in range(self.expected_hashboards)
    ]

    def board_for(slot_id):
        """Return the board for a device-reported id, or None if unusable."""
        if isinstance(slot_id, int) and 0 <= slot_id < len(hashboards):
            return hashboards[slot_id]
        return None

    if rpc_devs is not None:
        for board in rpc_devs.get("DEVS", []):
            hashboard = board_for(board.get("PGA"))
            if hashboard is None:
                # Unknown slot: nothing to attach this report to.
                continue

            hashboard.hashrate = self.algo.hashrate(
                rate=float(board["MHS 20s"]), unit=self.algo.unit.MH
            ).into(self.algo.unit.default)
            hashboard.chip_temp = board["tstemp-1"]
            hashboard.temp = board["tstemp-2"]
            hashboard.voltage = board["voltage"]
            hashboard.active = board["Status"] == "Alive"
            hashboard.missing = False

            # Replace the miner-wide expected chip count with the per-board
            # figure when the board's algorithm is recognised.
            algo_name = board.get("pool")
            if algo_name == ALGORITHM_SCRYPT_NAME:
                hashboard.expected_chips = EXPECTED_CHIPS_PER_SCRYPT_BOARD
            elif algo_name == ALGORITHM_ZKSNARK_NAME:
                hashboard.expected_chips = EXPECTED_CHIPS_PER_ZKSNARK_BOARD

    if rpc_devdetails is None:
        try:
            rpc_devdetails = await self.rpc.devdetails()
        except APIError:
            pass

    if rpc_devdetails is not None:
        for board in rpc_devdetails.get("DEVS", []):
            hashboard = board_for(board.get("DEVDETAILS"))
            if hashboard is None:
                continue
            hashboard.chips = board["chips-nr"]

    return hashboards
async def _get_fans(self, rpc_devs: dict = None) -> List[Fan]:
    """Build the fan list from the per-board fan speeds in RPC ``devs``.

    For every ``DEVS`` entry that has a ``"PGA"`` id, the value stored under
    ``"fan<id>"`` is read; entries without that key are skipped.

    Args:
        rpc_devs: Response of ``self.rpc.devs()``.  When ``None`` it is
            requested here; an ``APIError`` during that request is ignored.

    Returns:
        One ``Fan`` per speed that was read: ``Fan(speed=value)`` for truthy
        values, a default ``Fan()`` otherwise.  An empty list when
        ``self.expected_fans`` is ``None`` or no data is available.
    """
    if self.expected_fans is None:
        return []

    if rpc_devs is None:
        try:
            rpc_devs = await self.rpc.devs()
        except APIError:
            pass

    device_entries = [] if rpc_devs is None else rpc_devs.get("DEVS", [])

    speeds = []
    for entry in device_entries:
        slot_id = entry.get("PGA")
        if slot_id is None:
            continue
        try:
            speeds.append(entry[f"fan{slot_id}"])
        except KeyError:
            # This board does not report a fan under its own id.
            continue

    return [Fan(speed=speed) if speed else Fan() for speed in speeds]