~bzr-pqm/bzr/bzr.dev

Viewing changes to bzrlib/knit.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-02-25 22:00:24 UTC
  • mfrom: (4039.3.8 sort_knit_fetch)
  • Revision ID: pqm@pqm.ubuntu.com-20090225220024-b81h6glz8zi2ekfh
(jam) Batch up requests for fulltexts into 5MB (compressed) requests,
rather than 1 per file.

@@ -123,6 +123,7 @@
 
 DATA_SUFFIX = '.knit'
 INDEX_SUFFIX = '.kndx'
+_STREAM_MIN_BUFFER_SIZE = 5*1024*1024
 
 
 class KnitAdapter(object):
@@ -806,6 +807,35 @@
     versioned_files.writer.end()
 
 
+def _get_total_build_size(self, keys, positions):
+    """Determine the total bytes to build these keys.
+
+    (helper function because _KnitGraphIndex and _KndxIndex work the same, but
+    don't inherit from a common base.)
+
+    :param keys: Keys that we want to build
+    :param positions: dict of {key, (info, index_memo, comp_parent)} (such
+        as returned by _get_components_positions)
+    :return: Number of bytes to build those keys
+    """
+    all_build_index_memos = {}
+    build_keys = keys
+    while build_keys:
+        next_keys = set()
+        for key in build_keys:
+            # This is mostly for the 'stacked' case
+            # Where we will be getting the data from a fallback
+            if key not in positions:
+                continue
+            _, index_memo, compression_parent = positions[key]
+            all_build_index_memos[key] = index_memo
+            if compression_parent not in all_build_index_memos:
+                next_keys.add(compression_parent)
+        build_keys = next_keys
+    return sum([index_memo[2] for index_memo
+                in all_build_index_memos.itervalues()])
+
+
 class KnitVersionedFiles(VersionedFiles):
     """Storage for many versioned files using knit compression.
 
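
The new helper walks a delta chain: each key contributes its index_memo, and any compression parent not yet counted is queued for the next pass, so shared ancestors are only counted once. A toy run of the same logic (restated standalone here for illustration; real positions entries come from _get_components_positions, and index_memo[2] is taken to be the stored compressed length):

    def total_build_size(keys, positions):
        # Standalone restatement of _get_total_build_size, for illustration.
        all_memos = {}
        pending = set(keys)
        while pending:
            next_keys = set()
            for key in pending:
                if key not in positions:  # stacked case: served by a fallback
                    continue
                _, index_memo, parent = positions[key]
                all_memos[key] = index_memo
                if parent not in all_memos:
                    next_keys.add(parent)
            pending = next_keys
        return sum(memo[2] for memo in all_memos.values())

    # Hypothetical entries: key -> (info, index_memo, compression_parent)
    positions = {
        ('file-a', 'rev-2'): (None, ('pack-1', 400, 100), ('file-a', 'rev-1')),
        ('file-a', 'rev-1'): (None, ('pack-1', 0, 400), None),
    }
    # rev-2 needs its 100-byte delta plus the 400-byte fulltext it builds on:
    print(total_build_size([('file-a', 'rev-2')], positions))  # -> 500
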
@@ -1181,6 +1211,9 @@
                 # n = next
                 records = [(key, i_m) for key, (r, i_m, n)
                                        in position_map.iteritems()]
+                # Sort by the index memo, so that we request records from the
+                # same pack file together, and in forward-sorted order
+                records.sort(key=operator.itemgetter(1))
                 raw_record_map = {}
                 for key, data in self._read_records_iter_unchecked(records):
                     (record_details, index_memo, next) = position_map[key]
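
Because index memos compare as plain tuples whose leading elements identify the storage location, sorting on operator.itemgetter(1) clusters reads for the same pack file and puts them in forward offset order. A small illustration (the (pack, offset, length) memo shape here is an assumption for demonstration; the sort only needs memos to be comparable):

    import operator

    records = [
        (('f1', 'r1'), ('pack-2', 4096, 100)),
        (('f2', 'r1'), ('pack-1', 8192, 200)),
        (('f3', 'r1'), ('pack-1', 0, 150)),
    ]
    records.sort(key=operator.itemgetter(1))
    # pack-1 reads now come first, in ascending offset order, then pack-2:
    # [(('f3', 'r1'), ('pack-1', 0, 150)),
    #  (('f2', 'r1'), ('pack-1', 8192, 200)),
    #  (('f1', 'r1'), ('pack-2', 4096, 100))]
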
@@ -1189,7 +1222,8 @@
             except errors.RetryWithNewPacks, e:
                 self._access.reload_or_raise(e)
 
-    def _split_by_prefix(self, keys):
+    @classmethod
+    def _split_by_prefix(cls, keys):
         """For the given keys, split them up based on their prefix.
 
         To keep memory pressure somewhat under control, split the
@@ -1198,13 +1232,27 @@
         This should be revisited if _get_content_maps() can ever cross
         file-id boundaries.
 
+        The keys for a given file_id are kept in the same relative order.
+        Ordering between file_ids is not, though prefix_order will return the
+        order that the key was first seen.
+
         :param keys: An iterable of key tuples
-        :return: A dict of {prefix: [key_list]}
+        :return: (split_map, prefix_order)
+            split_map       A dictionary mapping prefix => keys
+            prefix_order    The order that we saw the various prefixes
         """
         split_by_prefix = {}
+        prefix_order = []
         for key in keys:
             if len(key) == 1:
-                split_by_prefix.setdefault('', []).append(key)
-            else:
-                split_by_prefix.setdefault(key[0], []).append(key)
-        return split_by_prefix
+                prefix = ''
+            else:
+                prefix = key[0]
+
+            if prefix in split_by_prefix:
+                split_by_prefix[prefix].append(key)
+            else:
+                split_by_prefix[prefix] = [key]
+                prefix_order.append(prefix)
+        return split_by_prefix, prefix_order
+
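
The rewrite also returns prefix_order, so callers can iterate file_ids deterministically in first-seen order rather than in dict order. A standalone restatement showing the new return shape:

    def split_by_prefix(keys):
        # Same logic as the classmethod above, restated for illustration.
        split_map, prefix_order = {}, []
        for key in keys:
            prefix = '' if len(key) == 1 else key[0]
            if prefix in split_map:
                split_map[prefix].append(key)
            else:
                split_map[prefix] = [key]
                prefix_order.append(prefix)
        return split_map, prefix_order

    keys = [('file-a', 'rev-1'), ('file-b', 'rev-1'), ('file-a', 'rev-2')]
    split_map, prefix_order = split_by_prefix(keys)
    print(prefix_order)         # ['file-a', 'file-b']  (first-seen order)
    print(split_map['file-a'])  # [('file-a', 'rev-1'), ('file-a', 'rev-2')]
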
@@ -1211,3 +1259,48 @@
+    def _group_keys_for_io(self, keys, non_local_keys, positions,
+                           _min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
+        """For the given keys, group them into 'best-sized' requests.
+
+        The idea is to avoid making 1 request per file, but to never try to
+        unpack an entire 1.5GB source tree in a single pass. Also when
+        possible, we should try to group requests to the same pack file
+        together.
+
+        :return: list of (keys, non_local) tuples that indicate what keys
+            should be fetched next.
+        """
+        # TODO: Ideally we would group on 2 factors. We want to extract texts
+        #       from the same pack file together, and we want to extract all
+        #       the texts for a given build-chain together. Ultimately it
+        #       probably needs a better global view.
+        total_keys = len(keys)
+        prefix_split_keys, prefix_order = self._split_by_prefix(keys)
+        prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
+        cur_keys = []
+        cur_non_local = set()
+        cur_size = 0
+        result = []
+        sizes = []
+        for prefix in prefix_order:
+            keys = prefix_split_keys[prefix]
+            non_local = prefix_split_non_local_keys.get(prefix, [])
+
+            this_size = self._index._get_total_build_size(keys, positions)
+            cur_size += this_size
+            cur_keys.extend(keys)
+            cur_non_local.update(non_local)
+            if cur_size > _min_buffer_size:
+                result.append((cur_keys, cur_non_local))
+                sizes.append(cur_size)
+                cur_keys = []
+                cur_non_local = set()
+                cur_size = 0
+        if cur_keys:
+            result.append((cur_keys, cur_non_local))
+            sizes.append(cur_size)
+        trace.mutter('Collapsed %d keys into %d requests w/ %d file_ids'
+                     ' w/ sizes: %s', total_keys, len(result),
+                     len(prefix_split_keys), sizes)
+        return result
 
     def get_record_stream(self, keys, ordering, include_delta_closure):
         """Get a stream of records for keys.
@@ -1334,13 +1427,11 @@
             # XXX: get_content_maps performs its own index queries; allow state
             # to be passed in.
             non_local_keys = needed_from_fallback - absent_keys
-            prefix_split_keys = self._split_by_prefix(present_keys)
-            prefix_split_non_local_keys = self._split_by_prefix(non_local_keys)
-            for prefix, keys in prefix_split_keys.iteritems():
-                non_local = prefix_split_non_local_keys.get(prefix, [])
-                non_local = set(non_local)
-                generator = _VFContentMapGenerator(self, keys, non_local,
-                    global_map)
+            for keys, non_local_keys in self._group_keys_for_io(present_keys,
+                                                                non_local_keys,
+                                                                positions):
+                generator = _VFContentMapGenerator(self, keys, non_local_keys,
+                                                   global_map)
                 for record in generator.get_record_stream():
                     yield record
         else:
@@ -2569,6 +2660,8 @@
             return index_memo[0][:-1], index_memo[1]
         return keys.sort(key=get_sort_key)
 
+    _get_total_build_size = _get_total_build_size
+
     def _split_key(self, key):
         """Split key into a prefix and suffix."""
         return key[:-1], key[-1]
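
The bare assignment `_get_total_build_size = _get_total_build_size` at class scope adopts the module-level helper (added near line 810 above) as a method; the same line appears again in the next hunk for the second index class since, as the helper's docstring notes, _KnitGraphIndex and _KndxIndex share the behaviour without a common base. A minimal sketch of the pattern, with hypothetical names:

    def _shared_helper(self, x):
        # Module-level function deliberately written with a 'self' parameter.
        return self.base + x

    class IndexA(object):
        base = 10
        _shared_helper = _shared_helper  # adopt the function as a method

    class IndexB(object):  # unrelated class, same behaviour
        base = 20
        _shared_helper = _shared_helper

    print(IndexA()._shared_helper(1))  # 11
    print(IndexB()._shared_helper(1))  # 21
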
@@ -2890,6 +2983,8 @@
             return positions[key][1]
         return keys.sort(key=get_index_memo)
 
+    _get_total_build_size = _get_total_build_size
+
 
 class _KnitKeyAccess(object):
     """Access to records in .knit files."""