Commit f00647cf authored by OmegaPhil's avatar OmegaPhil

Remove patches no longer in Debian and refresh qemu-testfix

parent bdd01e0b
diff --git a/src/tests/integration-test b/src/tests/integration-test
index 7596258..5c02462 100755
--- a/src/tests/integration-test
+++ b/src/tests/integration-test
@@ -4,10 +4,10 @@
#
# Run in udisks built tree to test local built binaries (needs
# --localstatedir=/var), or from anywhere else to test system installed
-# binaries.
+# binaries.
#
# Usage:
-# - Run all tests:
+# - Run all tests:
# src/tests/integration-test
# - Run only a particular class of tests:
# src/tests/integration-test Drive
@@ -32,7 +32,6 @@
import sys
import os
-import pwd
srcdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
libdir = os.path.join(srcdir, 'udisks', '.libs')
@@ -43,8 +42,8 @@ libdir = os.path.join(srcdir, 'udisks', '.libs')
if 'LD_LIBRARY_PATH' not in os.environ and os.path.isdir(libdir):
os.environ['LD_LIBRARY_PATH'] = libdir
os.environ['GI_TYPELIB_PATH'] = '%s/udisks:%s' % (
- srcdir,
- os.environ.get('GI_TYPELIB_PATH', ''))
+ srcdir,
+ os.environ.get('GI_TYPELIB_PATH', ''))
os.execv(sys.argv[0], sys.argv)
assert False, 'not expecting to land here'
@@ -64,28 +63,21 @@ from gi.repository import GLib, Gio, UDisks
sys.path.insert(0, os.path.dirname(__file__))
import test_polkitd
-#GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs
-VDEV_SIZE = 300000000 # size of virtual test device
+# GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs
+VDEV_SIZE = 300000000 # size of virtual test device
# Those file systems are known to have a broken handling of permissions, in
# particular the executable bit
BROKEN_PERMISSIONS_FS = ['ntfs', 'exfat']
-# Some D-BUS API methods cause properties to not be up to date yet when a
-# method call finishes, thus we do an udevadm settle as a workaround. Those
-# methods should eventually get fixed properly, but it's unnerving to have
-# the tests fail on them when you are working on something else. This flag
-# gets set by the --no-workarounds option to disable those syncs, so that these
-# race conditions can be fixed.
-workaround_syncs = False
-
no_options = GLib.Variant('a{sv}', {})
+
# ----------------------------------------------------------------------------
class UDisksTestCase(unittest.TestCase):
'''Base class for udisks test cases.
-
+
This provides static functions which are useful for all test cases.
'''
daemon = None
@@ -106,7 +98,7 @@ class UDisksTestCase(unittest.TestCase):
# run from local build tree if we are in one, otherwise use system instance
klass.daemon_path = os.path.join(srcdir, 'src', 'udisksd')
- if (os.access (klass.daemon_path, os.X_OK)):
+ if (os.access(klass.daemon_path, os.X_OK)):
print('Testing binaries from local build tree')
klass.check_build_tree_config()
else:
@@ -144,7 +136,7 @@ class UDisksTestCase(unittest.TestCase):
def cleanup(klass):
'''stop daemon again and clean up test environment'''
- subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
+ subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
klass.stop_daemon()
@@ -156,9 +148,10 @@ class UDisksTestCase(unittest.TestCase):
@classmethod
def start_daemon(klass):
- assert klass.daemon == None
+ assert klass.daemon is None
klass.daemon = subprocess.Popen([klass.daemon_path, '--replace'],
- stdout=klass.daemon_log, stderr=subprocess.STDOUT)
+ stdout=klass.daemon_log,
+ stderr=subprocess.STDOUT)
assert klass.daemon.pid, 'daemon failed to start'
# wait until the daemon has started up
@@ -167,7 +160,7 @@ class UDisksTestCase(unittest.TestCase):
while klass.manager is None and timeout > 0:
time.sleep(0.2)
klass.client = UDisks.Client.new_sync(None)
- assert klass.client != None
+ assert klass.client is not None
klass.manager = klass.client.get_manager()
timeout -= 1
assert klass.manager, 'daemon failed to start'
@@ -185,7 +178,7 @@ class UDisksTestCase(unittest.TestCase):
@classmethod
def sync(klass):
'''Wait until pending events finished processing.
-
+
This should only be called for situations where we genuinely have an
asynchronous response, like invoking a CLI program and waiting for
udev/udisks to catch up on the change events.
@@ -200,27 +193,11 @@ class UDisksTestCase(unittest.TestCase):
timeout -= 1
if timeout <= 0:
sys.stderr.write('[wait timeout!] ')
- sys.stderr.flush()
- klass.client.settle()
-
- @classmethod
- def sync_workaround(klass):
- '''Wait until pending events finished processing (bug workaround).
-
- This should be called for race conditions in the D-BUS API which cause
- properties to not be up to date yet when a method call finishes. Those
- should eventually get fixed properly, but it's unnerving to have the
- tests fail on them when you are working on something else.
-
- This sync is not done if running with --no-workarounds.
- '''
- if workaround_syncs:
- klass.sync()
@classmethod
def zero_device(klass):
- subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'],
- stderr=subprocess.PIPE)
+ subprocess.call(['dd', 'if=/dev/zero', 'of=' + klass.device, 'bs=10M'],
+ stderr=subprocess.PIPE)
time.sleep(0.5)
klass.sync()
@@ -258,7 +235,7 @@ class UDisksTestCase(unittest.TestCase):
@classmethod
def udisks_filesystem(klass, partition=None, cd=False):
'''Get UDisksFilesystem object for test device or partition
-
+
Return None if there is no file system on that device.
If cd is True, return the CD device, otherwise the hard disk device.
@@ -297,24 +274,21 @@ class UDisksTestCase(unittest.TestCase):
# signature (mailed kzak about it)
if type == 'swap':
subprocess.check_call(['wipefs', '-a', klass.devname(partition)],
- stdout=subprocess.PIPE)
-
- mkcmd = { 'swap': 'mkswap',
- 'ntfs': 'mkntfs',
- }
- label_opt = { 'vfat': '-n',
- 'exfat': '-n',
- 'reiserfs': '-l',
- }
- extra_opt = { 'vfat': [ '-I', '-F', '32'],
- 'swap': ['-f'],
- 'xfs': ['-f'], # XFS complains if there's an existing FS, so force
- 'ext2': ['-F'], # ext* complains about using entire device, so force
- 'ext3': ['-F'],
- 'ext4': ['-F'],
- 'ntfs': ['-F'],
- 'reiserfs': ['-ff'],
- }
+ stdout=subprocess.PIPE)
+
+ mkcmd = {'swap': 'mkswap',
+ 'ntfs': 'mkntfs'}
+ label_opt = {'vfat': '-n',
+ 'exfat': '-n',
+ 'reiserfs': '-l'}
+ extra_opt = {'vfat': ['-I', '-F', '32'],
+ 'swap': ['-f'],
+ 'xfs': ['-f'], # XFS complains if there's an existing FS, so force
+ 'ext2': ['-F'], # ext* complains about using entire device, so force
+ 'ext3': ['-F'],
+ 'ext4': ['-F'],
+ 'ntfs': ['-F'],
+ 'reiserfs': ['-ff']}
cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, [])
if label:
@@ -327,7 +301,7 @@ class UDisksTestCase(unittest.TestCase):
# tell us when they are done; so do a little kludge here to know how
# long we need to wait
subprocess.call(['udevadm', 'trigger', '--action=change',
- '--sysname-match=' + os.path.basename(klass.devname(partition))])
+ '--sysname-match=' + os.path.basename(klass.devname(partition))])
klass.sync()
@classmethod
@@ -336,7 +310,6 @@ class UDisksTestCase(unittest.TestCase):
block = klass.udisks_block(partition)
block.call_format_sync(type, options, None)
- klass.sync_workaround()
@classmethod
def retry_busy(klass, fn, *args):
@@ -347,7 +320,7 @@ class UDisksTestCase(unittest.TestCase):
try:
return fn(*args)
except GLib.GError as e:
- if not 'UDisks2.Error.DeviceBusy' in e.message:
+ if 'UDisks2.Error.DeviceBusy' not in e.message:
raise
sys.stderr.write('[busy] ')
time.sleep(0.3)
@@ -360,8 +333,7 @@ class UDisksTestCase(unittest.TestCase):
# read make variables
make_vars = {}
var_re = re.compile('^([a-zA-Z_]+) = (.*)$')
- make = subprocess.Popen(['make', '-p', '/dev/null'],
- stdout=subprocess.PIPE)
+ make = subprocess.Popen(['make', '-p', '/dev/null'], stdout=subprocess.PIPE)
for l in make.stdout:
l = l.decode('UTF-8')
m = var_re.match(l)
@@ -382,15 +354,15 @@ class UDisksTestCase(unittest.TestCase):
# check localstatedir
for d in (os.path.join(make_vars['localstatedir'], 'run', 'udisks2'),
- os.path.join(make_vars['localstatedir'], 'lib', 'udisks2')):
+ os.path.join(make_vars['localstatedir'], 'lib', 'udisks2')):
if not os.path.exists(d):
sys.stderr.write('The directory %s does not exist; please create it before running these tests.\n' % d)
sys.exit(0)
-
+
@classmethod
def setup_vdev(klass):
'''create virtual test devices
-
+
It is zeroed out initially.
Return a pair (writable HD device path, readonly CD device path).
@@ -414,7 +386,7 @@ class UDisksTestCase(unittest.TestCase):
# craete a fake SCSI hard drive
assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
- VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
+ VDEV_SIZE / 1048576)]) == 0, 'Failure to modprobe scsi_debug'
# wait until drive got created
rw_dirs = []
@@ -460,8 +432,7 @@ class UDisksTestCase(unittest.TestCase):
'''release and remove virtual test device'''
klass.remove_device(device)
- assert subprocess.call(['rmmod', 'scsi_debug']) == 0, \
- 'Failure to rmmod scsi_debug'
+ assert subprocess.call(['rmmod', 'scsi_debug']) == 0, 'Failure to rmmod scsi_debug'
@classmethod
def remove_device(klass, device):
@@ -475,7 +446,7 @@ class UDisksTestCase(unittest.TestCase):
while os.path.exists(device):
time.sleep(0.1)
klass.sync()
- time.sleep(0.5) # TODO
+ time.sleep(0.5) # TODO
@classmethod
def readd_devices(klass):
@@ -490,6 +461,28 @@ class UDisksTestCase(unittest.TestCase):
time.sleep(0.5)
klass.sync()
+ def assertEventually(self, fn, value):
+ '''Check that an function is eventually equal to value.
+
+ This is mostly meant for checking object properties, as these are
+ updated asynchronously. This retries up to 10 times.
+ '''
+ retries = 10
+ while retries > 0:
+ if fn() == value:
+ break
+ retries -= 1
+ time.sleep(0.1)
+ self.sync()
+
+ self.assertEqual(fn(), value)
+
+ def assertProperty(self, obj, name, value):
+ '''Check that an object's property is eventually equal to value'''
+
+ self.assertEventually(lambda: obj.get_property(name), value)
+
+
# ----------------------------------------------------------------------------
class Manager(UDisksTestCase):
@@ -508,7 +501,7 @@ class Manager(UDisksTestCase):
fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
(path, out_fd_list) = self.manager.call_loop_setup_sync(
- GLib.Variant('h', 0), # fd index
+ GLib.Variant('h', 0), # fd index
no_options,
fd_list,
None)
@@ -543,7 +536,7 @@ class Manager(UDisksTestCase):
fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
(path, out_fd_list) = self.manager.call_loop_setup_sync(
- GLib.Variant('h', 0), # fd index
+ GLib.Variant('h', 0), # fd index
GLib.Variant('a{sv}', {'read-only': GLib.Variant('b', True)}),
fd_list,
None)
@@ -560,16 +553,16 @@ class Manager(UDisksTestCase):
self.assertEqual(loop.get_property('backing-file'), f.name)
# can't format due to permission error
- self.assertRaises(GLib.GError, block.call_format_sync, 'ext2',
- no_options, None)
+ self.assertRaises(GLib.GError, block.call_format_sync, 'ext2', no_options, None)
- self.assertEqual(block.get_property('id-label'), '')
- self.assertEqual(block.get_property('id-usage'), '')
- self.assertEqual(block.get_property('id-type'), '')
+ self.assertProperty(block, 'id-label', '')
+ self.assertProperty(block, 'id-usage', '')
+ self.assertProperty(block, 'id-type', '')
finally:
self.client.settle()
loop.call_delete_sync(no_options, None)
+
# ----------------------------------------------------------------------------
class Drive(UDisksTestCase):
@@ -584,13 +577,14 @@ class Drive(UDisksTestCase):
self.assertEqual(self.drive.get_property('model'), 'scsi_debug')
self.assertEqual(self.drive.get_property('vendor'), 'Linux')
- self.assertAlmostEqual(self.drive.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
+ self.assertAlmostEqual(self.drive.get_property('size') / 1.e6, VDEV_SIZE / 1.e6, 0)
self.assertEqual(self.drive.get_property('media-available'), True)
self.assertEqual(self.drive.get_property('optical'), False)
self.assertNotEqual(len(self.drive.get_property('serial')), 0)
self.assertNotEqual(len(self.drive.get_property('revision')), 0)
+
# ----------------------------------------------------------------------------
class FS(UDisksTestCase):
@@ -604,20 +598,20 @@ class FS(UDisksTestCase):
def tearDown(self):
if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0:
sys.stderr.write('[cleanup unmount] ')
- shutil.rmtree (self.workdir)
+ shutil.rmtree(self.workdir)
def test_zero(self):
'''properties of zeroed out device'''
self.zero_device()
- self.assertEqual(self.block.get_property('device'), self.device)
+ self.assertProperty(self.block, 'device', self.device)
self.assertTrue('Linux_scsi_debug' in self.block.get_property('drive'))
+ self.assertProperty(self.block, 'id-label', '')
self.assertEqual(self.block.get_property('hint-system'), True)
- self.assertEqual(self.block.get_property('id-label'), '')
self.assertEqual(self.block.get_property('id-usage'), '')
self.assertEqual(self.block.get_property('id-type'), '')
self.assertEqual(self.block.get_property('id-uuid'), '')
- self.assertAlmostEqual(self.block.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
+ self.assertAlmostEqual(self.block.get_property('size') / 1.e6, VDEV_SIZE / 1.e6, 0)
obj = self.client.get_object(self.block.get_object_path())
self.assertEqual(obj.get_property('filesystem'), None)
self.assertEqual(obj.get_property('partition'), None)
@@ -676,16 +670,16 @@ class FS(UDisksTestCase):
self.mkfs('ext4', 'foo')
block = self.udisks_block()
- self.assertEqual(block.get_property('id-usage'), 'filesystem')
- self.assertEqual(block.get_property('id-type'), 'ext4')
- self.assertEqual(block.get_property('id-label'), 'foo')
+ self.assertProperty(block, 'id-usage', 'filesystem')
+ self.assertProperty(block, 'id-type', 'ext4')
+ self.assertProperty(block, 'id-label', 'foo')
self.assertNotEqual(self.udisks_filesystem(), None)
self.fs_create(None, 'empty', no_options)
- self.assertEqual(block.get_property('id-usage'), '')
- self.assertEqual(block.get_property('id-type'), '')
- self.assertEqual(block.get_property('id-label'), '')
+ self.assertProperty(block, 'id-usage', '')
+ self.assertProperty(block, 'id-type', '')
+ self.assertProperty(block, 'id-label', '')
self.assertEqual(self.udisks_filesystem(), None)
def test_create_fs_unknown_type(self):
@@ -729,18 +723,16 @@ class FS(UDisksTestCase):
# after putting it back, it should be mountable again
self.readd_devices()
fs = self.udisks_filesystem()
- self.assertEqual(fs.get_property('mount-points'), [])
+ self.assertProperty(fs, 'mount-points', [])
mount_path = fs.call_mount_sync(no_options, None)
self.assertTrue(mount_path.endswith('udiskstest'))
self.assertTrue('/media/' in mount_path)
self.assertTrue(self.is_mountpoint(mount_path))
- self.client.settle()
- self.assertEqual(fs.get_property('mount-points'), [mount_path])
+ self.assertProperty(fs, 'mount-points', [mount_path])
self.retry_busy(fs.call_unmount_sync, no_options, None)
- self.client.settle()
- self.assertEqual(fs.get_property('mount-points'), [])
+ self.assertProperty(fs, 'mount-points', [])
def test_existing_manual_mount_point(self):
'''fs: does not reuse existing manual mount point'''
@@ -753,8 +745,7 @@ class FS(UDisksTestCase):
self.assertTrue(mount_path.endswith('udiskstest'))
self.retry_busy(fs.call_unmount_sync, no_options, None)
- self.client.settle()
- self.assertEqual(fs.get_property('mount-points'), [])
+ self.assertProperty(fs, 'mount-points', [])
# cleans up mountpoint
self.assertFalse(os.path.exists(mount_path))
@@ -766,8 +757,7 @@ class FS(UDisksTestCase):
try:
new_mount_path = fs.call_mount_sync(no_options, None)
self.retry_busy(fs.call_unmount_sync, no_options, None)
- self.client.settle()
- self.assertEqual(fs.get_property('mount-points'), [])
+ self.assertProperty(fs, 'mount-points', [])
self.assertEqual(new_mount_path, mount_path + '1')
finally:
os.rmdir(mount_path)
@@ -795,8 +785,7 @@ class FS(UDisksTestCase):
fs = self.udisks_filesystem()
new_mount_path = fs.call_mount_sync(no_options, None)
self.retry_busy(fs.call_unmount_sync, no_options, None)
- self.client.settle()
- self.assertEqual(fs.get_property('mount-points'), [])
+ self.assertProperty(fs, 'mount-points', [])
self.assertEqual(new_mount_path, mount_path)
def _do_fs_check(self, type):
@@ -807,7 +796,7 @@ class FS(UDisksTestCase):
mkfs = 'mkfs.' + type
if type != 'swap' and subprocess.call(['which', mkfs],
- stdout=subprocess.PIPE) != 0:
+ stdout=subprocess.PIPE) != 0:
sys.stderr.write('[no %s, skip] ' % mkfs)
# check correct D-Bus exception
@@ -849,9 +838,8 @@ class FS(UDisksTestCase):
block = self.udisks_block()
- self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
-
- self.assertEqual(block.get_property('id-type'), type)
+ self.assertProperty(block, 'id-usage', (type == 'swap') and 'other' or 'filesystem')
+ self.assertProperty(block, 'id-type', type)
l = block.get_property('id-label')
if type == 'vfat':
l = l.lower() # VFAT is case insensitive
@@ -875,7 +863,7 @@ class FS(UDisksTestCase):
# mount it
if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'],
- stdout=subprocess.PIPE) == 0:
+ stdout=subprocess.PIPE) == 0:
# prefer mount.ntfs-3g if we have it (on Debian; Ubuntu
# defaults to ntfs-3g if installed); TODO: check other distros
mount_prog = 'mount.ntfs-3g'
@@ -888,19 +876,16 @@ class FS(UDisksTestCase):
return
self.assertEqual(ret, 0)
- time.sleep(0.5)
- self.sync()
- self.assertEqual(fs.get_property('mount-points'), [self.workdir])
+ self.assertProperty(fs, 'mount-points', [self.workdir])
# unmount it
subprocess.call(['umount', self.workdir])
- self.sync()
- self.assertEqual(fs.get_property('mount-points'), [])
+ self.assertProperty(fs, 'mount-points', [])
def _do_udisks_check(self, type, label=None):
'''udisks API correctly changes file system'''
- # create fs
+ # create fs
if label is not None:
options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
else:
@@ -917,12 +902,13 @@ class FS(UDisksTestCase):
self.assertEqual(l, label or '')
block = self.udisks_block()
- self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
- self.assertEqual(block.get_property('id-type'), type)
- l = block.get_property('id-label')
- if type == 'vfat':
- l = l.lower() # VFAT is case insensitive
- self.assertEqual(l, label or '')
+ self.assertProperty(block, 'id-usage', (type == 'swap') and 'other' or 'filesystem')
+ self.assertProperty(block, 'id-type', type)
+ if type == 'vfat' and label:
+ # VFAT is case insensitive
+ self.assertEventually(lambda: block.get_property('id-label').lower(), label.lower())
+ else:
+ self.assertProperty(block, 'id-label', label or '')
if type == 'swap':
return
@@ -945,9 +931,11 @@ class FS(UDisksTestCase):
else:
self.assertTrue(mount_path.endswith(label))
- self.sync()
- self.assertEqual(fs.get_property('mount-points'), [mount_path])
self.assertTrue(self.is_mountpoint(mount_path))
+ # FIXME: this should work on the existing fs object, but doesn't!
+ self.sync()
+ fs = self.udisks_filesystem()
+ self.assertProperty(fs, 'mount-points', [mount_path])
# no ownership taken, should be root owned
st = os.stat(mount_path)
@@ -958,26 +946,25 @@ class FS(UDisksTestCase):
# unmount
self.retry_busy(fs.call_unmount_sync, no_options, None)
self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
- self.assertEqual(fs.get_property('mount-points'), [mount_path])
+ self.assertProperty(fs, 'mount-points', [])
# create fs with taking ownership (daemon:mail == 1:8)
- #if supports_unix_owners:
- # options.append('take_ownership_uid=1')
- # options.append('take_ownership_gid=8')
- # self.fs_create(None, type, options)
- # mount_path = iface.FilesystemMount('', [])
- # st = os.stat(mount_path)
- # self.assertEqual((st.st_uid, st.st_gid), (1, 8))
- # self.retry_busy(self.partition_iface().FilesystemUnmount, [])
- # self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
+ # if supports_unix_owners:
+ # options.append('take_ownership_uid=1')
+ # options.append('take_ownership_gid=8')
+ # self.fs_create(None, type, options)
+ # mount_path = iface.FilesystemMount('', [])
+ # st = os.stat(mount_path)
+ # self.assertEqual((st.st_uid, st.st_gid), (1, 8))
+ # self.retry_busy(self.partition_iface().FilesystemUnmount, [])
+ # self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
# change label
supported = True
l = 'n"a\m\\"e' + type
if type == 'vfat':
# VFAT does not support some characters
- self.assertRaises(GLib.GError, fs.call_set_label_sync, l,
- no_options, None)
+ self.assertRaises(GLib.GError, fs.call_set_label_sync, l, no_options, None)
l = "n@a$me"
try:
fs.call_set_label_sync(l, no_options, None)
@@ -992,24 +979,22 @@ class FS(UDisksTestCase):
if supported:
block = self.udisks_block()
blkid_label = self.blkid().get('ID_FS_LABEL_ENC', '').replace('\\x22', '"').replace(
- '\\x5c', '\\').replace('\\x24', '$')
- self.sync_workaround()
+ '\\x5c', '\\').replace('\\x24', '$')
if type == 'vfat':
# EXFAIL: often (but not always) the label appears in all upper case
self.assertEqual(blkid_label.upper(), l.upper())
- self.assertEqual(block.get_property('id-label').upper(), l.upper())
+ self.assertEventually(lambda: block.get_property('id-label').upper(), l.upper())
else:
self.assertEqual(blkid_label, l)
- self.assertEqual(block.get_property('id-label'), l)
+ self.assertProperty(block, 'id-label', l)
# test setting empty label
fs.call_set_label_sync('', no_options, None)
- self.sync_workaround()
self.assertEqual(self.blkid().get('ID_FS_LABEL_ENC', ''), '')
- self.assertEqual(block.get_property('id-label'), '')
+ self.assertProperty(block, 'id-label', '')
# check fs - Not implemented in udisks yet
- #self.assertEqual(iface.FilesystemCheck([]), True)
+ # self.assertEqual(iface.FilesystemCheck([]), True)
# check mounting of a read-only device
# this is known-broken for reiserfs and xfs right now:
@@ -1019,7 +1004,7 @@ class FS(UDisksTestCase):
# the scsi_debug CD drive content is the same as for the HD drive, but
# udev does not know about this; so give it a nudge to re-probe
subprocess.call(['udevadm', 'trigger', '--action=change',
- '--sysname-match=' + os.path.basename(self.cd_device)])
+ '--sysname-match=' + os.path.basename(self.cd_device)])
self.sync()
self.sync()
cd_fs = self.udisks_filesystem(cd=True)
@@ -1027,15 +1012,14 @@ class FS(UDisksTestCase):
mount_path = cd_fs.call_mount_sync(no_options, None)
try:
self.assertTrue('/media/' in mount_path)
- self.sync()
- self.assertEqual(cd_fs.get_property('mount-points'), [mount_path])
+ self.assertProperty(cd_fs, 'mount-points', [mount_path])
self.assertTrue(self.is_mountpoint(mount_path))
- self.assertEqual(self.udisks_block(cd=True).get_property('read-only'), True)
+ self.assertProperty(self.udisks_block(cd=True), 'read-only', True)
finally:
self.retry_busy(cd_fs.call_unmount_sync, no_options, None)
self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
- self.assertEqual(cd_fs.get_property('mount-points'), [mount_path])
+ self.assertProperty(cd_fs, 'mount-points', [])
def _do_file_perms_checks(self, type, mount_point):
'''Check for permissions for data files and executables.
@@ -1071,14 +1055,15 @@ class FS(UDisksTestCase):
self.assertTrue(os.access(f, os.W_OK))
self.assertTrue(os.access(f, os.X_OK))
-## ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
class Smart(UDisksTestCase):
'''Check SMART operation.'''
def test_sda(self):
'''SMART status of first internal hard disk
-
+
This is a best-effort readonly test.
'''
hd = '/dev/sda'
@@ -1088,13 +1073,14 @@ class Smart(UDisksTestCase):
return
has_smart = subprocess.call(['skdump', '--can-smart', hd],
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT) == 0
block = self.client.get_block_for_dev(os.stat(hd).st_rdev)
self.assertNotEqual(block, None)
drive = self.client.get_drive_for_block(block)
ata = self.client.get_object(drive.get_object_path()).get_property('drive-ata')
- self.assertEqual(ata != None, has_smart)
+ self.assertEqual(ata is not None, has_smart)
if has_smart:
sys.stderr.write('[avail] ')
@@ -1138,8 +1124,8 @@ class Luks(UDisksTestCase):
self.fs_create(None, 'ext4', GLib.Variant('a{sv}', {
'encrypt.passphrase': GLib.Variant('s', 's3kr1t'),
- 'label': GLib.Variant('s', 'treasure'),
- }))
+ 'label': GLib.Variant('s', 'treasure')}))
+ self.client.settle()
try:
block = self.udisks_block()
@@ -1158,15 +1144,15 @@ class Luks(UDisksTestCase):
# check whether we can lock/unlock; we also need this to get the
# cleartext device
encrypted.call_lock_sync(no_options, None)
- self.assertRaises(GLib.GError, encrypted.call_lock_sync,
- no_options, None)
-
+ self.assertRaises(GLib.GError, encrypted.call_lock_sync,
+ no_options, None)
+
# wrong password
- self.assertRaises(GLib.GError, encrypted.call_unlock_sync,
- 'h4ckpassword', no_options, None)
+ self.assertRaises(GLib.GError, encrypted.call_unlock_sync,
+ 'h4ckpassword', no_options, None)
# right password
- clear_path = encrypted.call_unlock_sync('s3kr1t',
- no_options, None)
+ clear_path = encrypted.call_unlock_sync('s3kr1t',
+ no_options, None)
# check cleartext device info
clear_obj = self.client.get_object(clear_path)
@@ -1179,14 +1165,14 @@ class Luks(UDisksTestCase):
clear_dev = clear_block.get_property('device')
self.assertNotEqual(clear_dev, None)
self.assertEqual(clear_block.get_property('id-uuid'),
- self.blkid(device=clear_dev)['ID_FS_UUID'])
+ self.blkid(device=clear_dev)['ID_FS_UUID'])
clear_fs = clear_obj.get_property('filesystem')
self.assertEqual(clear_fs.get_property('mount-points'), [])
# check that we do not leak key information
udev_dump = subprocess.Popen(['udevadm', 'info', '--export-db'],
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE)
out = udev_dump.communicate()[0]
self.assertFalse(b's3kr1t' in out, 'password in udev properties')
self.assertFalse(b'essiv:sha' in out, 'key information in udev properties')
@@ -1202,8 +1188,7 @@ class Luks(UDisksTestCase):
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
encrypted = crypt_obj.get_property('encrypted')
- path = encrypted.call_unlock_sync('s3kr1t',
- no_options, None)
+ path = encrypted.call_unlock_sync('s3kr1t', no_options, None)
self.client.settle()
obj = self.client.get_object(path)
fs = obj.get_property('filesystem')
@@ -1216,8 +1201,7 @@ class Luks(UDisksTestCase):
self.assertTrue('/media/' in mount_path)
self.assertTrue(mount_path.endswith('treasure'))
self.assertTrue(self.is_mountpoint(mount_path))
- self.client.settle()
- self.assertEqual(fs.get_property('mount-points'), [mount_path])
+ self.assertProperty(fs, 'mount-points', [mount_path])
# can't lock, busy
try:
@@ -1242,8 +1226,8 @@ class Luks(UDisksTestCase):