108
108
def encode_name(content_kind, revision_id, file_id=None):
    """Encode semantic ids as a container name.

    :param content_kind: one of 'revision', 'file', 'inventory',
        'signature', 'info'
    :param revision_id: the revision id; required for every kind except
        'info', where it must be None
    :param file_id: the file id; required for 'file', forbidden otherwise
    :return: the non-None ids joined with '/', with any literal '/'
        inside an id escaped as '//'
    :raises ValueError: if content_kind is not a recognized kind
    :raises AssertionError: if the id arguments are inconsistent with kind
    """
    if content_kind not in ('revision', 'file', 'inventory', 'signature',
                            'info'):
        raise ValueError(content_kind)
    if content_kind == 'file':
        # 'file' records must carry a file id.
        if file_id is None:
            raise AssertionError()
    else:
        if file_id is not None:
            raise AssertionError()
    if content_kind == 'info':
        # 'info' records are not tied to any particular revision.
        if revision_id is not None:
            raise AssertionError()
    elif revision_id is None:
        raise AssertionError()
    # Escape '/' inside ids so the '/'-join below stays unambiguous.
    names = [n.replace('/', '//') for n in
             (content_kind, revision_id, file_id) if n is not None]
    return '/'.join(names)
150
def __init__(self, fileobj, stream_input=True):
    """Constructor.

    :param fileobj: a file containing a bzip-encoded container
    :param stream_input: If True, the BundleReader stream input rather than
        reading it all into memory at once. Reading it into memory all at
        once is (currently) faster.
    """
    # Skip the bundle header line(s) preceding the compressed container.
    # NOTE(review): the exact header-skipping condition is not fully
    # recoverable from this mangled source — confirm against bzrlib.
    line = fileobj.readline()
    if line != '\n':
        fileobj.readline()
    self.patch_lines = []
    if stream_input:
        # Decode lazily as the container is read.
        source_file = iterablefile.IterableFile(self.iter_decode(fileobj))
    else:
        # Decompress everything up front (currently faster).
        source_file = StringIO(bz2.decompress(fileobj.read()))
    self._container_file = source_file
169
156
def iter_decode(fileobj):
    """Iterate through decoded fragments of the file.

    :param fileobj: an iterable of bz2-compressed chunks (e.g. a file
        iterated by line)
    :return: a generator of decompressed byte fragments
    """
    decompressor = bz2.BZ2Decompressor()
    try:
        for line in fileobj:
            yield decompressor.decompress(line)
    except EOFError:
        # Data past the end of the compressed stream is ignored.
        return
179
163
def decode_name(name):
205
189
def iter_records(self):
    """Iterate through bundle records.

    :return: a generator of (bytes, metadata, content_kind, revision_id,
        file_id)
    """
    iterator = pack.iter_records_from_file(self._container_file)
    for names, bytes in iterator:
        if len(names) != 1:
            raise errors.BadBundle('Record has %d names instead of 1'
                                   % len(names))
        metadata = bencode.bdecode(bytes)
        if metadata['storage_kind'] == 'header':
            # Header records carry no body of their own.
            # NOTE(review): reconstructed — confirm against bzrlib.
            bytes = None
        else:
            # The body follows as the next record in the container.
            _unused, bytes = iterator.next()
        yield (bytes, metadata) + self.decode_name(names[0][0])
221
class BundleSerializerV4(bundle_serializer.BundleSerializer):
206
class BundleSerializerV4(serializer.BundleSerializer):
222
207
"""Implement the high-level bundle interface"""
224
209
def write(self, repository, revision_ids, forced_bases, fileobj):
270
255
def __init__(self, base, target, repository, fileobj, revision_ids=None):
    """Prepare a bundle-write operation.

    :param base: the base revision id (ancestor boundary of the bundle)
    :param target: the target revision id
    :param repository: the repository to read revision data from
    :param fileobj: file-like object the bundle is written to
    :param revision_ids: optional explicit list of revisions to bundle;
        if None, they are computed from base/target via the graph.
    """
    self.base = base
    self.target = target
    self.repository = repository
    bundle = BundleWriter(fileobj)
    self.bundle = bundle
    if revision_ids is not None:
        self.revision_ids = revision_ids
    else:
        graph = repository.get_graph()
        revision_ids = graph.find_unique_ancestors(target, [base])
        # Keep only revisions actually present in the graph (drops ghosts).
        parents = graph.get_parent_map(revision_ids)
        self.revision_ids = [r for r in revision_ids if r in parents]
    self.revision_keys = set([(revid,) for revid in self.revision_ids])
283
267
def do_write(self):
    """Write all data to the bundle.

    :return: the list of revision ids that were written
    """
    trace.note('Bundling %d revision(s).', len(self.revision_ids))
    self.repository.lock_read()
    try:
        # NOTE(review): interior sequence reconstructed from mangled
        # source — confirm begin/info/files ordering against bzrlib.
        self.bundle.begin()
        self.write_info()
        self.write_files()
        self.write_revisions()
        self.bundle.end()
    finally:
        self.repository.unlock()
    return self.revision_ids
297
276
def write_info(self):
    """Write the format-info record to the bundle."""
    # NOTE(review): the derivation of these two values is reconstructed —
    # confirm against bzrlib; only the add_info_record call is visible.
    serializer_format = self.repository.get_serializer_format()
    supports_rich_root = {True: 1, False: 0}[
        self.repository.supports_rich_root()]
    self.bundle.add_info_record(serializer=serializer_format,
                                supports_rich_root=supports_rich_root)
284
def iter_file_revisions(self):
    """Iterate through all relevant revisions of all files.

    This is the correct implementation, but is not compatible with bzr.dev,
    because certain old revisions were not converted correctly, and have
    the wrong "revision" marker in inventories.

    :return: a generator of (versionedfile, file_id, file_revision_ids)
    """
    transaction = self.repository.get_transaction()
    altered = self.repository.fileids_altered_by_revision_ids(
        self.revision_ids)
    for file_id, file_revision_ids in altered.iteritems():
        vf = self.repository.weave_store.get_weave(file_id, transaction)
        yield vf, file_id, file_revision_ids
298
def iter_file_revisions_aggressive(self):
    """Iterate through all relevant revisions of all files.

    This uses the standard iter_file_revisions to determine what revisions
    are referred to by inventories, but then uses the versionedfile to
    determine what the build-dependencies of each required revision.

    All build dependencies which are not ancestors of the base revision
    are emitted.
    """
    for vf, file_id, file_revision_ids in self.iter_file_revisions():
        new_revision_ids = set()
        # Walk the build-dependency graph breadth-first from the
        # inventory-referenced revisions.
        pending = list(file_revision_ids)
        while len(pending) > 0:
            revision_id = pending.pop()
            if revision_id in new_revision_ids:
                continue
            if revision_id in self.base_ancestry:
                # Already available on the base side; no need to bundle.
                continue
            new_revision_ids.add(revision_id)
            pending.extend(vf.get_parents(revision_id))
        yield vf, file_id, new_revision_ids
305
321
def write_files(self):
    """Write bundle records for all revisions of all files"""
    text_keys = []
    altered_fileids = self.repository.fileids_altered_by_revision_ids(
        self.revision_ids)
    # Flatten {file_id: revision_ids} into (file_id, revision_id) keys.
    for file_id, revision_ids in altered_fileids.iteritems():
        for revision_id in revision_ids:
            text_keys.append((file_id, revision_id))
    self._add_mp_records_keys('file', self.repository.texts, text_keys)
315
326
def write_revisions(self):
    """Write bundle records for all revisions and signatures"""
    inv_vf = self.repository.inventories
    # Topologically order inventories so parents precede children.
    revision_order = [key[-1] for key in multiparent.topo_iter_keys(inv_vf,
        self.revision_keys)]
    if self.target is not None and self.target in self.revision_ids:
        # Ensure the target revision is written last.
        revision_order.remove(self.target)
        revision_order.append(self.target)
    self._add_mp_records_keys('inventory', inv_vf,
                              [(revid,) for revid in revision_order])
    parent_map = self.repository.get_parent_map(revision_order)
    revision_to_str = self.repository._serializer.write_revision_to_string
    revisions = self.repository.get_revisions(revision_order)
    for revision in revisions:
        revision_id = revision.revision_id
        parents = parent_map.get(revision_id, None)
        revision_text = revision_to_str(revision)
        self.bundle.add_fulltext_record(revision_text, parents,
                                        'revision', revision_id)
352
358
base = parents[0]
353
359
return base, target
355
def _add_mp_records_keys(self, repo_kind, vf, keys):
    """Add multi-parent diff records to a bundle.

    :param repo_kind: record kind ('file', 'inventory', ...)
    :param vf: the versionedfile the keys live in
    :param keys: the keys to emit, in any order (topo-sorted here)
    """
    ordered_keys = list(multiparent.topo_iter_keys(vf, keys))
    mpdiffs = vf.make_mpdiffs(ordered_keys)
    sha1s = vf.get_sha1s(ordered_keys)
    parent_map = vf.get_parent_map(ordered_keys)
    for mpdiff, item_key, in zip(mpdiffs, ordered_keys):
        sha1 = sha1s[item_key]
        parents = [key[-1] for key in parent_map[item_key]]
        text = ''.join(mpdiff.to_patch())
        # Infer file id records as appropriate.
        if len(item_key) == 2:
            file_id = item_key[0]
        else:
            file_id = None
        self.bundle.add_multiparent_record(text, sha1, parents, repo_kind,
                                           item_key[-1], file_id)
374
373
class BundleInfoV4(object):
383
382
def install(self, repository):
    """Install this bundle into the given repository.

    Convenience wrapper around install_revisions with default options.
    """
    return self.install_revisions(repository)
386
def install_revisions(self, repository, stream_input=True):
    """Install this bundle's revisions into the specified repository

    :param target_repo: The repository to install into
    :param stream_input: If True, will stream input rather than reading it
        all into memory at once. Reading it into memory all at once is
        (currently) faster.
    """
    repository.lock_write()
    try:
        ri = RevisionInstaller(self.get_bundle_reader(stream_input),
                               self._serializer, repository)
        return ri.install()
    finally:
        # Always release the write lock, even if installation fails.
        repository.unlock()
457
444
self._info = None
459
446
def install(self):
    """Perform the installation.

    Must be called with the Repository locked.
    """
    self._repository.start_write_group()
    try:
        result = self._install_in_write_group()
    except:
        # Abort the write group on any failure, then re-raise.
        self._repository.abort_write_group()
        raise
    self._repository.commit_write_group()
    return result
def _install_in_write_group(self):
    """Perform the installation"""
    current_file = None
    current_versionedfile = None
    pending_file_records = []
    pending_inventory_records = []
    added_inv = set()
    target_revision = None
    for bytes, metadata, repo_kind, revision_id, file_id in\
        self._container.iter_records():
        if repo_kind == 'info':
            # Exactly one info record is allowed per bundle.
            if self._info is not None:
                raise AssertionError()
            self._handle_info(metadata)
        if (pending_file_records and
            (repo_kind, file_id) != ('file', current_file)):
            # Flush the data for a single file - prevents memory
            # spiking due to buffering all files in memory.
            self._install_mp_records_keys(self._repository.texts,
                pending_file_records)
            current_file = None
            del pending_file_records[:]
        if len(pending_inventory_records) > 0 and repo_kind != 'inventory':
            self._install_inventory_records(pending_inventory_records)
            pending_inventory_records = []
        if repo_kind == 'inventory':
            pending_inventory_records.append(((revision_id,), metadata, bytes))
        if repo_kind == 'revision':
            # The last revision record seen is reported as the target.
            target_revision = revision_id
            self._install_revision(revision_id, metadata, bytes)
        if repo_kind == 'signature':
            self._install_signature(revision_id, metadata, bytes)
        if repo_kind == 'file':
            current_file = file_id
            pending_file_records.append(((file_id, revision_id), metadata, bytes))
    # Flush any file records left over after the final record.
    self._install_mp_records_keys(self._repository.texts, pending_file_records)
    return target_revision
511
486
def _handle_info(self, info):
526
501
records if r not in versionedfile]
527
502
versionedfile.add_mpdiffs(vf_records)
529
def _install_mp_records_keys(self, versionedfile, records):
    """Install multi-parent diff records into a versionedfile.

    :param versionedfile: the target versionedfile (keyed by tuples)
    :param records: iterable of (key, metadata, patch_text)
    """
    d_func = multiparent.MultiParent.from_patch
    vf_records = []
    for key, meta, text in records:
        # Adapt to tuple interface: A length two key is a file_id,
        # revision_id pair, a length 1 key is a
        # revision/signature/inventory. We need to do this because
        # the metadata extraction from the bundle has not yet been updated
        # to use the consistent tuple interface itself.
        if len(key) == 2:
            prefix = key[:1]
        else:
            prefix = ()
        parents = [prefix + (parent,) for parent in meta['parents']]
        vf_records.append((key, parents, meta['sha1'], d_func(text)))
    versionedfile.add_mpdiffs(vf_records)
546
def _install_inventory_records(self, records):
    """Install pending inventory records into the repository.

    :param records: iterable of (key, metadata, patch_bytes)
    """
    if self._info['serializer'] == self._repository._serializer.format_num:
        # Same serialization format: the diffs apply directly.
        return self._install_mp_records_keys(self._repository.inventories,
            records)
    # Format mismatch: rebuild each inventory text from its parents and
    # re-serialize through the repository's own serializer.
    for key, metadata, bytes in records:
        revision_id = key[-1]
        parent_ids = metadata['parents']
        parents = [self._repository.get_inventory(p)
                   for p in parent_ids]
        p_texts = [self._source_serializer.write_inventory_to_string(p)
                   for p in parents]
        target_lines = multiparent.MultiParent.from_patch(bytes).to_lines(
            p_texts)
        sha1 = osutils.sha_strings(target_lines)
        if sha1 != metadata['sha1']:
            raise errors.BadBundle("Can't convert to target format")
        target_inv = self._source_serializer.read_inventory_from_string(
            ''.join(target_lines))
        self._handle_root(target_inv, parent_ids)
        try:
            self._repository.add_inventory(revision_id, target_inv,
                                           parent_ids)
        except errors.UnsupportedInventoryKind:
            raise errors.IncompatibleRevision(repr(self._repository))
571
529
def _handle_root(self, target_inv, parent_ids):
    """Fix up the root entry of a converted inventory.

    :param target_inv: the inventory being installed
    :param parent_ids: its parent revision ids
    :raises errors.IncompatibleRevision: if the target repository cannot
        represent this inventory's root revision
    """
    revision_id = target_inv.revision_id
    if self.update_root:
        # Record an (empty) root text so the root has a revision entry.
        text_key = (target_inv.root.file_id, revision_id)
        parent_keys = [(target_inv.root.file_id, parent) for
            parent in parent_ids]
        self._repository.texts.add_lines(text_key, parent_keys, [])
    elif not self._repository.supports_rich_root():
        if target_inv.root.revision != revision_id:
            raise errors.IncompatibleRevision(repr(self._repository))
582
542
def _install_revision(self, revision_id, metadata, text):
    """Install a single revision record, skipping already-present ones."""
    if self._repository.has_revision(revision_id):
        return
    revision = self._source_serializer.read_revision_from_string(text)
    self._repository.add_revision(revision.revision_id, revision)
588
547
def _install_signature(self, revision_id, metadata, text):
    """Install a revision-signature record, skipping existing signatures."""
    transaction = self._repository.get_transaction()
    if self._repository.has_signature_for_revision_id(revision_id):
        return
    self._repository.add_signature_text(revision_id, text)