class MinerFactory:
async def get_multiple_miners(
self, ips: list[str], limit: int = 200
) -> list[AnyMiner]:
results = []
async for miner in self.get_miner_generator(ips, limit):
results.append(miner)
return results
async def get_miner_generator(
self, ips: list, limit: int = 200
) -> AsyncGenerator[AnyMiner]:
tasks = []
semaphore = asyncio.Semaphore(limit)
for ip in ips:
tasks.append(asyncio.create_task(self.get_miner(ip)))
for task in tasks:
async with semaphore:
result = await task
if result is not None:
yield result
async def get_miner(self, ip: str | ipaddress.ip_address) -> AnyMiner | None:
    """Identify the miner at `ip` and build the matching miner object.

    The firmware/vendor type is probed up to `factory_get_retries` times
    (setting, default 1), each attempt bounded by `factory_get_timeout`
    seconds (setting, default 3). When a type is found, the type-specific
    model lookup is run once under the same timeout, and the type/model pair
    is handed to `_select_miner_from_classes`.

    Args:
        ip: Address of the device; converted with `str()` before use.

    Returns:
        The object returned by `_select_miner_from_classes`, or None when no
        miner type could be determined.
    """
    ip = str(ip)
    timeout = settings.get("factory_get_timeout", 3)

    miner_type = None
    for _ in range(settings.get("factory_get_retries", 1)):
        task = asyncio.create_task(self._get_miner_type(ip))
        try:
            miner_type = await asyncio.wait_for(task, timeout=timeout)
        except asyncio.TimeoutError:
            continue
        if miner_type is not None:
            break

    if miner_type is None:
        return None

    miner_model_fns = {
        MinerTypes.ANTMINER: self.get_miner_model_antminer,
        MinerTypes.WHATSMINER: self.get_miner_model_whatsminer,
        MinerTypes.AVALONMINER: self.get_miner_model_avalonminer,
        MinerTypes.INNOSILICON: self.get_miner_model_innosilicon,
        MinerTypes.GOLDSHELL: self.get_miner_model_goldshell,
        MinerTypes.BRAIINS_OS: self.get_miner_model_braiins_os,
        MinerTypes.VNISH: self.get_miner_model_vnish,
        MinerTypes.EPIC: self.get_miner_model_epic,
        MinerTypes.HIVEON: self.get_miner_model_hiveon,
        MinerTypes.LUX_OS: self.get_miner_model_luxos,
        MinerTypes.AURADINE: self.get_miner_model_auradine,
        MinerTypes.MARATHON: self.get_miner_model_marathon,
        MinerTypes.BITAXE: self.get_miner_model_bitaxe,
        MinerTypes.LUCKYMINER: self.get_miner_model_luckyminer,
        MinerTypes.ICERIVER: self.get_miner_model_iceriver,
        MinerTypes.HAMMER: self.get_miner_model_hammer,
        MinerTypes.VOLCMINER: self.get_miner_model_volcminer,
        MinerTypes.ELPHAPEX: self.get_miner_model_elphapex,
        # `_parse_socket_type` can report MSKMINER, so its model lookup has
        # to be reachable from here as well.
        MinerTypes.MSKMINER: self.get_miner_model_mskminer,
    }

    miner_model = None
    fn = miner_model_fns.get(miner_type)
    if fn is not None:
        # noinspection PyArgumentList
        task = asyncio.create_task(fn(ip))
        try:
            miner_model = await asyncio.wait_for(task, timeout=timeout)
        except asyncio.TimeoutError:
            # A model lookup that times out still leaves a usable type; the
            # selection step receives miner_model=None in that case.
            pass

    return self._select_miner_from_classes(
        ip,
        miner_type=miner_type,
        miner_model=miner_model,
    )
async def _get_miner_type(self, ip: str) -> MinerTypes | None:
    """Run the web probe and the socket probe for `ip` as concurrent tasks.

    Both tasks are handed to `concurrent_get_first_result` together with a
    predicate that accepts any non-None result.

    Returns:
        Whatever `concurrent_get_first_result` returns for those tasks.
    """
    probes = (self._get_miner_web, self._get_miner_socket)
    pending = [asyncio.create_task(probe(ip)) for probe in probes]
    return await concurrent_get_first_result(
        pending, lambda found: found is not None
    )
async def _get_miner_web(self, ip: str) -> MinerTypes | None:
    """Classify the device at `ip` from its HTTP and HTTPS root pages.

    Both root URLs are fetched as concurrent tasks through `_web_ping`; the
    result picked by `concurrent_get_first_result` is classified with
    `_parse_web_type`. Two classifications are refined with a follow-up
    request:

    * ANTMINER becomes MARATHON when `/kaonsu/v1/brief` returns data.
    * HAMMER becomes VOLCMINER when the reported model is available and
      does not contain "HAMMER".

    Returns:
        The detected miner type, or None when no page text was obtained or
        the page was not recognised.
    """

    def _is_recognised(ping_result) -> bool:
        # ping_result is the (text, response) pair produced by `_web_ping`.
        return (
            ping_result[0] is not None
            and self._parse_web_type(ping_result[0], ping_result[1]) is not None
        )

    async with httpx.AsyncClient(
        transport=settings.transport(verify=False)
    ) as session:
        pings = [
            asyncio.create_task(self._web_ping(session, url))
            for url in (f"http://{ip}/", f"https://{ip}/")
        ]
        text, resp = await concurrent_get_first_result(pings, _is_recognised)
        if text is None:
            return None

        mtype = self._parse_web_type(text, resp)

        if mtype == MinerTypes.ANTMINER:
            # could still be mara
            brief = await self.send_web_command(
                ip, "/kaonsu/v1/brief", auth=httpx.DigestAuth("root", "root")
            )
            if brief is not None:
                mtype = MinerTypes.MARATHON

        if mtype == MinerTypes.HAMMER:
            model = await self.get_miner_model_hammer(ip)
            # With no model available the HAMMER classification stands.
            if model is not None and "HAMMER" not in model.upper():
                mtype = MinerTypes.VOLCMINER

        return mtype
@staticmethod
async def _web_ping(
    session: httpx.AsyncClient, url: str
) -> tuple[str | None, httpx.Response | None]:
    """GET `url` (following redirects) and return its text with the response.

    Args:
        session: Client used to issue the request.
        url: Address to fetch.

    Returns:
        `(response.text, response)` on success, or `(None, None)` when the
        request fails with one of the handled httpx/asyncio/anyio errors.
    """
    try:
        response = await session.get(url, follow_redirects=True)
        return response.text, response
    except (
        httpx.HTTPError,
        asyncio.TimeoutError,
        anyio.EndOfStream,
        anyio.ClosedResourceError,
    ):
        return None, None
@staticmethod
def _parse_web_type(web_text: str, web_resp: httpx.Response) -> MinerTypes | None:
    """Map a web response to a miner type using fixed fingerprints.

    Checks run in this order and the first match wins:

    1. On a 401 status, the realm named in the `www-authenticate` header.
    2. A first redirect hop with status 307 to an `https://` location,
       combined with `/cgi-bin/luci` appearing in the page text.
    3. Marker substrings in the page text.

    Args:
        web_text: Body text of the final response.
        web_resp: The final response (status, headers and redirect history).

    Returns:
        The matching miner type, or None when nothing matches.
    """
    if web_resp.status_code == 401:
        challenge = web_resp.headers.get("www-authenticate", "")
        realm_types = (
            ('realm="antMiner', MinerTypes.ANTMINER),
            ('realm="blackMiner', MinerTypes.HAMMER),
            ('realm="Daoge', MinerTypes.ELPHAPEX),
        )
        for realm, realm_type in realm_types:
            if realm in challenge:
                return realm_type

    if len(web_resp.history) > 0:
        first_hop = web_resp.history[0]
        if (
            "/cgi-bin/luci" in web_text
            and first_hop.status_code == 307
            and "https://" in first_hop.headers.get("location", "")
        ):
            return MinerTypes.WHATSMINER

    # Order matters: earlier markers take precedence over later ones.
    page_markers = (
        ("Braiins OS", MinerTypes.BRAIINS_OS),
        ("Luxor Firmware", MinerTypes.LUX_OS),
        ("<TITLE>用户界面</TITLE>", MinerTypes.ICERIVER),
        ("AxeOS", MinerTypes.BITAXE),
        ("Lucky miner", MinerTypes.LUCKYMINER),
        ("cloud-box", MinerTypes.GOLDSHELL),
        ("AnthillOS", MinerTypes.VNISH),
        ("Miner Web Dashboard", MinerTypes.EPIC),
        ("Avalon", MinerTypes.AVALONMINER),
        ("DragonMint", MinerTypes.INNOSILICON),
        ("Miner UI", MinerTypes.AURADINE),
    )
    for marker, page_type in page_markers:
        if marker in web_text:
            return page_type
    return None
async def _get_miner_socket(self, ip: str) -> MinerTypes | None:
    """Classify the device at `ip` from its replies to socket API commands.

    The `version` and `devdetails` commands are sent as concurrent tasks via
    `_socket_ping`; the reply picked by `concurrent_get_first_result` is
    classified with `_parse_socket_type`.

    Returns:
        The detected miner type, or None when no reply was obtained or the
        reply was not recognised.
    """

    def _is_recognised(reply) -> bool:
        return reply is not None and self._parse_socket_type(reply) is not None

    pings = [
        asyncio.create_task(self._socket_ping(ip, command))
        for command in ("version", "devdetails")
    ]
    reply = await concurrent_get_first_result(pings, _is_recognised)
    return None if reply is None else self._parse_socket_type(reply)
@staticmethod
async def _socket_ping(ip: str, cmd: str) -> str | None:
    """Send one JSON command to port 4028 on `ip` and return the raw reply.

    The connection attempt is bounded by `factory_get_timeout` seconds
    (setting, default 3). The reply is read in 4096-byte chunks with a
    one-second timeout per read; reading stops at end-of-stream or after
    `max(1, int(factory_get_timeout))` read timeouts.

    Args:
        ip: Address of the device.
        cmd: Command name, sent as `{"command": cmd}`.

    Returns:
        The reply decoded as UTF-8, or None when the connection failed, a
        connection error interrupted the exchange, or nothing was received.
    """
    timeout = settings.get("factory_get_timeout", 3)
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(str(ip), 4028),
            timeout=timeout,
        )
    except (ConnectionError, OSError, asyncio.TimeoutError):
        return None

    payload = json.dumps({"command": cmd}).encode("utf-8")
    data = b""
    try:
        # send the command
        writer.write(payload)
        await writer.drain()

        # loop to receive all the data
        timeouts_remaining = max(1, int(timeout))
        while True:
            try:
                chunk = await asyncio.wait_for(reader.read(4096), timeout=1)
            except asyncio.TimeoutError:
                timeouts_remaining -= 1
                if not timeouts_remaining:
                    logger.warning(f"{ip}: Socket ping timeout.")
                    break
                continue
            if not chunk:
                break
            data += chunk
    except (ConnectionError, OSError):
        # Covers ConnectionResetError too. CancelledError is deliberately
        # not handled here so that cancellation propagates to the caller.
        return None
    finally:
        if writer.transport.is_closing():
            writer.transport.close()
        else:
            writer.close()
        try:
            await writer.wait_closed()
        except (ConnectionError, OSError):
            # Do not `return` from inside `finally`: that would throw away
            # data already received and swallow an in-flight cancellation.
            pass

    if data:
        return data.decode("utf-8")
    return None
@staticmethod
def _parse_socket_type(data: str) -> MinerTypes | None:
upper_data = data.upper()
if "BOSMINER" in upper_data or "BOSER" in upper_data:
return MinerTypes.BRAIINS_OS
if "BTMINER" in upper_data or "BITMICRO" in upper_data:
return MinerTypes.WHATSMINER
if "LUXMINER" in upper_data:
return MinerTypes.LUX_OS
if "HIVEON" in upper_data:
return MinerTypes.HIVEON
if "KAONSU" in upper_data:
return MinerTypes.MARATHON
if "RWGLR" in upper_data:
return MinerTypes.MSKMINER
if "ANTMINER" in upper_data and "DEVDETAILS" not in upper_data:
return MinerTypes.ANTMINER
if (
"INTCHAINS_QOMO" in upper_data
or "KDAMINER" in upper_data
or "BFGMINER" in upper_data
):
return MinerTypes.GOLDSHELL
if "INNOMINER" in upper_data:
return MinerTypes.INNOSILICON
if "AVALON" in upper_data:
return MinerTypes.AVALONMINER
if "GCMINER" in upper_data or "FLUXOS" in upper_data:
return MinerTypes.AURADINE
if "VNISH" in upper_data:
return MinerTypes.VNISH
async def send_web_command(
    self,
    ip: str,
    location: str,
    auth: httpx.DigestAuth = None,
) -> dict | None:
    """GET `http://{ip}{location}` and return the response parsed as JSON.

    The request uses a timeout of `factory_get_timeout` seconds (setting,
    default 3).

    Args:
        ip: Address of the device.
        location: Text appended directly after the host in the URL.
        auth: Optional authentication passed to the request.

    Returns:
        The parsed JSON body, or None when the request fails or the body
        cannot be parsed as JSON.
    """
    async with httpx.AsyncClient(transport=settings.transport()) as client:
        try:
            response = await client.get(
                f"http://{ip}{location}",
                auth=auth,
                timeout=settings.get("factory_get_timeout", 3),
            )
        except (httpx.HTTPError, asyncio.TimeoutError):
            logger.info(f"{ip}: Web command timeout.")
            return None

    if response is None:
        return None

    try:
        return response.json()
    except (json.JSONDecodeError, asyncio.TimeoutError):
        # Second attempt: decode the body text with the stdlib parser.
        try:
            return json.loads(response.text)
        except (json.JSONDecodeError, httpx.HTTPError):
            return None
async def send_api_command(self, ip: str, command: str) -> dict | None:
    """Send one JSON command to port 4028 on `ip` and return the parsed reply.

    The reply is read until end-of-stream, passed through `_fix_api_data`
    and then parsed as JSON.

    Args:
        ip: Address of the device.
        command: Command name, sent as `{"command": command}`.

    Returns:
        The parsed reply; `{}` when the reply cannot be decoded or parsed;
        None when the connection fails, the exchange is interrupted or
        cancelled, or the device reports a refused socket connection.
    """
    try:
        reader, writer = await asyncio.open_connection(ip, 4028)
    except (ConnectionError, OSError):
        return None

    chunks = []
    try:
        # send the command
        writer.write(json.dumps({"command": command}).encode("utf-8"))
        await writer.drain()

        # loop to receive all the data
        while True:
            chunk = await reader.read(4096)
            if not chunk:
                break
            chunks.append(chunk)
    except asyncio.CancelledError:
        # NOTE(review): cancellation is turned into a None result, as before;
        # `_socket_ping` re-raises instead. Confirm which behaviour callers
        # rely on before changing this.
        return None
    except (ConnectionError, OSError):
        return None
    finally:
        # Close on every path so the socket is not leaked when the exchange
        # fails half-way through.
        writer.close()
        try:
            await writer.wait_closed()
        except (ConnectionError, OSError):
            pass

    data = b"".join(chunks)
    if data == b"Socket connect failed: Connection refused\n":
        return None

    try:
        return json.loads(await self._fix_api_data(data))
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Undecodable bytes are treated like unparseable JSON.
        return {}
@staticmethod
async def _fix_api_data(data: bytes) -> str:
if data.endswith(b"\x00"):
str_data = data.decode("utf-8")[:-1]
else:
str_data = data.decode("utf-8")
# fix an error with a btminer return having an extra comma that breaks json.loads()
str_data = str_data.replace(",}", "}")
# fix an error with a btminer return having a newline that breaks json.loads()
str_data = str_data.replace("\n", "")
# fix an error with a bmminer return not having a specific comma that breaks json.loads()
str_data = str_data.replace("}{", "},{")
# fix an error with a bmminer return having a specific comma that breaks json.loads()
str_data = str_data.replace("[,{", "[{")
# fix an error with a btminer return having a missing comma. (2023-01-06 version)
str_data = str_data.replace('""temp0', '","temp0')
# fix an error with Avalonminers returning inf and nan
str_data = str_data.replace('"inf"', "0")
str_data = str_data.replace('"nan"', "0")
# fix whatever this garbage from avalonminers is `,"id":1}`
if str_data.startswith(","):
str_data = f"{{{str_data[1:]}"
# try to fix an error with overflowing the recieve buffer
# this can happen in cases such as bugged btminers returning arbitrary length error info with 100s of errors.
if not str_data.endswith("}"):
str_data = ",".join(str_data.split(",")[:-1]) + "}"
# fix a really nasty bug with whatsminer API v2.0.4 where they return a list structured like a dict
if re.search(r"\"error_code\":\[\".+\"]", str_data):
str_data = str_data.replace("[", "{").replace("]", "}")
return str_data
@staticmethod
def _select_miner_from_classes(
    ip: ipaddress.ip_address,
    miner_model: str | None,
    miner_type: MinerTypes | None,
) -> AnyMiner | None:
    """Instantiate the class registered for a miner type and model.

    Lookup is `MINER_CLASSES[miner_type][MODEL]` with the model upper-cased.
    When that lookup fails but the type is registered, the type's `None`
    entry is used (with a warning if a model name was supplied). When the
    type is not registered, an `UnknownMiner` is returned.

    Args:
        ip: Address handed to the selected class.
        miner_model: Model name, or None when it could not be determined.
        miner_type: Detected miner type.

    Returns:
        An instance of the selected class.
    """
    model_key = str(miner_model).upper()

    # special case since hiveon miners return web results copying the antminer stock FW
    if "HIVEON" in model_key:
        model_key = model_key.replace(" HIVEON", "")
        miner_model = model_key
        miner_type = MinerTypes.HIVEON

    try:
        return MINER_CLASSES[miner_type][model_key](ip)
    except LookupError:
        if miner_type not in MINER_CLASSES:
            return UnknownMiner(str(ip))
        if miner_model is not None:
            warnings.warn(
                f"Partially supported miner found: {miner_model}, type: {miner_type}, please open an issue with miner data "
                f"and this model on GitHub (https://github.com/UpstreamData/pyasic/issues)."
            )
        return MINER_CLASSES[miner_type][None](ip)
async def get_miner_model_antminer(self, ip: str) -> str | None:
    """Look up an Antminer model via the web and socket interfaces together.

    Both lookups are started as tasks and handed to
    `concurrent_get_first_result` with a predicate that accepts any non-None
    result.

    Returns:
        Whatever `concurrent_get_first_result` returns for those tasks.
    """
    lookups = (self._get_model_antminer_web, self._get_model_antminer_sock)
    pending = [asyncio.create_task(lookup(ip)) for lookup in lookups]
    return await concurrent_get_first_result(
        pending, lambda model: model is not None
    )
async def _get_model_antminer_web(self, ip: str) -> str | None:
    """Read the Antminer model from `/cgi-bin/get_system_info.cgi`.

    Authenticates with digest auth as `root`, using the
    `default_antminer_web_password` setting (default "root").

    Returns:
        The `minertype` field of the reply, or None when it is unavailable.
    """
    # last resort, this is slow
    password = settings.get("default_antminer_web_password", "root")
    system_info = await self.send_web_command(
        ip,
        "/cgi-bin/get_system_info.cgi",
        auth=httpx.DigestAuth("root", password),
    )
    try:
        return system_info["minertype"]
    except (TypeError, LookupError):
        return None
async def _get_model_antminer_sock(self, ip: str) -> str | None:
sock_json_data = await self.send_api_command(ip, "version")
try:
miner_model = sock_json_data["VERSION"][0]["Type"]
if " (" in miner_model:
split_miner_model = miner_model.split(" (")
miner_model = split_miner_model[0]
return miner_model
except (TypeError, LookupError):
pass
sock_json_data = await self.send_api_command(ip, "stats")
try:
miner_model = sock_json_data["STATS"][0]["Type"]
if " (" in miner_model:
split_miner_model = miner_model.split(" (")
miner_model = split_miner_model[0]
return miner_model
except (TypeError, LookupError):
pass
async def get_miner_model_goldshell(self, ip: str) -> str | None:
json_data = await self.send_web_command(ip, "/mcb/status")
try:
miner_model = json_data["model"].replace("-", " ")
return miner_model
except (TypeError, LookupError):
pass
async def get_miner_model_whatsminer(self, ip: str) -> str | None:
sock_json_data = await self.send_api_command(ip, "devdetails")
try:
miner_model = sock_json_data["DEVDETAILS"][0]["Model"].replace("_", "")
miner_model = miner_model[:-1] + "0"
return miner_model
except (TypeError, LookupError):
pass
async def get_miner_model_avalonminer(self, ip: str) -> str | None:
sock_json_data = await self.send_api_command(ip, "version")
try:
miner_model = sock_json_data["VERSION"][0]["PROD"].upper()
if "-" in miner_model:
miner_model = miner_model.split("-")[0]
if miner_model in ["AVALONNANO", "AVALON0O", "AVALONMINER 15"]:
subtype = sock_json_data["VERSION"][0]["MODEL"].upper()
miner_model = f"AVALONMINER {subtype}"
return miner_model
except (TypeError, LookupError):
pass
async def get_miner_model_innosilicon(self, ip: str) -> str | None:
    """Read the Innosilicon model through the authenticated web API.

    Logs in at `/api/auth` as `admin` with the
    `default_innosilicon_web_password` setting (default "admin"), then asks
    `/api/type` and, if that yields nothing, `/overview`, sending the
    returned token as a bearer header.

    Returns:
        The `type` field of the first usable reply, or None when login fails
        or neither endpoint provides it.
    """
    # A device may answer with a non-JSON page or with JSON of an unexpected
    # shape; treat both like a failed request instead of letting them escape.
    unusable_reply = (httpx.HTTPError, json.JSONDecodeError, TypeError, LookupError)

    async with httpx.AsyncClient(transport=settings.transport()) as session:
        try:
            auth_req = await session.post(
                f"http://{ip}/api/auth",
                data={
                    "username": "admin",
                    "password": settings.get(
                        "default_innosilicon_web_password", "admin"
                    ),
                },
            )
            headers = {"Authorization": "Bearer " + auth_req.json()["jwt"]}
        except unusable_reply:
            return None

        for endpoint in ("/api/type", "/overview"):
            try:
                reply = await session.post(
                    f"http://{ip}{endpoint}",
                    headers=headers,
                    data={},
                )
                return reply.json()["type"]
            except unusable_reply:
                continue
    return None
async def get_miner_model_braiins_os(self, ip: str) -> str | None:
sock_json_data = await self.send_api_command(ip, "devdetails")
try:
miner_model = (
sock_json_data["DEVDETAILS"][0]["Model"]
.replace("Bitmain ", "")
.replace("S19XP", "S19 XP")
)
return miner_model
except (TypeError, LookupError):
pass
try:
async with httpx.AsyncClient(transport=settings.transport()) as session:
d = await session.post(
f"http://{ip}/graphql",
json={"query": "{bosminer {info{modelName}}}"},
)
if d.status_code == 200:
json_data = d.json()
miner_model = json_data["data"]["bosminer"]["info"][
"modelName"
].replace("S19XP", "S19 XP")
return miner_model
except (httpx.HTTPError, LookupError):
pass
async def get_miner_model_vnish(self, ip: str) -> str | None:
sock_json_data = await self.send_api_command(ip, "stats")
try:
miner_model = sock_json_data["STATS"][0]["Type"]
if " (" in miner_model:
split_miner_model = miner_model.split(" (")
miner_model = split_miner_model[0]
if "(88)" in miner_model:
miner_model = miner_model.replace("(88)", "NOPIC")
if " AML" in miner_model:
miner_model = miner_model.replace(" AML", "")
return miner_model
except (TypeError, LookupError):
pass
async def get_miner_model_epic(self, ip: str) -> str | None:
    """Read the model of an ePIC miner from `:4028/capabilities`.

    The request is repeated up to `get_data_retries` times (setting,
    default 1) until a reply contains a `Model` field.

    Returns:
        The `Model` field, or None when no attempt provides it.
    """
    for _ in range(settings.get("get_data_retries", 1)):
        capabilities = await self.send_web_command(ip, ":4028/capabilities")
        try:
            return capabilities["Model"]
        except (TypeError, LookupError):
            continue
    return None
async def get_miner_model_hiveon(self, ip: str) -> str | None:
sock_json_data = await self.send_api_command(ip, "version")
try:
miner_type = sock_json_data["VERSION"][0]["Type"]
return miner_type.replace(" HIVEON", "")
except (TypeError, LookupError):
pass
async def get_miner_model_luxos(self, ip: str) -> str | None:
sock_json_data = await self.send_api_command(ip, "version")
try:
miner_model = sock_json_data["VERSION"][0]["Type"]
if " (" in miner_model:
split_miner_model = miner_model.split(" (")
miner_model = split_miner_model[0]
return miner_model
except (TypeError, LookupError):
pass
async def get_miner_model_auradine(self, ip: str) -> str | None:
sock_json_data = await self.send_api_command(ip, "devdetails")
try:
return sock_json_data["DEVDETAILS"][0]["Model"]
except LookupError:
pass
async def get_miner_model_marathon(self, ip: str) -> str | None:
    """Read the model of a Marathon miner from `/kaonsu/v1/overview`.

    Authenticates with digest auth as `root` / `root`.

    Returns:
        The `model` field, or None when it is empty or unavailable.
    """
    overview = await self.send_web_command(
        ip, "/kaonsu/v1/overview", auth=httpx.DigestAuth("root", "root")
    )
    try:
        model = overview["model"]
        return None if model == "" else model
    except (TypeError, LookupError):
        return None
async def get_miner_model_bitaxe(self, ip: str) -> str | None:
web_json_data = await self.send_web_command(ip, "/api/system/info")
try:
miner_model = web_json_data["ASICModel"]
if miner_model == "":
return None
return miner_model
except (TypeError, LookupError):
pass
async def get_miner_model_luckyminer(self, ip: str) -> str | None:
web_json_data = await self.send_web_command(ip, "/api/system/info")
try:
miner_model = web_json_data["minerModel"]
if miner_model == "":
return None
return miner_model
except (TypeError, LookupError):
pass
async def get_miner_model_iceriver(self, ip: str) -> str | None:
    """Read the model of an IceRiver miner from its web panel.

    Logs in at `/user/loginpost` as `admin` with the
    `default_iceriver_web_password` setting (default "12345678"), then reads
    `data.softver1` from `/user/userpanel` on the same client. The model is
    taken from the "_"-separated version string: the second-to-last part
    when the last part is exactly "miner", otherwise the last part with
    "miner" removed.

    Returns:
        The upper-cased model, or None when a request fails, the panel does
        not answer with status 200, or the reply lacks the expected field.
    """
    async with httpx.AsyncClient(transport=settings.transport()) as client:
        try:
            # auth
            await client.post(
                f"http://{ip}/user/loginpost",
                params={
                    "post": "6",
                    "user": "admin",
                    "pwd": settings.get(
                        "default_iceriver_web_password", "12345678"
                    ),
                },
            )
        except httpx.HTTPError:
            return None

        try:
            # No ":" after the host: same default port, and the URL now
            # matches the login request above.
            resp = await client.post(
                f"http://{ip}/user/userpanel", params={"post": "4"}
            )
            if resp.status_code != 200:
                return None
            software_ver = resp.json()["data"]["softver1"]
            split_ver = software_ver.split("_")
            if split_ver[-1] == "miner":
                miner_ver = split_ver[-2]
            else:
                miner_ver = split_ver[-1].replace("miner", "")
            return miner_ver.upper()
        except (
            httpx.HTTPError,
            json.JSONDecodeError,
            TypeError,
            LookupError,
            AttributeError,
        ):
            # A non-JSON body or an unexpectedly shaped reply means the
            # model is unknown; it must not abort the caller.
            return None
async def get_miner_model_hammer(self, ip: str) -> str | None:
    """Read the model of a Hammer miner from `/cgi-bin/get_system_info.cgi`.

    Authenticates with digest auth as `root`, using the
    `default_hammer_web_password` setting (default "root").

    Returns:
        The `minertype` field of the reply, or None when it is unavailable.
    """
    password = settings.get("default_hammer_web_password", "root")
    system_info = await self.send_web_command(
        ip,
        "/cgi-bin/get_system_info.cgi",
        auth=httpx.DigestAuth("root", password),
    )
    try:
        return system_info["minertype"]
    except (TypeError, LookupError):
        return None
async def get_miner_model_volcminer(self, ip: str) -> str | None:
    """Read the model of a VolcMiner from `/cgi-bin/get_system_info.cgi`.

    Authenticates with digest auth as `root`, using the
    `default_volcminer_web_password` setting (default "root").

    Returns:
        The `minertype` field of the reply, or None when it is unavailable.
    """
    password = settings.get("default_volcminer_web_password", "root")
    system_info = await self.send_web_command(
        ip,
        "/cgi-bin/get_system_info.cgi",
        auth=httpx.DigestAuth("root", password),
    )
    try:
        return system_info["minertype"]
    except (TypeError, LookupError):
        return None
async def get_miner_model_elphapex(self, ip: str) -> str | None:
    """Read the model of an Elphapex miner from `/cgi-bin/get_system_info.cgi`.

    Authenticates with digest auth as `root`, using the
    `default_elphapex_web_password` setting (default "root").

    Returns:
        The `minertype` field of the reply, or None when it is unavailable.
    """
    password = settings.get("default_elphapex_web_password", "root")
    system_info = await self.send_web_command(
        ip,
        "/cgi-bin/get_system_info.cgi",
        auth=httpx.DigestAuth("root", password),
    )
    try:
        return system_info["minertype"]
    except (TypeError, LookupError):
        return None
async def get_miner_model_mskminer(self, ip: str) -> str | None:
sock_json_data = await self.send_api_command(ip, "version")
try:
return sock_json_data["VERSION"][0]["Type"].split(" ")[0]
except LookupError:
pass