~bzr-pqm/bzr/bzr.dev


Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: John Arbash Meinel
  • Date: 2009-07-29 21:35:05 UTC
  • mfrom: (4576 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4577.
  • Revision ID: john@arbash-meinel.com-20090729213505-tkqsvy1zfpocu75w
Merge bzr.dev 4576 in prep for NEWS

=== modified file 'bzrlib/repofmt/pack_repo.py'
@@ -36,10 +36,7 @@
     )
 from bzrlib.index import (
     CombinedGraphIndex,
-    GraphIndex,
-    GraphIndexBuilder,
     GraphIndexPrefixAdapter,
-    InMemoryGraphIndex,
     )
 from bzrlib.knit import (
     KnitPlainFactory,
@@ -55,7 +52,6 @@
     lockable_files,
     lockdir,
     revision as _mod_revision,
-    symbol_versioning,
     )
 
 from bzrlib.decorators import needs_write_lock
@@ -73,8 +69,8 @@
     MetaDirRepositoryFormat,
     RepositoryFormat,
     RootCommitBuilder,
+    StreamSource,
     )
-import bzrlib.revision as _mod_revision
 from bzrlib.trace import (
     mutter,
     warning,
@@ -312,8 +308,6 @@
 
     def finish(self):
         self._check_references()
-        new_name = '../packs/' + self.file_name()
-        self.upload_transport.rename(self.file_name(), new_name)
         index_types = ['revision', 'inventory', 'text', 'signature']
         if self.chk_index is not None:
             index_types.append('chk')
@@ -322,6 +316,8 @@
             new_name = '../indices/' + old_name
             self.upload_transport.rename(old_name, new_name)
             self._replace_index_with_readonly(index_type)
+        new_name = '../packs/' + self.file_name()
+        self.upload_transport.rename(self.file_name(), new_name)
         self._state = 'finished'
 
     def _get_external_refs(self, index):
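
The two hunks above reorder finish(): previously the pack file was renamed into '../packs/' before its indices were moved; now every index is renamed and swapped to readonly first, and the pack file moves last, so a reader never sees a live pack without its indices. A sketch of the resulting flow (abridged; the loop header over index_types falls in lines the diff does not show, so its exact form here is an assumption):

    def finish(self):
        self._check_references()
        index_types = ['revision', 'inventory', 'text', 'signature']
        if self.chk_index is not None:
            index_types.append('chk')
        for index_type in index_types:  # assumed from the loop body shown
            # rename each index into '../indices/' and swap it to readonly
            ...
        # only after every index is in place does the pack itself move:
        new_name = '../packs/' + self.file_name()
        self.upload_transport.rename(self.file_name(), new_name)
        self._state = 'finished'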
@@ -1458,12 +1454,12 @@
         in synchronisation with certain steps. Otherwise the names collection
         is not flushed.
 
-        :return: True if packing took place.
+        :return: Something evaluating true if packing took place.
         """
         while True:
             try:
                 return self._do_autopack()
-            except errors.RetryAutopack, e:
+            except errors.RetryAutopack:
                 # If we get a RetryAutopack exception, we should abort the
                 # current action, and retry.
                 pass
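
A side note on the except clause: the old spelling used Python 2's comma form to bind the exception to a name that was never used, and the new code simply drops the binding. If the instance were needed, the binding form (available from Python 2.6 onward) would be:

    try:
        return self._do_autopack()
    except errors.RetryAutopack as e:  # only when 'e' is actually used
        # abort the current action and retry
        pass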
@@ -1473,7 +1469,7 @@
         total_revisions = self.revision_index.combined_index.key_count()
         total_packs = len(self._names)
         if self._max_pack_count(total_revisions) >= total_packs:
-            return False
+            return None
         # determine which packs need changing
         pack_distribution = self.pack_distribution(total_revisions)
         existing_packs = []
@@ -1501,10 +1497,10 @@
             'containing %d revisions. Packing %d files into %d affecting %d'
             ' revisions', self, total_packs, total_revisions, num_old_packs,
             num_new_packs, num_revs_affected)
-        self._execute_pack_operations(pack_operations,
+        result = self._execute_pack_operations(pack_operations,
                                       reload_func=self._restart_autopack)
         mutter('Auto-packing repository %s completed', self)
-        return True
+        return result
 
     def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
                                  reload_func=None):
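
Taken together, these hunks change autopack() from returning a bare True/False to returning whatever _execute_pack_operations() returns, which per the next hunk is the list of new pack names, or None when no packing was needed. The result still works in a boolean test but now carries information a caller can forward. A hedged caller sketch, with `collection` standing in for the RepositoryPackCollection instance (variable name invented here):

    names = collection.autopack()
    if not names:
        # nothing was repacked; the names list is still unsaved
        names = collection._save_pack_names()
    # 'names' can now be handed on, e.g. as a pack() hint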
@@ -1512,7 +1508,7 @@
 
         :param pack_operations: A list of [revision_count, packs_to_combine].
         :param _packer_class: The class of packer to use (default: Packer).
-        :return: None.
+        :return: The new pack names.
         """
         for revision_count, packs in pack_operations:
             # we may have no-ops from the setup logic
@@ -1534,10 +1530,11 @@
                 self._remove_pack_from_memory(pack)
         # record the newly available packs and stop advertising the old
         # packs
-        self._save_pack_names(clear_obsolete_packs=True)
+        result = self._save_pack_names(clear_obsolete_packs=True)
         # Move the old packs out of the way now they are no longer referenced.
         for revision_count, packs in pack_operations:
             self._obsolete_packs(packs)
+        return result
 
     def _flush_new_pack(self):
         if self._new_pack is not None:
@@ -1553,29 +1550,26 @@
 
     def _already_packed(self):
         """Is the collection already packed?"""
-        return len(self._names) < 2
+        return not (self.repo._format.pack_compresses or (len(self._names) > 1))
 
-    def pack(self):
+    def pack(self, hint=None):
         """Pack the pack collection totally."""
         self.ensure_loaded()
         total_packs = len(self._names)
         if self._already_packed():
-            # This is arguably wrong because we might not be optimal, but for
-            # now lets leave it in. (e.g. reconcile -> one pack. But not
-            # optimal.
             return
         total_revisions = self.revision_index.combined_index.key_count()
         # XXX: the following may want to be a class, to pack with a given
         # policy.
         mutter('Packing repository %s, which has %d pack files, '
-            'containing %d revisions into 1 packs.', self, total_packs,
-            total_revisions)
+            'containing %d revisions with hint %r.', self, total_packs,
+            total_revisions, hint)
         # determine which packs need changing
-        pack_distribution = [1]
         pack_operations = [[0, []]]
         for pack in self.all_packs():
-            pack_operations[-1][0] += pack.get_revision_count()
-            pack_operations[-1][1].append(pack)
+            if not hint or pack.name in hint:
+                pack_operations[-1][0] += pack.get_revision_count()
+                pack_operations[-1][1].append(pack)
         self._execute_pack_operations(pack_operations, OptimisingPacker)
 
     def plan_autopack_combinations(self, existing_packs, pack_distribution):
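
The pack() hunk above adds hint filtering: with hint=None every pack is combined into one; with a list of pack names, only the named packs are repacked and the rest are left untouched. The _already_packed() change also means formats whose packer recompresses data (pack_compresses) never short-circuit, even when only a single pack remains. A minimal usage sketch at the repository level (the pack name is hypothetical):

    repo.lock_write()
    try:
        repo.pack()                  # combine everything into one pack
        repo.pack(hint=['a1b2c3d4e5f60718293a4b5c6d7e8f90'])
        # ...repacks only the named pack(s)
    finally:
        repo.unlock()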
@@ -1937,6 +1931,7 @@
 
         :param clear_obsolete_packs: If True, clear out the contents of the
             obsolete_packs directory.
+        :return: A list of the names saved that were not previously on disk.
         """
         self.lock_names()
         try:
@@ -1957,6 +1952,7 @@
             self._unlock_names()
         # synchronise the memory packs list with what we just wrote:
         self._syncronize_pack_names_from_disk_nodes(disk_nodes)
+        return [new_node[0][0] for new_node in new_nodes]
 
     def reload_pack_names(self):
         """Sync our pack listing with what is present in the repository.
@@ -2096,7 +2092,7 @@
             if not self.autopack():
                 # when autopack takes no steps, the names list is still
                 # unsaved.
-                self._save_pack_names()
+                return self._save_pack_names()
 
     def _suspend_write_group(self):
         tokens = [pack.name for pack in self._resumed_packs]
@@ -2265,6 +2261,11 @@
             pb.finished()
         return result
 
+    def _get_source(self, to_format):
+        if to_format.network_name() == self._format.network_name():
+            return KnitPackStreamSource(self, to_format)
+        return super(KnitPackRepository, self)._get_source(to_format)
+
     def _make_parents_provider(self):
         return graph.CachingParentsProvider(self)
 
@@ -2342,13 +2343,13 @@
         raise NotImplementedError(self.dont_leave_lock_in_place)
 
     @needs_write_lock
-    def pack(self):
+    def pack(self, hint=None):
         """Compress the data within the repository.
 
         This will pack all the data to a single pack. In future it may
         recompress deltas or do other such expensive operations.
        """
-        self._pack_collection.pack()
+        self._pack_collection.pack(hint=hint)
 
     @needs_write_lock
     def reconcile(self, other=None, thorough=False):
@@ -2384,5 +2385,78 @@
                 repo.unlock()
 
 
+class KnitPackStreamSource(StreamSource):
+    """A StreamSource used to transfer data between same-format KnitPack repos.
+
+    This source assumes:
+        1) Same serialization format for all objects
+        2) Same root information
+        3) XML format inventories
+        4) Atomic inserts (so we can stream inventory texts before text
+           content)
+        5) No chk_bytes
+    """
+
+    def __init__(self, from_repository, to_format):
+        super(KnitPackStreamSource, self).__init__(from_repository, to_format)
+        self._text_keys = None
+        self._text_fetch_order = 'unordered'
+
+    def _get_filtered_inv_stream(self, revision_ids):
+        from_repo = self.from_repository
+        parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids)
+        parent_keys = [(p,) for p in parent_ids]
+        find_text_keys = from_repo._find_text_key_references_from_xml_inventory_lines
+        parent_text_keys = set(find_text_keys(
+            from_repo._inventory_xml_lines_for_keys(parent_keys)))
+        content_text_keys = set()
+        knit = KnitVersionedFiles(None, None)
+        factory = KnitPlainFactory()
+        def find_text_keys_from_content(record):
+            if record.storage_kind not in ('knit-delta-gz', 'knit-ft-gz'):
+                raise ValueError("Unknown content storage kind for"
+                    " inventory text: %s" % (record.storage_kind,))
+            # It's a knit record, it has a _raw_record field (even if it was
+            # reconstituted from a network stream).
+            raw_data = record._raw_record
+            # read the entire thing
+            revision_id = record.key[-1]
+            content, _ = knit._parse_record(revision_id, raw_data)
+            if record.storage_kind == 'knit-delta-gz':
+                line_iterator = factory.get_linedelta_content(content)
+            elif record.storage_kind == 'knit-ft-gz':
+                line_iterator = factory.get_fulltext_content(content)
+            content_text_keys.update(find_text_keys(
+                [(line, revision_id) for line in line_iterator]))
+        revision_keys = [(r,) for r in revision_ids]
+        def _filtered_inv_stream():
+            source_vf = from_repo.inventories
+            stream = source_vf.get_record_stream(revision_keys,
+                                                 'unordered', False)
+            for record in stream:
+                if record.storage_kind == 'absent':
+                    raise errors.NoSuchRevision(from_repo, record.key)
+                find_text_keys_from_content(record)
+                yield record
+            self._text_keys = content_text_keys - parent_text_keys
+        return ('inventories', _filtered_inv_stream())
+
+    def _get_text_stream(self):
+        # Note: We know we don't have to handle adding root keys, because both
+        # the source and target are the identical network name.
+        text_stream = self.from_repository.texts.get_record_stream(
+                        self._text_keys, self._text_fetch_order, False)
+        return ('texts', text_stream)
+
+    def get_stream(self, search):
+        revision_ids = search.get_keys()
+        for stream_info in self._fetch_revision_texts(revision_ids):
+            yield stream_info
+        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+        yield self._get_filtered_inv_stream(revision_ids)
+        yield self._get_text_stream()
+
+
 class RepositoryFormatPack(MetaDirRepositoryFormat):
     """Format logic for pack structured repositories.
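
The new KnitPackStreamSource avoids streaming texts the target must already have: it parses each transferred inventory, collects the text keys that inventory references, and subtracts the keys already reachable from the parent inventories. A toy illustration of that set difference (keys invented):

    parent_text_keys = set([('file-a', 'rev-1'), ('file-b', 'rev-1')])
    content_text_keys = set([('file-a', 'rev-1'), ('file-b', 'rev-2')])
    # only texts not already implied by the parent inventories are streamed
    assert content_text_keys - parent_text_keys == set([('file-b', 'rev-2')])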