NetworkManager python 配置脚本化
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import uuid
import json
import struct
import socket
import logging
import subprocess
from optparse import OptionParser
import locale
import gi
gi.require_version('NM', '1.0')
from gi.repository import GLib, NM
# initailize
nmc = NM.Client.new(None)
DEVICETYPE_ETHNET = "802-3-ethernet"
DEVICETYPE_BRIDGE = "bridge"
DEVICETYPE_WIFI = "802-11-wireless"
logging.basicConfig(level=logging.ERROR,
format='%(asctime)s %(name)s - %(levelname)s - %(message)s')
LOG = logging
class NMError(Exception):
code = 500
class NoAvalibleInterface(NMError):
code = 404
class NeedAuth(NMError):
code = 401
LastError = None
def NMSetLastError(err):
global LastError
LastError = err
def NMGetLastError():
global LastError
return LastError
def cmd_run(cmd):
if isinstance(cmd, str):
cmd = cmd.split()
if not isinstance(cmd, list):
raise Exception("TypeError cmd_run only accept str,list")
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
return out, err, p.returncode
def print_values(setting, key, value, flags, data):
LOG.info(" %s.%s: %s" % (setting.get_name(), key, value))
def get_device_ip4_address(device):
ip_cfg = device.get_ip4_config()
if ip_cfg is None:
return None
nm_addresses = ip_cfg.get_addresses()
if len(nm_addresses) == 0:
return None
address_list = []
for nm_address in nm_addresses:
addr = nm_address.get_address()
prefix = nm_address.get_prefix()
address_list.append((addr, prefix))
return address_list
def get_device_ip4_gateway(device):
ip_cfg = device.get_ip4_config()
if ip_cfg is None:
gw = "None"
else:
gw = ip_cfg.get_gateway()
if gw == '':
gw = "None"
return gw
def get_device_ip4_dns(device):
ip_cfg = device.get_ip4_config()
if ip_cfg is None:
return None
return ip_cfg.get_nameservers()
def check_device_link(device):
pass
def find_avalible_device():
avalible_device = None
devs = nmc.get_devices()
for dev in devs:
# 优先使用有线网络
if dev.get_device_type() == NM.DeviceType.ETHERNET:
if dev.get_carrier():
avalible_device = dev
for dev in devs:
# 无有线网络,则使用可用的无线网络
if dev.get_device_type() == NM.DeviceType.ETHERNET:
pass
if avalible_device is None:
raise NoAvalibleInterface("无有效网络, 请确认网口已插入网线或已连接无线网络")
return avalible_device
def get_wifi_device():
devs = nmc.get_all_devices()
for dev in devs:
if dev.get_device_type() == NM.DeviceType.WIFI:
return dev
return None
def create_connection_profile(name, type, **kwargs):
'''
con_master string
con_slave_type string
ip4_method string
ip4_address string
ip4_netmask string
ip4_gateway string
ip4_dns array
'''
profile = NM.SimpleConnection.new()
s_con = NM.SettingConnection.new()
s_con.set_property(NM.SETTING_CONNECTION_ID, name)
s_con.set_property(NM.SETTING_CONNECTION_UUID, str(uuid.uuid4()))
s_con.set_property(NM.SETTING_CONNECTION_TYPE, type)
if type == DEVICETYPE_BRIDGE:
s_bridge = NM.SettingBridge.new()
s_con.set_property(NM.SETTING_CONNECTION_INTERFACE_NAME, name)
s_bridge.set_property(NM.SETTING_BRIDGE_STP, False)
s_bridge.set_property(NM.SETTING_BRIDGE_HELLO_TIME, 2)
s_bridge.set_property(NM.SETTING_BRIDGE_FORWARD_DELAY, 2)
profile.add_setting(s_bridge)
if type in [DEVICETYPE_ETHNET]:
dev = nmc.get_device_by_iface(name)
s_wired = NM.SettingWired.new()
s_con.set_property(NM.SETTING_CONNECTION_INTERFACE_NAME, name)
s_wired.set_property(NM.SETTING_WIRED_DUPLEX, "full")
s_wired.set_property(NM.SETTING_WIRED_MAC_ADDRESS, dev.get_hw_address())
profile.add_setting(s_wired)
if type == DEVICETYPE_WIFI:
s_wifi = NM.SettingWireless.new()
s_wifi_security = NM.SettingWirelessSecurity.new()
wifidev = get_wifi_device()
s_con.set_property(NM.SETTING_CONNECTION_INTERFACE_NAME, None)
s_wifi.set_property(NM.SETTING_WIRELESS_MAC_ADDRESS, wifidev.get_hw_address())
s_wifi.set_property(NM.SETTING_WIRELESS_SSID, GLib.Bytes(name))
s_wifi.set_property(NM.SETTING_WIRELESS_MODE, "infrastructure")
s_wifi_security.set_property(NM.SETTING_WIRELESS_SECURITY_PSK, kwargs['wifi_psk'])
s_wifi_security.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, "wpa-psk")
profile.add_setting(s_wifi)
profile.add_setting(s_wifi_security)
# 如果设备不是从设备, 配置 ip
if not kwargs.get("con_master", None):
s_ip4 = NM.SettingIP4Config.new()
if kwargs.get("ip4_method", "auto") == "manual":
s_ip4.set_property(NM.SETTING_IP_CONFIG_METHOD, "manual")
ip4_address = NM.IPAddress(2, kwargs["ip4_address"], kwargs["ip4_prefix"])
s_ip4.add_address(ip4_address)
s_ip4.set_property(NM.SETTING_IP_CONFIG_GATEWAY, kwargs["ip4_gateway"])
for dns in kwargs.get("ip4_dns", []):
s_ip4.add_dns(dns)
elif kwargs.get("ip4_method", "auto") == "auto":
s_ip4.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto")
profile.add_setting(s_ip4)
#s_ip6 = NM.SettingIP6Config.new()
#s_ip6.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto")
#profile.add_setting(s_ip6)
else:
s_con.set_property(NM.SETTING_CONNECTION_MASTER, kwargs["con_master"])
s_con.set_property(NM.SETTING_CONNECTION_SLAVE_TYPE, kwargs["con_slave_type"])
if kwargs.get("n8021x_enable", False) == True:
s_n8021x = NM.Setting8021x()
s_n8021x.set_property(NM.SETTING_802_1X_EAP, [kwargs["n8021x_auth"]])
s_n8021x.set_property(NM.SETTING_802_1X_IDENTITY, kwargs["n8021x_user"])
s_n8021x.set_property(NM.SETTING_802_1X_PASSWORD, kwargs["n8021x_pass"])
s_n8021x.set_property(NM.SETTING_802_1X_PHASE2_AUTH, kwargs["n8021x_auth2"])
profile.add_setting(s_n8021x)
if kwargs.get("con_slave_type", None) == "bridge":
s_bridge_port = NM.SettingBridgePort.new()
profile.add_setting(s_bridge_port)
# add common setting
profile.add_setting(s_con)
#print("Created connection profile:")
#profile.for_each_setting_value(print_values, None)
return profile
def clamp(value, minvalue, maxvalue):
return max(minvalue, min(value, maxvalue))
def ssid_to_utf8(ap):
ssid = ap.get_ssid()
if not ssid:
return ""
return NM.utils_ssid_to_utf8(ap.get_ssid().get_data())
def print_device_info(device):
active_ap = dev.get_active_access_point()
ssid = None
if active_ap is not None:
ssid = ssid_to_utf8(active_ap)
info = "Device: %s | Driver: %s | Active AP: %s" % (dev.get_iface(), dev.get_driver(), ssid)
print(info)
print('=' * len(info))
def mode_to_string(mode):
if mode == getattr(NM, '80211Mode').INFRA:
return "INFRA"
if mode == getattr(NM, '80211Mode').ADHOC:
return "ADHOC"
if mode == getattr(NM, '80211Mode').AP:
return "AP"
return "UNKNOWN"
def flags_to_string(flags):
if flags & getattr(NM, '80211ApFlags').PRIVACY:
return "PRIVACY"
return "NONE"
def security_flags_to_string(flags):
NM_AP_FLAGS = getattr(NM, '80211ApSecurityFlags')
str = ""
if flags & NM_AP_FLAGS.PAIR_WEP40:
str = str + " PAIR_WEP40"
if flags & NM_AP_FLAGS.PAIR_WEP104:
str = str + " PAIR_WEP104"
if flags & NM_AP_FLAGS.PAIR_TKIP:
str = str + " PAIR_TKIP"
if flags & NM_AP_FLAGS.PAIR_CCMP:
str = str + " PAIR_CCMP"
if flags & NM_AP_FLAGS.GROUP_WEP40:
str = str + " GROUP_WEP40"
if flags & NM_AP_FLAGS.GROUP_WEP104:
str = str + " GROUP_WEP104"
if flags & NM_AP_FLAGS.GROUP_TKIP:
str = str + " GROUP_TKIP"
if flags & NM_AP_FLAGS.GROUP_CCMP:
str = str + " GROUP_CCMP"
if flags & NM_AP_FLAGS.KEY_MGMT_PSK:
str = str + " KEY_MGMT_PSK"
if flags & NM_AP_FLAGS.KEY_MGMT_802_1X:
str = str + " KEY_MGMT_802_1X"
if str:
return str.lstrip()
else:
return "NONE"
def flags_to_security(flags, wpa_flags, rsn_flags):
str = ""
if ((flags & getattr(NM, '80211ApFlags').PRIVACY) and
(wpa_flags == 0) and (rsn_flags == 0)):
str = str + " WEP"
if wpa_flags != 0:
str = str + " WPA1"
if rsn_flags != 0:
str = str + " WPA2"
if ((wpa_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X) or
(rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X)):
str = str + " 802.1X"
return str.lstrip()
#############################################################
# 外部接口
#############################################################
def list_network_interfaces():
devs = nmc.get_all_devices()
ifaces = []
for dev in devs:
if not dev.get_device_type() in [NM.DeviceType.ETHERNET]:
continue
devinfo = {
"name": dev.get_iface(),
"carrier": dev.get_carrier(),
"hw_address": dev.get_hw_address(),
"product": dev.get_product(),
"vendor": dev.get_vendor(),
}
ifaces.append(devinfo)
return ifaces
def get_current_configure():
dev = nmc.get_device_by_iface('br0')
if not dev:
return None
con = nmc.get_connection_by_id('br0')
s_con_ipv4 = con.get_setting_ip4_config()
s_con_bridge = con.get_setting_bridge()
netinfo = {
"uuid": con.get_uuid(),
"name": con.get_interface_name(),
"type": con.get_connection_type(),
"ipv4": {
"method": s_con_ipv4.get_method(),
},
"stp": s_con_bridge.get_stp(),
"slaves": []
}
br0_slave = None
for slave in dev.get_slaves():
if slave.get_device_type() == NM.DeviceType.ETHERNET:
devinfo = {
"name": slave.get_iface(),
"carrier": slave.get_carrier(),
"hw_address": slave.get_hw_address(),
}
netinfo["slaves"].append(devinfo)
return netinfo
class Task(object):
_list = []
class ConnectTask(Task):
def __init__(self, profile, persistent=True, loop=None):
self._list.append(self)
self.loop = loop
self.profile = profile
self.start()
def callback(self, client, result, data):
try:
index = self._list.index(self)
self._list.pop(index)
except:
raise
pass
LOG.debug("task list: %s" % self._list)
def start(self):
#nmc.add_connection_async(self.profile, self.persistent, None, self.callback, None)
device = None
if self.profile.get_setting_wireless():
# wifi connection
device = get_wifi_device()
device.connect("state-changed", self.on_device_state_changed, 'wifi')
else:
device = nmc.get_device_by_iface(self.profile.get_interface_name())
device.connect("state-changed", self.on_device_state_changed, self.profile.get_id())
LOG.debug("Activating connection %s" % self.profile.get_id())
nmc.activate_connection_async(self.profile, device, None, None, self.callback, None)
def on_device_state_changed(self, device, new_state, old_state, reason, userdata):
iface = device.get_iface()
LOG.debug("on_device_state_changed - device: %s, new_state: %s, reason: %s" %(iface, NM.DeviceState(new_state).value_name, NM.DeviceStateReason(reason).value_name))
if userdata in ["br0", "wifi"] and new_state == NM.DeviceState.ACTIVATED:
self.loop.quit()
if userdata == "wifi" and new_state == NM.DeviceState.NEED_AUTH:
# delete wrong connection
con = nmc.get_connection_by_id(self.profile.get_id())
if con:
con.delete()
# raise NeedAuth("Wrong wifi password")
err = {
"msg": "Wrong wifi password",
"code": 401
}
NMSetLastError(err)
self.loop.quit()
class DisconnectTask(Task):
def __init__(self, profile, persistent=True, loop=None):
self._list.append(self)
self.loop = loop
self.profile = profile
self.start()
def callback(self, client, result, data):
try:
index = self._list.index(self)
self._list.pop(index)
except:
raise
pass
LOG.debug("task list: %s" % self._list)
if len(self._list) == 0:
self.loop.quit()
def start(self):
LOG.debug("Deactivating connection %s" % self.profile.get_id())
nmc.deactivate_connection_async(self.profile, None, self.callback, None)
class AddConnectionTask(Task):
def __init__(self, profile, persistent=True, loop=None):
self._list.append(self)
self.loop = loop
self.profile = profile
self.persistent = persistent
self.nmc = NM.Client.new(None)
self._event = []
self.start()
def callback(self, client, result, data):
try:
index = self._list.index(self)
self._list.pop(index)
except:
raise
pass
LOG.debug("task list: %s" % self._list)
'''
if len(self._list) == 0:
self.loop.quit()
'''
def start(self):
c_id = self.profile.get_id()
#nmc.add_connection_async(self.profile, self.persistent, None, self.callback, None)
device = None
LOG.debug("Deleting connection %s" % c_id)
if self.profile.get_setting_wireless():
# wifi connection
device = get_wifi_device()
# reset connection profile
con = nmc.get_connection_by_id(c_id)
if con:
LOG.debug("Deleting connection %s failed" % c_id)
if not con.delete(None):
LOG.debug("Deleting connection %s" % c_id)
else:
device = self.nmc.get_device_by_iface(self.profile.get_interface_name())
# 删除多余的有线连接配置
if device:
for c in device.get_available_connections():
LOG.debug("Deleting connection %s" % c.get_id())
if not c.delete(None):
LOG.debug("Deleting connection %s failed" % c.get_id())
if c_id == "br0":
nmc.connect("device-added", self.on_device_added, "br0")
self._event.append("device-added")
device.connect("state-changed", self.on_device_state_changed, 'br0')
if device and device.get_device_type() == NM.DeviceType.WIFI:
LOG.debug("Adding and activating connection %s" % c_id)
self.nmc.add_and_activate_connection_async(self.profile, device, None, None, self.callback, None)
device.connect("state-changed", self.on_device_state_changed, 'wifi')
else:
# 首次创建桥时,因为是虚拟网卡所以该设备可能不存在
LOG.debug("Adding connection %s" % c_id)
self.nmc.add_connection_async(self.profile, self.persistent, None, self.callback, None)
def on_device_state_changed(self, device, new_state, old_state, reason, userdata):
iface = device.get_iface()
LOG.debug("on_device_state_changed - device: %s, new_state: %s, reason: %s" %(iface, NM.DeviceState(new_state).value_name, NM.DeviceStateReason(reason).value_name))
if userdata in ["br0", "wifi"] and new_state == NM.DeviceState.ACTIVATED:
self.loop.quit()
if userdata == "wifi" and new_state == NM.DeviceState.NEED_AUTH:
# delete wrong connection
con = nmc.get_connection_by_id(self.profile.get_id())
if con:
con.delete()
# raise NeedAuth("Wrong wifi password")
err = {
"msg": "Wrong wifi password",
"code": 401
}
NMSetLastError(err)
self.loop.quit()
if userdata == "wifi" and reason == NM.DeviceStateReason.SSID_NOT_FOUND:
# delete wrong connection
con = nmc.get_connection_by_id(self.profile.get_id())
if con:
con.delete()
# raise NeedAuth("Wrong wifi password")
err = {
"msg": "ssid %s not found" % self.profile.get_id(),
"code": 404
}
NMSetLastError(err)
self.loop.quit()
def on_device_added(self, client, device, userdata):
LOG.debug("on_device_added device: %s" % device.get_iface())
device.connect("state-changed", self.on_device_state_changed, 'br0')
pass
def on_active_connection_added():
pass
def configure_network(mode, ipv4address=None, netmask=None, gateway=None, dns=[], **kwargs):
global main_loop
previous_mode = None
previous_main_device = None
adev = find_avalible_device()
if str(mode).lower() == "dhcp":
brigde_profile = create_connection_profile("br0", DEVICETYPE_BRIDGE)
port_profile = create_connection_profile(adev.get_iface(), DEVICETYPE_ETHNET, con_master="br0", con_slave_type="bridge", **kwargs)
elif str(mode).lower() == "static":
# check argrument
brigde_profile = create_connection_profile("br0", DEVICETYPE_BRIDGE, ip4_method="manual", ip4_address=ipv4address,
ip4_prefix=24, ip4_gateway=gateway, ip4_dns=dns)
port_profile = create_connection_profile(adev.get_iface(), DEVICETYPE_ETHNET, con_master="br0", con_slave_type="bridge", **kwargs)
persistent = True
AddConnectionTask(brigde_profile, persistent=True, loop=main_loop)
AddConnectionTask(port_profile, persistent=True, loop=main_loop)
def netmask_to_cidr(netmask):
'''
:param netmask: netmask ip addr (eg: 255.255.255.0)
:return: equivalent cidr number to given netmask ip (eg: 24)
'''
return sum([bin(int(x)).count('1') for x in netmask.split('.')])
def cidr_to_netmask(net_bits):
host_bits = 32 - int(net_bits)
netmask = socket.inet_ntoa(struct.pack('!I', (1 << 32) - (1 << host_bits)))
return netmask
def network_status():
############################
# wire
############################
wire_netinfo = {}
wire_dev = nmc.get_device_by_iface('br0')
if wire_dev:
slaves = []
n8021x_enabled = False
n8021x_dev = n8021x_con = None
for s in wire_dev.get_slaves():
slaves.append(s)
s_ac = s.get_active_connection()
s_con = s_ac.get_connection()
if s_con.get_setting_802_1x():
n8021x_enabled = True
n8021x_dev = s
n8021x_con = s_con
wire_netinfo = {
"carrier": wire_dev.get_carrier(),
"device": wire_dev.get_iface(),
"slaves": map(lambda x: x.get_iface(), slaves),
"type": wire_dev.get_type_description(),
}
# active connection
wire_ac = wire_dev.get_active_connection()
if wire_ac:
addresses = get_device_ip4_address(wire_dev)
if not addresses:
return {}
address = addresses[0][0]
netmask = cidr_to_netmask(addresses[0][1])
# connection
wire_con = wire_ac.get_connection()
s_ip4 = wire_con.get_setting_ip4_config()
extra_info = {
"method": s_ip4.get_method(),
"address": address,
"netmask": netmask,
"gateway": get_device_ip4_gateway(wire_dev),
"dns": get_device_ip4_dns(wire_dev),
}
if n8021x_enabled:
s_n8021x = n8021x_con.get_setting_802_1x()
n8021x_auth = s_n8021x.get_eap()
n8021x_auth2 = s_n8021x.get_phase2-autheap()
n8021x_user = s_n8021x.get_identity()
n8021x_pass = ""
n8021x_info = {
"n8021x_enabled": True,
"n8021x_auth": n8021x_auth,
"n8021x_auth2": n8021x_auth2,
"n8021x_user": n8021x_user,
"n8021x_pass": n8021x_pass
}
wire_netinfo.update(extra_info)
############################
# wireless
############################
wireless_netinfo = {}
wireless_dev = get_wifi_device()
if wireless_dev:
# active connection
wireless_ac = wireless_dev.get_active_connection()
if wireless_ac:
addresses = get_device_ip4_address(wireless_dev)
address = netmask = "0.0.0.0"
if addresses:
address = addresses[0][0]
netmask = cidr_to_netmask(addresses[0][1])
# connection
wireless_con = wireless_ac.get_connection()
s_ip4 = wireless_con.get_setting_ip4_config()
active_ap = wireless_ac.get_id()
wireless_netinfo = {
"active_ap": active_ap,
"method": s_ip4.get_method(),
"address": address,
"netmask": netmask,
"gateway": get_device_ip4_gateway(wireless_dev),
"dns": get_device_ip4_dns(wireless_dev),
}
else:
return {"wire": wire_netinfo, "wireless": None}
return {"wire": wire_netinfo, "wireless": wireless_netinfo}
def wifi_scan():
ap_list = []
devs = nmc.get_all_devices()
for dev in devs:
if dev.get_device_type() == NM.DeviceType.WIFI:
saved_profiles = map(lambda x: x.get_id(), dev.get_available_connections())
ac = dev.get_active_connection()
active_profile = None
if ac:
active_profile = ac.get_id()
for ap in dev.get_access_points():
ssid = ssid_to_utf8(ap)
strength = ap.get_strength()
frequency = ap.get_frequency()
flags = ap.get_flags()
wpa_flags = ap.get_wpa_flags()
rsn_flags = ap.get_rsn_flags()
state = "none"
if ssid in saved_profiles:
state = "saved"
if ssid == active_profile:
state = "active"
apinfo = {
"SSID": ssid,
"BSSID": ap.get_bssid(),
"Frequency": frequency,
"Channel": NM.utils_wifi_freq_to_channel(frequency),
"Mode": mode_to_string(ap.get_mode()),
"Flags": flags_to_string(flags),
"WPA_flags": security_flags_to_string(wpa_flags),
"RSN_flags": security_flags_to_string(rsn_flags),
"Security": flags_to_security(flags, wpa_flags, rsn_flags),
"Strength": strength,
"State": state
}
ap_list.append(apinfo)
return ap_list
def wifi_connect(ssid, passwd, mode, ipv4address=None, netmask=None, gateway=None, dns=[]):
global main_loop
previous_mode = None
previous_main_device = None
if not passwd:
# check saved connection
con = nmc.get_connection_by_id(ssid)
if con:
ConnectTask(con, loop=main_loop)
return
else:
raise NeedAuth("Need wifi pass")
if str(mode).lower() == "dhcp":
wifi_profile = create_connection_profile(ssid,
DEVICETYPE_WIFI, wifi_psk=passwd)
elif str(mode).lower() == "static":
wifi_profile = create_connection_profile(ssid,
DEVICETYPE_WIFI, wifi_psk=passwd,
ip4_method="manual", ip4_address=ipv4address,
ip4_prefix=24, ip4_gateway=gateway, ip4_dns=dns)
persistent = True
AddConnectionTask(wifi_profile, persistent=True, loop=main_loop)
def wifi_disconnect(ssid):
global main_loop
con = None
for ac in nmc.get_active_connections():
if ac.get_id() == ssid:
con = ac
break
if con:
DisconnectTask(con, loop=main_loop)
def wifi_forget(ssid):
con = nmc.get_connection_by_id(ssid)
if con:
if not con.delete(None):
raise NMError("Failed to delete connection %s" % ssid)
def setup_nat_bridge():
if not nmc.get_device_by_iface("virbr0"):
out, err, ret = cmd_run("virsh net-start default")
if ret:
err = {
"msg": err,
"code": 500
}
NMSetLastError(err)
def safe_run_loop():
global main_loop
if len(Task._list) > 0:
main_loop.run()
main_loop = GLib.MainLoop()
def main():
cmd_handler_map = {
"network-status": network_status,
"list-network-interfaces": list_network_interfaces,
"configure-network": configure_network,
"wifi-scan": wifi_scan,
"wifi-connect": wifi_connect,
"wifi-disconnect": wifi_disconnect,
"wifi-forget": wifi_forget,
"setup-nat-bridge": setup_nat_bridge,
}
parser = OptionParser()
parser.set_usage("command " + ','.join(cmd_handler_map.keys()))
parser.add_option(
"-S", "--wifi-ssid", dest="wifi_ssid", help="wifi name")
parser.add_option(
"-P", "--wifi-pass", dest="wifi_pass", help="wifi password")
parser.add_option(
"-M", "--ipv4-method", dest="ipv4_method", help="method for configure", default="dhcp")
parser.add_option(
"-A", "--ipv4-address", dest="ipv4_address", help="ip address for static configure", default=None)
parser.add_option(
"-N", "--ipv4-netmask", dest="ipv4_netmask",
help="ipv4 netmask", default="256.255.255.0")
parser.add_option("-G", "--ipv4-gateway", dest="ipv4_gateway", help="ipv4 gateway")
parser.add_option(
"", "--ipv4-dns1", dest="ipv4_dns1",
help="ipv4 dns", default=None)
parser.add_option(
"", "--ipv4-dns2", dest="ipv4_dns2",
help="ipv4 dns", default=None)
parser.add_option(
"", "--n8021x-enable", action="store_true", dest="n8021x_enable", help="enable 802.1x", default=False)
parser.add_option(
"", "--n8021x-auth", dest="n8021x_auth", help="802.1x authentication method [md5 | peap]")
parser.add_option(
"", "--n8021x-auth2", dest="n8021x_auth2", help="802.1x authentication method [mschapv2]")
parser.add_option(
"", "--n8021x-user", dest="n8021x_user", help="802.1x user")
parser.add_option(
"", "--n8021x-pass", dest="n8021x_pass", help="802.1x pass")
parser.add_option(
"", "--timeout", dest="timeout", help="Set timeout", default=10)
parser.add_option(
"-F", "--format", dest="format",
help="output format", default="json")
(options, args) = parser.parse_args(sys.argv)
if len(args) < 2:
print parser.get_usage()
sys.exit(1)
cmd = args[1]
timeout = int(options.timeout) * 1000
handler = cmd_handler_map.get(cmd, None)
if not handler:
print parser.get_usage()
sys.exit(1)
ret = ""
if cmd in ["network-status", "wifi-scan", "list-network-interfaces"]:
ret = handler()
print json.dumps(ret)
#skip main loop
sys.exit()
elif cmd == "setup-nat-bridge":
handler()
return
elif cmd == "configure-network":
# check argument
dns = []
if options.ipv4_dns1:
dns.append(options.ipv4_dns1)
if options.ipv4_dns2:
dns.append(options.ipv4_dns2)
kwargs = {}
if options.n8021x_enable:
kwargs = {
"n8021x_enable": options.n8021x_enable,
"n8021x_auth": options.n8021x_auth,
"n8021x_auth2": options.n8021x_auth2,
"n8021x_user": options.n8021x_user,
"n8021x_pass": options.n8021x_pass,
}
configure_network(options.ipv4_method, options.ipv4_address,
options.ipv4_netmask, options.ipv4_gateway, dns, **kwargs)
elif cmd == "wifi-connect":
# check argument
dns = []
if options.ipv4_dns1:
dns.append(options.ipv4_dns1)
if options.ipv4_dns2:
dns.append(options.ipv4_dns2)
wifi_connect(options.wifi_ssid, options.wifi_pass, options.ipv4_method, options.ipv4_address,
options.ipv4_netmask, options.ipv4_gateway, dns)
elif cmd == "wifi-disconnect":
wifi_disconnect(options.wifi_ssid)
elif cmd == "wifi-forget":
wifi_forget(options.wifi_ssid)
# set loop timer
GLib.timeout_add(timeout, timeout_check, None)
# run main loop
safe_run_loop()
def timeout_check(userdata):
LOG.debug("reached timeout check")
err = {
"msg": "Timout",
"code": 503
}
NMSetLastError(err)
main_loop.quit()
return False
def print_success():
ret = {
"msg": "success",
"code": 200
}
print json.dumps(ret)
def print_failed(code=500, msg="Unknow error", backtrack=None):
ret = {
"msg": msg,
"code": code
}
if backtrack:
ret["backtrack"] = backtrack
print json.dumps(ret)
if __name__ == '__main__':
# from IPython import embed
# embed()
try:
main()
if NMGetLastError():
print json.dumps(NMGetLastError())
else:
print_success()
except NMError as e:
print_failed(e.code, e.message)
except Exception as e:
import traceback
backtrack = traceback.format_exc()
print_failed(msg=e.message, backtrack=backtrack)
nm-config network-status
返回: { "device" : "br0", 桥名 "netmask" : "255.255.255.0", "slaves" : [ 桥接设备 "enp2s0" ], "address" : "192.168.100.100", "gateway" : "192.168.100.1", "dns" : [ "192.168.100.1" ], "method" : "auto", 当前网络配置方法: auto => dhcp, manual => static "carrier" : true, 网口状态 "type" : "bridge" 网络类型 }
nm-config wifi-scan
返回:
[
{
"Strength" : 32, 信号强度: 30%
"Security" : "",
"Mode" : "INFRA",
"RSN_flags" : "NONE",
"SSID" : "", WIFI 名: 空为隐藏WIFI
"Flags" : "NONE",
"BSSID" : "D4:A1:48:A5:42:F5", AP MAC 地址
"WPA_flags" : "NONE",
"State" : "none", none 为未保存过该网络, saved 为保存该网络, active 为正在连接
"Channel" : 11, WIFI 频道
"Frequency" : 2462 频率 2.4G
},
{
"Frequency" : 2412,
"Channel" : 1,
"WPA_flags" : "PAIR_TKIP PAIR_CCMP GROUP_TKIP KEY_MGMT_PSK",
"BSSID" : "F0:B4:29:E7:39:88",
"Flags" : "PRIVACY",
"SSID" : "Nau", WIFI 名
"RSN_flags" : "PAIR_TKIP PAIR_CCMP GROUP_TKIP KEY_MGMT_PSK",
"Mode" : "INFRA",
"State" : "INFRA",
"Security" : "WPA1 WPA2", 认证方法
"Strength" : 29
}
]
nm-config configure-network 有线网络
nm-config connect-wifi 无线网络
注: 无线不能启用 802-1-X
配置参数: Options: -S WIFI_SSID, --wifi-ssid=WIFI_SSID WIFI 名称 -P WIFI_PASS, --wifi-pass=WIFI_PASS WIFI 密码 -M IPV4_METHOD, --ipv4-method=IPV4_METHOD 配置 IP 获取方式 (auto => dhcp, manual => static) -A IPV4_ADDRESS, --ipv4-address=IPV4_ADDRESS 配置 IPv4 地址 -N IPV4_NETMASK, --ipv4-netmask=IPV4_NETMASK 配置 IPv4 netmask -G IPV4_GATEWAY, --ipv4-gateway=IPV4_GATEWAY 配置 IPv4 gateway --ipv4-dns1=IPV4_DNS1 配置 IPv4 dns --ipv4-dns2=IPV4_DNS2 配置 IPv4 dns --n8021x-enable 启动 802.1x --n8021x-auth=N8021X_AUTH 802.1x 一级认证方式 [md5 | peap] --n8021x-auth2=N8021X_AUTH2 802.1x 二级认证方式 [mschapv2] --n8021x-user=N8021X_USER 802.1x 用户名 --n8021x-pass=N8021X_PASS 802.1x 密码
stdout输出为调试信息,请忽略! 返回值为进程返回值, 返回非零为出错!
有线 DHCP
nm-config configure-network --ipv4-method dhcp
有线 Static
nm-config configure-network --ipv4-method static
--ipv4-address 192.168.31.134
--ipv4-netmask 255.255.255.0
--ipv4-gateway 192.168.31.1
--ipv4-dns1 114.114.114.114
无线网 DHCP
nm-config wifi-connect --wifi-ssid IGS_SH_L2
--wifi-pass igs123321!
无线网 Static
nm-config wifi-connect --wifi-ssid IGS_SH_L2
--wifi-pass igs123321!
--ipv4-method static
--ipv4-address 192.168.31.134
--ipv4-netmask 255.255.255.0
--ipv4-gateway 192.168.31.1
--ipv4-dns1 114.114.114.114
nm-config wifi-disconnect --wifi-ssid IGS_SH_L2
删除保存的wifi网络 nm-config wifi-forget --wifi-ssid IGS_SH_L2
nm-config setup-nat-bridge