107
107
self.revision_index = revision_index
109
def get_revision_count(self):
110
return self.revision_index.key_count()
112
def inventory_index_name(self, name):
113
"""The inv index is the name + .iix."""
109
116
def revision_index_name(self, name):
110
117
"""The revision index is the name + .rix."""
111
118
return name + '.rix'
151
154
"""Get the file name for the pack on disk."""
152
155
return self.name + '.pack'
154
def get_revision_count(self):
155
return self.revision_index.key_count()
158
158
class NewPack(Pack):
159
159
"""An in memory proxy for a pack which is being created."""
162
Pack.__init__(self, InMemoryGraphIndex(1))
161
def __init__(self, upload_transport, index_transport, pack_transport,
163
"""Create a NewPack instance.
165
:param upload_transport: A writable transport for the pack to be
166
incrementally uploaded to.
167
:param index_transport: A writable transport for the pack's indices to
168
be written to when the pack is finished.
169
:param pack_transport: A writable transport for the pack to be renamed
170
to when the upload is complete.
171
:param upload_suffix: An optional suffix to be given to any temporary
172
files created during the pack creation. e.g '.autopack'
174
Pack.__init__(self, InMemoryGraphIndex(reference_lists=1))
175
# where should the new pack be opened
176
self.upload_transport = upload_transport
177
# where are indices written out to
178
self.index_transport = index_transport
179
# where is the pack renamed to when it is finished?
180
self.pack_transport = pack_transport
163
181
# tracks the content written to the .pack file.
164
182
self._hash = md5.new()
183
# a four-tuple with the length in bytes of the indices, once the pack
185
self.index_sizes = None
186
# the temporary pack file name.
187
self.random_name = rand_chars(20) + upload_suffix
188
# when was this pack started ?
189
self.start_time = time.time()
192
"""Finish the new pack.
195
- finalises the content
196
- assigns a name (the md5 of the content, currently)
197
- writes out the associated indices
198
- renames the pack into place.
199
- stores the index size tuple for the pack in the index_sizes
202
new_name = self._hash.hexdigest()
203
self.index_sizes = [None, None, None, None]
204
revision_index_name = self.revision_index_name(new_name)
205
self.index_sizes[0] = self.index_transport.put_file(
206
revision_index_name, self.revision_index.finish())
207
if 'fetch' in debug.debug_flags:
208
# XXX: size might be interesting?
209
mutter('%s: create_pack: wrote revision index: %s%s t+%6.3fs',
210
time.ctime(), self.upload_transport.base, self.random_name,
211
time.time() - self.start_time)
167
214
class RepositoryPackCollection(object):
297
344
self._execute_pack_operations(pack_operations)
300
def flush_revision_signature_indices(self, new_name):
347
def flush_revision_signature_indices(self, new_name, revision_index_length):
301
348
"""Write out pending indices."""
302
349
# write a revision index (might be empty)
303
new_index_name = self.repo._packs._new_pack.revision_index_name(new_name)
304
revision_index_length = self._index_transport.put_file(
305
new_index_name, self.repo._packs._new_pack.revision_index.finish())
350
new_index_name = self._new_pack.revision_index_name(new_name)
306
351
rev_index = GraphIndex(self._index_transport, new_index_name,
307
352
revision_index_length)
308
353
if self.repo._revision_all_indices is None:
311
356
# that in these mapping classes
312
357
self.repo._revision_pack_map = self._make_index_map('.rix')[0]
314
del self.repo._revision_pack_map[self.repo._packs._new_pack.revision_index]
359
del self.repo._revision_pack_map[self._new_pack.revision_index]
315
360
self.repo._revision_pack_map[rev_index] = (self._pack_tuple(new_name))
316
361
# revisions 'knit' accessed : update it.
317
362
self.repo._revision_all_indices.insert_index(0, rev_index)
322
367
self.repo._revision_knit_access.set_writer(None, None, (None, None))
324
369
# write a signatures index (might be empty)
325
new_index_name = NewPack().signature_index_name(new_name)
370
new_index_name = self._new_pack.signature_index_name(new_name)
326
371
signature_index_length = self._index_transport.put_file(
327
372
new_index_name, self.repo._signature_write_index.finish())
328
373
self.repo._signature_write_index = None
342
387
def flush_inventory_index(self, new_name):
343
388
"""Write the index out to new_name."""
344
389
# write an index (might be empty)
345
new_index_name = NewPack().inventory_index_name(new_name)
390
new_index_name = self._new_pack.inventory_index_name(new_name)
346
391
inventory_index_length = self._index_transport.put_file(
347
392
new_index_name, self.repo._inv_write_index.finish())
348
393
self.repo._inv_write_index = None
364
409
def flush_text_index(self, new_name):
365
410
"""Write the index out to new_name."""
366
411
# write a revision index (might be empty)
367
new_index_name = NewPack().text_index_name(new_name)
412
new_index_name = self._new_pack.text_index_name(new_name)
368
413
text_index_length = self._index_transport.put_file(
369
414
new_index_name, self.repo._text_write_index.finish())
370
415
txt_index = GraphIndex(self._index_transport, new_index_name,
404
449
if getattr(self.repo, '_open_pack_tuple', None) is not None:
405
450
raise errors.BzrError('call to create_pack_from_packs while '
406
451
'another pack is being written.')
452
if self._new_pack is not None:
453
raise errors.BzrError('call to create_pack_from_packs while '
454
'another pack is being written.')
407
455
if revision_ids is not None and len(revision_ids) == 0:
408
456
# silly fetch request.
410
random_name = self._random_name() + suffix
458
new_pack = NewPack(self._upload_transport, self._index_transport,
459
self._pack_transport, upload_suffix=suffix)
460
random_name = new_pack.random_name
411
461
if 'fetch' in debug.debug_flags:
412
462
plain_pack_list = ['%s%s' % (a_pack.transport.base, a_pack.name)
413
463
for a_pack in packs]
419
469
'%s%s %s revisions wanted %s t=0',
420
470
time.ctime(), self._upload_transport.base, random_name,
421
471
plain_pack_list, rev_count)
422
start_time = time.time()
423
472
write_stream = self._upload_transport.open_write_stream(random_name)
424
473
if 'fetch' in debug.debug_flags:
425
474
mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
426
475
time.ctime(), self._upload_transport.base, random_name,
427
time.time() - start_time)
476
time.time() - new_pack.start_time)
430
478
def write_data(bytes, update=new_pack._hash.update, write=write_stream.write):
431
479
buffer.append(bytes)
453
500
revision_nodes = self._index_contents(revision_index_map, revision_keys)
454
501
# copy revision keys and adjust values
455
502
list(self._copy_nodes_graph(revision_nodes, revision_index_map, writer,
503
new_pack.revision_index))
457
504
if 'fetch' in debug.debug_flags:
458
505
mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
459
506
time.ctime(), self._upload_transport.base, random_name,
460
revision_index.key_count(),
461
time.time() - start_time)
507
new_pack.revision_index.key_count(),
508
time.time() - new_pack.start_time)
462
509
# select inventory keys
463
510
inv_keys = revision_keys # currently the same keyspace, and note that
464
511
# querying for keys here could introduce a bug where an inventory item
515
562
mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
516
563
time.ctime(), self._upload_transport.base, random_name,
517
564
text_index.key_count(),
518
time.time() - start_time)
565
time.time() - new_pack.start_time)
519
566
# select signature keys
520
567
signature_filter = revision_keys # same keyspace
521
568
signature_index_map = self._packs_list_to_pack_map_and_index_list(
528
575
mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
529
576
time.ctime(), self._upload_transport.base, random_name,
530
577
signature_index.key_count(),
531
time.time() - start_time)
578
time.time() - new_pack.start_time)
532
579
# finish the pack
537
584
new_pack._hash.update(bytes)
538
585
new_name = new_pack._hash.hexdigest()
539
586
# if nothing has been written, discard the new pack.
540
if 0 == sum((revision_index.key_count(),
587
if 0 == sum((new_pack.get_revision_count(),
541
588
inv_index.key_count(),
542
589
text_index.key_count(),
543
590
signature_index.key_count(),
545
592
self._upload_transport.delete(random_name)
549
595
index_transport = self._index_transport
550
rev_index_name = new_pack.revision_index_name(new_name)
551
revision_index_length = index_transport.put_file(rev_index_name,
552
revision_index.finish())
553
if 'fetch' in debug.debug_flags:
554
# XXX: size might be interesting?
555
mutter('%s: create_pack: wrote revision index: %s%s t+%6.3fs',
556
time.ctime(), self._upload_transport.base, random_name,
557
time.time() - start_time)
597
revision_index_length = new_pack.index_sizes[0]
558
598
inv_index_name = new_pack.inventory_index_name(new_name)
559
599
inventory_index_length = index_transport.put_file(inv_index_name,
560
600
inv_index.finish())
562
602
# XXX: size might be interesting?
563
603
mutter('%s: create_pack: wrote inventory index: %s%s t+%6.3fs',
564
604
time.ctime(), self._upload_transport.base, random_name,
565
time.time() - start_time)
605
time.time() - new_pack.start_time)
566
606
text_index_name = new_pack.text_index_name(new_name)
567
607
text_index_length = index_transport.put_file(text_index_name,
568
608
text_index.finish())
570
610
# XXX: size might be interesting?
571
611
mutter('%s: create_pack: wrote file texts index: %s%s t+%6.3fs',
572
612
time.ctime(), self._upload_transport.base, random_name,
573
time.time() - start_time)
613
time.time() - new_pack.start_time)
574
614
signature_index_name = new_pack.signature_index_name(new_name)
575
615
signature_index_length = index_transport.put_file(signature_index_name,
576
616
signature_index.finish())
578
618
# XXX: size might be interesting?
579
619
mutter('%s: create_pack: wrote revision signatures index: %s%s t+%6.3fs',
580
620
time.ctime(), self._upload_transport.base, random_name,
581
time.time() - start_time)
621
time.time() - new_pack.start_time)
583
623
self.allocate(new_name, revision_index_length, inventory_index_length,
584
624
text_index_length, signature_index_length)
587
627
write_stream.close()
588
628
self._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
589
629
result = ExistingPack(self._upload_transport.clone('../packs/'), new_name,
590
revision_index, inv_index, text_index, signature_index)
630
new_pack.revision_index, inv_index, text_index, signature_index)
591
631
if 'fetch' in debug.debug_flags:
592
632
# XXX: size might be interesting?
593
633
mutter('%s: create_pack: pack renamed into place: %s%s->%s%s t+%6.3fs',
594
634
time.ctime(), self._upload_transport.base, random_name,
595
635
result.transport, result.name,
596
time.time() - start_time)
636
time.time() - new_pack.start_time)
597
637
if 'fetch' in debug.debug_flags:
598
638
# XXX: size might be interesting?
599
639
mutter('%s: create_pack: finished: %s%s t+%6.3fs',
600
640
time.ctime(), self._upload_transport.base, random_name,
601
time.time() - start_time)
641
time.time() - new_pack.start_time)
604
644
def _execute_pack_operations(self, pack_operations):
1061
1097
# Do not permit preparation for writing if we're not in a 'write lock'.
1062
1098
if not self.repo.is_write_locked():
1063
1099
raise errors.NotWriteLocked(self)
1064
self._new_pack = NewPack()
1100
self._new_pack = NewPack(self._upload_transport, self._index_transport,
1101
self._pack_transport, upload_suffix='.pack')
1066
1103
def _start_write_group(self):
1067
random_name = self._random_name()
1068
self.repo._open_pack_tuple = (self._upload_transport, random_name + '.pack')
1069
write_stream = self._upload_transport.open_write_stream(random_name + '.pack')
1105
random_name = self._new_pack.random_name
1106
self.repo._open_pack_tuple = (self._upload_transport, random_name)
1107
write_stream = self._upload_transport.open_write_stream(random_name)
1070
1108
self._write_stream = write_stream
1072
1109
def write_data(bytes, write=write_stream.write,
1073
1110
update=self._new_pack._hash.update):
1091
1128
if data_inserted:
1092
1129
self._open_pack_writer.end()
1093
1130
new_name = self._new_pack._hash.hexdigest()
1131
self._new_pack.finish()
1094
1132
txt_index, text_index_length = self.flush_text_index(new_name)
1095
1133
inv_index, inventory_index_length = \
1096
1134
self.flush_inventory_index(new_name)
1097
1135
rev_index, revision_index_length, \
1098
1136
sig_index, signature_index_length = \
1099
self.flush_revision_signature_indices(new_name)
1137
self.flush_revision_signature_indices(new_name,
1138
self._new_pack.index_sizes[0])
1100
1139
new_pack = ExistingPack(self._upload_transport.clone('../packs/'),
1101
1140
new_name, rev_index, inv_index, txt_index, sig_index)
1102
1141
self._write_stream.close()
1154
1194
if getattr(self.repo, '_revision_knit', None) is not None:
1155
1195
return self.repo._revision_knit
1156
1196
pack_map, indices = self.repo._packs._make_index_map('.rix')
1157
if self.repo.is_in_write_group():
1197
if self.repo._packs._new_pack is not None:
1158
1198
# allow writing: queue writes to a new index
1159
1199
indices.insert(0, self.repo._packs._new_pack.revision_index)
1160
1200
pack_map[self.repo._packs._new_pack.revision_index] = self.repo._open_pack_tuple