85
91
self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
86
92
self.assertRaises(StopIteration, it.next)
class MockTransport(object):
    """In-memory stand-in for a transport that records the calls made on it.

    ``get`` serves the configured lines as one file.  Any other attribute
    looked up on the instance yields a callable which appends
    ``(name, args, kwargs)`` to ``self.calls`` so tests can assert on the
    requests that were issued.
    """

    def __init__(self, file_lines=None):
        """Create the mock.

        :param file_lines: Lines served by ``get`` (joined with newlines);
            ``None`` means the file does not exist.
        """
        self.file_lines = file_lines
        # Must be a real attribute: otherwise ``self.calls`` would fall
        # through to __getattr__ and come back as a recorder function.
        self.calls = []

    def get(self, filename):
        """Return a file-like object over the configured lines.

        :raises NoSuchFile: if the mock was created without file_lines.
        """
        if self.file_lines is None:
            raise NoSuchFile(filename)
        return StringIO("\n".join(self.file_lines))

    def __getattr__(self, name):
        """Return a recorder for any method not defined on the mock."""
        def queue_call(*args, **kwargs):
            self.calls.append((name, args, kwargs))
        return queue_call
113
class LowLevelKnitIndexTests(TestCase):
def test_no_such_file(self):
    """Opening an index whose file is absent raises NoSuchFile.

    Checked for read mode and for write mode with ``create=False``.
    """
    # No file_lines given, so MockTransport.get() raises NoSuchFile.
    transport = MockTransport()

    self.assertRaises(NoSuchFile, _KnitIndex, transport, "filename", "r")
    self.assertRaises(NoSuchFile, _KnitIndex, transport,
        "filename", "w", create=False)
def test_create_file(self):
    """Creating an index in write mode writes the header immediately."""
    transport = MockTransport()

    index = _KnitIndex(transport, "filename", "w",
        file_mode="wb", create=True)
    # The first recorded transport call must be the header write, carrying
    # the requested file mode.
    self.assertEqual(
        ("put_bytes_non_atomic",
            ("filename", index.HEADER), {"mode": "wb"}),
        transport.calls.pop(0))
132
def test_delay_create_file(self):
133
transport = MockTransport()
135
index = _KnitIndex(transport, "filename", "w",
136
create=True, file_mode="wb", create_parent_dir=True,
137
delay_create=True, dir_mode=0777)
138
self.assertEqual([], transport.calls)
140
index.add_versions([])
141
name, (filename, f), kwargs = transport.calls.pop(0)
142
self.assertEqual("put_file_non_atomic", name)
144
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
146
self.assertEqual("filename", filename)
147
self.assertEqual(index.HEADER, f.read())
149
index.add_versions([])
150
self.assertEqual(("append_bytes", ("filename", ""), {}),
151
transport.calls.pop(0))
def test_read_utf8_version_id(self):
    """A UTF-8 encoded version id can be looked up by its unicode name."""
    transport = MockTransport([
        # NOTE(review): header entry restored (an index opened for reading
        # needs one) - confirm against the full file.
        _KnitIndex.HEADER,
        u"version-\N{CYRILLIC CAPITAL LETTER A}"
            u" option 0 1 :".encode("utf-8")
        ])
    index = _KnitIndex(transport, "filename", "r")
    self.assertTrue(
        index.has_version(u"version-\N{CYRILLIC CAPITAL LETTER A}"))
def test_read_utf8_parents(self):
    """UTF-8 encoded parent ids are returned as unicode strings."""
    transport = MockTransport([
        # NOTE(review): header entry restored (an index opened for reading
        # needs one) - confirm against the full file.
        _KnitIndex.HEADER,
        u"version option 0 1"
            u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")
        ])
    index = _KnitIndex(transport, "filename", "r")
    self.assertEqual([u"version-\N{CYRILLIC CAPITAL LETTER A}"],
        index.get_parents_with_ghosts("version"))
173
# Reads an index that contains a "corrupted ..." entry (it does not end with
# the ":" the valid entry ends with) next to a valid "version" entry, and
# expects exactly one version, "version", to be reported.
# NOTE(review): this extract is missing lines (numbering jumps 174 -> 177 and
# 178 -> 180), including the start and the end of the MockTransport([...])
# list - confirm against the full file.
def test_read_ignore_corrupted_lines(self):
174
transport = MockTransport([
177
"corrupted options 0 1 .b .c ",
178
"version options 0 1 :"
180
index = _KnitIndex(transport, "filename", "r")
181
self.assertEqual(1, index.num_versions())
182
self.assertTrue(index.has_version(u"version"))
def test_read_corrupted_header(self):
    """An empty index file is rejected with KnitHeaderError."""
    # An empty line list makes MockTransport.get() serve an empty file.
    transport = MockTransport([])
    self.assertRaises(KnitHeaderError,
        _KnitIndex, transport, "filename", "r")
def test_read_duplicate_entries(self):
    """When a version id is listed several times, the last entry wins.

    The id still counts once and keeps its index (1), but its position,
    options and parents are those of the final line.
    """
    transport = MockTransport([
        # NOTE(review): header entry restored (an index opened for reading
        # needs one) - confirm against the full file.
        _KnitIndex.HEADER,
        "parent options 0 1 :",
        "version options1 0 1 0 :",
        "version options2 1 2 .other :",
        "version options3 3 4 0 .other :"
        ])
    index = _KnitIndex(transport, "filename", "r")
    self.assertEqual(2, index.num_versions())
    self.assertEqual(1, index.lookup(u"version"))
    self.assertEqual((3, 4), index.get_position(u"version"))
    self.assertEqual(["options3"], index.get_options(u"version"))
    self.assertEqual([u"parent", u"other"],
        index.get_parents_with_ghosts(u"version"))
205
# Checks that the numeric parent references on the "c" entry ("1 0") come
# back from get_parents() as version names ([u"b", u"a"]); presumably the
# numbers are the indexes of earlier entries - verify.
# NOTE(review): the entries for "a" and "b" and the end of the
# MockTransport([...]) list are not in this extract (numbering jumps
# 206 -> 210 -> 212) - confirm against the full file.
def test_read_compressed_parents(self):
206
transport = MockTransport([
210
"c option 0 1 1 0 :",
212
index = _KnitIndex(transport, "filename", "r")
213
self.assertEqual([u"a"], index.get_parents(u"b"))
214
self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
def test_write_utf8_version_id(self):
    """add_version() appends a unicode version id encoded as UTF-8."""
    transport = MockTransport([
        # NOTE(review): header entry restored (an index opened for reading
        # needs one) - confirm against the full file.
        _KnitIndex.HEADER
        ])
    index = _KnitIndex(transport, "filename", "r")
    index.add_version(u"version-\N{CYRILLIC CAPITAL LETTER A}",
        ["option"], 0, 1, [])
    # Recorded calls are (name, args, kwargs); no keyword arguments are
    # expected on the append.
    self.assertEqual(("append_bytes", ("filename",
        u"\nversion-\N{CYRILLIC CAPITAL LETTER A}"
            u" option 0 1 :".encode("utf-8")),
        {}),
        transport.calls.pop(0))
def test_write_utf8_parents(self):
    """add_version() writes a unicode parent id encoded as UTF-8."""
    transport = MockTransport([
        # NOTE(review): header entry restored (an index opened for reading
        # needs one) - confirm against the full file.
        _KnitIndex.HEADER
        ])
    index = _KnitIndex(transport, "filename", "r")
    index.add_version(u"version", ["option"], 0, 1,
        [u"version-\N{CYRILLIC CAPITAL LETTER A}"])
    # Recorded calls are (name, args, kwargs); no keyword arguments are
    # expected on the append.
    self.assertEqual(("append_bytes", ("filename",
        u"\nversion option 0 1"
            u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")),
        {}),
        transport.calls.pop(0))
def test_get_graph(self):
    """get_graph() yields a (version, parents) pair per added version."""
    transport = MockTransport()
    index = _KnitIndex(transport, "filename", "w", create=True)
    self.assertEqual([], index.get_graph())

    index.add_version(u"a", ["option"], 0, 1, [u"b"])
    self.assertEqual([(u"a", [u"b"])], index.get_graph())

    index.add_version(u"c", ["option"], 0, 1, [u"d"])
    # Sorted before comparing, so the order of the pairs is not asserted.
    self.assertEqual([(u"a", [u"b"]), (u"c", [u"d"])],
        sorted(index.get_graph()))
254
# Exercises get_ancestry(): the expected lists name each requested version
# together with its ancestors, ancestors first, and never include the
# dot-prefixed parents (".e", ".f"); asking for u"e" directly is expected to
# raise RevisionNotPresent.
# NOTE(review): the start of the MockTransport([...]) list (presumably the
# header and the "a" entry) and its end are missing from this extract
# (numbering jumps 255 -> 258 and 260 -> 262) - confirm against the full file.
def test_get_ancestry(self):
255
transport = MockTransport([
258
"b option 0 1 0 .e :",
259
"c option 0 1 1 0 :",
260
"d option 0 1 2 .f :"
262
index = _KnitIndex(transport, "filename", "r")
264
self.assertEqual([], index.get_ancestry([]))
265
self.assertEqual([u"a"], index.get_ancestry([u"a"]))
266
self.assertEqual([u"a", u"b"], index.get_ancestry([u"b"]))
267
self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"c"]))
268
self.assertEqual([u"a", u"b", u"c", u"d"], index.get_ancestry([u"d"]))
269
self.assertEqual([u"a", u"b"], index.get_ancestry([u"a", u"b"]))
270
self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"a", u"c"]))
272
self.assertRaises(RevisionNotPresent, index.get_ancestry, [u"e"])
274
# Exercises get_ancestry_with_ghosts(): unlike the get_ancestry() test, the
# expected lists also contain the dot-prefixed parents (e, f, g, h, j, k),
# placed before the version that names them. Asking for u"e" directly is
# still expected to raise RevisionNotPresent.
# NOTE(review): several lines are missing from this extract (numbering jumps
# 275 -> 278, 280 -> 282 and 295 -> 297): the start and end of the
# MockTransport([...]) list and the opener of the assertion whose arguments
# sit on the lines numbered 297-298 - confirm against the full file.
def test_get_ancestry_with_ghosts(self):
275
transport = MockTransport([
278
"b option 0 1 0 .e :",
279
"c option 0 1 0 .f .g :",
280
"d option 0 1 2 .h .j .k :"
282
index = _KnitIndex(transport, "filename", "r")
284
self.assertEqual([], index.get_ancestry_with_ghosts([]))
285
self.assertEqual([u"a"], index.get_ancestry_with_ghosts([u"a"]))
286
self.assertEqual([u"a", u"e", u"b"],
287
index.get_ancestry_with_ghosts([u"b"]))
288
self.assertEqual([u"a", u"g", u"f", u"c"],
289
index.get_ancestry_with_ghosts([u"c"]))
290
self.assertEqual([u"a", u"g", u"f", u"c", u"k", u"j", u"h", u"d"],
291
index.get_ancestry_with_ghosts([u"d"]))
292
self.assertEqual([u"a", u"e", u"b"],
293
index.get_ancestry_with_ghosts([u"a", u"b"]))
294
self.assertEqual([u"a", u"g", u"f", u"c"],
295
index.get_ancestry_with_ghosts([u"a", u"c"]))
297
[u"a", u"g", u"f", u"c", u"e", u"b", u"k", u"j", u"h", u"d"],
298
index.get_ancestry_with_ghosts([u"b", u"d"]))
300
self.assertRaises(RevisionNotPresent,
301
index.get_ancestry_with_ghosts, [u"e"])
def test_num_versions(self):
    """num_versions() and len() count distinct version ids.

    Adding an id that is already present leaves the count unchanged.
    """
    transport = MockTransport([
        # NOTE(review): header entry restored (an index opened for reading
        # needs one) - confirm against the full file.
        _KnitIndex.HEADER
        ])
    index = _KnitIndex(transport, "filename", "r")

    self.assertEqual(0, index.num_versions())
    self.assertEqual(0, len(index))

    index.add_version(u"a", ["option"], 0, 1, [])
    self.assertEqual(1, index.num_versions())
    self.assertEqual(1, len(index))

    # Same id again, different details: still a single version.
    index.add_version(u"a", ["option2"], 1, 2, [])
    self.assertEqual(1, index.num_versions())
    self.assertEqual(1, len(index))

    index.add_version(u"b", ["option"], 0, 1, [])
    self.assertEqual(2, index.num_versions())
    self.assertEqual(2, len(index))
def test_get_versions(self):
    """get_versions() lists every version id exactly once."""
    transport = MockTransport([
        # NOTE(review): header entry restored (an index opened for reading
        # needs one) - confirm against the full file.
        _KnitIndex.HEADER
        ])
    index = _KnitIndex(transport, "filename", "r")

    self.assertEqual([], index.get_versions())

    index.add_version(u"a", ["option"], 0, 1, [])
    self.assertEqual([u"a"], index.get_versions())

    # Re-adding an existing id must not produce a duplicate.
    index.add_version(u"a", ["option"], 0, 1, [])
    self.assertEqual([u"a"], index.get_versions())

    index.add_version(u"b", ["option"], 0, 1, [])
    self.assertEqual([u"a", u"b"], index.get_versions())
341
# Checks idx_to_name(): 0 and 1 map to u"a" and u"b", and the negative
# indexes -1 and -2 map to u"b" and u"a" respectively.
# NOTE(review): the contents and end of the MockTransport([...]) list are
# missing from this extract (numbering jumps 342 -> 347) - confirm against
# the full file.
def test_idx_to_name(self):
342
transport = MockTransport([
347
index = _KnitIndex(transport, "filename", "r")
349
self.assertEqual(u"a", index.idx_to_name(0))
350
self.assertEqual(u"b", index.idx_to_name(1))
351
self.assertEqual(u"b", index.idx_to_name(-1))
352
self.assertEqual(u"a", index.idx_to_name(-2))
354
# Checks lookup(): u"a" is expected at index 0 and u"b" at index 1.
# NOTE(review): the contents and end of the MockTransport([...]) list are
# missing from this extract (numbering jumps 355 -> 360) - confirm against
# the full file.
def test_lookup(self):
355
transport = MockTransport([
360
index = _KnitIndex(transport, "filename", "r")
362
self.assertEqual(0, index.lookup(u"a"))
363
self.assertEqual(1, index.lookup(u"b"))
def test_add_version(self):
    """add_version() appends one index line and updates the loaded state.

    Re-adding an existing id replaces its position, options and parents
    without raising the version count.  A parent that is already in the
    index is written as its numeric index ("0"); other parents are written
    as ".name".
    """
    transport = MockTransport([
        # NOTE(review): header entry restored (an index opened for reading
        # needs one) - confirm against the full file.
        _KnitIndex.HEADER
        ])
    index = _KnitIndex(transport, "filename", "r")

    index.add_version(u"a", ["option"], 0, 1, [u"b"])
    self.assertEqual(("append_bytes",
        ("filename", "\na option 0 1 .b :"),
        {}), transport.calls.pop(0))
    self.assertTrue(index.has_version(u"a"))
    self.assertEqual(1, index.num_versions())
    self.assertEqual((0, 1), index.get_position(u"a"))
    self.assertEqual(["option"], index.get_options(u"a"))
    self.assertEqual([u"b"], index.get_parents_with_ghosts(u"a"))

    # Same id again: details are overwritten, the count stays at one.
    index.add_version(u"a", ["opt"], 1, 2, [u"c"])
    self.assertEqual(("append_bytes",
        ("filename", "\na opt 1 2 .c :"),
        {}), transport.calls.pop(0))
    self.assertTrue(index.has_version(u"a"))
    self.assertEqual(1, index.num_versions())
    self.assertEqual((1, 2), index.get_position(u"a"))
    self.assertEqual(["opt"], index.get_options(u"a"))
    self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))

    # "a" is now in the index, so as a parent it is written as "0".
    index.add_version(u"b", ["option"], 2, 3, [u"a"])
    self.assertEqual(("append_bytes",
        ("filename", "\nb option 2 3 0 :"),
        {}), transport.calls.pop(0))
    self.assertTrue(index.has_version(u"b"))
    self.assertEqual(2, index.num_versions())
    self.assertEqual((2, 3), index.get_position(u"b"))
    self.assertEqual(["option"], index.get_options(u"b"))
    self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
401
# Batch counterpart of the add_version test: three (id, options, pos, size,
# parents) tuples are listed, with u"a" appearing twice, and a single
# "append_bytes" call is expected. Afterwards two versions are expected,
# with u"a" carrying the details of its second tuple.
# NOTE(review): this extract is missing lines (numbering jumps 402 -> 405,
# 405 -> 408, 410 -> 412 and 413 -> 416): the end of the MockTransport([...])
# list, the call that receives the tuples (presumably index.add_versions) and
# part of the expected appended text - confirm against the full file.
def test_add_versions(self):
402
transport = MockTransport([
405
index = _KnitIndex(transport, "filename", "r")
408
(u"a", ["option"], 0, 1, [u"b"]),
409
(u"a", ["opt"], 1, 2, [u"c"]),
410
(u"b", ["option"], 2, 3, [u"a"])
412
self.assertEqual(("append_bytes", ("filename",
413
"\na option 0 1 .b :"
416
), {}), transport.calls.pop(0))
417
self.assertTrue(index.has_version(u"a"))
418
self.assertTrue(index.has_version(u"b"))
419
self.assertEqual(2, index.num_versions())
420
self.assertEqual((1, 2), index.get_position(u"a"))
421
self.assertEqual((2, 3), index.get_position(u"b"))
422
self.assertEqual(["opt"], index.get_options(u"a"))
423
self.assertEqual(["option"], index.get_options(u"b"))
424
self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
425
self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
427
# Combines delayed creation with a batch add: the index is built with
# delay_create=True, no transport call is expected at first, and the first
# recorded call afterwards is expected to be "put_file_non_atomic" for
# "filename".
# NOTE(review): this extract is missing lines (numbering jumps 433 -> 436,
# 438 -> 440, 441 -> 443, 443 -> 445, 445 -> 448, 448 -> 450 and past 450):
# the call that receives the tuples (presumably index.add_versions), the
# assertion wrappers around the kwargs dict and around the expected file
# text, and part of that text - confirm against the full file.
def test_delay_create_and_add_versions(self):
428
transport = MockTransport()
430
index = _KnitIndex(transport, "filename", "w",
431
create=True, file_mode="wb", create_parent_dir=True,
432
delay_create=True, dir_mode=0777)
433
self.assertEqual([], transport.calls)
436
(u"a", ["option"], 0, 1, [u"b"]),
437
(u"a", ["opt"], 1, 2, [u"c"]),
438
(u"b", ["option"], 2, 3, [u"a"])
440
name, (filename, f), kwargs = transport.calls.pop(0)
441
self.assertEqual("put_file_non_atomic", name)
443
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
445
self.assertEqual("filename", filename)
448
"\na option 0 1 .b :"
450
"\nb option 2 3 0 :",
453
# Checks has_version(): True is expected for u"a", False for u"b".
# NOTE(review): the contents and end of the MockTransport([...]) list are
# missing from this extract (numbering jumps 454 -> 458) - confirm against
# the full file.
def test_has_version(self):
454
transport = MockTransport([
458
index = _KnitIndex(transport, "filename", "r")
460
self.assertTrue(index.has_version(u"a"))
461
self.assertFalse(index.has_version(u"b"))
463
# Checks get_position(): a two-element tuple is expected per version,
# (0, 1) for u"a" and (1, 2) for u"b".
# NOTE(review): the contents and end of the MockTransport([...]) list are
# missing from this extract (numbering jumps 464 -> 469) - confirm against
# the full file.
def test_get_position(self):
464
transport = MockTransport([
469
index = _KnitIndex(transport, "filename", "r")
471
self.assertEqual((0, 1), index.get_position(u"a"))
472
self.assertEqual((1, 2), index.get_position(u"b"))
474
# Checks get_method(): "fulltext" is expected for an entry whose options are
# "fulltext,unknown" and "line-delta" for one whose options are
# "unknown,line-delta", i.e. the position within the option list does not
# matter. For u"c" an AssertionError is expected.
# NOTE(review): this extract is missing lines (numbering jumps 475 -> 477 and
# 478 -> 481), so the entry for "c" (if any) and the end of the
# MockTransport([...]) list are not visible - confirm against the full file.
def test_get_method(self):
475
transport = MockTransport([
477
"a fulltext,unknown 0 1 :",
478
"b unknown,line-delta 1 2 :",
481
index = _KnitIndex(transport, "filename", "r")
483
self.assertEqual("fulltext", index.get_method(u"a"))
484
self.assertEqual("line-delta", index.get_method(u"b"))
485
self.assertRaises(AssertionError, index.get_method, u"c")
487
# Checks get_options(): a list of option strings is expected per version,
# ["opt1"] for u"a" and ["opt2", "opt3"] for u"b".
# NOTE(review): the contents and end of the MockTransport([...]) list are
# missing from this extract (numbering jumps 488 -> 493) - confirm against
# the full file.
def test_get_options(self):
488
transport = MockTransport([
493
index = _KnitIndex(transport, "filename", "r")
495
self.assertEqual(["opt1"], index.get_options(u"a"))
496
self.assertEqual(["opt2", "opt3"], index.get_options(u"b"))
498
# Checks get_parents(): for "b" (parents "0 .c") [u"a", u"c"] is expected,
# and for "c" (parents "1 0 .e") [u"b", u"a"] - the ".e" parent is left out
# while ".c" is kept; "c" has an entry of its own in the list, no entry for
# "e" is visible.
# NOTE(review): the start of the MockTransport([...]) list (presumably the
# header and the "a" entry) and its end are missing from this extract
# (numbering jumps 499 -> 502 and 503 -> 505) - confirm against the full file.
def test_get_parents(self):
499
transport = MockTransport([
502
"b option 1 2 0 .c :",
503
"c option 1 2 1 0 .e :"
505
index = _KnitIndex(transport, "filename", "r")
507
self.assertEqual([], index.get_parents(u"a"))
508
self.assertEqual([u"a", u"c"], index.get_parents(u"b"))
509
self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
511
# Checks get_parents_with_ghosts() on the same entries as the get_parents
# test: here the ".e" parent of "c" is included, so [u"b", u"a", u"e"] is
# expected.
# NOTE(review): the start of the MockTransport([...]) list (presumably the
# header and the "a" entry) and its end are missing from this extract
# (numbering jumps 512 -> 515 and 516 -> 518) - confirm against the full file.
def test_get_parents_with_ghosts(self):
512
transport = MockTransport([
515
"b option 1 2 0 .c :",
516
"c option 1 2 1 0 .e :"
518
index = _KnitIndex(transport, "filename", "r")
520
self.assertEqual([], index.get_parents_with_ghosts(u"a"))
521
self.assertEqual([u"a", u"c"], index.get_parents_with_ghosts(u"b"))
522
self.assertEqual([u"b", u"a", u"e"],
523
index.get_parents_with_ghosts(u"c"))
525
# Checks check_versions_present(): RevisionNotPresent is expected when the
# requested list contains u"c", whether alone or together with u"a" and u"b".
# NOTE(review): this extract is missing lines (numbering jumps 526 -> 531 and
# 533 -> 539): the contents and end of the MockTransport([...]) list and any
# calls made between binding `check` and the assertions below - confirm
# against the full file.
def test_check_versions_present(self):
526
transport = MockTransport([
531
index = _KnitIndex(transport, "filename", "r")
533
# Short alias for the bound method under test.
check = index.check_versions_present
539
self.assertRaises(RevisionNotPresent, check, [u"c"])
540
self.assertRaises(RevisionNotPresent, check, [u"a", u"b", u"c"])
88
543
class KnitTests(TestCaseWithTransport):
89
544
"""Class containing knit test helper routines."""