~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: Robert Collins
  • Date: 2007-10-15 03:09:42 UTC
  • mto: This revision was merged to the branch mainline in revision 2933.
  • Revision ID: robertc@robertcollins.net-20071015030942-koi2eoiaewe9kdod
Output the revision index from NewPack.finish

Show diffs side-by-side

added added

removed removed

Lines of Context:
106
106
        """
107
107
        self.revision_index = revision_index
108
108
 
 
109
    def get_revision_count(self):
 
110
        return self.revision_index.key_count()
 
111
 
 
112
    def inventory_index_name(self, name):
 
113
        """The inv index is the name + .iix."""
 
114
        return name + '.iix'
 
115
 
109
116
    def revision_index_name(self, name):
110
117
        """The revision index is the name + .rix."""
111
118
        return name + '.rix'
118
125
        """The text index is the name + .tix."""
119
126
        return name + '.tix'
120
127
 
121
 
    def inventory_index_name(self, name):
122
 
        """The inv index is the name + .iix."""
123
 
        return name + '.iix'
124
 
 
125
128
 
126
129
class ExistingPack(Pack):
127
130
    """An in memory proxy for an exisiting .pack and its disk indices."""
151
154
        """Get the file name for the pack on disk."""
152
155
        return self.name + '.pack'
153
156
 
154
 
    def get_revision_count(self):
155
 
        return self.revision_index.key_count()
156
 
 
157
157
 
158
158
class NewPack(Pack):
159
159
    """An in memory proxy for a pack which is being created."""
160
160
 
161
 
    def __init__(self):
162
 
        Pack.__init__(self, InMemoryGraphIndex(1))
 
161
    def __init__(self, upload_transport, index_transport, pack_transport,
 
162
        upload_suffix=''):
 
163
        """Create a NewPack instance.
 
164
 
 
165
        :param upload_transport: A writable transport for the pack to be
 
166
            incrementally uploaded to.
 
167
        :param index_transport: A writable transport for the pack's indices to
 
168
            be written to when the pack is finished.
 
169
        :param pack_transport: A writable transport for the pack to be renamed
 
170
            to when the upload is complete.
 
171
        :param upload_suffix: An optional suffix to be given to any temporary
 
172
            files created during the pack creation. e.g '.autopack'
 
173
        """
 
174
        Pack.__init__(self, InMemoryGraphIndex(reference_lists=1))
 
175
        # where should the new pack be opened
 
176
        self.upload_transport = upload_transport
 
177
        # where are indices written out to
 
178
        self.index_transport = index_transport
 
179
        # where is the pack renamed to when it is finished?
 
180
        self.pack_transport = pack_transport
163
181
        # tracks the content written to the .pack file.
164
182
        self._hash = md5.new()
 
183
        # a four-tuple with the length in bytes of the indices, once the pack
 
184
        # is finalised.
 
185
        self.index_sizes = None
 
186
        # the temporary pack file name.
 
187
        self.random_name = rand_chars(20) + upload_suffix
 
188
        # when was this pack started ?
 
189
        self.start_time = time.time()
 
190
 
 
191
    def finish(self):
 
192
        """Finish the new pack.
 
193
 
 
194
        This:
 
195
         - finalises the content
 
196
         - assigns a name (the md5 of the content, currently)
 
197
         - writes out the associated indices
 
198
         - renames the pack into place.
 
199
         - stores the index size tuple for the pack in the index_sizes
 
200
           attribute.
 
201
        """
 
202
        new_name = self._hash.hexdigest()
 
203
        self.index_sizes = [None, None, None, None]
 
204
        revision_index_name = self.revision_index_name(new_name)
 
205
        self.index_sizes[0] = self.index_transport.put_file(
 
206
            revision_index_name, self.revision_index.finish())
 
207
        if 'fetch' in debug.debug_flags:
 
208
            # XXX: size might be interesting?
 
209
            mutter('%s: create_pack: wrote revision index: %s%s t+%6.3fs',
 
210
                time.ctime(), self.upload_transport.base, self.random_name,
 
211
                time.time() - self.start_time)
165
212
 
166
213
 
167
214
class RepositoryPackCollection(object):
297
344
        self._execute_pack_operations(pack_operations)
298
345
        return True
299
346
 
300
 
    def flush_revision_signature_indices(self, new_name):
 
347
    def flush_revision_signature_indices(self, new_name, revision_index_length):
301
348
        """Write out pending indices."""
302
349
        # write a revision index (might be empty)
303
 
        new_index_name = self.repo._packs._new_pack.revision_index_name(new_name)
304
 
        revision_index_length = self._index_transport.put_file(
305
 
            new_index_name, self.repo._packs._new_pack.revision_index.finish())
 
350
        new_index_name = self._new_pack.revision_index_name(new_name)
306
351
        rev_index = GraphIndex(self._index_transport, new_index_name,
307
352
                revision_index_length)
308
353
        if self.repo._revision_all_indices is None:
311
356
            # that in these mapping classes
312
357
            self.repo._revision_pack_map = self._make_index_map('.rix')[0]
313
358
        else:
314
 
            del self.repo._revision_pack_map[self.repo._packs._new_pack.revision_index]
 
359
            del self.repo._revision_pack_map[self._new_pack.revision_index]
315
360
            self.repo._revision_pack_map[rev_index] = (self._pack_tuple(new_name))
316
361
            # revisions 'knit' accessed : update it.
317
362
            self.repo._revision_all_indices.insert_index(0, rev_index)
322
367
            self.repo._revision_knit_access.set_writer(None, None, (None, None))
323
368
 
324
369
        # write a signatures index (might be empty)
325
 
        new_index_name = NewPack().signature_index_name(new_name)
 
370
        new_index_name = self._new_pack.signature_index_name(new_name)
326
371
        signature_index_length = self._index_transport.put_file(
327
372
            new_index_name, self.repo._signature_write_index.finish())
328
373
        self.repo._signature_write_index = None
342
387
    def flush_inventory_index(self, new_name):
343
388
        """Write the index out to new_name."""
344
389
        # write an index (might be empty)
345
 
        new_index_name = NewPack().inventory_index_name(new_name)
 
390
        new_index_name = self._new_pack.inventory_index_name(new_name)
346
391
        inventory_index_length = self._index_transport.put_file(
347
392
            new_index_name, self.repo._inv_write_index.finish())
348
393
        self.repo._inv_write_index = None
364
409
    def flush_text_index(self, new_name):
365
410
        """Write the index out to new_name."""
366
411
        # write a revision index (might be empty)
367
 
        new_index_name = NewPack().text_index_name(new_name)
 
412
        new_index_name = self._new_pack.text_index_name(new_name)
368
413
        text_index_length = self._index_transport.put_file(
369
414
            new_index_name, self.repo._text_write_index.finish())
370
415
        txt_index = GraphIndex(self._index_transport, new_index_name,
404
449
        if getattr(self.repo, '_open_pack_tuple', None) is not None:
405
450
            raise errors.BzrError('call to create_pack_from_packs while '
406
451
                'another pack is being written.')
 
452
        if self._new_pack is not None:
 
453
            raise errors.BzrError('call to create_pack_from_packs while '
 
454
                'another pack is being written.')
407
455
        if revision_ids is not None and len(revision_ids) == 0:
408
456
            # silly fetch request.
409
457
            return None
410
 
        random_name = self._random_name() + suffix
 
458
        new_pack = NewPack(self._upload_transport, self._index_transport,
 
459
            self._pack_transport, upload_suffix=suffix)
 
460
        random_name = new_pack.random_name
411
461
        if 'fetch' in debug.debug_flags:
412
462
            plain_pack_list = ['%s%s' % (a_pack.transport.base, a_pack.name)
413
463
                for a_pack in packs]
419
469
                '%s%s %s revisions wanted %s t=0',
420
470
                time.ctime(), self._upload_transport.base, random_name,
421
471
                plain_pack_list, rev_count)
422
 
            start_time = time.time()
423
472
        write_stream = self._upload_transport.open_write_stream(random_name)
424
473
        if 'fetch' in debug.debug_flags:
425
474
            mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
426
475
                time.ctime(), self._upload_transport.base, random_name,
427
 
                time.time() - start_time)
428
 
        new_pack = NewPack()
 
476
                time.time() - new_pack.start_time)
429
477
        buffer = []
430
478
        def write_data(bytes, update=new_pack._hash.update, write=write_stream.write):
431
479
            buffer.append(bytes)
437
485
        writer = pack.ContainerWriter(write_data)
438
486
        writer.begin()
439
487
        # open new indices
440
 
        revision_index = InMemoryGraphIndex(reference_lists=1)
441
488
        inv_index = InMemoryGraphIndex(reference_lists=2)
442
489
        text_index = InMemoryGraphIndex(reference_lists=2, key_elements=2)
443
490
        signature_index = InMemoryGraphIndex(reference_lists=0)
453
500
        revision_nodes = self._index_contents(revision_index_map, revision_keys)
454
501
        # copy revision keys and adjust values
455
502
        list(self._copy_nodes_graph(revision_nodes, revision_index_map, writer,
456
 
            revision_index))
 
503
            new_pack.revision_index))
457
504
        if 'fetch' in debug.debug_flags:
458
505
            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
459
506
                time.ctime(), self._upload_transport.base, random_name,
460
 
                revision_index.key_count(),
461
 
                time.time() - start_time)
 
507
                new_pack.revision_index.key_count(),
 
508
                time.time() - new_pack.start_time)
462
509
        # select inventory keys
463
510
        inv_keys = revision_keys # currently the same keyspace, and note that
464
511
        # querying for keys here could introduce a bug where an inventory item
487
534
            mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
488
535
                time.ctime(), self._upload_transport.base, random_name,
489
536
                inv_index.key_count(),
490
 
                time.time() - start_time)
 
537
                time.time() - new_pack.start_time)
491
538
        # select text keys
492
539
        text_index_map = self._packs_list_to_pack_map_and_index_list(
493
540
            packs, 'text_index')[0]
515
562
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
516
563
                time.ctime(), self._upload_transport.base, random_name,
517
564
                text_index.key_count(),
518
 
                time.time() - start_time)
 
565
                time.time() - new_pack.start_time)
519
566
        # select signature keys
520
567
        signature_filter = revision_keys # same keyspace
521
568
        signature_index_map = self._packs_list_to_pack_map_and_index_list(
528
575
            mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
529
576
                time.ctime(), self._upload_transport.base, random_name,
530
577
                signature_index.key_count(),
531
 
                time.time() - start_time)
 
578
                time.time() - new_pack.start_time)
532
579
        # finish the pack
533
580
        writer.end()
534
581
        if len(buffer):
537
584
            new_pack._hash.update(bytes)
538
585
        new_name = new_pack._hash.hexdigest()
539
586
        # if nothing has been written, discard the new pack.
540
 
        if 0 == sum((revision_index.key_count(),
 
587
        if 0 == sum((new_pack.get_revision_count(),
541
588
            inv_index.key_count(),
542
589
            text_index.key_count(),
543
590
            signature_index.key_count(),
545
592
            self._upload_transport.delete(random_name)
546
593
            return None
547
594
        # write indices
548
 
        new_pack = NewPack()
549
595
        index_transport = self._index_transport
550
 
        rev_index_name = new_pack.revision_index_name(new_name)
551
 
        revision_index_length = index_transport.put_file(rev_index_name,
552
 
            revision_index.finish())
553
 
        if 'fetch' in debug.debug_flags:
554
 
            # XXX: size might be interesting?
555
 
            mutter('%s: create_pack: wrote revision index: %s%s t+%6.3fs',
556
 
                time.ctime(), self._upload_transport.base, random_name,
557
 
                time.time() - start_time)
 
596
        new_pack.finish()
 
597
        revision_index_length = new_pack.index_sizes[0]
558
598
        inv_index_name = new_pack.inventory_index_name(new_name)
559
599
        inventory_index_length = index_transport.put_file(inv_index_name,
560
600
            inv_index.finish())
562
602
            # XXX: size might be interesting?
563
603
            mutter('%s: create_pack: wrote inventory index: %s%s t+%6.3fs',
564
604
                time.ctime(), self._upload_transport.base, random_name,
565
 
                time.time() - start_time)
 
605
                time.time() - new_pack.start_time)
566
606
        text_index_name = new_pack.text_index_name(new_name)
567
607
        text_index_length = index_transport.put_file(text_index_name,
568
608
            text_index.finish())
570
610
            # XXX: size might be interesting?
571
611
            mutter('%s: create_pack: wrote file texts index: %s%s t+%6.3fs',
572
612
                time.ctime(), self._upload_transport.base, random_name,
573
 
                time.time() - start_time)
 
613
                time.time() - new_pack.start_time)
574
614
        signature_index_name = new_pack.signature_index_name(new_name)
575
615
        signature_index_length = index_transport.put_file(signature_index_name,
576
616
            signature_index.finish())
578
618
            # XXX: size might be interesting?
579
619
            mutter('%s: create_pack: wrote revision signatures index: %s%s t+%6.3fs',
580
620
                time.ctime(), self._upload_transport.base, random_name,
581
 
                time.time() - start_time)
 
621
                time.time() - new_pack.start_time)
582
622
        # add to name
583
623
        self.allocate(new_name, revision_index_length, inventory_index_length,
584
624
            text_index_length, signature_index_length)
587
627
        write_stream.close()
588
628
        self._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
589
629
        result = ExistingPack(self._upload_transport.clone('../packs/'), new_name,
590
 
            revision_index, inv_index, text_index, signature_index)
 
630
            new_pack.revision_index, inv_index, text_index, signature_index)
591
631
        if 'fetch' in debug.debug_flags:
592
632
            # XXX: size might be interesting?
593
633
            mutter('%s: create_pack: pack renamed into place: %s%s->%s%s t+%6.3fs',
594
634
                time.ctime(), self._upload_transport.base, random_name,
595
635
                result.transport, result.name,
596
 
                time.time() - start_time)
 
636
                time.time() - new_pack.start_time)
597
637
        if 'fetch' in debug.debug_flags:
598
638
            # XXX: size might be interesting?
599
639
            mutter('%s: create_pack: finished: %s%s t+%6.3fs',
600
640
                time.ctime(), self._upload_transport.base, random_name,
601
 
                time.time() - start_time)
 
641
                time.time() - new_pack.start_time)
602
642
        return result
603
643
 
604
644
    def _execute_pack_operations(self, pack_operations):
1019
1059
        else:
1020
1060
            return all_index.iter_entries(key_filter)
1021
1061
 
1022
 
    def _random_name(self):
1023
 
        """Return a random name."""
1024
 
        return rand_chars(20)
1025
 
 
1026
1062
    def release_names(self):
1027
1063
        """Release the mutex around the pack-names index."""
1028
1064
        self.repo.control_files.unlock()
1061
1097
        # Do not permit preparation for writing if we're not in a 'write lock'.
1062
1098
        if not self.repo.is_write_locked():
1063
1099
            raise errors.NotWriteLocked(self)
1064
 
        self._new_pack = NewPack()
 
1100
        self._new_pack = NewPack(self._upload_transport, self._index_transport,
 
1101
            self._pack_transport, upload_suffix='.pack')
1065
1102
 
1066
1103
    def _start_write_group(self):
1067
 
        random_name = self._random_name()
1068
 
        self.repo._open_pack_tuple = (self._upload_transport, random_name + '.pack')
1069
 
        write_stream = self._upload_transport.open_write_stream(random_name + '.pack')
 
1104
        self.setup()
 
1105
        random_name = self._new_pack.random_name
 
1106
        self.repo._open_pack_tuple = (self._upload_transport, random_name)
 
1107
        write_stream = self._upload_transport.open_write_stream(random_name)
1070
1108
        self._write_stream = write_stream
1071
 
        self.setup()
1072
1109
        def write_data(bytes, write=write_stream.write,
1073
1110
                       update=self._new_pack._hash.update):
1074
1111
            write(bytes)
1091
1128
        if data_inserted:
1092
1129
            self._open_pack_writer.end()
1093
1130
            new_name = self._new_pack._hash.hexdigest()
 
1131
            self._new_pack.finish()
1094
1132
            txt_index, text_index_length = self.flush_text_index(new_name)
1095
1133
            inv_index, inventory_index_length = \
1096
1134
                self.flush_inventory_index(new_name)
1097
1135
            rev_index, revision_index_length, \
1098
1136
                sig_index, signature_index_length = \
1099
 
                self.flush_revision_signature_indices(new_name)
 
1137
                self.flush_revision_signature_indices(new_name,
 
1138
                self._new_pack.index_sizes[0])
1100
1139
            new_pack = ExistingPack(self._upload_transport.clone('../packs/'),
1101
1140
                new_name, rev_index, inv_index, txt_index, sig_index)
1102
1141
            self._write_stream.close()
1112
1151
                inventory_index_length, text_index_length,
1113
1152
                signature_index_length)
1114
1153
            self.repo._open_pack_tuple = None
 
1154
            self._new_pack = None
1115
1155
            if not self.autopack():
1116
1156
                self._save_pack_names()
1117
1157
        else:
1154
1194
        if getattr(self.repo, '_revision_knit', None) is not None:
1155
1195
            return self.repo._revision_knit
1156
1196
        pack_map, indices = self.repo._packs._make_index_map('.rix')
1157
 
        if self.repo.is_in_write_group():
 
1197
        if self.repo._packs._new_pack is not None:
1158
1198
            # allow writing: queue writes to a new index
1159
1199
            indices.insert(0, self.repo._packs._new_pack.revision_index)
1160
1200
            pack_map[self.repo._packs._new_pack.revision_index] = self.repo._open_pack_tuple