INDEX_SUFFIX = '.kndx'

class KnitAdapter(object):
    """Base class for knit record adaptation."""

    def __init__(self, basis_vf):
        """Create an adapter which accesses full texts from basis_vf.

        :param basis_vf: A versioned file to access basis texts of deltas from.
            May be None for adapters that do not need to access basis texts.
        """
        self._data = _KnitData(None)
        self._annotate_factory = KnitAnnotateFactory()
        self._plain_factory = KnitPlainFactory()
        self._basis_vf = basis_vf

class FTAnnotatedToUnannotated(KnitAdapter):
    """An adapter from FT annotated knits to unannotated ones."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
        size, bytes = self._data._record_to_data(rec[1], rec[3], content.text())
        return bytes

class DeltaAnnotatedToUnannotated(KnitAdapter):
    """An adapter for deltas from annotated to unannotated."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
            plain=True)
        contents = self._plain_factory.lower_line_delta(delta)
        size, bytes = self._data._record_to_data(rec[1], rec[3], contents)
        return bytes

class FTAnnotatedToFullText(KnitAdapter):
    """An adapter from FT annotated knits to full texts."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content, delta = self._annotate_factory.parse_record(factory.key[0],
            contents, factory._build_details, None)
        return ''.join(content.text())

class DeltaAnnotatedToFullText(KnitAdapter):
    """An adapter for deltas from annotated knits to full texts."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
            plain=True)
        compression_parent = factory.parents[0][0]
        basis_lines = self._basis_vf.get_lines(compression_parent)
        # Manually apply the delta because we have one annotated content and
        # one plain.
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        basis_content.apply_delta(delta, rec[1])
        basis_content._should_strip_eol = factory._build_details[1]
        return ''.join(basis_content.text())

class FTPlainToFullText(KnitAdapter):
    """An adapter from FT plain knits to full texts."""

    def get_bytes(self, factory, compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        content, delta = self._plain_factory.parse_record(factory.key[0],
            contents, factory._build_details, None)
        return ''.join(content.text())

class DeltaPlainToFullText(KnitAdapter):
    """An adapter for deltas from plain knits to full texts."""

    def get_bytes(self, factory, compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
        compression_parent = factory.parents[0][0]
        basis_lines = self._basis_vf.get_lines(compression_parent)
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        # Re-parse the record with the basis content supplied; the plain
        # factory applies the delta to a copy of the basis for us.
        content, _ = self._plain_factory.parse_record(rec[1], contents,
            factory._build_details, basis_content)
        return ''.join(content.text())

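# Illustrative sketch, not part of the original module: how one of the
# adapters above might be driven by hand. 'source_factory' is a hypothetical
# KnitContentFactory for an annotated delta record, and 'basis_knit' a
# hypothetical knit holding the compression parent's text.
#
#   adapter = DeltaAnnotatedToFullText(basis_knit)
#   raw_bytes = source_factory.get_bytes_as(source_factory.storage_kind)
#   fulltext = adapter.get_bytes(source_factory, raw_bytes)
#
# Only the Delta*ToFullText adapters dereference basis_vf; the others can be
# constructed with basis_vf=None.
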
class KnitContentFactory(ContentFactory):
    """Content factory for streaming from knits.

    :seealso ContentFactory:
    """

    def __init__(self, version, parents, build_details, sha1, raw_record,
        annotated, knit=None):
        """Create a KnitContentFactory for version.

        :param version: The version.
        :param parents: The parents.
        :param build_details: The build details as returned from
            get_build_details.
        :param sha1: The sha1 expected from the full text of this object.
        :param raw_record: The bytes of the knit data from disk.
        :param annotated: True if the raw data is annotated.
        """
        ContentFactory.__init__(self)
        self.sha1 = sha1
        self.key = (version,)
        self.parents = tuple((parent,) for parent in parents)
        if build_details[0] == 'line-delta':
            kind = 'delta'
        else:
            kind = 'ft'
        if annotated:
            annotated_kind = 'annotated-'
        else:
            annotated_kind = ''
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
        self._raw_record = raw_record
        self._build_details = build_details
        self._knit = knit
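
    # The storage_kind computed above is one of four strings:
    # 'knit-annotated-ft-gz', 'knit-annotated-delta-gz', 'knit-ft-gz' or
    # 'knit-delta-gz'. These are the same names insert_record_stream below
    # uses to sort records into native versus convertible types.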

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            return self._raw_record
        if storage_kind == 'fulltext' and self._knit is not None:
            return self._knit.get_text(self.key[0])
        else:
            raise errors.UnavailableRepresentation(self.key, storage_kind,
                self.storage_kind)

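# Illustrative sketch, not part of the original module: consuming a
# KnitContentFactory. The native storage kind is always available; 'fulltext'
# works only when a knit was supplied at construction time.
#
#   raw = factory.get_bytes_as(factory.storage_kind)
#   try:
#       text = factory.get_bytes_as('fulltext')
#   except errors.UnavailableRepresentation:
#       text = None  # no backing knit; adapt the raw record instead
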
class KnitContent(object):
    """Content of a knit version to which deltas can be applied."""


            return pseudo_file.read(length)
        return (self.get_format_signature(), result_version_list, read)

    def get_record_stream(self, versions, ordering, include_delta_closure):
        """Get a stream of records for versions.

        :param versions: The versions to include. Each version is a tuple
            (version,).
        :param ordering: Either 'unordered' or 'topological'. A topologically
            sorted stream has compression parents strictly before their
            children.
        :param include_delta_closure: If True then the closure across any
            compression parents will be included (in the opaque data).
        :return: An iterator of ContentFactory objects, each of which is only
            valid until the iterator is advanced.
        """
        if include_delta_closure:
            # Nb: what we should do is plan the data to stream to allow
            # reconstruction of all the texts without excessive buffering,
            # including re-sending common bases as needed. This makes the most
            # sense when we start serialising these streams though, so for now
            # we just fallback to individual text construction behind the
            # abstraction barrier.
            knit = self
        else:
            knit = None
        # We end up doing multiple index lookups here for parents details and
        # disk layout details - we need a unified api?
        parent_map = self.get_parent_map(versions)
        absent_versions = set(versions) - set(parent_map)
        if ordering == 'topological':
            present_versions = topo_sort(parent_map)
        else:
            # List comprehension to keep the requested order (as that seems
            # marginally useful, at least until we start doing IO optimising
            # here).
            present_versions = [version for version in versions if version in
                parent_map]
        position_map = self._get_components_positions(present_versions)
        records = [(version, position_map[version][1]) for version in
            present_versions]
        for version in absent_versions:
            yield AbsentContentFactory((version,))
        for version, raw_data, sha1 in \
                self._data.read_records_iter_raw(records):
            (record_details, index_memo, _) = position_map[version]
            yield KnitContentFactory(version, parent_map[version],
                record_details, sha1, raw_data, self.factory.annotated, knit)

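    # Illustrative sketch, not part of the original module: a typical consumer
    # of get_record_stream, where 'process' is a hypothetical callback.
    #
    #   for record in knit.get_record_stream(versions, 'topological', False):
    #       if record.storage_kind == 'absent':
    #           raise RevisionNotPresent(record.key[0], knit)
    #       process(record.key, record.get_bytes_as(record.storage_kind))
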
    def _extract_blocks(self, version_id, source, target):
        if self._index.get_method(version_id) != 'line-delta':
            return None

        return KnitVersionedFile(self.filename, self.transport,
            factory=factory, index=index, access_method=access)

    def insert_record_stream(self, stream):
        """Insert a record stream into this versioned file.

        :param stream: A stream of records to insert.
        :return: None
        :seealso VersionedFile.get_record_stream:
        """
        def get_adapter(adapter_key):
            try:
                return adapters[adapter_key]
            except KeyError:
                adapter_factory = adapter_registry.get(adapter_key)
                adapter = adapter_factory(self)
                adapters[adapter_key] = adapter
                return adapter
        if self.factory.annotated:
            # self is annotated, we need annotated knits to use directly.
            annotated = "annotated-"
            convertibles = []
        else:
            # self is not annotated, but we can strip annotations cheaply.
            annotated = ""
            convertibles = set(["knit-annotated-delta-gz",
                "knit-annotated-ft-gz"])
        # The set of types we can cheaply adapt without needing basis texts.
        native_types = set()
        native_types.add("knit-%sdelta-gz" % annotated)
        native_types.add("knit-%sft-gz" % annotated)
        knit_types = native_types.union(convertibles)
        adapters = {}
        # Buffer all index entries that we can't add immediately because their
        # basis parent is missing. We don't buffer all because generating
        # annotations may require access to some of the new records. However we
        # can't generate annotations from new deltas until their basis parent
        # is present anyway, so we get away with not needing an index that
        # includes the new keys.
        # key = basis_parent, value = index entry to add
        buffered_index_entries = {}
        for record in stream:
            # Raise an error when a record is missing.
            if record.storage_kind == 'absent':
                raise RevisionNotPresent([record.key[0]], self)
            # adapt to non-tuple interface
            parents = [parent[0] for parent in record.parents]
            if record.storage_kind in knit_types:
                if record.storage_kind not in native_types:
                    try:
                        adapter_key = (record.storage_kind, "knit-delta-gz")
                        adapter = get_adapter(adapter_key)
                    except KeyError:
                        adapter_key = (record.storage_kind, "knit-ft-gz")
                        adapter = get_adapter(adapter_key)
                    bytes = adapter.get_bytes(
                        record, record.get_bytes_as(record.storage_kind))
                else:
                    bytes = record.get_bytes_as(record.storage_kind)
                options = [record._build_details[0]]
                if record._build_details[1]:
                    options.append('no-eol')
                # Just blat it across.
                # Note: This does end up adding data on duplicate keys. As
                # modern repositories use atomic insertions this should not
                # lead to excessive growth in the event of interrupted fetches.
                # 'knit' repositories may suffer excessive growth, but as a
                # deprecated format this is tolerable. It can be fixed if
                # needed by the kndx index support raising on a duplicate
                # add with identical parents and options.
                access_memo = self._data.add_raw_records([len(bytes)], bytes)[0]
                index_entry = (record.key[0], options, access_memo, parents)
                buffered = False
                if 'fulltext' not in options:
                    basis_parent = parents[0]
                    if not self.has_version(basis_parent):
                        pending = buffered_index_entries.setdefault(
                            basis_parent, [])
                        pending.append(index_entry)
                        buffered = True
                if not buffered:
                    self._index.add_versions([index_entry])
            elif record.storage_kind == 'fulltext':
                self.add_lines(record.key[0], parents,
                    split_lines(record.get_bytes_as('fulltext')))
            else:
                adapter_key = record.storage_kind, 'fulltext'
                adapter = get_adapter(adapter_key)
                lines = split_lines(adapter.get_bytes(
                    record, record.get_bytes_as(record.storage_kind)))
                try:
                    self.add_lines(record.key[0], parents, lines)
                except errors.RevisionAlreadyPresent:
                    pass
            # Add any records whose basis parent is now available.
            added_keys = [record.key[0]]
            while added_keys:
                key = added_keys.pop(0)
                if key in buffered_index_entries:
                    index_entries = buffered_index_entries[key]
                    self._index.add_versions(index_entries)
                    added_keys.extend(
                        [index_entry[0] for index_entry in index_entries])
                    del buffered_index_entries[key]
        # If there were any deltas which had a missing basis parent, error.
        if buffered_index_entries:
            raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
                self)

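    # Illustrative sketch, not part of the original module: fetching between
    # two knits by chaining get_record_stream into insert_record_stream.
    # 'topological' ordering keeps delta bases ahead of their children, so the
    # index-entry buffering above should not normally trigger.
    #
    #   stream = source.get_record_stream(source.versions(), 'topological',
    #       include_delta_closure=False)
    #   target.insert_record_stream(stream)
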
    def versions(self):
        """See VersionedFile.versions."""
        if 'evil' in debug.debug_flags:

                              % (version_id, e.__class__.__name__, str(e)))

    def _split_header(self, line):
        rec = line.split()
        if len(rec) != 4:
            raise KnitCorrupt(self._access,
                              'unexpected number of elements in record header')
        return rec

    def _check_header_version(self, rec, version_id):
        if rec[1] != version_id:
            raise KnitCorrupt(self._access,
                              'unexpected version, wanted %r, got %r'
                              % (version_id, rec[1]))

    def _check_header(self, version_id, line):
        rec = self._split_header(line)
        self._check_header_version(rec, version_id)
        return rec

    def _parse_record_unchecked(self, data):
        # profiling notes:
        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        df = GzipFile(mode='rb', fileobj=StringIO(data))
        try:
            record_contents = df.readlines()
        except Exception, e:
            raise KnitCorrupt(self._access, "Corrupt compressed record %r, got %s(%s)" %
                (data, e.__class__.__name__, str(e)))
        header = record_contents.pop(0)
        rec = self._split_header(header)
        last_line = record_contents.pop()
        if len(record_contents) != int(rec[2]):
            raise KnitCorrupt(self._access,
                              'incorrect number of lines %s != %s'
                              ' for version {%s}'
                              % (len(record_contents), int(rec[2]),
                                 rec[1]))
        if last_line != 'end %s\n' % rec[1]:
            raise KnitCorrupt(self._access,
                              'unexpected version end line %r, wanted %r'
                              % (last_line, rec[1]))
        df.close()
        return rec, record_contents

    def _parse_record(self, version_id, data):
        rec, record_contents = self._parse_record_unchecked(data)
        self._check_header_version(rec, version_id)
        return record_contents, rec[3]
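
    # For reference (inferred from the checks above and from _record_to_data):
    # a gzip-decompressed record is a header line, int(rec[2]) content lines,
    # and an end marker naming the same version:
    #
    #   version <version-id> <line-count> <digest>
    #   <content line> * <line-count>
    #   end <version-id>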

    def read_records_iter_raw(self, records):