1
# Copyright (C) 2004, 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21
from cStringIO import StringIO
24
from bzrlib import urlutils
25
from bzrlib.errors import (ConnectionError,
34
from bzrlib.tests import TestCase, TestCaseInTempDir
35
from bzrlib.transport import (_CoalescedOffset,
36
_get_protocol_handlers,
37
_get_transport_modules,
40
register_lazy_transport,
41
_set_protocol_handlers,
44
from bzrlib.transport.chroot import ChrootServer
45
from bzrlib.transport.memory import MemoryTransport
46
from bzrlib.transport.local import (LocalTransport,
47
EmulatedWin32LocalTransport)
50
# TODO: Should possibly split transport-specific tests into their own files.
53
class TestTransport(TestCase):
54
"""Test the non transport-concrete class functionality."""
56
def test__get_set_protocol_handlers(self):
57
handlers = _get_protocol_handlers()
58
self.assertNotEqual({}, handlers)
60
_set_protocol_handlers({})
61
self.assertEqual({}, _get_protocol_handlers())
63
_set_protocol_handlers(handlers)
65
def test_get_transport_modules(self):
66
handlers = _get_protocol_handlers()
67
class SampleHandler(object):
68
"""I exist, isnt that enough?"""
71
_set_protocol_handlers(my_handlers)
72
register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
73
register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
74
self.assertEqual([SampleHandler.__module__, 'bzrlib.transport.chroot'],
75
_get_transport_modules())
77
_set_protocol_handlers(handlers)
79
def test_transport_dependency(self):
80
"""Transport with missing dependency causes no error"""
81
saved_handlers = _get_protocol_handlers()
83
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
84
'BadTransportHandler')
86
get_transport('foo://fooserver/foo')
87
except UnsupportedProtocol, e:
89
self.assertEquals('Unsupported protocol'
90
' for url "foo://fooserver/foo":'
91
' Unable to import library "some_lib":'
92
' testing missing dependency', str(e))
94
self.fail('Did not raise UnsupportedProtocol')
96
# restore original values
97
_set_protocol_handlers(saved_handlers)
99
def test_transport_fallback(self):
100
"""Transport with missing dependency causes no error"""
101
saved_handlers = _get_protocol_handlers()
103
_set_protocol_handlers({})
104
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
105
'BackupTransportHandler')
106
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
107
'BadTransportHandler')
108
t = get_transport('foo://fooserver/foo')
109
self.assertTrue(isinstance(t, BackupTransportHandler))
111
_set_protocol_handlers(saved_handlers)
113
def test__combine_paths(self):
115
self.assertEqual('/home/sarah/project/foo',
116
t._combine_paths('/home/sarah', 'project/foo'))
117
self.assertEqual('/etc',
118
t._combine_paths('/home/sarah', '../../etc'))
119
self.assertEqual('/etc',
120
t._combine_paths('/home/sarah', '../../../etc'))
121
self.assertEqual('/etc',
122
t._combine_paths('/home/sarah', '/etc'))
125
class TestCoalesceOffsets(TestCase):
127
def check(self, expected, offsets, limit=0, fudge=0):
128
coalesce = Transport._coalesce_offsets
129
exp = [_CoalescedOffset(*x) for x in expected]
130
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
131
self.assertEqual(exp, out)
133
def test_coalesce_empty(self):
136
def test_coalesce_simple(self):
137
self.check([(0, 10, [(0, 10)])], [(0, 10)])
139
def test_coalesce_unrelated(self):
140
self.check([(0, 10, [(0, 10)]),
142
], [(0, 10), (20, 10)])
144
def test_coalesce_unsorted(self):
145
self.check([(20, 10, [(0, 10)]),
147
], [(20, 10), (0, 10)])
149
def test_coalesce_nearby(self):
150
self.check([(0, 20, [(0, 10), (10, 10)])],
153
def test_coalesce_overlapped(self):
154
self.check([(0, 15, [(0, 10), (5, 10)])],
157
def test_coalesce_limit(self):
158
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
159
(30, 10), (40, 10)]),
160
(60, 50, [(0, 10), (10, 10), (20, 10),
161
(30, 10), (40, 10)]),
162
], [(10, 10), (20, 10), (30, 10), (40, 10),
163
(50, 10), (60, 10), (70, 10), (80, 10),
164
(90, 10), (100, 10)],
167
def test_coalesce_no_limit(self):
168
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
169
(30, 10), (40, 10), (50, 10),
170
(60, 10), (70, 10), (80, 10),
172
], [(10, 10), (20, 10), (30, 10), (40, 10),
173
(50, 10), (60, 10), (70, 10), (80, 10),
174
(90, 10), (100, 10)])
176
def test_coalesce_fudge(self):
177
self.check([(10, 30, [(0, 10), (20, 10)]),
178
(100, 10, [(0, 10),]),
179
], [(10, 10), (30, 10), (100, 10)],
184
class TestMemoryTransport(TestCase):
186
def test_get_transport(self):
189
def test_clone(self):
190
transport = MemoryTransport()
191
self.assertTrue(isinstance(transport, MemoryTransport))
192
self.assertEqual("memory:///", transport.clone("/").base)
194
def test_abspath(self):
195
transport = MemoryTransport()
196
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
198
def test_abspath_of_root(self):
199
transport = MemoryTransport()
200
self.assertEqual("memory:///", transport.base)
201
self.assertEqual("memory:///", transport.abspath('/'))
203
def test_abspath_of_relpath_starting_at_root(self):
204
transport = MemoryTransport()
205
self.assertEqual("memory:///foo", transport.abspath('/foo'))
207
def test_append_and_get(self):
208
transport = MemoryTransport()
209
transport.append_bytes('path', 'content')
210
self.assertEqual(transport.get('path').read(), 'content')
211
transport.append_file('path', StringIO('content'))
212
self.assertEqual(transport.get('path').read(), 'contentcontent')
214
def test_put_and_get(self):
215
transport = MemoryTransport()
216
transport.put_file('path', StringIO('content'))
217
self.assertEqual(transport.get('path').read(), 'content')
218
transport.put_bytes('path', 'content')
219
self.assertEqual(transport.get('path').read(), 'content')
221
def test_append_without_dir_fails(self):
222
transport = MemoryTransport()
223
self.assertRaises(NoSuchFile,
224
transport.append_bytes, 'dir/path', 'content')
226
def test_put_without_dir_fails(self):
227
transport = MemoryTransport()
228
self.assertRaises(NoSuchFile,
229
transport.put_file, 'dir/path', StringIO('content'))
231
def test_get_missing(self):
232
transport = MemoryTransport()
233
self.assertRaises(NoSuchFile, transport.get, 'foo')
235
def test_has_missing(self):
236
transport = MemoryTransport()
237
self.assertEquals(False, transport.has('foo'))
239
def test_has_present(self):
240
transport = MemoryTransport()
241
transport.append_bytes('foo', 'content')
242
self.assertEquals(True, transport.has('foo'))
244
def test_list_dir(self):
245
transport = MemoryTransport()
246
transport.put_bytes('foo', 'content')
247
transport.mkdir('dir')
248
transport.put_bytes('dir/subfoo', 'content')
249
transport.put_bytes('dirlike', 'content')
251
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
252
self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
254
def test_mkdir(self):
255
transport = MemoryTransport()
256
transport.mkdir('dir')
257
transport.append_bytes('dir/path', 'content')
258
self.assertEqual(transport.get('dir/path').read(), 'content')
260
def test_mkdir_missing_parent(self):
261
transport = MemoryTransport()
262
self.assertRaises(NoSuchFile,
263
transport.mkdir, 'dir/dir')
265
def test_mkdir_twice(self):
266
transport = MemoryTransport()
267
transport.mkdir('dir')
268
self.assertRaises(FileExists, transport.mkdir, 'dir')
270
def test_parameters(self):
271
transport = MemoryTransport()
272
self.assertEqual(True, transport.listable())
273
self.assertEqual(False, transport.should_cache())
274
self.assertEqual(False, transport.is_readonly())
276
def test_iter_files_recursive(self):
277
transport = MemoryTransport()
278
transport.mkdir('dir')
279
transport.put_bytes('dir/foo', 'content')
280
transport.put_bytes('dir/bar', 'content')
281
transport.put_bytes('bar', 'content')
282
paths = set(transport.iter_files_recursive())
283
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
286
transport = MemoryTransport()
287
transport.put_bytes('foo', 'content')
288
transport.put_bytes('bar', 'phowar')
289
self.assertEqual(7, transport.stat('foo').st_size)
290
self.assertEqual(6, transport.stat('bar').st_size)
293
class ChrootDecoratorTransportTest(TestCase):
294
"""Chroot decoration specific tests."""
296
def test_abspath(self):
297
# The abspath is always relative to the chroot_url.
298
server = ChrootServer(get_transport('memory:///foo/bar/'))
300
transport = get_transport(server.get_url())
301
self.assertEqual(server.get_url(), transport.abspath('/'))
303
subdir_transport = transport.clone('subdir')
304
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
307
def test_clone(self):
308
server = ChrootServer(get_transport('memory:///foo/bar/'))
310
transport = get_transport(server.get_url())
311
# relpath from root and root path are the same
312
relpath_cloned = transport.clone('foo')
313
abspath_cloned = transport.clone('/foo')
314
self.assertEqual(server, relpath_cloned.server)
315
self.assertEqual(server, abspath_cloned.server)
318
def test_chroot_url_preserves_chroot(self):
319
"""Calling get_transport on a chroot transport's base should produce a
320
transport with exactly the same behaviour as the original chroot
323
This is so that it is not possible to escape a chroot by doing::
324
url = chroot_transport.base
325
parent_url = urlutils.join(url, '..')
326
new_transport = get_transport(parent_url)
328
server = ChrootServer(get_transport('memory:///path/subpath'))
330
transport = get_transport(server.get_url())
331
new_transport = get_transport(transport.base)
332
self.assertEqual(transport.server, new_transport.server)
333
self.assertEqual(transport.base, new_transport.base)
336
def test_urljoin_preserves_chroot(self):
337
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
338
URL that escapes the intended chroot.
340
This is so that it is not possible to escape a chroot by doing::
341
url = chroot_transport.base
342
parent_url = urlutils.join(url, '..')
343
new_transport = get_transport(parent_url)
345
server = ChrootServer(get_transport('memory:///path/'))
347
transport = get_transport(server.get_url())
349
InvalidURLJoin, urlutils.join, transport.base, '..')
353
class ChrootServerTest(TestCase):
355
def test_construct(self):
356
backing_transport = MemoryTransport()
357
server = ChrootServer(backing_transport)
358
self.assertEqual(backing_transport, server.backing_transport)
360
def test_setUp(self):
361
backing_transport = MemoryTransport()
362
server = ChrootServer(backing_transport)
364
self.assertTrue(server.scheme in _protocol_handlers.keys())
366
def test_tearDown(self):
367
backing_transport = MemoryTransport()
368
server = ChrootServer(backing_transport)
371
self.assertFalse(server.scheme in _protocol_handlers.keys())
373
def test_get_url(self):
374
backing_transport = MemoryTransport()
375
server = ChrootServer(backing_transport)
377
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
381
class ReadonlyDecoratorTransportTest(TestCase):
382
"""Readonly decoration specific tests."""
384
def test_local_parameters(self):
385
import bzrlib.transport.readonly as readonly
386
# connect to . in readonly mode
387
transport = readonly.ReadonlyTransportDecorator('readonly+.')
388
self.assertEqual(True, transport.listable())
389
self.assertEqual(False, transport.should_cache())
390
self.assertEqual(True, transport.is_readonly())
392
def test_http_parameters(self):
393
from bzrlib.tests.HttpServer import HttpServer
394
import bzrlib.transport.readonly as readonly
395
# connect to . via http which is not listable
396
server = HttpServer()
399
transport = get_transport('readonly+' + server.get_url())
400
self.failUnless(isinstance(transport,
401
readonly.ReadonlyTransportDecorator))
402
self.assertEqual(False, transport.listable())
403
self.assertEqual(True, transport.should_cache())
404
self.assertEqual(True, transport.is_readonly())
409
class FakeNFSDecoratorTests(TestCaseInTempDir):
410
"""NFS decorator specific tests."""
412
def get_nfs_transport(self, url):
413
import bzrlib.transport.fakenfs as fakenfs
414
# connect to url with nfs decoration
415
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
417
def test_local_parameters(self):
418
# the listable, should_cache and is_readonly parameters
419
# are not changed by the fakenfs decorator
420
transport = self.get_nfs_transport('.')
421
self.assertEqual(True, transport.listable())
422
self.assertEqual(False, transport.should_cache())
423
self.assertEqual(False, transport.is_readonly())
425
def test_http_parameters(self):
426
# the listable, should_cache and is_readonly parameters
427
# are not changed by the fakenfs decorator
428
from bzrlib.tests.HttpServer import HttpServer
429
# connect to . via http which is not listable
430
server = HttpServer()
433
transport = self.get_nfs_transport(server.get_url())
434
self.assertIsInstance(
435
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
436
self.assertEqual(False, transport.listable())
437
self.assertEqual(True, transport.should_cache())
438
self.assertEqual(True, transport.is_readonly())
442
def test_fakenfs_server_default(self):
443
# a FakeNFSServer() should bring up a local relpath server for itself
444
import bzrlib.transport.fakenfs as fakenfs
445
server = fakenfs.FakeNFSServer()
448
# the url should be decorated appropriately
449
self.assertStartsWith(server.get_url(), 'fakenfs+')
450
# and we should be able to get a transport for it
451
transport = get_transport(server.get_url())
452
# which must be a FakeNFSTransportDecorator instance.
453
self.assertIsInstance(
454
transport, fakenfs.FakeNFSTransportDecorator)
458
def test_fakenfs_rename_semantics(self):
459
# a FakeNFS transport must mangle the way rename errors occur to
460
# look like NFS problems.
461
transport = self.get_nfs_transport('.')
462
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
464
self.assertRaises(bzrlib.errors.ResourceBusy,
465
transport.rename, 'from', 'to')
468
class FakeVFATDecoratorTests(TestCaseInTempDir):
469
"""Tests for simulation of VFAT restrictions"""
471
def get_vfat_transport(self, url):
472
"""Return vfat-backed transport for test directory"""
473
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
474
return FakeVFATTransportDecorator('vfat+' + url)
476
def test_transport_creation(self):
477
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
478
transport = self.get_vfat_transport('.')
479
self.assertIsInstance(transport, FakeVFATTransportDecorator)
481
def test_transport_mkdir(self):
482
transport = self.get_vfat_transport('.')
483
transport.mkdir('HELLO')
484
self.assertTrue(transport.has('hello'))
485
self.assertTrue(transport.has('Hello'))
487
def test_forbidden_chars(self):
488
transport = self.get_vfat_transport('.')
489
self.assertRaises(ValueError, transport.has, "<NU>")
492
class BadTransportHandler(Transport):
493
def __init__(self, base_url):
494
raise DependencyNotPresent('some_lib', 'testing missing dependency')
497
class BackupTransportHandler(Transport):
498
"""Test transport that works as a backup for the BadTransportHandler"""
502
class TestTransportImplementation(TestCaseInTempDir):
503
"""Implementation verification for transports.
505
To verify a transport we need a server factory, which is a callable
506
that accepts no parameters and returns an implementation of
507
bzrlib.transport.Server.
509
That Server is then used to construct transport instances and test
510
the transport via loopback activity.
512
Currently this assumes that the Transport object is connected to the
513
current working directory. So that whatever is done
514
through the transport, should show up in the working
515
directory, and vice-versa. This is a bug, because its possible to have
516
URL schemes which provide access to something that may not be
517
result in storage on the local disk, i.e. due to file system limits, or
518
due to it being a database or some other non-filesystem tool.
520
This also tests to make sure that the functions work with both
521
generators and lists (assuming iter(list) is effectively a generator)
525
super(TestTransportImplementation, self).setUp()
526
self._server = self.transport_server()
528
self.addCleanup(self._server.tearDown)
530
def get_transport(self):
531
"""Return a connected transport to the local directory."""
532
base_url = self._server.get_url()
533
# try getting the transport via the regular interface:
534
t = get_transport(base_url)
535
if not isinstance(t, self.transport_class):
536
# we did not get the correct transport class type. Override the
537
# regular connection behaviour by direct construction.
538
t = self.transport_class(base_url)
542
class TestLocalTransports(TestCase):
544
def test_get_transport_from_abspath(self):
545
here = os.path.abspath('.')
546
t = get_transport(here)
547
self.assertIsInstance(t, LocalTransport)
548
self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
550
def test_get_transport_from_relpath(self):
551
here = os.path.abspath('.')
552
t = get_transport('.')
553
self.assertIsInstance(t, LocalTransport)
554
self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
556
def test_get_transport_from_local_url(self):
557
here = os.path.abspath('.')
558
here_url = urlutils.local_path_to_url(here) + '/'
559
t = get_transport(here_url)
560
self.assertIsInstance(t, LocalTransport)
561
self.assertEquals(t.base, here_url)
564
class TestWin32LocalTransport(TestCase):
566
def test_unc_clone_to_root(self):
567
# Win32 UNC path like \\HOST\path
568
# clone to root should stop at least at \\HOST part
570
t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
573
self.assertEquals(t.base, 'file://HOST/')
574
# make sure we reach the root
576
self.assertEquals(t.base, 'file://HOST/')