#!/usr/bin/env python2
# -*- coding: utf-8 -*-
import
os
import
sys
import
argparse
import
subprocess
MOUNT_BASE =
"/mnt"
def get_mounted_disks():
""
"获取已挂载的磁盘列表"
""
mounted = set()
try:
with open(
'/proc/mounts'
,
'r'
) as f:
for
line
in
f:
parts = line.split()
if
len(parts) > 1:
mounted.
add
(parts[0])
except Exception:
pass
return mounted
def get_unmounted_disks():
""
"获取未挂载的磁盘分区"
""
mounted = get_mounted_disks()
unmounted = []
try:
lsblk = subprocess.Popen([
'lsblk'
,
'-l'
,
'-o'
,
'NAME,TYPE,MOUNTPOINT'
], stdout=subprocess.PIPE)
output, _ = lsblk.communicate()
for
line
in
output.splitlines()[1:]:
parts = line.split()
if
len(parts) < 2:
continue
name
, type_, mountpoint = parts[0], parts[1], parts[2]
if
len(parts) > 2
else
None
if
(type_
in
[
'part'
,
'disk'
])
and
not
mountpoint
and
not
name
.startswith((
'cl-'
,
'vg0-'
)):
device_path =
'/dev/%s'
%
name
if
device_path
not
in
mounted:
unmounted.append(
name
)
except Exception:
pass
return unmounted
def get_lvm_volumes():
""
"获取所有可用的LVM逻辑卷"
""
lvm_volumes = {}
try:
# 获取物理卷对应的卷组
pvs = subprocess.Popen([
'pvs'
,
'--noheadings'
,
'-o'
,
'pv_name,vg_name'
], stdout=subprocess.PIPE)
output, _ = pvs.communicate()
for
line
in
output.splitlines():
parts = line.strip().split()
if
len(parts) >= 2:
pv_name = os.path.basename(parts[0])
lvm_volumes[pv_name] = parts[1] # 记录物理卷对应的卷组
except Exception:
pass
return lvm_volumes
def mount_disks():
""
"挂载所有未挂载的磁盘"
""
unmounted = get_unmounted_disks()
lvm_volumes = get_lvm_volumes()
results = []
for
disk
in
unmounted:
device =
'/dev/%s'
% disk
is_lvm = False
lv_path =
None
# 检查是否是LVM物理卷
if
disk
in
lvm_volumes:
is_lvm = True
vg_name = lvm_volumes[disk]
# 获取该卷组下的逻辑卷
try:
lvs = subprocess.Popen([
'lvs'
,
'--noheadings'
,
'-o'
,
'lv_path'
, vg_name], stdout=subprocess.PIPE)
output, _ = lvs.communicate()
if
output.strip():
lv_path = output.strip().split()[0] # 取第一个逻辑卷
except Exception:
pass
# 创建挂载点
parent_disk = disk[:3] # 如sdb1 -> sdb
mount_point = os.path.join(MOUNT_BASE, parent_disk, disk)
try:
os.makedirs(mount_point)
except OSError:
pass
# 执行挂载
try:
if
is_lvm
and
lv_path:
mount_cmd = [
'mount'
, lv_path, mount_point]
else
:
mount_cmd = [
'mount'
, device, mount_point]
subprocess.check_call(mount_cmd, stderr=subprocess.PIPE)
results.append((lv_path
if
is_lvm
else
device, mount_point, True))
except subprocess.CalledProcessError:
# 挂载失败则删除目录
try:
if
os.path.exists(mount_point)
and
not
os.listdir(mount_point):
os.rmdir(mount_point)
except OSError:
pass
results.append((lv_path
if
is_lvm
else
device, mount_point, False))
# 输出结果汇总
print(
"\n挂载结果汇总:"
)
print(
"-"
* 60)
for
device, mount_point, success
in
results:
if
success:
print(
"%-20s ---> %-40s 挂载成功"
% (device, mount_point))
print(
"-"
* 60)
def unmount_disks():
""
"卸载所有由脚本挂载的磁盘并清理目录"
""
mnt_mounts = []
try:
with open(
'/proc/mounts'
,
'r'
) as f:
for
line
in
f:
parts = line.split()
if
len(parts) > 1
and
parts[1].startswith(MOUNT_BASE):
mnt_mounts.append(parts[1])
except Exception:
pass
if
not
mnt_mounts:
print(
"没有需要卸载的挂载点"
)
return
for
mount_point
in
mnt_mounts:
try:
subprocess.check_call([
'umount'
, mount_point], stderr=subprocess.PIPE)
except subprocess.CalledProcessError:
pass
# 清理空目录
for
root, dirs, files
in
os.walk(MOUNT_BASE, topdown=False):
for
name
in
dirs:
dir_path = os.path.join(root,
name
)
try:
if
not
os.listdir(dir_path):
os.rmdir(dir_path)
except OSError:
pass
print(
"卸载操作完成"
)
def main():
parser = argparse.ArgumentParser(description=
'磁盘挂载/卸载工具'
)
group
= parser.add_mutually_exclusive_group(required=True)
group
.add_argument(
'-m'
,
'--mount'
, action=
'store_true'
, help=
'挂载所有未挂载的磁盘'
)
group
.add_argument(
'-u'
,
'--unmount'
, action=
'store_true'
, help=
'卸载所有磁盘并清理目录'
)
args = parser.parse_args()
if
args.mount:
mount_disks()
elif args.unmount:
unmount_disks()
if
__name__ ==
'__main__'
:
main()