~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: Robert Collins
  • Date: 2007-10-16 02:48:22 UTC
  • mto: This revision was merged to the branch mainline in revision 2933.
  • Revision ID: robertc@robertcollins.net-20071016024822-vsaucu2p3gnz3b6m
Make NewPack reopen the index files, separating out the task of refreshing the index maps in the repository and managing the completion of writing a single pack to disk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
118
118
        self.text_index = text_index
119
119
        self.signature_index = signature_index
120
120
 
 
121
    def file_name(self):
 
122
        """Get the file name for the pack on disk."""
 
123
        return self.name + '.pack'
 
124
 
121
125
    def get_revision_count(self):
122
126
        return self.revision_index.key_count()
123
127
 
124
128
    def inventory_index_name(self, name):
125
129
        """The inv index is the name + .iix."""
126
 
        return name + '.iix'
 
130
        return self.index_name('inventory', name)
127
131
 
128
132
    def revision_index_name(self, name):
129
133
        """The revision index is the name + .rix."""
130
 
        return name + '.rix'
 
134
        return self.index_name('revision', name)
131
135
 
132
136
    def signature_index_name(self, name):
133
137
        """The signature index is the name + .six."""
134
 
        return name + '.six'
 
138
        return self.index_name('signature', name)
135
139
 
136
140
    def text_index_name(self, name):
137
141
        """The text index is the name + .tix."""
138
 
        return name + '.tix'
 
142
        return self.index_name('text', name)
139
143
 
140
144
 
141
145
class ExistingPack(Pack):
142
146
    """An in memory proxy for an exisiting .pack and its disk indices."""
143
147
 
144
 
    def __init__(self, transport, name, revision_index, inventory_index,
 
148
    def __init__(self, pack_transport, name, revision_index, inventory_index,
145
149
        text_index, signature_index):
 
150
        """Create an ExistingPack object.
 
151
 
 
152
        :param pack_transport: The transport where the pack file resides.
 
153
        :param name: The name of the pack on disk in the pack_transport.
 
154
        """
146
155
        Pack.__init__(self, revision_index, inventory_index, text_index,
147
156
            signature_index)
148
157
        self.name = name
149
 
        self.transport = transport
 
158
        self.pack_transport = pack_transport
150
159
        assert None not in (revision_index, inventory_index, text_index,
151
 
            signature_index, name, transport)
 
160
            signature_index, name, pack_transport)
152
161
 
153
162
    def __eq__(self, other):
154
163
        return self.__dict__ == other.__dict__
160
169
        return "<bzrlib.repofmt.pack_repo.Pack object at 0x%x, %s, %s" % (
161
170
            id(self), self.transport, self.name)
162
171
 
163
 
    def file_name(self):
164
 
        """Get the file name for the pack on disk."""
165
 
        return self.name + '.pack'
166
 
 
167
172
 
168
173
class NewPack(Pack):
169
174
    """An in memory proxy for a pack which is being created."""
170
175
 
 
176
    # A map of index 'type' to the file extension and position in the
 
177
    # index_sizes array.
 
178
    indices = {
 
179
        'revision':('.rix', 0),
 
180
        'inventory':('.iix', 1),
 
181
        'text':('.tix', 2),
 
182
        'signature':('.six', 3),
 
183
        }
 
184
 
171
185
    def __init__(self, upload_transport, index_transport, pack_transport,
172
186
        upload_suffix=''):
173
187
        """Create a NewPack instance.
233
247
        """
234
248
        self.name = self._hash.hexdigest()
235
249
        self.index_sizes = [None, None, None, None]
236
 
        self._write_index(self.revision_index, 0,
237
 
            self.revision_index_name, 'revision')
238
 
        self._write_index(self.inventory_index, 1,
239
 
            self.inventory_index_name, 'inventory')
240
 
        self._write_index(self.text_index, 2,
241
 
            self.text_index_name, 'file texts')
242
 
        self._write_index(self.signature_index, 3,
243
 
            self.signature_index_name, 'revision signatures')
244
 
 
245
 
    def _write_index(self, index, index_offset, name_getter, label):
 
250
        self._write_index('revision', self.revision_index, 'revision')
 
251
        self._write_index('inventory', self.inventory_index, 'inventory')
 
252
        self._write_index('text', self.text_index, 'file texts')
 
253
        self._write_index('signature', self.signature_index,
 
254
            'revision signatures')
 
255
 
 
256
 
 
257
    def make_index(self, index_type):
 
258
        """Construct a GraphIndex object for this packs index 'index_type'."""
 
259
        setattr(self, index_type + '_index',
 
260
            GraphIndex(self.index_transport,
 
261
                self.index_name(index_type, self.name),
 
262
                self.index_sizes[self.index_offset(index_type)]))
 
263
 
 
264
    def index_name(self, index_type, name):
 
265
        """Get the disk name of an index type for pack name 'name'."""
 
266
        return name + NewPack.indices[index_type][0]
 
267
 
 
268
    def index_offset(self, index_type):
 
269
        """Get the position in a index_size array for a given index type."""
 
270
        return NewPack.indices[index_type][1]
 
271
 
 
272
    def _write_index(self, index_type, index, label):
246
273
        """Write out an index.
247
274
 
248
275
        :param index: The index object to serialise.
250
277
        :param name_getter: What to use to get the name of the index on disk.
251
278
        :param label: What label to give the index e.g. 'revision'.
252
279
        """
253
 
        index_name = name_getter(self.name)
254
 
        self.index_sizes[index_offset] = self.index_transport.put_file(
255
 
            index_name, index.finish())
 
280
        index_name = self.index_name(index_type, self.name)
 
281
        self.index_sizes[self.index_offset(index_type)] = \
 
282
            self.index_transport.put_file(index_name, index.finish())
256
283
        if 'fetch' in debug.debug_flags:
257
284
            # XXX: size might be interesting?
258
285
            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
259
286
                time.ctime(), label, self.upload_transport.base,
260
287
                self.random_name, time.time() - self.start_time)
 
288
        # As we have no current protection against erroneous additional
 
289
        # insertions, load the index from disk on further use. We should alter
 
290
        # the index layer to make it's finish() error if add_node is
 
291
        # subsequently used. RBC
 
292
        self.make_index(index_type)
261
293
 
262
294
 
263
295
class RepositoryPackCollection(object):
302
334
            pass
303
335
        else:
304
336
            self.repo._revision_pack_map[pack.revision_index] = (
305
 
                pack.transport, pack.name + '.pack')
 
337
                pack.pack_transport, pack.name + '.pack')
306
338
            self.repo._revision_all_indices.insert_index(0, pack.revision_index)
307
339
        if self.repo._inv_all_indices is not None:
308
340
            # inv 'knit' has been used : update it.
309
341
            self.repo._inv_all_indices.insert_index(0,
310
342
                pack.inventory_index)
311
 
            self.repo._inv_pack_map[pack.inventory_index] = pack.transport, pack.name + '.pack'
 
343
            self.repo._inv_pack_map[pack.inventory_index] = pack.pack_transport, pack.name + '.pack'
312
344
        if self.repo._text_all_indices is not None:
313
345
            # text 'knits' have been used : update it.
314
346
            self.repo._text_all_indices.insert_index(0,
393
425
        self._execute_pack_operations(pack_operations)
394
426
        return True
395
427
 
396
 
    def flush_revision_signature_indices(self, new_name, revision_index_length,
397
 
        signature_index_length):
398
 
        """Write out pending indices."""
399
 
        # write a revision index (might be empty)
400
 
        new_index_name = self._new_pack.revision_index_name(new_name)
401
 
        rev_index = GraphIndex(self._index_transport, new_index_name,
402
 
                revision_index_length)
 
428
    def refresh_revision_signature_indices(self):
 
429
        """Refresh the mappings for revisions."""
 
430
        index_map, index_list = self._make_index_map('.rix')
403
431
        if self.repo._revision_all_indices is None:
404
432
            # create a pack map for the autopack code - XXX finish
405
433
            # making a clear managed list of packs, indices and use
406
434
            # that in these mapping classes
407
 
            self.repo._revision_pack_map = self._make_index_map('.rix')[0]
 
435
            self.repo._revision_pack_map = index_map
408
436
        else:
409
 
            del self.repo._revision_pack_map[self._new_pack.revision_index]
410
 
            self.repo._revision_pack_map[rev_index] = (self._pack_tuple(new_name))
 
437
            # refresh the revision pack map dict without replacing the instance.
 
438
            self.repo._revision_pack_map.clear()
 
439
            self.repo._revision_pack_map.update(index_map)
411
440
            # revisions 'knit' accessed : update it.
412
 
            self.repo._revision_all_indices.insert_index(0, rev_index)
413
 
            # remove the write buffering index. XXX: API break
414
 
            # - clearly we need a remove_index call too.
415
 
            del self.repo._revision_all_indices._indices[1]
 
441
            # XXX: API break - clearly a 'replace' method would be good?
 
442
            self.repo._revision_all_indices._indices[:] = index_list
416
443
            # reset the knit access writer
417
444
            self.repo._revision_knit_access.set_writer(None, None, (None, None))
418
445
 
419
 
        # write a signatures index (might be empty)
420
 
        new_index_name = self._new_pack.signature_index_name(new_name)
421
 
        sig_index = GraphIndex(self._index_transport, new_index_name,
422
 
            signature_index_length)
 
446
        index_map, index_list = self._make_index_map('.six')
423
447
        if self.repo._signature_all_indices is not None:
424
 
            # sigatures 'knit' accessed : update it.
425
 
            self.repo._signature_all_indices.insert_index(0, sig_index)
426
 
            # remove the write buffering index. XXX: API break
427
 
            # - clearly we need a remove_index call too.
428
 
            del self.repo._signature_all_indices._indices[1]
 
448
            # signature 'knit' accessed : update it.
 
449
            # XXX: API break - clearly a 'replace' method would be good?
 
450
            self.repo._signature_all_indices._indices[:] = index_list
429
451
            # reset the knit access writer
430
452
            self.repo._signature_knit_access.set_writer(None, None, (None, None))
431
 
        return rev_index, sig_index,
432
453
 
433
 
    def flush_inventory_index(self, new_name, inventory_index_length):
434
 
        """Write the index out to new_name."""
435
 
        # write an index (might be empty)
436
 
        new_index_name = self._new_pack.inventory_index_name(new_name)
437
 
        inv_index = GraphIndex(self._index_transport, new_index_name,
438
 
            inventory_index_length)
 
454
    def refresh_inventory_index(self):
 
455
        """Refresh the inventory access index mappings."""
 
456
        index_map, index_list = self._make_index_map('.iix')
439
457
        if self.repo._inv_all_indices is not None:
440
 
            # inv 'knit' has been used, replace the mutated memory index
441
 
            # with the new on-disk one. XXX: is this really a good idea?
442
 
            # perhaps just keep using the memory one ?
443
 
            self.repo._inv_all_indices.insert_index(0, inv_index)
444
 
            # remove the write buffering index. XXX: API break
445
 
            # - clearly we need a remove_index call too.
446
 
            del self.repo._inv_all_indices._indices[1]
 
458
            # refresh the pack map dict without replacing the instance.
 
459
            self.repo._inv_pack_map.clear()
 
460
            self.repo._inv_pack_map.update(index_map)
 
461
            # invs 'knit' accessed : update it.
 
462
            # XXX: API break - clearly a 'replace' method would be good?
 
463
            self.repo._inv_all_indices._indices[:] = index_list
 
464
            # reset the knit access writer
447
465
            self.repo._inv_knit_access.set_writer(None, None, (None, None))
448
466
        else:
 
467
            # inventory knit not used, ensure the pack map is regenerated at
 
468
            # next use.
449
469
            self.repo._inv_pack_map = None
450
 
        return inv_index
451
470
 
452
 
    def flush_text_index(self, new_name, text_index_length):
453
 
        """Write the index out to new_name."""
454
 
        # write a revision index (might be empty)
455
 
        new_index_name = self._new_pack.text_index_name(new_name)
456
 
        txt_index = GraphIndex(self._index_transport, new_index_name,
457
 
            text_index_length)
 
471
    def refresh_text_index(self):
 
472
        """Refresh the text index mappings."""
 
473
        index_map, index_list = self._make_index_map('.tix')
458
474
        self.repo.weave_store._setup_knit(False)
459
475
        if self.repo._text_all_indices is not None:
460
 
            # text 'knits' have been used, replace the mutated memory index
461
 
            # with the new on-disk one. XXX: is this really a good idea?
462
 
            # perhaps just keep using the memory one ?
463
 
            self.repo._text_all_indices.insert_index(0, txt_index)
464
 
            # remove the write buffering index. XXX: API break
465
 
            # - clearly we need a remove_index call too.
466
 
            del self.repo._text_all_indices._indices[1]
467
 
        return txt_index
 
476
            # refresh the pack map dict without replacing the instance.
 
477
            self.repo._text_pack_map.clear()
 
478
            self.repo._text_pack_map.update(index_map)
 
479
            # invs 'knit' accessed : update it.
 
480
            # XXX: API break - clearly a 'replace' method would be good?
 
481
            self.repo._text_all_indices._indices[:] = index_list
468
482
 
469
483
    def create_pack_from_packs(self, packs, suffix, revision_ids=None):
470
484
        """Create a new pack by reading data from other packs.
499
513
            self._pack_transport, upload_suffix=suffix)
500
514
        random_name = new_pack.random_name
501
515
        if 'fetch' in debug.debug_flags:
502
 
            plain_pack_list = ['%s%s' % (a_pack.transport.base, a_pack.name)
 
516
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
503
517
                for a_pack in packs]
504
518
            if revision_ids is not None:
505
519
                rev_count = len(revision_ids)
618
632
            bytes = ''.join(buffer)
619
633
            write_stream.write(bytes)
620
634
            new_pack._hash.update(bytes)
621
 
        new_name = new_pack._hash.hexdigest()
622
635
        if not new_pack.data_inserted():
623
636
            # nothing was copied, discard the new pack.
624
637
            self._upload_transport.delete(random_name)
626
639
        # write indices
627
640
        new_pack.finish()
628
641
        # add to name
629
 
        self.allocate(new_name, new_pack.index_sizes[0],
 
642
        self.allocate(new_pack, new_pack.name, new_pack.index_sizes[0],
630
643
            new_pack.index_sizes[1], new_pack.index_sizes[2],
631
644
            new_pack.index_sizes[3])
632
645
        # rename into place. XXX: should rename each index too rather than just
633
646
        # uploading blind under the chosen name.
634
647
        write_stream.close()
635
 
        self._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
636
 
        result = ExistingPack(self._upload_transport.clone('../packs/'), new_name,
637
 
            new_pack.revision_index, new_pack.inventory_index, new_pack.text_index,
638
 
            new_pack.signature_index)
 
648
        self._upload_transport.rename(random_name, '../packs/' + new_pack.name + '.pack')
 
649
        result = new_pack
639
650
        if 'fetch' in debug.debug_flags:
640
651
            # XXX: size might be interesting?
641
652
            mutter('%s: create_pack: pack renamed into place: %s%s->%s%s t+%6.3fs',
642
653
                time.ctime(), self._upload_transport.base, random_name,
643
 
                result.transport, result.name,
 
654
                result.pack_transport, result.name,
644
655
                time.time() - new_pack.start_time)
645
656
        if 'fetch' in debug.debug_flags:
646
657
            # XXX: size might be interesting?
864
875
            self.add_pack_to_memory(result)
865
876
            return result
866
877
 
867
 
    def allocate(self, name, revision_index_length, inventory_index_length,
 
878
    def allocate(self, a_new_pack, name, revision_index_length, inventory_index_length,
868
879
        text_index_length, signature_index_length):
869
880
        """Allocate name in the list of packs.
870
881
 
950
961
        :param return: None.
951
962
        """
952
963
        for pack in packs:
953
 
            pack.transport.rename(pack.file_name(),
 
964
            pack.pack_transport.rename(pack.file_name(),
954
965
                '../obsolete_packs/' + pack.file_name())
955
966
            # TODO: Probably needs to know all possible indexes for this pack
956
967
            # - or maybe list the directory and move all indexes matching this
1045
1056
        for pack in packs:
1046
1057
            index = getattr(pack, index_attribute)
1047
1058
            indices.append(index)
1048
 
            pack_map[index] = (pack.transport, pack.file_name())
 
1059
            pack_map[index] = (pack.pack_transport, pack.file_name())
1049
1060
        return pack_map, indices
1050
1061
 
1051
1062
    def _index_contents(self, pack_map, key_filter=None):
1127
1138
    def _commit_write_group(self):
1128
1139
        if self._new_pack.data_inserted():
1129
1140
            self._open_pack_writer.end()
1130
 
            new_name = self._new_pack._hash.hexdigest()
1131
1141
            self._new_pack.finish()
1132
 
            txt_index = \
1133
 
                self.flush_text_index(new_name, self._new_pack.index_sizes[2])
1134
 
            inv_index = \
1135
 
                self.flush_inventory_index(new_name, self._new_pack.index_sizes[1])
1136
 
            rev_index, sig_index = \
1137
 
                self.flush_revision_signature_indices(new_name,
1138
 
                self._new_pack.index_sizes[0], self._new_pack.index_sizes[3])
1139
 
            new_pack = ExistingPack(self._upload_transport.clone('../packs/'),
1140
 
                new_name, rev_index, inv_index, txt_index, sig_index)
1141
1142
            self._write_stream.close()
1142
1143
            self._upload_transport.rename(self.repo._open_pack_tuple[1],
1143
 
                '../packs/' + new_name + '.pack')
 
1144
                '../packs/' + self._new_pack.name + '.pack')
1144
1145
            # If this fails, its a hash collision. We should:
1145
1146
            # - determine if its a collision or
1146
1147
            # - the same content or
1147
1148
            # - the existing name is not the actual hash - e.g.
1148
1149
            #   its a deliberate attack or data corruption has
1149
1150
            #   occuring during the write of that file.
1150
 
            self.allocate(new_name, self._new_pack.index_sizes[0],
 
1151
            self.allocate(self._new_pack, self._new_pack.name, self._new_pack.index_sizes[0],
1151
1152
                self._new_pack.index_sizes[1], self._new_pack.index_sizes[2],
1152
1153
                self._new_pack.index_sizes[3])
1153
1154
            self.repo._open_pack_tuple = None
1154
1155
            self._new_pack = None
1155
1156
            if not self.autopack():
1156
1157
                self._save_pack_names()
 
1158
            # now setup the maps we need to access data again.
 
1159
            self.refresh_text_index()
 
1160
            self.refresh_inventory_index()
 
1161
            self.refresh_revision_signature_indices()
1157
1162
        else:
1158
1163
            # remove the pending upload
1159
1164
            self._upload_transport.delete(self.repo._open_pack_tuple[1])
1358
1363
        if for_write:
1359
1364
            # a reused knit object for commit specifically.
1360
1365
            self.repo._text_knit = self.get_weave_or_empty(
1361
 
                'all-texts', self.repo.get_transaction(), for_write)
 
1366
                'all-texts', None, for_write)
1362
1367
        else:
1363
1368
            self.repo._text_knit = None
1364
1369