2219
2213
self.assertEqual([], client._calls)
2222
# Test case exercising insert_stream() on the sink returned by
# repo._get_sink(), for a repository backed by a fake client, in three
# situations: no lock call, lock_write answered with ('ok', ''), and
# lock_write answered with ('ok', 'a token').
# NOTE(review): the bare integers between statements look like line counters
# left behind by an extraction step, and all indentation is gone.  Wherever
# the counters skip, source lines are absent from this view; in particular
# the insert_stream expectations below are never closed because their
# response arguments are not visible.  Restore from the upstream file before
# relying on this text.
# NOTE(review): a class with this same name is defined again further down.
class TestRepositoryInsertStream(TestRemoteRepository):
2224
# Queues two 'Repository.insert_stream' expectations, inserts an empty
# stream, and checks that no resume tokens or missing keys come back and
# that the client has no expectations left.
def test_unlocked_repo(self):
2225
transport_path = 'quack'
2226
repo, client = self.setup_fake_client_and_repository(transport_path)
2227
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2228
'Repository.insert_stream', ('quack/', ''),
2230
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2231
'Repository.insert_stream', ('quack/', ''),
2233
sink = repo._get_sink()
2234
fmt = repository.RepositoryFormat.get_default_format()
2235
# Empty stream, default format, no resume tokens passed in.
resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2236
self.assertEqual([], resume_tokens)
2237
self.assertEqual(set(), missing_keys)
2238
self.assertFinished(client)
2240
# Same as above, but a 'Repository.lock_write' expectation answered with an
# empty token string is queued first; the insert verb is still the plain
# 'Repository.insert_stream' with no token argument.
# NOTE(review): the counters skip before `sink = ...` (2250 -> 2253), so at
# least one statement besides the response arguments is absent here.
def test_locked_repo_with_no_lock_token(self):
2241
transport_path = 'quack'
2242
repo, client = self.setup_fake_client_and_repository(transport_path)
2243
client.add_expected_call(
2244
'Repository.lock_write', ('quack/', ''),
2245
'success', ('ok', ''))
2246
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2247
'Repository.insert_stream', ('quack/', ''),
2249
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2250
'Repository.insert_stream', ('quack/', ''),
2253
sink = repo._get_sink()
2254
fmt = repository.RepositoryFormat.get_default_format()
2255
resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2256
self.assertEqual([], resume_tokens)
2257
self.assertEqual(set(), missing_keys)
2258
self.assertFinished(client)
2260
# Here lock_write is answered with 'a token', and the expected insert verb
# becomes 'Repository.insert_stream_locked' carrying that token as an extra
# argument.
# NOTE(review): the counters skip before `sink = ...` (2270 -> 2273), so at
# least one statement besides the response arguments is absent here.
def test_locked_repo_with_lock_token(self):
2261
transport_path = 'quack'
2262
repo, client = self.setup_fake_client_and_repository(transport_path)
2263
client.add_expected_call(
2264
'Repository.lock_write', ('quack/', ''),
2265
'success', ('ok', 'a token'))
2266
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2267
'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2269
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2270
'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2273
sink = repo._get_sink()
2274
fmt = repository.RepositoryFormat.get_default_format()
2275
resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2276
self.assertEqual([], resume_tokens)
2277
self.assertEqual(set(), missing_keys)
2278
self.assertFinished(client)
2216
class TestRepositoryInsertStreamBase(TestRemoteRepository):
    """Base class for the Repository.insert_stream and
    Repository.insert_stream_1.19 test cases.

    It only contributes a shared assertion helper; subclasses queue the
    expected client calls themselves.
    """

    def checkInsertEmptyStream(self, repo, client):
        """Insert an empty stream, checking the result.

        This checks that there are no resume_tokens or missing_keys, and that
        the client is finished.

        :param repo: repository whose sink (from ``repo._get_sink()``)
            receives the empty stream.
        :param client: fake client handed to ``assertFinished`` once the
            insert has returned.
        """
        sink = repo._get_sink()
        fmt = repository.RepositoryFormat.get_default_format()
        # Empty stream, default source format, no resume tokens passed in.
        resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
        self.assertEqual([], resume_tokens)
        self.assertEqual(set(), missing_keys)
        self.assertFinished(client)
2235
# Test case for the 'Repository.insert_stream' verb with the
# 'Repository.insert_stream_1.19' verb disabled.  The two statements at the
# end of this span run TestRemoteRepository.setUp and then call
# self.disable_verb('Repository.insert_stream_1.19').
# NOTE(review): the docstring opened below is never closed in this view, and
# no `def` line is visible ahead of the two set-up statements (presumably a
# setUp method -- confirm).  The embedded counters skip at both places
# (2236 -> 2239 -> 2243), so source lines are missing; restore them from the
# upstream file.
class TestRepositoryInsertStream(TestRepositoryInsertStreamBase):
2236
"""Tests for using Repository.insert_stream verb when the _1.19 variant is
2239
This test case is very similar to TestRepositoryInsertStream_1_19.
2243
TestRemoteRepository.setUp(self)
2244
self.disable_verb('Repository.insert_stream_1.19')
2246
# Queues an 'unknown' reply for 'Repository.insert_stream_1.19', then two
# plain 'Repository.insert_stream' expectations, and finally runs the shared
# empty-stream check.
# NOTE(review): indentation is lost and the bare integers are leftover line
# counters; they skip after each insert_stream expectation, whose response
# arguments are therefore not visible and whose call is never closed.
def test_unlocked_repo(self):
2247
transport_path = 'quack'
2248
repo, client = self.setup_fake_client_and_repository(transport_path)
2249
# The _1.19 verb is tried first and answered as unknown.
client.add_expected_call(
2250
'Repository.insert_stream_1.19', ('quack/', ''),
2251
'unknown', ('Repository.insert_stream_1.19',))
2252
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2253
'Repository.insert_stream', ('quack/', ''),
2255
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2256
'Repository.insert_stream', ('quack/', ''),
2258
self.checkInsertEmptyStream(repo, client)
2260
# Queues a 'Repository.lock_write' expectation answered with an empty token
# string, an 'unknown' reply for 'Repository.insert_stream_1.19', and two
# plain 'Repository.insert_stream' expectations, then runs the shared
# empty-stream check.
# NOTE(review): indentation is lost and the bare integers are leftover line
# counters.  The insert_stream expectations have no visible response
# arguments, and the counters skip 2273 -> 2276 before the final check, so at
# least one further statement is absent from this view.
def test_locked_repo_with_no_lock_token(self):
2261
transport_path = 'quack'
2262
repo, client = self.setup_fake_client_and_repository(transport_path)
2263
client.add_expected_call(
2264
'Repository.lock_write', ('quack/', ''),
2265
'success', ('ok', ''))
2266
# The _1.19 verb is tried first and answered as unknown.
client.add_expected_call(
2267
'Repository.insert_stream_1.19', ('quack/', ''),
2268
'unknown', ('Repository.insert_stream_1.19',))
2269
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2270
'Repository.insert_stream', ('quack/', ''),
2272
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2273
'Repository.insert_stream', ('quack/', ''),
2276
self.checkInsertEmptyStream(repo, client)
2278
# Queues a 'Repository.lock_write' expectation answered with 'a token', an
# 'unknown' reply for 'Repository.insert_stream_1.19' (called with the token),
# and two 'Repository.insert_stream_locked' expectations carrying the token,
# then runs the shared empty-stream check.
# NOTE(review): indentation is lost and the bare integers are leftover line
# counters.  The insert_stream_locked expectations have no visible response
# arguments, and the counters skip 2291 -> 2294 before the final check, so at
# least one further statement is absent from this view.
def test_locked_repo_with_lock_token(self):
2279
transport_path = 'quack'
2280
repo, client = self.setup_fake_client_and_repository(transport_path)
2281
client.add_expected_call(
2282
'Repository.lock_write', ('quack/', ''),
2283
'success', ('ok', 'a token'))
2284
# The _1.19 verb is tried first and answered as unknown.
client.add_expected_call(
2285
'Repository.insert_stream_1.19', ('quack/', '', 'a token'),
2286
'unknown', ('Repository.insert_stream_1.19',))
2287
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2288
'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2290
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2291
'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2294
self.checkInsertEmptyStream(repo, client)
2296
# Inserts the stream built by make_stream_with_inv_deltas() while only the
# plain 'Repository.insert_stream' verb is answered (the _1.19 verb gets an
# 'unknown' reply).  A stand-in object is installed as repo._real_repository
# whose _get_sink() returns a recording sink; the test then asserts that
# the 'inventory-deltas' records (rev2, rev3) and the 'texts' record reached
# that sink, and that the sink's return values ('fake tokens',
# 'fake missing keys') are what the caller of insert_stream gets back.
# NOTE(review): indentation is lost and the bare integers are leftover line
# counters.  Lines are absent wherever the counters skip: the docstring is
# never closed (2300 -> 2302), the insert_stream expectations have no
# response arguments, the header of the FakeRealSink class and whatever
# initialises its `records` attribute are not visible (2314 -> 2319), and the
# comment before expected_records is cut short (2333 -> 2335).
def test_stream_with_inventory_deltas(self):
2297
"""'inventory-deltas' substreams cannot be sent to the
2298
Repository.insert_stream verb, because not all servers that implement
2299
that verb will accept them. So when one is encountered the RemoteSink
2300
immediately stops using that verb and falls back to VFS insert_stream.
2302
transport_path = 'quack'
2303
repo, client = self.setup_fake_client_and_repository(transport_path)
2304
client.add_expected_call(
2305
'Repository.insert_stream_1.19', ('quack/', ''),
2306
'unknown', ('Repository.insert_stream_1.19',))
2307
client.add_expected_call(
2308
'Repository.insert_stream', ('quack/', ''),
2310
client.add_expected_call(
2311
'Repository.insert_stream', ('quack/', ''),
2313
# Create a fake real repository for insert_stream to fall back on, so
2314
# that we can directly see the records the RemoteSink passes to the
2319
def insert_stream(self, stream, src_format, resume_tokens):
2320
for substream_kind, substream in stream:
2321
self.records.append(
2322
(substream_kind, [record.key for record in substream]))
2323
return ['fake tokens'], ['fake missing keys']
2324
fake_real_sink = FakeRealSink()
2325
class FakeRealRepository:
2326
def _get_sink(self):
2327
return fake_real_sink
2328
repo._real_repository = FakeRealRepository()
2329
sink = repo._get_sink()
2330
fmt = repository.RepositoryFormat.get_default_format()
2331
stream = self.make_stream_with_inv_deltas(fmt)
2332
resume_tokens, missing_keys = sink.insert_stream(stream, fmt, [])
2333
# Every record from the first inventory delta should have been sent to
2335
expected_records = [
2336
('inventory-deltas', [('rev2',), ('rev3',)]),
2337
('texts', [('some-rev', 'some-file')])]
2338
self.assertEqual(expected_records, fake_real_sink.records)
2339
# The return values from the real sink's insert_stream are propagated
2340
# back to the original caller.
2341
self.assertEqual(['fake tokens'], resume_tokens)
2342
self.assertEqual(['fake missing keys'], missing_keys)
2343
self.assertFinished(client)
2345
def make_stream_with_inv_deltas(self, fmt):
    """Make a simple stream with an inventory delta followed by more
    records and more substreams, to test that all records and substreams
    from that point on are used.

    This sends, in order:

    * an 'inventories' substream holding a fulltext record for rev1;
    * an 'inventory-deltas' substream holding delta records for rev2 and
      rev3, each serialised as a delta from rev1;
    * a 'texts' substream holding one record, (some-rev, some-file).

    :param fmt: repository format; its ``_serializer`` is used to write the
        rev1 inventory fulltext.
    :return: a generator of (substream_kind, substream) pairs.
    """
    # Define a stream using generators so that it isn't rewindable.
    inv = inventory.Inventory(revision_id='rev1')

    def stream_with_inv_delta():
        yield ('inventories', inventories_substream())
        yield ('inventory-deltas', inventory_delta_substream())
        yield ('texts', [
            versionedfile.FulltextContentFactory(
                ('some-rev', 'some-file'), (), None, 'content')])

    def inventories_substream():
        # An empty inventory fulltext.  This will be streamed normally.
        text = fmt._serializer.write_inventory_to_string(inv)
        yield versionedfile.FulltextContentFactory(
            ('rev1',), (), None, text)

    def inventory_delta_substream():
        # An inventory delta.  This can't be streamed via this verb, so it
        # will trigger a fallback to VFS insert_stream.
        entry = inv.make_entry(
            'directory', 'newdir', inv.root.file_id, 'newdir-id')
        entry.revision = 'ghost'
        delta = [(None, 'newdir', 'newdir-id', entry)]
        serializer = inventory_delta.InventoryDeltaSerializer(
            versioned_root=True, tree_references=False)
        # NOTE(review): `(('rev1',))` evaluates to the 1-tuple `('rev1',)`,
        # not to a tuple containing that key.  If the second argument is
        # meant to be a tuple of parent keys it should read `(('rev1',),)`;
        # the value is kept exactly as written pending confirmation.
        lines = serializer.delta_to_lines('rev1', 'rev2', delta)
        yield versionedfile.ChunkedContentFactory(
            ('rev2',), (('rev1',)), None, lines)
        # A second delta against the same basis.
        lines = serializer.delta_to_lines('rev1', 'rev3', delta)
        yield versionedfile.ChunkedContentFactory(
            ('rev3',), (('rev1',)), None, lines)

    return stream_with_inv_delta()
2387
# Test case for the 'Repository.insert_stream_1.19' verb.  Each test queues
# two expectations for that verb (preceded by a 'Repository.lock_write'
# expectation in the two locked variants) and then runs the shared
# checkInsertEmptyStream helper.
# NOTE(review): indentation is lost and the bare integers are leftover line
# counters from an extraction step.  Wherever the counters skip, source lines
# are absent from this view: none of the insert_stream_1.19 expectations has
# visible response arguments, so those calls are never closed.  Restore from
# the upstream file before relying on this text.
class TestRepositoryInsertStream_1_19(TestRepositoryInsertStreamBase):
2389
# No lock call is queued; the verb is expected with ('quack/', '').
def test_unlocked_repo(self):
2390
transport_path = 'quack'
2391
repo, client = self.setup_fake_client_and_repository(transport_path)
2392
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2393
'Repository.insert_stream_1.19', ('quack/', ''),
2395
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2396
'Repository.insert_stream_1.19', ('quack/', ''),
2398
self.checkInsertEmptyStream(repo, client)
2400
# lock_write is answered with an empty token string; the verb is still
# expected with an empty second argument.
# NOTE(review): the counters skip 2410 -> 2413 before the final check, so at
# least one statement besides the response arguments is absent here.
def test_locked_repo_with_no_lock_token(self):
2401
transport_path = 'quack'
2402
repo, client = self.setup_fake_client_and_repository(transport_path)
2403
client.add_expected_call(
2404
'Repository.lock_write', ('quack/', ''),
2405
'success', ('ok', ''))
2406
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2407
'Repository.insert_stream_1.19', ('quack/', ''),
2409
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2410
'Repository.insert_stream_1.19', ('quack/', ''),
2413
self.checkInsertEmptyStream(repo, client)
2415
# lock_write is answered with 'a token'; the verb name is unchanged but the
# token is passed as a third argument.
# NOTE(review): the counters skip 2425 -> 2428 before the final check, so at
# least one statement besides the response arguments is absent here.
def test_locked_repo_with_lock_token(self):
2416
transport_path = 'quack'
2417
repo, client = self.setup_fake_client_and_repository(transport_path)
2418
client.add_expected_call(
2419
'Repository.lock_write', ('quack/', ''),
2420
'success', ('ok', 'a token'))
2421
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2422
'Repository.insert_stream_1.19', ('quack/', '', 'a token'),
2424
# NOTE(review): response kind/args for this expectation are not visible.
client.add_expected_call(
2425
'Repository.insert_stream_1.19', ('quack/', '', 'a token'),
2428
self.checkInsertEmptyStream(repo, client)
2281
2431
class TestRepositoryTarball(TestRemoteRepository):