~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/store/__init__.py

[merge] from robert and fix up tests

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
unique ID.
25
25
"""
26
26
 
 
27
import os
27
28
from cStringIO import StringIO
28
 
from stat import ST_MODE, S_ISDIR
29
29
from zlib import adler32
30
30
 
 
31
import bzrlib
31
32
import bzrlib.errors as errors
32
33
from bzrlib.errors import BzrError, UnlistableStore, TransportNotPossible
33
34
from bzrlib.trace import mutter
34
 
import bzrlib.transport
 
35
import bzrlib.transport as transport
35
36
from bzrlib.transport.local import LocalTransport
36
37
 
37
38
######################################################################
52
53
    def __len__(self):
53
54
        raise NotImplementedError('Children should define their length')
54
55
 
 
56
    def get(self, file_id, suffix=None):
 
57
        """Returns a file reading from a particular entry.
 
58
        
 
59
        If suffix is present, retrieve the named suffix for file_id.
 
60
        """
 
61
        raise NotImplementedError
 
62
 
55
63
    def __getitem__(self, fileid):
56
 
        """Returns a file reading from a particular entry."""
 
64
        """DEPRECATED. Please use .get(file_id) instead."""
57
65
        raise NotImplementedError
58
66
 
59
 
    def __contains__(self, fileid):
60
 
        """"""
61
 
        raise NotImplementedError
 
67
    #def __contains__(self, fileid):
 
68
    #    """Deprecated, please use has_id"""
 
69
    #    raise NotImplementedError
62
70
 
63
71
    def __iter__(self):
64
72
        raise NotImplementedError
67
75
        """Add a file object f to the store accessible from the given fileid"""
68
76
        raise NotImplementedError('Children of Store must define their method of adding entries.')
69
77
 
70
 
    def add_multi(self, entries):
71
 
        """Add a series of file-like or string objects to the store with the given
72
 
        identities.
 
78
    def has_id(self, file_id, suffix=None):
 
79
        """Return True or false for the presence of file_id in the store.
73
80
        
74
 
        :param entries: A list of tuples of file,id pairs [(file1, id1), (file2, id2), ...]
75
 
                        This could also be a generator yielding (file,id) pairs.
76
 
        """
77
 
        for f, fileid in entries:
78
 
            self.add(f, fileid)
79
 
 
80
 
    def has(self, fileids):
81
 
        """Return True/False for each entry in fileids.
82
 
 
83
 
        :param fileids: A List or generator yielding file ids.
84
 
        :return: A generator or list returning True/False for each entry.
85
 
        """
86
 
        for fileid in fileids:
87
 
            if fileid in self:
88
 
                yield True
89
 
            else:
90
 
                yield False
 
81
        suffix, if present, is a per file suffix, i.e. for digital signature 
 
82
        data."""
 
83
        raise NotImplementedError
91
84
 
92
85
    def listable(self):
93
86
        """Return True if this store is able to be listed."""
94
87
        return hasattr(self, "__iter__")
95
88
 
96
 
    def get(self, fileids, permit_failure=False, pb=None):
97
 
        """Return a set of files, one for each requested entry.
98
 
        
99
 
        :param permit_failure: If true, return None for entries which do not 
100
 
                               exist.
101
 
        :return: A list or generator of file-like objects, one for each id.
102
 
        """
103
 
        for fileid in fileids:
104
 
            try:
105
 
                yield self[fileid]
106
 
            except KeyError:
107
 
                if permit_failure:
108
 
                    yield None
109
 
                else:
110
 
                    raise
111
 
 
112
89
    def copy_multi(self, other, ids, pb=None, permit_failure=False):
113
90
        """Copy texts for ids from other into self.
114
91
 
125
102
        """
126
103
        if pb is None:
127
104
            pb = bzrlib.ui.ui_factory.progress_bar()
128
 
 
129
 
        # XXX: Is there any reason why we couldn't make this accept a generator
130
 
        # and build a list as it finds things to copy?
131
 
        ids = list(ids) # Make sure we don't have a generator, since we iterate 2 times
132
105
        pb.update('preparing to copy')
133
 
        to_copy = []
134
 
        for file_id, has in zip(ids, self.has(ids)):
135
 
            if not has:
136
 
                to_copy.append(file_id)
137
 
        return self._do_copy(other, to_copy, pb, permit_failure=permit_failure)
138
 
 
139
 
    def _do_copy(self, other, to_copy, pb, permit_failure=False):
140
 
        """This is the standard copying mechanism, just get them one at
141
 
        a time from remote, and store them locally.
142
 
 
143
 
        :param other: Another Store object
144
 
        :param to_copy: A list of entry ids to copy
145
 
        :param pb: A ProgressBar object to display completion status.
146
 
        :param permit_failure: Allow missing entries to be ignored
147
 
        :return: (n_copied, [failed])
148
 
            The number of entries copied, and a list of failed entries.
149
 
        """
150
 
        # This should be updated to use add_multi() rather than
151
 
        # the current methods of buffering requests.
152
 
        # One question, is it faster to queue up 1-10 and then copy 1-10
153
 
        # then queue up 11-20, copy 11-20
154
 
        # or to queue up 1-10, copy 1, queue 11, copy 2, etc?
155
 
        # sort of pipeline versus batch.
156
 
 
157
 
        # We can't use self._transport.copy_to because we don't know
158
 
        # whether the local tree is in the same format as other
159
106
        failed = set()
160
 
        def buffer_requests():
161
 
            count = 0
162
 
            buffered_requests = []
163
 
            for fileid in to_copy:
164
 
                try:
165
 
                    f = other[fileid]
166
 
                except KeyError:
167
 
                    if permit_failure:
168
 
                        failed.add(fileid)
169
 
                        continue
170
 
                    else:
171
 
                        raise
172
 
 
173
 
                buffered_requests.append((f, fileid))
174
 
                if len(buffered_requests) > self._max_buffered_requests:
175
 
                    yield buffered_requests.pop(0)
176
 
                    count += 1
177
 
                    pb.update('copy', count, len(to_copy))
178
 
 
179
 
            for req in buffered_requests:
180
 
                yield req
181
 
                count += 1
182
 
                pb.update('copy', count, len(to_copy))
183
 
 
184
 
            assert count == len(to_copy)
185
 
 
186
 
        self.add_multi(buffer_requests())
187
 
 
 
107
        count = 0
 
108
        ids = list(ids) # get the list for showing a length.
 
109
        for fileid in ids:
 
110
            count += 1
 
111
            if self.has_id(fileid):
 
112
                continue
 
113
            try:
 
114
                self._copy_one(fileid, None, other, pb)
 
115
                for suffix in self._suffixes:
 
116
                    try:
 
117
                        self._copy_one(fileid, suffix, other, pb)
 
118
                    except KeyError:
 
119
                        pass
 
120
                pb.update('copy', count, len(ids))
 
121
            except KeyError:
 
122
                if permit_failure:
 
123
                    failed.add(fileid)
 
124
                else:
 
125
                    raise
 
126
        assert count == len(ids)
188
127
        pb.clear()
189
 
        return len(to_copy), failed
 
128
        return count, failed
 
129
 
 
130
    def _copy_one(self, fileid, suffix, other, pb):
 
131
        """Most generic copy-one object routine.
 
132
        
 
133
        Subclasses can override this to provide an optimised
 
134
        copy between their own instances. Such overriden routines
 
135
        should call this if they have no optimised facility for a 
 
136
        specific 'other'.
 
137
        """
 
138
        f = other.get(fileid, suffix)
 
139
        self.add(f, fileid, suffix)
190
140
 
191
141
 
192
142
class TransportStore(Store):
193
143
    """A TransportStore is a Store superclass for Stores that use Transports."""
194
144
 
195
 
    _max_buffered_requests = 10
196
 
 
197
 
    def __getitem__(self, fileid):
198
 
        """Returns a file reading from a particular entry."""
199
 
        fn = self._relpath(fileid)
 
145
    def add(self, f, fileid, suffix=None):
 
146
        """Add contents of a file into the store.
 
147
 
 
148
        f -- A file-like object, or string
 
149
        """
 
150
        mutter("add store entry %r" % (fileid))
 
151
        
 
152
        if suffix is not None:
 
153
            fn = self._relpath(fileid, [suffix])
 
154
        else:
 
155
            fn = self._relpath(fileid)
 
156
        if self._transport.has(fn):
 
157
            raise BzrError("store %r already contains id %r" % (self._transport.base, fileid))
 
158
 
 
159
        if self._prefixed:
 
160
            try:
 
161
                self._transport.mkdir(hash_prefix(fileid)[:-1])
 
162
            except errors.FileExists:
 
163
                pass
 
164
 
 
165
        self._add(fn, f)
 
166
 
 
167
    def _check_fileid(self, fileid):
 
168
        if not isinstance(fileid, basestring):
 
169
            raise TypeError('Fileids should be a string type: %s %r' % (type(fileid), fileid))
 
170
        if '\\' in fileid or '/' in fileid:
 
171
            raise ValueError("invalid store id %r" % fileid)
 
172
 
 
173
    def has_id(self, fileid, suffix=None):
 
174
        """See Store.has_id."""
 
175
        if suffix is not None:
 
176
            fn = self._relpath(fileid, [suffix])
 
177
        else:
 
178
            fn = self._relpath(fileid)
 
179
        return self._transport.has(fn)
 
180
 
 
181
    def _get(self, filename):
 
182
        """Return an vanilla file stream for clients to read from.
 
183
 
 
184
        This is the body of a template method on 'get', and should be 
 
185
        implemented by subclasses.
 
186
        """
 
187
        raise NotImplementedError
 
188
 
 
189
    def get(self, fileid, suffix=None):
 
190
        """See Store.get()."""
 
191
        if suffix is None or suffix == 'gz':
 
192
            fn = self._relpath(fileid)
 
193
        else:
 
194
            fn = self._relpath(fileid, [suffix])
200
195
        try:
201
 
            return self._transport.get(fn)
 
196
            return self._get(fn)
202
197
        except errors.NoSuchFile:
203
198
            raise KeyError(fileid)
204
199
 
205
 
    def __init__(self, transport):
206
 
        assert isinstance(transport, bzrlib.transport.Transport)
 
200
    def __init__(self, a_transport, prefixed=False):
 
201
        assert isinstance(a_transport, transport.Transport)
207
202
        super(TransportStore, self).__init__()
208
 
        self._transport = transport
 
203
        self._transport = a_transport
 
204
        self._prefixed = prefixed
 
205
        # conflating the .gz extension and user suffixes was a mistake.
 
206
        # RBC 20051017 - TODO SOON, separate them again.
 
207
        self._suffixes = set()
 
208
 
 
209
    def __iter__(self):
 
210
        for relpath in self._transport.iter_files_recursive():
 
211
            # worst case is one of each suffix.
 
212
            name = os.path.basename(relpath)
 
213
            if name.endswith('.gz'):
 
214
                name = name[:-3]
 
215
            skip = False
 
216
            for count in range(len(self._suffixes)):
 
217
                for suffix in self._suffixes:
 
218
                    if name.endswith('.' + suffix):
 
219
                        skip = True
 
220
            if not skip:
 
221
                yield name
 
222
 
 
223
    def __len__(self):
 
224
        return len(list(self.__iter__()))
 
225
 
 
226
    def _relpath(self, fileid, suffixes=[]):
 
227
        self._check_fileid(fileid)
 
228
        for suffix in suffixes:
 
229
            if not suffix in self._suffixes:
 
230
                raise ValueError("Unregistered suffix %r" % suffix)
 
231
            self._check_fileid(suffix)
 
232
        if self._prefixed:
 
233
            path = [hash_prefix(fileid) + fileid]
 
234
        else:
 
235
            path = [fileid]
 
236
        path.extend(suffixes)
 
237
        return '.'.join(path)
209
238
 
210
239
    def __repr__(self):
211
240
        if self._transport is None:
215
244
 
216
245
    __str__ = __repr__
217
246
 
218
 
    def _iter_relpaths(self):
219
 
        """Iter the relative paths of files in the transports sub-tree."""
220
 
        transport = self._transport
221
 
        queue = list(transport.list_dir('.'))
222
 
        while queue:
223
 
            relpath = queue.pop(0)
224
 
            st = transport.stat(relpath)
225
 
            if S_ISDIR(st[ST_MODE]):
226
 
                for i, basename in enumerate(transport.list_dir(relpath)):
227
 
                    queue.insert(i, relpath+'/'+basename)
228
 
            else:
229
 
                yield relpath, st
230
 
 
231
247
    def listable(self):
232
248
        """Return True if this store is able to be listed."""
233
249
        return self._transport.listable()
234
250
 
235
 
 
236
 
class ImmutableMemoryStore(Store):
237
 
    """A memory only store."""
238
 
 
239
 
    def __contains__(self, fileid):
240
 
        return self._contents.has_key(fileid)
241
 
 
242
 
    def __init__(self):
243
 
        super(ImmutableMemoryStore, self).__init__()
244
 
        self._contents = {}
245
 
 
246
 
    def add(self, stream, fileid, compressed=True):
247
 
        if self._contents.has_key(fileid):
248
 
            raise StoreError("fileid %s already in the store" % fileid)
249
 
        self._contents[fileid] = stream.read()
250
 
 
251
 
    def __getitem__(self, fileid):
252
 
        """Returns a file reading from a particular entry."""
253
 
        if not self._contents.has_key(fileid):
254
 
            raise IndexError
255
 
        return StringIO(self._contents[fileid])
256
 
 
257
 
    def _item_size(self, fileid):
258
 
        return len(self._contents[fileid])
259
 
 
260
 
    def __iter__(self):
261
 
        return iter(self._contents.keys())
 
251
    def register_suffix(self, suffix):
 
252
        """Register a suffix as being expected in this store."""
 
253
        self._check_fileid(suffix)
 
254
        self._suffixes.add(suffix)
262
255
 
263
256
    def total_size(self):
264
 
        result = 0
 
257
        """Return (count, bytes)
 
258
 
 
259
        This is the (compressed) size stored on disk, not the size of
 
260
        the content."""
 
261
        total = 0
265
262
        count = 0
266
 
        for fileid in self:
 
263
        for relpath in self._transport.iter_files_recursive():
267
264
            count += 1
268
 
            result += self._item_size(fileid)
269
 
        return count, result
 
265
            total += self._transport.stat(relpath).st_size
 
266
                
 
267
        return count, total
 
268
 
 
269
 
 
270
def ImmutableMemoryStore():
 
271
    return bzrlib.store.text.TextStore(transport.memory.MemoryTransport())
270
272
        
271
273
 
272
274
class CachedStore(Store):
284
286
        # or something. RBC 20051003
285
287
        self.cache_store = store.__class__(LocalTransport(cache_dir))
286
288
 
287
 
    def __getitem__(self, id):
 
289
    def get(self, id):
288
290
        mutter("Cache add %s" % id)
289
291
        if id not in self.cache_store:
290
 
            self.cache_store.add(self.source_store[id], id)
291
 
        return self.cache_store[id]
 
292
            self.cache_store.add(self.source_store.get(id), id)
 
293
        return self.cache_store.get(id)
292
294
 
293
 
    def __contains__(self, fileid):
294
 
        if fileid in self.cache_store:
 
295
    def has_id(self, fileid, suffix=None):
 
296
        """See Store.has_id."""
 
297
        if self.cache_store.has_id(fileid, suffix):
295
298
            return True
296
 
        if fileid in self.source_store:
 
299
        if self.source_store.has_id(fileid, suffix):
297
300
            # We could copy at this time
298
301
            return True
299
302
        return False
300
303
 
301
 
    def get(self, fileids, permit_failure=False, pb=None):
302
 
        fileids = list(fileids)
303
 
        hasids = self.cache_store.has(fileids)
304
 
        needs = set()
305
 
        for has, fileid in zip(hasids, fileids):
306
 
            if not has:
307
 
                needs.add(fileid)
308
 
        if needs:
309
 
            self.cache_store.copy_multi(self.source_store, needs,
310
 
                    permit_failure=permit_failure)
311
 
        return self.cache_store.get(fileids,
312
 
                permit_failure=permit_failure, pb=pb)
313
 
 
314
 
    def prefetch(self, ids):
315
 
        """Copy a series of ids into the cache, before they are used.
316
 
        For remote stores that support pipelining or async downloads, this can
317
 
        increase speed considerably.
318
 
 
319
 
        Failures while prefetching are ignored.
320
 
        """
321
 
        mutter("Prefetch of ids %s" % ",".join(ids))
322
 
        self.cache_store.copy_multi(self.source_store, ids, 
323
 
                                    permit_failure=True)
324
 
 
325
304
 
326
305
def copy_all(store_from, store_to):
327
306
    """Copy all ids from one store to another."""