1
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
1
# Copyright (C) 2004, 2005 by Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21
from cStringIO import StringIO
24
from bzrlib.errors import (NoSuchFile, FileExists,
30
from bzrlib.tests import TestCase, TestCaseInTempDir
31
from bzrlib.transport import (_CoalescedOffset,
32
_get_protocol_handlers,
33
_get_transport_modules,
35
register_lazy_transport,
36
_set_protocol_handlers,
39
from bzrlib.transport.memory import MemoryTransport
40
from bzrlib.transport.local import LocalTransport
43
class TestTransport(TestCase):
44
"""Test the non transport-concrete class functionality."""
46
def test__get_set_protocol_handlers(self):
47
handlers = _get_protocol_handlers()
48
self.assertNotEqual({}, handlers)
50
_set_protocol_handlers({})
51
self.assertEqual({}, _get_protocol_handlers())
53
_set_protocol_handlers(handlers)
55
def test_get_transport_modules(self):
56
handlers = _get_protocol_handlers()
57
class SampleHandler(object):
58
"""I exist, isnt that enough?"""
61
_set_protocol_handlers(my_handlers)
62
register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
63
register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
64
self.assertEqual([SampleHandler.__module__],
65
_get_transport_modules())
67
_set_protocol_handlers(handlers)
69
def test_transport_dependency(self):
70
"""Transport with missing dependency causes no error"""
71
saved_handlers = _get_protocol_handlers()
73
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
74
'BadTransportHandler')
76
get_transport('foo://fooserver/foo')
77
except UnsupportedProtocol, e:
79
self.assertEquals('Unsupported protocol'
80
' for url "foo://fooserver/foo":'
81
' Unable to import library "some_lib":'
82
' testing missing dependency', str(e))
84
self.fail('Did not raise UnsupportedProtocol')
86
# restore original values
87
_set_protocol_handlers(saved_handlers)
89
def test_transport_fallback(self):
90
"""Transport with missing dependency causes no error"""
91
saved_handlers = _get_protocol_handlers()
93
_set_protocol_handlers({})
94
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
95
'BackupTransportHandler')
96
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
97
'BadTransportHandler')
98
t = get_transport('foo://fooserver/foo')
99
self.assertTrue(isinstance(t, BackupTransportHandler))
101
_set_protocol_handlers(saved_handlers)
104
class TestCoalesceOffsets(TestCase):
106
def check(self, expected, offsets, limit=0, fudge=0):
107
coalesce = Transport._coalesce_offsets
108
exp = [_CoalescedOffset(*x) for x in expected]
109
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
110
self.assertEqual(exp, out)
112
def test_coalesce_empty(self):
115
def test_coalesce_simple(self):
116
self.check([(0, 10, [(0, 10)])], [(0, 10)])
118
def test_coalesce_unrelated(self):
119
self.check([(0, 10, [(0, 10)]),
121
], [(0, 10), (20, 10)])
123
def test_coalesce_unsorted(self):
124
self.check([(20, 10, [(0, 10)]),
126
], [(20, 10), (0, 10)])
128
def test_coalesce_nearby(self):
129
self.check([(0, 20, [(0, 10), (10, 10)])],
132
def test_coalesce_overlapped(self):
133
self.check([(0, 15, [(0, 10), (5, 10)])],
136
def test_coalesce_limit(self):
137
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
138
(30, 10), (40, 10)]),
139
(60, 50, [(0, 10), (10, 10), (20, 10),
140
(30, 10), (40, 10)]),
141
], [(10, 10), (20, 10), (30, 10), (40, 10),
142
(50, 10), (60, 10), (70, 10), (80, 10),
143
(90, 10), (100, 10)],
146
def test_coalesce_no_limit(self):
147
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
148
(30, 10), (40, 10), (50, 10),
149
(60, 10), (70, 10), (80, 10),
151
], [(10, 10), (20, 10), (30, 10), (40, 10),
152
(50, 10), (60, 10), (70, 10), (80, 10),
153
(90, 10), (100, 10)])
155
def test_coalesce_fudge(self):
156
self.check([(10, 30, [(0, 10), (20, 10)]),
157
(100, 10, [(0, 10),]),
158
], [(10, 10), (30, 10), (100, 10)],
163
class TestMemoryTransport(TestCase):
165
def test_get_transport(self):
168
def test_clone(self):
169
transport = MemoryTransport()
170
self.assertTrue(isinstance(transport, MemoryTransport))
171
self.assertEqual("memory:///", transport.clone("/").base)
173
def test_abspath(self):
174
transport = MemoryTransport()
175
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
177
def test_abspath_of_root(self):
178
transport = MemoryTransport()
179
self.assertEqual("memory:///", transport.base)
180
self.assertEqual("memory:///", transport.abspath('/'))
182
def test_relpath(self):
183
transport = MemoryTransport()
185
def test_append_and_get(self):
186
transport = MemoryTransport()
187
transport.append_bytes('path', 'content')
188
self.assertEqual(transport.get('path').read(), 'content')
189
transport.append_file('path', StringIO('content'))
190
self.assertEqual(transport.get('path').read(), 'contentcontent')
192
def test_put_and_get(self):
193
transport = MemoryTransport()
194
transport.put_file('path', StringIO('content'))
195
self.assertEqual(transport.get('path').read(), 'content')
196
transport.put_bytes('path', 'content')
197
self.assertEqual(transport.get('path').read(), 'content')
199
def test_append_without_dir_fails(self):
200
transport = MemoryTransport()
201
self.assertRaises(NoSuchFile,
202
transport.append_bytes, 'dir/path', 'content')
204
def test_put_without_dir_fails(self):
205
transport = MemoryTransport()
206
self.assertRaises(NoSuchFile,
207
transport.put_file, 'dir/path', StringIO('content'))
209
def test_get_missing(self):
210
transport = MemoryTransport()
211
self.assertRaises(NoSuchFile, transport.get, 'foo')
213
def test_has_missing(self):
214
transport = MemoryTransport()
215
self.assertEquals(False, transport.has('foo'))
217
def test_has_present(self):
218
transport = MemoryTransport()
219
transport.append_bytes('foo', 'content')
220
self.assertEquals(True, transport.has('foo'))
222
def test_mkdir(self):
223
transport = MemoryTransport()
224
transport.mkdir('dir')
225
transport.append_bytes('dir/path', 'content')
226
self.assertEqual(transport.get('dir/path').read(), 'content')
228
def test_mkdir_missing_parent(self):
229
transport = MemoryTransport()
230
self.assertRaises(NoSuchFile,
231
transport.mkdir, 'dir/dir')
233
def test_mkdir_twice(self):
234
transport = MemoryTransport()
235
transport.mkdir('dir')
236
self.assertRaises(FileExists, transport.mkdir, 'dir')
238
def test_parameters(self):
239
transport = MemoryTransport()
240
self.assertEqual(True, transport.listable())
241
self.assertEqual(False, transport.should_cache())
242
self.assertEqual(False, transport.is_readonly())
244
def test_iter_files_recursive(self):
245
transport = MemoryTransport()
246
transport.mkdir('dir')
247
transport.put_bytes('dir/foo', 'content')
248
transport.put_bytes('dir/bar', 'content')
249
transport.put_bytes('bar', 'content')
250
paths = set(transport.iter_files_recursive())
251
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
254
transport = MemoryTransport()
255
transport.put_bytes('foo', 'content')
256
transport.put_bytes('bar', 'phowar')
257
self.assertEqual(7, transport.stat('foo').st_size)
258
self.assertEqual(6, transport.stat('bar').st_size)
261
class ReadonlyDecoratorTransportTest(TestCase):
262
"""Readonly decoration specific tests."""
264
def test_local_parameters(self):
265
import bzrlib.transport.readonly as readonly
266
# connect to . in readonly mode
267
transport = readonly.ReadonlyTransportDecorator('readonly+.')
268
self.assertEqual(True, transport.listable())
269
self.assertEqual(False, transport.should_cache())
270
self.assertEqual(True, transport.is_readonly())
272
def test_http_parameters(self):
273
import bzrlib.transport.readonly as readonly
274
from bzrlib.transport.http import HttpServer
275
# connect to . via http which is not listable
276
server = HttpServer()
279
transport = get_transport('readonly+' + server.get_url())
280
self.failUnless(isinstance(transport,
281
readonly.ReadonlyTransportDecorator))
282
self.assertEqual(False, transport.listable())
283
self.assertEqual(True, transport.should_cache())
284
self.assertEqual(True, transport.is_readonly())
289
class FakeNFSDecoratorTests(TestCaseInTempDir):
290
"""NFS decorator specific tests."""
292
def get_nfs_transport(self, url):
293
import bzrlib.transport.fakenfs as fakenfs
294
# connect to url with nfs decoration
295
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
297
def test_local_parameters(self):
298
# the listable, should_cache and is_readonly parameters
299
# are not changed by the fakenfs decorator
300
transport = self.get_nfs_transport('.')
301
self.assertEqual(True, transport.listable())
302
self.assertEqual(False, transport.should_cache())
303
self.assertEqual(False, transport.is_readonly())
305
def test_http_parameters(self):
306
# the listable, should_cache and is_readonly parameters
307
# are not changed by the fakenfs decorator
308
from bzrlib.transport.http import HttpServer
309
# connect to . via http which is not listable
310
server = HttpServer()
313
transport = self.get_nfs_transport(server.get_url())
314
self.assertIsInstance(
315
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
316
self.assertEqual(False, transport.listable())
317
self.assertEqual(True, transport.should_cache())
318
self.assertEqual(True, transport.is_readonly())
322
def test_fakenfs_server_default(self):
323
# a FakeNFSServer() should bring up a local relpath server for itself
324
import bzrlib.transport.fakenfs as fakenfs
325
server = fakenfs.FakeNFSServer()
328
# the server should be a relpath localhost server
329
self.assertEqual(server.get_url(), 'fakenfs+.')
330
# and we should be able to get a transport for it
331
transport = get_transport(server.get_url())
332
# which must be a FakeNFSTransportDecorator instance.
333
self.assertIsInstance(
334
transport, fakenfs.FakeNFSTransportDecorator)
338
def test_fakenfs_rename_semantics(self):
339
# a FakeNFS transport must mangle the way rename errors occur to
340
# look like NFS problems.
341
transport = self.get_nfs_transport('.')
342
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
344
self.assertRaises(bzrlib.errors.ResourceBusy,
345
transport.rename, 'from', 'to')
348
class FakeVFATDecoratorTests(TestCaseInTempDir):
349
"""Tests for simulation of VFAT restrictions"""
351
def get_vfat_transport(self, url):
352
"""Return vfat-backed transport for test directory"""
353
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
354
return FakeVFATTransportDecorator('vfat+' + url)
356
def test_transport_creation(self):
357
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
358
transport = self.get_vfat_transport('.')
359
self.assertIsInstance(transport, FakeVFATTransportDecorator)
361
def test_transport_mkdir(self):
362
transport = self.get_vfat_transport('.')
363
transport.mkdir('HELLO')
364
self.assertTrue(transport.has('hello'))
365
self.assertTrue(transport.has('Hello'))
367
def test_forbidden_chars(self):
368
transport = self.get_vfat_transport('.')
369
self.assertRaises(ValueError, transport.has, "<NU>")
372
class BadTransportHandler(Transport):
373
def __init__(self, base_url):
374
raise DependencyNotPresent('some_lib', 'testing missing dependency')
377
class BackupTransportHandler(Transport):
378
"""Test transport that works as a backup for the BadTransportHandler"""
382
class TestTransportImplementation(TestCaseInTempDir):
383
"""Implementation verification for transports.
385
To verify a transport we need a server factory, which is a callable
386
that accepts no parameters and returns an implementation of
387
bzrlib.transport.Server.
389
That Server is then used to construct transport instances and test
390
the transport via loopback activity.
392
Currently this assumes that the Transport object is connected to the
393
current working directory. So that whatever is done
394
through the transport, should show up in the working
395
directory, and vice-versa. This is a bug, because its possible to have
396
URL schemes which provide access to something that may not be
397
result in storage on the local disk, i.e. due to file system limits, or
398
due to it being a database or some other non-filesystem tool.
18
from bzrlib.selftest import TestCaseInTempDir
20
def test_transport(tester, t, readonly=False):
21
"""Test a transport object. Basically, it assumes that the
22
Transport object is connected to the current working directory.
23
So that whatever is done through the transport, should show
24
up in the working directory, and vice-versa.
400
26
This also tests to make sure that the functions work with both
401
27
generators and lists (assuming iter(list) is effectively a generator)
405
super(TestTransportImplementation, self).setUp()
406
self._server = self.transport_server()
410
super(TestTransportImplementation, self).tearDown()
411
self._server.tearDown()
413
def get_transport(self):
414
"""Return a connected transport to the local directory."""
415
base_url = self._server.get_url()
416
# try getting the transport via the regular interface:
417
t = get_transport(base_url)
418
if not isinstance(t, self.transport_class):
419
# we did not get the correct transport class type. Override the
420
# regular connection behaviour by direct construction.
421
t = self.transport_class(base_url)
30
from bzrlib.transport.local import LocalTransport
33
files = ['a', 'b', 'e', 'g']
34
tester.build_tree(files)
35
tester.assertEqual(t.has('a'), True)
36
tester.assertEqual(t.has('c'), False)
37
tester.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
38
[True, True, False, False, True, False, True, False])
39
tester.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
40
[True, True, False, False, True, False, True, False])
43
tester.assertEqual(t.get('a').read(), open('a').read())
44
content_f = t.get_multi(files)
45
for path,f in zip(files, content_f):
46
tester.assertEqual(open(path).read(), f.read())
48
content_f = t.get_multi(iter(files))
49
for path,f in zip(files, content_f):
50
tester.assertEqual(open(path).read(), f.read())
52
tester.assertRaises(NoSuchFile, t.get, 'c')
54
files = list(t.get_multi(['a', 'b', 'c']))
58
tester.fail('Failed to raise NoSuchFile for missing file in get_multi')
60
files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
64
tester.fail('Failed to raise NoSuchFile for missing file in get_multi')
68
open('c', 'wb').write('some text for c\n')
70
t.put('c', 'some text for c\n')
71
tester.assert_(os.path.exists('c'))
72
tester.assertEqual(open('c').read(), 'some text for c\n')
73
tester.assertEqual(t.get('c').read(), 'some text for c\n')
74
# Make sure 'has' is updated
75
tester.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
76
[True, True, True, False, True, False, True, False])
78
open('a', 'wb').write('new\ncontents for\na\n')
79
open('d', 'wb').write('contents\nfor d\n')
81
# Put also replaces contents
82
tester.assertEqual(t.put_multi([('a', 'new\ncontents for\na\n'),
83
('d', 'contents\nfor d\n')]),
85
tester.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
86
[True, True, True, True, True, False, True, False])
87
tester.assertEqual(open('a').read(), 'new\ncontents for\na\n')
88
tester.assertEqual(open('d').read(), 'contents\nfor d\n')
91
open('a', 'wb').write('diff\ncontents for\na\n')
92
open('d', 'wb').write('another contents\nfor d\n')
95
t.put_multi(iter([('a', 'diff\ncontents for\na\n'),
96
('d', 'another contents\nfor d\n')]))
98
tester.assertEqual(open('a').read(), 'diff\ncontents for\na\n')
99
tester.assertEqual(open('d').read(), 'another contents\nfor d\n')
102
tester.assertRaises(NoSuchFile, t.put, 'path/doesnt/exist/c', 'contents')
106
tester.assertEqual(t.has('dir_a'), True)
107
tester.assertEqual(t.has('dir_b'), False)
113
tester.assertEqual(t.has('dir_b'), True)
114
tester.assert_(os.path.isdir('dir_b'))
120
t.mkdir_multi(['dir_c', 'dir_d'])
121
tester.assertEqual(list(t.has_multi(['dir_a', 'dir_b', 'dir_c', 'dir_d', 'dir_e', 'dir_b'])),
122
[True, True, True, True, False, True])
123
for d in ['dir_a', 'dir_b', 'dir_c', 'dir_d']:
124
tester.assert_(os.path.isdir(d))
127
tester.assertRaises(NoSuchFile, t.mkdir, 'path/doesnt/exist')
128
tester.assertRaises(FileExists, t.mkdir, 'dir_a') # Creating a directory again should fail
130
# Make sure the transport recognizes when a
131
# directory is created by other means
132
# Caching Transports will fail, because dir_e was already seen not
133
# to exist. So instead, we will search for a new directory
136
# tester.assertRaises(FileExists, t.mkdir, 'dir_e')
140
tester.assertRaises(FileExists, t.mkdir, 'dir_f')
142
# Test get/put in sub-directories
144
open('dir_a/a', 'wb').write('contents of dir_a/a')
145
open('dir_b/b', 'wb').write('contents of dir_b/b')
148
t.put_multi([('dir_a/a', 'contents of dir_a/a'),
149
('dir_b/b', 'contents of dir_b/b')])
151
for f in ('dir_a/a', 'dir_b/b'):
152
tester.assertEqual(t.get(f).read(), open(f).read())
155
dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
156
dtmp_base = os.path.basename(dtmp)
157
local_t = LocalTransport(dtmp)
159
files = ['a', 'b', 'c', 'd']
160
t.copy_to(files, local_t)
162
tester.assertEquals(open(f).read(), open(os.path.join(dtmp_base, f)).read())
165
# TODO: Make sure all entries support file-like objects as well as strings.
167
class LocalTransportTest(TestCaseInTempDir):
168
def test_local_transport(self):
169
from bzrlib.transport.local import LocalTransport
171
t = LocalTransport('.')
172
test_transport(self, t)
174
class HttpServer(object):
175
"""This just encapsulates spawning and stopping
179
"""This just spawns a separate process to serve files from
180
this directory. Call the .stop() function to kill the
183
from BaseHTTPServer import HTTPServer
184
from SimpleHTTPServer import SimpleHTTPRequestHandler
186
if hasattr(os, 'fork'):
190
else: # How do we handle windows, which doesn't have fork?
191
raise NotImplementedError('At present HttpServer cannot fork on Windows')
193
# We might be able to do something like os.spawn() for the
194
# python executable, and give it a simple script to run.
195
# but then how do we kill it?
198
self.s = HTTPServer(('', 9999), SimpleHTTPRequestHandler)
199
# TODO: Is there something nicer than killing the server when done?
200
self.s.serve_forever()
201
except KeyboardInterrupt:
209
if hasattr(os, 'kill'):
211
os.kill(self.pid, signal.SIGINT)
212
os.waitpid(self.pid, 0)
215
raise NotImplementedError('At present HttpServer cannot stop on Windows')
217
class HttpTransportTest(TestCaseInTempDir):
218
def test_http_transport(self):
219
from bzrlib.transport.http import HttpTransport
223
t = HttpTransport('http://localhost:9999/')
224
test_transport(self, t, readonly=True)