~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: Matt Nordhoff
  • Date: 2009-04-04 02:50:01 UTC
  • mfrom: (4253 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4256.
  • Revision ID: mnordhoff@mattnordhoff.com-20090404025001-z1403k0tatmc8l91
Merge bzr.dev, fixing conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
import re
18
18
import sys
81
81
 
82
82
class PackCommitBuilder(CommitBuilder):
83
83
    """A subclass of CommitBuilder to add texts with pack semantics.
84
 
    
 
84
 
85
85
    Specifically this uses one knit object rather than one knit object per
86
86
    added text, reducing memory and object pressure.
87
87
    """
102
102
 
103
103
class PackRootCommitBuilder(RootCommitBuilder):
104
104
    """A subclass of RootCommitBuilder to add texts with pack semantics.
105
 
    
 
105
 
106
106
    Specifically this uses one knit object rather than one knit object per
107
107
    added text, reducing memory and object pressure.
108
108
    """
163
163
 
164
164
    def _check_references(self):
165
165
        """Make sure our external references are present.
166
 
        
 
166
 
167
167
        Packs are allowed to have deltas whose base is not in the pack, but it
168
168
        must be present somewhere in this collection.  It is not allowed to
169
169
        have deltas based on a fallback repository.
374
374
            mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
375
375
                time.ctime(), self.upload_transport.base, self.random_name,
376
376
                time.time() - self.start_time)
377
 
        # A list of byte sequences to be written to the new pack, and the 
378
 
        # aggregate size of them.  Stored as a list rather than separate 
 
377
        # A list of byte sequences to be written to the new pack, and the
 
378
        # aggregate size of them.  Stored as a list rather than separate
379
379
        # variables so that the _write_data closure below can update them.
380
380
        self._buffer = [[], 0]
381
 
        # create a callable for adding data 
 
381
        # create a callable for adding data
382
382
        #
383
383
        # robertc says- this is a closure rather than a method on the object
384
384
        # so that the variables are locals, and faster than accessing object
510
510
            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
511
511
                time.ctime(), label, self.upload_transport.base,
512
512
                self.random_name, time.time() - self.start_time)
513
 
        # Replace the writable index on this object with a readonly, 
 
513
        # Replace the writable index on this object with a readonly,
514
514
        # presently unloaded index. We should alter
515
515
        # the index layer to make its finish() error if add_node is
516
516
        # subsequently used. RBC
525
525
    such as 'revision index'.
526
526
 
527
527
    A CombinedIndex provides an index on a single key space built up
528
 
    from several on-disk indices.  The AggregateIndex builds on this 
 
528
    from several on-disk indices.  The AggregateIndex builds on this
529
529
    to provide a knit access layer, and allows having up to one writable
530
530
    index within the collection.
531
531
    """
532
532
    # XXX: Probably 'can be written to' could/should be separated from 'acts
533
533
    # like a knit index' -- mbp 20071024
534
534
 
535
 
    def __init__(self, reload_func=None):
 
535
    def __init__(self, reload_func=None, flush_func=None):
536
536
        """Create an AggregateIndex.
537
537
 
538
538
        :param reload_func: A function to call if we find we are missing an
543
543
        self.index_to_pack = {}
544
544
        self.combined_index = CombinedGraphIndex([], reload_func=reload_func)
545
545
        self.data_access = _DirectPackAccess(self.index_to_pack,
546
 
                                             reload_func=reload_func)
 
546
                                             reload_func=reload_func,
 
547
                                             flush_func=flush_func)
547
548
        self.add_callback = None
548
549
 
549
550
    def replace_indices(self, index_to_pack, indices):
571
572
 
572
573
        Future searches on the aggregate index will seach this new index
573
574
        before all previously inserted indices.
574
 
        
 
575
 
575
576
        :param index: An Index for the pack.
576
577
        :param pack: A Pack instance.
577
578
        """
585
586
 
586
587
        There can be at most one writable index at any time.  Any
587
588
        modifications made to the knit are put into this index.
588
 
        
 
589
 
589
590
        :param index: An index from the pack parameter.
590
591
        :param pack: A Pack instance.
591
592
        """
608
609
 
609
610
    def remove_index(self, index, pack):
610
611
        """Remove index from the indices used to answer queries.
611
 
        
 
612
 
612
613
        :param index: An index from the pack parameter.
613
614
        :param pack: A Pack instance.
614
615
        """
689
690
        This does little more than a bulk copy of data. One key difference
690
691
        is that data with the same item key across multiple packs is elided
691
692
        from the output. The new pack is written into the current pack store
692
 
        along with its indices, and the name added to the pack names. The 
 
693
        along with its indices, and the name added to the pack names. The
693
694
        source packs are not altered and are not required to be in the current
694
695
        pack collection.
695
696
 
725
726
 
726
727
    def open_pack(self):
727
728
        """Open a pack for the pack we are creating."""
728
 
        return NewPack(self._pack_collection, upload_suffix=self.suffix,
 
729
        new_pack = NewPack(self._pack_collection, upload_suffix=self.suffix,
729
730
                file_mode=self._pack_collection.repo.bzrdir._get_file_mode())
 
731
        # We know that we will process all nodes in order, and don't need to
 
732
        # query, so don't combine any indices spilled to disk until we are done
 
733
        new_pack.revision_index.set_optimize(combine_backing_indices=False)
 
734
        new_pack.inventory_index.set_optimize(combine_backing_indices=False)
 
735
        new_pack.text_index.set_optimize(combine_backing_indices=False)
 
736
        new_pack.signature_index.set_optimize(combine_backing_indices=False)
 
737
        return new_pack
730
738
 
731
739
    def _update_pack_order(self, entries, index_to_pack_map):
732
740
        """Determine how we want our packs to be ordered.
842
850
            if missing_text_keys:
843
851
                # TODO: raise a specific error that can handle many missing
844
852
                # keys.
 
853
                mutter("missing keys during fetch: %r", missing_text_keys)
845
854
                a_missing_key = missing_text_keys.pop()
846
855
                raise errors.RevisionNotPresent(a_missing_key[1],
847
856
                    a_missing_key[0])
914
923
        nodes = sorted(nodes)
915
924
        # how to map this into knit.py - or knit.py into this?
916
925
        # we don't want the typical knit logic, we want grouping by pack
917
 
        # at this point - perhaps a helper library for the following code 
 
926
        # at this point - perhaps a helper library for the following code
918
927
        # duplication points?
919
928
        request_groups = {}
920
929
        for index, key, value in nodes:
1020
1029
 
1021
1030
    def _least_readv_node_readv(self, nodes):
1022
1031
        """Generate request groups for nodes using the least readv's.
1023
 
        
 
1032
 
1024
1033
        :param nodes: An iterable of graph index nodes.
1025
1034
        :return: Total node count and an iterator of the data needed to perform
1026
1035
            readvs to obtain the data for nodes. Each item yielded by the
1137
1146
 
1138
1147
class ReconcilePacker(Packer):
1139
1148
    """A packer which regenerates indices etc as it copies.
1140
 
    
 
1149
 
1141
1150
    This is used by ``bzr reconcile`` to cause parent text pointers to be
1142
1151
    regenerated.
1143
1152
    """
1166
1175
        # 1) generate the ideal index
1167
1176
        repo = self._pack_collection.repo
1168
1177
        ancestors = dict([(key[0], tuple(ref[0] for ref in refs[0])) for
1169
 
            _1, key, _2, refs in 
 
1178
            _1, key, _2, refs in
1170
1179
            self.new_pack.revision_index.iter_all_entries()])
1171
1180
        ideal_index = repo._generate_text_key_index(self._text_refs, ancestors)
1172
1181
        # 2) generate a text_nodes list that contains all the deltas that can
1178
1187
        text_index_map, text_nodes = self._get_text_nodes()
1179
1188
        for node in text_nodes:
1180
1189
            # 0 - index
1181
 
            # 1 - key 
 
1190
            # 1 - key
1182
1191
            # 2 - value
1183
1192
            # 3 - refs
1184
1193
            try:
1279
1288
 
1280
1289
class RepositoryPackCollection(object):
1281
1290
    """Management of packs within a repository.
1282
 
    
 
1291
 
1283
1292
    :ivar _names: map of {pack_name: (index_size,)}
1284
1293
    """
1285
1294
 
1287
1296
                 pack_transport, index_builder_class, index_class):
1288
1297
        """Create a new RepositoryPackCollection.
1289
1298
 
1290
 
        :param transport: Addresses the repository base directory 
 
1299
        :param transport: Addresses the repository base directory
1291
1300
            (typically .bzr/repository/).
1292
1301
        :param index_transport: Addresses the directory containing indices.
1293
1302
        :param upload_transport: Addresses the directory into which packs are written
1296
1305
        :param index_builder_class: The index builder class to use.
1297
1306
        :param index_class: The index class to use.
1298
1307
        """
 
1308
        # XXX: This should call self.reset()
1299
1309
        self.repo = repo
1300
1310
        self.transport = transport
1301
1311
        self._index_transport = index_transport
1306
1316
        self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3}
1307
1317
        self.packs = []
1308
1318
        # name:Pack mapping
 
1319
        self._names = None
1309
1320
        self._packs_by_name = {}
1310
1321
        # the previous pack-names content
1311
1322
        self._packs_at_load = None
1312
1323
        # when a pack is being created by this object, the state of that pack.
1313
1324
        self._new_pack = None
1314
1325
        # aggregated revision index data
1315
 
        self.revision_index = AggregateIndex(self.reload_pack_names)
1316
 
        self.inventory_index = AggregateIndex(self.reload_pack_names)
1317
 
        self.text_index = AggregateIndex(self.reload_pack_names)
1318
 
        self.signature_index = AggregateIndex(self.reload_pack_names)
 
1326
        flush = self._flush_new_pack
 
1327
        self.revision_index = AggregateIndex(self.reload_pack_names, flush)
 
1328
        self.inventory_index = AggregateIndex(self.reload_pack_names, flush)
 
1329
        self.text_index = AggregateIndex(self.reload_pack_names, flush)
 
1330
        self.signature_index = AggregateIndex(self.reload_pack_names, flush)
1319
1331
        # resumed packs
1320
1332
        self._resumed_packs = []
1321
1333
 
1322
1334
    def add_pack_to_memory(self, pack):
1323
1335
        """Make a Pack object available to the repository to satisfy queries.
1324
 
        
 
1336
 
1325
1337
        :param pack: A Pack object.
1326
1338
        """
1327
1339
        if pack.name in self._packs_by_name:
1333
1345
        self.inventory_index.add_index(pack.inventory_index, pack)
1334
1346
        self.text_index.add_index(pack.text_index, pack)
1335
1347
        self.signature_index.add_index(pack.signature_index, pack)
1336
 
        
 
1348
 
1337
1349
    def all_packs(self):
1338
1350
        """Return a list of all the Pack objects this repository has.
1339
1351
 
1348
1360
 
1349
1361
    def autopack(self):
1350
1362
        """Pack the pack collection incrementally.
1351
 
        
 
1363
 
1352
1364
        This will not attempt global reorganisation or recompression,
1353
1365
        rather it will just ensure that the total number of packs does
1354
1366
        not grow without bound. It uses the _max_pack_count method to
1391
1403
                # group their data with the relevant commit, and that may
1392
1404
                # involve rewriting ancient history - which autopack tries to
1393
1405
                # avoid. Alternatively we could not group the data but treat
1394
 
                # each of these as having a single revision, and thus add 
 
1406
                # each of these as having a single revision, and thus add
1395
1407
                # one revision for each to the total revision count, to get
1396
1408
                # a matching distribution.
1397
1409
                continue
1442
1454
        for revision_count, packs in pack_operations:
1443
1455
            self._obsolete_packs(packs)
1444
1456
 
 
1457
    def _flush_new_pack(self):
 
1458
        if self._new_pack is not None:
 
1459
            self._new_pack.flush()
 
1460
 
1445
1461
    def lock_names(self):
1446
1462
        """Acquire the mutex around the pack-names index.
1447
 
        
 
1463
 
1448
1464
        This cannot be used in the middle of a read-only transaction on the
1449
1465
        repository.
1450
1466
        """
1526
1542
        return [[final_rev_count, final_pack_list]]
1527
1543
 
1528
1544
    def ensure_loaded(self):
 
1545
        """Ensure we have read names from disk.
 
1546
 
 
1547
        :return: True if the disk names had not been previously read.
 
1548
        """
1529
1549
        # NB: if you see an assertion error here, its probably access against
1530
1550
        # an unlocked repo. Naughty.
1531
1551
        if not self.repo.is_locked():
1537
1557
                name = key[0]
1538
1558
                self._names[name] = self._parse_index_sizes(value)
1539
1559
                self._packs_at_load.add((key, value))
 
1560
            result = True
 
1561
        else:
 
1562
            result = False
1540
1563
        # populate all the metadata.
1541
1564
        self.all_packs()
 
1565
        return result
1542
1566
 
1543
1567
    def _parse_index_sizes(self, value):
1544
1568
        """Parse a string of index sizes."""
1602
1626
 
1603
1627
    def _iter_disk_pack_index(self):
1604
1628
        """Iterate over the contents of the pack-names index.
1605
 
        
 
1629
 
1606
1630
        This is used when loading the list from disk, and before writing to
1607
1631
        detect updates from others during our write operation.
1608
1632
        :return: An iterator of the index contents.
1623
1647
 
1624
1648
    def _max_pack_count(self, total_revisions):
1625
1649
        """Return the maximum number of packs to use for total revisions.
1626
 
        
 
1650
 
1627
1651
        :param total_revisions: The total number of revisions in the
1628
1652
            repository.
1629
1653
        """
1684
1708
 
1685
1709
    def _remove_pack_from_memory(self, pack):
1686
1710
        """Remove pack from the packs accessed by this repository.
1687
 
        
 
1711
 
1688
1712
        Only affects memory state, until self._save_pack_names() is invoked.
1689
1713
        """
1690
1714
        self._names.pop(pack.name)
1815
1839
        try:
1816
1840
            builder = self._index_builder_class()
1817
1841
            disk_nodes, deleted_nodes, new_nodes = self._diff_pack_names()
1818
 
            # TODO: handle same-name, index-size-changes here - 
 
1842
            # TODO: handle same-name, index-size-changes here -
1819
1843
            # e.g. use the value from disk, not ours, *unless* we're the one
1820
1844
            # changing it.
1821
1845
            for key, value in disk_nodes:
1837
1861
        This should be called when we find out that something we thought was
1838
1862
        present is now missing. This happens when another process re-packs the
1839
1863
        repository, etc.
 
1864
 
 
1865
        :return: True if the in-memory list of packs has been altered at all.
1840
1866
        """
1841
 
        # This is functionally similar to _save_pack_names, but we don't write
 
1867
        # The ensure_loaded call is to handle the case where the first call
 
1868
        # made involving the collection was to reload_pack_names, where we 
 
1869
        # don't have a view of disk contents. Its a bit of a bandaid, and
 
1870
        # causes two reads of pack-names, but its a rare corner case not struck
 
1871
        # with regular push/pull etc.
 
1872
        first_read = self.ensure_loaded()
 
1873
        if first_read:
 
1874
            return True
1842
1875
        # out the new value.
1843
1876
        disk_nodes, _, _ = self._diff_pack_names()
1844
1877
        self._packs_at_load = disk_nodes
1980
2013
 
1981
2014
class KnitPackRepository(KnitRepository):
1982
2015
    """Repository with knit objects stored inside pack containers.
1983
 
    
 
2016
 
1984
2017
    The layering for a KnitPackRepository is:
1985
2018
 
1986
2019
    Graph        |  HPSS    | Repository public layer |
2000
2033
      pack file. The GraphIndex layer works in N-tuples and is unaware of any
2001
2034
      semantic value.
2002
2035
    ===================================================
2003
 
    
 
2036
 
2004
2037
    """
2005
2038
 
2006
2039
    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
2038
2071
                deltas=True, parents=True, is_locked=self.is_locked),
2039
2072
            data_access=self._pack_collection.text_index.data_access,
2040
2073
            max_delta_chain=200)
 
2074
        self.chk_bytes = None
2041
2075
        # True when the repository object is 'write locked' (as opposed to the
2042
 
        # physical lock only taken out around changes to the pack-names list.) 
 
2076
        # physical lock only taken out around changes to the pack-names list.)
2043
2077
        # Another way to represent this would be a decorator around the control
2044
2078
        # files object that presents logical locks as physical ones - if this
2045
2079
        # gets ugly consider that alternative design. RBC 20071011
2049
2083
        self._reconcile_does_inventory_gc = True
2050
2084
        self._reconcile_fixes_text_parents = True
2051
2085
        self._reconcile_backsup_inventory = False
2052
 
        self._fetch_order = 'unordered'
2053
2086
 
2054
2087
    def _warn_if_deprecated(self):
2055
2088
        # This class isn't deprecated, but one sub-format is
2085
2118
                pos, length = value[1:].split(' ')
2086
2119
                index_positions.append((index, int(pos), key[0],
2087
2120
                    tuple(parent[0] for parent in refs[0])))
2088
 
                pb.update("Reading revision index.", 0, 0)
 
2121
                pb.update("Reading revision index", 0, 0)
2089
2122
            index_positions.sort()
2090
2123
            batch_count = len(index_positions) / 1000 + 1
2091
 
            pb.update("Checking cached revision graph.", 0, batch_count)
 
2124
            pb.update("Checking cached revision graph", 0, batch_count)
2092
2125
            for offset in xrange(batch_count):
2093
 
                pb.update("Checking cached revision graph.", offset)
 
2126
                pb.update("Checking cached revision graph", offset)
2094
2127
                to_query = index_positions[offset * 1000:(offset + 1) * 1000]
2095
2128
                if not to_query:
2096
2129
                    break
2105
2138
            pb.finished()
2106
2139
        return result
2107
2140
 
2108
 
    @symbol_versioning.deprecated_method(symbol_versioning.one_one)
2109
 
    def get_parents(self, revision_ids):
2110
 
        """See graph._StackedParentsProvider.get_parents."""
2111
 
        parent_map = self.get_parent_map(revision_ids)
2112
 
        return [parent_map.get(r, None) for r in revision_ids]
2113
 
 
2114
2141
    def _make_parents_provider(self):
2115
2142
        return graph.CachingParentsProvider(self)
2116
2143
 
2117
2144
    def _refresh_data(self):
2118
 
        if self._write_lock_count == 1 or (
2119
 
            self.control_files._lock_count == 1 and
2120
 
            self.control_files._lock_mode == 'r'):
2121
 
            # forget what names there are
2122
 
            self._pack_collection.reset()
2123
 
            # XXX: Better to do an in-memory merge when acquiring a new lock -
2124
 
            # factor out code from _save_pack_names.
2125
 
            self._pack_collection.ensure_loaded()
 
2145
        if not self.is_locked():
 
2146
            return
 
2147
        self._pack_collection.reload_pack_names()
2126
2148
 
2127
2149
    def _start_write_group(self):
2128
2150
        self._pack_collection._start_write_group()
2153
2175
        return self._write_lock_count
2154
2176
 
2155
2177
    def lock_write(self, token=None):
2156
 
        if not self._write_lock_count and self.is_locked():
 
2178
        locked = self.is_locked()
 
2179
        if not self._write_lock_count and locked:
2157
2180
            raise errors.ReadOnlyError(self)
2158
2181
        self._write_lock_count += 1
2159
2182
        if self._write_lock_count == 1:
2161
2184
            for repo in self._fallback_repositories:
2162
2185
                # Writes don't affect fallback repos
2163
2186
                repo.lock_read()
2164
 
        self._refresh_data()
 
2187
        if not locked:
 
2188
            self._refresh_data()
2165
2189
 
2166
2190
    def lock_read(self):
 
2191
        locked = self.is_locked()
2167
2192
        if self._write_lock_count:
2168
2193
            self._write_lock_count += 1
2169
2194
        else:
2171
2196
            for repo in self._fallback_repositories:
2172
2197
                # Writes don't affect fallback repos
2173
2198
                repo.lock_read()
2174
 
        self._refresh_data()
 
2199
        if not locked:
 
2200
            self._refresh_data()
2175
2201
 
2176
2202
    def leave_lock_in_place(self):
2177
2203
        # not supported - raise an error
2198
2224
        reconciler.reconcile()
2199
2225
        return reconciler
2200
2226
 
 
2227
    def _reconcile_pack(self, collection, packs, extension, revs, pb):
 
2228
        packer = ReconcilePacker(collection, packs, extension, revs)
 
2229
        return packer.pack(pb)
 
2230
 
2201
2231
    def unlock(self):
2202
2232
        if self._write_lock_count == 1 and self._write_group is not None:
2203
2233
            self.abort_write_group()
2245
2275
    # Set this attribute in derived clases to control the _serializer that the
2246
2276
    # repository objects will have passed to their constructor.
2247
2277
    _serializer = None
 
2278
    # Packs are not confused by ghosts.
 
2279
    supports_ghosts = True
2248
2280
    # External references are not supported in pack repositories yet.
2249
2281
    supports_external_lookups = False
 
2282
    # Most pack formats do not use chk lookups.
 
2283
    supports_chks = False
2250
2284
    # What index classes to use
2251
2285
    index_builder_class = None
2252
2286
    index_class = None
 
2287
    _fetch_uses_deltas = True
 
2288
    fast_deltas = False
2253
2289
 
2254
2290
    def initialize(self, a_bzrdir, shared=False):
2255
2291
        """Create a pack based repository.
2264
2300
        builder = self.index_builder_class()
2265
2301
        files = [('pack-names', builder.finish())]
2266
2302
        utf8_files = [('format', self.get_format_string())]
2267
 
        
 
2303
 
2268
2304
        self._upload_blank_content(a_bzrdir, dirs, files, utf8_files, shared)
2269
2305
        return self.open(a_bzrdir=a_bzrdir, _found=True)
2270
2306
 
2271
2307
    def open(self, a_bzrdir, _found=False, _override_transport=None):
2272
2308
        """See RepositoryFormat.open().
2273
 
        
 
2309
 
2274
2310
        :param _override_transport: INTERNAL USE ONLY. Allows opening the
2275
2311
                                    repository at a slightly different url
2276
2312
                                    than normal. I.e. during 'upgrade'.
2362
2398
        if not getattr(target_format, 'supports_tree_reference', False):
2363
2399
            raise errors.BadConversionTarget(
2364
2400
                'Does not support nested trees', target_format)
2365
 
            
 
2401
 
2366
2402
    def get_format_string(self):
2367
2403
        """See RepositoryFormat.get_format_string()."""
2368
2404
        return "Bazaar pack repository format 1 with subtree support (needs bzr 0.92)\n"
2645
2681
    # What index classes to use
2646
2682
    index_builder_class = BTreeBuilder
2647
2683
    index_class = BTreeGraphIndex
 
2684
    # Set to true to get the fast-commit code path tested until a really fast
 
2685
    # format lands in trunk. Not actually fast in this format.
 
2686
    fast_deltas = True
2648
2687
 
2649
2688
    @property
2650
2689
    def _serializer(self):
2708
2747
        if not getattr(target_format, 'supports_tree_reference', False):
2709
2748
            raise errors.BadConversionTarget(
2710
2749
                'Does not support nested trees', target_format)
2711
 
            
 
2750
 
2712
2751
    def get_format_string(self):
2713
2752
        """See RepositoryFormat.get_format_string()."""
2714
2753
        return ("Bazaar development format 2 with subtree support "