Improve SSD attribute parsing logic for Kingston drives

This commit is contained in:
2025-11-21 10:41:06 +01:00
parent 093e0c213e
commit c6fd0ab50d

376
disk.py
View File

@@ -1,179 +1,197 @@
import subprocess import subprocess
import json import json
import sys import sys
import math import math
def run_command(cmd): def run_command(cmd):
try: try:
result = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) result = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return result.stdout.decode('utf-8') return result.stdout.decode('utf-8')
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
return None return None
def get_size(bytes_size): def get_size(bytes_size):
"""Converts raw bytes to human readable string.""" """Converts raw bytes to human readable string."""
if not bytes_size: return "N/A" if not bytes_size: return "N/A"
size_name = ("B", "KB", "MB", "GB", "TB", "PB") size_name = ("B", "KB", "MB", "GB", "TB", "PB")
if bytes_size == 0: return "0 B" if bytes_size == 0: return "0 B"
i = int(math.floor(math.log(bytes_size, 1024))) i = int(math.floor(math.log(bytes_size, 1024)))
p = math.pow(1024, i) p = math.pow(1024, i)
s = round(bytes_size / p, 1) s = round(bytes_size / p, 1)
return f"{s} {size_name[i]}" return f"{s} {size_name[i]}"
def get_smart_data(device_path): def get_smart_data(device_path):
# -a: All info, -j: JSON output # -a: All info, -j: JSON output
cmd = f"smartctl -a -j {device_path}" cmd = f"smartctl -a -j {device_path}"
result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try: try:
return json.loads(result.stdout.decode('utf-8')) return json.loads(result.stdout.decode('utf-8'))
except json.JSONDecodeError: except json.JSONDecodeError:
return None return None
def parse_smart_data(data, device_node, rotation): def parse_smart_data(data, device_node, rotation):
if not data: return None if not data: return None
# --- 1. BASIC INFO --- # --- 1. BASIC INFO ---
model = data.get('model_name', 'Unknown') model = data.get('model_name', 'Unknown')
if model == 'Unknown': model = data.get('model_number', 'Unknown') if model == 'Unknown': model = data.get('model_number', 'Unknown')
serial = data.get('serial_number', 'Unknown') serial = data.get('serial_number', 'Unknown')
size_bytes = data.get('user_capacity', {}).get('bytes', 0) size_bytes = data.get('user_capacity', {}).get('bytes', 0)
sector_size = data.get('logical_block_size', 512)
# Get Logical Sector Size for LBA calculations (default to 512)
sector_size = data.get('logical_block_size', 512) # --- 2. TYPE DETECTION ---
protocol = data.get('device', {}).get('protocol', '').lower()
# --- 2. TYPE DETECTION --- disk_type = "SSD"
protocol = data.get('device', {}).get('protocol', '').lower() if rotation == '1': disk_type = "HDD"
disk_type = "SSD" elif 'nvme' in protocol: disk_type = "NVMe"
if rotation == '1': disk_type = "HDD"
elif 'nvme' in protocol: disk_type = "NVMe" # --- 3. HEALTH ---
passed = data.get('smart_status', {}).get('passed')
# --- 3. HEALTH --- health = "OK" if passed else "FAIL"
passed = data.get('smart_status', {}).get('passed') if passed is None: health = "Unknown"
health = "OK" if passed else "FAIL"
if passed is None: health = "Unknown" # --- 4. METRICS ---
hours = "N/A"
# --- 4. METRICS --- spare = "N/A"
hours = "N/A" temp = "N/A"
spare = "N/A" errors = 0
temp = "N/A" max_total_bytes = 0 # We will store the highest calculated write value here
errors = 0
total_bytes_written = 0 # Temperature
temp_data = data.get('temperature', {}).get('current')
# Temperature if temp_data: temp = f"{temp_data}C"
temp_data = data.get('temperature', {}).get('current')
if temp_data: temp = f"{temp_data}C" if disk_type == "NVMe":
# --- NVMe LOGIC ---
if disk_type == "NVMe": nvme_log = data.get('nvme_smart_health_information_log', {})
# --- NVMe LOGIC --- hours = nvme_log.get('power_on_hours', 'N/A')
nvme_log = data.get('nvme_smart_health_information_log', {})
hours = nvme_log.get('power_on_hours', 'N/A') spare_val = nvme_log.get('available_spare', None)
if spare_val is not None: spare = f"{spare_val}%"
spare_val = nvme_log.get('available_spare', None)
if spare_val is not None: spare = f"{spare_val}%" errors = nvme_log.get('media_errors', 0)
errors = nvme_log.get('media_errors', 0) duw = nvme_log.get('data_units_written', 0)
if duw: max_total_bytes = duw * 1000 * 512
duw = nvme_log.get('data_units_written', 0)
if duw: total_bytes_written = duw * 1000 * 512 else:
# --- SATA/HDD LOGIC ---
else: attributes = data.get('ata_smart_attributes', {}).get('table', [])
# --- SATA/HDD LOGIC ---
attributes = data.get('ata_smart_attributes', {}).get('table', []) for attr in attributes:
id_ = attr.get('id')
writes_32mib_candidates = [] norm_val = attr.get('value', 0)
lba_written_val = 0 raw_val = attr.get('raw', {}).get('value', 0)
name = attr.get('name', '').lower()
for attr in attributes:
id_ = attr.get('id') # Standard Metrics
# 'value' is the Normalized value (0-100 usually) if id_ == 9: hours = raw_val
norm_val = attr.get('value', 0) if id_ == 194 and temp == "N/A": temp = f"{raw_val}C"
# 'raw.value' is the Raw count if id_ in [5, 197]: errors += raw_val
raw_val = attr.get('raw', {}).get('value', 0)
# --- SPARE / LIFE REMAINING LOGIC ---
# Standard Metrics # We check ID 231 (SSD_Life_Left) explicitly as requested.
if id_ == 9: hours = raw_val # We only use ID 233 if the name implies "wear" or "life", NOT "writes".
if id_ == 194 and temp == "N/A": temp = f"{raw_val}C" if disk_type == "SSD":
if id_ in [5, 197]: errors += raw_val is_life_attr = False
# --- SATA SSD SPARE / LIFE LOGIC --- # ID 231: SSD_Life_Left (Normalized value is %)
# We prioritize explicit percentage attributes. if id_ == 231: is_life_attr = True
# 202: Percent_Lifetime_Remain
# 233: Media_Wearout_Indicator (Intel/Others) # ID 202: Percent_Lifetime_Remain
# 232: Available_Reservd_Space if id_ == 202: is_life_attr = True
# 231: SSD_Life_Left
# 169: Remaining_Lifetime_Perc # ID 169, 232: Common Life/Reserve attributes
if disk_type == "SSD": if id_ in [169, 232]: is_life_attr = True
if id_ in [169, 202, 231, 232, 233]:
# For these IDs, the Normalized Value is the percentage remaining # ID 233: Only if name is NOT "writes" (Avoids collision with Flash_Writes_GiB)
if norm_val is not None: if id_ == 233 and 'write' not in name: is_life_attr = True
spare = f"{norm_val}%"
if is_life_attr and norm_val is not None:
# --- SATA WRITE CALCULATION --- spare = f"{norm_val}%"
if id_ in [225, 241, 243]:
writes_32mib_candidates.append(raw_val) # --- DATA WRITTEN LOGIC ---
if id_ == 246: # We calculate bytes for this attribute based on its Name or ID
lba_written_val = raw_val # and keep it if it's higher than what we've seen so far.
current_calc_bytes = 0
# Finalize Writes
if writes_32mib_candidates: # Case A: Explicit "GiB" in name (e.g., "Flash_Writes_GiB")
total_bytes_written = max(writes_32mib_candidates) * 32 * 1024 * 1024 if 'gib' in name:
elif lba_written_val > 0: current_calc_bytes = raw_val * 1024 * 1024 * 1024
total_bytes_written = lba_written_val * sector_size
# Case B: Explicit "32MiB" in name (e.g., "Host_Writes_32MiB")
written_str = get_size(total_bytes_written) if total_bytes_written > 0 else "N/A" elif '32mib' in name:
current_calc_bytes = raw_val * 32 * 1024 * 1024
return {
"Device": device_node, # Case C: Explicit "LBA" in name (e.g., "Total_LBAs_Written")
"Type": disk_type, elif 'lba' in name:
"Size": get_size(size_bytes), current_calc_bytes = raw_val * sector_size
"Model": model,
"Serial": serial, # Case D: Fallback by ID if name is generic or empty
"Temp": temp, elif id_ in [225, 241, 243] and current_calc_bytes == 0:
"Hours": hours, # Default Assumption: 32MiB for these IDs unless name said GiB
"Spare": spare, current_calc_bytes = raw_val * 32 * 1024 * 1024
"Err": errors, elif id_ == 246 and current_calc_bytes == 0:
"Written": written_str, current_calc_bytes = raw_val * sector_size
"Health": health
} # Compare and Store Max
if current_calc_bytes > max_total_bytes:
def main(): max_total_bytes = current_calc_bytes
# Root check
if subprocess.run("id -u", shell=True, stdout=subprocess.PIPE).stdout.decode().strip() != '0': written_str = get_size(max_total_bytes) if max_total_bytes > 0 else "N/A"
print("Error: Must be run as root (sudo).")
sys.exit(1) return {
"Device": device_node,
# Get Devices "Type": disk_type,
lsblk_cmd = "lsblk -d -n -o NAME,ROTA,TYPE --json" "Size": get_size(size_bytes),
lsblk_output = run_command(lsblk_cmd) "Model": model,
"Serial": serial,
if not lsblk_output: sys.exit(1) "Temp": temp,
try: "Hours": hours,
disks = json.loads(lsblk_output)['blockdevices'] "Spare": spare,
except: sys.exit(1) "Err": errors,
"Written": written_str,
# Print Header "Health": health
# Fmt: Dev(9) Type(6) Size(10) Temp(6) Health(8) Hours(9) Err(6) Spare(8) Written(10) Serial(20) Model }
# header = f"{'DEVICE':<12} {'TYPE':<4} {'SIZE':<7} {'TEMP':<4} {'HEALTH':<6} {'HOURS':<5} {'ERR':<3} {'SPARE':<5} {'WRITTEN':<7} {'SERIAL':<20} {'MODEL'}"
header = f"{'DEVICE':<12} {'SIZE':<7} {'TEMP':<4} {'HEALTH':<6} {'HOURS':<5} {'ERR':<3} {'SPARE':<5} {'WRITTEN':<7} {'MODEL'}" def main():
print(header) # Root check
print("-" * len(header)) if subprocess.run("id -u", shell=True, stdout=subprocess.PIPE).stdout.decode().strip() != '0':
print("Error: Must be run as root (sudo).")
# Process Disks sys.exit(1)
for disk in disks:
name = disk['name'] # Get Devices
if disk['type'] != 'disk': continue lsblk_cmd = "lsblk -d -n -o NAME,ROTA,TYPE --json"
lsblk_output = run_command(lsblk_cmd)
device_node = f"/dev/{name}"
smart_json = get_smart_data(device_node) if not lsblk_output: sys.exit(1)
p = parse_smart_data(smart_json, device_node, disk['rota']) try:
disks = json.loads(lsblk_output)['blockdevices']
if p: except: sys.exit(1)
#/dev/nvme0n1 419.19 GB 29C OK 58695 0 98% 321.59 TB INTEL SSDPE2MX450G7
#row = f"{p['Device']:<12} {p['Type']:<4} {p['Size']:<8} {p['Temp']:<3} {p['Health']:<5} {str(p['Hours']):<6} {str(p['Err']):<4} {str(p['Spare']):<3} {p['Written']:<8} {p['Serial']:<20} {p['Model']}" # Print Header
row = f"{p['Device']:<12} {p['Size']:<8} {p['Temp']:<3} {p['Health']:<5} {str(p['Hours']):<6} {str(p['Err']):<4} {str(p['Spare']):<3} {p['Written']:<8} {p['Model']}" # Fmt: Dev(9) Type(6) Size(10) Temp(6) Health(8) Hours(9) Err(6) Spare(8) Written(10) Serial(20) Model
print(row) # header = f"{'DEVICE':<12} {'TYPE':<4} {'SIZE':<7} {'TEMP':<4} {'HEALTH':<6} {'HOURS':<5} {'ERR':<3} {'SPARE':<5} {'WRITTEN':<7} {'SERIAL':<20} {'MODEL'}"
else: header = f"{'DEVICE':<12} {'SIZE':<7} {'TEMP':<4} {'HEALTH':<6} {'HOURS':<5} {'ERR':<3} {'SPARE':<5} {'WRITTEN':<7} {'MODEL'}"
print(f"{device_node:<9} [SMART Data Unavailable]") print(header)
print("-" * len(header))
if __name__ == "__main__":
main() # Process Disks
for disk in disks:
name = disk['name']
if disk['type'] != 'disk': continue
device_node = f"/dev/{name}"
smart_json = get_smart_data(device_node)
p = parse_smart_data(smart_json, device_node, disk['rota'])
if p:
#/dev/nvme0n1 419.19 GB 29C OK 58695 0 98% 321.59 TB INTEL SSDPE2MX450G7
#row = f"{p['Device']:<12} {p['Type']:<4} {p['Size']:<8} {p['Temp']:<3} {p['Health']:<5} {str(p['Hours']):<6} {str(p['Err']):<4} {str(p['Spare']):<3} {p['Written']:<8} {p['Serial']:<20} {p['Model']}"
row = f"{p['Device']:<12} {p['Size']:<8} {p['Temp']:<3} {p['Health']:<5} {str(p['Hours']):<6} {str(p['Err']):<4} {str(p['Spare']):<3} {p['Written']:<8} {p['Model']}"
print(row)
else:
print(f"{device_node:<9} [SMART Data Unavailable]")
if __name__ == "__main__":
main()