            text_map[version_id] = text
        return text_map, final_content
    def iter_lines_added_or_present_in_versions(self, version_ids=None,
                                                pb=None):
        """See VersionedFile.iter_lines_added_or_present_in_versions()."""
        if version_ids is None:
            version_ids = self.versions()
        version_ids = [osutils.safe_revision_id(v) for v in version_ids]
        if pb is None:
            pb = progress.DummyProgress()
        # we don't care about inclusions, the caller cares.
        # but we need to setup a list of records to visit.
        # we need version_id, position, length
        version_id_records = []
        requested_versions = set(version_ids)
        # filter for available versions
        for version_id in requested_versions:
            if not self.has_version(version_id):
                raise RevisionNotPresent(version_id, self.filename)
        # get an in-component-order queue:
        for version_id in self.versions():
            if version_id in requested_versions:
                index_memo = self._index.get_position(version_id)
                version_id_records.append((version_id, index_memo))

        total = len(version_id_records)
        for version_idx, (version_id, data, sha_value) in \
            enumerate(self._data.read_records_iter(version_id_records)):
            pb.update('Walking content.', version_idx, total)
            method = self._index.get_method(version_id)
            assert method in ('fulltext', 'line-delta')
            if method == 'fulltext':
                line_iterator = self.factory.get_fulltext_content(data)
            else:
                line_iterator = self.factory.get_linedelta_content(data)
            for line in line_iterator:
                yield line
        pb.update('Walking content.', total, total)
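    # Illustrative sketch (not part of the original file): a caller that only
    # needs the text lines touched by a set of revisions could use the
    # iterator above roughly like this ('knit' and 'revision_ids' are
    # hypothetical names):
    #
    #   for line in knit.iter_lines_added_or_present_in_versions(revision_ids):
    #       process(line)
    #
    # No inclusion (ancestry) expansion happens here; the caller passes
    # exactly the versions whose content it wants walked.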
    def iter_parents(self, version_ids):
        """Iterate through the parents for many version ids.

        :param version_ids: An iterable yielding version_ids.
        :return: An iterator that yields (version_id, parents). Requested
            version_ids not present in the versioned file are simply skipped.
            The order is undefined, allowing for different optimisations in
            the underlying implementation.
        """
        version_ids = [osutils.safe_revision_id(version_id) for
                       version_id in version_ids]
        return self._index.iter_parents(version_ids)
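    # Illustrative sketch (assumption, not original code): because requested
    # ids that are absent are silently skipped, a caller can probe many ids
    # in one pass, e.g. (names hypothetical):
    #
    #   parent_map = dict(knit.iter_parents(candidate_ids))
    #   missing = [v for v in candidate_ids if v not in parent_map]
    #
    # The yield order is undefined, so callers must not rely on it.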
    def num_versions(self):
        """See VersionedFile.num_versions()."""
        return self._index.num_versions()
            self._history.append(version_id)
        else:
            index = self._cache[version_id][5]
        self._cache[version_id] = (version_id,
                                   options,
                                   pos,
                                   size,
                                   parents,
                                   index)
    def __init__(self, transport, filename, mode, create=False, file_mode=None,
                 create_parent_dir=False, delay_create=False, dir_mode=None):
        _KnitComponentFile.__init__(self, transport, filename, mode,
                                    file_mode=file_mode,
                                    create_parent_dir=create_parent_dir,
                                    dir_mode=dir_mode)
        self._cache = {}
        # position in _history is the 'official' index for a revision
        # but the values may have come from a newer entry.
        # so - wc -l of a knit index is != the number of unique names
        # in the knit.
        self._history = []
        try:
            fp = self._transport.get(self._filename)
            try:
                # _load_data may raise NoSuchFile if the target knit is
                # completely empty.
                _load_data(self, fp)
            finally:
                fp.close()
        except NoSuchFile:
            if mode != 'w' or not create:
                raise
            elif delay_create:
                self._need_to_create = True
            else:
                self._transport.put_bytes_non_atomic(
                    self._filename, self.HEADER, mode=self._file_mode)

    def _parse_parents(self, compressed_parents):
        """convert a list of string parent values into version ids.

        ints are looked up in the index.
        .FOO values are ghosts and converted in to FOO.

        NOTE: the function is retained here for clarity, and for possible
              use in partial index reads. However bulk processing now has
              it inlined in __init__ for inner-loop optimisation.
        """
        result = []
        for value in compressed_parents:
            if value[0] == '.':
                # uncompressed reference
                result.append(value[1:])
            else:
                # this is 15/4000ms faster than isinstance,
                # this function is called thousands of times a
                # second so small variations add up.
                assert value.__class__ is str
                result.append(self._history[int(value)])
        return result
    def get_graph(self):
        """Return a list of the node:parents lists from this knit index."""
        return [(vid, idx[4]) for vid, idx in self._cache.iteritems()]
    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        # get a graph of all the mentioned versions:
        graph = {}
        pending = set(versions)
        cache = self._cache
        while pending:
            version = pending.pop()
            # trim ghosts
            try:
                parents = [p for p in cache[version][4] if p in cache]
            except KeyError:
                raise RevisionNotPresent(version, self._filename)
            # if not completed and not a ghost
            pending.update([p for p in parents if p not in graph])
            graph[version] = parents
        if not topo_sorted:
            return graph.keys()
        return topo_sort(graph.items())
    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        # get a graph of all the mentioned versions:
        self.check_versions_present(versions)
        cache = self._cache
        graph = {}
        pending = set(versions)
        while pending:
            version = pending.pop()
            try:
                parents = cache[version][4]
            except KeyError:
                # ghost, fake it
                graph[version] = []
            else:
                # if not completed and not a ghost
                pending.update([p for p in parents if p not in graph])
                graph[version] = parents
        return topo_sort(graph.items())
    def iter_parents(self, version_ids):
        """Iterate through the parents for many version ids.

        :param version_ids: An iterable yielding version_ids.
        :return: An iterator that yields (version_id, parents). Requested
            version_ids not present in the versioned file are simply skipped.
            The order is undefined, allowing for different optimisations in
            the underlying implementation.
        """
        for version_id in version_ids:
            try:
                yield version_id, tuple(self.get_parents(version_id))
            except KeyError:
                pass
    def num_versions(self):
        return len(self._history)

    __len__ = num_versions

    def get_versions(self):
        """Get all the versions in the file. not topologically sorted."""
        return self._history

    def idx_to_name(self, idx):
        return self._history[idx]

    def lookup(self, version_id):
        assert version_id in self._cache
        return self._cache[version_id][5]
    def _version_list_to_index(self, versions):
        result_list = []
        cache = self._cache
        for version in versions:
            if version in cache:
                # -- inlined lookup() --
                result_list.append(str(cache[version][5]))
                # -- end lookup () --
            else:
                result_list.append('.' + version)
        return ' '.join(result_list)
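    # Sketch of the parent encoding produced above (inferred from this method,
    # not a separate format description): parents already present in the
    # index are written as their integer position in self._history, while
    # ghosts keep their full id prefixed with '.', e.g.
    #
    #   "0 3 .ghost-revision-id"
    #
    # which the loading code reverses by indexing _history for the integers
    # and stripping the leading dot for the ghosts.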
    def add_version(self, version_id, options, index_memo, parents):
        """Add a version record to the index."""
        self.add_versions(((version_id, options, index_memo, parents),))

    def add_versions(self, versions):
        """Add multiple versions to the index.
    def check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        cache = self._cache
        for version_id in version_ids:
            if version_id not in cache:
                raise RevisionNotPresent(version_id, self._filename)

class KnitGraphIndex(object):
    """A knit index that builds on GraphIndex."""

    def __init__(self, graph_index, deltas=False, parents=True, add_callback=None):
        """Construct a KnitGraphIndex on a graph_index.

        :param graph_index: An implementation of bzrlib.index.GraphIndex.
        :param deltas: Allow delta-compressed records.
        :param add_callback: If not None, allow additions to the index and call
            this callback with a list of added GraphIndex nodes:
            [(node, value, node_refs), ...]
        :param parents: If True, record knits parents, if not do not record
            parents.
        """
        self._graph_index = graph_index
        self._deltas = deltas
        self._add_callback = add_callback
        self._parents = parents
        if deltas and not parents:
            raise KnitCorrupt(self, "Cannot do delta compression without "
                "parent tracking.")
    def _get_entries(self, keys, check_present=False):
        """Get the entries for keys.

        :param keys: An iterable of index keys, - 1-tuples.
        """
        keys = set(keys)
        found_keys = set()
        if self._parents:
            for node in self._graph_index.iter_entries(keys):
                yield node
                found_keys.add(node[1])
        else:
            # adapt parentless index to the rest of the code.
            for node in self._graph_index.iter_entries(keys):
                yield node[0], node[1], node[2], ()
                found_keys.add(node[1])
        if check_present:
            missing_keys = keys.difference(found_keys)
            if missing_keys:
                raise RevisionNotPresent(missing_keys.pop(), self)
    def _present_keys(self, version_ids):
        return set([
            node[1] for node in self._get_entries(version_ids)])
    def _parentless_ancestry(self, versions):
        """Honour the get_ancestry API for parentless knit indices."""
        wanted_keys = self._version_ids_to_keys(versions)
        present_keys = self._present_keys(wanted_keys)
        missing = set(wanted_keys).difference(present_keys)
        if missing:
            raise RevisionNotPresent(missing.pop(), self)
        return list(self._keys_to_version_ids(present_keys))
    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        if not self._parents:
            return self._parentless_ancestry(versions)
        # XXX: This will do len(history) index calls - perhaps
        # it should be altered to be an index core feature?
        # get a graph of all the mentioned versions:
        graph = {}
        ghosts = set()
        versions = self._version_ids_to_keys(versions)
        pending = set(versions)
        while pending:
            # get all pending nodes
            this_iteration = pending
            new_nodes = self._get_entries(this_iteration)
            found = set()
            pending = set()
            for (index, key, value, node_refs) in new_nodes:
                # don't ask for ghosties - otherwise
                # we can end up looping with pending
                # being entirely ghosted.
                graph[key] = [parent for parent in node_refs[0]
                    if parent not in ghosts]
                # queue parents
                for parent in graph[key]:
                    # don't examine known nodes again
                    if parent in graph:
                        continue
                    pending.add(parent)
                found.add(key)
            ghosts.update(this_iteration.difference(found))
        if versions.difference(graph):
            raise RevisionNotPresent(versions.difference(graph).pop(), self)
        if topo_sorted:
            result_keys = topo_sort(graph.items())
        else:
            result_keys = graph.iterkeys()
        return [key[0] for key in result_keys]
    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry."""
        if not self._parents:
            return self._parentless_ancestry(versions)
        # XXX: This will do len(history) index calls - perhaps
        # it should be altered to be an index core feature?
        # get a graph of all the mentioned versions:
        graph = {}
        versions = self._version_ids_to_keys(versions)
        pending = set(versions)
        while pending:
            # get all pending nodes
            this_iteration = pending
            new_nodes = self._get_entries(this_iteration)
            pending = set()
            for (index, key, value, node_refs) in new_nodes:
                graph[key] = node_refs[0]
                # queue parents
                for parent in graph[key]:
                    # don't examine known nodes again
                    if parent in graph:
                        continue
                    pending.add(parent)
            missing_versions = this_iteration.difference(graph)
            missing_needed = versions.intersection(missing_versions)
            if missing_needed:
                raise RevisionNotPresent(missing_needed.pop(), self)
            for missing_version in missing_versions:
                # add a key, no parents
                graph[missing_version] = []
                pending.discard(missing_version) # don't look for it
        result_keys = topo_sort(graph.items())
        return [key[0] for key in result_keys]
    def get_graph(self):
        """Return a list of the node:parents lists from this knit index."""
        if not self._parents:
            return [(key, ()) for key in self.get_versions()]
        result = []
        for index, key, value, refs in self._graph_index.iter_all_entries():
            result.append((key[0], tuple([ref[0] for ref in refs[0]])))
        return result
    def iter_parents(self, version_ids):
        """Iterate through the parents for many version ids.

        :param version_ids: An iterable yielding version_ids.
        :return: An iterator that yields (version_id, parents). Requested
            version_ids not present in the versioned file are simply skipped.
            The order is undefined, allowing for different optimisations in
            the underlying implementation.
        """
        if self._parents:
            all_nodes = set(self._get_entries(self._version_ids_to_keys(version_ids)))
            all_parents = set()
            present_parents = set()
            for node in all_nodes:
                all_parents.update(node[3][0])
                # any node we are querying must be present
                present_parents.add(node[1])
            unknown_parents = all_parents.difference(present_parents)
            present_parents.update(self._present_keys(unknown_parents))
            for node in all_nodes:
                parents = []
                for parent in node[3][0]:
                    if parent in present_parents:
                        parents.append(parent[0])
                yield node[1][0], tuple(parents)
        else:
            for node in self._get_entries(self._version_ids_to_keys(version_ids)):
                yield node[1][0], ()
    def num_versions(self):
        return len(list(self._graph_index.iter_all_entries()))

    __len__ = num_versions

    def get_versions(self):
        """Get all the versions in the file. not topologically sorted."""
        return [node[1][0] for node in self._graph_index.iter_all_entries()]

    def has_version(self, version_id):
        """True if the version is in the index."""
        return len(self._present_keys(self._version_ids_to_keys([version_id]))) == 1

    def _keys_to_version_ids(self, keys):
        return tuple(key[0] for key in keys)
    def get_position(self, version_id):
        """Return details needed to access the version.

        :return: a tuple (index, data position, size) to hand to the access
            logic to get the record.
        """
        node = self._get_node(version_id)
        bits = node[2][1:].split(' ')
        return node[0], int(bits[0]), int(bits[1])
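    # Sketch of the node value being parsed here (inferred from add_versions
    # below, not a separate spec): the first byte is a flag ('N' means
    # no-eol), and the remainder is "<byte offset> <byte length>" within the
    # data storage, e.g. a value of "N1234 567" yields (index, 1234, 567).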
    def get_method(self, version_id):
        """Return compression method of specified version."""
        if not self._deltas:
            return 'fulltext'
        return self._parent_compression(self._get_node(version_id)[3][1])

    def _parent_compression(self, reference_list):
        # use the second reference list to decide if this is delta'd or not.
        if len(reference_list):
            return 'line-delta'
        else:
            return 'fulltext'

    def _get_node(self, version_id):
        return list(self._get_entries(self._version_ids_to_keys([version_id])))[0]
    def get_options(self, version_id):
        """Return a string representation of the options for a version."""
        node = self._get_node(version_id)
        if not self._deltas:
            options = ['fulltext']
        else:
            options = [self._parent_compression(node[3][1])]
        if node[2][0] == 'N':
            options.append('no-eol')
        return options
    def get_parents(self, version_id):
        """Return parents of specified version ignoring ghosts."""
        parents = list(self.iter_parents([version_id]))
        if not parents:
            # missing key
            raise errors.RevisionNotPresent(version_id, self)
        return parents[0][1]
    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        nodes = list(self._get_entries(self._version_ids_to_keys([version_id]),
            check_present=True))
        if not self._parents:
            return ()
        return self._keys_to_version_ids(nodes[0][3][0])
    def check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        keys = self._version_ids_to_keys(version_ids)
        present = self._present_keys(keys)
        missing = keys.difference(present)
        if missing:
            raise RevisionNotPresent(missing.pop(), self)
    def add_version(self, version_id, options, access_memo, parents):
        """Add a version record to the index."""
        return self.add_versions(((version_id, options, access_memo, parents),))
    def add_versions(self, versions):
        """Add multiple versions to the index.

        This function does not insert data into the Immutable GraphIndex
        backing the KnitGraphIndex, instead it prepares data for insertion by
        the caller and checks that it is safe to insert then calls
        self._add_callback with the prepared GraphIndex nodes.

        :param versions: a list of tuples:
                         (version_id, options, pos, size, parents).
        """
        if not self._add_callback:
            raise errors.ReadOnlyError(self)
        # we hope there are no repositories with inconsistent parentage
        # anywhere. Otherwise we would have to correct everything we add here.
        keys = {}
        for (version_id, options, access_memo, parents) in versions:
            index, pos, size = access_memo
            key = (version_id, )
            parents = tuple((parent, ) for parent in parents)
            if 'no-eol' in options:
                value = 'N'
            else:
                value = ' '
            value += "%d %d" % (pos, size)
            if not self._deltas:
                if 'line-delta' in options:
                    raise KnitCorrupt(self, "attempt to add line-delta in non-delta knit")
            if self._parents:
                if self._deltas:
                    if 'line-delta' in options:
                        node_refs = (parents, (parents[0],))
                    else:
                        node_refs = (parents, ())
                else:
                    node_refs = (parents, )
            else:
                if parents:
                    raise KnitCorrupt(self, "attempt to add node with parents "
                        "in parentless index.")
                node_refs = ()
            keys[key] = (value, node_refs)
        present_nodes = self._get_entries(keys)
        for (index, key, value, node_refs) in present_nodes:
            if (value, node_refs) != keys[key]:
                raise KnitCorrupt(self, "inconsistent details in add_versions"
                    ": %s %s" % ((value, node_refs), keys[key]))
            del keys[key]
        result = []
        if self._parents:
            for key, (value, node_refs) in keys.iteritems():
                result.append((key, value, node_refs))
        else:
            for key, (value, node_refs) in keys.iteritems():
                result.append((key, value))
        self._add_callback(result)
    def _version_ids_to_keys(self, version_ids):
        return set((version_id, ) for version_id in version_ids)
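    # Illustrative sketch (assumption, not original code) of the nodes that
    # add_versions hands to self._add_callback for a delta-compressed knit
    # with parents enabled:
    #
    #   [(('rev-2',), ' 1200 340', ((('rev-1',),), (('rev-1',),))),
    #    (('rev-3',), 'N1540 100', ((('rev-2',),), ()))]
    #
    # i.e. the key, the "<flag><pos> <size>" value, then the (parents,
    # compression parent) reference lists; the second list is empty for
    # fulltext records.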

class _KnitAccess(object):
    """Access to knit records in a .knit file."""

    def __init__(self, transport, filename, _file_mode, _dir_mode,
        _need_to_create, _create_parent_dir):
        """Create a _KnitAccess for accessing and inserting data.

        :param transport: The transport the .knit is located on.
        :param filename: The filename of the .knit.
        """
        self._transport = transport
        self._filename = filename
        self._file_mode = _file_mode
        self._dir_mode = _dir_mode
        self._need_to_create = _need_to_create
        self._create_parent_dir = _create_parent_dir
    def add_raw_records(self, sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to wherever the access method is storing data.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is a
            tuple - (index, pos, length), where the index field is always None
            for the .knit access method.
        """
        assert type(raw_data) == str, \
            'data must be plain bytes was %s' % type(raw_data)
        if not self._need_to_create:
            base = self._transport.append_bytes(self._filename, raw_data)
        else:
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
                                create_parent_dir=self._create_parent_dir,
                                mode=self._file_mode,
                                dir_mode=self._dir_mode)
            self._need_to_create = False
            base = 0
        result = []
        for size in sizes:
            result.append((None, base, size))
            base += size
        return result
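    # Illustrative sketch (assumption, not original code): appending two
    # records of 100 and 250 bytes to a knit that already holds 4000 bytes
    # would return [(None, 4000, 100), (None, 4100, 250)]; the None index
    # field is what distinguishes .knit memos from pack memos.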
    def create(self):
        """IFF this data access has its own storage area, initialise it.

        :return: None.
        """
        self._transport.put_bytes_non_atomic(self._filename, '',
                                             mode=self._file_mode)
    def open_file(self):
        """IFF this data access can be represented as a single file, open it.

        For knits that are not mapped to a single file on disk this will
        always return None.

        :return: None or a file handle.
        """
        try:
            return self._transport.get(self._filename)
        except NoSuchFile:
            pass
        return None
    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for the records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The .knit method ignores
            the index as there is always only a single file.
        :return: An iterator over the bytes of the records.
        """
        read_vector = [(pos, size) for (index, pos, size) in memos_for_retrieval]
        for pos, data in self._transport.readv(self._filename, read_vector):
            yield data

class _PackAccess(object):
    """Access to knit records via a collection of packs."""

    def __init__(self, index_to_packs, writer=None):
        """Create a _PackAccess object.

        :param index_to_packs: A dict mapping index objects to the transport
            and file names for obtaining data.
        :param writer: A tuple (pack.ContainerWriter, write_index) which
            contains the pack to write, and the index that reads from
            it will be associated with.
        """
        if writer:
            self.container_writer = writer[0]
            self.write_index = writer[1]
        else:
            self.container_writer = None
            self.write_index = None
        self.indices = index_to_packs
    def add_raw_records(self, sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to the container writer in one bytes record per
        raw data segment.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is a
            tuple - (index, pos, length), where the index field is the
            write_index object supplied to the PackAccess object.
        """
        assert type(raw_data) == str, \
            'data must be plain bytes was %s' % type(raw_data)
        result = []
        offset = 0
        for size in sizes:
            p_offset, p_length = self.container_writer.add_bytes_record(
                raw_data[offset:offset+size], [])
            offset += size
            result.append((self.write_index, p_offset, p_length))
        return result
    def create(self):
        """Pack based knits do not get individually created."""
    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for the records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The Pack access method
            looks up the pack to use for a given record in its index_to_pack
            map.
        :return: An iterator over the bytes of the records.
        """
        # first pass, group into same-index requests
        request_lists = []
        current_index = None
        for (index, offset, length) in memos_for_retrieval:
            if current_index == index:
                current_list.append((offset, length))
            else:
                if current_index is not None:
                    request_lists.append((current_index, current_list))
                current_index = index
                current_list = [(offset, length)]
        # handle the last entry
        if current_index is not None:
            request_lists.append((current_index, current_list))
        for index, offsets in request_lists:
            transport, path = self.indices[index]
            reader = pack.make_readv_reader(transport, path, offsets)
            for names, read_func in reader.iter_records():
                yield read_func(None)
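    # Illustrative sketch (assumption, not original code): memos such as
    #   [(idx_a, 0, 10), (idx_a, 10, 20), (idx_b, 0, 5)]
    # are grouped into [(idx_a, [(0, 10), (10, 20)]), (idx_b, [(0, 5)])]
    # so each pack file is read with a single readv-style request. The
    # grouping only merges adjacent memos with the same index, so callers
    # get better batching if they pre-sort their requests by index.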
    def open_file(self):
        """Pack based knits have no single file."""
        return None

    def set_writer(self, writer, index, (transport, packname)):
        """Set a writer to use for adding data."""
        self.indices[index] = (transport, packname)
        self.container_writer = writer
        self.write_index = index

class _KnitData(object):
    """Manage extraction of data from a KnitAccess, caching and decompressing.

    The KnitData class provides the logic for parsing and using knit records,
    making use of an access method for the low level read and write operations.
    """

    def __init__(self, access):
        """Create a KnitData object.

        :param access: The access method to use. Access methods such as
            _KnitAccess manage the insertion of raw records and the subsequent
            retrieval of the same.
        """
        self._access = access
        self._checked = False
        # TODO: jam 20060713 conceptually, this could spill to disk
        #       if the cached size gets larger than a certain amount
        #       but it complicates the model a bit, so for now just use
        #       a simple dictionary
        self._cache = {}
        self._do_cache = False

    def enable_cache(self):
        """Enable caching of reads."""
        self._do_cache = True

    def clear_cache(self):
        """Clear the record cache."""
        self._do_cache = False
        self._cache = {}

    def _open_file(self):
        return self._access.open_file()
    def _record_to_data(self, version_id, digest, lines):
        """Convert version_id, digest, lines into a raw data block.
    def read_records_iter_raw(self, records):
        """Read text records from data file and yield raw data.

        This unpacks enough of the text record to validate the id is
        as expected but thats all.
        """
        # setup an iterator of the external records:
        # uses readv so nice and fast we hope.
        if len(records):
            # grab the disk data needed.
            if self._cache:
                # Don't check _cache if it is empty
                needed_offsets = [index_memo for version_id, index_memo
                                              in records
                                              if version_id not in self._cache]
            else:
                needed_offsets = [index_memo for version_id, index_memo
                                               in records]

            raw_records = self._access.get_raw_records(needed_offsets)

        for version_id, index_memo in records:
            if version_id in self._cache:
                # This data has already been validated
                data = self._cache[version_id]
            else:
                data = raw_records.next()
                if self._do_cache:
                    self._cache[version_id] = data

            # validate the header
            df, rec = self._parse_record_header(version_id, data)
            df.close()
            yield version_id, data
    def read_records_iter(self, records):
        """Read text records from data file and yield result.

        The result will be returned in whatever is the fastest to read.
        Not by the order requested. Also, multiple requests for the same
        record will only yield 1 response.
        :param records: A list of (version_id, pos, len) entries
        :return: Yields (version_id, contents, digest) in the order
                 read, not the order requested
        """
        if not records:
            return

        if self._cache:
            # Skip records we have already seen
            yielded_records = set()
            needed_records = set()
            for record in records:
                if record[0] in self._cache:
                    if record[0] in yielded_records:
                        continue
                    yielded_records.add(record[0])
                    data = self._cache[record[0]]
                    content, digest = self._parse_record(record[0], data)
                    yield (record[0], content, digest)
                else:
                    needed_records.add(record)
            needed_records = sorted(needed_records, key=operator.itemgetter(1))
        else:
            needed_records = sorted(set(records), key=operator.itemgetter(1))

        if not needed_records:
            return

        # The transport optimizes the fetching as well
        # (ie, reads continuous ranges.)
        raw_data = self._access.get_raw_records(
            [index_memo for version_id, index_memo in needed_records])

        for (version_id, index_memo), data in \
                izip(iter(needed_records), raw_data):
            content, digest = self._parse_record(version_id, data)
            if self._do_cache:
                self._cache[version_id] = data
            yield version_id, content, digest
    def read_records(self, records):
        """Read records into a dictionary."""
        components = {}
        for record_id, content, digest in \
                self.read_records_iter(records):
            components[record_id] = (content, digest)
        return components
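    # Illustrative sketch (assumption, not original code) of how callers use
    # the two read APIs above ('data' is a _KnitData, 'wanted' a list of
    # (version_id, index_memo) pairs):
    #
    #   for version_id, content, digest in data.read_records_iter(wanted):
    #       ...   # arrives in read order, duplicate requests collapsed
    #
    #   components = data.read_records(wanted)
    #   content, digest = components[version_id]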