    def access_tuple(self):
        """Return a tuple (transport, name) for the pack content."""
        return self.pack_transport, self.file_name()

    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection. It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        missing_items = {}
        for (index_name, external_refs, index) in [
            ('texts',
                self._get_external_refs(self.text_index),
                self._pack_collection.text_index.combined_index),
            ('inventories',
                self._get_external_refs(self.inventory_index),
                self._pack_collection.inventory_index.combined_index),
            ]:
            missing = external_refs.difference(
                k for (idx, k, v, r) in
                index.iter_entries(external_refs))
            if missing:
                missing_items[index_name] = sorted(list(missing))
        if missing_items:
            from pprint import pformat
            raise errors.BzrCheckError(
                "Newly created pack file %r has delta references to "
                "items not in its repository:\n%s"
                % (self, pformat(missing_items)))
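
    # Illustrative sketch (not from the original source): if a freshly written
    # pack contains a delta whose basis text exists only in a fallback
    # (stacked-on) repository, iter_entries() on the collection's combined
    # index will not return that key, so it lands in `missing` and
    # BzrCheckError is raised with something like:
    #
    #   {'texts': [('file-id', 'rev-id')]}
    #
    # The key shapes shown are only an example of what pformat() would print.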

    def file_name(self):
        """Get the file name for the pack on disk."""
        return self.name + '.pack'

    def get_revision_count(self):
        return self.revision_index.key_count()

    def index_name(self, index_type, name):
        """Get the disk name of an index type for pack name 'name'."""
        return name + Pack.index_definitions[index_type][0]

    def index_offset(self, index_type):
        """Get the position in an index_size array for a given index type."""
        return Pack.index_definitions[index_type][1]

    def inventory_index_name(self, name):
        """The inv index is the name + .iix."""
        return self.index_name('inventory', name)

    def text_index_name(self, name):
        """The text index is the name + .tix."""
        return self.index_name('text', name)

    def _replace_index_with_readonly(self, index_type):
        setattr(self, index_type + '_index',
            self.index_class(self.index_transport,
                self.index_name(index_type, self.name),
                self.index_sizes[self.index_offset(index_type)]))


class ExistingPack(Pack):
    """An in memory proxy for an existing .pack and its disk indices."""

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        return "<%s.%s object at 0x%x, %s, %s" % (
            self.__class__.__module__, self.__class__.__name__, id(self),
            self.pack_transport, self.name)
class ResumedPack(ExistingPack):
263
def __init__(self, name, revision_index, inventory_index, text_index,
264
signature_index, upload_transport, pack_transport, index_transport,
266
"""Create a ResumedPack object."""
267
ExistingPack.__init__(self, pack_transport, name, revision_index,
268
inventory_index, text_index, signature_index)
269
self.upload_transport = upload_transport
270
self.index_transport = index_transport
271
self.index_sizes = [None, None, None, None]
273
('revision', revision_index),
274
('inventory', inventory_index),
275
('text', text_index),
276
('signature', signature_index),
278
for index_type, index in indices:
279
offset = self.index_offset(index_type)
280
self.index_sizes[offset] = index._size
281
self.index_class = pack_collection._index_class
282
self._pack_collection = pack_collection
283
self._state = 'resumed'
284
# XXX: perhaps check that the .pack file exists?

    def access_tuple(self):
        if self._state == 'finished':
            return Pack.access_tuple(self)
        elif self._state == 'resumed':
            return self.upload_transport, self.file_name()
        else:
            raise AssertionError(self._state)

    def abort(self):
        self.upload_transport.delete(self.file_name())
        indices = [self.revision_index, self.inventory_index, self.text_index,
            self.signature_index]
        for index in indices:
            index._transport.delete(index._name)

    def finish(self):
        self._check_references()
        new_name = '../packs/' + self.file_name()
        self.upload_transport.rename(self.file_name(), new_name)
        for index_type in ['revision', 'inventory', 'text', 'signature']:
            old_name = self.index_name(index_type, self.name)
            new_name = '../indices/' + old_name
            self.upload_transport.rename(old_name, new_name)
            self._replace_index_with_readonly(index_type)
        self._state = 'finished'

    def _get_external_refs(self, index):
        return index.external_references(1)
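
# Illustrative sketch (not from the original source) of the ResumedPack
# lifecycle, assuming the surrounding write-group machinery:
#
#   pack = ResumedPack(name, rev_index, inv_index, txt_index, sig_index,
#       upload_transport, pack_transport, index_transport, pack_collection)
#   pack.access_tuple()   # -> (upload_transport, name + '.pack') while resumed
#   pack.finish()         # renames the pack and its indices out of the upload
#                         # directory and flips the state to 'finished'
#   pack.abort()          # alternatively, deletes the suspended pack and its
#                         # indices from the upload directory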


class NewPack(Pack):
    """An in memory proxy for a pack which is being created."""

    # A map of index 'type' to the file extension and position in the
    # index_sizes array.
    index_definitions = {
        'revision': ('.rix', 0),
        'inventory': ('.iix', 1),
        'text': ('.tix', 2),
        'signature': ('.six', 3),
        }

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

            raise AssertionError(self._state)

    def data_inserted(self):
        """True if data has been added to this pack."""
        return bool(self.get_revision_count() or
            self.inventory_index.key_count() or
            self.text_index.key_count() or
            self.signature_index.key_count())

        if self._buffer[1]:
            self._write_data('', flush=True)
        self.name = self._hash.hexdigest()
        if not suspend:
            self._check_references()
        # XXX: It'd be better to write them all to temporary names, then
        # rename them all into place, so that the window when only some are
        # visible is smaller. On the other hand none will be seen until
        # they're in the names list.
        self.index_sizes = [None, None, None, None]
        self._write_index('revision', self.revision_index, 'revision', suspend)
        self._write_index('inventory', self.inventory_index, 'inventory',
            suspend)
        self._write_index('text', self.text_index, 'file texts', suspend)
        self._write_index('signature', self.signature_index,
            'revision signatures', suspend)
        self.write_stream.close()
        # Note that this will clobber an existing pack with the same name,
        # without checking for hash collisions. While this is undesirable this
        # - try for HASH.pack
        # - try for temporary-name
        # - refresh the pack-list to see if the pack is now absent
        new_name = self.name + '.pack'
        if not suspend:
            new_name = '../packs/' + new_name
        self.upload_transport.rename(self.random_name, new_name)
        self._state = 'finished'
        if 'pack' in debug.debug_flags:
            # XXX: size might be interesting?
            mutter('%s: create_pack: pack finished: %s%s->%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                new_name, time.time() - self.start_time)
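
    # Illustrative note (not from the original source): when finish() is
    # called with suspend=True the pack is only renamed within the upload
    # directory (the '../packs/' prefix is skipped) and _check_references()
    # is not run; a later ResumedPack.finish() moves it into place and does
    # the reference checking.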

    def flush(self):
        """Flush any current data."""
        if self._buffer[1]:
            bytes = ''.join(self._buffer[0])
            self.write_stream.write(bytes)
            self._hash.update(bytes)
            self._buffer[:] = [[], 0]

    def _get_external_refs(self, index):
        return index._external_references()

    def set_write_cache_size(self, size):
        self._cache_limit = size

    def _write_index(self, index_type, index, label, suspend=False):
        """Write out an index.

        :param index_type: The type of index to write - e.g. 'revision'.
        :param label: What label to give the index e.g. 'revision'.
        """
        index_name = self.index_name(index_type, self.name)
        if suspend:
            transport = self.upload_transport
        else:
            transport = self.index_transport
        self.index_sizes[self.index_offset(index_type)] = transport.put_file(
            index_name, index.finish(), mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            # XXX: size might be interesting?
            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
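
    # Illustrative note (not from the original source): with suspend=True the
    # index is written next to the suspended pack on the upload transport
    # rather than into the indices directory; the pack collection's
    # _make_index(..., resume=True) later reads it back from there.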

        self.add_pack_to_memory(result)

    def _resume_pack(self, name):
        """Get a suspended Pack object by name.

        :param name: The name of the pack - e.g. '123456'
        :return: A Pack object.
        """
        if not re.match('[a-f0-9]{32}', name):
            # Tokens should be md5sums of the suspended pack file, i.e. 32 hex
            # digits.
            raise errors.UnresumableWriteGroup(
                self.repo, [name], 'Malformed write group token')
        try:
            rev_index = self._make_index(name, '.rix', resume=True)
            inv_index = self._make_index(name, '.iix', resume=True)
            txt_index = self._make_index(name, '.tix', resume=True)
            sig_index = self._make_index(name, '.six', resume=True)
            result = ResumedPack(name, rev_index, inv_index, txt_index,
                sig_index, self._upload_transport, self._pack_transport,
                self._index_transport, self)
        except errors.NoSuchFile, e:
            raise errors.UnresumableWriteGroup(self.repo, [name], str(e))
        self.add_pack_to_memory(result)
        self._resumed_packs.append(result)
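
    # Illustrative sketch (not from the original source): the token handed out
    # by _suspend_write_group() is just the suspended pack's name (the md5 of
    # its contents), so resuming looks roughly like:
    #
    #   tokens = packs._suspend_write_group()
    #   ...                                   # possibly in another process
    #   packs._resume_write_group(tokens)     # calls _resume_pack(token)
    #
    # A malformed token or a missing pack file raises UnresumableWriteGroup.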

    def allocate(self, a_new_pack):
        """Allocate name in the list of packs.

    def _iter_disk_pack_index(self):
        return self._index_class(self.transport, 'pack-names', None
            ).iter_all_entries()

    def _make_index(self, name, suffix, resume=False):
        size_offset = self._suffix_offsets[suffix]
        index_name = name + suffix
        if resume:
            transport = self._upload_transport
            index_size = transport.stat(index_name).st_size
        else:
            transport = self._index_transport
            index_size = self._names[name][size_offset]
        return self._index_class(transport, index_name, index_size)
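
    # Illustrative note (not from the original source): for a resumed pack the
    # index size is not yet recorded in self._names, so it is obtained by
    # stat()ing the file in the upload directory instead of reading it from
    # the pack-names index.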

    def _max_pack_count(self, total_revisions):
        """Return the maximum number of packs to use for total revisions.

                # case. -- mbp 20081113
                self._remove_pack_indices(self._new_pack)
                self._new_pack = None
        for resumed_pack in self._resumed_packs:
            try:
                resumed_pack.abort()
            finally:
                # See comment in previous finally block.
                try:
                    self._remove_pack_indices(resumed_pack)
                except KeyError:
                    pass
        del self._resumed_packs[:]
        self.repo._text_knit = None

    def _remove_resumed_pack_indices(self):
        for resumed_pack in self._resumed_packs:
            self._remove_pack_indices(resumed_pack)
        del self._resumed_packs[:]

    def _commit_write_group(self):
        all_missing = set()
        for prefix, versioned_file in (
                ('revisions', self.repo.revisions),
                ('inventories', self.repo.inventories),
                ('texts', self.repo.texts),
                ('signatures', self.repo.signatures),
                ):
            missing = versioned_file.get_missing_compression_parent_keys()
            all_missing.update([(prefix,) + key for key in missing])
        if all_missing:
            raise errors.BzrCheckError(
                "Repository %s has missing compression parent(s) %r "
                % (self.repo, sorted(all_missing)))
        self._remove_pack_indices(self._new_pack)
        should_autopack = False
        if self._new_pack.data_inserted():
            # get all the data to disk and read to use
            self._new_pack.finish()
            self.allocate(self._new_pack)
            self._new_pack = None
            should_autopack = True
        else:
            self._new_pack.abort()
            self._new_pack = None
        for resumed_pack in self._resumed_packs:
            # XXX: this is a pretty ugly way to turn the resumed pack into a
            # properly committed pack.
            self._names[resumed_pack.name] = None
            self._remove_pack_from_memory(resumed_pack)
            resumed_pack.finish()
            self.allocate(resumed_pack)
            should_autopack = True
        del self._resumed_packs[:]
        if should_autopack:
            if not self.autopack():
                # when autopack takes no steps, the names list is still
                # unsaved.
                self._save_pack_names()
        self.repo._text_knit = None
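
    # Illustrative note (not from the original source): committing a write
    # group now also finishes any resumed packs: each one is renamed out of
    # the upload area by ResumedPack.finish(), allocated an entry in the
    # pack-names list, and autopack is only attempted when something was
    # actually added.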

    def _suspend_write_group(self):
        tokens = [pack.name for pack in self._resumed_packs]
        self._remove_pack_indices(self._new_pack)
        if self._new_pack.data_inserted():
            # get all the data to disk and read to use
            self._new_pack.finish(suspend=True)
            tokens.append(self._new_pack.name)
            self._new_pack = None
        else:
            self._new_pack.abort()
            self._new_pack = None
        self._remove_resumed_pack_indices()
        self.repo._text_knit = None
        return tokens

    def _resume_write_group(self, tokens):
        for token in tokens:
            self._resume_pack(token)
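
# Illustrative sketch (not from the original source) of the repository-level
# API these helpers support, assuming a write-locked repository:
#
#   repo.lock_write()
#   repo.start_write_group()
#   ...                                    # insert some data
#   tokens = repo.suspend_write_group()    # e.g. ['0123...def']
#   # later, possibly from another connection:
#   repo.resume_write_group(tokens)
#   repo.commit_write_group()
#   repo.unlock()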


class KnitPackRepository(KnitRepository):