@@ -443 +443 @@
         method, noeol = record_details
         if method == 'line-delta':
-            assert base_content is not None
             if copy_base_content:
                 content = base_content.copy()
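
Note on the pattern this diff applies throughout knit.py: bare `assert` statements guarding data integrity are either dropped or rewritten as explicit raises. The motivation (my gloss, not stated in the diff itself) is that Python compiles asserts away under `python -O`, so integrity checks written as asserts silently disappear in optimised runs. A minimal sketch of the difference:

    def check_with_assert(deferred):
        # Compiled away under "python -O" (__debug__ is False).
        assert len(deferred) == 0, "still have deferred versions"

    def check_with_raise(deferred):
        # Always executes, regardless of optimisation flags.
        if not (len(deferred) == 0):
            raise AssertionError("still have deferred versions")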
@@ -830 +829 @@
                 # put them in anywhere, but we hope that sending them soon
                 # after the fulltext will give good locality in the receiver
                 ready_to_send[:0] = deferred.pop(version_id)
-        assert len(deferred) == 0, \
-            "Still have compressed child versions waiting to be sent"
+        if not (len(deferred) == 0):
+            raise AssertionError("Still have compressed child versions waiting to be sent")
         # XXX: The stream format is such that we cannot stream it - we have to
         # know the length of all the data a-priori.
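
The XXX comment above is the key constraint here: the container puts the total length ahead of the data, so the sender must materialise everything before emitting a single byte. A toy illustration (the format details are invented, not bzrlib's):

    def serialise_all_at_once(chunks):
        # A length-prefixed header forces full buffering: nothing can
        # be emitted until every chunk is generated and measured.
        body = ''.join(chunks)
        return '%d\n%s' % (len(body), body)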
@@ -840 +839 @@
                 (version_id2, options, _, parents) in \
                 izip(self._data.read_records_iter_raw(copy_queue_records),
                      temp_version_list):
-            assert version_id == version_id2, \
-                'logic error, inconsistent results'
+            if not (version_id == version_id2):
+                raise AssertionError('logic error, inconsistent results')
             raw_datum.append(raw_data)
             result_version_list.append(
                 (version_id, options, len(raw_data), parents))
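
The converted check guards the assumption that the raw-record reader yields results in exactly the order of the queue it was built from; `izip` only pairs positions and cannot detect a reordering by itself. A standalone sketch with made-up records:

    from itertools import izip  # Python 2, as in bzrlib

    raw_records = [('rev-1', 'raw-bytes-1'), ('rev-2', 'raw-bytes-2')]
    temp_version_list = [('rev-1', ['fulltext'], None, []),
                         ('rev-2', ['line-delta'], None, ['rev-1'])]

    for (version_id, raw_data), (version_id2, options, _, parents) in \
            izip(raw_records, temp_version_list):
        # Both sequences derive from the same queue, so the ids must
        # stay in lockstep; a mismatch means a logic error upstream.
        if not (version_id == version_id2):
            raise AssertionError('logic error, inconsistent results')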
@@ -880 +879 @@
-        # Double index lookups here : need a unified api ?
+        # We end up doing multiple index lookups here for parents details and
+        # disk layout details - we need a unified api ?
         parent_map = self.get_parent_map(versions)
         absent_versions = set(versions) - set(parent_map)
         if ordering == 'topological':
@@ -889 +889 @@
         present_versions = [version for version in versions if version in
             parent_map]
         position_map = self._get_components_positions(present_versions)
-        # c = component_id, r = record_details, i_m = index_memo, n = next
         records = [(version, position_map[version][1]) for version in
             present_versions]
@@ -1013 +1012 @@
                 # We received a line-delta record for a non-delta knit.
                 # Convert it to a fulltext.
                 gzip_bytes = reader_callable(length)
-                lines, sha1 = self._data._parse_record(
-                    version_id, gzip_bytes)
-                delta = self.factory.parse_line_delta(lines,
-                    version_id)
-                content = self.factory.make(
-                    self.get_lines(parents[0]), parents[0])
-                content.apply_delta(delta, version_id)
-                digest, len, content = self.add_lines(
-                    version_id, parents, content.text())
-                if digest != sha1:
-                    raise errors.VersionedFileInvalidChecksum(version)
+                self._convert_line_delta_to_fulltext(
+                    gzip_bytes, version_id, parents)
@@ -1029 +1019 @@
                 self._add_raw_records(
                     [(version_id, options, parents, length)],
                     reader_callable(length))
@@ +1023 @@
+    def _convert_line_delta_to_fulltext(self, gzip_bytes, version_id, parents):
+        lines, sha1 = self._data._parse_record(version_id, gzip_bytes)
+        delta = self.factory.parse_line_delta(lines, version_id)
+        content = self.factory.make(self.get_lines(parents[0]), parents[0])
+        content.apply_delta(delta, version_id)
+        digest, len, content = self.add_lines(
+            version_id, parents, content.text())
+        if digest != sha1:
+            raise errors.VersionedFileInvalidChecksum(version_id)
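
Extracting `_convert_line_delta_to_fulltext` also quietly fixes a bug visible in the hunk: the inlined code raised `VersionedFileInvalidChecksum(version)`, a name that appears undefined in that scope, while the helper raises with `version_id`. (The `digest, len, content` unpacking still shadows the `len` builtin, as in the original.) For readers unfamiliar with line-deltas, here is a toy apply; the delta shape is invented for illustration, not bzrlib's wire format:

    def apply_line_delta(parent_lines, delta):
        # delta: list of (start, end, replacement_lines) against the
        # parent; applied back-to-front so earlier indices stay valid.
        lines = list(parent_lines)
        for start, end, new_lines in reversed(delta):
            lines[start:end] = new_lines
        return lines

    parent = ['a\n', 'b\n', 'c\n']
    delta = [(1, 2, ['B\n', 'B2\n'])]
    result = apply_line_delta(parent, delta)
    if result != ['a\n', 'B\n', 'B2\n', 'c\n']:
        raise AssertionError(result)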
@@ -1033 +1033 @@
     def _knit_from_datastream(self, (format, data_list, reader_callable)):
         """Create a knit object from a data stream.
@@ -1077 +1077 @@
         convertibles = set(["knit-annotated-delta-gz",
                             "knit-annotated-ft-gz"])
+        # The set of types we can cheaply adapt without needing basis texts.
         native_types = set()
         native_types.add("knit-%sdelta-gz" % annotated)
         native_types.add("knit-%sft-gz" % annotated)
         knit_types = native_types.union(convertibles)
@@ -1084 +1085 @@
-        # Buffered index entries that we can't add immediately because their
+        # Buffer all index entries that we can't add immediately because their
         # basis parent is missing. We don't buffer all because generating
         # annotations may require access to some of the new records. However we
         # can't generate annotations from new deltas until their basis parent
         # is present anyway, so we get away with not needing an index that
-        # reports on the new keys.
+        # includes the new keys.
         # key = basis_parent, value = index entry to add
         buffered_index_entries = {}
         for record in stream:
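
The buffering strategy described in the comment block can be shown end to end with toy records. Names and record shapes below are invented for illustration; the real code keys `buffered_index_entries` by basis parent exactly as the comment says:

    stream = [('B', 'A'), ('A', None), ('C', 'B')]  # (key, basis_parent)
    present = set()
    buffered_index_entries = {}  # basis_parent -> [keys waiting on it]

    def add_entry(key):
        present.add(key)
        # Adding a key may unblock entries buffered against it.
        for waiting in buffered_index_entries.pop(key, []):
            add_entry(waiting)

    for key, basis in stream:
        if basis is not None and basis not in present:
            buffered_index_entries.setdefault(basis, []).append(key)
        else:
            add_entry(key)

    print sorted(present)  # ['A', 'B', 'C'] even though B arrived first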
@@ -1337 +1338 @@
             # I/O and the time spend applying deltas.
             delta = self._check_should_delta(present_parents)

-        assert isinstance(version_id, str)
         content = self.factory.make(lines, version_id)
         if delta or (self.factory.annotated and len(present_parents) > 0):
             # Merge annotations from parent texts if needed.
@@ -1516 +1516 @@
                 enumerate(self._data.read_records_iter(version_id_records)):
             pb.update('Walking content.', version_idx, total)
             method = self._index.get_method(version_id)
@@ -1520 +1519 @@
-            assert method in ('fulltext', 'line-delta')
             if method == 'fulltext':
                 line_iterator = self.factory.get_fulltext_content(data)
-            else:
+            elif method == 'line-delta':
                 line_iterator = self.factory.get_linedelta_content(data)
+            else:
+                raise ValueError('invalid method %r' % (method,))
             # XXX: It might be more efficient to yield (version_id,
             # line_iterator) in the future. However for now, this is a simpler
             # change to integrate into the rest of the codebase. RBC 20071110
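
Note the shape of this rewrite: the old code asserted membership and then fell through a bare `else:`, so under `python -O` an unexpected method would have been silently treated as a line-delta. The new chain names each accepted value and makes the fall-through loud. In miniature:

    def get_line_iterator(factory, method, data):
        if method == 'fulltext':
            return factory.get_fulltext_content(data)
        elif method == 'line-delta':
            return factory.get_linedelta_content(data)
        else:
            # Unreachable for well-formed indices; fail loudly if not.
            raise ValueError('invalid method %r' % (method,))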
@@ -1840 +1840 @@
                 self._version_list_to_index(parents))
-            assert isinstance(line, str), \
-                'content must be utf-8 encoded: %r' % (line,)
             lines.append(line)
             self._cache_version(version_id, options, pos, size, tuple(parents))
         if not self._need_to_create:
@@ -2105 +2103 @@
         compression_parents = an_entry[3][1]
         if not compression_parents:
             return None
-        assert len(compression_parents) == 1
         return compression_parents[0]

     def _get_method(self, node):
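
For orientation, the subscripting above implies the index entry layout: field 3 of a node holds the reference lists, with compression parents second. A hedged sketch with a made-up entry (the exact tuple contents are my assumption, not taken from the diff):

    # (index, key, value, (parents, compression_parents)) -- inferred shape
    an_entry = (None, ('rev-2',), '100 200', ((('rev-1',),), (('rev-1',),)))
    compression_parents = an_entry[3][1]
    if not compression_parents:
        result = None
    else:
        if len(compression_parents) != 1:
            raise AssertionError(compression_parents)
        result = compression_parents[0]
    print result  # ('rev-1',)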
@@ -2292 +2289 @@
             tuple - (index, pos, length), where the index field is always None
             for the .knit access method.
         """
-        assert type(raw_data) == str, \
-            'data must be plain bytes was %s' % type(raw_data)
         if not self._need_to_create:
             base = self._transport.append_bytes(self._filename, raw_data)
@@ -2376 +2371 @@
             tuple - (index, pos, length), where the index field is the
             write_index object supplied to the PackAccess object.
         """
-        assert type(raw_data) == str, \
-            'data must be plain bytes was %s' % type(raw_data)
@@ -2383 +2376 @@
         for size in sizes:
@@ -2476 +2469 @@
         # use a generator for memory friendliness
         for from_backing_knit, version_id, start, end in memos_for_retrieval:
             if not from_backing_knit:
-                assert version_id is self.stream_index
+                if version_id is not self.stream_index:
+                    raise AssertionError()
                 yield self.data[start:end]
@@ -2482 +2476 @@
             # we have been asked to thunk. This thunking only occurs when
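
The "memory friendliness" comment is worth keeping in mind: the method yields `self.data[start:end]` slices one at a time, so a large stream never has all records materialised at once. Stripped to its core:

    def get_raw_records(data, memos):
        # Yield one record's bytes at a time instead of building a list.
        for start, end in memos:
            yield data[start:end]

    for chunk in get_raw_records('abcdefgh', [(0, 3), (3, 8)]):
        print chunk  # 'abc', then 'defgh'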
@@ -2704 +2698 @@
             dense_lines or lines,
             ["end %s\n" % version_id]))
-        assert bytes.__class__ == str
         compressed_bytes = bytes_to_gzip(bytes)
         return len(compressed_bytes), compressed_bytes
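
`bytes_to_gzip` is bzrlib's own helper; a rough stand-in using only the stdlib (Python 2, matching the codebase) shows the contract the caller relies on, gzipping an in-memory byte string and handing back (length, payload):

    import gzip
    from cStringIO import StringIO  # Python 2

    def compress_record(bytes):
        sio = StringIO()
        f = gzip.GzipFile(mode='wb', fileobj=sio)
        f.write(bytes)
        f.close()  # flushes the gzip trailer
        compressed_bytes = sio.getvalue()
        return len(compressed_bytes), compressed_bytes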
@@ -2904 +2897 @@
     def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
         """See InterVersionedFile.join."""
-        assert isinstance(self.source, KnitVersionedFile)
-        assert isinstance(self.target, KnitVersionedFile)
-
         # If the source and target are mismatched w.r.t. annotations vs
         # plain, the data needs to be converted accordingly
         if self.source.factory.annotated == self.target.factory.annotated:
@@ -2957 +2947 @@
             # * already have it or
             # * have it scheduled already
             # otherwise we don't care
-            assert (self.target.has_version(parent) or
+            if not (self.target.has_version(parent) or
                     parent in copy_set or
-                    not self.source.has_version(parent))
+                    not self.source.has_version(parent)):
+                raise AssertionError("problem joining parent %r "
+                    "from %r to %r"
+                    % (parent, self.source, self.target))
             index_memo = self.source._index.get_position(version_id)
             copy_queue_records.append((version_id, index_memo))
             copy_queue.append((version_id, options, parents))
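
The three-way condition reads more naturally stated positively: every parent the source knows and the target lacks must already be scheduled for copying, or the joined graph would dangle. As a predicate (names invented for illustration):

    def parent_ok(target_has_parent, parent_scheduled, source_has_parent):
        # Acceptable if the target already has it, it is queued to be
        # copied, or the source never had it either (a ghost for both).
        return target_has_parent or parent_scheduled or not source_has_parent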
@@ -2974 +2967 @@
                 (version_id2, options, parents) in \
                 izip(self.source._data.read_records_iter_raw(copy_queue_records),
                      copy_queue):
-            assert version_id == version_id2, 'logic error, inconsistent results'
+            if not (version_id == version_id2):
+                raise AssertionError('logic error, inconsistent results')
             count = count + 1
             pb.update("Joining knit", count, total)
@@ -3024 +3018 @@
     def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
         """See InterVersionedFile.join."""
-        assert isinstance(self.source, bzrlib.weave.Weave)
-        assert isinstance(self.target, KnitVersionedFile)
-
         version_ids = self._get_source_version_ids(version_ids, ignore_missing)

         if not version_ids:
@@ -3057 +3048 @@
             # check that its will be a consistent copy:
             for parent in parents:
                 # if source has the parent, we must already have it
-                assert (self.target.has_version(parent))
+                if not self.target.has_version(parent):
+                    raise AssertionError("%r does not have parent %r"
+                        % (self.target, parent))
             self.target.add_lines(
                 version_id, parents, self.source.get_lines(version_id))
             count = count + 1
@@ -3233 +3226 @@
             # add a key, no parents
             self._revision_id_graph[missing_version] = ()
             pending.discard(missing_version) # don't look for it
-        # XXX: This should probably be a real exception, as it is a data
-        #      inconsistency
-        assert not self._ghosts.intersection(self._compression_children), \
-            "We cannot have nodes which have a compression parent of a ghost."
+        if self._ghosts.intersection(self._compression_children):
+            raise AssertionError(
+                "We cannot have nodes which have a ghost compression parent:\n"
+                "ghosts: %r\n"
+                "compression children: %r"
+                % (self._ghosts, self._compression_children))
         # Cleanout anything that depends on a ghost so that we don't wait for
         # the ghost to show up
         for node in self._ghosts:
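
A "ghost" here is a revision that is referenced but not present. The rewritten check enforces that no node names a ghost as its compression (delta) basis, since such a node could never be rebuilt. With toy data:

    ghosts = set(['rev-lost'])
    compression_children = {'rev-base': ['rev-2']}  # basis -> children

    if ghosts.intersection(compression_children):
        raise AssertionError(
            "We cannot have nodes which have a ghost compression parent:\n"
            "ghosts: %r\ncompression children: %r"
            % (ghosts, compression_children))
    # Passes: 'rev-lost' is a ghost, but nothing is delta-compressed
    # against it.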
@@ -3270 +3265 @@
             if len(parent_ids) == 0:
                 # There are no parents for this node, so just add it
                 # TODO: This probably needs to be decoupled
-                assert compression_parent is None
                 fulltext_content, delta = self._knit.factory.parse_record(
                     rev_id, record, record_details, None)
                 fulltext = self._add_fulltext_content(rev_id, fulltext_content)
@@ -3287 +3281 @@
                 record_details) = self._all_build_details[rev_id]
             if compression_parent is not None:
                 comp_children = self._compression_children[compression_parent]
-                assert rev_id in comp_children
+                if rev_id not in comp_children:
+                    raise AssertionError("%r not in compression children %r"
+                        % (rev_id, comp_children))
                 # If there is only 1 child, it is safe to reuse this
@@ -3293 +3289 @@
                 reuse_content = (len(comp_children) == 1
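
Finally, the `reuse_content` decision that the last hunk trails into: a parent's content object may be mutated in place only when exactly one child will be built from it. A sketch of that choice (the method names are invented):

    def content_for_child(parent_content, comp_children):
        if len(comp_children) == 1:
            # Sole consumer: safe to apply the delta in place.
            return parent_content
        # Other children still need the pristine parent text.
        return parent_content.copy()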