 from bzrlib import (
     revision as _mod_revision,
+from bzrlib.decorators import needs_write_lock, only_raises
-from bzrlib.decorators import needs_write_lock
+from bzrlib.btree_index import (
 from bzrlib.index import (
     InMemoryGraphIndex,
+from bzrlib.lock import LogicalLockResult
 from bzrlib.repofmt.knitrepo import KnitRepository
 from bzrlib.repository import (
     MetaDirRepositoryFormat,
+    RepositoryWriteLockResult,
-import bzrlib.revision as _mod_revision
 from bzrlib.trace import (
         return self.index_name('text', name)

     def _replace_index_with_readonly(self, index_type):
+        unlimited_cache = False
+        if index_type == 'chk':
+            unlimited_cache = True
+        index = self.index_class(self.index_transport,
+                    self.index_name(index_type, self.name),
+                    self.index_sizes[self.index_offset(index_type)],
+                    unlimited_cache=unlimited_cache)
+        if index_type == 'chk':
+            index._leaf_factory = btree_index._gcchk_factory
+        setattr(self, index_type + '_index', index)
-        setattr(self, index_type + '_index',
-            self.index_class(self.index_transport,
-                self.index_name(index_type, self.name),
-                self.index_sizes[self.index_offset(index_type)]))

 class ExistingPack(Pack):
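The hunk above makes _replace_index_with_readonly build the read-only index object itself, so it can open 'chk' indices with an unbounded cache and the groupcompress leaf factory before rebinding the attribute. A rough, self-contained sketch of that setattr pattern, using toy classes rather than bzrlib's real index API:

class ReadOnlyIndex(object):
    """Stand-in for a read-only, on-disk index (hypothetical class)."""

    def __init__(self, name, size, unlimited_cache=False):
        self.name = name
        self.size = size
        self.unlimited_cache = unlimited_cache


class PackLike(object):
    """Toy pack object holding one index attribute per index type."""

    index_sizes = {'revision': 10, 'text': 20, 'chk': 30}

    def replace_index_with_readonly(self, index_type):
        # chk indices want an unbounded page cache; the others do not
        unlimited_cache = (index_type == 'chk')
        index = ReadOnlyIndex('pack-name.' + index_type[0] + 'ix',
                              self.index_sizes[index_type],
                              unlimited_cache=unlimited_cache)
        # rebind e.g. self.chk_index / self.text_index to the read-only copy
        setattr(self, index_type + '_index', index)


pack = PackLike()
pack.replace_index_with_readonly('chk')
assert pack.chk_index.unlimited_cache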
     def __init__(self, name, revision_index, inventory_index, text_index,
         signature_index, upload_transport, pack_transport, index_transport,
+        pack_collection, chk_index=None):
         """Create a ResumedPack object."""
         ExistingPack.__init__(self, pack_transport, name, revision_index,
+            inventory_index, text_index, signature_index,
-            inventory_index, text_index, signature_index)
         self.upload_transport = upload_transport
         self.index_transport = index_transport
         self.index_sizes = [None, None, None, None]

         self.upload_transport.delete(self.file_name())
         indices = [self.revision_index, self.inventory_index, self.text_index,
             self.signature_index]
+        if self.chk_index is not None:
+            indices.append(self.chk_index)
         for index in indices:
             index._transport.delete(index._name)

     def finish(self):
         self._check_references()
+        index_types = ['revision', 'inventory', 'text', 'signature']
+        if self.chk_index is not None:
+            index_types.append('chk')
+        for index_type in index_types:
-        new_name = '../packs/' + self.file_name()
-        self.upload_transport.rename(self.file_name(), new_name)
-        for index_type in ['revision', 'inventory', 'text', 'signature']:
             old_name = self.index_name(index_type, self.name)
             new_name = '../indices/' + old_name
             self.upload_transport.rename(old_name, new_name)
             self._replace_index_with_readonly(index_type)
+        new_name = '../packs/' + self.file_name()
+        self.upload_transport.rename(self.file_name(), new_name)
         self._state = 'finished'

     def _get_external_refs(self, index):
+        """Return compression parents for this index that are not present.
+        This returns any compression parents that are referenced by this index,
+        which are not contained *in* this index. They may be present elsewhere.
         return index.external_references(1)
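external_references(1) is what the write-group checks use to spot deltas whose compression basis lives outside the pack being committed. A minimal illustration of the same set computation on a plain dict (hypothetical helper, not the GraphIndex API):

def external_compression_parents(nodes):
    """Compression parents referenced by ``nodes`` but not stored in them.

    ``nodes`` maps a key to its compression parent key (None for fulltexts);
    this mirrors the idea behind external_references(1) on a plain dict
    instead of a real index.
    """
    present = set(nodes)
    referenced = set(p for p in nodes.values() if p is not None)
    return referenced - present


# 'd2' deltas against 'd1', which lives in some other pack:
assert external_compression_parents({'d2': 'd1', 'd4': None}) == set(['d1'])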
             flush_func=flush_func)
         self.add_callback = None

-    def replace_indices(self, index_to_pack, indices):
-        """Replace the current mappings with fresh ones.
-        This should probably not be used eventually, rather incremental add and
-        removal of indices. It has been added during refactoring of existing
-        :param index_to_pack: A mapping from index objects to
-            (transport, name) tuples for the pack file data.
-        :param indices: A list of indices.
-        # refresh the revision pack map dict without replacing the instance.
-        self.index_to_pack.clear()
-        self.index_to_pack.update(index_to_pack)
-        # XXX: API break - clearly a 'replace' method would be good?
-        self.combined_index._indices[:] = indices
-        # the current add nodes callback for the current writable index if
-        self.add_callback = None

     def add_index(self, index, pack):
         """Add index to the aggregate, which is an index for Pack pack.

         self.data_access.set_writer(None, None, (None, None))
         self.index_to_pack.clear()
         del self.combined_index._indices[:]
+        del self.combined_index._index_names[:]
         self.add_callback = None

+    def remove_index(self, index):
-    def remove_index(self, index, pack):
         """Remove index from the indices used to answer queries.

         :param index: An index from the pack parameter.
-        :param pack: A Pack instance.
         del self.index_to_pack[index]
+        pos = self.combined_index._indices.index(index)
+        del self.combined_index._indices[pos]
+        del self.combined_index._index_names[pos]
-        self.combined_index._indices.remove(index)
         if (self.add_callback is not None and
             getattr(index, 'add_nodes', None) == self.add_callback):
             self.add_callback = None
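remove_index now drops the matching entry from combined_index._index_names as well, so the three structures (index_to_pack, _indices, _index_names) stay aligned by position. A toy model of that bookkeeping, with strings standing in for index and pack objects:

class ToyAggregateIndex(object):
    """Toy version of the add_index/remove_index bookkeeping above."""

    def __init__(self):
        self.index_to_pack = {}
        self.indices = []        # mirrors combined_index._indices
        self.index_names = []    # mirrors combined_index._index_names
        self.add_callback = None

    def add_index(self, index, pack):
        self.index_to_pack[index] = pack
        self.indices.append(index)
        self.index_names.append(pack)

    def remove_index(self, index):
        del self.index_to_pack[index]
        pos = self.indices.index(index)
        del self.indices[pos]
        del self.index_names[pos]
        if self.add_callback is not None and getattr(
                index, 'add_nodes', None) == self.add_callback:
            self.add_callback = None


agg = ToyAggregateIndex()
agg.add_index('rev-index-1', 'pack-1')
agg.add_index('rev-index-2', 'pack-2')
agg.remove_index('rev-index-1')
assert agg.indices == ['rev-index-2'] and agg.index_names == ['pack-2']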
         # reinserted, and if d3 has incorrect parents it will also be
         # reinserted. If we insert d3 first, d2 is present (as it was bulk
         # copied), so we will try to delta, but d2 is not currently able to be
+        # extracted because its basis d1 is not present. Topologically sorting
-        # extracted because it's basis d1 is not present. Topologically sorting
         # addresses this. The following generates a sort for all the texts that
         # are being inserted without having to reference the entire text key
         # space (we only topo sort the revisions, which is smaller).
         topo_order = tsort.topo_sort(ancestors)
         rev_order = dict(zip(topo_order, range(len(topo_order))))
+        bad_texts.sort(key=lambda key:rev_order.get(key[0][1], 0))
-        bad_texts.sort(key=lambda key:rev_order[key[0][1]])
         transaction = repo.get_transaction()
         file_id_index = GraphIndexPrefixAdapter(
             self.new_pack.text_index,
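The comment block above argues for topologically sorting the texts being reinserted so that each delta's basis is already present when it is needed; the hunk also switches to rev_order.get() so keys outside the sorted set do not blow up. A small self-contained illustration (toy topo sort standing in for tsort.topo_sort, made-up keys):

def topo_sort(ancestors):
    """Tiny topological sort: parents before children (assumed acyclic)."""
    order = []
    seen = set()
    def visit(rev):
        if rev in seen:
            return
        seen.add(rev)
        for parent in ancestors.get(rev, ()):
            visit(parent)
        order.append(rev)
    for rev in ancestors:
        visit(rev)
    return order


ancestors = {'d1': [], 'd2': ['d1'], 'd3': ['d2']}
topo_order = topo_sort(ancestors)
rev_order = dict(zip(topo_order, range(len(topo_order))))
# sort (file_id, revision_id) text keys so a delta's basis is inserted first;
# .get(..., 0) tolerates keys whose revision is not in the sorted set
bad_texts = [(('file-id', 'd3'),), (('file-id', 'd1'),), (('file-id', 'd2'),)]
bad_texts.sort(key=lambda key: rev_order.get(key[0][1], 0))
assert [key[0][1] for key in bad_texts] == ['d1', 'd2', 'd3']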
         self.inventory_index = AggregateIndex(self.reload_pack_names, flush)
         self.text_index = AggregateIndex(self.reload_pack_names, flush)
         self.signature_index = AggregateIndex(self.reload_pack_names, flush)
+        all_indices = [self.revision_index, self.inventory_index,
+                self.text_index, self.signature_index]
         if use_chk_index:
             self.chk_index = AggregateIndex(self.reload_pack_names, flush)
+            all_indices.append(self.chk_index)
             # used to determine if we're using a chk_index elsewhere.
             self.chk_index = None
+        # Tell all the CombinedGraphIndex objects about each other, so they can
+        # share hints about which pack names to search first.
+        all_combined = [agg_idx.combined_index for agg_idx in all_indices]
+        for combined_idx in all_combined:
+            combined_idx.set_sibling_indices(
+                set(all_combined).difference([combined_idx]))
         # resumed packs
         self._resumed_packs = []

+        return '%s(%r)' % (self.__class__.__name__, self.repo)

     def add_pack_to_memory(self, pack):
         """Make a Pack object available to the repository to satisfy queries.
             'containing %d revisions. Packing %d files into %d affecting %d'
             ' revisions', self, total_packs, total_revisions, num_old_packs,
             num_new_packs, num_revs_affected)
+        result = self._execute_pack_operations(pack_operations,
-        self._execute_pack_operations(pack_operations,
             reload_func=self._restart_autopack)
         mutter('Auto-packing repository %s completed', self)
     def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
                                  reload_func=None):

                 self._remove_pack_from_memory(pack)
         # record the newly available packs and stop advertising the old
+        to_be_obsoleted = []
+        for _, packs in pack_operations:
+            to_be_obsoleted.extend(packs)
+        result = self._save_pack_names(clear_obsolete_packs=True,
+                                       obsolete_packs=to_be_obsoleted)
-        self._save_pack_names(clear_obsolete_packs=True)
-        # Move the old packs out of the way now they are no longer referenced.
-        for revision_count, packs in pack_operations:
-            self._obsolete_packs(packs)
     def _flush_new_pack(self):
         if self._new_pack is not None:

     def _already_packed(self):
         """Is the collection already packed?"""
+        return not (self.repo._format.pack_compresses or (len(self._names) > 1))
-        return len(self._names) < 2
+    def pack(self, hint=None, clean_obsolete_packs=False):
         """Pack the pack collection totally."""
         self.ensure_loaded()
         total_packs = len(self._names)
         if self._already_packed():
-            # This is arguably wrong because we might not be optimal, but for
-            # now lets leave it in. (e.g. reconcile -> one pack. But not
         total_revisions = self.revision_index.combined_index.key_count()
         # XXX: the following may want to be a class, to pack with a given
         mutter('Packing repository %s, which has %d pack files, '
+            'containing %d revisions with hint %r.', self, total_packs,
+            total_revisions, hint)
-            'containing %d revisions into 1 packs.', self, total_packs,
         # determine which packs need changing
-        pack_distribution = [1]
         pack_operations = [[0, []]]
         for pack in self.all_packs():
+            if hint is None or pack.name in hint:
+                # Either no hint was provided (so we are packing everything),
+                # or this pack was included in the hint.
+                pack_operations[-1][0] += pack.get_revision_count()
+                pack_operations[-1][1].append(pack)
-            pack_operations[-1][0] += pack.get_revision_count()
-            pack_operations[-1][1].append(pack)
         self._execute_pack_operations(pack_operations, OptimisingPacker)
+        if clean_obsolete_packs:
+            self._clear_obsolete_packs()

     def plan_autopack_combinations(self, existing_packs, pack_distribution):
         """Plan a pack operation.
         pack_operations = [[0, []]]
         # plan out what packs to keep, and what to reorganise
         while len(existing_packs):
+            # take the largest pack, and if it's less than the head of the
-            # take the largest pack, and if its less than the head of the
             # distribution chart we will include its contents in the new pack
+            # for that position. If it's larger, we remove its size from the
-            # for that position. If its larger, we remove its size from the
             # distribution chart
             next_pack_rev_count, next_pack = existing_packs.pop(0)
             if next_pack_rev_count >= pack_distribution[0]:
             inv_index = self._make_index(name, '.iix', resume=True)
             txt_index = self._make_index(name, '.tix', resume=True)
             sig_index = self._make_index(name, '.six', resume=True)
+            if self.chk_index is not None:
+                chk_index = self._make_index(name, '.cix', resume=True,
+            result = self.resumed_pack_factory(name, rev_index, inv_index,
+                txt_index, sig_index, self._upload_transport,
+                self._pack_transport, self._index_transport, self,
+                chk_index=chk_index)
-            result = ResumedPack(name, rev_index, inv_index, txt_index,
-                sig_index, self._upload_transport, self._pack_transport,
-                self._index_transport, self)
         except errors.NoSuchFile, e:
             raise errors.UnresumableWriteGroup(self.repo, [name], str(e))
         self.add_pack_to_memory(result)
         transport = self._index_transport
         index_size = self._names[name][size_offset]
+        index = self._index_class(transport, index_name, index_size,
+                                  unlimited_cache=is_chk)
+        if is_chk and self._index_class is btree_index.BTreeGraphIndex:
+            index._leaf_factory = btree_index._gcchk_factory
-        return self._index_class(transport, index_name, index_size)

     def _max_pack_count(self, total_revisions):
         """Return the maximum number of packs to use for total revisions.
         :param return: None.
         for pack in packs:
+                pack.pack_transport.rename(pack.file_name(),
+                    '../obsolete_packs/' + pack.file_name())
+            except (errors.PathError, errors.TransportError), e:
+                # TODO: Should these be warnings or mutters?
+                mutter("couldn't rename obsolete pack, skipping it:\n%s"
-            pack.pack_transport.rename(pack.file_name(),
-                '../obsolete_packs/' + pack.file_name())
             # TODO: Probably needs to know all possible indices for this pack
             # - or maybe list the directory and move all indices matching this
             # name whether we recognize it or not?
             if self.chk_index is not None:
                 suffixes.append('.cix')
             for suffix in suffixes:
+                    self._index_transport.rename(pack.name + suffix,
+                        '../obsolete_packs/' + pack.name + suffix)
+                except (errors.PathError, errors.TransportError), e:
+                    mutter("couldn't rename obsolete index, skipping it:\n%s"
-                self._index_transport.rename(pack.name + suffix,
-                    '../obsolete_packs/' + pack.name + suffix)

     def pack_distribution(self, total_revisions):
         """Generate a list of the number of revisions to put in each pack.
         self._remove_pack_indices(pack)
         self.packs.remove(pack)

+    def _remove_pack_indices(self, pack, ignore_missing=False):
+        """Remove the indices for pack from the aggregated indices.
+        :param ignore_missing: Suppress KeyErrors from calling remove_index.
+        for index_type in Pack.index_definitions.keys():
+            attr_name = index_type + '_index'
+            aggregate_index = getattr(self, attr_name)
+            if aggregate_index is not None:
+                pack_index = getattr(pack, attr_name)
+                aggregate_index.remove_index(pack_index)
-    def _remove_pack_indices(self, pack):
-        """Remove the indices for pack from the aggregated indices."""
-        self.revision_index.remove_index(pack.revision_index, pack)
-        self.inventory_index.remove_index(pack.inventory_index, pack)
-        self.text_index.remove_index(pack.text_index, pack)
-        self.signature_index.remove_index(pack.signature_index, pack)
-        if self.chk_index is not None:
-            self.chk_index.remove_index(pack.chk_index, pack)
     def reset(self):
         """Clear all cached data."""
         # cached revision data
-        self.repo._revision_knit = None
         self.revision_index.clear()
         # cached signature data
-        self.repo._signature_knit = None
         self.signature_index.clear()
         # cached file text data
         self.text_index.clear()
-        self.repo._text_knit = None
         # cached inventory data
         self.inventory_index.clear()
         # cached chk data
         :param clear_obsolete_packs: If True, clear out the contents of the
             obsolete_packs directory.
+        :param obsolete_packs: Packs that are obsolete once the new pack-names
+            file has been written.
+        :return: A list of the names saved that were not previously on disk.
+        already_obsolete = []
         self.lock_names()
             builder = self._index_builder_class()
+            (disk_nodes, deleted_nodes, new_nodes,
+             orig_disk_nodes) = self._diff_pack_names()
-            disk_nodes, deleted_nodes, new_nodes = self._diff_pack_names()
             # TODO: handle same-name, index-size-changes here -
             # e.g. use the value from disk, not ours, *unless* we're the one
                 builder.add_node(key, value)
             self.transport.put_file('pack-names', builder.finish(),
                 mode=self.repo.bzrdir._get_file_mode())
-            # move the baseline forward
             self._packs_at_load = disk_nodes
             if clear_obsolete_packs:
+                to_preserve = set([o.name for o in obsolete_packs])
+                already_obsolete = self._clear_obsolete_packs(to_preserve)
-                self._clear_obsolete_packs()
             self._unlock_names()
         # synchronise the memory packs list with what we just wrote:
         self._syncronize_pack_names_from_disk_nodes(disk_nodes)
+            # TODO: We could add one more condition here. "if o.name not in
+            #       orig_disk_nodes and o != the new_pack we haven't written to
+            #       disk yet. However, the new pack object is not easily
+            #       accessible here (it would have to be passed through the
+            #       autopacking code, etc.)
+            obsolete_packs = [o for o in obsolete_packs
+                              if o.name not in already_obsolete]
+            self._obsolete_packs(obsolete_packs)
+        return [new_node[0][0] for new_node in new_nodes]
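_save_pack_names works against the richer result of _diff_pack_names: what was on disk when we loaded, what is on disk now, and what we added or removed, so packs written by concurrent processes survive the rewrite. A toy version of that reconciliation using plain name sets (hypothetical function, names only):

def diff_pack_names(orig_disk_names, current_disk_names, memory_names):
    """Toy version of the reconciliation done by _diff_pack_names.

    orig_disk_names: names in pack-names when we last loaded it
    current_disk_names: names in pack-names right now (others may have written)
    memory_names: names this process currently believes in
    Returns (disk_names_to_write, deleted_names, new_names).
    """
    deleted = set(orig_disk_names) - set(memory_names)     # we removed these
    new = set(memory_names) - set(orig_disk_names)         # we added these
    # keep everyone else's changes, minus our deletions, plus our additions
    result = (set(current_disk_names) - deleted) | new
    return sorted(result), sorted(deleted), sorted(new)


names, deleted, new = diff_pack_names(
    orig_disk_names=['a', 'b'],
    current_disk_names=['a', 'b', 'c'],   # 'c' was added by another process
    memory_names=['a', 'd'])              # we dropped 'b' and added 'd'
assert names == ['a', 'c', 'd'] and deleted == ['b'] and new == ['d']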
     def reload_pack_names(self):
         """Sync our pack listing with what is present in the repository.

         # The ensure_loaded call is to handle the case where the first call
         # made involving the collection was to reload_pack_names, where we
+        # don't have a view of disk contents. It's a bit of a bandaid, and
+        # causes two reads of pack-names, but it's a rare corner case not
+        # struck with regular push/pull etc.
-        # don't have a view of disk contents. Its a bit of a bandaid, and
-        # causes two reads of pack-names, but its a rare corner case not struck
-        # with regular push/pull etc.
         first_read = self.ensure_loaded()
         # out the new value.
+        (disk_nodes, deleted_nodes, new_nodes,
+         orig_disk_nodes) = self._diff_pack_names()
+        # _packs_at_load is meant to be the explicit list of names in
+        # 'pack-names' at then start. As such, it should not contain any
+        # pending names that haven't been written out yet.
+        self._packs_at_load = orig_disk_nodes
-        disk_nodes, _, _ = self._diff_pack_names()
-        self._packs_at_load = disk_nodes
         (removed, added,
          modified) = self._syncronize_pack_names_from_disk_nodes(disk_nodes)
         if removed or added or modified:

         raise errors.RetryAutopack(self.repo, False, sys.exc_info())
+    def _clear_obsolete_packs(self, preserve=None):
-    def _clear_obsolete_packs(self):
         """Delete everything from the obsolete-packs directory.
+        :return: A list of pack identifiers (the filename without '.pack') that
+            were found in obsolete_packs.
         obsolete_pack_transport = self.transport.clone('obsolete_packs')
+        if preserve is None:
         for filename in obsolete_pack_transport.list_dir('.'):
+            name, ext = osutils.splitext(filename)
+            if name in preserve:
                 obsolete_pack_transport.delete(filename)
             except (errors.PathError, errors.TransportError), e:
+                warning("couldn't delete obsolete pack, skipping it:\n%s"
-                warning("couldn't delete obsolete pack, skipping it:\n%s" % (e,))
     def _start_write_group(self):
         # Do not permit preparation for writing if we're not in a 'write lock'.

         # FIXME: just drop the transient index.
         # forget what names there are
         if self._new_pack is not None:
+            operation = cleanup.OperationWithCleanups(self._new_pack.abort)
+            operation.add_cleanup(setattr, self, '_new_pack', None)
+            # If we aborted while in the middle of finishing the write
+            # group, _remove_pack_indices could fail because the indexes are
+            # already gone. But they're not there we shouldn't fail in this
+            # case, so we pass ignore_missing=True.
+            operation.add_cleanup(self._remove_pack_indices, self._new_pack,
+                ignore_missing=True)
+            operation.run_simple()
-                self._new_pack.abort()
-                # XXX: If we aborted while in the middle of finishing the write
-                # group, _remove_pack_indices can fail because the indexes are
-                # already gone. If they're not there we shouldn't fail in this
-                # case. -- mbp 20081113
-                self._remove_pack_indices(self._new_pack)
-                self._new_pack = None
         for resumed_pack in self._resumed_packs:
+            operation = cleanup.OperationWithCleanups(resumed_pack.abort)
+            # See comment in previous finally block.
+            operation.add_cleanup(self._remove_pack_indices, resumed_pack,
+                ignore_missing=True)
+            operation.run_simple()
-                resumed_pack.abort()
-                # See comment in previous finally block.
-                    self._remove_pack_indices(resumed_pack)
         del self._resumed_packs[:]
-        self.repo._text_knit = None
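The abort path now funnels pack.abort() through an operation-with-cleanups helper, so removing the in-memory indices still runs and a failure there cannot mask the original abort error. A rough stand-in using plain try/finally (not bzrlib.cleanup's actual semantics):

def run_with_cleanups(operation, cleanups):
    """Toy equivalent of the OperationWithCleanups.run_simple() idea.

    Runs ``operation``; afterwards runs every cleanup even if earlier ones
    fail, swallowing cleanup errors so the primary result (or exception)
    wins.
    """
    try:
        return operation()
    finally:
        for cleanup_fn, args in cleanups:
            try:
                cleanup_fn(*args)
            except Exception:
                pass


state = {'aborted': False, 'indices_removed': False}
run_with_cleanups(
    lambda: state.update(aborted=True),
    [(state.update, ({'indices_removed': True},))])
assert state == {'aborted': True, 'indices_removed': True}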
     def _remove_resumed_pack_indices(self):
         for resumed_pack in self._resumed_packs:
             self._remove_pack_indices(resumed_pack)
         del self._resumed_packs[:]

+    def _check_new_inventories(self):
+        """Detect missing inventories in this write group.
+        :returns: list of strs, summarising any problems found. If the list is
+            empty no problems were found.
+        # The base implementation does no checks. GCRepositoryPackCollection
     def _commit_write_group(self):
         all_missing = set()
         for prefix, versioned_file in (
             raise errors.BzrCheckError(
                 "Repository %s has missing compression parent(s) %r "
                  % (self.repo, sorted(all_missing)))
+        problems = self._check_new_inventories()
+            problems_summary = '\n'.join(problems)
+            raise errors.BzrCheckError(
+                "Cannot add revision(s) to repository: " + problems_summary)
         self._remove_pack_indices(self._new_pack)
+        any_new_content = False
-        should_autopack = False
         if self._new_pack.data_inserted():
             # get all the data to disk and read to use
             self._new_pack.finish()
             self.allocate(self._new_pack)
             self._new_pack = None
+            any_new_content = True
-            should_autopack = True
             self._new_pack.abort()
             self._new_pack = None
         self._reconcile_fixes_text_parents = True
         self._reconcile_backsup_inventory = False

+    def _warn_if_deprecated(self, branch=None):
-    def _warn_if_deprecated(self):
         # This class isn't deprecated, but one sub-format is
         if isinstance(self._format, RepositoryFormatKnitPack5RichRootBroken):
+            super(KnitPackRepository, self)._warn_if_deprecated(branch)
-            from bzrlib import repository
-            if repository._deprecation_warning_done:
-            repository._deprecation_warning_done = True
-            warning("Format %s for %s is deprecated - please use"
-                    " 'bzr upgrade --1.6.1-rich-root'"
-                    % (self._format, self.bzrdir.transport.base))

     def _abort_write_group(self):
+        self.revisions._index._key_dependencies.clear()
         self._pack_collection._abort_write_group()

+    def _get_source(self, to_format):
+        if to_format.network_name() == self._format.network_name():
+            return KnitPackStreamSource(self, to_format)
+        return super(KnitPackRepository, self)._get_source(to_format)
-    def _find_inconsistent_revision_parents(self):
-        """Find revisions with incorrectly cached parents.
-        :returns: an iterator yielding tuples of (revison-id, parents-in-index,
-            parents-in-revision).
-        if not self.is_locked():
-            raise errors.ObjectNotLocked(self)
-        pb = ui.ui_factory.nested_progress_bar()
-            revision_nodes = self._pack_collection.revision_index \
-                .combined_index.iter_all_entries()
-            index_positions = []
-            # Get the cached index values for all revisions, and also the
-            # location in each index of the revision text so we can perform
-            for index, key, value, refs in revision_nodes:
-                node = (index, key, value, refs)
-                index_memo = self.revisions._index._node_to_position(node)
-                if index_memo[0] != index:
-                    raise AssertionError('%r != %r' % (index_memo[0], index))
-                index_positions.append((index_memo, key[0],
-                    tuple(parent[0] for parent in refs[0])))
-                pb.update("Reading revision index", 0, 0)
-            index_positions.sort()
-            pb.update("Checking cached revision graph", 0,
-                len(index_positions))
-            for offset in xrange(0, len(index_positions), 1000):
-                pb.update("Checking cached revision graph", offset)
-                to_query = index_positions[offset:offset + batch_size]
-                rev_ids = [item[1] for item in to_query]
-                revs = self.get_revisions(rev_ids)
-                for revision, item in zip(revs, to_query):
-                    index_parents = item[2]
-                    rev_parents = tuple(revision.parent_ids)
-                    if index_parents != rev_parents:
-                        result.append((revision.revision_id, index_parents,

     def _make_parents_provider(self):
         return graph.CachingParentsProvider(self)
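The removed helper above compares the parent pointers cached in the revision index with the parents recorded in the revision texts themselves. The core of that check, reduced to plain dicts:

def find_inconsistent_parents(cached_parents, real_parents):
    """Yield (revision_id, parents_in_index, parents_in_revision) mismatches.

    cached_parents / real_parents: dicts mapping revision id to a tuple of
    parent ids, standing in for the revision index and the revision texts.
    """
    for revision_id, index_parents in sorted(cached_parents.items()):
        rev_parents = real_parents.get(revision_id, ())
        if index_parents != rev_parents:
            yield (revision_id, index_parents, rev_parents)


cached = {'r1': (), 'r2': ('r1',), 'r3': ('r1',)}
real = {'r1': (), 'r2': ('r1',), 'r3': ('r2',)}
assert list(find_inconsistent_parents(cached, real)) == [
    ('r3', ('r1',), ('r2',))]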
         self._pack_collection._start_write_group()

     def _commit_write_group(self):
+        hint = self._pack_collection._commit_write_group()
+        self.revisions._index._key_dependencies.clear()
-        return self._pack_collection._commit_write_group()

     def suspend_write_group(self):
         # XXX check self._write_group is self.get_transaction()?
         tokens = self._pack_collection._suspend_write_group()
+        self.revisions._index._key_dependencies.clear()
         self._write_group = None

     def _resume_write_group(self, tokens):
         self._start_write_group()
+            self._pack_collection._resume_write_group(tokens)
+        except errors.UnresumableWriteGroup:
+            self._abort_write_group()
-        self._pack_collection._resume_write_group(tokens)
         for pack in self._pack_collection._resumed_packs:
             self.revisions._index.scan_unvalidated_index(pack.revision_index)

         return self._write_lock_count
     def lock_write(self, token=None):
+        """Lock the repository for writes.
+        :return: A bzrlib.repository.RepositoryWriteLockResult.
         locked = self.is_locked()
         if not self._write_lock_count and locked:
             raise errors.ReadOnlyError(self)
         self._write_lock_count += 1
         if self._write_lock_count == 1:
             self._transaction = transactions.WriteTransaction()
+            if 'relock' in debug.debug_flags and self._prev_lock == 'w':
+                note('%r was write locked again', self)
+            self._prev_lock = 'w'
             for repo in self._fallback_repositories:
                 # Writes don't affect fallback repos
                 repo.lock_read()
         self._refresh_data()
+        return RepositoryWriteLockResult(self.unlock, None)

     def lock_read(self):
+        """Lock the repository for reads.
+        :return: A bzrlib.lock.LogicalLockResult.
         locked = self.is_locked()
         if self._write_lock_count:
             self._write_lock_count += 1
             self.control_files.lock_read()
+            if 'relock' in debug.debug_flags and self._prev_lock == 'r':
+                note('%r was read locked again', self)
+            self._prev_lock = 'r'
             for repo in self._fallback_repositories:
-                # Writes don't affect fallback repos
                 repo.lock_read()
         self._refresh_data()
+        return LogicalLockResult(self.unlock)

     def leave_lock_in_place(self):
         # not supported - raise an error
                 transaction = self._transaction
                 self._transaction = None
                 transaction.finish()
-                for repo in self._fallback_repositories:
             self.control_files.unlock()
+        if not self.is_locked():
             for repo in self._fallback_repositories:
+class KnitPackStreamSource(StreamSource):
+    """A StreamSource used to transfer data between same-format KnitPack repos.
+
+    This source assumes:
+        1) Same serialization format for all objects
+        2) Same root information
+        3) XML format inventories
+        4) Atomic inserts (so we can stream inventory texts before text

+    def __init__(self, from_repository, to_format):
+        super(KnitPackStreamSource, self).__init__(from_repository, to_format)
+        self._text_keys = None
+        self._text_fetch_order = 'unordered'

+    def _get_filtered_inv_stream(self, revision_ids):
+        from_repo = self.from_repository
+        parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids)
+        parent_keys = [(p,) for p in parent_ids]
+        find_text_keys = from_repo._find_text_key_references_from_xml_inventory_lines
+        parent_text_keys = set(find_text_keys(
+            from_repo._inventory_xml_lines_for_keys(parent_keys)))
+        content_text_keys = set()
+        knit = KnitVersionedFiles(None, None)
+        factory = KnitPlainFactory()
+        def find_text_keys_from_content(record):
+            if record.storage_kind not in ('knit-delta-gz', 'knit-ft-gz'):
+                raise ValueError("Unknown content storage kind for"
+                    " inventory text: %s" % (record.storage_kind,))
+            # It's a knit record, it has a _raw_record field (even if it was
+            # reconstituted from a network stream).
+            raw_data = record._raw_record
+            # read the entire thing
+            revision_id = record.key[-1]
+            content, _ = knit._parse_record(revision_id, raw_data)
+            if record.storage_kind == 'knit-delta-gz':
+                line_iterator = factory.get_linedelta_content(content)
+            elif record.storage_kind == 'knit-ft-gz':
+                line_iterator = factory.get_fulltext_content(content)
+            content_text_keys.update(find_text_keys(
+                [(line, revision_id) for line in line_iterator]))
+        revision_keys = [(r,) for r in revision_ids]
+        def _filtered_inv_stream():
+            source_vf = from_repo.inventories
+            stream = source_vf.get_record_stream(revision_keys,
+            for record in stream:
+                if record.storage_kind == 'absent':
+                    raise errors.NoSuchRevision(from_repo, record.key)
+                find_text_keys_from_content(record)
+            self._text_keys = content_text_keys - parent_text_keys
+        return ('inventories', _filtered_inv_stream())

+    def _get_text_stream(self):
+        # Note: We know we don't have to handle adding root keys, because both
+        # the source and target are the identical network name.
+        text_stream = self.from_repository.texts.get_record_stream(
+            self._text_keys, self._text_fetch_order, False)
+        return ('texts', text_stream)

+    def get_stream(self, search):
+        revision_ids = search.get_keys()
+        for stream_info in self._fetch_revision_texts(revision_ids):
+        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+        yield self._get_filtered_inv_stream(revision_ids)
+        yield self._get_text_stream()
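KnitPackStreamSource streams only the file texts introduced by the requested revisions: it parses the inventory records it is already sending, collects the text keys they reference, and subtracts the keys referenced by the parent inventories. The set arithmetic, in isolation (toy keys):

def texts_to_stream(inventory_text_keys, parent_inventory_text_keys):
    """Text keys introduced by the sent revisions.

    Both arguments are iterables of (file_id, revision_id) keys extracted
    from inventory XML; the difference is what _get_filtered_inv_stream
    stores in self._text_keys for the later 'texts' stream.
    """
    return set(inventory_text_keys) - set(parent_inventory_text_keys)


content_text_keys = [('file-a', 'rev-2'), ('file-b', 'rev-1')]
parent_text_keys = [('file-b', 'rev-1')]
assert texts_to_stream(content_text_keys, parent_text_keys) == set(
    [('file-a', 'rev-2')])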
 class RepositoryFormatPack(MetaDirRepositoryFormat):
     """Format logic for pack structured repositories.

         utf8_files = [('format', self.get_format_string())]
         self._upload_blank_content(a_bzrdir, dirs, files, utf8_files, shared)
+        repository = self.open(a_bzrdir=a_bzrdir, _found=True)
+        self._run_post_repo_init_hooks(repository, a_bzrdir, shared)
-        return self.open(a_bzrdir=a_bzrdir, _found=True)

     def open(self, a_bzrdir, _found=False, _override_transport=None):
         """See RepositoryFormat.open().

     _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
-    def check_conversion_target(self, target_format):
-        if not target_format.rich_root_data:
-            raise errors.BadConversionTarget(
-                'Does not support rich root data.', target_format)
-        if not getattr(target_format, 'supports_tree_reference', False):
-            raise errors.BadConversionTarget(
-                'Does not support nested trees', target_format)

     def get_format_string(self):
         """See RepositoryFormat.get_format_string()."""
         return "Bazaar pack repository format 1 with subtree support (needs bzr 0.92)\n"
     def _get_matching_bzrdir(self):
         return bzrdir.format_registry.make_bzrdir(
+            'development5-subtree')
-            'development-subtree')

     def _ignore_setting_bzrdir(self, format):

     _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

-    def check_conversion_target(self, target_format):
-        if not target_format.rich_root_data:
-            raise errors.BadConversionTarget(
-                'Does not support rich root data.', target_format)
-        if not getattr(target_format, 'supports_tree_reference', False):
-            raise errors.BadConversionTarget(
-                'Does not support nested trees', target_format)

     def get_format_string(self):
         """See RepositoryFormat.get_format_string()."""
         return ("Bazaar development format 2 with subtree support "