105
83
self.fail('Did not raise UnsupportedProtocol')
107
85
# restore original values
108
transport._set_protocol_handlers(saved_handlers)
86
_set_protocol_handlers(saved_handlers)
110
88
def test_transport_fallback(self):
111
89
"""Transport with missing dependency causes no error"""
112
saved_handlers = transport._get_protocol_handlers()
90
saved_handlers = _get_protocol_handlers()
114
transport._clear_protocol_handlers()
115
transport.register_transport_proto('foo')
116
transport.register_lazy_transport(
117
'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
118
transport.register_lazy_transport(
119
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
120
t = transport.get_transport('foo://fooserver/foo')
92
_set_protocol_handlers({})
93
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
94
'BackupTransportHandler')
95
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
'BadTransportHandler')
97
t = get_transport('foo://fooserver/foo')
121
98
self.assertTrue(isinstance(t, BackupTransportHandler))
123
transport._set_protocol_handlers(saved_handlers)
125
def test_ssh_hints(self):
126
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
128
transport.get_transport('ssh://fooserver/foo')
129
except errors.UnsupportedProtocol, e:
131
self.assertEquals('Unsupported protocol'
132
' for url "ssh://fooserver/foo":'
133
' bzr supports bzr+ssh to operate over ssh,'
134
' use "bzr+ssh://fooserver/foo".',
137
self.fail('Did not raise UnsupportedProtocol')
139
def test_LateReadError(self):
140
"""The LateReadError helper should raise on read()."""
141
a_file = transport.LateReadError('a path')
144
except errors.ReadError, error:
145
self.assertEqual('a path', error.path)
146
self.assertRaises(errors.ReadError, a_file.read, 40)
149
def test__combine_paths(self):
150
t = transport.Transport('/')
151
self.assertEqual('/home/sarah/project/foo',
152
t._combine_paths('/home/sarah', 'project/foo'))
153
self.assertEqual('/etc',
154
t._combine_paths('/home/sarah', '../../etc'))
155
self.assertEqual('/etc',
156
t._combine_paths('/home/sarah', '../../../etc'))
157
self.assertEqual('/etc',
158
t._combine_paths('/home/sarah', '/etc'))
160
def test_local_abspath_non_local_transport(self):
161
# the base implementation should throw
162
t = memory.MemoryTransport()
163
e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
164
self.assertEqual('memory:///t is not a local path.', str(e))
167
class TestCoalesceOffsets(tests.TestCase):
169
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
170
coalesce = transport.Transport._coalesce_offsets
171
exp = [transport._CoalescedOffset(*x) for x in expected]
172
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
174
self.assertEqual(exp, out)
176
def test_coalesce_empty(self):
179
def test_coalesce_simple(self):
180
self.check([(0, 10, [(0, 10)])], [(0, 10)])
182
def test_coalesce_unrelated(self):
183
self.check([(0, 10, [(0, 10)]),
185
], [(0, 10), (20, 10)])
187
def test_coalesce_unsorted(self):
188
self.check([(20, 10, [(0, 10)]),
190
], [(20, 10), (0, 10)])
192
def test_coalesce_nearby(self):
193
self.check([(0, 20, [(0, 10), (10, 10)])],
196
def test_coalesce_overlapped(self):
197
self.assertRaises(ValueError,
198
self.check, [(0, 15, [(0, 10), (5, 10)])],
201
def test_coalesce_limit(self):
202
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
203
(30, 10), (40, 10)]),
204
(60, 50, [(0, 10), (10, 10), (20, 10),
205
(30, 10), (40, 10)]),
206
], [(10, 10), (20, 10), (30, 10), (40, 10),
207
(50, 10), (60, 10), (70, 10), (80, 10),
208
(90, 10), (100, 10)],
211
def test_coalesce_no_limit(self):
212
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
213
(30, 10), (40, 10), (50, 10),
214
(60, 10), (70, 10), (80, 10),
216
], [(10, 10), (20, 10), (30, 10), (40, 10),
217
(50, 10), (60, 10), (70, 10), (80, 10),
218
(90, 10), (100, 10)])
220
def test_coalesce_fudge(self):
221
self.check([(10, 30, [(0, 10), (20, 10)]),
222
(100, 10, [(0, 10),]),
223
], [(10, 10), (30, 10), (100, 10)],
226
def test_coalesce_max_size(self):
227
self.check([(10, 20, [(0, 10), (10, 10)]),
229
# If one range is above max_size, it gets its own coalesced
231
(100, 80, [(0, 80),]),],
232
[(10, 10), (20, 10), (30, 50), (100, 80)],
236
def test_coalesce_no_max_size(self):
237
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
238
[(10, 10), (20, 10), (30, 50), (80, 100)],
241
def test_coalesce_default_limit(self):
242
# By default we use a 100MB max size.
243
ten_mb = 10*1024*1024
244
self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
245
(10*ten_mb, ten_mb, [(0, ten_mb)])],
246
[(i*ten_mb, ten_mb) for i in range(11)])
247
self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
[(i*ten_mb, ten_mb) for i in range(11)],
249
max_size=1*1024*1024*1024)
252
class TestMemoryServer(tests.TestCase):
254
def test_create_server(self):
255
server = memory.MemoryServer()
256
server.start_server()
257
url = server.get_url()
258
self.assertTrue(url in transport.transport_list_registry)
259
t = transport.get_transport(url)
262
self.assertFalse(url in transport.transport_list_registry)
263
self.assertRaises(errors.UnsupportedProtocol,
264
transport.get_transport, url)
267
class TestMemoryTransport(tests.TestCase):
100
_set_protocol_handlers(saved_handlers)
103
class TestMemoryTransport(TestCase):
269
105
def test_get_transport(self):
270
memory.MemoryTransport()
272
108
def test_clone(self):
273
t = memory.MemoryTransport()
274
self.assertTrue(isinstance(t, memory.MemoryTransport))
275
self.assertEqual("memory:///", t.clone("/").base)
109
transport = MemoryTransport()
110
self.assertTrue(isinstance(transport, MemoryTransport))
277
112
def test_abspath(self):
278
t = memory.MemoryTransport()
279
self.assertEqual("memory:///relpath", t.abspath('relpath'))
281
def test_abspath_of_root(self):
282
t = memory.MemoryTransport()
283
self.assertEqual("memory:///", t.base)
284
self.assertEqual("memory:///", t.abspath('/'))
286
def test_abspath_of_relpath_starting_at_root(self):
287
t = memory.MemoryTransport()
288
self.assertEqual("memory:///foo", t.abspath('/foo'))
113
transport = MemoryTransport()
114
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
116
def test_relpath(self):
117
transport = MemoryTransport()
290
119
def test_append_and_get(self):
291
t = memory.MemoryTransport()
292
t.append_bytes('path', 'content')
293
self.assertEqual(t.get('path').read(), 'content')
294
t.append_file('path', StringIO('content'))
295
self.assertEqual(t.get('path').read(), 'contentcontent')
120
transport = MemoryTransport()
121
transport.append('path', StringIO('content'))
122
self.assertEqual(transport.get('path').read(), 'content')
123
transport.append('path', StringIO('content'))
124
self.assertEqual(transport.get('path').read(), 'contentcontent')
297
126
def test_put_and_get(self):
298
t = memory.MemoryTransport()
299
t.put_file('path', StringIO('content'))
300
self.assertEqual(t.get('path').read(), 'content')
301
t.put_bytes('path', 'content')
302
self.assertEqual(t.get('path').read(), 'content')
127
transport = MemoryTransport()
128
transport.put('path', StringIO('content'))
129
self.assertEqual(transport.get('path').read(), 'content')
130
transport.put('path', StringIO('content'))
131
self.assertEqual(transport.get('path').read(), 'content')
304
133
def test_append_without_dir_fails(self):
305
t = memory.MemoryTransport()
306
self.assertRaises(errors.NoSuchFile,
307
t.append_bytes, 'dir/path', 'content')
134
transport = MemoryTransport()
135
self.assertRaises(NoSuchFile,
136
transport.append, 'dir/path', StringIO('content'))
309
138
def test_put_without_dir_fails(self):
310
t = memory.MemoryTransport()
311
self.assertRaises(errors.NoSuchFile,
312
t.put_file, 'dir/path', StringIO('content'))
139
transport = MemoryTransport()
140
self.assertRaises(NoSuchFile,
141
transport.put, 'dir/path', StringIO('content'))
314
143
def test_get_missing(self):
315
transport = memory.MemoryTransport()
316
self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
144
transport = MemoryTransport()
145
self.assertRaises(NoSuchFile, transport.get, 'foo')
318
147
def test_has_missing(self):
319
t = memory.MemoryTransport()
320
self.assertEquals(False, t.has('foo'))
148
transport = MemoryTransport()
149
self.assertEquals(False, transport.has('foo'))
322
151
def test_has_present(self):
323
t = memory.MemoryTransport()
324
t.append_bytes('foo', 'content')
325
self.assertEquals(True, t.has('foo'))
327
def test_list_dir(self):
328
t = memory.MemoryTransport()
329
t.put_bytes('foo', 'content')
331
t.put_bytes('dir/subfoo', 'content')
332
t.put_bytes('dirlike', 'content')
334
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
335
self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
152
transport = MemoryTransport()
153
transport.append('foo', StringIO('content'))
154
self.assertEquals(True, transport.has('foo'))
337
156
def test_mkdir(self):
338
t = memory.MemoryTransport()
340
t.append_bytes('dir/path', 'content')
341
self.assertEqual(t.get('dir/path').read(), 'content')
157
transport = MemoryTransport()
158
transport.mkdir('dir')
159
transport.append('dir/path', StringIO('content'))
160
self.assertEqual(transport.get('dir/path').read(), 'content')
343
162
def test_mkdir_missing_parent(self):
344
t = memory.MemoryTransport()
345
self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
163
transport = MemoryTransport()
164
self.assertRaises(NoSuchFile,
165
transport.mkdir, 'dir/dir')
347
167
def test_mkdir_twice(self):
348
t = memory.MemoryTransport()
350
self.assertRaises(errors.FileExists, t.mkdir, 'dir')
168
transport = MemoryTransport()
169
transport.mkdir('dir')
170
self.assertRaises(FileExists, transport.mkdir, 'dir')
352
172
def test_parameters(self):
353
t = memory.MemoryTransport()
354
self.assertEqual(True, t.listable())
355
self.assertEqual(False, t.is_readonly())
173
transport = MemoryTransport()
174
self.assertEqual(True, transport.listable())
175
self.assertEqual(False, transport.should_cache())
176
self.assertEqual(False, transport.is_readonly())
357
178
def test_iter_files_recursive(self):
358
t = memory.MemoryTransport()
360
t.put_bytes('dir/foo', 'content')
361
t.put_bytes('dir/bar', 'content')
362
t.put_bytes('bar', 'content')
363
paths = set(t.iter_files_recursive())
179
transport = MemoryTransport()
180
transport.mkdir('dir')
181
transport.put('dir/foo', StringIO('content'))
182
transport.put('dir/bar', StringIO('content'))
183
transport.put('bar', StringIO('content'))
184
paths = set(transport.iter_files_recursive())
364
185
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
366
187
def test_stat(self):
367
t = memory.MemoryTransport()
368
t.put_bytes('foo', 'content')
369
t.put_bytes('bar', 'phowar')
370
self.assertEqual(7, t.stat('foo').st_size)
371
self.assertEqual(6, t.stat('bar').st_size)
374
class ChrootDecoratorTransportTest(tests.TestCase):
375
"""Chroot decoration specific tests."""
377
def test_abspath(self):
378
# The abspath is always relative to the chroot_url.
379
server = chroot.ChrootServer(
380
transport.get_transport('memory:///foo/bar/'))
381
self.start_server(server)
382
t = transport.get_transport(server.get_url())
383
self.assertEqual(server.get_url(), t.abspath('/'))
385
subdir_t = t.clone('subdir')
386
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
388
def test_clone(self):
389
server = chroot.ChrootServer(
390
transport.get_transport('memory:///foo/bar/'))
391
self.start_server(server)
392
t = transport.get_transport(server.get_url())
393
# relpath from root and root path are the same
394
relpath_cloned = t.clone('foo')
395
abspath_cloned = t.clone('/foo')
396
self.assertEqual(server, relpath_cloned.server)
397
self.assertEqual(server, abspath_cloned.server)
399
def test_chroot_url_preserves_chroot(self):
400
"""Calling get_transport on a chroot transport's base should produce a
401
transport with exactly the same behaviour as the original chroot
404
This is so that it is not possible to escape a chroot by doing::
405
url = chroot_transport.base
406
parent_url = urlutils.join(url, '..')
407
new_t = transport.get_transport(parent_url)
409
server = chroot.ChrootServer(
410
transport.get_transport('memory:///path/subpath'))
411
self.start_server(server)
412
t = transport.get_transport(server.get_url())
413
new_t = transport.get_transport(t.base)
414
self.assertEqual(t.server, new_t.server)
415
self.assertEqual(t.base, new_t.base)
417
def test_urljoin_preserves_chroot(self):
418
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
419
URL that escapes the intended chroot.
421
This is so that it is not possible to escape a chroot by doing::
422
url = chroot_transport.base
423
parent_url = urlutils.join(url, '..')
424
new_t = transport.get_transport(parent_url)
426
server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
427
self.start_server(server)
428
t = transport.get_transport(server.get_url())
430
errors.InvalidURLJoin, urlutils.join, t.base, '..')
433
class TestChrootServer(tests.TestCase):
435
def test_construct(self):
436
backing_transport = memory.MemoryTransport()
437
server = chroot.ChrootServer(backing_transport)
438
self.assertEqual(backing_transport, server.backing_transport)
440
def test_setUp(self):
441
backing_transport = memory.MemoryTransport()
442
server = chroot.ChrootServer(backing_transport)
443
server.start_server()
445
self.assertTrue(server.scheme
446
in transport._get_protocol_handlers().keys())
450
def test_stop_server(self):
451
backing_transport = memory.MemoryTransport()
452
server = chroot.ChrootServer(backing_transport)
453
server.start_server()
455
self.assertFalse(server.scheme
456
in transport._get_protocol_handlers().keys())
458
def test_get_url(self):
459
backing_transport = memory.MemoryTransport()
460
server = chroot.ChrootServer(backing_transport)
461
server.start_server()
463
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
468
class PathFilteringDecoratorTransportTest(tests.TestCase):
469
"""Pathfilter decoration specific tests."""
471
def test_abspath(self):
472
# The abspath is always relative to the base of the backing transport.
473
server = pathfilter.PathFilteringServer(
474
transport.get_transport('memory:///foo/bar/'),
476
server.start_server()
477
t = transport.get_transport(server.get_url())
478
self.assertEqual(server.get_url(), t.abspath('/'))
480
subdir_t = t.clone('subdir')
481
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
484
def make_pf_transport(self, filter_func=None):
485
"""Make a PathFilteringTransport backed by a MemoryTransport.
188
transport = MemoryTransport()
189
transport.put('foo', StringIO('content'))
190
transport.put('bar', StringIO('phowar'))
191
self.assertEqual(7, transport.stat('foo').st_size)
192
self.assertEqual(6, transport.stat('bar').st_size)
487
:param filter_func: by default this will be a no-op function. Use this
488
parameter to override it."""
489
if filter_func is None:
490
filter_func = lambda x: x
491
server = pathfilter.PathFilteringServer(
492
transport.get_transport('memory:///foo/bar/'), filter_func)
493
server.start_server()
494
self.addCleanup(server.stop_server)
495
return transport.get_transport(server.get_url())
497
def test__filter(self):
498
# _filter (with an identity func as filter_func) always returns
499
# paths relative to the base of the backing transport.
500
t = self.make_pf_transport()
501
self.assertEqual('foo', t._filter('foo'))
502
self.assertEqual('foo/bar', t._filter('foo/bar'))
503
self.assertEqual('', t._filter('..'))
504
self.assertEqual('', t._filter('/'))
505
# The base of the pathfiltering transport is taken into account too.
506
t = t.clone('subdir1/subdir2')
507
self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
508
self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
509
self.assertEqual('subdir1', t._filter('..'))
510
self.assertEqual('', t._filter('/'))
512
def test_filter_invocation(self):
515
filter_log.append(path)
517
t = self.make_pf_transport(filter)
519
self.assertEqual(['abc'], filter_log)
521
t.clone('abc').has('xyz')
522
self.assertEqual(['abc/xyz'], filter_log)
525
self.assertEqual(['abc'], filter_log)
527
def test_clone(self):
528
t = self.make_pf_transport()
529
# relpath from root and root path are the same
530
relpath_cloned = t.clone('foo')
531
abspath_cloned = t.clone('/foo')
532
self.assertEqual(t.server, relpath_cloned.server)
533
self.assertEqual(t.server, abspath_cloned.server)
535
def test_url_preserves_pathfiltering(self):
536
"""Calling get_transport on a pathfiltered transport's base should
537
produce a transport with exactly the same behaviour as the original
538
pathfiltered transport.
540
This is so that it is not possible to escape (accidentally or
541
otherwise) the filtering by doing::
542
url = filtered_transport.base
543
parent_url = urlutils.join(url, '..')
544
new_t = transport.get_transport(parent_url)
546
t = self.make_pf_transport()
547
new_t = transport.get_transport(t.base)
548
self.assertEqual(t.server, new_t.server)
549
self.assertEqual(t.base, new_t.base)
552
class ReadonlyDecoratorTransportTest(tests.TestCase):
195
class ReadonlyDecoratorTransportTest(TestCase):
553
196
"""Readonly decoration specific tests."""
555
198
def test_local_parameters(self):
199
import bzrlib.transport.readonly as readonly
556
200
# connect to . in readonly mode
557
t = readonly.ReadonlyTransportDecorator('readonly+.')
558
self.assertEqual(True, t.listable())
559
self.assertEqual(True, t.is_readonly())
201
transport = readonly.ReadonlyTransportDecorator('readonly+.')
202
self.assertEqual(True, transport.listable())
203
self.assertEqual(False, transport.should_cache())
204
self.assertEqual(True, transport.is_readonly())
561
206
def test_http_parameters(self):
562
from bzrlib.tests.http_server import HttpServer
563
# connect to '.' via http which is not listable
207
import bzrlib.transport.readonly as readonly
208
from bzrlib.transport.http import HttpServer
209
# connect to . via http which is not listable
564
210
server = HttpServer()
565
self.start_server(server)
566
t = transport.get_transport('readonly+' + server.get_url())
567
self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
568
self.assertEqual(False, t.listable())
569
self.assertEqual(True, t.is_readonly())
572
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
213
transport = get_transport('readonly+' + server.get_url())
214
self.failUnless(isinstance(transport,
215
readonly.ReadonlyTransportDecorator))
216
self.assertEqual(False, transport.listable())
217
self.assertEqual(True, transport.should_cache())
218
self.assertEqual(True, transport.is_readonly())
223
class FakeNFSDecoratorTests(TestCaseInTempDir):
573
224
"""NFS decorator specific tests."""
575
226
def get_nfs_transport(self, url):
227
import bzrlib.transport.fakenfs as fakenfs
576
228
# connect to url with nfs decoration
577
229
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
579
231
def test_local_parameters(self):
580
# the listable and is_readonly parameters
232
# the listable, should_cache and is_readonly parameters
581
233
# are not changed by the fakenfs decorator
582
t = self.get_nfs_transport('.')
583
self.assertEqual(True, t.listable())
584
self.assertEqual(False, t.is_readonly())
234
transport = self.get_nfs_transport('.')
235
self.assertEqual(True, transport.listable())
236
self.assertEqual(False, transport.should_cache())
237
self.assertEqual(False, transport.is_readonly())
586
239
def test_http_parameters(self):
587
# the listable and is_readonly parameters
240
# the listable, should_cache and is_readonly parameters
588
241
# are not changed by the fakenfs decorator
589
from bzrlib.tests.http_server import HttpServer
590
# connect to '.' via http which is not listable
242
from bzrlib.transport.http import HttpServer
243
# connect to . via http which is not listable
591
244
server = HttpServer()
592
self.start_server(server)
593
t = self.get_nfs_transport(server.get_url())
594
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
595
self.assertEqual(False, t.listable())
596
self.assertEqual(True, t.is_readonly())
247
transport = self.get_nfs_transport(server.get_url())
248
self.assertIsInstance(
249
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
250
self.assertEqual(False, transport.listable())
251
self.assertEqual(True, transport.should_cache())
252
self.assertEqual(True, transport.is_readonly())
598
256
def test_fakenfs_server_default(self):
599
257
# a FakeNFSServer() should bring up a local relpath server for itself
600
server = test_server.FakeNFSServer()
601
self.start_server(server)
602
# the url should be decorated appropriately
603
self.assertStartsWith(server.get_url(), 'fakenfs+')
604
# and we should be able to get a transport for it
605
t = transport.get_transport(server.get_url())
606
# which must be a FakeNFSTransportDecorator instance.
607
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
258
import bzrlib.transport.fakenfs as fakenfs
259
server = fakenfs.FakeNFSServer()
262
# the server should be a relpath localhost server
263
self.assertEqual(server.get_url(), 'fakenfs+.')
264
# and we should be able to get a transport for it
265
transport = get_transport(server.get_url())
266
# which must be a FakeNFSTransportDecorator instance.
267
self.assertIsInstance(
268
transport, fakenfs.FakeNFSTransportDecorator)
609
272
def test_fakenfs_rename_semantics(self):
610
273
# a FakeNFS transport must mangle the way rename errors occur to
611
274
# look like NFS problems.
612
t = self.get_nfs_transport('.')
275
transport = self.get_nfs_transport('.')
613
276
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
615
self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
618
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
278
self.assertRaises(bzrlib.errors.ResourceBusy,
279
transport.rename, 'from', 'to')
282
class FakeVFATDecoratorTests(TestCaseInTempDir):
619
283
"""Tests for simulation of VFAT restrictions"""
621
285
def get_vfat_transport(self, url):
626
290
def test_transport_creation(self):
627
291
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
628
t = self.get_vfat_transport('.')
629
self.assertIsInstance(t, FakeVFATTransportDecorator)
292
transport = self.get_vfat_transport('.')
293
self.assertIsInstance(transport, FakeVFATTransportDecorator)
631
295
def test_transport_mkdir(self):
632
t = self.get_vfat_transport('.')
634
self.assertTrue(t.has('hello'))
635
self.assertTrue(t.has('Hello'))
296
transport = self.get_vfat_transport('.')
297
transport.mkdir('HELLO')
298
self.assertTrue(transport.has('hello'))
299
self.assertTrue(transport.has('Hello'))
637
301
def test_forbidden_chars(self):
638
t = self.get_vfat_transport('.')
639
self.assertRaises(ValueError, t.has, "<NU>")
642
class BadTransportHandler(transport.Transport):
302
transport = self.get_vfat_transport('.')
303
self.assertRaises(ValueError, transport.has, "<NU>")
306
class BadTransportHandler(Transport):
643
307
def __init__(self, base_url):
644
raise errors.DependencyNotPresent('some_lib',
645
'testing missing dependency')
648
class BackupTransportHandler(transport.Transport):
308
raise DependencyNotPresent('some_lib', 'testing missing dependency')
311
class BackupTransportHandler(Transport):
649
312
"""Test transport that works as a backup for the BadTransportHandler"""
653
class TestTransportImplementation(tests.TestCaseInTempDir):
654
"""Implementation verification for transports.
656
To verify a transport we need a server factory, which is a callable
657
that accepts no parameters and returns an implementation of
658
bzrlib.transport.Server.
660
That Server is then used to construct transport instances and test
661
the transport via loopback activity.
663
Currently this assumes that the Transport object is connected to the
664
current working directory. So that whatever is done
665
through the transport, should show up in the working
666
directory, and vice-versa. This is a bug, because its possible to have
667
URL schemes which provide access to something that may not be
668
result in storage on the local disk, i.e. due to file system limits, or
669
due to it being a database or some other non-filesystem tool.
671
This also tests to make sure that the functions work with both
672
generators and lists (assuming iter(list) is effectively a generator)
676
super(TestTransportImplementation, self).setUp()
677
self._server = self.transport_server()
678
self.start_server(self._server)
680
def get_transport(self, relpath=None):
681
"""Return a connected transport to the local directory.
683
:param relpath: a path relative to the base url.
685
base_url = self._server.get_url()
686
url = self._adjust_url(base_url, relpath)
687
# try getting the transport via the regular interface:
688
t = transport.get_transport(url)
689
# vila--20070607 if the following are commented out the test suite
690
# still pass. Is this really still needed or was it a forgotten
692
if not isinstance(t, self.transport_class):
693
# we did not get the correct transport class type. Override the
694
# regular connection behaviour by direct construction.
695
t = self.transport_class(url)
699
class TestLocalTransports(tests.TestCase):
701
def test_get_transport_from_abspath(self):
702
here = osutils.abspath('.')
703
t = transport.get_transport(here)
704
self.assertIsInstance(t, local.LocalTransport)
705
self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
707
def test_get_transport_from_relpath(self):
708
here = osutils.abspath('.')
709
t = transport.get_transport('.')
710
self.assertIsInstance(t, local.LocalTransport)
711
self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
713
def test_get_transport_from_local_url(self):
714
here = osutils.abspath('.')
715
here_url = urlutils.local_path_to_url(here) + '/'
716
t = transport.get_transport(here_url)
717
self.assertIsInstance(t, local.LocalTransport)
718
self.assertEquals(t.base, here_url)
720
def test_local_abspath(self):
721
here = osutils.abspath('.')
722
t = transport.get_transport(here)
723
self.assertEquals(t.local_abspath(''), here)
726
class TestWin32LocalTransport(tests.TestCase):
728
def test_unc_clone_to_root(self):
729
# Win32 UNC path like \\HOST\path
730
# clone to root should stop at least at \\HOST part
732
t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
735
self.assertEquals(t.base, 'file://HOST/')
736
# make sure we reach the root
738
self.assertEquals(t.base, 'file://HOST/')
741
class TestConnectedTransport(tests.TestCase):
742
"""Tests for connected to remote server transports"""
744
def test_parse_url(self):
745
t = transport.ConnectedTransport(
746
'http://simple.example.com/home/source')
747
self.assertEquals(t._host, 'simple.example.com')
748
self.assertEquals(t._port, None)
749
self.assertEquals(t._path, '/home/source/')
750
self.failUnless(t._user is None)
751
self.failUnless(t._password is None)
753
self.assertEquals(t.base, 'http://simple.example.com/home/source/')
755
def test_parse_url_with_at_in_user(self):
757
t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
758
self.assertEquals(t._user, 'user@host.com')
760
def test_parse_quoted_url(self):
761
t = transport.ConnectedTransport(
762
'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
763
self.assertEquals(t._host, 'exAmple.com')
764
self.assertEquals(t._port, 2222)
765
self.assertEquals(t._user, 'robey')
766
self.assertEquals(t._password, 'h@t')
767
self.assertEquals(t._path, '/path/')
769
# Base should not keep track of the password
770
self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
772
def test_parse_invalid_url(self):
773
self.assertRaises(errors.InvalidURL,
774
transport.ConnectedTransport,
775
'sftp://lily.org:~janneke/public/bzr/gub')
777
def test_relpath(self):
778
t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
780
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
781
self.assertRaises(errors.PathNotChild, t.relpath,
782
'http://user@host.com/abs/path/sub')
783
self.assertRaises(errors.PathNotChild, t.relpath,
784
'sftp://user2@host.com/abs/path/sub')
785
self.assertRaises(errors.PathNotChild, t.relpath,
786
'sftp://user@otherhost.com/abs/path/sub')
787
self.assertRaises(errors.PathNotChild, t.relpath,
788
'sftp://user@host.com:33/abs/path/sub')
789
# Make sure it works when we don't supply a username
790
t = transport.ConnectedTransport('sftp://host.com/abs/path')
791
self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
793
# Make sure it works when parts of the path will be url encoded
794
t = transport.ConnectedTransport('sftp://host.com/dev/%path')
795
self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
797
def test_connection_sharing_propagate_credentials(self):
798
t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
799
self.assertEquals('user', t._user)
800
self.assertEquals('host.com', t._host)
801
self.assertIs(None, t._get_connection())
802
self.assertIs(None, t._password)
803
c = t.clone('subdir')
804
self.assertIs(None, c._get_connection())
805
self.assertIs(None, t._password)
807
# Simulate the user entering a password
809
connection = object()
810
t._set_connection(connection, password)
811
self.assertIs(connection, t._get_connection())
812
self.assertIs(password, t._get_credentials())
813
self.assertIs(connection, c._get_connection())
814
self.assertIs(password, c._get_credentials())
816
# credentials can be updated
817
new_password = 'even more secret'
818
c._update_credentials(new_password)
819
self.assertIs(connection, t._get_connection())
820
self.assertIs(new_password, t._get_credentials())
821
self.assertIs(connection, c._get_connection())
822
self.assertIs(new_password, c._get_credentials())
825
class TestReusedTransports(tests.TestCase):
826
"""Tests for transport reuse"""
828
def test_reuse_same_transport(self):
829
possible_transports = []
830
t1 = transport.get_transport('http://foo/',
831
possible_transports=possible_transports)
832
self.assertEqual([t1], possible_transports)
833
t2 = transport.get_transport('http://foo/',
834
possible_transports=[t1])
835
self.assertIs(t1, t2)
837
# Also check that final '/' are handled correctly
838
t3 = transport.get_transport('http://foo/path/')
839
t4 = transport.get_transport('http://foo/path',
840
possible_transports=[t3])
841
self.assertIs(t3, t4)
843
t5 = transport.get_transport('http://foo/path')
844
t6 = transport.get_transport('http://foo/path/',
845
possible_transports=[t5])
846
self.assertIs(t5, t6)
848
def test_don_t_reuse_different_transport(self):
849
t1 = transport.get_transport('http://foo/path')
850
t2 = transport.get_transport('http://bar/path',
851
possible_transports=[t1])
852
self.assertIsNot(t1, t2)
855
class TestTransportTrace(tests.TestCase):
    """Tests for the 'trace+' transport decorator and its activity log."""

    # NOTE(review): the headers of the first and third tests, the clone()
    # call, the get() call and the expected_result initialisers were absent
    # from the extracted text and have been restored; confirm names and the
    # clone() argument against the upstream file.

    def test_get_transport_returns_trace_decorator(self):
        """A 'trace+' url yields a TransportTraceDecorator."""
        t = transport.get_transport('trace+memory://')
        self.assertIsInstance(
            t, bzrlib.transport.trace.TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        """A clone is a new object sharing the original's _activity list."""
        t = transport.get_transport('trace+memory://')
        t2 = t.clone('.')
        self.assertTrue(t is not t2)
        self.assertTrue(t._activity is t2._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.
    def test_get(self):
        """put_bytes and get are each recorded in _activity."""
        t = transport.get_transport('trace+memory:///')
        t.put_bytes('foo', 'barish')
        t.get('foo')
        expected_result = []
        # put_bytes records the bytes, not the content to avoid memory
        # pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # get records the file name only.
        expected_result.append(('get', 'foo'))
        self.assertEqual(expected_result, t._activity)

    def test_readv(self):
        """readv is recorded with the offsets and options it was given."""
        t = transport.get_transport('trace+memory:///')
        t.put_bytes('foo', 'barish')
        # readv is consumed with list() so the whole request is performed.
        list(t.readv('foo', [(0, 1), (3, 2)],
                     adjust_for_latency=True, upper_limit=6))
        expected_result = []
        # put_bytes records the bytes, not the content to avoid memory
        # pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # readv records the supplied offset request
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
        self.assertEqual(expected_result, t._activity)
897
class TestSSHConnections(tests.TestCaseWithTransport):
    """End-to-end check of bzr+ssh:// against a local stub SSH server."""

    def test_bzr_connect_to_bzr_ssh(self):
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.

        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
        """
        # This test actually causes a bzr instance to be invoked, which is very
        # expensive: it should be the only such test in the test suite.
        # A reasonable evolution for this would be to simply check inside
        # check_channel_exec_request that the command is appropriate, and then
        # satisfy requests in-process.
        self.requireFeature(features.paramiko)
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
        # override the interface (doesn't change self._vendor).
        # Note that this does encryption, so can be slow.
        from bzrlib.tests import stub_sftp

        # Start an SSH server
        self.command_executed = []
        # XXX: This is horrible -- we define a really dumb SSH server that
        # executes commands, and manage the hooking up of stdin/out/err to the
        # SSH channel ourselves.  Surely this has already been implemented
        # elsewhere?
        # started[0] is the spawned subprocess, the remaining entries are the
        # relay threads; both are waited on at the end of the test.
        started = []

        class StubSSHServer(stub_sftp.StubServer):

            # Gives the server callbacks access to the running test case.
            test = self

            def check_channel_exec_request(self, channel, command):
                self.test.command_executed.append(command)
                # The command string comes from this test's own client, so
                # running it through the shell is acceptable here.
                proc = subprocess.Popen(
                    command, shell=True, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # XXX: horribly inefficient, not to mention ugly.
                # Start a thread for each of stdin/out/err, and relay bytes from
                # the subprocess to channel and vice versa.
                def ferry_bytes(read, write, close):
                    # NOTE(review): this body was absent from the extracted
                    # text and is restored as a byte-at-a-time relay that
                    # closes the sink on end of stream; confirm upstream.
                    while True:
                        data = read(1)
                        if not data:
                            close()
                            break
                        write(data)

                file_functions = [
                    (channel.recv, proc.stdin.write, proc.stdin.close),
                    (proc.stdout.read, channel.sendall, channel.close),
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
                started.append(proc)
                for read, write, close in file_functions:
                    relay = threading.Thread(
                        target=ferry_bytes, args=(read, write, close))
                    relay.start()
                    started.append(relay)

                return True

        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
        # We *don't* want to override the default SSH vendor: the detected one
        # is the one to use.

        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
        # inherits from SFTPServer which forces the SSH vendor to
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
        self.start_server(ssh_server)
        port = ssh_server.port

        if sys.platform == 'win32':
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
        else:
            bzr_remote_path = self.get_bzr_path()
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path

        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
        # variable is used to tell bzr what command to run on the remote end.
        path_to_branch = osutils.abspath('.')
        if sys.platform == 'win32':
            # On Windows, we export all drives as '/C:/, etc. So we need to
            # prefix a '/' to get the right path.
            path_to_branch = '/' + path_to_branch
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
        t = transport.get_transport(url)
        self.permit_url(t.base)
        # NOTE(review): the operation that makes the client connect was absent
        # from the extracted text; mkdir is restored here -- confirm upstream.
        t.mkdir('foo')

        self.assertEqual(
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
            self.command_executed)
        # Make sure to disconnect, so that the remote process can stop, and we
        # can cleanup. Then pause the test until everything is shutdown
        t._client._medium.disconnect()
        if not started:
            return
        # First wait for the subprocess
        started[0].wait()
        # And the rest are threads
        for relay in started[1:]:
            relay.join()
999
class TestUnhtml(tests.TestCase):
    """Tests for unhtml_roughly"""

    def test_truncation(self):
        """The result is 1000 characters long and starts ' something!'."""
        # 1000 repetitions of a 14-character line: far more than 1000 chars.
        fake_html = "<p>something!\n" * 1000
        result = http.unhtml_roughly(fake_html)
        self.assertEqual(1000, len(result))
        self.assertStartsWith(result, " something!")