annotated_kind = ''
self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
self._raw_record = raw_record
self._network_bytes = network_bytes
self._build_details = build_details
self._knit = knit
def _create_network_bytes(self):
"""Create a fully serialised network version for transmission."""
# storage_kind, key, parents, Noeol, raw_record
key_bytes = '\x00'.join(self.key)
if self.parents is None:
parent_bytes = 'None:'
parent_bytes = '\t'.join('\x00'.join(key) for key in self.parents)
if self._build_details[1]:
network_bytes = "%s\n%s\n%s\n%s%s" % (self.storage_kind, key_bytes,
parent_bytes, noeol, self._raw_record)
self._network_bytes = network_bytes
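# Illustrative sketch (not part of the original file): the serialised layout
# built above, for a hypothetical plain fulltext record with key
# ('file-id', 'rev-1') and one parent ('file-id', 'rev-0'), would look like:
#
#   knit-ft-gz\n
#   file-id\x00rev-1\n
#   file-id\x00rev-0\n
#   <noeol flag><raw gzipped record bytes>
#
# where the flag is 'N' when the text has no trailing newline.
# knit_network_to_record() below parses exactly this layout back apart.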
def get_bytes_as(self, storage_kind):
if storage_kind == self.storage_kind:
if self._network_bytes is None:
self._create_network_bytes()
return self._network_bytes
if ('-ft-' in self.storage_kind and
storage_kind in ('chunked', 'fulltext')):
adapter_key = (self.storage_kind, 'fulltext')
adapter_factory = adapter_registry.get(adapter_key)
adapter = adapter_factory(None)
bytes = adapter.get_bytes(self)
if storage_kind == 'chunked':
if self._knit is not None:
# Not redundant with direct conversion above - that only handles
if storage_kind == 'chunked':
return self._knit.get_lines(self.key[0])
elif storage_kind == 'fulltext':
return self._knit.get_text(self.key[0])
raise errors.UnavailableRepresentation(self.key, storage_kind,
class LazyKnitContentFactory(ContentFactory):
"""A ContentFactory which can either generate full text or a wire form.
:seealso ContentFactory:
def __init__(self, key, parents, generator, first):
"""Create a LazyKnitContentFactory.
:param key: The key of the record.
:param parents: The parents of the record.
:param generator: A _ContentMapGenerator containing the record for this
:param first: Is this the first content object returned from generator?
if it is, its storage kind is knit-delta-closure, otherwise it is
knit-delta-closure-ref
self.parents = parents
self._generator = generator
self.storage_kind = "knit-delta-closure"
self.storage_kind = self.storage_kind + "-ref"
def get_bytes_as(self, storage_kind):
if storage_kind == self.storage_kind:
return self._generator._wire_bytes()
# all the keys etc are contained in the bytes returned in the
if storage_kind in ('chunked', 'fulltext'):
chunks = self._generator._get_one_work(self.key).text()
if storage_kind == 'chunked':
return ''.join(chunks)
raise errors.UnavailableRepresentation(self.key, storage_kind,
def knit_delta_closure_to_records(storage_kind, bytes, line_end):
"""Convert a network record to an iterator over stream records.
:param storage_kind: The storage kind of the record.
Must be 'knit-delta-closure'.
:param bytes: The bytes of the record on the network.
generator = _NetworkContentMapGenerator(bytes, line_end)
return generator.get_record_stream()
def knit_network_to_record(storage_kind, bytes, line_end):
"""Convert a network record to a record object.
:param storage_kind: The storage kind of the record.
:param bytes: The bytes of the record on the network.
line_end = bytes.find('\n', start)
key = tuple(bytes[start:line_end].split('\x00'))
line_end = bytes.find('\n', start)
parent_line = bytes[start:line_end]
if parent_line == 'None:':
[tuple(segment.split('\x00')) for segment in parent_line.split('\t')
noeol = bytes[start] == 'N'
if 'ft' in storage_kind:
method = 'line-delta'
build_details = (method, noeol)
raw_record = bytes[start:]
annotated = 'annotated' in storage_kind
return [KnitContentFactory(key, parents, build_details, None, raw_record,
annotated, network_bytes=bytes)]
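# Illustrative note (not part of the original file): the bytes parsed here are
# the ones produced by KnitContentFactory._create_network_bytes() above -- a
# storage_kind line, a key line, a parents line ('None:' for None), then a
# single noeol flag character followed by the raw record. Because the whole
# byte string is kept as network_bytes, get_bytes_as(storage_kind) on the
# returned factory can hand the same bytes straight back without
# re-serialising.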
def get_bytes_as(self, storage_kind):
if storage_kind == self.storage_kind:
return self._raw_record
if storage_kind == 'fulltext' and self._knit is not None:
return self._knit.get_text(self.key[0])
raise errors.UnavailableRepresentation(self.key, storage_kind,
class KnitContent(object):
"""Content of a knit version to which deltas can be applied.
This is always stored in memory as a list of lines with \\n at the end,
plus a flag saying if the final ending is really there or not, because that
corresponds to the on-disk knit representation.
def _get_record_map(self, keys, allow_missing=False):
"""Produce a dictionary of knit records.
:return: {key:(record, record_details, digest, next)}
* record: data returned from read_records (a KnitContent object)
* record_details: opaque information to pass to parse_record
* digest: SHA1 digest of the full text after all steps are done
* next: build-parent of the version, i.e. the leftmost ancestor.
Will be None if the record is not a delta.
:param keys: The keys to build a map for
:param allow_missing: If some records are missing, rather than
error, just return the data that could be generated.
raw_map = self._get_record_map_unparsed(keys,
position_map = self._get_components_positions(keys,
allow_missing=allow_missing)
return self._raw_map_to_record_map(raw_map)
def _raw_map_to_record_map(self, raw_map):
"""Parse the contents of _get_record_map_unparsed.
:return: see _get_record_map.
data, record_details, next = raw_map[key]
content, digest = self._parse_record(key[-1], data)
result[key] = content, record_details, digest, next
def _get_record_map_unparsed(self, keys, allow_missing=False):
"""Get the raw data for reconstructing keys without parsing it.
:return: A dict suitable for parsing via _raw_map_to_record_map.
key -> raw_bytes, (method, noeol), compression_parent
# This retries the whole request if anything fails. Potentially we
# could be a bit more selective. We could track the keys whose records
# we have successfully found, and then only request the new records
# from there. However, _get_components_positions grabs the whole build
# chain, which means we'll likely try to grab the same records again
# anyway. Also, can the build chains change as part of a pack
# operation? We wouldn't want to end up with a broken chain.
position_map = self._get_components_positions(keys,
allow_missing=allow_missing)
# key = component_id, r = record_details, i_m = index_memo,
records = [(key, i_m) for key, (r, i_m, n)
in position_map.iteritems()]
# Sort by the index memo, so that we request records from the
# same pack file together, and in forward-sorted order
records.sort(key=operator.itemgetter(1))
for key, data in self._read_records_iter_unchecked(records):
(record_details, index_memo, next) = position_map[key]
raw_record_map[key] = data, record_details, next
return raw_record_map
except errors.RetryWithNewPacks, e:
self._access.reload_or_raise(e)
def _split_by_prefix(cls, keys):
"""For the given keys, split them up based on their prefix.
To keep memory pressure somewhat under control, split the
requests back into per-file-id requests, otherwise "bzr co"
extracts the full tree into memory before writing it to disk.
This should be revisited if _get_content_maps() can ever cross
The keys for a given file_id are kept in the same relative order.
Ordering between file_ids is not, though prefix_order will return the
order that the key was first seen.
:param keys: An iterable of key tuples
:return: (split_map, prefix_order)
split_map A dictionary mapping prefix => keys
prefix_order The order that we saw the various prefixes
split_by_prefix = {}
if prefix in split_by_prefix:
split_by_prefix[prefix].append(key)
split_by_prefix[prefix] = [key]
prefix_order.append(prefix)
return split_by_prefix, prefix_order
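# Illustrative example (not part of the original file), assuming the prefix is
# the file_id element of a (file_id, revision_id) key:
#
#   keys = [('f1', 'r1'), ('f2', 'r1'), ('f1', 'r2')]
#
# would split into
#
#   split_map    == {'f1': [('f1', 'r1'), ('f1', 'r2')], 'f2': [('f2', 'r1')]}
#   prefix_order == ['f1', 'f2']
#
# keys for one file_id keep their relative order, and prefixes are listed in
# the order they were first seen.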
def _group_keys_for_io(self, keys, non_local_keys, positions,
_min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
"""For the given keys, group them into 'best-sized' requests.
The idea is to avoid making 1 request per file, but to never try to
unpack an entire 1.5GB source tree in a single pass. Also when
possible, we should try to group requests to the same pack file
:return: list of (keys, non_local) tuples that indicate what keys
should be fetched next.
# TODO: Ideally we would group on 2 factors. We want to extract texts
# from the same pack file together, and we want to extract all
# the texts for a given build-chain together. Ultimately it
# probably needs a better global view.
total_keys = len(keys)
prefix_split_keys, prefix_order = self._split_by_prefix(keys)
prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
cur_non_local = set()
for prefix in prefix_order:
keys = prefix_split_keys[prefix]
non_local = prefix_split_non_local_keys.get(prefix, [])
this_size = self._index._get_total_build_size(keys, positions)
cur_size += this_size
cur_keys.extend(keys)
cur_non_local.update(non_local)
if cur_size > _min_buffer_size:
result.append((cur_keys, cur_non_local))
sizes.append(cur_size)
cur_non_local = set()
result.append((cur_keys, cur_non_local))
sizes.append(cur_size)
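# Illustrative example (not part of the original file): with a hypothetical
# _min_buffer_size of 3MB and per-prefix build sizes of 2MB, 2MB and 1MB, the
# first two prefixes accumulate to 4MB, which exceeds the threshold and
# flushes one (keys, non_local) group; the third prefix then starts a new
# group that is flushed at the end of the loop, giving two tuples in 'result'.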
# key = component_id, r = record_details, i_m = index_memo, n = next
records = [(key, i_m) for key, (r, i_m, n)
in position_map.iteritems()]
for key, record, digest in \
self._read_records_iter(records):
(record_details, index_memo, next) = position_map[key]
record_map[key] = record, record_details, digest, next
def get_record_stream(self, keys, ordering, include_delta_closure):
"""Get a stream of records for keys.
access_memo = self._access.add_raw_records(
[(record.key, len(bytes))], bytes)[0]
index_entry = (record.key, options, access_memo, parents)
if 'fulltext' not in options:
# Not a fulltext, so we need to make sure the compression
# parent will also be present.
basis_parent = parents[0]
# Note that pack backed knits don't need to buffer here
# because they buffer all writes to the transaction level,
# but we don't expose that difference at the index level. If
# the query here has sufficient cost to show up in
# profiling we should do that.
# They're required to be physically in this
# KnitVersionedFiles, not in a fallback.
if not self._index.has_key(compression_parent):
if basis_parent not in self.get_parent_map([basis_parent]):
pending = buffered_index_entries.setdefault(
compression_parent, [])
pending.append(index_entry)
buffered = True
if not buffered:
self._index.add_records([index_entry])
elif record.storage_kind == 'chunked':
elif record.storage_kind == 'fulltext':
self.add_lines(record.key, parents,
osutils.chunks_to_lines(record.get_bytes_as('chunked')))
split_lines(record.get_bytes_as('fulltext')))
# Not suitable for direct insertion as a
# delta, either because it's not the right format, or this
# KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
# 0) or because it depends on a base only present in the
self._access.flush()
# Try getting a fulltext directly from the record.
bytes = record.get_bytes_as('fulltext')
except errors.UnavailableRepresentation:
adapter_key = record.storage_kind, 'fulltext'
adapter = get_adapter(adapter_key)
bytes = adapter.get_bytes(record)
lines = split_lines(bytes)
adapter_key = record.storage_kind, 'fulltext'
adapter = get_adapter(adapter_key)
lines = split_lines(adapter.get_bytes(
record, record.get_bytes_as(record.storage_kind)))
self.add_lines(record.key, parents, lines)
except errors.RevisionAlreadyPresent:
# Add any records whose basis parent is now available.
added_keys = [record.key]
key = added_keys.pop(0)
if key in buffered_index_entries:
index_entries = buffered_index_entries[key]
self._index.add_records(index_entries)
[index_entry[0] for index_entry in index_entries])
del buffered_index_entries[key]
# If there were any deltas which had a missing basis parent, error.
if buffered_index_entries:
# There were index entries buffered at the end of the stream,
# so these need to be added (if the index supports holding such
# entries for later insertion)
for key in buffered_index_entries:
index_entries = buffered_index_entries[key]
all_entries.extend(index_entries)
self._index.add_records(
all_entries, missing_compression_parents=True)
def get_missing_compression_parent_keys(self):
"""Return an iterable of keys of missing compression parents.
Check this after calling insert_record_stream to find out if there are
any missing compression parents. If there are, the records that
depend on them are not able to be inserted safely. For atomic
KnitVersionedFiles built on packs, the transaction should be aborted or
suspended - commit will fail at this point. Nonatomic knits will error
earlier because they have no staging area to put pending entries into.
return self._index.get_missing_compression_parents()
raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
def iter_lines_added_or_present_in_keys(self, keys, pb=None):
"""Iterate over the lines in the versioned files from keys.
is an iterator).
* Lines are normalised by the underlying store: they will all have \\n
* Lines are returned in arbitrary order.
* If a requested key did not change any lines (or didn't have any
lines), it may not be mentioned at all in the result.
:param pb: Progress bar supplied by caller.
:return: An iterator over (line, key).
pb = ui.ui_factory.nested_progress_bar()
pb = progress.DummyProgress()
keys = set(keys)
total = len(keys)
# we don't care about inclusions, the caller cares.
# but we need to setup a list of records to visit.
# we need key, position, length
build_details = self._index.get_build_details(keys)
for key, details in build_details.iteritems():
key_records.append((key, details[0]))
records_iter = enumerate(self._read_records_iter(key_records))
for (key_idx, (key, data, sha_value)) in records_iter:
pb.update('Walking content', key_idx, total)
compression_parent = build_details[key][1]
if compression_parent is None:
line_iterator = self._factory.get_fulltext_content(data)
line_iterator = self._factory.get_linedelta_content(data)
# Now that we are yielding the data for this key, remove it
# XXX: It might be more efficient to yield (key,
# line_iterator) in the future. However for now, this is a
# simpler change to integrate into the rest of the
# codebase. RBC 20071110
for line in line_iterator:
except errors.RetryWithNewPacks, e:
self._access.reload_or_raise(e)
# If there are still keys we've not yet found, we look in the fallback
# vfs, and hope to find them there. Note that if the keys are found
# but had no changes or no content, the fallback may not return
if keys and not self._immediate_fallback_vfs:
# XXX: strictly the second parameter is meant to be the file id
# but it's not easily accessible here.
raise RevisionNotPresent(keys, repr(self))
for source in self._immediate_fallback_vfs:
for source in self._fallback_vfs:
source_keys = set()
"""See VersionedFiles.keys."""
if 'evil' in debug.debug_flags:
trace.mutter_callsite(2, "keys scales with size of history")
sources = [self._index] + self._immediate_fallback_vfs
sources = [self._index] + self._fallback_vfs
for source in sources:
result.update(source.keys())
class _ContentMapGenerator(object):
"""Generate texts or expose raw deltas for a set of texts."""
def __init__(self, ordering='unordered'):
self._ordering = ordering
def _get_content(self, key):
"""Get the content object for key."""
# Note that _get_content is only called when the _ContentMapGenerator
# has been constructed with just one key requested for reconstruction.
if key in self.nonlocal_keys:
record = self.get_record_stream().next()
# Create a content object on the fly
lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
return PlainKnitContent(lines, record.key)
# local keys we can ask for directly
return self._get_one_work(key)
def get_record_stream(self):
"""Get a record stream for the keys requested during __init__."""
for record in self._work():
"""Produce maps of text and KnitContents as dicts.
:return: (text_map, content_map) where text_map contains the texts for
the requested versions and content_map contains the KnitContents.
# NB: By definition we never need to read remote sources unless texts
# are requested from them: we don't delta across stores - and we
# explicitly do not want to, to prevent data loss situations.
if self.global_map is None:
self.global_map = self.vf.get_parent_map(self.keys)
nonlocal_keys = self.nonlocal_keys
missing_keys = set(nonlocal_keys)
# Read from remote versioned file instances and provide to our caller.
for source in self.vf._immediate_fallback_vfs:
if not missing_keys:
# Loop over fallback repositories asking them for texts - ignore
# any missing from a particular fallback.
for record in source.get_record_stream(missing_keys,
self._ordering, True):
if record.storage_kind == 'absent':
# Not in this particular stream, may be in one of the
# other fallback vfs objects.
missing_keys.remove(record.key)
if self._raw_record_map is None:
raise AssertionError('_raw_record_map should have been filled')
for key in self.keys:
if key in self.nonlocal_keys:
yield LazyKnitContentFactory(key, self.global_map[key], self, first)
def _get_one_work(self, requested_key):
# Now, if we have calculated everything already, just return the
if requested_key in self._contents_map:
return self._contents_map[requested_key]
# To simplify things, parse everything at once - code that wants one text
# probably wants them all.
# FUTURE: This function could be improved for the 'extract many' case
# by tracking each component and only doing the copy when the number of
# children that need to apply deltas to it is > 1 or it is part of the
multiple_versions = len(self.keys) != 1
if self._record_map is None:
self._record_map = self.vf._raw_map_to_record_map(
self._raw_record_map)
record_map = self._record_map
# raw_record_map is key:
# Have read and parsed records at this point.
for key in self.keys:
if key in self.nonlocal_keys:
while cursor is not None:
record, record_details, digest, next = record_map[cursor]
raise RevisionNotPresent(cursor, self)
components.append((cursor, record, record_details, digest))
if cursor in self._contents_map:
# no need to plan further back
components.append((cursor, None, None, None))
for (component_id, record, record_details,
digest) in reversed(components):
if component_id in self._contents_map:
content = self._contents_map[component_id]
content, delta = self._factory.parse_record(key[-1],
record, record_details, content,
copy_base_content=multiple_versions)
if multiple_versions:
self._contents_map[component_id] = content
# digest here is the digest from the last applied component.
text = content.text()
actual_sha = sha_strings(text)
if actual_sha != digest:
raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
if multiple_versions:
return self._contents_map[requested_key]
def _wire_bytes(self):
"""Get the bytes to put on the wire for 'key'.
The first collection of bytes asked for returns the serialised
raw_record_map and the additional details (key, parent) for key.
Subsequent calls return just the additional details (key, parent).
The wire storage_kind given for the first key is 'knit-delta-closure';
for subsequent keys it is 'knit-delta-closure-ref'.
:param key: A key from the content generator.
:return: Bytes to put on the wire.
# kind marker for dispatch on the far side,
lines.append('knit-delta-closure')
if self.vf._factory.annotated:
lines.append('annotated')
# then the list of keys
lines.append('\t'.join(['\x00'.join(key) for key in self.keys
if key not in self.nonlocal_keys]))
# then the _raw_record_map in serialised form:
# for each item in the map:
# 1 line with parents if the key is to be yielded (None: for None, '' for ())
# one line with method
# one line with noeol
# one line with next ('' for None)
# one line with byte count of the record bytes
for key, (record_bytes, (method, noeol), next) in \
self._raw_record_map.iteritems():
key_bytes = '\x00'.join(key)
parents = self.global_map.get(key, None)
parent_bytes = 'None:'
parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
method_bytes = method
next_bytes = '\x00'.join(next)
map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
len(record_bytes), record_bytes))
map_bytes = ''.join(map_byte_list)
lines.append(map_bytes)
bytes = '\n'.join(lines)
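# Illustrative sketch (not part of the original file): a single serialised
# raw_record_map entry, for a hypothetical key ('f1', 'r2') whose build
# parent ('next') is ('f1', 'r1'), would look like:
#
#   f1\x00r2\n                 key
#   f1\x00r1\n                 parents ('None:' for None, '' for ())
#   line-delta\n               method
#   <noeol flag>\n             noeol
#   f1\x00r1\n                 next ('' for None)
#   42\n                       byte count of the record bytes
#   <42 bytes of raw record data>
#
# All such entries are concatenated into map_bytes and appended after the
# 'knit-delta-closure' marker, the optional 'annotated' line and the key list.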
class _VFContentMapGenerator(_ContentMapGenerator):
"""Content map generator reading from a VersionedFiles object."""
def __init__(self, versioned_files, keys, nonlocal_keys=None,
global_map=None, raw_record_map=None, ordering='unordered'):
"""Create a _ContentMapGenerator.
:param versioned_files: The versioned files that the texts are being
:param keys: The keys to produce content maps for.
:param nonlocal_keys: An iterable of keys (possibly intersecting keys)
which are known to not be in this knit, but rather in one of the
:param global_map: The result of get_parent_map(keys) (or a supermap).
This is required if get_record_stream() is to be used.
:param raw_record_map: An unparsed raw record map to use for answering
_ContentMapGenerator.__init__(self, ordering=ordering)
# The vf to source data from
self.vf = versioned_files
self.keys = list(keys)
# Keys known to be in fallback vfs objects
if nonlocal_keys is None:
self.nonlocal_keys = set()
self.nonlocal_keys = frozenset(nonlocal_keys)
# Parents data for keys to be returned in get_record_stream
self.global_map = global_map
# The chunked lists for self.keys in text form
# A cache of KnitContent objects used in extracting texts.
self._contents_map = {}
# All the knit records needed to assemble the requested keys as full
self._record_map = None
if raw_record_map is None:
self._raw_record_map = self.vf._get_record_map_unparsed(keys,
self._raw_record_map = raw_record_map
# the factory for parsing records
self._factory = self.vf._factory
class _NetworkContentMapGenerator(_ContentMapGenerator):
"""Content map generator sourced from a network stream."""
def __init__(self, bytes, line_end):
"""Construct a _NetworkContentMapGenerator from a bytes block."""
self.global_map = {}
self._raw_record_map = {}
self._contents_map = {}
self._record_map = None
self.nonlocal_keys = []
# Get access to record parsing facilities
self.vf = KnitVersionedFiles(None, None)
line_end = bytes.find('\n', start)
line = bytes[start:line_end]
start = line_end + 1
if line == 'annotated':
self._factory = KnitAnnotateFactory()
self._factory = KnitPlainFactory()
# list of keys to emit in get_record_stream
line_end = bytes.find('\n', start)
line = bytes[start:line_end]
start = line_end + 1
tuple(segment.split('\x00')) for segment in line.split('\t')
# now a loop until the end. XXX: It would be nice if this was just a
# bunch of the same records as get_record_stream(..., False) gives, but
# there is a decent sized gap stopping that at the moment.
line_end = bytes.find('\n', start)
key = tuple(bytes[start:line_end].split('\x00'))
start = line_end + 1
# 1 line with parents (None: for None, '' for ())
line_end = bytes.find('\n', start)
line = bytes[start:line_end]
[tuple(segment.split('\x00')) for segment in line.split('\t')
self.global_map[key] = parents
start = line_end + 1
# one line with method
line_end = bytes.find('\n', start)
line = bytes[start:line_end]
start = line_end + 1
# one line with noeol
line_end = bytes.find('\n', start)
line = bytes[start:line_end]
start = line_end + 1
# one line with next ('' for None)
line_end = bytes.find('\n', start)
line = bytes[start:line_end]
next = tuple(bytes[start:line_end].split('\x00'))
start = line_end + 1
# one line with byte count of the record bytes
line_end = bytes.find('\n', start)
line = bytes[start:line_end]
start = line_end + 1
record_bytes = bytes[start:start+count]
start = start + count
self._raw_record_map[key] = (record_bytes, (method, noeol), next)
def get_record_stream(self):
"""Get a record stream for keys requested by the bytestream."""
for key in self.keys:
yield LazyKnitContentFactory(key, self.global_map[key], self, first)
def _wire_bytes(self):
class _KndxIndex(object):
"""Manages knit index files
annotator = _KnitAnnotator(knit)
return iter(annotator.annotate_flat(revision_id))
class _KnitAnnotator(annotate.Annotator):
return iter(annotator.annotate(revision_id))
class _KnitAnnotator(object):
"""Build up the annotations for a text."""
def __init__(self, vf):
annotate.Annotator.__init__(self, vf)
# TODO: handle Nodes which cannot be extracted
# self._ghosts = set()
# Map from (key, parent_key) => matching_blocks, should be 'use once'
self._matching_blocks = {}
# KnitContent objects
self._content_objects = {}
# The number of children that depend on this fulltext content object
self._num_compression_children = {}
# Delta records that need their compression parent before they can be
self._pending_deltas = {}
# Fulltext records that are waiting for their parents fulltexts before
# they can be yielded for annotation
self._pending_annotation = {}
def __init__(self, knit):
# Content objects, differs from fulltexts because of how final newlines
# are treated by knits. The content objects here will always have a
self._fulltext_contents = {}
# Annotated lines of specific revisions
self._annotated_lines = {}
# Track the raw data for nodes that we could not process yet.
# This maps the revision_id of the base to a list of children that will
# be annotated from it.
self._pending_children = {}
# Nodes which cannot be extracted
self._ghosts = set()
# Track how many children this node has, so we know if we need to keep
self._annotate_children = {}
self._compression_children = {}
self._all_build_details = {}
# The children => parent revision_id graph
self._revision_id_graph = {}
self._heads_provider = None
self._nodes_to_keep_annotations = set()
self._generations_until_keep = 100
def set_generations_until_keep(self, value):
"""Set the number of generations before caching a node.
Setting this to -1 will cache every merge node, setting this higher
will cache fewer nodes.
self._generations_until_keep = value
def _add_fulltext_content(self, revision_id, content_obj):
self._fulltext_contents[revision_id] = content_obj
# TODO: jam 20080305 It might be good to check the sha1digest here
return content_obj.text()
def _check_parents(self, child, nodes_to_annotate):
"""Check if all parents have been processed.
:param child: A tuple of (rev_id, parents, raw_content)
:param nodes_to_annotate: If child is ready, add it to
nodes_to_annotate, otherwise put it back in self._pending_children
for parent_id in child[1]:
if (parent_id not in self._annotated_lines):
# This parent has not been annotated yet, so the child is not ready
self._pending_children.setdefault(parent_id,
# This one is ready to be processed
nodes_to_annotate.append(child)
def _add_annotation(self, revision_id, fulltext, parent_ids,
left_matching_blocks=None):
"""Add an annotation entry.
All parents should already have been annotated.
:return: A list of children that now have their parents satisfied.
a = self._annotated_lines
annotated_parent_lines = [a[p] for p in parent_ids]
annotated_lines = list(annotate.reannotate(annotated_parent_lines,
fulltext, revision_id, left_matching_blocks,
heads_provider=self._get_heads_provider()))
self._annotated_lines[revision_id] = annotated_lines
for p in parent_ids:
ann_children = self._annotate_children[p]
ann_children.remove(revision_id)
if (not ann_children
and p not in self._nodes_to_keep_annotations):
del self._annotated_lines[p]
del self._all_build_details[p]
if p in self._fulltext_contents:
del self._fulltext_contents[p]
# Now that we've added this one, see if there are any pending
# deltas to be done, certainly this parent is finished
nodes_to_annotate = []
for child in self._pending_children.pop(revision_id, []):
self._check_parents(child, nodes_to_annotate)
return nodes_to_annotate
def _get_build_graph(self, key):
"""Get the graphs for building texts and annotations.
:return: A list of (key, index_memo) records, suitable for
passing to read_records_iter to start reading in the raw data from
if key in self._annotated_lines:
pending = set([key])
self._num_needed_children[key] = 1
# get all pending nodes
this_iteration = pending
build_details = self._vf._index.get_build_details(this_iteration)
build_details = self._knit._index.get_build_details(this_iteration)
self._all_build_details.update(build_details)
# new_nodes = self._vf._index._get_entries(this_iteration)
# new_nodes = self._knit._index._get_entries(this_iteration)
pending = set()
for key, details in build_details.iteritems():
(index_memo, compression_parent, parent_keys,
(index_memo, compression_parent, parents,
record_details) = details
self._parent_map[key] = parent_keys
self._heads_provider = None
self._revision_id_graph[key] = parents
records.append((key, index_memo))
# Do we actually need to check _annotated_lines?
pending.update([p for p in parent_keys
if p not in self._all_build_details])
for parent_key in parent_keys:
if parent_key in self._num_needed_children:
self._num_needed_children[parent_key] += 1
self._num_needed_children[parent_key] = 1
pending.update(p for p in parents
if p not in self._all_build_details)
if compression_parent:
if compression_parent in self._num_compression_children:
self._num_compression_children[compression_parent] += 1
self._num_compression_children[compression_parent] = 1
self._compression_children.setdefault(compression_parent,
for parent in parents:
self._annotate_children.setdefault(parent,
num_gens = generation - kept_generation
if ((num_gens >= self._generations_until_keep)
and len(parents) > 1):
kept_generation = generation
self._nodes_to_keep_annotations.add(key)
missing_versions = this_iteration.difference(build_details.keys())
if missing_versions:
for key in missing_versions:
if key in self._parent_map and key in self._text_cache:
# We already have this text ready, we just need to
# yield it later so we get it annotated
parent_keys = self._parent_map[key]
for parent_key in parent_keys:
if parent_key in self._num_needed_children:
self._num_needed_children[parent_key] += 1
self._num_needed_children[parent_key] = 1
pending.update([p for p in parent_keys
if p not in self._all_build_details])
raise errors.RevisionNotPresent(key, self._vf)
self._ghosts.update(missing_versions)
for missing_version in missing_versions:
# add a key, no parents
self._revision_id_graph[missing_version] = ()
pending.discard(missing_version) # don't look for it
if self._ghosts.intersection(self._compression_children):
"We cannot have nodes which have a ghost compression parent:\n"
"compression children: %r"
% (self._ghosts, self._compression_children))
# Clean out anything that depends on a ghost so that we don't wait for
# the ghost to show up
for node in self._ghosts:
if node in self._annotate_children:
# We won't be building this node
del self._annotate_children[node]
# Generally we will want to read the records in reverse order, because
# we find the parent nodes after the children
records.reverse()
return records, ann_keys
def _get_needed_texts(self, key, pb=None):
# if True or len(self._vf._immediate_fallback_vfs) > 0:
if len(self._vf._immediate_fallback_vfs) > 0:
# If we have fallbacks, go to the generic path
for v in annotate.Annotator._get_needed_texts(self, key, pb=pb):
records, ann_keys = self._get_build_graph(key)
for idx, (sub_key, text, num_lines) in enumerate(
self._extract_texts(records)):
pb.update('annotating', idx, len(records))
yield sub_key, text, num_lines
for sub_key in ann_keys:
text = self._text_cache[sub_key]
num_lines = len(text) # bad assumption
yield sub_key, text, num_lines
except errors.RetryWithNewPacks, e:
self._vf._access.reload_or_raise(e)
# The cached build_details are no longer valid
self._all_build_details.clear()
def _cache_delta_blocks(self, key, compression_parent, delta, lines):
parent_lines = self._text_cache[compression_parent]
blocks = list(KnitContent.get_line_delta_blocks(delta, parent_lines, lines))
self._matching_blocks[(key, compression_parent)] = blocks
def _expand_record(self, key, parent_keys, compression_parent, record,
if compression_parent:
if compression_parent not in self._content_objects:
# Waiting for the parent
self._pending_deltas.setdefault(compression_parent, []).append(
(key, parent_keys, record, record_details))
# We have the basis parent, so expand the delta
num = self._num_compression_children[compression_parent]
base_content = self._content_objects.pop(compression_parent)
self._num_compression_children.pop(compression_parent)
self._num_compression_children[compression_parent] = num
base_content = self._content_objects[compression_parent]
# It is tempting to want to copy_base_content=False for the last
# child object. However, whenever noeol=False,
# self._text_cache[parent_key] is content._lines. So mutating it
# gives very bad results.
# The alternative is to copy the lines into text cache, but then we
# are copying anyway, so just do it here.
content, delta = self._vf._factory.parse_record(
key, record, record_details, base_content,
copy_base_content=True)
content, _ = self._vf._factory.parse_record(
key, record, record_details, None)
if self._num_compression_children.get(key, 0) > 0:
self._content_objects[key] = content
lines = content.text()
self._text_cache[key] = lines
if delta is not None:
self._cache_delta_blocks(key, compression_parent, delta, lines)
def _get_parent_annotations_and_matches(self, key, text, parent_key):
"""Get the list of annotations for the parent, and the matching lines.
:param text: The opaque value given by _get_needed_texts
:param parent_key: The key for the parent text
:return: (parent_annotations, matching_blocks)
parent_annotations is a list as long as the number of lines in
matching_blocks is a list of (parent_idx, text_idx, len) tuples
indicating which lines match between the two texts
block_key = (key, parent_key)
if block_key in self._matching_blocks:
blocks = self._matching_blocks.pop(block_key)
parent_annotations = self._annotations_cache[parent_key]
return parent_annotations, blocks
return annotate.Annotator._get_parent_annotations_and_matches(self,
key, text, parent_key)
def _process_pending(self, key):
"""The content for 'key' was just processed.
Determine if there is any more pending work to be processed.
if key in self._pending_deltas:
compression_parent = key
children = self._pending_deltas.pop(key)
for child_key, parent_keys, record, record_details in children:
lines = self._expand_record(child_key, parent_keys,
record, record_details)
if self._check_ready_for_annotations(child_key, parent_keys):
to_return.append(child_key)
# Also check any children that are waiting for this parent to be
if key in self._pending_annotation:
children = self._pending_annotation.pop(key)
to_return.extend([c for c, p_keys in children
if self._check_ready_for_annotations(c, p_keys)])
def _check_ready_for_annotations(self, key, parent_keys):
"""Return True if this text is ready to be yielded.
Otherwise, this will return False, and queue the text into
self._pending_annotation
for parent_key in parent_keys:
if parent_key not in self._annotations_cache:
# still waiting on at least one parent text, so queue it up
# Note that if there are multiple parents, we need to wait
self._pending_annotation.setdefault(parent_key,
[]).append((key, parent_keys))
def _extract_texts(self, records):
"""Extract the various texts needed based on records"""
def _annotate_records(self, records):
"""Build the annotations for the listed records."""
# We iterate in the order read, rather than a strict order requested
# However, process what we can, and put off to the side things that
# still need parents, cleaning them up when those parents are
# 1) As 'records' are read, see if we can expand these records into
#    Content objects (and thus lines)
# 2) If a given line-delta is waiting on its compression parent, it
#    gets queued up into self._pending_deltas, otherwise we expand
#    it, and put it into self._text_cache and self._content_objects
# 3) If we expanded the text, we will then check to see if all
#    parents have also been processed. If so, this text gets yielded,
#    else this record gets set aside into pending_annotation
# 4) Further, if we expanded the text in (2), we will then check to
#    see if there are any children in self._pending_deltas waiting to
#    also be processed. If so, we go back to (2) for those
# 5) Further again, if we yielded the text, we can then check if that
#    'unlocks' any of the texts in pending_annotations, which should
#    then get yielded as well
# Note that both steps 4 and 5 are 'recursive' in that unlocking one
# compression child could unlock yet another, and yielding a fulltext
# will also 'unlock' the children that are waiting on that annotation.
# (Though also, unlocking 1 parent's fulltext, does not unlock a child
# if other parents are also waiting.)
# We want to yield content before expanding child content objects, so
# that we know when we can re-use the content lines, and the annotation
# code can know when it can stop caching fulltexts, as well.
# Children that are missing their compression parent
for (key, record, digest) in self._vf._read_records_iter(records):
details = self._all_build_details[key]
(_, compression_parent, parent_keys, record_details) = details
lines = self._expand_record(key, parent_keys, compression_parent,
record, record_details)
# Pending delta should be queued up
for (rev_id, record,
digest) in self._knit._read_records_iter(records):
if rev_id in self._annotated_lines:
# At this point, we may be able to yield this content, if all
# parents are also finished
yield_this_text = self._check_ready_for_annotations(key,
# All parents present
yield key, lines, len(lines)
to_process = self._process_pending(key)
this_process = to_process
for key in this_process:
lines = self._text_cache[key]
yield key, lines, len(lines)
to_process.extend(self._process_pending(key))
parent_ids = self._revision_id_graph[rev_id]
parent_ids = [p for p in parent_ids if p not in self._ghosts]
details = self._all_build_details[rev_id]
(index_memo, compression_parent, parents,
record_details) = details
nodes_to_annotate = []
# TODO: Remove the punning between compression parents, and
# parent_ids, we should be able to do this without assuming
if len(parent_ids) == 0:
# There are no parents for this node, so just add it
# TODO: This probably needs to be decoupled
fulltext_content, delta = self._knit._factory.parse_record(
rev_id, record, record_details, None)
fulltext = self._add_fulltext_content(rev_id, fulltext_content)
nodes_to_annotate.extend(self._add_annotation(rev_id, fulltext,
parent_ids, left_matching_blocks=None))
child = (rev_id, parent_ids, record)
# Check if all the parents are present
self._check_parents(child, nodes_to_annotate)
while nodes_to_annotate:
# Should we use a queue here instead of a stack?
(rev_id, parent_ids, record) = nodes_to_annotate.pop()
(index_memo, compression_parent, parents,
record_details) = self._all_build_details[rev_id]
if compression_parent is not None:
comp_children = self._compression_children[compression_parent]
if rev_id not in comp_children:
raise AssertionError("%r not in compression children %r"
% (rev_id, comp_children))
# If there is only 1 child, it is safe to reuse this
reuse_content = (len(comp_children) == 1
and compression_parent not in
self._nodes_to_keep_annotations)
# Remove it from the cache since it will be changing
parent_fulltext_content = self._fulltext_contents.pop(compression_parent)
# Make sure to copy the fulltext since it might be
parent_fulltext = list(parent_fulltext_content.text())
parent_fulltext_content = self._fulltext_contents[compression_parent]
parent_fulltext = parent_fulltext_content.text()
comp_children.remove(rev_id)
fulltext_content, delta = self._knit._factory.parse_record(
rev_id, record, record_details,
parent_fulltext_content,
copy_base_content=(not reuse_content))
fulltext = self._add_fulltext_content(rev_id,
blocks = KnitContent.get_line_delta_blocks(delta,
parent_fulltext, fulltext)
fulltext_content = self._knit._factory.parse_fulltext(
fulltext = self._add_fulltext_content(rev_id,
nodes_to_annotate.extend(
self._add_annotation(rev_id, fulltext, parent_ids,
left_matching_blocks=blocks))
def _get_heads_provider(self):
"""Create a heads provider for resolving ancestry issues."""
if self._heads_provider is not None:
return self._heads_provider
parent_provider = _mod_graph.DictParentsProvider(
self._revision_id_graph)
graph_obj = _mod_graph.Graph(parent_provider)
head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
self._heads_provider = head_cache
def annotate(self, key):
"""Return the annotated fulltext at the given key.
:param key: The key to annotate.
if True or len(self._knit._fallback_vfs) > 0:
# stacked knits can't use the fast path at present.
return self._simple_annotate(key)
records = self._get_build_graph(key)
if key in self._ghosts:
raise errors.RevisionNotPresent(key, self._knit)
self._annotate_records(records)
return self._annotated_lines[key]
def _simple_annotate(self, key):
"""Return annotated fulltext, rediffing from the full texts.
This is slow but makes no assumptions about the repository
being able to produce line deltas.
# TODO: this code generates a parent map of present ancestors; it
# could be split out into a separate method, and probably should use
# iter_ancestry instead. -- mbp and robertc 20080704
graph = _mod_graph.Graph(self._knit)
head_cache = _mod_graph.FrozenHeadsCache(graph)
search = graph._make_breadth_first_searcher([key])
present, ghosts = search.next_with_ghosts()
except StopIteration:
keys.update(present)
parent_map = self._knit.get_parent_map(keys)
reannotate = annotate.reannotate
for record in self._knit.get_record_stream(keys, 'topological', True):
fulltext = split_lines(record.get_bytes_as('fulltext'))
parents = parent_map[key]
if parents is not None:
parent_lines = [parent_cache[parent] for parent in parent_map[key]]
parent_cache[key] = list(
reannotate(parent_lines, fulltext, key, None, head_cache))
return parent_cache[key]
raise errors.RevisionNotPresent(key, self._knit)
from bzrlib._knit_load_data_pyx import _load_data_c as _load_data
except ImportError, e:
osutils.failed_to_load_extension(e)
from bzrlib._knit_load_data_c import _load_data_c as _load_data
from bzrlib._knit_load_data_py import _load_data_py as _load_data