            elif record.storage_kind == 'chunked':
                self.add_lines(record.key, parents,
                    osutils.chunks_to_lines(record.get_bytes_as('chunked')))
            elif record.storage_kind == 'fulltext':
                self.add_lines(record.key, parents,
                    split_lines(record.get_bytes_as('fulltext')))
            else:
                # Not a fulltext, and not suitable for direct insertion as a
                # delta, either because it's not the right format, or this
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
                # 0) or because it depends on a base only present in the
                # fallback kvfs.
                self._access.flush()
                try:
                    # Try getting a fulltext directly from the record.
                    bytes = record.get_bytes_as('fulltext')
                except errors.UnavailableRepresentation:
                    adapter_key = record.storage_kind, 'fulltext'
                    adapter = get_adapter(adapter_key)
                    bytes = adapter.get_bytes(record)
                lines = split_lines(bytes)
                try:
                    self.add_lines(record.key, parents, lines)
                except errors.RevisionAlreadyPresent:
                    pass
            # Add any records whose basis parent is now available.
            added_keys = [record.key]
            while added_keys:
                key = added_keys.pop(0)
                if key in buffered_index_entries:
                    index_entries = buffered_index_entries[key]
                    self._index.add_records(index_entries)
                    added_keys.extend(
                        [index_entry[0] for index_entry in index_entries])
                    del buffered_index_entries[key]
        # If there were any deltas which had a missing basis parent, their
        # index entries are still buffered at the end of the stream, so they
        # need to be added now (if the index supports holding such entries
        # for later insertion).
        if buffered_index_entries:
            for key in buffered_index_entries:
                index_entries = buffered_index_entries[key]
                self._index.add_records(index_entries,
                    missing_compression_parents=True)

    def get_missing_compression_parent_keys(self):
        """Return an iterable of keys of missing compression parents.

        Check this after calling insert_record_stream to find out if there are
        any missing compression parents.  If there are, the records that
        depend on them are not able to be inserted safely. For atomic
        KnitVersionedFiles built on packs, the transaction should be aborted or
        suspended - commit will fail at this point. Nonatomic knits will error
        earlier because they have no staging area to put pending entries into.
        """
        return self._index.get_missing_compression_parents()
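
    # Illustrative sketch (assumption, not a call site from the original
    # file): a hypothetical caller inserting a stream whose compression
    # parents may be absent could pair the two calls like this:
    #
    #   target.insert_record_stream(stream)
    #   missing = target.get_missing_compression_parent_keys()
    #   if missing:
    #       # Abort or suspend the surrounding transaction; committing now
    #       # would fail because the buffered deltas cannot be expanded.
    #       handle_missing_parents(missing)
    #
    # `target`, `stream` and `handle_missing_parents` are placeholder names.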

    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
        """Iterate over the lines in the versioned files from keys.


class _ContentMapGenerator(object):
    """Generate texts or expose raw deltas for a set of texts."""

    def _get_content(self, key):
        """Get the content object for key."""
        # Note that _get_content is only called when the _ContentMapGenerator
        # has been constructed with just one key requested for reconstruction.
        if key in self.nonlocal_keys:
            record = self.get_record_stream().next()
            # Create a content object on the fly
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
            return PlainKnitContent(lines, record.key)
        else:
            # local keys we can ask for directly
            return self._get_one_work(key)

    def get_record_stream(self):
        """Get a record stream for the keys requested during __init__."""
        for record in self._work():
            yield record

    def _work(self):
        """Produce maps of text and KnitContents as dicts.

        :return: (text_map, content_map) where text_map contains the texts for
            the requested versions and content_map contains the KnitContents.
        """
        # NB: By definition we never need to read remote sources unless texts
        # are requested from them: we don't delta across stores - and we
        # explicitly do not want to, to prevent data loss situations.
        if self.global_map is None:
            self.global_map = self.vf.get_parent_map(self.keys)
        nonlocal_keys = self.nonlocal_keys

        missing_keys = set(nonlocal_keys)
        # Read from remote versioned file instances and provide to our caller.
        for source in self.vf._fallback_vfs:
            if not missing_keys:
                break
            # Loop over fallback repositories asking them for texts - ignore
            # any missing from a particular fallback.
            for record in source.get_record_stream(missing_keys,
                'unordered', True):
                if record.storage_kind == 'absent':
                    # Not in this particular stream, may be in one of the
                    # other fallback vfs objects.
                    continue
                missing_keys.remove(record.key)
                yield record

        if self._raw_record_map is None:
            raise AssertionError('_raw_record_map should have been filled')
        first = True
        for key in self.keys:
            if key in self.nonlocal_keys:
                continue
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
            first = False

    def _get_one_work(self, requested_key):
        # Now, if we have calculated everything already, just return the
        # desired text.
        if requested_key in self._contents_map:
            return self._contents_map[requested_key]
        # To simplify things, parse everything at once - code that wants one text
        # probably wants them all.
        # FUTURE: This function could be improved for the 'extract many' case
        # by tracking each component and only doing the copy when the number of
        # children that need to apply delta's to it is > 1 or it is part of the
        # final output.
        multiple_versions = len(self.keys) != 1
        if self._record_map is None:
            self._record_map = self.vf._raw_map_to_record_map(
                self._raw_record_map)
        record_map = self._record_map
        # raw_record_map is key:
        # Have read and parsed records at this point.
        for key in self.keys:
            if key in self.nonlocal_keys:
                continue
            components = []
            cursor = key
            while cursor is not None:
                try:
                    record, record_details, digest, next = record_map[cursor]
                except KeyError:
                    raise RevisionNotPresent(cursor, self)
                components.append((cursor, record, record_details, digest))
                cursor = next
                if cursor in self._contents_map:
                    # no need to plan further back
                    components.append((cursor, None, None, None))
                    break

            content = None
            for (component_id, record, record_details,
                 digest) in reversed(components):
                if component_id in self._contents_map:
                    content = self._contents_map[component_id]
                else:
                    content, delta = self._factory.parse_record(key[-1],
                        record, record_details, content,
                        copy_base_content=multiple_versions)
                    if multiple_versions:
                        self._contents_map[component_id] = content

            # digest here is the digest from the last applied component.
            text = content.text()
            actual_sha = sha_strings(text)
            if actual_sha != digest:
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
        if multiple_versions:
            return self._contents_map[requested_key]
        else:
            return content
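
    # Worked example (assumption, for illustration only): if the requested
    # key K is stored as a line-delta whose 'next' pointer is J, and J is a
    # line-delta whose 'next' is the fulltext F, the loop above collects
    # components [K, J, F] and then, iterating in reverse, parses F as a
    # fulltext and applies J's and K's deltas in turn, finally checking the
    # resulting text against the sha1 digest recorded for K.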

    def _wire_bytes(self):
        """Get the bytes to put on the wire for 'key'.

        The first collection of bytes asked for returns the serialised
        raw_record_map and the additional details (key, parent) for key.
        Subsequent calls return just the additional details (key, parent).
        The wire storage_kind given for the first key is 'knit-delta-closure',
        for subsequent keys it is 'knit-delta-closure-ref'.

        :param key: A key from the content generator.
        :return: Bytes to put on the wire.
        """
        lines = []
        # kind marker for dispatch on the far side,
        lines.append('knit-delta-closure')
        if self.vf._factory.annotated:
            lines.append('annotated')
        else:
            lines.append('')
        # then the list of keys
        lines.append('\t'.join(['\x00'.join(key) for key in self.keys
            if key not in self.nonlocal_keys]))
        # then the _raw_record_map in serialised form:
        map_byte_list = []
        # for each item in the map:
        # 1 line with key
        # 1 line with parents if the key is to be yielded (None: for None, '' for ())
        # one line with method
        # one line with noeol
        # one line with next ('' for None)
        # one line with byte count of the record bytes
        # the record bytes
        for key, (record_bytes, (method, noeol), next) in \
            self._raw_record_map.iteritems():
            key_bytes = '\x00'.join(key)
            parents = self.global_map.get(key, None)
            if parents is None:
                parent_bytes = 'None:'
            else:
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
            method_bytes = method
            if noeol:
                noeol_bytes = "T"
            else:
                noeol_bytes = "F"
            if next:
                next_bytes = '\x00'.join(next)
            else:
                next_bytes = ''
            map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
                key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
                len(record_bytes), record_bytes))
        map_bytes = ''.join(map_byte_list)
        lines.append(map_bytes)
        bytes = '\n'.join(lines)
        return bytes
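
    # Illustrative sketch (assumption, not from the original source): for a
    # record with key ('file-id', 'rev-1'), one parent ('file-id', 'rev-0'),
    # method 'line-delta', noeol False and a 12-byte record body, the entry
    # appended to map_byte_list above would look roughly like:
    #
    #   "file-id\x00rev-1\n"
    #   "file-id\x00rev-0\n"
    #   "line-delta\n"
    #   "F\n"
    #   "file-id\x00rev-0\n"
    #   "12\n"
    #   "<12 raw bytes>"
    #
    # _NetworkContentMapGenerator.__init__ (further down) parses entries of
    # exactly this shape back out of the byte stream.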


class _VFContentMapGenerator(_ContentMapGenerator):
    """Content map generator reading from a VersionedFiles object."""

    def __init__(self, versioned_files, keys, nonlocal_keys=None,
        global_map=None, raw_record_map=None):
        """Create a _ContentMapGenerator.

        :param versioned_files: The versioned files that the texts are being
            extracted from.
        :param keys: The keys to produce content maps for.
        :param nonlocal_keys: An iterable of keys (possibly intersecting keys)
            which are known to not be in this knit, but rather in one of the
            fallback knits.
        :param global_map: The result of get_parent_map(keys) (or a supermap).
            This is required if get_record_stream() is to be used.
        :param raw_record_map: An unparsed raw record map to use for answering
            contents.
        """
        # The vf to source data from
        self.vf = versioned_files
        # The keys desired
        self.keys = list(keys)
        # Keys known to be in fallback vfs objects
        if nonlocal_keys is None:
            self.nonlocal_keys = set()
        else:
            self.nonlocal_keys = frozenset(nonlocal_keys)
        # Parents data for keys to be returned in get_record_stream
        self.global_map = global_map
        # The chunked lists for self.keys in text form
        self._text_map = {}
        # A cache of KnitContent objects used in extracting texts.
        self._contents_map = {}
        # All the knit records needed to assemble the requested keys as full
        # texts.
        self._record_map = None
        if raw_record_map is None:
            self._raw_record_map = self.vf._get_record_map_unparsed(keys,
                allow_missing=True)
        else:
            self._raw_record_map = raw_record_map
        # the factory for parsing records
        self._factory = self.vf._factory
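
    # Illustrative sketch (assumption, not a call site from the original
    # file): code holding a KnitVersionedFiles `vf` and a set of `keys` might
    # build and drain a generator roughly like this:
    #
    #   parent_map = vf.get_parent_map(keys)
    #   gen = _VFContentMapGenerator(vf, keys, nonlocal_keys=(),
    #       global_map=parent_map)
    #   for factory in gen.get_record_stream():
    #       chunks = factory.get_bytes_as('chunked')
    #
    # global_map is required here because get_record_stream() is used, as the
    # docstring above notes.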


class _NetworkContentMapGenerator(_ContentMapGenerator):
    """Content map generator sourced from a network stream."""

    def __init__(self, bytes, line_end):
        """Construct a _NetworkContentMapGenerator from a bytes block."""
        self._bytes = bytes
        self.global_map = {}
        self._raw_record_map = {}
        self._contents_map = {}
        self._record_map = None
        self.nonlocal_keys = []
        # Get access to record parsing facilities
        self.vf = KnitVersionedFiles(None, None)
        start = line_end
        # Annotated or not
        line_end = bytes.find('\n', start)
        line = bytes[start:line_end]
        start = line_end + 1
        if line == 'annotated':
            self._factory = KnitAnnotateFactory()
        else:
            self._factory = KnitPlainFactory()
        # list of keys to emit in get_record_stream
        line_end = bytes.find('\n', start)
        line = bytes[start:line_end]
        start = line_end + 1
        self.keys = [
            tuple(segment.split('\x00')) for segment in line.split('\t')
            if segment]
        # now a loop until the end. XXX: It would be nice if this was just a
        # bunch of the same records as get_record_stream(..., False) gives, but
        # there is a decent sized gap stopping that at the moment.
        end = len(bytes)
        while start < end:
            # 1 line with key
            line_end = bytes.find('\n', start)
            key = tuple(bytes[start:line_end].split('\x00'))
            start = line_end + 1
            # 1 line with parents (None: for None, '' for ())
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            if line == 'None:':
                parents = None
            else:
                parents = tuple(
                    [tuple(segment.split('\x00')) for segment in line.split('\t')
                     if segment])
            self.global_map[key] = parents
            start = line_end + 1
            # one line with method
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            method = line
            start = line_end + 1
            # one line with noeol
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            noeol = line == "T"
            start = line_end + 1
            # one line with next ('' for None)
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            if not line:
                next = None
            else:
                next = tuple(bytes[start:line_end].split('\x00'))
            start = line_end + 1
            # one line with byte count of the record bytes
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            count = int(line)
            start = line_end + 1
            # the record bytes
            record_bytes = bytes[start:start+count]
            start = start + count
            # put it in the map
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)

    def get_record_stream(self):
        """Get a record stream for keys requested by the bytestream."""
        first = True
        for key in self.keys:
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
            first = False

    def _wire_bytes(self):
        """Get the bytes to put on the wire for 'key'."""
        return self._bytes


class _KndxIndex(object):
    """Manages knit index files
        annotator = _KnitAnnotator(knit)
        return iter(annotator.annotate_flat(revision_id))


class _KnitAnnotator(annotate.Annotator):
    """Build up the annotations for a text."""

    def __init__(self, vf):
        annotate.Annotator.__init__(self, vf)

        # TODO: handle Nodes which cannot be extracted
        # self._ghosts = set()

        # Map from (key, parent_key) => matching_blocks, should be 'use once'
        self._matching_blocks = {}

        # KnitContent objects
        self._content_objects = {}
        # The number of children that depend on this fulltext content object
        self._num_compression_children = {}
        # Delta records that need their compression parent before they can be
        # expanded and annotated
        self._pending_deltas = {}
        # Fulltext records that are waiting for their parents fulltexts before
        # they can be yielded for annotation
        self._pending_annotation = {}

        self._all_build_details = {}

    def __init__(self, knit):
        self._knit = knit

        # Content objects, differs from fulltexts because of how final newlines
        # are treated by knits. the content objects here will always have a
        # final newline
        self._fulltext_contents = {}

        # Annotated lines of specific revisions
        self._annotated_lines = {}

        # Track the raw data for nodes that we could not process yet.
        # This maps the revision_id of the base to a list of children that will
        # be annotated from it.
        self._pending_children = {}

        # Nodes which cannot be extracted
        self._ghosts = set()

        # Track how many children this node has, so we know if we need to keep
        # its annotations around
        self._annotate_children = {}
        self._compression_children = {}

        self._all_build_details = {}
        # The children => parent revision_id graph
        self._revision_id_graph = {}

        self._heads_provider = None

        self._nodes_to_keep_annotations = set()
        self._generations_until_keep = 100

    def set_generations_until_keep(self, value):
        """Set the number of generations before caching a node.

        Setting this to -1 will cache every merge node, setting this higher
        will cache fewer nodes.
        """
        self._generations_until_keep = value
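
    # Illustrative usage (assumption, not from the original file): a caller
    # tuning memory use might do
    #
    #   annotator = _KnitAnnotator(knit)
    #   annotator.set_generations_until_keep(-1)   # cache every merge node
    #
    # per the docstring above, -1 caches every merge node while larger values
    # cache fewer nodes.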

    def _add_fulltext_content(self, revision_id, content_obj):
        self._fulltext_contents[revision_id] = content_obj
        # TODO: jam 20080305 It might be good to check the sha1digest here
        return content_obj.text()

    def _check_parents(self, child, nodes_to_annotate):
        """Check if all parents have been processed.

        :param child: A tuple of (rev_id, parents, raw_content)
        :param nodes_to_annotate: If child is ready, add it to
            nodes_to_annotate, otherwise put it back in self._pending_children
        """
        for parent_id in child[1]:
            if (parent_id not in self._annotated_lines):
                # This parent has not been annotated yet, so the child is not
                # ready; queue it under the missing parent.
                self._pending_children.setdefault(parent_id,
                    []).append(child)
                break
        else:
            # This one is ready to be processed
            nodes_to_annotate.append(child)

    def _add_annotation(self, revision_id, fulltext, parent_ids,
                        left_matching_blocks=None):
        """Add an annotation entry.

        All parents should already have been annotated.
        :return: A list of children that now have their parents satisfied.
        """
        a = self._annotated_lines
        annotated_parent_lines = [a[p] for p in parent_ids]
        annotated_lines = list(annotate.reannotate(annotated_parent_lines,
            fulltext, revision_id, left_matching_blocks,
            heads_provider=self._get_heads_provider()))
        self._annotated_lines[revision_id] = annotated_lines
        for p in parent_ids:
            ann_children = self._annotate_children[p]
            ann_children.remove(revision_id)
            if (not ann_children
                and p not in self._nodes_to_keep_annotations):
                del self._annotated_lines[p]
                del self._all_build_details[p]
                if p in self._fulltext_contents:
                    del self._fulltext_contents[p]
        # Now that we've added this one, see if there are any pending
        # deltas to be done, certainly this parent is finished
        nodes_to_annotate = []
        for child in self._pending_children.pop(revision_id, []):
            self._check_parents(child, nodes_to_annotate)
        return nodes_to_annotate

    def _get_build_graph(self, key):
        """Get the graphs for building texts and annotations.

        :return: A list of (key, index_memo) records, suitable for
            passing to read_records_iter to start reading in the raw data from
            the pack file.
        """
        pending = set([key])
        records = []
        ann_keys = set()
        self._num_needed_children[key] = 1
        while pending:
            # get all pending nodes
            this_iteration = pending
            build_details = self._vf._index.get_build_details(this_iteration)
            self._all_build_details.update(build_details)
            # new_nodes = self._vf._index._get_entries(this_iteration)
            pending = set()
            for key, details in build_details.iteritems():
                (index_memo, compression_parent, parent_keys,
                 record_details) = details
                self._parent_map[key] = parent_keys
                self._heads_provider = None
                records.append((key, index_memo))
                # Do we actually need to check _annotated_lines?
                pending.update([p for p in parent_keys
                                if p not in self._all_build_details])
                for parent_key in parent_keys:
                    if parent_key in self._num_needed_children:
                        self._num_needed_children[parent_key] += 1
                    else:
                        self._num_needed_children[parent_key] = 1
                if compression_parent:
                    if compression_parent in self._num_compression_children:
                        self._num_compression_children[compression_parent] += 1
                    else:
                        self._num_compression_children[compression_parent] = 1

            missing_versions = this_iteration.difference(build_details.keys())
            if missing_versions:
                for key in missing_versions:
                    if key in self._parent_map and key in self._text_cache:
                        # We already have this text ready, we just need to
                        # yield it later so we get it annotated
                        ann_keys.add(key)
                        parent_keys = self._parent_map[key]
                        for parent_key in parent_keys:
                            if parent_key in self._num_needed_children:
                                self._num_needed_children[parent_key] += 1
                            else:
                                self._num_needed_children[parent_key] = 1
                        pending.update([p for p in parent_keys
                                        if p not in self._all_build_details])
                    else:
                        raise errors.RevisionNotPresent(key, self._vf)
        # Generally we will want to read the records in reverse order, because
        # we find the parent nodes after the children
        records.reverse()
        return records, ann_keys

    def _get_needed_texts(self, key, pb=None):
        # if True or len(self._vf._fallback_vfs) > 0:
        if len(self._vf._fallback_vfs) > 0:
            # If we have fallbacks, go to the generic path
            for v in annotate.Annotator._get_needed_texts(self, key, pb=pb):
                yield v
            return
        while True:
            try:
                records, ann_keys = self._get_build_graph(key)
                for idx, (sub_key, text, num_lines) in enumerate(
                    self._extract_texts(records)):
                    if pb is not None:
                        pb.update('annotating', idx, len(records))
                    yield sub_key, text, num_lines
                for sub_key in ann_keys:
                    text = self._text_cache[sub_key]
                    num_lines = len(text) # bad assumption
                    yield sub_key, text, num_lines
                return
            except errors.RetryWithNewPacks, e:
                self._vf._access.reload_or_raise(e)
                # The cached build_details are no longer valid
                self._all_build_details.clear()

    def _cache_delta_blocks(self, key, compression_parent, delta, lines):
        parent_lines = self._text_cache[compression_parent]
        blocks = list(KnitContent.get_line_delta_blocks(delta, parent_lines, lines))
        self._matching_blocks[(key, compression_parent)] = blocks

    def _expand_record(self, key, parent_keys, compression_parent, record,
                       record_details):
        delta = None
        if compression_parent:
            if compression_parent not in self._content_objects:
                # Waiting for the parent
                self._pending_deltas.setdefault(compression_parent, []).append(
                    (key, parent_keys, record, record_details))
                return None
            # We have the basis parent, so expand the delta
            num = self._num_compression_children[compression_parent]
            num -= 1
            if num == 0:
                base_content = self._content_objects.pop(compression_parent)
                self._num_compression_children.pop(compression_parent)
            else:
                self._num_compression_children[compression_parent] = num
                base_content = self._content_objects[compression_parent]
            # It is tempting to want to copy_base_content=False for the last
            # child object. However, whenever noeol=False,
            # self._text_cache[parent_key] is content._lines. So mutating it
            # gives very bad results.
            # The alternative is to copy the lines into text cache, but then we
            # are copying anyway, so just do it here.
            content, delta = self._vf._factory.parse_record(
                key, record, record_details, base_content,
                copy_base_content=True)
        else:
            content, _ = self._vf._factory.parse_record(
                key, record, record_details, None)
        if self._num_compression_children.get(key, 0) > 0:
            self._content_objects[key] = content
        lines = content.text()
        self._text_cache[key] = lines
        if delta is not None:
            self._cache_delta_blocks(key, compression_parent, delta, lines)
        return lines
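
    # Worked example (assumption, for illustration only): if basis text B has
    # two delta children C1 and C2, _get_build_graph records
    # _num_compression_children[B] == 2. Expanding C1 decrements the count and
    # leaves B's content object cached; expanding C2 finds the count at 1 and
    # decrements it to 0, so the basis is popped from _content_objects and
    # _num_compression_children, releasing it once no further children need it.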

    def _get_parent_annotations_and_matches(self, key, text, parent_key):
        """Get the list of annotations for the parent, and the matching lines.

        :param text: The opaque value given by _get_needed_texts
        :param parent_key: The key for the parent text
        :return: (parent_annotations, matching_blocks)
            parent_annotations is a list as long as the number of lines in
                the parent text
            matching_blocks is a list of (parent_idx, text_idx, len) tuples
                indicating which lines match between the two texts
        """
        block_key = (key, parent_key)
        if block_key in self._matching_blocks:
            blocks = self._matching_blocks.pop(block_key)
            parent_annotations = self._annotations_cache[parent_key]
            return parent_annotations, blocks
        return annotate.Annotator._get_parent_annotations_and_matches(self,
            key, text, parent_key)

    def _process_pending(self, key):
        """The content for 'key' was just processed.

        Determine if there is any more pending work to be processed.
        """
        to_return = []
        if key in self._pending_deltas:
            compression_parent = key
            children = self._pending_deltas.pop(key)
            for child_key, parent_keys, record, record_details in children:
                lines = self._expand_record(child_key, parent_keys,
                                            compression_parent,
                                            record, record_details)
                if self._check_ready_for_annotations(child_key, parent_keys):
                    to_return.append(child_key)
        # Also check any children that are waiting for this parent to be
        # annotation ready
        if key in self._pending_annotation:
            children = self._pending_annotation.pop(key)
            to_return.extend([c for c, p_keys in children
                              if self._check_ready_for_annotations(c, p_keys)])
        return to_return

    def _check_ready_for_annotations(self, key, parent_keys):
        """Return True if this text is ready to be yielded.

        Otherwise, this will return False, and queue the text into
        self._pending_annotation
        """
        for parent_key in parent_keys:
            if parent_key not in self._annotations_cache:
                # still waiting on at least one parent text, so queue it up
                # Note that if there are multiple parents, we need to wait
                # for all of them.
                self._pending_annotation.setdefault(parent_key,
                    []).append((key, parent_keys))
                return False
        return True

    def _extract_texts(self, records):
        """Extract the various texts needed based on records"""
        # We iterate in the order read, rather than a strict order requested
        # However, process what we can, and put off to the side things that
        # still need parents, cleaning them up when those parents are
        # processed.
        # 1) As 'records' are read, see if we can expand these records into
        #    Content objects (and thus lines)
        # 2) If a given line-delta is waiting on its compression parent, it
        #    gets queued up into self._pending_deltas, otherwise we expand
        #    it, and put it into self._text_cache and self._content_objects
        # 3) If we expanded the text, we will then check to see if all
        #    parents have also been processed. If so, this text gets yielded,
        #    else this record gets set aside into pending_annotation
        # 4) Further, if we expanded the text in (2), we will then check to
        #    see if there are any children in self._pending_deltas waiting to
        #    also be processed. If so, we go back to (2) for those
        # 5) Further again, if we yielded the text, we can then check if that
        #    'unlocks' any of the texts in pending_annotations, which should
        #    then get yielded as well
        # Note that both steps 4 and 5 are 'recursive' in that unlocking one
        # compression child could unlock yet another, and yielding a fulltext
        # will also 'unlock' the children that are waiting on that annotation.
        # (Though also, unlocking 1 parent's fulltext, does not unlock a child
        # if other parents are also waiting.)
        # We want to yield content before expanding child content objects, so
        # that we know when we can re-use the content lines, and the annotation
        # code can know when it can stop caching fulltexts, as well.

        # Children that are missing their compression parent
        for (key, record, digest) in self._vf._read_records_iter(records):
            details = self._all_build_details[key]
            (_, compression_parent, parent_keys, record_details) = details
            lines = self._expand_record(key, parent_keys, compression_parent,
                                        record, record_details)
            if lines is None:
                # Pending delta should be queued up
                continue
            # At this point, we may be able to yield this content, if all
            # parents are also finished
            yield_this_text = self._check_ready_for_annotations(key,
                                                                parent_keys)
            if yield_this_text:
                # All parents present
                yield key, lines, len(lines)
            to_process = self._process_pending(key)
            while to_process:
                this_process = to_process
                to_process = []
                for key in this_process:
                    lines = self._text_cache[key]
                    yield key, lines, len(lines)
                    to_process.extend(self._process_pending(key))

    def _annotate_records(self, records):
        """Build the annotations for the listed records."""
        for (rev_id, record,
             digest) in self._knit._read_records_iter(records):
            if rev_id in self._annotated_lines:
                continue
            parent_ids = self._revision_id_graph[rev_id]
            parent_ids = [p for p in parent_ids if p not in self._ghosts]
            details = self._all_build_details[rev_id]
            (index_memo, compression_parent, parents,
             record_details) = details
            nodes_to_annotate = []
            # TODO: Remove the punning between compression parents, and
            #       parent_ids, we should be able to do this without assuming
            #       they are the same thing.
            if len(parent_ids) == 0:
                # There are no parents for this node, so just add it
                # TODO: This probably needs to be decoupled
                fulltext_content, delta = self._knit._factory.parse_record(
                    rev_id, record, record_details, None)
                fulltext = self._add_fulltext_content(rev_id, fulltext_content)
                nodes_to_annotate.extend(self._add_annotation(rev_id, fulltext,
                    parent_ids, left_matching_blocks=None))
            else:
                child = (rev_id, parent_ids, record)
                # Check if all the parents are present
                self._check_parents(child, nodes_to_annotate)
            while nodes_to_annotate:
                # Should we use a queue here instead of a stack?
                (rev_id, parent_ids, record) = nodes_to_annotate.pop()
                (index_memo, compression_parent, parents,
                 record_details) = self._all_build_details[rev_id]
                blocks = None
                if compression_parent is not None:
                    comp_children = self._compression_children[compression_parent]
                    if rev_id not in comp_children:
                        raise AssertionError("%r not in compression children %r"
                            % (rev_id, comp_children))
                    # If there is only 1 child, it is safe to reuse this
                    # content object
                    reuse_content = (len(comp_children) == 1
                        and compression_parent not in
                            self._nodes_to_keep_annotations)
                    if reuse_content:
                        # Remove it from the cache since it will be changing
                        parent_fulltext_content = self._fulltext_contents.pop(compression_parent)
                        # Make sure to copy the fulltext since it might be
                        # modified
                        parent_fulltext = list(parent_fulltext_content.text())
                    else:
                        parent_fulltext_content = self._fulltext_contents[compression_parent]
                        parent_fulltext = parent_fulltext_content.text()
                    comp_children.remove(rev_id)
                    fulltext_content, delta = self._knit._factory.parse_record(
                        rev_id, record, record_details,
                        parent_fulltext_content,
                        copy_base_content=(not reuse_content))
                    fulltext = self._add_fulltext_content(rev_id,
                        fulltext_content)
                    if compression_parent == parent_ids[0]:
                        # the compression_parent is the left parent, so we can
                        # re-use the delta blocks
                        blocks = KnitContent.get_line_delta_blocks(delta,
                                parent_fulltext, fulltext)
                else:
                    fulltext_content = self._knit._factory.parse_fulltext(
                        record, rev_id)
                    fulltext = self._add_fulltext_content(rev_id,
                        fulltext_content)
                nodes_to_annotate.extend(
                    self._add_annotation(rev_id, fulltext, parent_ids,
                        left_matching_blocks=blocks))

    def _get_heads_provider(self):
        """Create a heads provider for resolving ancestry issues."""
        if self._heads_provider is not None:
            return self._heads_provider
        parent_provider = _mod_graph.DictParentsProvider(
            self._revision_id_graph)
        graph_obj = _mod_graph.Graph(parent_provider)
        head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
        self._heads_provider = head_cache
        return head_cache
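
    # The object returned here is passed as heads_provider to
    # annotate.reannotate() by _add_annotation() above; _get_build_graph()
    # resets self._heads_provider to None whenever it adds new edges to the
    # revision graph, so a fresh FrozenHeadsCache is built the next time it
    # is needed.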

    def annotate(self, key):
        """Return the annotated fulltext at the given key.

        :param key: The key to annotate.
        """
        if len(self._knit._fallback_vfs) > 0:
            # stacked knits can't use the fast path at present.
            return self._simple_annotate(key)
        while True:
            try:
                records = self._get_build_graph(key)
                if key in self._ghosts:
                    raise errors.RevisionNotPresent(key, self._knit)
                self._annotate_records(records)
                return self._annotated_lines[key]
            except errors.RetryWithNewPacks, e:
                self._knit._access.reload_or_raise(e)
                # The cached build_details are no longer valid
                self._all_build_details.clear()

    def _simple_annotate(self, key):
        """Return annotated fulltext, rediffing from the full texts.

        This is slow but makes no assumptions about the repository
        being able to produce line deltas.
        """
        # TODO: this code generates a parent maps of present ancestors; it
        #       could be split out into a separate method, and probably should
        #       use iter_ancestry instead. -- mbp and robertc 20080704
        graph = _mod_graph.Graph(self._knit)
        head_cache = _mod_graph.FrozenHeadsCache(graph)
        search = graph._make_breadth_first_searcher([key])
        keys = set()
        while True:
            try:
                present, ghosts = search.next_with_ghosts()
            except StopIteration:
                break
            keys.update(present)
        parent_map = self._knit.get_parent_map(keys)
        parent_cache = {}
        reannotate = annotate.reannotate
        for record in self._knit.get_record_stream(keys, 'topological', True):
            key = record.key
            fulltext = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
            parents = parent_map[key]
            if parents is not None:
                parent_lines = [parent_cache[parent] for parent in parent_map[key]]
            else:
                parent_lines = []
            parent_cache[key] = list(
                reannotate(parent_lines, fulltext, key, None, head_cache))
        try:
            return parent_cache[key]
        except KeyError:
            raise errors.RevisionNotPresent(key, self._knit)


from bzrlib._knit_load_data_c import _load_data_c as _load_data