  203    209          see parse_fulltext which this inverts.
  205           -         return ['%s %s' % (o.encode('utf-8'), t) for o, t in content._lines]
         211    +         encode_utf8 = cache_utf8.encode
         212    +         return ['%s %s' % (encode_utf8(o), t) for o, t in content._lines]
  207    214      def lower_line_delta(self, delta):
  208    215          """convert a delta into a serializable form.
  210    217          See parse_line_delta which this inverts.
         219    +         encode_utf8 = cache_utf8.encode
  213    221          for start, end, c, lines in delta:
  214    222              out.append('%d,%d,%d\n' % (start, end, c))
  215           -             for origin, text in lines:
  216           -                 out.append('%s %s' % (origin.encode('utf-8'), text))
         223    +             out.extend(encode_utf8(origin) + ' ' + text
         224    +                        for origin, text in lines)
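
The serialised line-delta written here is simple: each hunk is a `start,end,count` header line followed by `count` annotated lines of the form `origin text`. A standalone sketch of that encoding in plain Python (illustrative only, not bzrlib code; the sample delta is made up):

    def encode_line_delta(delta):
        # Each hunk: a "start,end,count\n" header, then `count` new lines,
        # each prefixed with the revision (origin) that introduced it.
        out = []
        for start, end, count, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, count))
            out.extend('%s %s' % (origin, text) for origin, text in lines)
        return out

    # Replace lines 2-3 of the basis with two lines attributed to 'rev-2'.
    delta = [(2, 3, 2, [('rev-2', 'first new line\n'),
                        ('rev-2', 'second new line\n')])]
    print(''.join(encode_line_delta(delta)))
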
  272    280      stored and retrieved.
  275           -     def __init__(self, relpath, transport, file_mode=None, access_mode=None,
         283    +     def __init__(self, relpath, transport, file_mode=None, access_mode=None,
  276    284                       factory=None, basis_knit=DEPRECATED_PARAMETER, delta=True,
         285    +                  create=False, create_parent_dir=False, delay_create=False,
  278    287          """Construct a knit at location specified by relpath.
  280    289          :param create: If not True, only open an existing knit.
         290    +         :param create_parent_dir: If True, create the parent directory if
         291    +             creating the file fails. (This is used for stores with
         292    +             hash-prefixes that may not exist yet)
         293    +         :param delay_create: The calling code is aware that the knit won't
         294    +             actually be created until the first data is stored.
  282    296          if deprecated_passed(basis_knit):
  283    297              warnings.warn("KnitVersionedFile.__(): The basis_knit parameter is"
  294    308          self.delta = delta
  296    310          self._index = _KnitIndex(transport, relpath + INDEX_SUFFIX,
  297           -             access_mode, create=create, file_mode=file_mode)
         311    +             access_mode, create=create, file_mode=file_mode,
         312    +             create_parent_dir=create_parent_dir, delay_create=delay_create,
  298    314          self._data = _KnitData(transport, relpath + DATA_SUFFIX,
  299           -             access_mode, create=create and not len(self), file_mode=file_mode)
         315    +             access_mode, create=create and not len(self), file_mode=file_mode,
         316    +             create_parent_dir=create_parent_dir, delay_create=delay_create,
  301    319      def __repr__(self):
  302    320          return '%s(%s)' % (self.__class__.__name__,
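
A rough usage sketch of what the two new constructor parameters allow (assumptions: a writable local bzrlib transport, the KnitVersionedFile signature shown above, and a made-up path):

    from bzrlib.transport import get_transport
    from bzrlib.knit import KnitVersionedFile

    t = get_transport('.')
    # delay_create=True: nothing touches disk yet; the .kndx/.knit files are
    # only written when the first record is stored.  create_parent_dir=True
    # lets that first write create a missing hash-prefix directory ('ab/').
    knit = KnitVersionedFile('ab/example', t, access_mode='w', create=True,
                             create_parent_dir=True, delay_create=True)
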
  388    414          """See VersionedFile.copy_to()."""
  389    415          # copy the current index to a temp index to avoid racing with local
  391           -         transport.put(name + INDEX_SUFFIX + '.tmp', self.transport.get(self._index._filename),)
         417    +         transport.put_file_non_atomic(name + INDEX_SUFFIX + '.tmp',
         418    +                 self.transport.get(self._index._filename))
  392    419          # copy the data file
  393    420          f = self._data._open_file()
  395           -             transport.put(name + DATA_SUFFIX, f)
         422    +             transport.put_file(name + DATA_SUFFIX, f)
  398    425          # move the copied index into place
  399    426          transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)
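
The sequence above (non-atomic write of the index under a temporary name, stream the data file across, then move the index into place) reads as a small recipe. A hedged sketch using only the Transport calls visible in this hunk, with the '.kndx'/'.knit' suffixes written out literally, and `source` an open KnitVersionedFile and `t` a destination bzrlib Transport (both assumed):

    def copy_knit(source, t, name):
        # Index goes to a temporary name first; nothing reads the '.tmp'
        # file yet, so a non-atomic write is fine here.
        t.put_file_non_atomic(name + '.kndx.tmp',
                              source.transport.get(source._index._filename))
        # Stream the data file across.
        f = source._data._open_file()
        try:
            t.put_file(name + '.knit', f)
        finally:
            f.close()
        # Only now expose the index, atomically, under its real name.
        t.move(name + '.kndx.tmp', name + '.kndx')
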
  401    428      def create_empty(self, name, transport, mode=None):
  402           -         return KnitVersionedFile(name, transport, factory=self.factory, delta=self.delta, create=True)
         429    +         return KnitVersionedFile(name, transport, factory=self.factory,
         430    +                                  delta=self.delta, create=True)
  404    432      def _fix_parents(self, version, new_parents):
  405    433          """Fix the parents list for version.
  792    823              data_pos, length = self._index.get_position(version_id)
  793    824              version_id_records.append((version_id, data_pos, length))
  795           -         pb = bzrlib.ui.ui_factory.nested_progress_bar()
  797    827          total = len(version_id_records)
         828    +         pb.update('Walking content.', count, total)
         829    +         for version_id, data, sha_value in \
         830    +                 self._data.read_records_iter(version_id_records):
  799    831              pb.update('Walking content.', count, total)
  800           -             for version_id, data, sha_value in \
  801           -                     self._data.read_records_iter(version_id_records):
  802           -                 pb.update('Walking content.', count, total)
  803           -                 method = self._index.get_method(version_id)
  804           -                 version_idx = self._index.lookup(version_id)
  805           -                 assert method in ('fulltext', 'line-delta')
  806           -                 if method == 'fulltext':
  807           -                     content = self.factory.parse_fulltext(data, version_idx)
  808           -                     for line in content.text():
         832    +             method = self._index.get_method(version_id)
         833    +             version_idx = self._index.lookup(version_id)
         834    +             assert method in ('fulltext', 'line-delta')
         835    +             if method == 'fulltext':
         836    +                 content = self.factory.parse_fulltext(data, version_idx)
         837    +                 for line in content.text():
  811           -                     delta = self.factory.parse_line_delta(data, version_idx)
  812           -                     for start, end, count, lines in delta:
  813           -                         for origin, line in lines:
         840    +                 delta = self.factory.parse_line_delta(data, version_idx)
         841    +                 for start, end, count, lines in delta:
         842    +                     for origin, line in lines:
  816           -             pb.update('Walking content.', total, total)
  819           -             pb.update('Walking content.', total, total)
         845    +         pb.update('Walking content.', total, total)
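
The removed lines show the older shape of this loop, which owned its own progress bar. The usual bzrlib idiom for that, sketched with a hypothetical `process()` step and the `version_id_records` list built above, pairs nested_progress_bar() with try/finally:

    import bzrlib.ui

    pb = bzrlib.ui.ui_factory.nested_progress_bar()
    try:
        total = len(version_id_records)
        for count, record in enumerate(version_id_records):
            pb.update('Walking content.', count, total)
            process(record)          # hypothetical per-record work
        pb.update('Walking content.', total, total)
    finally:
        pb.finished()
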
  823    847      def num_versions(self):
  824    848          """See VersionedFile.num_versions()."""
  931    955  class _KnitComponentFile(object):
  932    956      """One of the files used to implement a knit database"""
  934           -     def __init__(self, transport, filename, mode, file_mode=None):
         958    +     def __init__(self, transport, filename, mode, file_mode=None,
         959    +                  create_parent_dir=False, dir_mode=None):
  935    960          self._transport = transport
  936    961          self._filename = filename
  937    962          self._mode = mode
  938           -         self._file_mode=file_mode
  940           -     def write_header(self):
  941           -         if self._transport.append(self._filename, StringIO(self.HEADER),
  942           -                 mode=self._file_mode):
  943           -             raise KnitCorrupt(self._filename, 'misaligned after writing header')
         963    +         self._file_mode = file_mode
         964    +         self._dir_mode = dir_mode
         965    +         self._create_parent_dir = create_parent_dir
         966    +         self._need_to_create = False
  945    968      def check_header(self, fp):
  946    969          line = fp.readline()
 1036           -     def __init__(self, transport, filename, mode, create=False, file_mode=None):
 1037           -         _KnitComponentFile.__init__(self, transport, filename, mode, file_mode)
         1059   +     def __init__(self, transport, filename, mode, create=False, file_mode=None,
         1060   +                  create_parent_dir=False, delay_create=False, dir_mode=None):
         1061   +         _KnitComponentFile.__init__(self, transport, filename, mode,
         1062   +                                     file_mode=file_mode,
         1063   +                                     create_parent_dir=create_parent_dir,
 1038    1065         self._cache = {}
 1039    1066         # position in _history is the 'official' index for a revision
 1040    1067         # but the values may have come from a newer entry.
 1279    1325 class _KnitData(_KnitComponentFile):
 1280    1326     """Contents of the knit data file"""
 1282             HEADER = "# bzr knit data 8\n"
 1284           -     def __init__(self, transport, filename, mode, create=False, file_mode=None):
 1285           -         _KnitComponentFile.__init__(self, transport, filename, mode)
         1328   +     def __init__(self, transport, filename, mode, create=False, file_mode=None,
         1329   +                  create_parent_dir=False, delay_create=False,
         1331   +         _KnitComponentFile.__init__(self, transport, filename, mode,
         1332   +                                     file_mode=file_mode,
         1333   +                                     create_parent_dir=create_parent_dir,
 1286    1335         self._checked = False
         1336   +         # TODO: jam 20060713 conceptually, this could spill to disk
         1337   +         #       if the cached size gets larger than a certain amount
         1338   +         #       but it complicates the model a bit, so for now just use
         1339   +         #       a simple dictionary
         1341   +         self._do_cache = False
 1288           -             self._transport.put(self._filename, StringIO(''), mode=file_mode)
         1344   +                 self._need_to_create = create
         1346   +                 self._transport.put_bytes_non_atomic(self._filename, '',
         1347   +                                                      mode=self._file_mode)
         1349   +     def enable_cache(self):
         1350   +         """Enable caching of reads."""
         1351   +         self._do_cache = True
 1290    1353     def clear_cache(self):
 1291    1354         """Clear the record cache."""
         1355   +         self._do_cache = False
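
A hedged usage sketch of the new read cache (assumes a _KnitData instance reached through a knit's `_data` attribute, and a `records` list of `(version_id, pos, size)` tuples taken from the index):

    data = knit._data
    data.enable_cache()      # raw records read from now on are kept in memory
    try:
        for version_id, content, digest in data.read_records_iter(records):
            pass             # a repeated request for the same version is now
                             # served from the cache instead of another readv()
    finally:
        data.clear_cache()   # forget the cached records and stop caching
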
 1294    1358     def _open_file(self):
 1323    1389         :return: the offset in the data file raw_data was written.
 1325    1391         assert isinstance(raw_data, str), 'data must be plain bytes'
 1326           -         return self._transport.append(self._filename, StringIO(raw_data))
         1392   +         if not self._need_to_create:
         1393   +             return self._transport.append_bytes(self._filename, raw_data)
         1395   +             self._transport.put_bytes_non_atomic(self._filename, raw_data,
         1396   +                                create_parent_dir=self._create_parent_dir,
         1397   +                                mode=self._file_mode,
         1398   +                                dir_mode=self._dir_mode)
         1399   +             self._need_to_create = False
 1328    1402     def add_record(self, version_id, digest, lines):
 1329    1403         """Write new text record to disk. Returns the position in the
 1330    1404         file where it was written."""
 1331    1405         size, sio = self._record_to_data(version_id, digest, lines)
 1332    1406         # write to disk
 1333           -         start_pos = self._transport.append(self._filename, sio)
         1407   +         if not self._need_to_create:
         1408   +             start_pos = self._transport.append_file(self._filename, sio)
         1410   +             self._transport.put_file_non_atomic(self._filename, sio,
         1411   +                                create_parent_dir=self._create_parent_dir,
         1412   +                                mode=self._file_mode,
         1413   +                                dir_mode=self._dir_mode)
         1414   +             self._need_to_create = False
         1417   +             self._cache[version_id] = sio.getvalue()
 1334    1418         return start_pos, size
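
Both writers follow the same lazy-create shape introduced by delay_create: append when the file already exists, otherwise perform the first write non-atomically, creating the parent directory if asked, and only then clear the flag. A standalone sketch of the pattern against the local filesystem (illustrative, not bzrlib code):

    import os

    class LazyAppender(object):
        """Create the target file only when the first record is written."""

        def __init__(self, path, create_parent_dir=False):
            self._path = path
            self._create_parent_dir = create_parent_dir
            self._need_to_create = True

        def add_bytes(self, data):
            if not self._need_to_create:
                with open(self._path, 'ab') as f:
                    f.write(data)
                return
            if self._create_parent_dir:
                parent = os.path.dirname(self._path)
                if parent and not os.path.isdir(parent):
                    os.makedirs(parent)
            # First write: creates the file (and parent dir) on demand.
            with open(self._path, 'wb') as f:
                f.write(data)
            self._need_to_create = False
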
 1336    1420     def _parse_record_header(self, version_id, raw_data):
 1370    1454         This unpacks enough of the text record to validate the id is
 1371    1455         as expected but thats all.
 1373           -         It will actively recompress currently cached records on the
 1374           -         basis that that is cheaper than I/O activity.
 1376    1457         # setup an iterator of the external records:
 1377    1458         # uses readv so nice and fast we hope.
 1378    1459         if len(records):
 1379    1460             # grab the disk data needed.
 1380           -             raw_records = self._transport.readv(self._filename,
 1381           -                 [(pos, size) for version_id, pos, size in records])
         1462   +                 # Don't check _cache if it is empty
         1463   +                 needed_offsets = [(pos, size) for version_id, pos, size
         1465   +                                   if version_id not in self._cache]
         1467   +                 needed_offsets = [(pos, size) for version_id, pos, size
         1470   +             raw_records = self._transport.readv(self._filename, needed_offsets)
 1383    1473         for version_id, pos, size in records:
 1384           -             pos, data = raw_records.next()
 1385           -             # validate the header
 1386           -             df, rec = self._parse_record_header(version_id, data)
         1474   +             if version_id in self._cache:
         1475   +                 # This data has already been validated
         1476   +                 data = self._cache[version_id]
         1478   +                 pos, data = raw_records.next()
         1480   +                     self._cache[version_id] = data
         1482   +             # validate the header
         1483   +             df, rec = self._parse_record_header(version_id, data)
 1388    1485             yield version_id, data
 1390    1487     def read_records_iter(self, records):
 1391    1488         """Read text records from data file and yield result.
 1393           -         Each passed record is a tuple of (version_id, pos, len) and
 1394           -         will be read in the given order. Yields (version_id,
         1490   +         The result will be returned in whatever is the fastest to read.
         1491   +         Not by the order requested. Also, multiple requests for the same
         1492   +         record will only yield 1 response.
         1493   +         :param records: A list of (version_id, pos, len) entries
         1494   +         :return: Yields (version_id, contents, digest) in the order
         1495   +                  read, not the order requested
 1397           -         if len(records) == 0:
 1400           -         # 60890 calls for 4168 extractions in 5045, 683 internal.
 1401           -         # 4168 calls to readv in 1411
 1402           -         # 4168 calls to parse_record in 2880
 1404           -         # Get unique records, sorted by position
 1405           -         needed_records = sorted(set(records), key=operator.itemgetter(1))
 1407           -         # We take it that the transport optimizes the fetching as good
 1408           -         # as possible (ie, reads continuous ranges.)
 1409           -         response = self._transport.readv(self._filename,
         1501   +             # Skip records we have alread seen
         1502   +             yielded_records = set()
         1503   +             needed_records = set()
         1504   +             for record in records:
         1505   +                 if record[0] in self._cache:
         1506   +                     if record[0] in yielded_records:
         1508   +                     yielded_records.add(record[0])
         1509   +                     data = self._cache[record[0]]
         1510   +                     content, digest = self._parse_record(record[0], data)
         1511   +                     yield (record[0], content, digest)
         1513   +                     needed_records.add(record)
         1514   +             needed_records = sorted(needed_records, key=operator.itemgetter(1))
         1516   +             needed_records = sorted(set(records), key=operator.itemgetter(1))
         1518   +         if not needed_records:
         1521   +             # The transport optimizes the fetching as well
         1522   +             # (ie, reads continuous ranges.)
         1523   +             readv_response = self._transport.readv(self._filename,
 1410    1524             [(pos, size) for version_id, pos, size in needed_records])
 1413           -         for (record_id, pos, size), (pos, data) in \
 1414           -                 izip(iter(needed_records), response):
 1415           -             content, digest = self._parse_record(record_id, data)
 1416           -             record_map[record_id] = (digest, content)
 1418           -         for version_id, pos, size in records:
 1419           -             digest, content = record_map[version_id]
         1526   +             for (version_id, pos, size), (pos, data) in \
         1527   +                     izip(iter(needed_records), readv_response):
         1528   +                 content, digest = self._parse_record(version_id, data)
         1530   +                     self._cache[version_id] = data
 1420    1531             yield version_id, content, digest
 1422    1533     def read_records(self, records):
 1423    1534         """Read records into a dictionary."""
 1424    1535         components = {}
 1425           -         for record_id, content, digest in self.read_records_iter(records):
         1536   +         for record_id, content, digest in \
         1537   +                 self.read_records_iter(records):
 1426    1538             components[record_id] = (content, digest)
 1427    1539         return components
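
A hedged usage sketch of the dictionary form, assuming the same _KnitData instance `data` as in the earlier sketch (the version ids, positions and sizes are placeholders; in real use they come from the knit index):

    records = [('rev-2', pos_2, size_2),   # placeholders for index values
               ('rev-1', pos_1, size_1)]
    components = data.read_records(records)
    # components maps version_id -> (content, digest); request order does not
    # matter, and with caching enabled repeated version ids are served once.
    content, digest = components['rev-1']
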