diff --git a/disk.py b/disk.py index 1bcc5fd..daf6beb 100644 --- a/disk.py +++ b/disk.py @@ -1,179 +1,197 @@ -import subprocess -import json -import sys -import math - -def run_command(cmd): - try: - result = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - return result.stdout.decode('utf-8') - except subprocess.CalledProcessError: - return None - -def get_size(bytes_size): - """Converts raw bytes to human readable string.""" - if not bytes_size: return "N/A" - size_name = ("B", "KB", "MB", "GB", "TB", "PB") - if bytes_size == 0: return "0 B" - i = int(math.floor(math.log(bytes_size, 1024))) - p = math.pow(1024, i) - s = round(bytes_size / p, 1) - return f"{s} {size_name[i]}" - -def get_smart_data(device_path): - # -a: All info, -j: JSON output - cmd = f"smartctl -a -j {device_path}" - result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - try: - return json.loads(result.stdout.decode('utf-8')) - except json.JSONDecodeError: - return None - -def parse_smart_data(data, device_node, rotation): - if not data: return None - - # --- 1. BASIC INFO --- - model = data.get('model_name', 'Unknown') - if model == 'Unknown': model = data.get('model_number', 'Unknown') - serial = data.get('serial_number', 'Unknown') - size_bytes = data.get('user_capacity', {}).get('bytes', 0) - - # Get Logical Sector Size for LBA calculations (default to 512) - sector_size = data.get('logical_block_size', 512) - - # --- 2. TYPE DETECTION --- - protocol = data.get('device', {}).get('protocol', '').lower() - disk_type = "SSD" - if rotation == '1': disk_type = "HDD" - elif 'nvme' in protocol: disk_type = "NVMe" - - # --- 3. HEALTH --- - passed = data.get('smart_status', {}).get('passed') - health = "OK" if passed else "FAIL" - if passed is None: health = "Unknown" - - # --- 4. METRICS --- - hours = "N/A" - spare = "N/A" - temp = "N/A" - errors = 0 - total_bytes_written = 0 - - # Temperature - temp_data = data.get('temperature', {}).get('current') - if temp_data: temp = f"{temp_data}C" - - if disk_type == "NVMe": - # --- NVMe LOGIC --- - nvme_log = data.get('nvme_smart_health_information_log', {}) - hours = nvme_log.get('power_on_hours', 'N/A') - - spare_val = nvme_log.get('available_spare', None) - if spare_val is not None: spare = f"{spare_val}%" - - errors = nvme_log.get('media_errors', 0) - - duw = nvme_log.get('data_units_written', 0) - if duw: total_bytes_written = duw * 1000 * 512 - - else: - # --- SATA/HDD LOGIC --- - attributes = data.get('ata_smart_attributes', {}).get('table', []) - - writes_32mib_candidates = [] - lba_written_val = 0 - - for attr in attributes: - id_ = attr.get('id') - # 'value' is the Normalized value (0-100 usually) - norm_val = attr.get('value', 0) - # 'raw.value' is the Raw count - raw_val = attr.get('raw', {}).get('value', 0) - - # Standard Metrics - if id_ == 9: hours = raw_val - if id_ == 194 and temp == "N/A": temp = f"{raw_val}C" - if id_ in [5, 197]: errors += raw_val - - # --- SATA SSD SPARE / LIFE LOGIC --- - # We prioritize explicit percentage attributes. - # 202: Percent_Lifetime_Remain - # 233: Media_Wearout_Indicator (Intel/Others) - # 232: Available_Reservd_Space - # 231: SSD_Life_Left - # 169: Remaining_Lifetime_Perc - if disk_type == "SSD": - if id_ in [169, 202, 231, 232, 233]: - # For these IDs, the Normalized Value is the percentage remaining - if norm_val is not None: - spare = f"{norm_val}%" - - # --- SATA WRITE CALCULATION --- - if id_ in [225, 241, 243]: - writes_32mib_candidates.append(raw_val) - if id_ == 246: - lba_written_val = raw_val - - # Finalize Writes - if writes_32mib_candidates: - total_bytes_written = max(writes_32mib_candidates) * 32 * 1024 * 1024 - elif lba_written_val > 0: - total_bytes_written = lba_written_val * sector_size - - written_str = get_size(total_bytes_written) if total_bytes_written > 0 else "N/A" - - return { - "Device": device_node, - "Type": disk_type, - "Size": get_size(size_bytes), - "Model": model, - "Serial": serial, - "Temp": temp, - "Hours": hours, - "Spare": spare, - "Err": errors, - "Written": written_str, - "Health": health - } - -def main(): - # Root check - if subprocess.run("id -u", shell=True, stdout=subprocess.PIPE).stdout.decode().strip() != '0': - print("Error: Must be run as root (sudo).") - sys.exit(1) - - # Get Devices - lsblk_cmd = "lsblk -d -n -o NAME,ROTA,TYPE --json" - lsblk_output = run_command(lsblk_cmd) - - if not lsblk_output: sys.exit(1) - try: - disks = json.loads(lsblk_output)['blockdevices'] - except: sys.exit(1) - - # Print Header - # Fmt: Dev(9) Type(6) Size(10) Temp(6) Health(8) Hours(9) Err(6) Spare(8) Written(10) Serial(20) Model - # header = f"{'DEVICE':<12} {'TYPE':<4} {'SIZE':<7} {'TEMP':<4} {'HEALTH':<6} {'HOURS':<5} {'ERR':<3} {'SPARE':<5} {'WRITTEN':<7} {'SERIAL':<20} {'MODEL'}" - header = f"{'DEVICE':<12} {'SIZE':<7} {'TEMP':<4} {'HEALTH':<6} {'HOURS':<5} {'ERR':<3} {'SPARE':<5} {'WRITTEN':<7} {'MODEL'}" - print(header) - print("-" * len(header)) - - # Process Disks - for disk in disks: - name = disk['name'] - if disk['type'] != 'disk': continue - - device_node = f"/dev/{name}" - smart_json = get_smart_data(device_node) - p = parse_smart_data(smart_json, device_node, disk['rota']) - - if p: - #/dev/nvme0n1 419.19 GB 29C OK 58695 0 98% 321.59 TB INTEL SSDPE2MX450G7 - #row = f"{p['Device']:<12} {p['Type']:<4} {p['Size']:<8} {p['Temp']:<3} {p['Health']:<5} {str(p['Hours']):<6} {str(p['Err']):<4} {str(p['Spare']):<3} {p['Written']:<8} {p['Serial']:<20} {p['Model']}" - row = f"{p['Device']:<12} {p['Size']:<8} {p['Temp']:<3} {p['Health']:<5} {str(p['Hours']):<6} {str(p['Err']):<4} {str(p['Spare']):<3} {p['Written']:<8} {p['Model']}" - print(row) - else: - print(f"{device_node:<9} [SMART Data Unavailable]") - -if __name__ == "__main__": - main() +import subprocess +import json +import sys +import math + +def run_command(cmd): + try: + result = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return result.stdout.decode('utf-8') + except subprocess.CalledProcessError: + return None + +def get_size(bytes_size): + """Converts raw bytes to human readable string.""" + if not bytes_size: return "N/A" + size_name = ("B", "KB", "MB", "GB", "TB", "PB") + if bytes_size == 0: return "0 B" + i = int(math.floor(math.log(bytes_size, 1024))) + p = math.pow(1024, i) + s = round(bytes_size / p, 1) + return f"{s} {size_name[i]}" + +def get_smart_data(device_path): + # -a: All info, -j: JSON output + cmd = f"smartctl -a -j {device_path}" + result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + try: + return json.loads(result.stdout.decode('utf-8')) + except json.JSONDecodeError: + return None + +def parse_smart_data(data, device_node, rotation): + if not data: return None + + # --- 1. BASIC INFO --- + model = data.get('model_name', 'Unknown') + if model == 'Unknown': model = data.get('model_number', 'Unknown') + serial = data.get('serial_number', 'Unknown') + size_bytes = data.get('user_capacity', {}).get('bytes', 0) + sector_size = data.get('logical_block_size', 512) + + # --- 2. TYPE DETECTION --- + protocol = data.get('device', {}).get('protocol', '').lower() + disk_type = "SSD" + if rotation == '1': disk_type = "HDD" + elif 'nvme' in protocol: disk_type = "NVMe" + + # --- 3. HEALTH --- + passed = data.get('smart_status', {}).get('passed') + health = "OK" if passed else "FAIL" + if passed is None: health = "Unknown" + + # --- 4. METRICS --- + hours = "N/A" + spare = "N/A" + temp = "N/A" + errors = 0 + max_total_bytes = 0 # We will store the highest calculated write value here + + # Temperature + temp_data = data.get('temperature', {}).get('current') + if temp_data: temp = f"{temp_data}C" + + if disk_type == "NVMe": + # --- NVMe LOGIC --- + nvme_log = data.get('nvme_smart_health_information_log', {}) + hours = nvme_log.get('power_on_hours', 'N/A') + + spare_val = nvme_log.get('available_spare', None) + if spare_val is not None: spare = f"{spare_val}%" + + errors = nvme_log.get('media_errors', 0) + + duw = nvme_log.get('data_units_written', 0) + if duw: max_total_bytes = duw * 1000 * 512 + + else: + # --- SATA/HDD LOGIC --- + attributes = data.get('ata_smart_attributes', {}).get('table', []) + + for attr in attributes: + id_ = attr.get('id') + norm_val = attr.get('value', 0) + raw_val = attr.get('raw', {}).get('value', 0) + name = attr.get('name', '').lower() + + # Standard Metrics + if id_ == 9: hours = raw_val + if id_ == 194 and temp == "N/A": temp = f"{raw_val}C" + if id_ in [5, 197]: errors += raw_val + + # --- SPARE / LIFE REMAINING LOGIC --- + # We check ID 231 (SSD_Life_Left) explicitly as requested. + # We only use ID 233 if the name implies "wear" or "life", NOT "writes". + if disk_type == "SSD": + is_life_attr = False + + # ID 231: SSD_Life_Left (Normalized value is %) + if id_ == 231: is_life_attr = True + + # ID 202: Percent_Lifetime_Remain + if id_ == 202: is_life_attr = True + + # ID 169, 232: Common Life/Reserve attributes + if id_ in [169, 232]: is_life_attr = True + + # ID 233: Only if name is NOT "writes" (Avoids collision with Flash_Writes_GiB) + if id_ == 233 and 'write' not in name: is_life_attr = True + + if is_life_attr and norm_val is not None: + spare = f"{norm_val}%" + + # --- DATA WRITTEN LOGIC --- + # We calculate bytes for this attribute based on its Name or ID + # and keep it if it's higher than what we've seen so far. + current_calc_bytes = 0 + + # Case A: Explicit "GiB" in name (e.g., "Flash_Writes_GiB") + if 'gib' in name: + current_calc_bytes = raw_val * 1024 * 1024 * 1024 + + # Case B: Explicit "32MiB" in name (e.g., "Host_Writes_32MiB") + elif '32mib' in name: + current_calc_bytes = raw_val * 32 * 1024 * 1024 + + # Case C: Explicit "LBA" in name (e.g., "Total_LBAs_Written") + elif 'lba' in name: + current_calc_bytes = raw_val * sector_size + + # Case D: Fallback by ID if name is generic or empty + elif id_ in [225, 241, 243] and current_calc_bytes == 0: + # Default Assumption: 32MiB for these IDs unless name said GiB + current_calc_bytes = raw_val * 32 * 1024 * 1024 + elif id_ == 246 and current_calc_bytes == 0: + current_calc_bytes = raw_val * sector_size + + # Compare and Store Max + if current_calc_bytes > max_total_bytes: + max_total_bytes = current_calc_bytes + + written_str = get_size(max_total_bytes) if max_total_bytes > 0 else "N/A" + + return { + "Device": device_node, + "Type": disk_type, + "Size": get_size(size_bytes), + "Model": model, + "Serial": serial, + "Temp": temp, + "Hours": hours, + "Spare": spare, + "Err": errors, + "Written": written_str, + "Health": health + } + +def main(): + # Root check + if subprocess.run("id -u", shell=True, stdout=subprocess.PIPE).stdout.decode().strip() != '0': + print("Error: Must be run as root (sudo).") + sys.exit(1) + + # Get Devices + lsblk_cmd = "lsblk -d -n -o NAME,ROTA,TYPE --json" + lsblk_output = run_command(lsblk_cmd) + + if not lsblk_output: sys.exit(1) + try: + disks = json.loads(lsblk_output)['blockdevices'] + except: sys.exit(1) + + # Print Header + # Fmt: Dev(9) Type(6) Size(10) Temp(6) Health(8) Hours(9) Err(6) Spare(8) Written(10) Serial(20) Model + # header = f"{'DEVICE':<12} {'TYPE':<4} {'SIZE':<7} {'TEMP':<4} {'HEALTH':<6} {'HOURS':<5} {'ERR':<3} {'SPARE':<5} {'WRITTEN':<7} {'SERIAL':<20} {'MODEL'}" + header = f"{'DEVICE':<12} {'SIZE':<7} {'TEMP':<4} {'HEALTH':<6} {'HOURS':<5} {'ERR':<3} {'SPARE':<5} {'WRITTEN':<7} {'MODEL'}" + print(header) + print("-" * len(header)) + + # Process Disks + for disk in disks: + name = disk['name'] + if disk['type'] != 'disk': continue + + device_node = f"/dev/{name}" + smart_json = get_smart_data(device_node) + p = parse_smart_data(smart_json, device_node, disk['rota']) + + if p: + #/dev/nvme0n1 419.19 GB 29C OK 58695 0 98% 321.59 TB INTEL SSDPE2MX450G7 + #row = f"{p['Device']:<12} {p['Type']:<4} {p['Size']:<8} {p['Temp']:<3} {p['Health']:<5} {str(p['Hours']):<6} {str(p['Err']):<4} {str(p['Spare']):<3} {p['Written']:<8} {p['Serial']:<20} {p['Model']}" + row = f"{p['Device']:<12} {p['Size']:<8} {p['Temp']:<3} {p['Health']:<5} {str(p['Hours']):<6} {str(p['Err']):<4} {str(p['Spare']):<3} {p['Written']:<8} {p['Model']}" + print(row) + else: + print(f"{device_node:<9} [SMART Data Unavailable]") + +if __name__ == "__main__": + main()