            texts/deltas (via (fileid, revisionid) tuples).
        :param signature_index: A GraphIndex for determining what signatures are
            present in the Pack and accessing the locations of their texts.
        :param chk_index: A GraphIndex for accessing content by CHK, if the
            pack has one.
        """
        self.revision_index = revision_index
        self.inventory_index = inventory_index
        self.text_index = text_index
        self.signature_index = signature_index
        self.chk_index = chk_index

    def access_tuple(self):
        """Return a tuple (transport, name) for the pack content."""
        return self.pack_transport, self.file_name()
    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection. It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        missing_items = {}
        for (index_name, external_refs, index) in [
            ('texts',
                self._get_external_refs(self.text_index),
                self._pack_collection.text_index.combined_index),
            ('inventories',
                self._get_external_refs(self.inventory_index),
                self._pack_collection.inventory_index.combined_index),
            ]:
            missing = external_refs.difference(
                k for (idx, k, v, r) in
                index.iter_entries(external_refs))
            if missing:
                missing_items[index_name] = sorted(list(missing))
        if missing_items:
            from pprint import pformat
            raise errors.BzrCheckError(
                "Newly created pack file %r has delta references to "
                "items not in its repository:\n%s"
                % (self, pformat(missing_items)))
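    # Added note (not part of the original source): missing_items maps an index
    # name ('texts' or 'inventories') to the sorted external compression
    # parents that could not be found anywhere in the collection, so the
    # BzrCheckError above reports exactly which delta bases are absent.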
    def file_name(self):
        """Get the file name for the pack on disk."""
        return self.name + '.pack'

    def text_index_name(self, name):
        """The text index is the name + .tix."""
        return self.index_name('text', name)

    def _replace_index_with_readonly(self, index_type):
        unlimited_cache = False
        if index_type == 'chk':
            unlimited_cache = True
        setattr(self, index_type + '_index',
            self.index_class(self.index_transport,
                self.index_name(index_type, self.name),
                self.index_sizes[self.index_offset(index_type)],
                unlimited_cache=unlimited_cache))
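    # Added note (not part of the original source): only the chk index is
    # reopened with unlimited_cache=True, presumably because CHK pages are
    # looked up by content hash in effectively random order, where a bounded
    # index-page cache would help little.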
class ExistingPack(Pack):
    """An in memory proxy for an existing .pack and its disk indices."""

    def __init__(self, pack_transport, name, revision_index, inventory_index,
        text_index, signature_index, chk_index=None):
        """Create an ExistingPack object.

        :param pack_transport: The transport where the pack file resides.
        :param name: The name of the pack on disk in the pack_transport.
        """
        Pack.__init__(self, revision_index, inventory_index, text_index,
            signature_index, chk_index)
        self.pack_transport = pack_transport
        if None in (revision_index, inventory_index, text_index,
                signature_index, name, pack_transport):
            raise AssertionError()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        return "<%s.%s object at 0x%x, %s, %s" % (
            self.__class__.__module__, self.__class__.__name__, id(self),
            self.pack_transport, self.name)
class ResumedPack(ExistingPack):

    def __init__(self, name, revision_index, inventory_index, text_index,
        signature_index, upload_transport, pack_transport, index_transport,
        pack_collection, chk_index=None):
        """Create a ResumedPack object."""
        ExistingPack.__init__(self, pack_transport, name, revision_index,
            inventory_index, text_index, signature_index,
            chk_index=chk_index)
        self.upload_transport = upload_transport
        self.index_transport = index_transport
        self.index_sizes = [None, None, None, None]
        indices = [
            ('revision', revision_index),
            ('inventory', inventory_index),
            ('text', text_index),
            ('signature', signature_index),
            ]
        if chk_index is not None:
            indices.append(('chk', chk_index))
            self.index_sizes.append(None)
        for index_type, index in indices:
            offset = self.index_offset(index_type)
            self.index_sizes[offset] = index._size
        self.index_class = pack_collection._index_class
        self._pack_collection = pack_collection
        self._state = 'resumed'
        # XXX: perhaps check that the .pack file exists?

    def access_tuple(self):
        if self._state == 'finished':
            return Pack.access_tuple(self)
        elif self._state == 'resumed':
            return self.upload_transport, self.file_name()
        else:
            raise AssertionError(self._state)
    def abort(self):
        self.upload_transport.delete(self.file_name())
        indices = [self.revision_index, self.inventory_index, self.text_index,
            self.signature_index]
        if self.chk_index is not None:
            indices.append(self.chk_index)
        for index in indices:
            index._transport.delete(index._name)

    def finish(self):
        self._check_references()
        index_types = ['revision', 'inventory', 'text', 'signature']
        if self.chk_index is not None:
            index_types.append('chk')
        for index_type in index_types:
            old_name = self.index_name(index_type, self.name)
            new_name = '../indices/' + old_name
            self.upload_transport.rename(old_name, new_name)
            self._replace_index_with_readonly(index_type)
        new_name = '../packs/' + self.file_name()
        self.upload_transport.rename(self.file_name(), new_name)
        self._state = 'finished'

    def _get_external_refs(self, index):
        """Return compression parents for this index that are not present.

        This returns any compression parents that are referenced by this index,
        which are not contained *in* this index. They may be present elsewhere.
        """
        return index.external_references(1)
class NewPack(Pack):
    """An in memory proxy for a pack which is being created."""

    # A map of index 'type' to the file extension and position in the
    # index_sizes array.
    index_definitions = {
        'revision': ('.rix', 0),
        'inventory': ('.iix', 1),
        'text': ('.tix', 2),
        'signature': ('.six', 3),
        }

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

            raise AssertionError(self._state)
    def data_inserted(self):
        """True if data has been added to this pack."""
        return bool(self.get_revision_count() or
            self.inventory_index.key_count() or
            self.text_index.key_count() or
            self.signature_index.key_count() or
            (self.chk_index is not None and self.chk_index.key_count()))
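    # Added note (not part of the original source): _commit_write_group below
    # uses this predicate to decide whether the in-progress pack is worth
    # finish()ing and allocating, or should simply be aborted and discarded.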
    def finish_content(self):
        if self.name is not None:
            return
        self._write_data('', flush=True)
        self.name = self._hash.hexdigest()
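    # Added note (not part of the original source): the pack's permanent name
    # is the hex digest of its content hash - an md5, going by the resume-token
    # check in _resume_pack() below - so identical content always finishes
    # under the same on-disk name.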
    def finish(self, suspend=False):
        """Finish the new pack.

         - stores the index size tuple for the pack in the index_sizes
           attribute.
        """
        self.finish_content()
        if not suspend:
            self._check_references()
        # XXX: It'd be better to write them all to temporary names, then
        # rename them all into place, so that the window when only some are
        # visible is smaller. On the other hand none will be seen until
        # they're in the names list.
        self.index_sizes = [None, None, None, None]
        self._write_index('revision', self.revision_index, 'revision', suspend)
        self._write_index('inventory', self.inventory_index, 'inventory',
            suspend)
        self._write_index('text', self.text_index, 'file texts', suspend)
        self._write_index('signature', self.signature_index,
            'revision signatures', suspend)
        if self.chk_index is not None:
            self.index_sizes.append(None)
            self._write_index('chk', self.chk_index,
                'content hash bytes', suspend)
        self.write_stream.close()
        # Note that this will clobber an existing pack with the same name,
        # without checking for hash collisions. While this is undesirable this
        # - try for HASH.pack
        # - try for temporary-name
        # - refresh the pack-list to see if the pack is now absent
        new_name = self.name + '.pack'
        if not suspend:
            new_name = '../packs/' + new_name
        self.upload_transport.rename(self.random_name, new_name)
        self._state = 'finished'
        if 'pack' in debug.debug_flags:
            # XXX: size might be interesting?
            mutter('%s: create_pack: pack finished: %s%s->%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                new_name, time.time() - self.start_time)
    def flush(self):
        """Flush any current data."""
        if self._buffer[1]:
            bytes = ''.join(self._buffer[0])
            self.write_stream.write(bytes)
            self._hash.update(bytes)
            self._buffer[:] = [[], 0]
    def index_name(self, index_type, name):
        """Get the disk name of an index type for pack name 'name'."""
        return name + NewPack.index_definitions[index_type][0]

    def index_offset(self, index_type):
        """Get the position in an index_sizes array for a given index type."""
        return NewPack.index_definitions[index_type][1]
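    # Illustrative example (added, not part of the original source), using the
    # index_definitions map above:
    #   index_name('text', '0123abcd')  -> '0123abcd.tix'
    #   index_offset('signature')       -> 3
    # The tuple therefore supplies both the on-disk suffix and the slot used
    # in the index_sizes list.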
    def _get_external_refs(self, index):
        return index._external_references()

    def set_write_cache_size(self, size):
        self._cache_limit = size
    def _write_index(self, index_type, index, label, suspend=False):
        """Write out an index.

        :param index_type: The type of index to write - e.g. 'revision'.
        :param label: What label to give the index e.g. 'revision'.
        """
        index_name = self.index_name(index_type, self.name)
        if suspend:
            transport = self.upload_transport
        else:
            transport = self.index_transport
        self.index_sizes[self.index_offset(index_type)] = transport.put_file(
            index_name, index.finish(), mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            # XXX: size might be interesting?
            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
                time.ctime(), label, self.upload_transport.base,
                self.random_name, time.time() - self.start_time)
        # Replace the writable index on this object with a readonly,
        # presently unloaded index. We should alter
        # the index layer to make its finish() error if add_node is
        # subsequently used. RBC
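        # Added note (not part of the original source): the swap described
        # above is what Pack._replace_index_with_readonly (defined earlier)
        # performs, using the size just recorded in index_sizes to open the
        # freshly written on-disk index.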
    def open_pack(self):
        """Open a pack for the pack we are creating."""
        new_pack = self._pack_collection.pack_factory(self._pack_collection,
            upload_suffix=self.suffix,
            file_mode=self._pack_collection.repo.bzrdir._get_file_mode())
        # We know that we will process all nodes in order, and don't need to
        # query, so don't combine any indices spilled to disk until we are done
        new_pack.revision_index.set_optimize(combine_backing_indices=False)
        new_pack.inventory_index.set_optimize(combine_backing_indices=False)
        new_pack.text_index.set_optimize(combine_backing_indices=False)
        new_pack.signature_index.set_optimize(combine_backing_indices=False)
        return new_pack
    def _update_pack_order(self, entries, index_to_pack_map):
        """Determine how we want our packs to be ordered.
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                new_pack.signature_index.key_count(),
                time.time() - new_pack.start_time)
        # NB XXX: how to check CHK references are present? perhaps by yielding
        # the items? How should that interact with stacked repos?
        if new_pack.chk_index is not None:
            self._copy_chks()
            if 'pack' in debug.debug_flags:
                mutter('%s: create_pack: chk content copied: %s%s %d items t+%6.3fs',
                    time.ctime(), self._pack_collection._upload_transport.base,
                    new_pack.random_name,
                    new_pack.chk_index.key_count(),
                    time.time() - new_pack.start_time)
        new_pack._check_references()
        if not self._use_pack(new_pack):
            new_pack.abort()
            return None
        self._pack_collection.allocate(new_pack)
    def _copy_chks(self, refs=None):
        # XXX: Todo, recursive follow-pointers facility when fetching some
        # revisions only.
        chk_index_map, chk_indices = self._pack_map_and_index_list(
            'chk_index')
        chk_nodes = self._index_contents(chk_indices, refs)
        new_refs = set()
        # TODO: This isn't strictly tasteful as we are accessing some private
        # variables (_serializer). Perhaps a better way would be to have
        # Repository._deserialise_chk_node()
        search_key_func = chk_map.search_key_registry.get(
            self._pack_collection.repo._serializer.search_key_name)
        def accumlate_refs(lines):
            # XXX: move to a generic location
            bytes = ''.join(lines)
            node = chk_map._deserialise(bytes, ("unknown",), search_key_func)
            new_refs.update(node.refs())
        self._copy_nodes(chk_nodes, chk_index_map, self.new_pack._writer,
            self.new_pack.chk_index, output_lines=accumlate_refs)
        return new_refs
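    # Added note (not part of the original source): accumlate_refs is handed to
    # _copy_nodes as the output_lines callback, so as each CHK page is copied
    # the keys of the pages it references are collected into new_refs; a caller
    # can feed that set back in as 'refs' to copy the referenced pages on a
    # later pass.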
    def _copy_nodes(self, nodes, index_map, writer, write_index,
        output_lines=None):
        """Copy knit nodes between packs with no graph references.

        :param output_lines: Output full texts of copied items.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            return self._do_copy_nodes(nodes, index_map, writer,
                write_index, pb, output_lines=output_lines)
        finally:
            pb.finished()

    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb,
        output_lines=None):
        # for record verification
        knit = KnitVersionedFiles(None, None)
        # plan a readv on each source pack:
class RepositoryPackCollection(object):
    """Management of packs within a repository.

    :ivar _names: map of {pack_name: (index_size,)}
    """

    pack_factory = NewPack
    resumed_pack_factory = ResumedPack
    def __init__(self, repo, transport, index_transport, upload_transport,
                 pack_transport, index_builder_class, index_class,
                 use_chk_index):
        """Create a new RepositoryPackCollection.

        :param transport: Addresses the repository base directory
            (typically .bzr/repository/).
        :param index_transport: Addresses the directory containing indices.
        :param upload_transport: Addresses the directory into which packs are written
        """
        self._pack_transport = pack_transport
        self._index_builder_class = index_builder_class
        self._index_class = index_class
        self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3,
            '.cix': 4}
        self.packs = []
        # name:Pack mapping
        self._packs_by_name = {}
        # the previous pack-names content
        self._packs_at_load = None
        # when a pack is being created by this object, the state of that pack.
        self._new_pack = None
        # aggregated revision index data
        flush = self._flush_new_pack
        self.revision_index = AggregateIndex(self.reload_pack_names, flush)
        self.inventory_index = AggregateIndex(self.reload_pack_names, flush)
        self.text_index = AggregateIndex(self.reload_pack_names, flush)
        self.signature_index = AggregateIndex(self.reload_pack_names, flush)
        if use_chk_index:
            self.chk_index = AggregateIndex(self.reload_pack_names, flush)
        else:
            # used to determine if we're using a chk_index elsewhere.
            self.chk_index = None
        # resumed packs
        self._resumed_packs = []

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.repo)
    def add_pack_to_memory(self, pack):
        """Make a Pack object available to the repository to satisfy queries.

        :param pack: A Pack object.
        """
        if pack.name in self._packs_by_name:
            raise AssertionError(
                'pack %s already in _packs_by_name' % (pack.name,))
        self.packs.append(pack)
        self._packs_by_name[pack.name] = pack
        self.revision_index.add_index(pack.revision_index, pack)
        self.inventory_index.add_index(pack.inventory_index, pack)
        self.text_index.add_index(pack.text_index, pack)
        self.signature_index.add_index(pack.signature_index, pack)
        if self.chk_index is not None:
            self.chk_index.add_index(pack.chk_index, pack)

    def all_packs(self):
        """Return a list of all the Pack objects this repository has.
            self._remove_pack_from_memory(pack)
        # record the newly available packs and stop advertising the old
        # packs
        to_be_obsoleted = []
        for _, packs in pack_operations:
            to_be_obsoleted.extend(packs)
        result = self._save_pack_names(clear_obsolete_packs=True,
                                       obsolete_packs=to_be_obsoleted)
        return result

    def _flush_new_pack(self):
        if self._new_pack is not None:
            self._new_pack.flush()
    def lock_names(self):
        """Acquire the mutex around the pack-names index.

        This cannot be used in the middle of a read-only transaction on the
        repository.
        """
        self.repo.control_files.lock_write()

    def _already_packed(self):
        """Is the collection already packed?"""
        return not (self.repo._format.pack_compresses or (len(self._names) > 1))
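    # Added note (not part of the original source): for formats whose packer
    # cannot improve compression (pack_compresses is False), a collection with
    # a single pack file counts as already packed, so pack() below returns
    # without doing any work.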
    def pack(self, hint=None):
        """Pack the pack collection totally."""
        self.ensure_loaded()
        total_packs = len(self._names)
        if self._already_packed():
            return
        total_revisions = self.revision_index.combined_index.key_count()
        # XXX: the following may want to be a class, to pack with a given
        # policy.
        mutter('Packing repository %s, which has %d pack files, '
            'containing %d revisions with hint %r.', self, total_packs,
            total_revisions, hint)
        # determine which packs need changing
        pack_operations = [[0, []]]
        for pack in self.all_packs():
            if hint is None or pack.name in hint:
                # Either no hint was provided (so we are packing everything),
                # or this pack was included in the hint.
                pack_operations[-1][0] += pack.get_revision_count()
                pack_operations[-1][1].append(pack)
        self._execute_pack_operations(pack_operations, OptimisingPacker)
    def plan_autopack_combinations(self, existing_packs, pack_distribution):

            inv_index = self._make_index(name, '.iix')
            txt_index = self._make_index(name, '.tix')
            sig_index = self._make_index(name, '.six')
            if self.chk_index is not None:
                chk_index = self._make_index(name, '.cix', unlimited_cache=True)
            else:
                chk_index = None
            result = ExistingPack(self._pack_transport, name, rev_index,
                inv_index, txt_index, sig_index, chk_index)
            self.add_pack_to_memory(result)
    def _resume_pack(self, name):
        """Get a suspended Pack object by name.

        :param name: The name of the pack - e.g. '123456'
        :return: A Pack object.
        """
        if not re.match('[a-f0-9]{32}', name):
            # Tokens should be md5sums of the suspended pack file, i.e. 32 hex
            # digits.
            raise errors.UnresumableWriteGroup(
                self.repo, [name], 'Malformed write group token')
        try:
            rev_index = self._make_index(name, '.rix', resume=True)
            inv_index = self._make_index(name, '.iix', resume=True)
            txt_index = self._make_index(name, '.tix', resume=True)
            sig_index = self._make_index(name, '.six', resume=True)
            if self.chk_index is not None:
                chk_index = self._make_index(name, '.cix', resume=True,
                                             unlimited_cache=True)
            else:
                chk_index = None
            result = self.resumed_pack_factory(name, rev_index, inv_index,
                txt_index, sig_index, self._upload_transport,
                self._pack_transport, self._index_transport, self,
                chk_index=chk_index)
        except errors.NoSuchFile, e:
            raise errors.UnresumableWriteGroup(self.repo, [name], str(e))
        self.add_pack_to_memory(result)
        self._resumed_packs.append(result)
    def allocate(self, a_new_pack):
        """Allocate name in the list of packs.

        return self._index_class(self.transport, 'pack-names', None
            ).iter_all_entries()
    def _make_index(self, name, suffix, resume=False, unlimited_cache=False):
        size_offset = self._suffix_offsets[suffix]
        index_name = name + suffix
        if resume:
            transport = self._upload_transport
            index_size = transport.stat(index_name).st_size
        else:
            transport = self._index_transport
            index_size = self._names[name][size_offset]
        return self._index_class(transport, index_name, index_size,
                                 unlimited_cache=unlimited_cache)
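    # Added note (not part of the original source): a resumed pack's indices
    # still live in the upload directory and are not yet recorded in
    # 'pack-names', so their sizes have to be stat()ed from the upload
    # transport rather than read from self._names.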
    def _max_pack_count(self, total_revisions):
        """Return the maximum number of packs to use for total revisions.

        :param total_revisions: The total number of revisions in the
        :return: None.
        """
        for pack in packs:
            try:
                pack.pack_transport.rename(pack.file_name(),
                    '../obsolete_packs/' + pack.file_name())
            except (errors.PathError, errors.TransportError), e:
                # TODO: Should these be warnings or mutters?
                mutter("couldn't rename obsolete pack, skipping it:\n%s"
                       % (e,))
            # TODO: Probably needs to know all possible indices for this pack
            # - or maybe list the directory and move all indices matching this
            # name whether we recognize it or not?
            suffixes = ['.iix', '.six', '.tix', '.rix']
            if self.chk_index is not None:
                suffixes.append('.cix')
            for suffix in suffixes:
                try:
                    self._index_transport.rename(pack.name + suffix,
                        '../obsolete_packs/' + pack.name + suffix)
                except (errors.PathError, errors.TransportError), e:
                    mutter("couldn't rename obsolete index, skipping it:\n%s"
                           % (e,))
    def pack_distribution(self, total_revisions):
        """Generate a list of the number of revisions to put in each pack.

        :param clear_obsolete_packs: If True, clear out the contents of the
            obsolete_packs directory.
        :param obsolete_packs: Packs that are obsolete once the new pack-names
            file has been written.
        :return: A list of the names saved that were not previously on disk.
        """
        already_obsolete = []
        self.lock_names()
        try:
            builder = self._index_builder_class()
            (disk_nodes, deleted_nodes, new_nodes,
             orig_disk_nodes) = self._diff_pack_names()
            # TODO: handle same-name, index-size-changes here -
            # e.g. use the value from disk, not ours, *unless* we're the one
            # changing it.
            for key, value in disk_nodes:
                builder.add_node(key, value)
            self.transport.put_file('pack-names', builder.finish(),
                mode=self.repo.bzrdir._get_file_mode())
            self._packs_at_load = disk_nodes
            if clear_obsolete_packs:
                to_preserve = set([o.name for o in obsolete_packs])
                already_obsolete = self._clear_obsolete_packs(to_preserve)
        finally:
            self._unlock_names()
        # synchronise the memory packs list with what we just wrote:
        self._syncronize_pack_names_from_disk_nodes(disk_nodes)
        if obsolete_packs:
            # TODO: We could add one more condition here. "if o.name not in
            #       orig_disk_nodes and o != the new_pack we haven't written to
            #       disk yet. However, the new pack object is not easily
            #       accessible here (it would have to be passed through the
            #       autopacking code, etc.)
            obsolete_packs = [o for o in obsolete_packs
                              if o.name not in already_obsolete]
            self._obsolete_packs(obsolete_packs)
        return [new_node[0][0] for new_node in new_nodes]
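    # Added note (not part of the original source): the list of newly saved
    # pack names returned here is what _commit_write_group passes back to the
    # repository as the "hint" that a later pack(hint=...) call can use to
    # limit which packs it repacks.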
    def reload_pack_names(self):
        """Sync our pack listing with what is present in the repository.

        This should be called when we find out that something we thought was
        present is now missing. This happens when another process re-packs the
        repository, etc.

        :return: True if the in-memory list of packs has been altered at all.
        """
        # The ensure_loaded call is to handle the case where the first call
        # made involving the collection was to reload_pack_names, where we
        # don't have a view of disk contents. It's a bit of a bandaid, and
        # causes two reads of pack-names, but it's a rare corner case not
        # struck with regular push/pull etc.
        first_read = self.ensure_loaded()
        if first_read:
            return True
        # This is functionally similar to _save_pack_names, but we don't write
        # out the new value.
        (disk_nodes, deleted_nodes, new_nodes,
         orig_disk_nodes) = self._diff_pack_names()
        # _packs_at_load is meant to be the explicit list of names in
        # 'pack-names' at the start. As such, it should not contain any
        # pending names that haven't been written out yet.
        self._packs_at_load = orig_disk_nodes
        (removed, added,
         modified) = self._syncronize_pack_names_from_disk_nodes(disk_nodes)
        if removed or added or modified:
            return True
        return False

            raise errors.RetryAutopack(self.repo, False, sys.exc_info())
    def _clear_obsolete_packs(self, preserve=None):
        """Delete everything from the obsolete-packs directory.

        :return: A list of pack identifiers (the filename without '.pack') that
            were found in obsolete_packs.
        """
        found = []
        obsolete_pack_transport = self.transport.clone('obsolete_packs')
        if preserve is None:
            preserve = set()
        for filename in obsolete_pack_transport.list_dir('.'):
            name, ext = osutils.splitext(filename)
            if ext == '.pack':
                found.append(name)
            if name in preserve:
                continue
            try:
                obsolete_pack_transport.delete(filename)
            except (errors.PathError, errors.TransportError), e:
                warning("couldn't delete obsolete pack, skipping it:\n%s"
                        % (e,))
        return found
    def _start_write_group(self):
        # Do not permit preparation for writing if we're not in a 'write lock'.
        if not self.repo.is_write_locked():
            raise errors.NotWriteLocked(self)
        self._new_pack = self.pack_factory(self, upload_suffix='.pack',
            file_mode=self.repo.bzrdir._get_file_mode())
        # allow writing: queue writes to a new index
        self.revision_index.add_writable_index(self._new_pack.revision_index,
        # case. -- mbp 20081113
        self._remove_pack_indices(self._new_pack)
        self._new_pack = None
        for resumed_pack in self._resumed_packs:
            try:
                resumed_pack.abort()
            finally:
                # See comment in previous finally block.
                try:
                    self._remove_pack_indices(resumed_pack)
                except KeyError:
                    pass
        del self._resumed_packs[:]

    def _remove_resumed_pack_indices(self):
        for resumed_pack in self._resumed_packs:
            self._remove_pack_indices(resumed_pack)
        del self._resumed_packs[:]
    def _check_new_inventories(self):
        """Detect missing inventories in this write group.

        :returns: list of strs, summarising any problems found. If the list is
            empty no problems were found.
        """
        # The base implementation does no checks. GCRepositoryPackCollection
        # overrides this.
        return []
    def _commit_write_group(self):
        all_missing = set()
        for prefix, versioned_file in (
                ('revisions', self.repo.revisions),
                ('inventories', self.repo.inventories),
                ('texts', self.repo.texts),
                ('signatures', self.repo.signatures),
                ):
            missing = versioned_file.get_missing_compression_parent_keys()
            all_missing.update([(prefix,) + key for key in missing])
        if all_missing:
            raise errors.BzrCheckError(
                "Repository %s has missing compression parent(s) %r "
                % (self.repo, sorted(all_missing)))
        problems = self._check_new_inventories()
        if problems:
            problems_summary = '\n'.join(problems)
            raise errors.BzrCheckError(
                "Cannot add revision(s) to repository: " + problems_summary)
        self._remove_pack_indices(self._new_pack)
        any_new_content = False
        if self._new_pack.data_inserted():
            # get all the data to disk and ready to use
            self._new_pack.finish()
            self.allocate(self._new_pack)
            self._new_pack = None
            any_new_content = True
        else:
            self._new_pack.abort()
            self._new_pack = None
        for resumed_pack in self._resumed_packs:
            # XXX: this is a pretty ugly way to turn the resumed pack into a
            # properly committed pack.
            self._names[resumed_pack.name] = None
            self._remove_pack_from_memory(resumed_pack)
            resumed_pack.finish()
            self.allocate(resumed_pack)
            any_new_content = True
        del self._resumed_packs[:]
        if any_new_content:
            result = self.autopack()
            if not result:
                # when autopack takes no steps, the names list is still
                # unsaved.
                return self._save_pack_names()
            return result
        return []
    def _suspend_write_group(self):
        tokens = [pack.name for pack in self._resumed_packs]
        self._remove_pack_indices(self._new_pack)
        if self._new_pack.data_inserted():
            # get all the data to disk and ready to use
            self._new_pack.finish(suspend=True)
            tokens.append(self._new_pack.name)
            self._new_pack = None
        else:
            self._new_pack.abort()
            self._new_pack = None
        self._remove_resumed_pack_indices()
        return tokens

    def _resume_write_group(self, tokens):
        for token in tokens:
            self._resume_pack(token)
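    # Added note (not part of the original source): the tokens handed back by
    # _suspend_write_group and consumed here are simply the names of the
    # suspended packs (their content-hash digests), which is why _resume_pack
    # validates each token as a 32-character hex string before using it.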
class KnitPackRepository(KnitRepository):
    """Repository with knit objects stored inside pack containers.

    The layering for a KnitPackRepository is:

    Graph | HPSS | Repository public layer |

            deltas=True, parents=True, is_locked=self.is_locked),
            data_access=self._pack_collection.text_index.data_access,
            max_delta_chain=200)
        if _format.supports_chks:
            # No graph, no compression:- references from chks are between
            # different objects not temporal versions of the same; and without
            # some sort of temporal structure knit compression will just fail.
            self.chk_bytes = KnitVersionedFiles(
                _KnitGraphIndex(self._pack_collection.chk_index.combined_index,
                    add_callback=self._pack_collection.chk_index.add_callback,
                    deltas=False, parents=False, is_locked=self.is_locked),
                data_access=self._pack_collection.chk_index.data_access,
                max_delta_chain=0)
        else:
            self.chk_bytes = None
        # True when the repository object is 'write locked' (as opposed to the
        # physical lock only taken out around changes to the pack-names list.)
        # Another way to represent this would be a decorator around the control
        # files object that presents logical locks as physical ones - if this
        # gets ugly consider that alternative design. RBC 20071011
        self._reconcile_does_inventory_gc = True
        self._reconcile_fixes_text_parents = True
        self._reconcile_backsup_inventory = False
    def _warn_if_deprecated(self, branch=None):
        # This class isn't deprecated, but one sub-format is
        if isinstance(self._format, RepositoryFormatKnitPack5RichRootBroken):
            super(KnitPackRepository, self)._warn_if_deprecated(branch)
    def _abort_write_group(self):
        self.revisions._index._key_dependencies.clear()
        self._pack_collection._abort_write_group()
    def _find_inconsistent_revision_parents(self):
        """Find revisions with incorrectly cached parents.

        :returns: an iterator yielding tuples of (revision-id, parents-in-index,
            parents-in-revision).
        """
        if not self.is_locked():
            raise errors.ObjectNotLocked(self)
        pb = ui.ui_factory.nested_progress_bar()
        result = []
        try:
            revision_nodes = self._pack_collection.revision_index \
                .combined_index.iter_all_entries()
            index_positions = []
            # Get the cached index values for all revisions, and also the location
            # in each index of the revision text so we can perform linear IO.
            for index, key, value, refs in revision_nodes:
                pos, length = value[1:].split(' ')
                index_positions.append((index, int(pos), key[0],
                    tuple(parent[0] for parent in refs[0])))
                pb.update("Reading revision index.", 0, 0)
            index_positions.sort()
            batch_count = len(index_positions) / 1000 + 1
            pb.update("Checking cached revision graph.", 0, batch_count)
            for offset in xrange(batch_count):
                pb.update("Checking cached revision graph.", offset)
                to_query = index_positions[offset * 1000:(offset + 1) * 1000]
                if not to_query:
                    break
                rev_ids = [item[2] for item in to_query]
                revs = self.get_revisions(rev_ids)
                for revision, item in zip(revs, to_query):
                    index_parents = item[3]
                    rev_parents = tuple(revision.parent_ids)
                    if index_parents != rev_parents:
                        result.append((revision.revision_id, index_parents,
                            rev_parents))
        finally:
            pb.finished()
        return result
    @symbol_versioning.deprecated_method(symbol_versioning.one_one)
    def get_parents(self, revision_ids):
        """See graph._StackedParentsProvider.get_parents."""
        parent_map = self.get_parent_map(revision_ids)
        return [parent_map.get(r, None) for r in revision_ids]
    def _get_source(self, to_format):
        if to_format.network_name() == self._format.network_name():
            return KnitPackStreamSource(self, to_format)
        return super(KnitPackRepository, self)._get_source(to_format)

    def _make_parents_provider(self):
        return graph.CachingParentsProvider(self)
    def _refresh_data(self):
        if not self.is_locked():
            return
        self._pack_collection.reload_pack_names()
    def _start_write_group(self):
        self._pack_collection._start_write_group()

    def _commit_write_group(self):
        hint = self._pack_collection._commit_write_group()
        self.revisions._index._key_dependencies.clear()
        return hint
    def suspend_write_group(self):
        # XXX check self._write_group is self.get_transaction()?
        tokens = self._pack_collection._suspend_write_group()
        self.revisions._index._key_dependencies.clear()
        self._write_group = None
        return tokens

    def _resume_write_group(self, tokens):
        self._start_write_group()
        try:
            self._pack_collection._resume_write_group(tokens)
        except errors.UnresumableWriteGroup:
            self._abort_write_group()
            raise
        for pack in self._pack_collection._resumed_packs:
            self.revisions._index.scan_unvalidated_index(pack.revision_index)
    def get_transaction(self):
        if self._write_lock_count:
            return self._transaction
        else:
            return self.control_files.get_transaction()

        return self._write_lock_count
    def lock_write(self, token=None):
        locked = self.is_locked()
        if not self._write_lock_count and locked:
            raise errors.ReadOnlyError(self)
        self._write_lock_count += 1
        if self._write_lock_count == 1:
            self._transaction = transactions.WriteTransaction()
        if not locked:
            if 'relock' in debug.debug_flags and self._prev_lock == 'w':
                note('%r was write locked again', self)
            self._prev_lock = 'w'
            for repo in self._fallback_repositories:
                # Writes don't affect fallback repos
                repo.lock_read()
            self._refresh_data()
    def lock_read(self):
        locked = self.is_locked()
        if self._write_lock_count:
            self._write_lock_count += 1
        else:
            self.control_files.lock_read()
        if not locked:
            if 'relock' in debug.debug_flags and self._prev_lock == 'r':
                note('%r was read locked again', self)
            self._prev_lock = 'r'
            for repo in self._fallback_repositories:
                repo.lock_read()
            self._refresh_data()
    def leave_lock_in_place(self):
        # not supported - raise an error
            transaction = self._transaction
            self._transaction = None
            transaction.finish()
        else:
            self.control_files.unlock()

        if not self.is_locked():
            for repo in self._fallback_repositories:
                repo.unlock()
class KnitPackStreamSource(StreamSource):
    """A StreamSource used to transfer data between same-format KnitPack repos.

    This source assumes:
      1) Same serialization format for all objects
      2) Same root information
      3) XML format inventories
      4) Atomic inserts (so we can stream inventory texts before text
         content)
    """

    def __init__(self, from_repository, to_format):
        super(KnitPackStreamSource, self).__init__(from_repository, to_format)
        self._text_keys = None
        self._text_fetch_order = 'unordered'

    def _get_filtered_inv_stream(self, revision_ids):
        from_repo = self.from_repository
        parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids)
        parent_keys = [(p,) for p in parent_ids]
        find_text_keys = from_repo._find_text_key_references_from_xml_inventory_lines
        parent_text_keys = set(find_text_keys(
            from_repo._inventory_xml_lines_for_keys(parent_keys)))
        content_text_keys = set()
        knit = KnitVersionedFiles(None, None)
        factory = KnitPlainFactory()
        def find_text_keys_from_content(record):
            if record.storage_kind not in ('knit-delta-gz', 'knit-ft-gz'):
                raise ValueError("Unknown content storage kind for"
                    " inventory text: %s" % (record.storage_kind,))
            # It's a knit record, it has a _raw_record field (even if it was
            # reconstituted from a network stream).
            raw_data = record._raw_record
            # read the entire thing
            revision_id = record.key[-1]
            content, _ = knit._parse_record(revision_id, raw_data)
            if record.storage_kind == 'knit-delta-gz':
                line_iterator = factory.get_linedelta_content(content)
            elif record.storage_kind == 'knit-ft-gz':
                line_iterator = factory.get_fulltext_content(content)
            content_text_keys.update(find_text_keys(
                [(line, revision_id) for line in line_iterator]))
        revision_keys = [(r,) for r in revision_ids]
        def _filtered_inv_stream():
            source_vf = from_repo.inventories
            stream = source_vf.get_record_stream(revision_keys,
                                                 'unordered', False)
            for record in stream:
                if record.storage_kind == 'absent':
                    raise errors.NoSuchRevision(from_repo, record.key)
                find_text_keys_from_content(record)
                yield record
            self._text_keys = content_text_keys - parent_text_keys
        return ('inventories', _filtered_inv_stream())

    def _get_text_stream(self):
        # Note: We know we don't have to handle adding root keys, because both
        # the source and target are the identical network name.
        text_stream = self.from_repository.texts.get_record_stream(
            self._text_keys, self._text_fetch_order, False)
        return ('texts', text_stream)

    def get_stream(self, search):
        revision_ids = search.get_keys()
        for stream_info in self._fetch_revision_texts(revision_ids):
            yield stream_info
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
        yield self._get_filtered_inv_stream(revision_ids)
        yield self._get_text_stream()
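    # Added note (not part of the original source): get_stream deliberately
    # yields revision data first, then the filtered inventory stream (which,
    # while being consumed, computes self._text_keys as a side effect), and
    # only then the text stream; the "atomic inserts" assumption in the class
    # docstring is what allows inventory texts to be streamed ahead of the
    # file texts they reference.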
class RepositoryFormatPack(MetaDirRepositoryFormat):
    """Format logic for pack structured repositories.

        return "Packs 6 rich-root (uses btree indexes, requires bzr 1.9)"
class RepositoryFormatPackDevelopment2(RepositoryFormatPack):
    """A no-subtrees development repository.

    This format should be retained until the second release after bzr 1.7.

    This is pack-1.6.1 with B+Tree indices.
    """

    repository_class = KnitPackRepository
    _commit_builder_class = PackCommitBuilder
    supports_external_lookups = True
    # What index classes to use
    index_builder_class = BTreeBuilder
    index_class = BTreeGraphIndex

    @property
    def _serializer(self):
        return xml5.serializer_v5

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir('development2')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return "Bazaar development format 2 (needs bzr.dev from before 1.8)\n"

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format, currently the same as "
            "1.6.1 with B+Trees.\n")

    def check_conversion_target(self, target_format):
        pass
class RepositoryFormatPackDevelopment2Subtree(RepositoryFormatPack):
    """A subtrees development repository.

    This format should be retained until the second release after bzr 1.7.

    1.6.1-subtree[as it might have been] with B+Tree indices.

    This is [now] retained until we have a CHK based subtree format in
    dev.
    """

    repository_class = KnitPackRepository

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir(
            'development-subtree')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def check_conversion_target(self, target_format):
        if not target_format.rich_root_data:
            raise errors.BadConversionTarget(
                'Does not support rich root data.', target_format)
        if not getattr(target_format, 'supports_tree_reference', False):
            raise errors.BadConversionTarget(
                'Does not support nested trees', target_format)

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ("Bazaar development format 2 with subtree support "