            texts/deltas (via (fileid, revisionid) tuples).
        :param signature_index: A GraphIndex for determining what signatures are
            present in the Pack and accessing the locations of their texts.
        :param chk_index: A GraphIndex for accessing content by CHK, if the
            pack has one.
        """
        self.revision_index = revision_index
        self.inventory_index = inventory_index
        self.text_index = text_index
        self.signature_index = signature_index
        self.chk_index = chk_index

    def access_tuple(self):
        """Return a tuple (transport, name) for the pack content."""
        return self.pack_transport, self.file_name()
    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection. It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        missing_items = {}
        for (index_name, external_refs, index) in [
            ('texts',
                self._get_external_refs(self.text_index),
                self._pack_collection.text_index.combined_index),
            ('inventories',
                self._get_external_refs(self.inventory_index),
                self._pack_collection.inventory_index.combined_index),
            ]:
            missing = external_refs.difference(
                k for (idx, k, v, r) in
                index.iter_entries(external_refs))
            if missing:
                missing_items[index_name] = sorted(list(missing))
        if missing_items:
            from pprint import pformat
            raise errors.BzrCheckError(
                "Newly created pack file %r has delta references to "
                "items not in its repository:\n%s"
                % (self, pformat(missing_items)))
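    # Illustrative sketch (not part of the original code): how a pack's
    # external compression parents are checked against the whole collection.
    # The names `new_pack` and `collection` below are hypothetical.
    #
    #   refs = new_pack._get_external_refs(new_pack.text_index)
    #   present = set(k for (_, k, _, _) in
    #       collection.text_index.combined_index.iter_entries(refs))
    #   # any key left in `refs - present` would trigger the BzrCheckError above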
    def file_name(self):
        """Get the file name for the pack on disk."""
        return self.name + '.pack'

    def text_index_name(self, name):
        """The text index is the name + .tix."""
        return self.index_name('text', name)

    def _external_compression_parents_of_texts(self):
        keys = set()
        refs = set()
        for node in self.text_index.iter_all_entries():
            keys.add(node[1])
            refs.update(node[3][1])
        return refs - keys

    def _replace_index_with_readonly(self, index_type):
        setattr(self, index_type + '_index',
            self.index_class(self.index_transport,
                self.index_name(index_type, self.name),
                self.index_sizes[self.index_offset(index_type)]))
class ExistingPack(Pack):
    """An in memory proxy for an existing .pack and its disk indices."""

    def __init__(self, pack_transport, name, revision_index, inventory_index,
        text_index, signature_index, chk_index=None):
        """Create an ExistingPack object.

        :param pack_transport: The transport where the pack file resides.
        :param name: The name of the pack on disk in the pack_transport.
        """
        Pack.__init__(self, revision_index, inventory_index, text_index,
            signature_index, chk_index)
        self.name = name
        self.pack_transport = pack_transport
        if None in (revision_index, inventory_index, text_index,
                signature_index, name, pack_transport):
            raise AssertionError()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        return "<%s.%s object at 0x%x, %s, %s" % (
            self.__class__.__module__, self.__class__.__name__, id(self),
            self.pack_transport, self.name)
class ResumedPack(ExistingPack):

    def __init__(self, name, revision_index, inventory_index, text_index,
        signature_index, upload_transport, pack_transport, index_transport,
        pack_collection, chk_index=None):
        """Create a ResumedPack object."""
        ExistingPack.__init__(self, pack_transport, name, revision_index,
            inventory_index, text_index, signature_index,
            chk_index=chk_index)
        self.upload_transport = upload_transport
        self.index_transport = index_transport
        self.index_sizes = [None, None, None, None]
        indices = [
            ('revision', revision_index),
            ('inventory', inventory_index),
            ('text', text_index),
            ('signature', signature_index),
            ]
        if chk_index is not None:
            indices.append(('chk', chk_index))
            self.index_sizes.append(None)
        for index_type, index in indices:
            offset = self.index_offset(index_type)
            self.index_sizes[offset] = index._size
        self.index_class = pack_collection._index_class
        self._pack_collection = pack_collection
        self._state = 'resumed'
        # XXX: perhaps check that the .pack file exists?

    def access_tuple(self):
        if self._state == 'finished':
            return Pack.access_tuple(self)
        elif self._state == 'resumed':
            return self.upload_transport, self.file_name()
        else:
            raise AssertionError(self._state)

    def abort(self):
        self.upload_transport.delete(self.file_name())
        indices = [self.revision_index, self.inventory_index, self.text_index,
            self.signature_index]
        if self.chk_index is not None:
            indices.append(self.chk_index)
        for index in indices:
            index._transport.delete(index._name)

    def finish(self):
        self._check_references()
        index_types = ['revision', 'inventory', 'text', 'signature']
        if self.chk_index is not None:
            index_types.append('chk')
        for index_type in index_types:
            old_name = self.index_name(index_type, self.name)
            new_name = '../indices/' + old_name
            self.upload_transport.rename(old_name, new_name)
            self._replace_index_with_readonly(index_type)
        new_name = '../packs/' + self.file_name()
        self.upload_transport.rename(self.file_name(), new_name)
        self._state = 'finished'

    def _get_external_refs(self, index):
        """Return compression parents for this index that are not present.

        This returns any compression parents that are referenced by this
        index, which are not contained *in* this index. They may be present
        elsewhere.
        """
        return index.external_references(1)
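    # Illustrative sketch (not part of the original code): the lifecycle of a
    # resumed pack. `collection` and `token` are hypothetical names; a token
    # is the md5 pack name handed out when a write group is suspended.
    #
    #   pack = collection._resume_pack(token)   # indices re-read from upload dir
    #   pack.finish()                           # move indices + pack into place
    #   # or, to discard the suspended data instead:
    #   # pack.abort()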
class NewPack(Pack):
    """An in memory proxy for a pack which is being created."""

    # A map of index 'type' to the file extension and position in the
    # index_sizes array.
    index_definitions = {
        'chk': ('.cix', 4),
        'revision': ('.rix', 0),
        'inventory': ('.iix', 1),
        'text': ('.tix', 2),
        'signature': ('.six', 3),
        }
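    # Illustrative sketch (not part of the original code): index_definitions
    # drives both index_name() and index_offset(); e.g., assuming a pack named
    # 'deadbeef', its text index lives in 'deadbeef.tix' and its size is stored
    # at position 2 of index_sizes.
    #
    #   suffix, offset = NewPack.index_definitions['text']
    #   # suffix == '.tix', offset == 2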
    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param pack_collection: A PackCollection into which this is being
            inserted.
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g. '.autopack'
        :param file_mode: Unix permissions for newly created file.
        """
        # The relative locations of the packs are constrained, but all are
        # passed in because the caller has them, so as to avoid object churn.
        index_builder_class = pack_collection._index_builder_class
        if pack_collection.chk_index is not None:
            chk_index = index_builder_class(reference_lists=0)
        else:
            chk_index = None
        Pack.__init__(self,
            # Revisions: parents list, no text compression.
            index_builder_class(reference_lists=1),
            # Inventory: We want to map compression only, but currently the
            # knit code hasn't been updated enough to understand that, so we
            # have a regular 2-list index giving parents and compression
            # source.
            index_builder_class(reference_lists=2),
            # Texts: compression and per file graph, for all fileids - so two
            # reference lists and two elements in the key tuple.
            index_builder_class(reference_lists=2, key_elements=2),
            # Signatures: Just blobs to store, no compression, no parents.
            index_builder_class(reference_lists=0),
            # CHK based storage - just blobs, no compression or parents.
            chk_index,
            )
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
        # where should the new pack be opened
        self.upload_transport = pack_collection._upload_transport
        # where are indices written out to
        self.index_transport = pack_collection._index_transport
        # where is the pack renamed to when it is finished?
        self.pack_transport = pack_collection._pack_transport
        # What file mode to upload the pack and indices with.
        self._file_mode = file_mode
        # tracks the content written to the .pack file.
        self._hash = osutils.md5()
        # a tuple with the length in bytes of the indices, once the pack
        # is finalised. (rev, inv, text, sigs, chk_if_in_use)
        self.index_sizes = None
        # How much data to cache when writing packs. Note that this is not
        # synchronised with reads, because it's not in the transport layer, so
    def finish(self, suspend=False):
        """Finish the new pack.

         - stores the index size tuple for the pack in the index_sizes
           attribute.
        """
        self.finish_content()
        self._check_references()
        # XXX: It'd be better to write them all to temporary names, then
        # rename them all into place, so that the window when only some are
        # visible is smaller. On the other hand none will be seen until
        # they're in the names list.
        self.index_sizes = [None, None, None, None]
        self._write_index('revision', self.revision_index, 'revision', suspend)
        self._write_index('inventory', self.inventory_index, 'inventory',
            suspend)
        self._write_index('text', self.text_index, 'file texts', suspend)
        self._write_index('signature', self.signature_index,
            'revision signatures', suspend)
        if self.chk_index is not None:
            self.index_sizes.append(None)
            self._write_index('chk', self.chk_index,
                'content hash bytes', suspend)
        self.write_stream.close()
        # Note that this will clobber an existing pack with the same name,
        # without checking for hash collisions. While this is undesirable this
    # ...

    def open_pack(self):
        """Open a pack for the pack we are creating."""
        new_pack = self._pack_collection.pack_factory(self._pack_collection,
            upload_suffix=self.suffix,
            file_mode=self._pack_collection.repo.bzrdir._get_file_mode())
        # We know that we will process all nodes in order, and don't need to
        # query, so don't combine any indices spilled to disk until we are done
        new_pack.revision_index.set_optimize(combine_backing_indices=False)
        new_pack.inventory_index.set_optimize(combine_backing_indices=False)
        new_pack.text_index.set_optimize(combine_backing_indices=False)
        new_pack.signature_index.set_optimize(combine_backing_indices=False)
        return new_pack
    def _update_pack_order(self, entries, index_to_pack_map):
        """Determine how we want our packs to be ordered.

        This changes the sort order of the self.packs list so that packs unused
        by 'entries' will be at the end of the list, so that future requests
        can avoid probing them. Used packs will be at the front of the
        self.packs list, in the order of their first use in 'entries'.

        :param entries: A list of (index, ...) tuples
        :param index_to_pack_map: A mapping from index objects to pack objects.
        """
        packs = []
        seen_indexes = set()
        for entry in entries:
            index = entry[0]
            if index not in seen_indexes:
                packs.append(index_to_pack_map[index])
                seen_indexes.add(index)
        if len(packs) == len(self.packs):
            if 'pack' in debug.debug_flags:
                mutter('Not changing pack list, all packs used.')
            return
        seen_packs = set(packs)
        for pack in self.packs:
            if pack not in seen_packs:
                packs.append(pack)
                seen_packs.add(pack)
        if 'pack' in debug.debug_flags:
            old_names = [p.access_tuple()[1] for p in self.packs]
            new_names = [p.access_tuple()[1] for p in packs]
            mutter('Reordering packs\nfrom: %s\n to: %s',
                   old_names, new_names)
        self.packs = packs
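    # Illustrative sketch (not part of the original code): given hypothetical
    # index objects i1/i2 mapped to packs p1/p2, and entries that only touch
    # i2, the reordering puts p2 first so later probes hit it before p1.
    #
    #   entries = [(i2, ('key',), 'value')]
    #   index_to_pack_map = {i1: p1, i2: p2}
    #   packer._update_pack_order(entries, index_to_pack_map)
    #   # packer.packs is now [p2, p1]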
    def _copy_revision_texts(self):
        """Copy revision data to the new pack."""
        # ...

        self._pack_collection.allocate(new_pack)

    def _copy_chks(self, refs=None):
        # XXX: Todo, recursive follow-pointers facility when fetching some
        # revisions only.
        chk_index_map, chk_indices = self._pack_map_and_index_list(
            'chk_index')
        chk_nodes = self._index_contents(chk_indices, refs)
        new_refs = set()
        # TODO: This isn't strictly tasteful as we are accessing some private
        # variables (_serializer). Perhaps a better way would be to have
        # Repository._deserialise_chk_node()
        search_key_func = chk_map.search_key_registry.get(
            self._pack_collection.repo._serializer.search_key_name)
        def accumulate_refs(lines):
            # XXX: move to a generic location
            bytes = ''.join(lines)
            node = chk_map._deserialise(bytes, ("unknown",), search_key_func)
            new_refs.update(node.refs())
        self._copy_nodes(chk_nodes, chk_index_map, self.new_pack._writer,
            self.new_pack.chk_index, output_lines=accumulate_refs)
        return new_refs
    def _copy_nodes(self, nodes, index_map, writer, write_index,
        output_lines=None):
        """Copy knit nodes between packs with no graph references.

        :param output_lines: Output full texts of copied items.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            return self._do_copy_nodes(nodes, index_map, writer,
                write_index, pb, output_lines=output_lines)
        finally:
            pb.finished()

    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb,
        output_lines=None):
        # for record verification
        knit = KnitVersionedFiles(None, None)
        # plan a readv on each source pack:
        # ...
        for index, items in request_groups.iteritems():
            # ...
            # linear scan up the pack
            pack_readv_requests.sort()
            pack_obj = index_map[index]
            transport, path = pack_obj.access_tuple()
            try:
                reader = pack.make_readv_reader(transport, path,
                    [offset[0:2] for offset in pack_readv_requests])
            except errors.NoSuchFile:
                if self._reload_func is not None:
                    self._reload_func()
                raise
            for (names, read_func), (_1, _2, (key, eol_flag)) in \
                izip(reader.iter_records(), pack_readv_requests):
                raw_data = read_func(None)
                # check the header only
                if output_lines is not None:
                    output_lines(knit._parse_record(key[-1], raw_data)[0])
                else:
                    df, _ = knit._parse_record_header(key, raw_data)
                    df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
                pb.update("Copied record", record_index)
                record_index += 1
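    # Illustrative sketch (not part of the original code): each readv request
    # above is an (offset, length, (key, eol_flag)) tuple, so offset[0:2]
    # strips an entry down to the (offset, length) pair that
    # pack.make_readv_reader() expects. With hypothetical values:
    #
    #   pack_readv_requests = [(0, 120, (('rev-1',), '')),
    #                          (120, 80, (('rev-2',), ''))]
    #   [offset[0:2] for offset in pack_readv_requests]
    #   # -> [(0, 120), (120, 80)]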
class RepositoryPackCollection(object):
    """Management of packs within a repository.

    :ivar _names: map of {pack_name: (index_size,)}
    """

    pack_factory = NewPack
    resumed_pack_factory = ResumedPack

    def __init__(self, repo, transport, index_transport, upload_transport,
                 pack_transport, index_builder_class, index_class,
                 use_chk_index):
        """Create a new RepositoryPackCollection.

        :param transport: Addresses the repository base directory
            (typically .bzr/repository/).
        :param index_transport: Addresses the directory containing indices.
        :param upload_transport: Addresses the directory into which packs are written
            while they're being created.
        :param pack_transport: Addresses the directory of existing complete packs.
        :param index_builder_class: The index builder class to use.
        :param index_class: The index class to use.
        :param use_chk_index: Whether to setup and manage a CHK index.
        """
        # XXX: This should call self.reset()
        self.repo = repo
        self.transport = transport
        self._index_transport = index_transport
        self._upload_transport = upload_transport
        self._pack_transport = pack_transport
        self._index_builder_class = index_builder_class
        self._index_class = index_class
        self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3,
            '.cix': 4}
        self.packs = []
        # name:Pack mapping
        self._names = None
        self._packs_by_name = {}
        # the previous pack-names content
        self._packs_at_load = None
        # when a pack is being created by this object, the state of that pack.
        self._new_pack = None
        # aggregated revision index data
        flush = self._flush_new_pack
        self.revision_index = AggregateIndex(self.reload_pack_names, flush)
        self.inventory_index = AggregateIndex(self.reload_pack_names, flush)
        self.text_index = AggregateIndex(self.reload_pack_names, flush)
        self.signature_index = AggregateIndex(self.reload_pack_names, flush)
        if use_chk_index:
            self.chk_index = AggregateIndex(self.reload_pack_names, flush)
        else:
            # used to determine if we're using a chk_index elsewhere.
            self.chk_index = None
        # resumed packs
        self._resumed_packs = []
    def add_pack_to_memory(self, pack):
        """Make a Pack object available to the repository to satisfy queries.

        :param pack: A Pack object.
        """
        if pack.name in self._packs_by_name:
            raise AssertionError(
                'pack %s already in _packs_by_name' % (pack.name,))
        self.packs.append(pack)
        self._packs_by_name[pack.name] = pack
        self.revision_index.add_index(pack.revision_index, pack)
        self.inventory_index.add_index(pack.inventory_index, pack)
        self.text_index.add_index(pack.text_index, pack)
        self.signature_index.add_index(pack.signature_index, pack)
        if self.chk_index is not None:
            self.chk_index.add_index(pack.chk_index, pack)
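    # Illustrative sketch (not part of the original code): once a pack is
    # added here, queries against the aggregate indices transparently cover
    # it. `collection` and `pack` are hypothetical names.
    #
    #   collection.add_pack_to_memory(pack)
    #   collection.revision_index.combined_index.key_count()
    #   # now includes the revisions indexed by `pack`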
    def all_packs(self):
        """Return a list of all the Pack objects this repository has.
        """
        # ...

                # group their data with the relevant commit, and that may
                # involve rewriting ancient history - which autopack tries to
                # avoid. Alternatively we could not group the data but treat
                # each of these as having a single revision, and thus add
                # one revision for each to the total revision count, to get
                # a matching distribution.
                continue
            existing_packs.append((revision_count, pack))
        pack_operations = self.plan_autopack_combinations(
            existing_packs, pack_distribution)
        num_new_packs = len(pack_operations)
        num_old_packs = sum([len(po[1]) for po in pack_operations])
        num_revs_affected = sum([po[0] for po in pack_operations])
        mutter('Auto-packing repository %s, which has %d pack files, '
            'containing %d revisions. Packing %d files into %d affecting %d'
            ' revisions', self, total_packs, total_revisions, num_old_packs,
            num_new_packs, num_revs_affected)
        result = self._execute_pack_operations(pack_operations,
            reload_func=self._restart_autopack)
        mutter('Auto-packing repository %s completed', self)
    def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
                                 reload_func=None):
        """Execute a series of pack operations.

        :param pack_operations: A list of [revision_count, packs_to_combine].
        :param _packer_class: The class of packer to use (default: Packer).
        :return: The new pack names.
        """
        for revision_count, packs in pack_operations:
            # we may have no-ops from the setup logic
            if len(packs) == 0:
                continue
            packer = _packer_class(self, packs, '.autopack',
                                   reload_func=reload_func)
            try:
                packer.pack()
            except errors.RetryWithNewPacks:
                # An exception is propagating out of this context, make sure
                # this packer has cleaned up. Packer() doesn't set its new_pack
                # state into the RepositoryPackCollection object, so we only
                # have access to it directly here.
                if packer.new_pack is not None:
                    packer.new_pack.abort()
                raise
            for pack in packs:
                self._remove_pack_from_memory(pack)
        # record the newly available packs and stop advertising the old
        result = self._save_pack_names(clear_obsolete_packs=True)
        # Move the old packs out of the way now they are no longer referenced.
        for revision_count, packs in pack_operations:
            self._obsolete_packs(packs)
        return result

    def _flush_new_pack(self):
        if self._new_pack is not None:
            self._new_pack.flush()
    def lock_names(self):
        """Acquire the mutex around the pack-names index.

        This cannot be used in the middle of a read-only transaction on the
        repository.
        """
        self.repo.control_files.lock_write()

    def _already_packed(self):
        """Is the collection already packed?"""
        return not (self.repo._format.pack_compresses or (len(self._names) > 1))
    def pack(self, hint=None):
        """Pack the pack collection totally."""
        self.ensure_loaded()
        total_packs = len(self._names)
        if self._already_packed():
            return
        total_revisions = self.revision_index.combined_index.key_count()
        # XXX: the following may want to be a class, to pack with a given
        # policy.
        mutter('Packing repository %s, which has %d pack files, '
            'containing %d revisions with hint %r.', self, total_packs,
            total_revisions, hint)
        # determine which packs need changing
        pack_operations = [[0, []]]
        for pack in self.all_packs():
            if hint is None or pack.name in hint:
                # Either no hint was provided (so we are packing everything),
                # or this pack was included in the hint.
                pack_operations[-1][0] += pack.get_revision_count()
                pack_operations[-1][1].append(pack)
        self._execute_pack_operations(pack_operations, OptimisingPacker)
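    # Illustrative sketch (not part of the original code): `hint` lets callers
    # repack only a subset of packs by name; `collection` is hypothetical.
    #
    #   collection.pack()                        # repack everything
    #   collection.pack(hint=['0123abcd...'])    # repack only the named pack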
    def plan_autopack_combinations(self, existing_packs, pack_distribution):
        # ...
        inv_index = self._make_index(name, '.iix')
        txt_index = self._make_index(name, '.tix')
        sig_index = self._make_index(name, '.six')
        if self.chk_index is not None:
            chk_index = self._make_index(name, '.cix')
        else:
            chk_index = None
        result = ExistingPack(self._pack_transport, name, rev_index,
            inv_index, txt_index, sig_index, chk_index)
        self.add_pack_to_memory(result)

    def _resume_pack(self, name):
        """Get a suspended Pack object by name.

        :param name: The name of the pack - e.g. '123456'
        :return: A Pack object.
        """
        if not re.match('[a-f0-9]{32}', name):
            # Tokens should be md5sums of the suspended pack file, i.e. 32 hex
            # digits.
            raise errors.UnresumableWriteGroup(
                self.repo, [name], 'Malformed write group token')
        try:
            rev_index = self._make_index(name, '.rix', resume=True)
            inv_index = self._make_index(name, '.iix', resume=True)
            txt_index = self._make_index(name, '.tix', resume=True)
            sig_index = self._make_index(name, '.six', resume=True)
            if self.chk_index is not None:
                chk_index = self._make_index(name, '.cix', resume=True)
            else:
                chk_index = None
            result = self.resumed_pack_factory(name, rev_index, inv_index,
                txt_index, sig_index, self._upload_transport,
                self._pack_transport, self._index_transport, self,
                chk_index=chk_index)
        except errors.NoSuchFile, e:
            raise errors.UnresumableWriteGroup(self.repo, [name], str(e))
        self.add_pack_to_memory(result)
        self._resumed_packs.append(result)
        return result
    def allocate(self, a_new_pack):
        """Allocate name in the list of packs.
        """
        # ...

        self._packs_by_name = {}
        self._packs_at_load = None
    def _make_index_map(self, index_suffix):
        """Return information on existing indices.

        :param suffix: Index suffix added to pack name.

        :returns: (pack_map, indices) where indices is a list of GraphIndex
            objects, and pack_map is a mapping from those objects to the
            pack tuple they describe.
        """
        # TODO: stop using this; it creates new indices unnecessarily.
        self.ensure_loaded()
        suffix_map = {'.rix': 'revision_index',
            '.six': 'signature_index',
            '.iix': 'inventory_index',
            '.tix': 'text_index',
            }
        return self._packs_list_to_pack_map_and_index_list(self.all_packs(),
            suffix_map[index_suffix])

    def _packs_list_to_pack_map_and_index_list(self, packs, index_attribute):
        """Convert a list of packs to an index pack map and index list.

        :param packs: The packs list to process.
        :param index_attribute: The attribute that the desired index is found
            on.
        :return: A tuple (map, list) where map contains the dict from
            index:pack_tuple, and list contains the indices in the same order
            as the packs list.
        """
        indices = []
        pack_map = {}
        for pack in packs:
            index = getattr(pack, index_attribute)
            indices.append(index)
            pack_map[index] = (pack.pack_transport, pack.file_name())
        return pack_map, indices

    def _index_contents(self, pack_map, key_filter=None):
        """Get an iterable of the index contents from a pack_map.

        :param pack_map: A map from indices to pack details.
        :param key_filter: An optional filter to limit the
            keys returned.
        """
        indices = [index for index in pack_map.iterkeys()]
        all_index = CombinedGraphIndex(indices)
        if key_filter is None:
            return all_index.iter_all_entries()
        else:
            return all_index.iter_entries(key_filter)
    def _unlock_names(self):
        """Release the mutex around the pack-names index."""
        self.repo.control_files.unlock()
    def _diff_pack_names(self):
        """Read the pack names from disk, and compare it to the one in memory.

        :return: (disk_nodes, deleted_nodes, new_nodes)
            disk_nodes    The final set of nodes that should be referenced
            deleted_nodes Nodes which have been removed from when we started
            new_nodes     Nodes that are newly introduced
        """
        # load the disk nodes across
        disk_nodes = set()
        for index, key, value in self._iter_disk_pack_index():
            disk_nodes.add((key, value))
        # do a two-way diff against our original content
        current_nodes = set()
        for name, sizes in self._names.iteritems():
            current_nodes.add(
                ((name, ), ' '.join(str(size) for size in sizes)))
        # Packs no longer present in the repository, which were present when we
        # locked the repository
        deleted_nodes = self._packs_at_load - current_nodes
        # Packs which this process is adding
        new_nodes = current_nodes - self._packs_at_load
        # Update the disk_nodes set to include the ones we are adding, and
        # remove the ones which were removed by someone else
        disk_nodes.difference_update(deleted_nodes)
        disk_nodes.update(new_nodes)
        return disk_nodes, deleted_nodes, new_nodes
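    # Illustrative sketch (not part of the original code): the two-way diff in
    # terms of plain sets, with hypothetical pack names and size strings.
    #
    #   _packs_at_load = {(('a',), '10 10 10 10'), (('b',), '5 5 5 5')}
    #   current_nodes  = {(('a',), '10 10 10 10'), (('c',), '7 7 7 7')}
    #   deleted_nodes  = _packs_at_load - current_nodes   # {(('b',), ...)}
    #   new_nodes      = current_nodes - _packs_at_load   # {(('c',), ...)}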
    def _syncronize_pack_names_from_disk_nodes(self, disk_nodes):
        """Given the correct set of pack files, update our saved info.

        :return: (removed, added, modified)
            removed     pack names removed from self._names
            added       pack names added to self._names
            modified    pack names that had changed value
        """
        removed = []
        added = []
        modified = []
        ## self._packs_at_load = disk_nodes
        new_names = dict(disk_nodes)
        # drop no longer present nodes
        for pack in self.all_packs():
            if (pack.name,) not in new_names:
                removed.append(pack.name)
                self._remove_pack_from_memory(pack)
        # add new nodes/refresh existing ones
        for key, value in disk_nodes:
            # ...
                    self._remove_pack_from_memory(self.get_pack_by_name(name))
                    self._names[name] = sizes
                    self.get_pack_by_name(name)
                    modified.append(name)
            else:
                self._names[name] = sizes
                self.get_pack_by_name(name)
                added.append(name)
        return removed, added, modified
    def _save_pack_names(self, clear_obsolete_packs=False):
        """Save the list of packs.

        This will take out the mutex around the pack names list for the
        duration of the method call. If concurrent updates have been made, a
        three-way merge between the current list and the current in memory list
        of names is performed.

        :param clear_obsolete_packs: If True, clear out the contents of the
            obsolete_packs directory.
        :return: A list of the names saved that were not previously on disk.
        """
        self.lock_names()
        try:
            builder = self._index_builder_class()
            disk_nodes, deleted_nodes, new_nodes = self._diff_pack_names()
            # TODO: handle same-name, index-size-changes here -
            # e.g. use the value from disk, not ours, *unless* we're the one
            # changing it.
            for key, value in disk_nodes:
                builder.add_node(key, value)
            self.transport.put_file('pack-names', builder.finish(),
                mode=self.repo.bzrdir._get_file_mode())
            # move the baseline forward
            self._packs_at_load = disk_nodes
            if clear_obsolete_packs:
                self._clear_obsolete_packs()
        finally:
            self._unlock_names()
        # synchronise the memory packs list with what we just wrote:
        self._syncronize_pack_names_from_disk_nodes(disk_nodes)
        return [new_node[0][0] for new_node in new_nodes]
    def reload_pack_names(self):
        """Sync our pack listing with what is present in the repository.

        This should be called when we find out that something we thought was
        present is now missing. This happens when another process re-packs the
        repository, etc.

        :return: True if the in-memory list of packs has been altered at all.
        """
        # The ensure_loaded call is to handle the case where the first call
        # made involving the collection was to reload_pack_names, where we
        # don't have a view of disk contents. It's a bit of a bandaid, and
        # causes two reads of pack-names, but it's a rare corner case not
        # struck with regular push/pull etc.
        first_read = self.ensure_loaded()
        if first_read:
            return True
        # out the new value.
        disk_nodes, _, _ = self._diff_pack_names()
        self._packs_at_load = disk_nodes
        (removed, added,
         modified) = self._syncronize_pack_names_from_disk_nodes(disk_nodes)
        if removed or added or modified:
            return True
        return False

    def _restart_autopack(self):
        """Reload the pack names list, and restart the autopack code."""
        if not self.reload_pack_names():
            # Re-raise the original exception, because something went missing
            # and a restart didn't find it
            raise
        raise errors.RetryAutopack(self.repo, False, sys.exc_info())
    def _clear_obsolete_packs(self):
        """Delete everything from the obsolete-packs directory.
        """
        # ...
        # FIXME: just drop the transient index.
        # forget what names there are
        if self._new_pack is not None:
            try:
                self._new_pack.abort()
            finally:
                # XXX: If we aborted while in the middle of finishing the write
                # group, _remove_pack_indices can fail because the indexes are
                # already gone. If they're not there we shouldn't fail in this
                # case. -- mbp 20081113
                self._remove_pack_indices(self._new_pack)
                self._new_pack = None
        for resumed_pack in self._resumed_packs:
            try:
                resumed_pack.abort()
            finally:
                # See comment in previous finally block.
                try:
                    self._remove_pack_indices(resumed_pack)
                except KeyError:
                    pass
        del self._resumed_packs[:]

    def _remove_resumed_pack_indices(self):
        for resumed_pack in self._resumed_packs:
            self._remove_pack_indices(resumed_pack)
        del self._resumed_packs[:]
    def _commit_write_group(self):
        all_missing = set()
        for prefix, versioned_file in (
                ('revisions', self.repo.revisions),
                ('inventories', self.repo.inventories),
                ('texts', self.repo.texts),
                ('signatures', self.repo.signatures),
                ):
            missing = versioned_file.get_missing_compression_parent_keys()
            all_missing.update([(prefix,) + key for key in missing])
        if all_missing:
            raise errors.BzrCheckError(
                "Repository %s has missing compression parent(s) %r "
                 % (self.repo, sorted(all_missing)))
        self._remove_pack_indices(self._new_pack)
        should_autopack = False
        if self._new_pack.data_inserted():
            # get all the data to disk and read to use
            self._new_pack.finish()
            self.allocate(self._new_pack)
            self._new_pack = None
            should_autopack = True
        else:
            self._new_pack.abort()
            self._new_pack = None
        for resumed_pack in self._resumed_packs:
            # XXX: this is a pretty ugly way to turn the resumed pack into a
            # properly committed pack.
            self._names[resumed_pack.name] = None
            self._remove_pack_from_memory(resumed_pack)
            resumed_pack.finish()
            self.allocate(resumed_pack)
            should_autopack = True
        del self._resumed_packs[:]
        if should_autopack:
            if not self.autopack():
                # when autopack takes no steps, the names list is still
                # unsaved.
                return self._save_pack_names()
    def _suspend_write_group(self):
        tokens = [pack.name for pack in self._resumed_packs]
        self._remove_pack_indices(self._new_pack)
        if self._new_pack.data_inserted():
            # get all the data to disk and read to use
            self._new_pack.finish(suspend=True)
            tokens.append(self._new_pack.name)
            self._new_pack = None
        else:
            self._new_pack.abort()
            self._new_pack = None
        self._remove_resumed_pack_indices()
        return tokens

    def _resume_write_group(self, tokens):
        for token in tokens:
            self._resume_pack(token)
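    # Illustrative sketch (not part of the original code): suspending a write
    # group turns the in-progress pack into a resumable one and returns its
    # name as a token; resuming re-opens it. `collection` is hypothetical.
    #
    #   tokens = collection._suspend_write_group()   # e.g. ['8e0194f3b...']
    #   # ... later, in another process or after reopening the repository:
    #   collection._resume_write_group(tokens)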
class KnitPackRepository(KnitRepository):
    """Repository with knit objects stored inside pack containers.

    The layering for a KnitPackRepository is:

    Graph  |  HPSS    | Repository public layer |
    ...
    """

    # ...
                % (self._format, self.bzrdir.transport.base))

    def _abort_write_group(self):
        self.revisions._index._key_dependencies.refs.clear()
        self._pack_collection._abort_write_group()
    def _find_inconsistent_revision_parents(self):
        """Find revisions with incorrectly cached parents.

        :returns: an iterator yielding tuples of (revision-id, parents-in-index,
            parents-in-revision).
        """
        if not self.is_locked():
            raise errors.ObjectNotLocked(self)
        pb = ui.ui_factory.nested_progress_bar()
        result = []
        try:
            revision_nodes = self._pack_collection.revision_index \
                .combined_index.iter_all_entries()
            index_positions = []
            # Get the cached index values for all revisions, and also the location
            # in each index of the revision text so we can perform linear IO.
            for index, key, value, refs in revision_nodes:
                pos, length = value[1:].split(' ')
                index_positions.append((index, int(pos), key[0],
                    tuple(parent[0] for parent in refs[0])))
                pb.update("Reading revision index.", 0, 0)
            index_positions.sort()
            batch_count = len(index_positions) / 1000 + 1
            pb.update("Checking cached revision graph.", 0, batch_count)
            for offset in xrange(batch_count):
                pb.update("Checking cached revision graph.", offset)
                to_query = index_positions[offset * 1000:(offset + 1) * 1000]
                if not to_query:
                    break
                rev_ids = [item[2] for item in to_query]
                revs = self.get_revisions(rev_ids)
                for revision, item in zip(revs, to_query):
                    index_parents = item[3]
                    rev_parents = tuple(revision.parent_ids)
                    if index_parents != rev_parents:
                        result.append((revision.revision_id, index_parents, rev_parents))
        finally:
            pb.finished()
        return result

    @symbol_versioning.deprecated_method(symbol_versioning.one_one)
    def get_parents(self, revision_ids):
        """See graph._StackedParentsProvider.get_parents."""
        parent_map = self.get_parent_map(revision_ids)
        return [parent_map.get(r, None) for r in revision_ids]
    def _get_source(self, to_format):
        if to_format.network_name() == self._format.network_name():
            return KnitPackStreamSource(self, to_format)
        return super(KnitPackRepository, self)._get_source(to_format)

    def _make_parents_provider(self):
        return graph.CachingParentsProvider(self)

    def _refresh_data(self):
        if not self.is_locked():
            return
        self._pack_collection.reload_pack_names()
    def _start_write_group(self):
        self._pack_collection._start_write_group()

    def _commit_write_group(self):
        self.revisions._index._key_dependencies.refs.clear()
        return self._pack_collection._commit_write_group()

    def suspend_write_group(self):
        # XXX check self._write_group is self.get_transaction()?
        tokens = self._pack_collection._suspend_write_group()
        self.revisions._index._key_dependencies.refs.clear()
        self._write_group = None
        return tokens

    def _resume_write_group(self, tokens):
        self._start_write_group()
        try:
            self._pack_collection._resume_write_group(tokens)
        except errors.UnresumableWriteGroup:
            self._abort_write_group()
            raise
        for pack in self._pack_collection._resumed_packs:
            self.revisions._index.scan_unvalidated_index(pack.revision_index)
    def get_transaction(self):
        if self._write_lock_count:
            return self._transaction
        # ...

                transaction = self._transaction
                self._transaction = None
                transaction.finish()
        else:
            self.control_files.unlock()

        if not self.is_locked():
            for repo in self._fallback_repositories:
                repo.unlock()
class KnitPackStreamSource(StreamSource):
    """A StreamSource used to transfer data between same-format KnitPack repos.

    This source assumes:
        1) Same serialization format for all objects
        2) Same root information
        3) XML format inventories
        4) Atomic inserts (so we can stream inventory texts before text
           content)
    """

    def __init__(self, from_repository, to_format):
        super(KnitPackStreamSource, self).__init__(from_repository, to_format)
        self._text_keys = None
        self._text_fetch_order = 'unordered'

    def _get_filtered_inv_stream(self, revision_ids):
        from_repo = self.from_repository
        parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids)
        parent_keys = [(p,) for p in parent_ids]
        find_text_keys = from_repo._find_text_key_references_from_xml_inventory_lines
        parent_text_keys = set(find_text_keys(
            from_repo._inventory_xml_lines_for_keys(parent_keys)))
        content_text_keys = set()
        knit = KnitVersionedFiles(None, None)
        factory = KnitPlainFactory()
        def find_text_keys_from_content(record):
            if record.storage_kind not in ('knit-delta-gz', 'knit-ft-gz'):
                raise ValueError("Unknown content storage kind for"
                    " inventory text: %s" % (record.storage_kind,))
            # It's a knit record, it has a _raw_record field (even if it was
            # reconstituted from a network stream).
            raw_data = record._raw_record
            # read the entire thing
            revision_id = record.key[-1]
            content, _ = knit._parse_record(revision_id, raw_data)
            if record.storage_kind == 'knit-delta-gz':
                line_iterator = factory.get_linedelta_content(content)
            elif record.storage_kind == 'knit-ft-gz':
                line_iterator = factory.get_fulltext_content(content)
            content_text_keys.update(find_text_keys(
                [(line, revision_id) for line in line_iterator]))
        revision_keys = [(r,) for r in revision_ids]
        def _filtered_inv_stream():
            source_vf = from_repo.inventories
            stream = source_vf.get_record_stream(revision_keys,
                                                 'unordered', False)
            for record in stream:
                if record.storage_kind == 'absent':
                    raise errors.NoSuchRevision(from_repo, record.key)
                find_text_keys_from_content(record)
                yield record
            self._text_keys = content_text_keys - parent_text_keys
        return ('inventories', _filtered_inv_stream())

    def _get_text_stream(self):
        # Note: We know we don't have to handle adding root keys, because both
        # the source and target are the identical network name.
        text_stream = self.from_repository.texts.get_record_stream(
            self._text_keys, self._text_fetch_order, False)
        return ('texts', text_stream)

    def get_stream(self, search):
        revision_ids = search.get_keys()
        for stream_info in self._fetch_revision_texts(revision_ids):
            yield stream_info
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
        yield self._get_filtered_inv_stream(revision_ids)
        yield self._get_text_stream()
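    # Illustrative sketch (not part of the original code): get_stream() yields
    # (name, substream) pairs in dependency order - revision texts first, then
    # the filtered inventories (which record self._text_keys as a side effect
    # of being consumed), and finally the texts those inventories introduced,
    # so the inventory substream must be consumed before the text substream.
    #
    #   source = KnitPackStreamSource(from_repo, to_format)  # hypothetical args
    #   for kind, substream in source.get_stream(search):
    #       for record in substream:
    #           pass  # each record is a versioned-file content record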
class RepositoryFormatPack(MetaDirRepositoryFormat):
    """Format logic for pack structured repositories.
    ...
    """

    # ...

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return "Bazaar RepositoryFormatKnitPack6 (bzr 1.9)\n"

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return "Packs 6 (uses btree indexes, requires bzr 1.9)"


class RepositoryFormatKnitPack6RichRoot(RepositoryFormatPack):
    """A repository with rich roots, no subtrees, stacking and btree indexes.

    1.6-rich-root with B+Tree indices.
    """

    repository_class = KnitPackRepository
    _commit_builder_class = PackRootCommitBuilder
    rich_root_data = True
    supports_tree_reference = False # no subtrees
    supports_external_lookups = True
    # What index classes to use
    index_builder_class = BTreeBuilder
    index_class = BTreeGraphIndex

    @property
    def _serializer(self):
        return xml6.serializer_v6

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir(

    def _ignore_setting_bzrdir(self, format):
        pass
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return "Bazaar RepositoryFormatKnitPack6RichRoot (bzr 1.9)\n"

    def get_format_description(self):
        return "Packs 6 rich-root (uses btree indexes, requires bzr 1.9)"
class RepositoryFormatPackDevelopment2Subtree(RepositoryFormatPack):
    """A subtrees development repository.

    This format should be retained until the second release after bzr 1.7.

    1.6.1-subtree[as it might have been] with B+Tree indices.

    This is [now] retained until we have a CHK based subtree format in
    development.
    """

    repository_class = KnitPackRepository
    _commit_builder_class = PackRootCommitBuilder
    rich_root_data = True
    supports_tree_reference = True
    supports_external_lookups = True
    # What index classes to use
    index_builder_class = BTreeBuilder
    index_class = BTreeGraphIndex

    @property
    def _serializer(self):
        return xml7.serializer_v7

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir(
            'development-subtree')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def check_conversion_target(self, target_format):
        if not target_format.rich_root_data:
            raise errors.BadConversionTarget(
                'Does not support rich root data.', target_format)
        if not getattr(target_format, 'supports_tree_reference', False):
            raise errors.BadConversionTarget(
                'Does not support nested trees', target_format)

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ("Bazaar development format 2 with subtree support "
            "(needs bzr.dev from before 1.8)\n")

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format, currently the same as "
            "1.6.1-subtree with B+Tree indices.\n")