@@ -449,5 +449,4 @@
             method, noeol = record_details
             if method == 'line-delta':
-                assert base_content is not None
                 if copy_base_content:
                     content = base_content.copy()
@@ -836,7 +835,7 @@
                 # put them in anywhere, but we hope that sending them soon
                 # after the fulltext will give good locality in the receiver
                 ready_to_send[:0] = deferred.pop(version_id)
-        assert len(deferred) == 0, \
-            "Still have compressed child versions waiting to be sent"
+        if not (len(deferred) == 0):
+            raise AssertionError("Still have compressed child versions waiting to be sent")
         # XXX: The stream format is such that we cannot stream it - we have to
         # know the length of all the data a-priori.
@@ -846,8 +845,8 @@
             (version_id2, options, _, parents) in \
             izip(self._data.read_records_iter_raw(copy_queue_records),
                  temp_version_list):
-            assert version_id == version_id2, \
-                'logic error, inconsistent results'
+            if not (version_id == version_id2):
+                raise AssertionError('logic error, inconsistent results')
             raw_datum.append(raw_data)
             result_version_list.append(
                 (version_id, options, len(raw_data), parents))
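Note on the pattern used throughout this patch: `assert` statements are compiled out when Python runs with -O, so an assert guarding data consistency silently stops checking in optimised mode. Rewriting each one as an explicit raise keeps the check alive unconditionally. A minimal standalone sketch of the transformation (illustrative only, not bzrlib code):

# Before: the check disappears under `python -O`.
def check_sent_all_with_assert(deferred):
    assert len(deferred) == 0, \
        "Still have compressed child versions waiting to be sent"

# After: the check always runs, whatever the interpreter flags.
def check_sent_all_with_raise(deferred):
    if not (len(deferred) == 0):
        raise AssertionError(
            "Still have compressed child versions waiting to be sent")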
@@ -1038,21 +1037,22 @@
                 # We received a line-delta record for a non-delta knit.
                 # Convert it to a fulltext.
                 gzip_bytes = reader_callable(length)
-                lines, sha1 = self._data._parse_record(
-                    version_id, gzip_bytes)
-                delta = self.factory.parse_line_delta(lines,
-                    version_id)
-                content = self.factory.make(
-                    self.get_lines(parents[0]), parents[0])
-                content.apply_delta(delta, version_id)
-                digest, len, content = self.add_lines(
-                    version_id, parents, content.text())
-                if digest != sha1:
-                    raise errors.VersionedFileInvalidChecksum(version)
+                self._convert_line_delta_to_fulltext(
+                    gzip_bytes, version_id, parents)
             else:
                 self._add_raw_records(
                     [(version_id, options, parents, length)],
                     reader_callable(length))
+
+    def _convert_line_delta_to_fulltext(self, gzip_bytes, version_id, parents):
+        lines, sha1 = self._data._parse_record(version_id, gzip_bytes)
+        delta = self.factory.parse_line_delta(lines, version_id)
+        content = self.factory.make(self.get_lines(parents[0]), parents[0])
+        content.apply_delta(delta, version_id)
+        digest, len, content = self.add_lines(
+            version_id, parents, content.text())
+        if digest != sha1:
+            raise errors.VersionedFileInvalidChecksum(version_id)
 
     def _knit_from_datastream(self, (format, data_list, reader_callable)):
         """Create a knit object from a data stream.
@@ -1362,7 +1362,6 @@
             # I/O and the time spend applying deltas.
             delta = self._check_should_delta(present_parents)
 
-        assert isinstance(version_id, str)
         content = self.factory.make(lines, version_id)
         if delta or (self.factory.annotated and len(present_parents) > 0):
             # Merge annotations from parent texts if needed.
@@ -1541,11 +1540,12 @@
                 enumerate(self._data.read_records_iter(version_id_records)):
                 pb.update('Walking content.', version_idx, total)
                 method = self._index.get_method(version_id)
-                assert method in ('fulltext', 'line-delta')
                 if method == 'fulltext':
                     line_iterator = self.factory.get_fulltext_content(data)
-                else:
+                elif method == 'line-delta':
                     line_iterator = self.factory.get_linedelta_content(data)
+                else:
+                    raise ValueError('invalid method %r' % (method,))
                 # XXX: It might be more efficient to yield (version_id,
                 # line_iterator) in the future. However for now, this is a simpler
                 # change to integrate into the rest of the codebase. RBC 20071110
@@ -1865,6 +1864,4 @@
                                            self._version_list_to_index(parents))
-            assert isinstance(line, str), \
-                'content must be utf-8 encoded: %r' % (line,)
             lines.append(line)
             self._cache_version(version_id, options, pos, size, tuple(parents))
         if not self._need_to_create:
@@ -2130,7 +2127,6 @@
         compression_parents = an_entry[3][1]
         if not compression_parents:
             return None
-        assert len(compression_parents) == 1
         return compression_parents[0]
 
     def _get_method(self, node):
@@ -2317,7 +2313,5 @@
             tuple - (index, pos, length), where the index field is always None
             for the .knit access method.
         """
-        assert type(raw_data) == str, \
-            'data must be plain bytes was %s' % type(raw_data)
         if not self._need_to_create:
             base = self._transport.append_bytes(self._filename, raw_data)
@@ -2401,8 +2395,6 @@
             tuple - (index, pos, length), where the index field is the
             write_index object supplied to the PackAccess object.
         """
-        assert type(raw_data) == str, \
-            'data must be plain bytes was %s' % type(raw_data)
         result = []
         offset = 0
         for size in sizes:
@@ -2501,7 +2493,8 @@
         # use a generator for memory friendliness
         for from_backing_knit, version_id, start, end in memos_for_retrieval:
             if not from_backing_knit:
-                assert version_id is self.stream_index
+                if version_id is not self.stream_index:
+                    raise AssertionError()
                 yield self.data[start:end]
                 continue
             # we have been asked to thunk. This thunking only occurs when
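For records held directly in the stream, get_raw_records answers each memo by slicing the in-memory buffer and yielding lazily (the "generator for memory friendliness" above), rather than materialising every record at once. A cut-down sketch of that access pattern (hypothetical container, not the bzrlib class):

def get_raw_records(data, memos_for_retrieval):
    # `data` is the raw byte buffer of the stream; each memo gives the
    # (start, end) slice of one record. Yielding keeps memory flat even
    # when the stream holds many records.
    for start, end in memos_for_retrieval:
        yield data[start:end]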
@@ -2729,5 +2722,4 @@
             dense_lines or lines,
             ["end %s\n" % version_id]))
-        assert bytes.__class__ == str
         compressed_bytes = bytes_to_gzip(bytes)
         return len(compressed_bytes), compressed_bytes
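The length returned alongside the compressed bytes is not incidental: as the XXX comment earlier notes, the stream format needs every record's length a priori, so the record must be fully gzipped in memory before anything is written. A standalone sketch of that contract using the stdlib (bzrlib's actual helper is `bytes_to_gzip`; the function name here is illustrative):

import gzip
from cStringIO import StringIO

def record_to_data(version_id, lines):
    # Assemble the payload, gzip it fully in memory, and hand back
    # the compressed length so the caller can write the index entry.
    payload = ''.join(lines + ["end %s\n" % version_id])
    sio = StringIO()
    gz = gzip.GzipFile(mode='wb', fileobj=sio)
    gz.write(payload)
    gz.close()
    compressed_bytes = sio.getvalue()
    return len(compressed_bytes), compressed_bytes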
@@ -2929,8 +2921,5 @@
     def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
         """See InterVersionedFile.join."""
-        assert isinstance(self.source, KnitVersionedFile)
-        assert isinstance(self.target, KnitVersionedFile)
-
         # If the source and target are mismatched w.r.t. annotations vs
         # plain, the data needs to be converted accordingly
         if self.source.factory.annotated == self.target.factory.annotated:
@@ -2982,9 +2971,12 @@
                     # * already have it or
                     # * have it scheduled already
                     # otherwise we don't care
-                    assert (self.target.has_version(parent) or
+                    if not (self.target.has_version(parent) or
                             parent in copy_set or
-                            not self.source.has_version(parent))
+                            not self.source.has_version(parent)):
+                        raise AssertionError("problem joining parent %r "
+                            "from %r to %r"
+                            % (parent, self.source, self.target))
                 index_memo = self.source._index.get_position(version_id)
                 copy_queue_records.append((version_id, index_memo))
                 copy_queue.append((version_id, options, parents))
@@ -2999,6 +2991,7 @@
                 (version_id2, options, parents) in \
                 izip(self.source._data.read_records_iter_raw(copy_queue_records),
                      copy_queue):
-                assert version_id == version_id2, 'logic error, inconsistent results'
+                if not (version_id == version_id2):
+                    raise AssertionError('logic error, inconsistent results')
                 count = count + 1
                 pb.update("Joining knit", count, total)
@@ -3049,8 +3042,5 @@
     def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
         """See InterVersionedFile.join."""
-        assert isinstance(self.source, bzrlib.weave.Weave)
-        assert isinstance(self.target, KnitVersionedFile)
-
         version_ids = self._get_source_version_ids(version_ids, ignore_missing)
 
         if not version_ids:
@@ -3082,7 +3072,9 @@
                 # check that its will be a consistent copy:
                 for parent in parents:
                     # if source has the parent, we must already have it
-                    assert (self.target.has_version(parent))
+                    if not self.target.has_version(parent):
+                        raise AssertionError("%r does not have parent %r"
+                            % (self.target, parent))
                 self.target.add_lines(
                     version_id, parents, self.source.get_lines(version_id))
                 count = count + 1
@@ -3258,10 +3250,12 @@
                 # add a key, no parents
                 self._revision_id_graph[missing_version] = ()
                 pending.discard(missing_version) # don't look for it
-        # XXX: This should probably be a real exception, as it is a data
-        # inconsistency
-        assert not self._ghosts.intersection(self._compression_children), \
-            "We cannot have nodes which have a compression parent of a ghost."
+        if self._ghosts.intersection(self._compression_children):
+            raise AssertionError(
+                "We cannot have nodes which have a ghost compression parent:\n"
+                "ghosts: %r\n"
+                "compression children: %r"
+                % (self._ghosts, self._compression_children))
         # Cleanout anything that depends on a ghost so that we don't wait for
         # the ghost to show up
         for node in self._ghosts:
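The ghost guard is a plain set intersection: a version that is both a ghost (referenced but absent) and the compression parent of some record makes that record unbuildable, since its delta base can never be fetched. A toy illustration of the invariant with hypothetical data:

ghosts = set(['rev-b'])                        # referenced but missing
compression_children = {'rev-b': ['rev-c']}    # rev-c is a delta against rev-b

if ghosts.intersection(compression_children):
    # rev-c could never be rebuilt: its delta base does not exist.
    raise AssertionError(
        "We cannot have nodes which have a ghost compression parent:\n"
        "ghosts: %r\n"
        "compression children: %r" % (ghosts, compression_children))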
@@ -3295,7 +3289,6 @@
             if len(parent_ids) == 0:
                 # There are no parents for this node, so just add it
                 # TODO: This probably needs to be decoupled
-                assert compression_parent is None
                 fulltext_content, delta = self._knit.factory.parse_record(
                     rev_id, record, record_details, None)
                 fulltext = self._add_fulltext_content(rev_id, fulltext_content)
@@ -3312,7 +3305,9 @@
              record_details) = self._all_build_details[rev_id]
             if compression_parent is not None:
                 comp_children = self._compression_children[compression_parent]
-                assert rev_id in comp_children
+                if rev_id not in comp_children:
+                    raise AssertionError("%r not in compression children %r"
+                        % (rev_id, comp_children))
                 # If there is only 1 child, it is safe to reuse this
                 # content
                 reuse_content = (len(comp_children) == 1