59 changed files with 6351 additions and 760 deletions
@ -0,0 +1,44 @@ |
|||
#!/usr/bin/python3 |
|||
|
|||
import unittest |
|||
import sys |
|||
import os |
|||
import shutil |
|||
|
|||
sys.path.append('../util') |
|||
from iwd import IWD |
|||
from config import ctx |
|||
|
|||
from ead import EAD |
|||
|
|||
class Test(unittest.TestCase): |
|||
def test_connection_success(self): |
|||
env = os.environ.copy() |
|||
env['STATE_DIRECTORY'] = '/tmp/ead' |
|||
p = ctx.start_process(['ead', '-i', 'eth1', '-d'], env=env) |
|||
|
|||
ead = EAD() |
|||
|
|||
adapter = ead.list_adapters(1)[0] |
|||
|
|||
condition = 'obj.connected == True' |
|||
ead.wait_for_object_condition(adapter, condition) |
|||
|
|||
condition = 'obj.authenticated == True' |
|||
ead.wait_for_object_condition(adapter, condition) |
|||
|
|||
ctx.stop_process(p) |
|||
@classmethod |
|||
def setUpClass(cls): |
|||
os.mkdir('/tmp/ead') |
|||
|
|||
IWD.copy_to_storage('default.8021x', storage_dir='/tmp/ead') |
|||
|
|||
@classmethod |
|||
def tearDownClass(cls): |
|||
IWD.clear_storage(storage_dir='/tmp/ead') |
|||
|
|||
shutil.rmtree('/tmp/ead') |
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main(exit=True) |
@ -0,0 +1,44 @@ |
|||
#!/usr/bin/python3 |
|||
|
|||
import unittest |
|||
import sys |
|||
|
|||
sys.path.append('../util') |
|||
import iwd |
|||
from iwd import IWD |
|||
from iwd import DeviceState |
|||
|
|||
from hostapd import HostapdCLI |
|||
|
|||
class Test(unittest.TestCase): |
|||
|
|||
def test_push_button_success(self): |
|||
wd = IWD() |
|||
|
|||
devices = wd.list_devices(1) |
|||
device = devices[0] |
|||
|
|||
device.wps_push_button() |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
device.disconnect() |
|||
|
|||
condition = 'obj.state == DeviceState.disconnected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
|
|||
@classmethod |
|||
def setUpClass(cls): |
|||
cls.hostapd = HostapdCLI(config='ssid-wps-small-mtu.conf') |
|||
|
|||
cls.hostapd.wps_push_button() |
|||
|
|||
@classmethod |
|||
def tearDownClass(cls): |
|||
IWD.clear_storage() |
|||
cls.hostapd = None |
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main(exit=True) |
@ -0,0 +1,49 @@ |
|||
#!/usr/bin/python3 |
|||
|
|||
import unittest |
|||
import sys |
|||
|
|||
sys.path.append('../util') |
|||
import iwd |
|||
from iwd import IWD |
|||
from iwd import DeviceState |
|||
|
|||
from hostapd import HostapdCLI |
|||
|
|||
class Test(unittest.TestCase): |
|||
|
|||
def four_digit_pin_success(self, wd): |
|||
|
|||
devices = wd.list_devices(1) |
|||
device = devices[0] |
|||
pin = '1234' |
|||
self.hostapd.wps_pin(pin) |
|||
|
|||
device.wps_start_pin(pin) |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
self.assertEqual(len(wd.list_known_networks()), 1) |
|||
|
|||
device.disconnect() |
|||
|
|||
condition = 'obj.state == DeviceState.disconnected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
def test_connection_success(self): |
|||
wd = IWD(True) |
|||
|
|||
self.four_digit_pin_success(wd) |
|||
|
|||
@classmethod |
|||
def setUpClass(cls): |
|||
cls.hostapd = HostapdCLI(config='ssidWPS.conf') |
|||
|
|||
@classmethod |
|||
def tearDownClass(cls): |
|||
IWD.clear_storage() |
|||
cls.hostapd = None |
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main(exit=True) |
@ -0,0 +1,52 @@ |
|||
#!/usr/bin/python3 |
|||
|
|||
import unittest |
|||
import sys |
|||
|
|||
sys.path.append('../util') |
|||
import iwd |
|||
from iwd import IWD |
|||
from iwd import DeviceState |
|||
|
|||
from hostapd import HostapdCLI |
|||
|
|||
class Test(unittest.TestCase): |
|||
|
|||
def pin_success(self, wd): |
|||
|
|||
devices = wd.list_devices(1) |
|||
device = devices[0] |
|||
pin = device.wps_generate_pin() |
|||
self.hostapd.wps_pin(pin) |
|||
|
|||
device.wps_start_pin(pin) |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
self.assertEqual(len(wd.list_known_networks()), 1) |
|||
|
|||
device.disconnect() |
|||
|
|||
condition = 'obj.state == DeviceState.disconnected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
def test_connection_success(self): |
|||
wd = IWD(True) |
|||
|
|||
try: |
|||
self.pin_success(wd) |
|||
finally: |
|||
del wd |
|||
|
|||
@classmethod |
|||
def setUpClass(cls): |
|||
cls.hostapd = HostapdCLI(config='ssidWPS.conf') |
|||
|
|||
@classmethod |
|||
def tearDownClass(cls): |
|||
IWD.clear_storage() |
|||
cls.hostapd = None |
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main(exit=True) |
@ -0,0 +1,47 @@ |
|||
#!/usr/bin/python3 |
|||
|
|||
import unittest |
|||
import sys |
|||
|
|||
sys.path.append('../util') |
|||
import iwd |
|||
from iwd import IWD |
|||
from iwd import DeviceState |
|||
|
|||
from hostapd import HostapdCLI |
|||
class Test(unittest.TestCase): |
|||
|
|||
def push_button_success(self, wd): |
|||
self.hostapd.wps_push_button() |
|||
|
|||
devices = wd.list_devices(1) |
|||
device = devices[0] |
|||
|
|||
device.wps_push_button() |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
self.assertEqual(len(wd.list_known_networks()), 1) |
|||
|
|||
device.disconnect() |
|||
|
|||
condition = 'obj.state == DeviceState.disconnected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
def test_connection_success(self): |
|||
wd = IWD(True) |
|||
|
|||
self.push_button_success(wd) |
|||
|
|||
@classmethod |
|||
def setUpClass(cls): |
|||
cls.hostapd = HostapdCLI(config='ssidWPS.conf') |
|||
|
|||
@classmethod |
|||
def tearDownClass(cls): |
|||
IWD.clear_storage() |
|||
cls.hostapd = None |
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main(exit=True) |
@ -0,0 +1,150 @@ |
|||
#! /usr/bin/python3 |
|||
|
|||
import unittest |
|||
import sys, os |
|||
|
|||
sys.path.append('../util') |
|||
import iwd |
|||
from iwd import IWD |
|||
from iwd import NetworkType |
|||
from hwsim import Hwsim |
|||
from hostapd import HostapdCLI |
|||
import testutil |
|||
|
|||
class Test(unittest.TestCase): |
|||
def test_roam_success(self): |
|||
hwsim = Hwsim() |
|||
|
|||
rule0 = hwsim.rules.create() |
|||
rule0.source = self.bss_radio[0].addresses[0] |
|||
rule0.bidirectional = True |
|||
|
|||
rule1 = hwsim.rules.create() |
|||
rule1.source = self.bss_radio[1].addresses[0] |
|||
rule1.bidirectional = True |
|||
|
|||
wd = IWD() |
|||
|
|||
device = wd.list_devices(1)[0] |
|||
|
|||
# Check that iwd selects BSS 0 first |
|||
rule0.signal = -2000 |
|||
rule1.signal = -2500 |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
device.scan() |
|||
|
|||
condition = 'obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
ordered_network = device.get_ordered_network('TestFT') |
|||
|
|||
self.assertEqual(ordered_network.type, NetworkType.eap) |
|||
self.assertEqual(ordered_network.signal_strength, -2000) |
|||
|
|||
condition = 'not obj.connected' |
|||
wd.wait_for_object_condition(ordered_network.network_object, condition) |
|||
|
|||
self.assertFalse(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
ordered_network.network_object.connect() |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[0].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[1].ifname, device.name)) |
|||
|
|||
# Check that iwd starts transition to BSS 1 in less than 10 seconds. |
|||
# The 10 seconds is longer than needed to scan on just two channels |
|||
# but short enough that a full scan on the 2.4 + 5.8 bands supported |
|||
# by mac80211_hwsim will not finish. If this times out then, but |
|||
# device_roam_trigger_cb has happened, it probably means that |
|||
# Neighbor Reports are broken. |
|||
rule0.signal = -8000 |
|||
|
|||
condition = 'obj.state == DeviceState.roaming' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
# Check that iwd is on BSS 1 once out of roaming state and doesn't |
|||
# go through 'disconnected', 'autoconnect', 'connecting' in between |
|||
from_condition = 'obj.state == DeviceState.roaming' |
|||
to_condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_change(device, from_condition, to_condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[1].list_sta()) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[1].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[0].ifname, device.name)) |
|||
|
|||
def tearDown(self): |
|||
os.system('ifconfig "' + self.bss_hostapd[0].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[1].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[0].ifname + '" up') |
|||
os.system('ifconfig "' + self.bss_hostapd[1].ifname + '" up') |
|||
|
|||
hwsim = Hwsim() |
|||
wd = IWD() |
|||
device = wd.list_devices(1)[0] |
|||
try: |
|||
device.disconnect() |
|||
except: |
|||
pass |
|||
|
|||
condition = 'obj.state == DeviceState.disconnected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
for rule in list(hwsim.rules.keys()): |
|||
del hwsim.rules[rule] |
|||
|
|||
@classmethod |
|||
def setUpClass(cls): |
|||
IWD.copy_to_storage('TestFT.8021x') |
|||
|
|||
hwsim = Hwsim() |
|||
|
|||
cls.bss_hostapd = [ HostapdCLI(config='ft-eap-ccmp-1.conf'), |
|||
HostapdCLI(config='ft-eap-ccmp-2.conf') ] |
|||
cls.bss_radio = [ hwsim.get_radio('rad0'), |
|||
hwsim.get_radio('rad1') ] |
|||
|
|||
# Set interface addresses to those expected by hostapd config files |
|||
os.system('ifconfig "' + cls.bss_hostapd[0].ifname + |
|||
'" down hw ether 12:00:00:00:00:01 up') |
|||
os.system('ifconfig "' + cls.bss_hostapd[1].ifname + |
|||
'" down hw ether 12:00:00:00:00:02 up') |
|||
|
|||
cls.bss_hostapd[0].reload() |
|||
cls.bss_hostapd[1].reload() |
|||
|
|||
# Fill in the neighbor AP tables in both BSSes. By default each |
|||
# instance knows only about current BSS, even inside one hostapd |
|||
# process. |
|||
# FT still works without the neighbor AP table but neighbor reports |
|||
# have to be disabled in the .conf files |
|||
cls.bss_hostapd[0].set_neighbor('12:00:00:00:00:02', 'TestFT', |
|||
'1200000000028f0000005102060603000000') |
|||
cls.bss_hostapd[1].set_neighbor('12:00:00:00:00:01', 'TestFT', |
|||
'1200000000018f0000005101060603000000') |
|||
|
|||
@classmethod |
|||
def tearDownClass(cls): |
|||
IWD.clear_storage() |
|||
cls.bss_hostapd = None |
|||
cls.bss_radio = None |
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main(exit=True) |
@ -0,0 +1,181 @@ |
|||
#! /usr/bin/python3 |
|||
|
|||
import unittest |
|||
import sys, os |
|||
|
|||
sys.path.append('../util') |
|||
import iwd |
|||
from iwd import IWD |
|||
from iwd import PSKAgent |
|||
from iwd import NetworkType |
|||
from hwsim import Hwsim |
|||
from hostapd import HostapdCLI |
|||
import testutil |
|||
|
|||
class Test(unittest.TestCase): |
|||
def test_roam_success(self): |
|||
hwsim = Hwsim() |
|||
|
|||
rule0 = hwsim.rules.create() |
|||
rule0.source = self.bss_radio[0].addresses[0] |
|||
rule0.bidirectional = True |
|||
|
|||
rule1 = hwsim.rules.create() |
|||
rule1.source = self.bss_radio[1].addresses[0] |
|||
rule1.bidirectional = True |
|||
|
|||
wd = IWD() |
|||
|
|||
psk_agent = PSKAgent('user@example.com', ('user@example.com', |
|||
'secret123')) |
|||
wd.register_psk_agent(psk_agent) |
|||
|
|||
device = wd.list_devices(1)[0] |
|||
|
|||
# Check that iwd selects BSS 0 first |
|||
rule0.signal = -2000 |
|||
rule1.signal = -2500 |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
device.scan() |
|||
|
|||
condition = 'obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
ordered_network = device.get_ordered_network('TestFT') |
|||
|
|||
self.assertEqual(ordered_network.type, NetworkType.eap) |
|||
self.assertEqual(ordered_network.signal_strength, -2000) |
|||
|
|||
condition = 'not obj.connected' |
|||
wd.wait_for_object_condition(ordered_network.network_object, condition) |
|||
|
|||
self.assertFalse(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
ordered_network.network_object.connect() |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[0].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[1].ifname, device.name)) |
|||
|
|||
device.disconnect() |
|||
|
|||
condition = 'not obj.connected' |
|||
wd.wait_for_object_condition(ordered_network.network_object, condition) |
|||
|
|||
ordered_network = device.get_ordered_network('TestFT') |
|||
|
|||
self.assertEqual(ordered_network.type, NetworkType.eap) |
|||
|
|||
condition = 'not obj.connected' |
|||
wd.wait_for_object_condition(ordered_network.network_object, condition) |
|||
|
|||
ordered_network.network_object.connect() |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[0].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[1].ifname, device.name)) |
|||
|
|||
# Check that iwd starts transition to BSS 1 in less than 10 seconds. |
|||
# The 10 seconds is longer than needed to scan on just two channels |
|||
# but short enough that a full scan on the 2.4 + 5.8 bands supported |
|||
# by mac80211_hwsim will not finish. If this times out then, but |
|||
# device_roam_trigger_cb has happened, it probably means that |
|||
# Neighbor Reports are broken. |
|||
rule0.signal = -8000 |
|||
|
|||
condition = 'obj.state == DeviceState.roaming' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
# Check that iwd is on BSS 1 once out of roaming state and doesn't |
|||
# go through 'disconnected', 'autoconnect', 'connecting' in between |
|||
from_condition = 'obj.state == DeviceState.roaming' |
|||
to_condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_change(device, from_condition, to_condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[1].list_sta()) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[1].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[0].ifname, device.name)) |
|||
|
|||
def tearDown(self): |
|||
os.system('ifconfig "' + self.bss_hostapd[0].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[1].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[0].ifname + '" up') |
|||
os.system('ifconfig "' + self.bss_hostapd[1].ifname + '" up') |
|||
|
|||
hwsim = Hwsim() |
|||
wd = IWD() |
|||
device = wd.list_devices(1)[0] |
|||
try: |
|||
device.disconnect() |
|||
except: |
|||
pass |
|||
|
|||
condition = 'obj.state == DeviceState.disconnected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
for rule in list(hwsim.rules.keys()): |
|||
del hwsim.rules[rule] |
|||
|
|||
@classmethod |
|||
def setUpClass(cls): |
|||
os.system('ifconfig lo up') |
|||
IWD.copy_to_storage('TestFT.8021x') |
|||
|
|||
hwsim = Hwsim() |
|||
|
|||
cls.bss_hostapd = [ HostapdCLI(config='ft-eap-ccmp-1.conf'), |
|||
HostapdCLI(config='ft-eap-ccmp-2.conf') ] |
|||
cls.bss_radio = [ hwsim.get_radio('rad0'), |
|||
hwsim.get_radio('rad1') ] |
|||
|
|||
# Set interface addresses to those expected by hostapd config files |
|||
os.system('ifconfig "' + cls.bss_hostapd[0].ifname + |
|||
'" down hw ether 12:00:00:00:00:01 up') |
|||
os.system('ifconfig "' + cls.bss_hostapd[1].ifname + |
|||
'" down hw ether 12:00:00:00:00:02 up') |
|||
|
|||
cls.bss_hostapd[0].reload() |
|||
cls.bss_hostapd[1].reload() |
|||
|
|||
# Fill in the neighbor AP tables in both BSSes. By default each |
|||
# instance knows only about current BSS, even inside one hostapd |
|||
# process. |
|||
# FT still works without the neighbor AP table but neighbor reports |
|||
# have to be disabled in the .conf files |
|||
cls.bss_hostapd[0].set_neighbor('12:00:00:00:00:02', 'TestFT', |
|||
'1200000000028f0000005102060603000000') |
|||
cls.bss_hostapd[1].set_neighbor('12:00:00:00:00:01', 'TestFT', |
|||
'1200000000018f0000005101060603000000') |
|||
|
|||
@classmethod |
|||
def tearDownClass(cls): |
|||
IWD.clear_storage() |
|||
cls.bss_hostapd = None |
|||
cls.bss_radio = None |
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main(exit=True) |
@ -0,0 +1,182 @@ |
|||
#! /usr/bin/python3 |
|||
|
|||
import unittest |
|||
import sys, os |
|||
|
|||
sys.path.append('../util') |
|||
import iwd |
|||
from iwd import IWD |
|||
from iwd import PSKAgent |
|||
from iwd import NetworkType |
|||
from hwsim import Hwsim |
|||
from hostapd import HostapdCLI |
|||
import testutil |
|||
|
|||
class Test(unittest.TestCase): |
|||
def test_roam_success(self): |
|||
hwsim = Hwsim() |
|||
|
|||
rule0 = hwsim.rules.create() |
|||
rule0.source = self.bss_radio[0].addresses[0] |
|||
rule0.bidirectional = True |
|||
|
|||
rule1 = hwsim.rules.create() |
|||
rule1.source = self.bss_radio[1].addresses[0] |
|||
rule1.bidirectional = True |
|||
|
|||
wd = IWD() |
|||
|
|||
psk_agent = PSKAgent('user@example.com', ('user@example.com', |
|||
'secret123')) |
|||
wd.register_psk_agent(psk_agent) |
|||
|
|||
device = wd.list_devices(1)[0] |
|||
|
|||
# Check that iwd selects BSS 0 first |
|||
rule0.signal = -2000 |
|||
rule1.signal = -2500 |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
device.scan() |
|||
|
|||
condition = 'obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
ordered_network = device.get_ordered_network('TestFT') |
|||
|
|||
self.assertEqual(ordered_network.type, NetworkType.eap) |
|||
self.assertEqual(ordered_network.signal_strength, -2000) |
|||
|
|||
condition = 'not obj.connected' |
|||
wd.wait_for_object_condition(ordered_network.network_object, condition) |
|||
|
|||
self.assertFalse(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
ordered_network.network_object.connect() |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[0].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[1].ifname, device.name)) |
|||
|
|||
device.disconnect() |
|||
|
|||
condition = 'not obj.connected' |
|||
wd.wait_for_object_condition(ordered_network.network_object, condition) |
|||
|
|||
ordered_network = device.get_ordered_network('TestFT') |
|||
|
|||
self.assertEqual(ordered_network.type, NetworkType.eap) |
|||
|
|||
condition = 'not obj.connected' |
|||
wd.wait_for_object_condition(ordered_network.network_object, condition) |
|||
|
|||
ordered_network.network_object.connect() |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[0].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[1].ifname, device.name)) |
|||
|
|||
# Check that iwd starts transition to BSS 1 in less than 10 seconds. |
|||
# The 10 seconds is longer than needed to scan on just two channels |
|||
# but short enough that a full scan on the 2.4 + 5.8 bands supported |
|||
# by mac80211_hwsim will not finish. If this times out then, but |
|||
# device_roam_trigger_cb has happened, it probably means that |
|||
# Neighbor Reports are broken. |
|||
rule0.signal = -8000 |
|||
|
|||
condition = 'obj.state == DeviceState.roaming' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
# Check that iwd is on BSS 1 once out of roaming state and doesn't |
|||
# go through 'disconnected', 'autoconnect', 'connecting' in between |
|||
from_condition = 'obj.state == DeviceState.roaming' |
|||
to_condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_change(device, from_condition, to_condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[1].list_sta()) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[1].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[0].ifname, device.name)) |
|||
|
|||
def tearDown(self): |
|||
os.system('ifconfig "' + self.bss_hostapd[0].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[1].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[0].ifname + '" up') |
|||
os.system('ifconfig "' + self.bss_hostapd[1].ifname + '" up') |
|||
|
|||
hwsim = Hwsim() |
|||
wd = IWD() |
|||
device = wd.list_devices(1)[0] |
|||
try: |
|||
device.disconnect() |
|||
except: |
|||
pass |
|||
|
|||
condition = 'obj.state == DeviceState.disconnected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
for rule in list(hwsim.rules.keys()): |
|||
del hwsim.rules[rule] |
|||
|
|||
@classmethod |
|||
def setUpClass(cls): |
|||
os.system('ifconfig lo up') |
|||
IWD.copy_to_storage('TestFT.8021x') |
|||
|
|||
hwsim = Hwsim() |
|||
|
|||
cls.bss_hostapd = [ HostapdCLI(config='ft-eap-ccmp-1.conf'), |
|||
HostapdCLI(config='ft-eap-ccmp-2.conf') ] |
|||
cls.bss_radio = [ hwsim.get_radio('rad0'), |
|||
hwsim.get_radio('rad1') ] |
|||
|
|||
# Set interface addresses to those expected by hostapd config files |
|||
os.system('ifconfig "' + cls.bss_hostapd[0].ifname + |
|||
'" down hw ether 12:00:00:00:00:01 up') |
|||
os.system('ifconfig "' + cls.bss_hostapd[1].ifname + |
|||
'" down hw ether 12:00:00:00:00:02 up') |
|||
|
|||
cls.bss_hostapd[0].reload() |
|||
cls.bss_hostapd[1].reload() |
|||
|
|||
# Fill in the neighbor AP tables in both BSSes. By default each |
|||
# instance knows only about current BSS, even inside one hostapd |
|||
# process. |
|||
# FT still works without the neighbor AP table but neighbor reports |
|||
# have to be disabled in the .conf files |
|||
cls.bss_hostapd[0].set_neighbor('12:00:00:00:00:02', 'TestFT', |
|||
'1200000000028f0000005102060603000000') |
|||
cls.bss_hostapd[1].set_neighbor('12:00:00:00:00:01', 'TestFT', |
|||
'1200000000018f0000005101060603000000') |
|||
|
|||
@classmethod |
|||
def tearDownClass(cls): |
|||
IWD.clear_storage() |
|||
cls.bss_hostapd = None |
|||
cls.bss_radio = None |
|||
|
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main(exit=True) |
@ -0,0 +1,154 @@ |
|||
#! /usr/bin/python3 |
|||
|
|||
import unittest |
|||
import sys, os |
|||
|
|||
sys.path.append('../util') |
|||
import iwd |
|||
from iwd import IWD |
|||
from iwd import PSKAgent |
|||
from iwd import NetworkType |
|||
from hwsim import Hwsim |
|||
from hostapd import HostapdCLI |
|||
import testutil |
|||
|
|||
class Test(unittest.TestCase): |
|||
def test_roam_success(self): |
|||
hwsim = Hwsim() |
|||
|
|||
rule0 = hwsim.rules.create() |
|||
rule0.source = self.bss_radio[0].addresses[0] |
|||
rule0.bidirectional = True |
|||
|
|||
rule1 = hwsim.rules.create() |
|||
rule1.source = self.bss_radio[1].addresses[0] |
|||
rule1.bidirectional = True |
|||
|
|||
wd = IWD() |
|||
|
|||
psk_agent = PSKAgent("EasilyGuessedPassword") |
|||
wd.register_psk_agent(psk_agent) |
|||
|
|||
device = wd.list_devices(1)[0] |
|||
|
|||
# Check that iwd selects BSS 0 first |
|||
rule0.signal = -2000 |
|||
rule1.signal = -2500 |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
device.scan() |
|||
|
|||
condition = 'obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
ordered_network = device.get_ordered_network('TestFT') |
|||
|
|||
self.assertEqual(ordered_network.type, NetworkType.psk) |
|||
self.assertEqual(ordered_network.signal_strength, -2000) |
|||
|
|||
condition = 'not obj.connected' |
|||
wd.wait_for_object_condition(ordered_network.network_object, condition) |
|||
|
|||
self.assertFalse(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
ordered_network.network_object.connect() |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
wd.unregister_psk_agent(psk_agent) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[0].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[1].ifname, device.name)) |
|||
|
|||
# Check that iwd starts transition to BSS 1 in less than 10 seconds. |
|||
# The 10 seconds is longer than needed to scan on just two channels |
|||
# but short enough that a full scan on the 2.4 + 5.8 bands supported |
|||
# by mac80211_hwsim will not finish. If this times out then, but |
|||
# device_roam_trigger_cb has happened, it probably means that |
|||
# Neighbor Reports are broken. |
|||
rule0.signal = -8000 |
|||
|
|||
condition = 'obj.state == DeviceState.roaming' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
# Check that iwd is on BSS 1 once out of roaming state and doesn't |
|||
# go through 'disconnected', 'autoconnect', 'connecting' in between |
|||
from_condition = 'obj.state == DeviceState.roaming' |
|||
to_condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_change(device, from_condition, to_condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[1].list_sta()) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[1].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[0].ifname, device.name)) |
|||
|
|||
def tearDown(self): |
|||
os.system('ifconfig "' + self.bss_hostapd[0].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[1].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[0].ifname + '" up') |
|||
os.system('ifconfig "' + self.bss_hostapd[1].ifname + '" up') |
|||
|
|||
hwsim = Hwsim() |
|||
wd = IWD() |
|||
device = wd.list_devices(1)[0] |
|||
try: |
|||
device.disconnect() |
|||
except: |
|||
pass |
|||
|
|||
condition = 'obj.state == DeviceState.disconnected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
for rule in list(hwsim.rules.keys()): |
|||
del hwsim.rules[rule] |
|||
|
|||
@classmethod |
|||
def setUpClass(cls): |
|||
hwsim = Hwsim() |
|||
|
|||
cls.bss_hostapd = [ HostapdCLI(config='ft-psk-ccmp-1.conf'), |
|||
HostapdCLI(config='ft-psk-ccmp-2.conf') ] |
|||
cls.bss_radio = [ hwsim.get_radio('rad0'), |
|||
hwsim.get_radio('rad1') ] |
|||
|
|||
# Set interface addresses to those expected by hostapd config files |
|||
os.system('ifconfig "' + cls.bss_hostapd[0].ifname + |
|||
'" down hw ether 12:00:00:00:00:01 up') |
|||
os.system('ifconfig "' + cls.bss_hostapd[1].ifname + |
|||
'" down hw ether 12:00:00:00:00:02 up') |
|||
|
|||
cls.bss_hostapd[0].reload() |
|||
cls.bss_hostapd[1].reload() |
|||
|
|||
# Fill in the neighbor AP tables in both BSSes. By default each |
|||
# instance knows only about current BSS, even inside one hostapd |
|||
# process. |
|||
# FT still works without the neighbor AP table but neighbor reports |
|||
# have to be disabled in the .conf files |
|||
cls.bss_hostapd[0].set_neighbor('12:00:00:00:00:02', 'TestFT', |
|||
'1200000000028f0000005102060603000000') |
|||
cls.bss_hostapd[1].set_neighbor('12:00:00:00:00:01', 'TestFT', |
|||
'1200000000018f0000005101060603000000') |
|||
|
|||
@classmethod |
|||
def tearDownClass(cls): |
|||
IWD.clear_storage() |
|||
cls.bss_hostapd = None |
|||
cls.bss_radio = None |
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main(exit=True) |
@ -0,0 +1,147 @@ |
|||
#! /usr/bin/python3 |
|||
|
|||
import unittest |
|||
import sys, os |
|||
|
|||
sys.path.append('../util') |
|||
import iwd |
|||
from iwd import IWD |
|||
from iwd import PSKAgent |
|||
from iwd import NetworkType |
|||
from hwsim import Hwsim |
|||
from hostapd import HostapdCLI |
|||
import testutil |
|||
|
|||
class Test(unittest.TestCase): |
|||
def test_roam_success(self): |
|||
hwsim = Hwsim() |
|||
|
|||
rule0 = hwsim.rules.create() |
|||
rule0.source = self.bss_radio[0].addresses[0] |
|||
rule0.bidirectional = True |
|||
|
|||
rule1 = hwsim.rules.create() |
|||
rule1.source = self.bss_radio[1].addresses[0] |
|||
rule1.bidirectional = True |
|||
|
|||
# Check that iwd selects BSS 0 first |
|||
rule0.signal = -2000 |
|||
rule1.signal = -2500 |
|||
|
|||
wd = IWD(True) |
|||
|
|||
psk_agent = PSKAgent("EasilyGuessedPassword") |
|||
wd.register_psk_agent(psk_agent) |
|||
|
|||
device = wd.list_devices(1)[0] |
|||
# prevent autoconnect |
|||
device.disconnect() |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
device.scan() |
|||
|
|||
condition = 'obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
ordered_network = device.get_ordered_network('TestFT') |
|||
|
|||
self.assertEqual(ordered_network.type, NetworkType.psk) |
|||
self.assertEqual(ordered_network.signal_strength, -2000) |
|||
|
|||
condition = 'not obj.connected' |
|||
wd.wait_for_object_condition(ordered_network.network_object, condition) |
|||
|
|||
self.assertFalse(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
ordered_network.network_object.connect() |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
wd.unregister_psk_agent(psk_agent) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[0].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[1].ifname, device.name)) |
|||
|
|||
# Check that iwd starts transition to BSS 1 in less than 10 seconds. |
|||
# The 10 seconds is longer than needed to scan on just two channels |
|||
# but short enough that a full scan on the 2.4 + 5.8 bands supported |
|||
# by mac80211_hwsim will not finish. If this times out then, but |
|||
# device_roam_trigger_cb has happened, it probably means that |
|||
# Neighbor Reports are broken. |
|||
rule0.signal = -8000 |
|||
|
|||
condition = 'obj.state == DeviceState.roaming' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
# Check that iwd is on BSS 1 once out of roaming state and doesn't |
|||
# go through 'disconnected', 'autoconnect', 'connecting' in between |
|||
from_condition = 'obj.state == DeviceState.roaming' |
|||
to_condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_change(device, from_condition, to_condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[1].list_sta()) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[1].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[0].ifname, device.name)) |
|||
|
|||
def tearDown(self): |
|||
os.system('ifconfig "' + self.bss_hostapd[0].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[1].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[0].ifname + '" up') |
|||
os.system('ifconfig "' + self.bss_hostapd[1].ifname + '" up') |
|||
|
|||
hwsim = Hwsim() |
|||
|
|||
for rule in list(hwsim.rules.keys()): |
|||
del hwsim.rules[rule] |
|||
|
|||
@classmethod |
|||
def setUpClass(cls): |
|||
hwsim = Hwsim() |
|||
|
|||
cls.bss_hostapd = [ HostapdCLI(config='ft-psk-ccmp-1.conf'), |
|||
HostapdCLI(config='ft-psk-ccmp-2.conf') ] |
|||
cls.bss_radio = [ hwsim.get_radio('rad0'), |
|||
hwsim.get_radio('rad1') ] |
|||
|
|||
# Set interface addresses to those expected by hostapd config files |
|||
os.system('ifconfig "' + cls.bss_hostapd[0].ifname + |
|||
'" down hw ether 12:00:00:00:00:01 up') |
|||
os.system('ifconfig "' + cls.bss_hostapd[1].ifname + |
|||
'" down hw ether 12:00:00:00:00:02 up') |
|||
|
|||
cls.bss_hostapd[0].reload() |
|||
cls.bss_hostapd[1].reload() |
|||
|
|||
# Fill in the neighbor AP tables in both BSSes. By default each |
|||
# instance knows only about current BSS, even inside one hostapd |
|||
# process. |
|||
# FT still works without the neighbor AP table but neighbor reports |
|||
# have to be disabled in the .conf files |
|||
cls.bss_hostapd[0].set_neighbor('12:00:00:00:00:02', 'TestFT', |
|||
'1200000000028f0000005102060603000000') |
|||
cls.bss_hostapd[1].set_neighbor('12:00:00:00:00:01', 'TestFT', |
|||
'1200000000018f0000005101060603000000') |
|||
|
|||
@classmethod |
|||
def tearDownClass(cls): |
|||
IWD.clear_storage() |
|||
cls.bss_hostapd = None |
|||
cls.bss_radio = None |
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main(exit=True) |
@ -0,0 +1,204 @@ |
|||
#! /usr/bin/python3 |
|||
|
|||
import unittest |
|||
import sys, os |
|||
|
|||
sys.path.append('../util') |
|||
import iwd |
|||
from iwd import IWD |
|||
from iwd import PSKAgent |
|||
from iwd import NetworkType |
|||
from hwsim import Hwsim |
|||
from hostapd import HostapdCLI |
|||
import testutil |
|||
from config import ctx |
|||
|
|||
class Test(unittest.TestCase): |
|||
def test_roam_success(self): |
|||
wd = IWD() |
|||
|
|||
hwsim = Hwsim() |
|||
|
|||
rule0 = hwsim.rules.create() |
|||
rule0.source = self.bss_radio[0].addresses[0] |
|||
rule0.bidirectional = True |
|||
|
|||
rule1 = hwsim.rules.create() |
|||
rule1.source = self.bss_radio[1].addresses[0] |
|||
rule1.bidirectional = True |
|||
|
|||
rule2 = hwsim.rules.create() |
|||
rule2.source = self.bss_radio[2].addresses[0] |
|||
rule2.bidirectional = True |
|||
|
|||
psk_agent = PSKAgent("EasilyGuessedPassword") |
|||
wd.register_psk_agent(psk_agent) |
|||
|
|||
device = wd.list_devices(1)[0] |
|||
|
|||
# Check that iwd selects BSS 0 first |
|||
rule0.signal = -2000 |
|||
rule1.signal = -2500 |
|||
rule2.signal = -3000 |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
device.scan() |
|||
|
|||
condition = 'obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
ordered_network = device.get_ordered_network('TestFT') |
|||
|
|||
self.assertEqual(ordered_network.type, NetworkType.psk) |
|||
self.assertEqual(ordered_network.signal_strength, -2000) |
|||
|
|||
condition = 'not obj.connected' |
|||
wd.wait_for_object_condition(ordered_network.network_object, condition) |
|||
|
|||
self.assertFalse(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
ordered_network.network_object.connect() |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
self.assertTrue(self.bss_hostapd[0].list_sta()) |
|||
self.assertFalse(self.bss_hostapd[1].list_sta()) |
|||
|
|||
wd.unregister_psk_agent(psk_agent) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[0].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[1].ifname, device.name)) |
|||
|
|||
# Check that iwd starts transition to BSS 1 in less than 10 seconds. |
|||
# The 10 seconds is longer than needed to scan on just two channels |
|||
# but short enough that a full scan on the 2.4 + 5.8 bands supported |
|||
# by mac80211_hwsim will not finish. If this times out then, but |
|||
# device_roam_trigger_cb has happened, it probably means that |
|||
# Neighbor Reports are broken. |
|||
rule0.signal = -8000 |
|||
|
|||
condition = 'obj.state == DeviceState.roaming' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
# Check that iwd is on BSS 1 once out of roaming state and doesn't |
|||
# go through 'disconnected', 'autoconnect', 'connecting' in between |
|||
from_condition = 'obj.state == DeviceState.roaming' |
|||
to_condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_change(device, from_condition, to_condition) |
|||
|
|||
rule1.signal = -2000 |
|||
|
|||
# wait for IWD's signal levels to recover |
|||
wd.wait(5) |
|||
|
|||
self.assertTrue(self.bss_hostapd[1].list_sta()) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[1].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[0].ifname, device.name)) |
|||
|
|||
# test FT-PSK after FT-SAE |
|||
rule1.signal = -8000 |
|||
rule0.signal = -8000 |
|||
rule2.signal = -1000 |
|||
|
|||
condition = 'obj.state == DeviceState.roaming' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
condition = 'obj.state != DeviceState.roaming' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
self.assertEqual(device.state, iwd.DeviceState.connected) |
|||
self.assertTrue(self.bss_hostapd[2].list_sta()) |
|||
|
|||
testutil.test_iface_operstate(device.name) |
|||
testutil.test_ifaces_connected(self.bss_hostapd[2].ifname, device.name) |
|||
self.assertRaises(Exception, testutil.test_ifaces_connected, |
|||
(self.bss_hostapd[1].ifname, device.name)) |
|||
|
|||
def tearDown(self): |
|||
os.system('ifconfig "' + self.bss_hostapd[0].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[1].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[2].ifname + '" down') |
|||
os.system('ifconfig "' + self.bss_hostapd[0].ifname + '" up') |
|||
os.system('ifconfig "' + self.bss_hostapd[1].ifname + '" up') |
|||
os.system('ifconfig "' + self.bss_hostapd[2].ifname + '" up') |
|||
|
|||
hwsim = Hwsim() |
|||
wd = IWD() |
|||
device = wd.list_devices(1)[0] |
|||
try: |
|||
device.disconnect() |
|||
except: |
|||
pass |
|||
|
|||
condition = 'obj.state == DeviceState.disconnected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
for rule in list(hwsim.rules.keys()): |
|||
del hwsim.rules[rule] |
|||
|
|||
@classmethod |
|||
def setUpClass(cls): |
|||
hwsim = Hwsim() |
|||
|
|||
cls.bss_hostapd = [ HostapdCLI(config='ft-sae-1.conf'), |
|||
HostapdCLI(config='ft-sae-2.conf'), |
|||
HostapdCLI(config='ft-psk-3.conf') ] |
|||
cls.bss_radio = [ hwsim.get_radio('rad0'), |
|||
hwsim.get_radio('rad1'), |
|||
hwsim.get_radio('rad2') ] |
|||
|
|||
ctx.start_process(['ifconfig', cls.bss_hostapd[0].ifname, 'down', 'hw', \ |
|||
'ether', '12:00:00:00:00:01', 'up'], wait=True) |
|||
ctx.start_process(['ifconfig', cls.bss_hostapd[1].ifname, 'down', 'hw', \ |
|||
'ether', '12:00:00:00:00:02', 'up'], wait=True) |
|||
ctx.start_process(['ifconfig', cls.bss_hostapd[2].ifname, 'down', 'hw', \ |
|||
'ether', '12:00:00:00:00:03', 'up'], wait=True) |
|||
|
|||
# Set interface addresses to those expected by hostapd config files |
|||
cls.bss_hostapd[0].reload() |
|||
cls.bss_hostapd[0].wait_for_event("AP-ENABLED") |
|||
cls.bss_hostapd[1].reload() |
|||
cls.bss_hostapd[1].wait_for_event("AP-ENABLED") |
|||
cls.bss_hostapd[2].reload() |
|||
cls.bss_hostapd[2].wait_for_event("AP-ENABLED") |
|||
|
|||
# Fill in the neighbor AP tables in both BSSes. By default each |
|||
# instance knows only about current BSS, even inside one hostapd |
|||
# process. |
|||
# FT still works without the neighbor AP table but neighbor reports |
|||
# have to be disabled in the .conf files |
|||
cls.bss_hostapd[0].set_neighbor('12:00:00:00:00:02', 'TestFT', |
|||
'1200000000028f0000005102060603000000') |
|||
cls.bss_hostapd[0].set_neighbor('12:00:00:00:00:03', 'TestFT', |
|||
'1200000000038f0000005102060603000000') |
|||
|
|||
cls.bss_hostapd[1].set_neighbor('12:00:00:00:00:01', 'TestFT', |
|||
'1200000000018f0000005101060603000000') |
|||
cls.bss_hostapd[1].set_neighbor('12:00:00:00:00:03', 'TestFT', |
|||
'1200000000038f0000005101060603000000') |
|||
|
|||
cls.bss_hostapd[2].set_neighbor('12:00:00:00:00:01', 'TestFT', |
|||
'1200000000018f0000005101060603000000') |
|||
cls.bss_hostapd[2].set_neighbor('12:00:00:00:00:02', 'TestFT', |
|||
'1200000000028f0000005101060603000000') |
|||
|
|||
@classmethod |
|||
def tearDownClass(cls): |
|||
IWD.clear_storage() |
|||
cls.bss_hostapd = None |
|||
cls.bss_radio = None |
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main(exit=True) |
@ -0,0 +1,78 @@ |
|||
#!/usr/bin/python3 |
|||
|
|||
import unittest |
|||
import sys |
|||
|
|||
sys.path.append('../util') |
|||
import iwd |
|||
from iwd import IWD |
|||
from iwd import PSKAgent |
|||
from iwd import NetworkType |
|||
from hostapd import HostapdCLI |
|||
import testutil |
|||
from config import ctx |
|||
import os |
|||
|
|||
class Test(unittest.TestCase): |
|||
|
|||
def test_connection_success(self): |
|||
wd = IWD(True) |
|||
|
|||
psk_agent = PSKAgent("secret123") |
|||
wd.register_psk_agent(psk_agent) |
|||
|
|||
devices = wd.list_devices(1) |
|||
device = devices[0] |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
device.scan() |
|||
|
|||
condition = 'not obj.scanning' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
ordered_network = device.get_ordered_network('ssidTKIP') |
|||
|
|||
self.assertEqual(ordered_network.type, NetworkType.psk) |
|||
|
|||
condition = 'not obj.connected' |
|||
wd.wait_for_object_condition(ordered_network.network_object, condition) |
|||
|
|||
ordered_network.network_object.connect() |
|||
|
|||
condition = 'obj.state == DeviceState.connected' |
|||
wd.wait_for_object_condition(device, condition) |
|||
|
|||
testutil.test_iface_operstate() |
|||
testutil.test_ifaces_connected() |
|||
|
|||
device.disconnect() |
|||
|
|||
condition = 'not obj.connected' |
|||
wd.wait_for_object_condition(ordered_network.network_object, condition) |
|||
|
|||
wd.unregister_psk_agent(psk_agent) |
|||
|
|||
@classmethod |
|||
def setUpClass(cls): |
|||
hapd = HostapdCLI() |
|||
# TODO: This could be moved into test-runner itself if other tests ever |
|||
# require this functionality (p2p, FILS, etc.). Since its simple |
|||
# enough it can stay here for now. |
|||
ctx.start_process(['ifconfig', hapd.ifname, '192.168.1.1', 'netmask', '255.255.255.0'], |
|||
wait=True) |
|||
ctx.start_process(['touch', '/tmp/dhcpd.leases'], wait=True) |
|||
cls.dhcpd_pid = ctx.start_process(['dhcpd', '-f', '-cf', '/tmp/dhcpd.conf', |
|||
'-lf', '/tmp/dhcpd.leases', |
|||
hapd.ifname]) |
|||
|
|||
@classmethod |
|||
def tearDownClass(cls): |
|||
IWD.clear_storage() |
|||
ctx.stop_process(cls.dhcpd_pid) |
|||
cls.dhcpd_pid = None |
|||
os.remove('/tmp/dhcpd.leases') |
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main(exit=True) |
@ -0,0 +1,265 @@ |
|||
import socket |
|||
import os |
|||
import threading |
|||
import sys |
|||
import signal |
|||
from Crypto.Cipher import AES |
|||
|
|||
class AuthCenter: |
|||
''' |
|||
Home Location Register (HLR) and Authentication Center (AuC) used for |
|||
for retrieving SIM/AKA values. This provides a UDP server that |
|||
hostapd can communicate with to obtain SIM values. |
|||
''' |
|||
def __init__(self, sock_path, config_file): |
|||
self._sock_path = sock_path |
|||
self._read_config(config_file) |
|||
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) |
|||
if os.path.exists(sock_path): |
|||
os.unlink(sock_path) |
|||
self._socket.bind(sock_path) |
|||
|
|||
self._rxhandle = threading.Thread(target=self._rx_thread) |
|||
self._rxhandle.ready = threading.Event() |
|||
self._rxhandle.start() |
|||
|
|||
# wait for rx thread to start |
|||
self._rxhandle.ready.wait() |
|||
|
|||
def __del__(self): |
|||
os.remove(self._sock_path) |
|||
self._socket.close() |
|||
|
|||
def _rx_thread(self): |
|||
self._rxhandle.ready.set() |
|||
while (True): |
|||
try: |
|||
data, addr = self._socket.recvfrom(1000) |
|||
data = data.decode('ascii') |
|||
resp = self._process_data(data) |
|||
except OSError: |
|||
break |
|||
except: |
|||
print("Exception:", sys.exc_info()[0]) |
|||
break |
|||
if resp: |
|||
self._socket.sendto(bytearray(resp, 'UTF-8'), addr) |
|||
|
|||
def _read_config(self, file): |
|||
self._database = {} |
|||
with open(file) as f: |
|||
for line in f: |
|||
if line[0] == '#': |
|||
continue |
|||
else: |
|||
data = line.strip('\n').split(':') |
|||
self._database[data[0]] = data[1:] |
|||
|
|||
def _process_data(self, data): |
|||
if data[:12] == "SIM-REQ-AUTH": |
|||
# SIM requests just return the stored values for the IMSI |
|||
imsi, num_chals = data[13:].split(' ') |
|||
if not imsi or not num_chals: |
|||
return "ERROR" |
|||
|
|||
data = self._database.get(imsi, None) |
|||
if not data: |
|||
return "ERROR" |
|||
|
|||
response = "SIM-RESP-AUTH %s" % imsi |
|||
response += (' ' + ':'.join(data))*int(num_chals) |
|||
|
|||
return response |
|||
elif data[:12] == "AKA-REQ-AUTH": |
|||
# AKA requests must compute the milenage parameters for the IMSI |
|||
imsi = data.split(' ')[1] |
|||
data = self._database.get(imsi, None) |
|||
if not data: |
|||
return "ERROR" |
|||
|
|||
# make sure this is an AKA entry |
|||
if len(data) < 4: |
|||
return "ERROR" |
|||
|
|||
k, opc, amf, sqn = data |
|||
|
|||
rand = self._bytetostring(os.urandom(16)) |
|||
|
|||
response = "AKA-RESP-AUTH %s " % imsi |
|||
|
|||
return response + self._get_milenage(opc, k, rand, sqn, amf) |
|||
elif data[:8] == "AKA-AUTS": |
|||
# sync error, parse out SQN and reset in database |
|||
imsi, auts, rand = data[9:].split(' ') |
|||
|
|||
entry = self._database.get(imsi, None) |
|||
if not entry: |
|||