    def keys(self):
        """See VersionedFiles.keys."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "keys scales with size of history")
        sources = [self._index] + self._immediate_fallback_vfs
        result = set()
        for source in sources:
            result.update(source.keys())
        return result


class _ContentMapGenerator(object):
    """Generate texts or expose raw deltas for a set of texts."""

    def __init__(self, ordering='unordered'):
        self._ordering = ordering

    def _get_content(self, key):
        """Get the content object for key."""
        # Note that _get_content is only called when the _ContentMapGenerator
        # has been constructed with just one key requested for reconstruction.
        if key in self.nonlocal_keys:
            record = self.get_record_stream().next()
            # Create a content object on the fly
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
            return PlainKnitContent(lines, record.key)
        else:
            # local keys we can ask for directly
            return self._get_one_work(key)

    def get_record_stream(self):
        """Get a record stream for the keys requested during __init__."""
        for record in self._work():
            yield record

    def _work(self):
        """Produce maps of text and KnitContents as dicts.

        :return: (text_map, content_map) where text_map contains the texts for
            the requested versions and content_map contains the KnitContents.
        """
        # NB: By definition we never need to read remote sources unless texts
        # are requested from them: we don't delta across stores - and we
        # explicitly do not want to, to prevent data loss situations.
        if self.global_map is None:
            self.global_map = self.vf.get_parent_map(self.keys)
        nonlocal_keys = self.nonlocal_keys

        missing_keys = set(nonlocal_keys)
        # Read from remote versioned file instances and provide to our caller.
        for source in self.vf._immediate_fallback_vfs:
            if not missing_keys:
                break
            # Loop over fallback repositories asking them for texts - ignore
            # any missing from a particular fallback.
            for record in source.get_record_stream(missing_keys,
                self._ordering, True):
                if record.storage_kind == 'absent':
                    # Not in this particular stream, may be in one of the
                    # other fallback vfs objects.
                    continue
                missing_keys.remove(record.key)
                yield record

        if self._raw_record_map is None:
            raise AssertionError('_raw_record_map should have been filled')
        first = True
        for key in self.keys:
            if key in self.nonlocal_keys:
                continue
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
            first = False

    def _get_one_work(self, requested_key):
        # Now, if we have calculated everything already, just return the
        # requested key.
        if requested_key in self._contents_map:
            return self._contents_map[requested_key]
        # To simplify things, parse everything at once - code that wants one text
        # probably wants them all.
        # FUTURE: This function could be improved for the 'extract many' case
        # by tracking each component and only doing the copy when the number of
        # children that need to apply deltas to it is > 1 or it is part of the
        # final output.
        multiple_versions = len(self.keys) != 1
        if self._record_map is None:
            self._record_map = self.vf._raw_map_to_record_map(
                self._raw_record_map)
        record_map = self._record_map
        # raw_record_map is key:
        # Have read and parsed records at this point.
        for key in self.keys:
            if key in self.nonlocal_keys:
                continue
            components = []
            cursor = key
            while cursor is not None:
                try:
                    record, record_details, digest, next = record_map[cursor]
                except KeyError:
                    raise RevisionNotPresent(cursor, self)
                components.append((cursor, record, record_details, digest))
                cursor = next
                if cursor in self._contents_map:
                    # no need to plan further back
                    components.append((cursor, None, None, None))
                    break

            content = None
            for (component_id, record, record_details,
                 digest) in reversed(components):
                if component_id in self._contents_map:
                    content = self._contents_map[component_id]
                else:
                    content, delta = self._factory.parse_record(key[-1],
                        record, record_details, content,
                        copy_base_content=multiple_versions)
                    if multiple_versions:
                        self._contents_map[component_id] = content

            # digest here is the digest from the last applied component.
            text = content.text()
            actual_sha = sha_strings(text)
            if actual_sha != digest:
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
        if multiple_versions:
            return self._contents_map[requested_key]
        else:
            return content

    def _wire_bytes(self):
        """Get the bytes to put on the wire for 'key'.

        The first collection of bytes asked for returns the serialised
        raw_record_map and the additional details (key, parent) for key.
        Subsequent calls return just the additional details (key, parent).
        The wire storage_kind given for the first key is 'knit-delta-closure';
        for subsequent keys it is 'knit-delta-closure-ref'.

        :param key: A key from the content generator.
        :return: Bytes to put on the wire.
        """
        lines = []
        # kind marker for dispatch on the far side,
        lines.append('knit-delta-closure')
        # Annotated or not
        if self.vf._factory.annotated:
            lines.append('annotated')
        else:
            lines.append('')
        # then the list of keys
        lines.append('\t'.join(['\x00'.join(key) for key in self.keys
            if key not in self.nonlocal_keys]))
        # then the _raw_record_map in serialised form:
        map_byte_list = []
        # for each item in the map:
        # 1 line with key
        # 1 line with parents if the key is to be yielded (None: for None, '' for ())
        # one line with method
        # one line with noeol
        # one line with next ('' for None)
        # one line with byte count of the record bytes
        # the record bytes
        for key, (record_bytes, (method, noeol), next) in \
            self._raw_record_map.iteritems():
            key_bytes = '\x00'.join(key)
            parents = self.global_map.get(key, None)
            if parents is None:
                parent_bytes = 'None:'
            else:
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
            method_bytes = method
            if noeol:
                noeol_bytes = "T"
            else:
                noeol_bytes = "F"
            if next:
                next_bytes = '\x00'.join(next)
            else:
                next_bytes = ''
            map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
                key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
                len(record_bytes), record_bytes))
        map_bytes = ''.join(map_byte_list)
        lines.append(map_bytes)
        bytes = '\n'.join(lines)
        return bytes
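

# Illustrative sketch, not part of the original module: what one entry of the
# serialised _raw_record_map built by _wire_bytes() above looks like.  The
# key, parent and payload values here are made up purely to show the
# line-oriented layout; only the field order mirrors the method above.
def _example_record_map_entry():
    key_bytes = 'file-id\x00rev-1'
    parent_bytes = 'None:'            # no parents recorded for this key
    method_bytes = 'fulltext'         # or 'line-delta'
    noeol_bytes = 'F'                 # 'T' when the text has no trailing EOL
    next_bytes = ''                   # '' when there is no compression parent
    record_bytes = '<raw knit record bytes>'
    return '%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
        key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
        len(record_bytes), record_bytes)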


class _VFContentMapGenerator(_ContentMapGenerator):
    """Content map generator reading from a VersionedFiles object."""

    def __init__(self, versioned_files, keys, nonlocal_keys=None,
        global_map=None, raw_record_map=None, ordering='unordered'):
        """Create a _ContentMapGenerator.

        :param versioned_files: The versioned files that the texts are being
            extracted from.
        :param keys: The keys to produce content maps for.
        :param nonlocal_keys: An iterable of keys (possibly intersecting keys)
            which are known to not be in this knit, but rather in one of the
            fallback knits.
        :param global_map: The result of get_parent_map(keys) (or a supermap).
            This is required if get_record_stream() is to be used.
        :param raw_record_map: An unparsed raw record map to use for answering
            contents.
        """
        _ContentMapGenerator.__init__(self, ordering=ordering)
        # The vf to source data from
        self.vf = versioned_files
        self.keys = list(keys)
        # Keys known to be in fallback vfs objects
        if nonlocal_keys is None:
            self.nonlocal_keys = set()
        else:
            self.nonlocal_keys = frozenset(nonlocal_keys)
        # Parents data for keys to be returned in get_record_stream
        self.global_map = global_map
        # The chunked lists for self.keys in text form
        self._text_map = {}
        # A cache of KnitContent objects used in extracting texts.
        self._contents_map = {}
        # All the knit records needed to assemble the requested keys as full
        # texts.
        self._record_map = None
        if raw_record_map is None:
            self._raw_record_map = self.vf._get_record_map_unparsed(keys,
                allow_missing=True)
        else:
            self._raw_record_map = raw_record_map
        # the factory for parsing records
        self._factory = self.vf._factory
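

# Hedged usage sketch, not part of the original module: one way a
# _VFContentMapGenerator can be driven to extract texts.  It assumes 'vf' is a
# KnitVersionedFiles whose requested keys are all locally present (so no
# nonlocal_keys are needed); the helper name is made up for illustration.
def _example_extract_texts(vf, keys):
    """Return a dict mapping key -> list of lines for the given keys."""
    generator = _VFContentMapGenerator(vf, keys,
        global_map=vf.get_parent_map(keys))
    text_map = {}
    for record in generator.get_record_stream():
        text_map[record.key] = osutils.chunks_to_lines(
            record.get_bytes_as('chunked'))
    return text_map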


class _NetworkContentMapGenerator(_ContentMapGenerator):
    """Content map generator sourced from a network stream."""

    def __init__(self, bytes, line_end):
        """Construct a _NetworkContentMapGenerator from a bytes block."""
        self._bytes = bytes
        self.global_map = {}
        self._raw_record_map = {}
        self._contents_map = {}
        self._record_map = None
        self.nonlocal_keys = []
        # Get access to record parsing facilities
        self.vf = KnitVersionedFiles(None, None)
        start = line_end
        # Annotated or not
        line_end = bytes.find('\n', start)
        line = bytes[start:line_end]
        start = line_end + 1
        if line == 'annotated':
            self._factory = KnitAnnotateFactory()
        else:
            self._factory = KnitPlainFactory()
        # list of keys to emit in get_record_stream
        line_end = bytes.find('\n', start)
        line = bytes[start:line_end]
        start = line_end + 1
        self.keys = [
            tuple(segment.split('\x00')) for segment in line.split('\t')
            if segment]
        # now a loop until the end. XXX: It would be nice if this was just a
        # bunch of the same records as get_record_stream(..., False) gives, but
        # there is a decent sized gap stopping that at the moment.
        end = len(bytes)
        while start < end:
            # 1 line with key
            line_end = bytes.find('\n', start)
            key = tuple(bytes[start:line_end].split('\x00'))
            start = line_end + 1
            # 1 line with parents (None: for None, '' for ())
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            if line == 'None:':
                parents = None
            else:
                parents = tuple(
                    [tuple(segment.split('\x00')) for segment in line.split('\t')
                     if segment])
            self.global_map[key] = parents
            start = line_end + 1
            # one line with method
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            method = line
            start = line_end + 1
            # one line with noeol
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            noeol = line == "T"
            start = line_end + 1
            # one line with next ('' for None)
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            if not line:
                next = None
            else:
                next = tuple(bytes[start:line_end].split('\x00'))
            start = line_end + 1
            # one line with byte count of the record bytes
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            count = int(line)
            start = line_end + 1
            # the record bytes
            record_bytes = bytes[start:start+count]
            start = start + count
            # put it in the map
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)

    def get_record_stream(self):
        """Get a record stream for the keys requested by the bytestream."""
        first = True
        for key in self.keys:
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
            first = False

    def _wire_bytes(self):
        return self._bytes
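

# Hedged round-trip sketch, not part of the original module: how the wire form
# produced by _ContentMapGenerator._wire_bytes() can be turned back into a
# generator on the receiving side.  The exact way the network layer slices the
# payload is an assumption here; the helper only illustrates the
# (bytes, line_end) calling convention of _NetworkContentMapGenerator, where
# line_end points just past the leading 'knit-delta-closure' marker line.
def _example_rebuild_from_wire(wire_bytes):
    line_end = wire_bytes.find('\n') + 1
    return _NetworkContentMapGenerator(wire_bytes, line_end)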


class _KndxIndex(object):
    """Manages knit index files."""


class _DirectPackAccess(object):
    """Access to data in one or more packs with less translation."""

    def __init__(self, index_to_packs, reload_func=None):
        """Create a _DirectPackAccess object.

        :param index_to_packs: A dict mapping index objects to the transport
            and file names for obtaining data.
        :param reload_func: A function to call if we determine that the pack
            files have moved and we need to reload our caches. See
            bzrlib.repofmt.pack_repo.AggregateIndex for more details.
        """
        self._container_writer = None
        self._write_index = None
        self._indices = index_to_packs
        self._reload_func = reload_func

    def add_raw_records(self, key_sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to the container writer in one bytes-record per
        raw data item.

        :param key_sizes: An iterable of tuples containing the key and size of
            each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is an
            opaque index memo. For _DirectPackAccess the memo is (index, pos,
            length), where the index field is the write_index object supplied
            to the PackAccess object.
        """
        if type(raw_data) != str:
            raise AssertionError(
                'data must be plain bytes, was %s' % type(raw_data))
        result = []
        offset = 0
        for key, size in key_sizes:
            p_offset, p_length = self._container_writer.add_bytes_record(
                raw_data[offset:offset+size], [])
            offset += size
            result.append((self._write_index, p_offset, p_length))
        return result

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The Pack access method
            looks up the pack to use for a given record in its index_to_pack
            map.
        :return: An iterator over the bytes of the records.
        """
        # first pass, group into same-index requests
        request_lists = []
        current_index = None
        current_list = []
        for (index, offset, length) in memos_for_retrieval:
            if current_index == index:
                current_list.append((offset, length))
            else:
                if current_index is not None:
                    request_lists.append((current_index, current_list))
                current_index = index
                current_list = [(offset, length)]
        # handle the last entry
        if current_index is not None:
            request_lists.append((current_index, current_list))
        for index, offsets in request_lists:
            try:
                transport, path = self._indices[index]
            except KeyError:
                # A KeyError here indicates that someone has triggered an index
                # reload, and this index has gone missing, we need to start
                # over.
                if self._reload_func is None:
                    # If we don't have a _reload_func there is nothing that can
                    # be done here, so raise the error.
                    raise
                raise errors.RetryWithNewPacks(index,
                    reload_occurred=True,
                    exc_info=sys.exc_info())
            try:
                reader = pack.make_readv_reader(transport, path, offsets)
                for names, read_func in reader.iter_records():
                    yield read_func(None)
            except errors.NoSuchFile:
                # A NoSuchFile error indicates that a pack file has gone
                # missing on disk, we need to trigger a reload, and start over.
                if self._reload_func is None:
                    raise
                raise errors.RetryWithNewPacks(transport.abspath(path),
                    reload_occurred=False,
                    exc_info=sys.exc_info())

    def set_writer(self, writer, index, transport_packname):
        """Set a writer to use for adding data."""
        if index is not None:
            self._indices[index] = transport_packname
        self._container_writer = writer
        self._write_index = index

    def reload_or_raise(self, retry_exc):
        """Try calling the reload function, or re-raise the original exception.

        This should be called after _DirectPackAccess raises a
        RetryWithNewPacks exception. This function will handle the common logic
        of determining when the error is fatal versus being temporary.
        It will also make sure that the original exception is raised, rather
        than the RetryWithNewPacks exception.

        If this function returns, then the calling function should retry
        whatever operation was being performed. Otherwise an exception will
        be raised.

        :param retry_exc: A RetryWithNewPacks exception.
        """
        is_error = False
        if self._reload_func is None:
            is_error = True
        elif not self._reload_func():
            # The reload claimed that nothing changed
            if not retry_exc.reload_occurred:
                # If there wasn't an earlier reload, then we really were
                # expecting to find changes. We didn't find them, so this is a
                # hard error.
                is_error = True
        if is_error:
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
            raise exc_class, exc_value, exc_traceback
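

# Hedged sketch, not part of the original module: the retry idiom that callers
# of _DirectPackAccess are expected to wrap around pack reads.  'access' is a
# _DirectPackAccess and 'read_func' any callable that may raise
# RetryWithNewPacks; both names are placeholders for illustration.
def _example_retry_read(access, read_func):
    while True:
        try:
            return read_func()
        except errors.RetryWithNewPacks, e:
            # Either the pack list is reloaded and we loop to retry, or the
            # original error is re-raised if reloading cannot help.
            access.reload_or_raise(e)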


# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher


def annotate_knit(knit, revision_id):
    """Annotate a knit with no cached annotations."""
    annotator = _KnitAnnotator(knit)
    return iter(annotator.annotate_flat(revision_id))
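

# Hedged usage sketch, not part of the original module: dumping the annotation
# produced by annotate_knit().  It assumes 'knit' is a KnitVersionedFiles and
# 'key' one of its tuple keys, and that each yielded item is an
# (origin_key, line) pair; the helper name is made up for illustration.
def _example_dump_annotation(knit, key, to_file):
    for origin, line in annotate_knit(knit, key):
        to_file.write('%s | %s' % (origin[-1], line))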


class _KnitAnnotator(annotate.Annotator):
    """Build up the annotations for a text."""

    def __init__(self, vf):
        annotate.Annotator.__init__(self, vf)

        # TODO: handle Nodes which cannot be extracted
        # self._ghosts = set()

        # Map from (key, parent_key) => matching_blocks, should be 'use once'
        self._matching_blocks = {}

        # KnitContent objects
        self._content_objects = {}
        # The number of children that depend on this fulltext content object
        self._num_compression_children = {}
        # Delta records that need their compression parent before they can be
        # expanded.
        self._pending_deltas = {}
        # Fulltext records that are waiting for their parents' fulltexts before
        # they can be yielded for annotation
        self._pending_annotation = {}

        self._all_build_details = {}

    def _get_build_graph(self, key):
        """Get the graphs for building texts and annotations.

        :return: A list of (key, index_memo) records, suitable for
            passing to read_records_iter to start reading in the raw data from
            the pack file.
        """
        pending = set([key])
        records = []
        ann_keys = set()
        self._num_needed_children[key] = 1
        while pending:
            # get all pending nodes
            this_iteration = pending
            build_details = self._vf._index.get_build_details(this_iteration)
            self._all_build_details.update(build_details)
            # new_nodes = self._vf._index._get_entries(this_iteration)
            pending = set()
            for key, details in build_details.iteritems():
                (index_memo, compression_parent, parent_keys,
                 record_details) = details
                self._parent_map[key] = parent_keys
                self._heads_provider = None
                records.append((key, index_memo))
                # Do we actually need to check _annotated_lines?
                pending.update([p for p in parent_keys
                                if p not in self._all_build_details])
                for parent_key in parent_keys:
                    if parent_key in self._num_needed_children:
                        self._num_needed_children[parent_key] += 1
                    else:
                        self._num_needed_children[parent_key] = 1
                if compression_parent:
                    if compression_parent in self._num_compression_children:
                        self._num_compression_children[compression_parent] += 1
                    else:
                        self._num_compression_children[compression_parent] = 1

            missing_versions = this_iteration.difference(build_details.keys())
            if missing_versions:
                for key in missing_versions:
                    if key in self._parent_map and key in self._text_cache:
                        # We already have this text ready, we just need to
                        # yield it later so we get it annotated
                        ann_keys.add(key)
                        parent_keys = self._parent_map[key]
                        for parent_key in parent_keys:
                            if parent_key in self._num_needed_children:
                                self._num_needed_children[parent_key] += 1
                            else:
                                self._num_needed_children[parent_key] = 1
                        pending.update([p for p in parent_keys
                                        if p not in self._all_build_details])
                    else:
                        raise errors.RevisionNotPresent(key, self._vf)
        # Generally we will want to read the records in reverse order, because
        # we find the parent nodes after the children
        records.reverse()
        return records, ann_keys

    def _get_needed_texts(self, key, pb=None):
        # if True or len(self._vf._immediate_fallback_vfs) > 0:
        if len(self._vf._immediate_fallback_vfs) > 0:
            # If we have fallbacks, go to the generic path
            for v in annotate.Annotator._get_needed_texts(self, key, pb=pb):
                yield v
            return
        while True:
            try:
                records, ann_keys = self._get_build_graph(key)
                for idx, (sub_key, text, num_lines) in enumerate(
                        self._extract_texts(records)):
                    if pb is not None:
                        pb.update(gettext('annotating'), idx, len(records))
                    yield sub_key, text, num_lines
                for sub_key in ann_keys:
                    text = self._text_cache[sub_key]
                    num_lines = len(text) # bad assumption
                    yield sub_key, text, num_lines
                return
            except errors.RetryWithNewPacks, e:
                self._vf._access.reload_or_raise(e)
                # The cached build_details are no longer valid
                self._all_build_details.clear()

    def _cache_delta_blocks(self, key, compression_parent, delta, lines):
        parent_lines = self._text_cache[compression_parent]
        blocks = list(KnitContent.get_line_delta_blocks(delta, parent_lines, lines))
        self._matching_blocks[(key, compression_parent)] = blocks

    def _expand_record(self, key, parent_keys, compression_parent, record,
                       record_details):
        delta = None
        if compression_parent:
            if compression_parent not in self._content_objects:
                # Waiting for the parent
                self._pending_deltas.setdefault(compression_parent, []).append(
                    (key, parent_keys, record, record_details))
                return None
            # We have the basis parent, so expand the delta
            num = self._num_compression_children[compression_parent]
            num -= 1
            if num == 0:
                base_content = self._content_objects.pop(compression_parent)
                self._num_compression_children.pop(compression_parent)
            else:
                self._num_compression_children[compression_parent] = num
                base_content = self._content_objects[compression_parent]
            # It is tempting to want to copy_base_content=False for the last
            # child object. However, whenever noeol=False,
            # self._text_cache[parent_key] is content._lines. So mutating it
            # gives very bad results.
            # The alternative is to copy the lines into text cache, but then we
            # are copying anyway, so just do it here.
            content, delta = self._vf._factory.parse_record(
                key, record, record_details, base_content,
                copy_base_content=True)
        else:
            # Fulltext record
            content, _ = self._vf._factory.parse_record(
                key, record, record_details, None)
        if self._num_compression_children.get(key, 0) > 0:
            self._content_objects[key] = content
        lines = content.text()
        self._text_cache[key] = lines
        if delta is not None:
            self._cache_delta_blocks(key, compression_parent, delta, lines)
        return lines

    def _get_parent_annotations_and_matches(self, key, text, parent_key):
        """Get the list of annotations for the parent, and the matching lines.

        :param text: The opaque value given by _get_needed_texts
        :param parent_key: The key for the parent text
        :return: (parent_annotations, matching_blocks)
            parent_annotations is a list as long as the number of lines in
            parent
            matching_blocks is a list of (parent_idx, text_idx, len) tuples
            indicating which lines match between the two texts
        """
        block_key = (key, parent_key)
        if block_key in self._matching_blocks:
            blocks = self._matching_blocks.pop(block_key)
            parent_annotations = self._annotations_cache[parent_key]
            return parent_annotations, blocks
        return annotate.Annotator._get_parent_annotations_and_matches(self,
            key, text, parent_key)

    def _process_pending(self, key):
        """The content for 'key' was just processed.

        Determine if there is any more pending work to be processed.
        """
        to_return = []
        if key in self._pending_deltas:
            compression_parent = key
            children = self._pending_deltas.pop(key)
            for child_key, parent_keys, record, record_details in children:
                lines = self._expand_record(child_key, parent_keys,
                    compression_parent,
                    record, record_details)
                if self._check_ready_for_annotations(child_key, parent_keys):
                    to_return.append(child_key)
        # Also check any children that are waiting for this parent to be
        # annotation ready
        if key in self._pending_annotation:
            children = self._pending_annotation.pop(key)
            to_return.extend([c for c, p_keys in children
                if self._check_ready_for_annotations(c, p_keys)])
        return to_return

    def _check_ready_for_annotations(self, key, parent_keys):
        """Return True if this text is ready to be yielded.

        Otherwise, this will return False, and queue the text into
        self._pending_annotation
        """
        for parent_key in parent_keys:
            if parent_key not in self._annotations_cache:
                # still waiting on at least one parent text, so queue it up
                # Note that if there are multiple parents, we need to wait
                # for all of them.
                self._pending_annotation.setdefault(parent_key,
                    []).append((key, parent_keys))
                return False
        return True

    def _extract_texts(self, records):
        """Extract the various texts needed based on records"""
        # We iterate in the order read, rather than a strict order requested
        # However, process what we can, and put off to the side things that
        # still need parents, cleaning them up when those parents are
        # processed.
        # 1) As 'records' are read, see if we can expand these records into
        #    Content objects (and thus lines)
        # 2) If a given line-delta is waiting on its compression parent, it
        #    gets queued up into self._pending_deltas, otherwise we expand
        #    it, and put it into self._text_cache and self._content_objects
        # 3) If we expanded the text, we will then check to see if all
        #    parents have also been processed. If so, this text gets yielded,
        #    else this record gets set aside into pending_annotation
        # 4) Further, if we expanded the text in (2), we will then check to
        #    see if there are any children in self._pending_deltas waiting to
        #    also be processed. If so, we go back to (2) for those
        # 5) Further again, if we yielded the text, we can then check if that
        #    'unlocks' any of the texts in pending_annotations, which should
        #    then get yielded as well
        # Note that both steps 4 and 5 are 'recursive' in that unlocking one
        # compression child could unlock yet another, and yielding a fulltext
        # will also 'unlock' the children that are waiting on that annotation.
        # (Though also, unlocking 1 parent's fulltext, does not unlock a child
        # if other parents are also waiting.)
        # We want to yield content before expanding child content objects, so
        # that we know when we can re-use the content lines, and the annotation
        # code can know when it can stop caching fulltexts, as well.
        # (A reduced, standalone sketch of this unlocking pattern follows this
        # method.)

        # Children that are missing their compression parent
        for (key, record, digest) in self._vf._read_records_iter(records):
            details = self._all_build_details[key]
            (_, compression_parent, parent_keys, record_details) = details
            lines = self._expand_record(key, parent_keys, compression_parent,
                record, record_details)
            if lines is None:
                # Pending delta should be queued up
                continue
            # At this point, we may be able to yield this content, if all
            # parents are also finished
            yield_this_text = self._check_ready_for_annotations(key,
                parent_keys)
            if yield_this_text:
                # All parents present
                yield key, lines, len(lines)
            to_process = self._process_pending(key)
            while to_process:
                this_process = to_process
                to_process = []
                for key in this_process:
                    lines = self._text_cache[key]
                    yield key, lines, len(lines)
                    to_process.extend(self._process_pending(key))
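

# Illustrative sketch, not part of the original module: the 'expand in read
# order, queue children until their compression parent arrives' pattern that
# _extract_texts implements, reduced to plain dicts.  'read_order' and
# 'compression_parent_of' are made-up inputs; only the control flow mirrors
# steps 2) and 4) in the comments above.
def _example_expand_in_read_order(read_order, compression_parent_of):
    expanded = []        # keys whose text has been made available, in order
    pending = {}         # parent key -> [children waiting on that parent]
    available = set()
    for key in read_order:
        parent = compression_parent_of.get(key)
        if parent is not None and parent not in available:
            # step 2: waiting for the compression parent, queue it up
            pending.setdefault(parent, []).append(key)
            continue
        # step 4: expanding one record may unlock queued children, recursively
        to_process = [key]
        while to_process:
            current = to_process.pop()
            available.add(current)
            expanded.append(current)
            to_process.extend(pending.pop(current, []))
    return expanded

# For example, _example_expand_in_read_order(['B', 'A'], {'B': 'A'})
# returns ['A', 'B']: 'B' is queued until its basis 'A' has been expanded.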


try:
    from bzrlib._knit_load_data_pyx import _load_data_c as _load_data
except ImportError, e:
    osutils.failed_to_load_extension(e)
    from bzrlib._knit_load_data_py import _load_data_py as _load_data