~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_remote.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-08-14 05:49:27 UTC
  • mfrom: (4476.3.86 inventory-delta)
  • Revision ID: pqm@pqm.ubuntu.com-20090814054927-k0k18dn46ax4b91f
(andrew) Add inventory-delta streaming for cross-format fetch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
31
31
    config,
32
32
    errors,
33
33
    graph,
 
34
    inventory,
 
35
    inventory_delta,
34
36
    pack,
35
37
    remote,
36
38
    repository,
38
40
    tests,
39
41
    treebuilder,
40
42
    urlutils,
 
43
    versionedfile,
41
44
    )
42
45
from bzrlib.branch import Branch
43
46
from bzrlib.bzrdir import BzrDir, BzrDirFormat
332
335
        reference_bzrdir_format = bzrdir.format_registry.get('default')()
333
336
        return reference_bzrdir_format.repository_format
334
337
 
335
 
    def disable_verb(self, verb):
336
 
        """Disable a verb for one test."""
337
 
        request_handlers = smart.request.request_handlers
338
 
        orig_method = request_handlers.get(verb)
339
 
        request_handlers.remove(verb)
340
 
        def restoreVerb():
341
 
            request_handlers.register(verb, orig_method)
342
 
        self.addCleanup(restoreVerb)
343
 
 
344
338
    def assertFinished(self, fake_client):
345
339
        """Assert that all of a FakeClient's expected calls have occurred."""
346
340
        fake_client.finished_test()
2219
2213
        self.assertEqual([], client._calls)
2220
2214
 
2221
2215
 
2222
 
class TestRepositoryInsertStream(TestRemoteRepository):
2223
 
 
2224
 
    def test_unlocked_repo(self):
2225
 
        transport_path = 'quack'
2226
 
        repo, client = self.setup_fake_client_and_repository(transport_path)
2227
 
        client.add_expected_call(
2228
 
            'Repository.insert_stream', ('quack/', ''),
2229
 
            'success', ('ok',))
2230
 
        client.add_expected_call(
2231
 
            'Repository.insert_stream', ('quack/', ''),
2232
 
            'success', ('ok',))
2233
 
        sink = repo._get_sink()
2234
 
        fmt = repository.RepositoryFormat.get_default_format()
2235
 
        resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2236
 
        self.assertEqual([], resume_tokens)
2237
 
        self.assertEqual(set(), missing_keys)
2238
 
        self.assertFinished(client)
2239
 
 
2240
 
    def test_locked_repo_with_no_lock_token(self):
2241
 
        transport_path = 'quack'
2242
 
        repo, client = self.setup_fake_client_and_repository(transport_path)
2243
 
        client.add_expected_call(
2244
 
            'Repository.lock_write', ('quack/', ''),
2245
 
            'success', ('ok', ''))
2246
 
        client.add_expected_call(
2247
 
            'Repository.insert_stream', ('quack/', ''),
2248
 
            'success', ('ok',))
2249
 
        client.add_expected_call(
2250
 
            'Repository.insert_stream', ('quack/', ''),
2251
 
            'success', ('ok',))
2252
 
        repo.lock_write()
2253
 
        sink = repo._get_sink()
2254
 
        fmt = repository.RepositoryFormat.get_default_format()
2255
 
        resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2256
 
        self.assertEqual([], resume_tokens)
2257
 
        self.assertEqual(set(), missing_keys)
2258
 
        self.assertFinished(client)
2259
 
 
2260
 
    def test_locked_repo_with_lock_token(self):
2261
 
        transport_path = 'quack'
2262
 
        repo, client = self.setup_fake_client_and_repository(transport_path)
2263
 
        client.add_expected_call(
2264
 
            'Repository.lock_write', ('quack/', ''),
2265
 
            'success', ('ok', 'a token'))
2266
 
        client.add_expected_call(
2267
 
            'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2268
 
            'success', ('ok',))
2269
 
        client.add_expected_call(
2270
 
            'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2271
 
            'success', ('ok',))
2272
 
        repo.lock_write()
2273
 
        sink = repo._get_sink()
2274
 
        fmt = repository.RepositoryFormat.get_default_format()
2275
 
        resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2276
 
        self.assertEqual([], resume_tokens)
2277
 
        self.assertEqual(set(), missing_keys)
2278
 
        self.assertFinished(client)
 
2216
class TestRepositoryInsertStreamBase(TestRemoteRepository):
 
2217
    """Base class for Repository.insert_stream and .insert_stream_1.19
 
2218
    tests.
 
2219
    """
 
2220
    
 
2221
    def checkInsertEmptyStream(self, repo, client):
 
2222
        """Insert an empty stream, checking the result.
 
2223
 
 
2224
        This checks that there are no resume_tokens or missing_keys, and that
 
2225
        the client is finished.
 
2226
        """
 
2227
        sink = repo._get_sink()
 
2228
        fmt = repository.RepositoryFormat.get_default_format()
 
2229
        resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
 
2230
        self.assertEqual([], resume_tokens)
 
2231
        self.assertEqual(set(), missing_keys)
 
2232
        self.assertFinished(client)
 
2233
 
 
2234
 
 
2235
class TestRepositoryInsertStream(TestRepositoryInsertStreamBase):
 
2236
    """Tests for using Repository.insert_stream verb when the _1.19 variant is
 
2237
    not available.
 
2238
 
 
2239
    This test case is very similar to TestRepositoryInsertStream_1_19.
 
2240
    """
 
2241
 
 
2242
    def setUp(self):
 
2243
        TestRemoteRepository.setUp(self)
 
2244
        self.disable_verb('Repository.insert_stream_1.19')
 
2245
 
 
2246
    def test_unlocked_repo(self):
 
2247
        transport_path = 'quack'
 
2248
        repo, client = self.setup_fake_client_and_repository(transport_path)
 
2249
        client.add_expected_call(
 
2250
            'Repository.insert_stream_1.19', ('quack/', ''),
 
2251
            'unknown', ('Repository.insert_stream_1.19',))
 
2252
        client.add_expected_call(
 
2253
            'Repository.insert_stream', ('quack/', ''),
 
2254
            'success', ('ok',))
 
2255
        client.add_expected_call(
 
2256
            'Repository.insert_stream', ('quack/', ''),
 
2257
            'success', ('ok',))
 
2258
        self.checkInsertEmptyStream(repo, client)
 
2259
 
 
2260
    def test_locked_repo_with_no_lock_token(self):
 
2261
        transport_path = 'quack'
 
2262
        repo, client = self.setup_fake_client_and_repository(transport_path)
 
2263
        client.add_expected_call(
 
2264
            'Repository.lock_write', ('quack/', ''),
 
2265
            'success', ('ok', ''))
 
2266
        client.add_expected_call(
 
2267
            'Repository.insert_stream_1.19', ('quack/', ''),
 
2268
            'unknown', ('Repository.insert_stream_1.19',))
 
2269
        client.add_expected_call(
 
2270
            'Repository.insert_stream', ('quack/', ''),
 
2271
            'success', ('ok',))
 
2272
        client.add_expected_call(
 
2273
            'Repository.insert_stream', ('quack/', ''),
 
2274
            'success', ('ok',))
 
2275
        repo.lock_write()
 
2276
        self.checkInsertEmptyStream(repo, client)
 
2277
 
 
2278
    def test_locked_repo_with_lock_token(self):
 
2279
        transport_path = 'quack'
 
2280
        repo, client = self.setup_fake_client_and_repository(transport_path)
 
2281
        client.add_expected_call(
 
2282
            'Repository.lock_write', ('quack/', ''),
 
2283
            'success', ('ok', 'a token'))
 
2284
        client.add_expected_call(
 
2285
            'Repository.insert_stream_1.19', ('quack/', '', 'a token'),
 
2286
            'unknown', ('Repository.insert_stream_1.19',))
 
2287
        client.add_expected_call(
 
2288
            'Repository.insert_stream_locked', ('quack/', '', 'a token'),
 
2289
            'success', ('ok',))
 
2290
        client.add_expected_call(
 
2291
            'Repository.insert_stream_locked', ('quack/', '', 'a token'),
 
2292
            'success', ('ok',))
 
2293
        repo.lock_write()
 
2294
        self.checkInsertEmptyStream(repo, client)
 
2295
 
 
2296
    def test_stream_with_inventory_deltas(self):
 
2297
        """'inventory-deltas' substreams cannot be sent to the
 
2298
        Repository.insert_stream verb, because not all servers that implement
 
2299
        that verb will accept them.  So when one is encountered the RemoteSink
 
2300
        immediately stops using that verb and falls back to VFS insert_stream.
 
2301
        """
 
2302
        transport_path = 'quack'
 
2303
        repo, client = self.setup_fake_client_and_repository(transport_path)
 
2304
        client.add_expected_call(
 
2305
            'Repository.insert_stream_1.19', ('quack/', ''),
 
2306
            'unknown', ('Repository.insert_stream_1.19',))
 
2307
        client.add_expected_call(
 
2308
            'Repository.insert_stream', ('quack/', ''),
 
2309
            'success', ('ok',))
 
2310
        client.add_expected_call(
 
2311
            'Repository.insert_stream', ('quack/', ''),
 
2312
            'success', ('ok',))
 
2313
        # Create a fake real repository for insert_stream to fall back on, so
 
2314
        # that we can directly see the records the RemoteSink passes to the
 
2315
        # real sink.
 
2316
        class FakeRealSink:
 
2317
            def __init__(self):
 
2318
                self.records = []
 
2319
            def insert_stream(self, stream, src_format, resume_tokens):
 
2320
                for substream_kind, substream in stream:
 
2321
                    self.records.append(
 
2322
                        (substream_kind, [record.key for record in substream]))
 
2323
                return ['fake tokens'], ['fake missing keys']
 
2324
        fake_real_sink = FakeRealSink()
 
2325
        class FakeRealRepository:
 
2326
            def _get_sink(self):
 
2327
                return fake_real_sink
 
2328
        repo._real_repository = FakeRealRepository()
 
2329
        sink = repo._get_sink()
 
2330
        fmt = repository.RepositoryFormat.get_default_format()
 
2331
        stream = self.make_stream_with_inv_deltas(fmt)
 
2332
        resume_tokens, missing_keys = sink.insert_stream(stream, fmt, [])
 
2333
        # Every record from the first inventory delta should have been sent to
 
2334
        # the VFS sink.
 
2335
        expected_records = [
 
2336
            ('inventory-deltas', [('rev2',), ('rev3',)]),
 
2337
            ('texts', [('some-rev', 'some-file')])]
 
2338
        self.assertEqual(expected_records, fake_real_sink.records)
 
2339
        # The return values from the real sink's insert_stream are propagated
 
2340
        # back to the original caller.
 
2341
        self.assertEqual(['fake tokens'], resume_tokens)
 
2342
        self.assertEqual(['fake missing keys'], missing_keys)
 
2343
        self.assertFinished(client)
 
2344
 
 
2345
    def make_stream_with_inv_deltas(self, fmt):
 
2346
        """Make a simple stream with an inventory delta followed by more
 
2347
        records and more substreams to test that all records and substreams
 
2348
        from that point on are used.
 
2349
 
 
2350
        This sends, in order:
 
2351
           * inventories substream: rev1, rev2, rev3.  rev2 and rev3 are
 
2352
             inventory-deltas.
 
2353
           * texts substream: (some-rev, some-file)
 
2354
        """
 
2355
        # Define a stream using generators so that it isn't rewindable.
 
2356
        inv = inventory.Inventory(revision_id='rev1')
 
2357
        def stream_with_inv_delta():
 
2358
            yield ('inventories', inventories_substream())
 
2359
            yield ('inventory-deltas', inventory_delta_substream())
 
2360
            yield ('texts', [
 
2361
                versionedfile.FulltextContentFactory(
 
2362
                    ('some-rev', 'some-file'), (), None, 'content')])
 
2363
        def inventories_substream():
 
2364
            # An empty inventory fulltext.  This will be streamed normally.
 
2365
            text = fmt._serializer.write_inventory_to_string(inv)
 
2366
            yield versionedfile.FulltextContentFactory(
 
2367
                ('rev1',), (), None, text)
 
2368
        def inventory_delta_substream():
 
2369
            # An inventory delta.  This can't be streamed via this verb, so it
 
2370
            # will trigger a fallback to VFS insert_stream.
 
2371
            entry = inv.make_entry(
 
2372
                'directory', 'newdir', inv.root.file_id, 'newdir-id')
 
2373
            entry.revision = 'ghost'
 
2374
            delta = [(None, 'newdir', 'newdir-id', entry)]
 
2375
            serializer = inventory_delta.InventoryDeltaSerializer(
 
2376
                versioned_root=True, tree_references=False)
 
2377
            lines = serializer.delta_to_lines('rev1', 'rev2', delta)
 
2378
            yield versionedfile.ChunkedContentFactory(
 
2379
                ('rev2',), (('rev1',)), None, lines)
 
2380
            # Another delta.
 
2381
            lines = serializer.delta_to_lines('rev1', 'rev3', delta)
 
2382
            yield versionedfile.ChunkedContentFactory(
 
2383
                ('rev3',), (('rev1',)), None, lines)
 
2384
        return stream_with_inv_delta()
 
2385
 
 
2386
 
 
2387
class TestRepositoryInsertStream_1_19(TestRepositoryInsertStreamBase):
 
2388
 
 
2389
    def test_unlocked_repo(self):
 
2390
        transport_path = 'quack'
 
2391
        repo, client = self.setup_fake_client_and_repository(transport_path)
 
2392
        client.add_expected_call(
 
2393
            'Repository.insert_stream_1.19', ('quack/', ''),
 
2394
            'success', ('ok',))
 
2395
        client.add_expected_call(
 
2396
            'Repository.insert_stream_1.19', ('quack/', ''),
 
2397
            'success', ('ok',))
 
2398
        self.checkInsertEmptyStream(repo, client)
 
2399
 
 
2400
    def test_locked_repo_with_no_lock_token(self):
 
2401
        transport_path = 'quack'
 
2402
        repo, client = self.setup_fake_client_and_repository(transport_path)
 
2403
        client.add_expected_call(
 
2404
            'Repository.lock_write', ('quack/', ''),
 
2405
            'success', ('ok', ''))
 
2406
        client.add_expected_call(
 
2407
            'Repository.insert_stream_1.19', ('quack/', ''),
 
2408
            'success', ('ok',))
 
2409
        client.add_expected_call(
 
2410
            'Repository.insert_stream_1.19', ('quack/', ''),
 
2411
            'success', ('ok',))
 
2412
        repo.lock_write()
 
2413
        self.checkInsertEmptyStream(repo, client)
 
2414
 
 
2415
    def test_locked_repo_with_lock_token(self):
 
2416
        transport_path = 'quack'
 
2417
        repo, client = self.setup_fake_client_and_repository(transport_path)
 
2418
        client.add_expected_call(
 
2419
            'Repository.lock_write', ('quack/', ''),
 
2420
            'success', ('ok', 'a token'))
 
2421
        client.add_expected_call(
 
2422
            'Repository.insert_stream_1.19', ('quack/', '', 'a token'),
 
2423
            'success', ('ok',))
 
2424
        client.add_expected_call(
 
2425
            'Repository.insert_stream_1.19', ('quack/', '', 'a token'),
 
2426
            'success', ('ok',))
 
2427
        repo.lock_write()
 
2428
        self.checkInsertEmptyStream(repo, client)
2279
2429
 
2280
2430
 
2281
2431
class TestRepositoryTarball(TestRemoteRepository):