~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/store/__init__.py

  • Committer: Robert Collins
  • Date: 2005-10-02 21:51:29 UTC
  • mfrom: (1396)
  • mto: This revision was merged to the branch mainline in revision 1397.
  • Revision ID: robertc@robertcollins.net-20051002215128-5686c7d24bf9bdb9
merge from martins newformat branch - brings in transport abstraction

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
unique ID.
25
25
"""
26
26
 
27
 
import errno
28
 
import gzip
29
 
import os
30
 
import tempfile
31
 
import types
32
 
from stat import ST_SIZE
33
 
from StringIO import StringIO
 
27
from cStringIO import StringIO
34
28
 
35
 
from bzrlib.errors import BzrError, UnlistableStore
 
29
from bzrlib.errors import BzrError, UnlistableStore, TransportNotPossible
36
30
from bzrlib.trace import mutter
37
 
import bzrlib.ui
38
 
import bzrlib.osutils as osutils
39
 
#circular import
40
 
#from bzrlib.remotebranch import get_url
41
 
import urllib2
42
 
 
 
31
import bzrlib.transport
 
32
from bzrlib.transport.local import LocalTransport
43
33
 
44
34
######################################################################
45
35
# stores
49
39
 
50
40
 
51
41
class Store(object):
52
 
    """An abstract store that holds files indexed by unique names.
53
 
 
 
42
    """This class represents the abstract storage layout for saving information.
 
43
    
54
44
    Files can be added, but not modified once they are in.  Typically
55
45
    the hash is used as the name, or something else known to be unique,
56
46
    such as a UUID.
57
 
 
58
 
    >>> st = ImmutableScratchStore()
59
 
 
60
 
    >>> st.add(StringIO('hello'), 'aa')
61
 
    >>> 'aa' in st
62
 
    True
63
 
    >>> 'foo' in st
64
 
    False
65
 
 
66
 
    You are not allowed to add an id that is already present.
67
 
 
68
 
    Entries can be retrieved as files, which may then be read.
69
 
 
70
 
    >>> st.add(StringIO('goodbye'), '123123')
71
 
    >>> st['123123'].read()
72
 
    'goodbye'
73
 
    """
74
 
 
75
 
    def total_size(self):
76
 
        """Return (count, bytes)
77
 
 
78
 
        This is the (compressed) size stored on disk, not the size of
79
 
        the content."""
80
 
        total = 0
81
 
        count = 0
82
 
        for fid in self:
83
 
            count += 1
84
 
            total += self._item_size(fid)
85
 
        return count, total
86
 
 
87
 
 
88
 
class ImmutableStore(Store):
89
 
    """Store that stores files on disk.
90
 
 
91
 
    TODO: Atomic add by writing to a temporary file and renaming.
92
 
    TODO: Guard against the same thing being stored twice, compressed and
93
 
          uncompressed during copy_multi_immutable - the window is for a
94
 
          matching store with some crack code that lets it offer a 
95
 
          non gz FOO and then a fz FOO.
96
 
 
97
 
    In bzr 0.0.5 and earlier, files within the store were marked
98
 
    readonly on disk.  This is no longer done but existing stores need
99
 
    to be accomodated.
100
 
    """
101
 
 
102
 
    def __init__(self, basedir):
103
 
        super(ImmutableStore, self).__init__()
104
 
        self._basedir = basedir
105
 
 
106
 
    def _path(self, entry_id):
107
 
        if not isinstance(entry_id, basestring):
108
 
            raise TypeError(type(entry_id))
109
 
        if '\\' in entry_id or '/' in entry_id:
110
 
            raise ValueError("invalid store id %r" % entry_id)
111
 
        return os.path.join(self._basedir, entry_id)
112
 
 
113
 
    def __repr__(self):
114
 
        return "%s(%r)" % (self.__class__.__name__, self._basedir)
115
 
 
116
 
    def add(self, f, fileid, compressed=True):
117
 
        """Add contents of a file into the store.
118
 
 
119
 
        f -- An open file, or file-like object."""
120
 
        # FIXME: Only works on files that will fit in memory
121
 
        
122
 
        from bzrlib.atomicfile import AtomicFile
123
 
        
124
 
        mutter("add store entry %r" % (fileid))
125
 
        if isinstance(f, types.StringTypes):
126
 
            content = f
127
 
        else:
128
 
            content = f.read()
129
 
            
130
 
        p = self._path(fileid)
131
 
        if os.access(p, os.F_OK) or os.access(p + '.gz', os.F_OK):
132
 
            raise BzrError("store %r already contains id %r" % (self._basedir, fileid))
133
 
 
134
 
        fn = p
135
 
        if compressed:
136
 
            fn = fn + '.gz'
137
 
            
138
 
        af = AtomicFile(fn, 'wb')
139
 
        try:
140
 
            if compressed:
141
 
                gf = gzip.GzipFile(mode='wb', fileobj=af)
142
 
                gf.write(content)
143
 
                gf.close()
 
47
    """
 
48
 
 
49
    def __len__(self):
 
50
        raise NotImplementedError('Children should define their length')
 
51
 
 
52
    def __getitem__(self, fileid):
 
53
        """Returns a file reading from a particular entry."""
 
54
        raise NotImplementedError
 
55
 
 
56
    def __contains__(self, fileid):
 
57
        """"""
 
58
        raise NotImplementedError
 
59
 
 
60
    def __iter__(self):
 
61
        raise NotImplementedError
 
62
 
 
63
    def add(self, f, fileid):
 
64
        """Add a file object f to the store accessible from the given fileid"""
 
65
        raise NotImplementedError('Children of Store must define their method of adding entries.')
 
66
 
 
67
    def add_multi(self, entries):
 
68
        """Add a series of file-like or string objects to the store with the given
 
69
        identities.
 
70
        
 
71
        :param entries: A list of tuples of file,id pairs [(file1, id1), (file2, id2), ...]
 
72
                        This could also be a generator yielding (file,id) pairs.
 
73
        """
 
74
        for f, fileid in entries:
 
75
            self.add(f, fileid)
 
76
 
 
77
    def has(self, fileids):
 
78
        """Return True/False for each entry in fileids.
 
79
 
 
80
        :param fileids: A List or generator yielding file ids.
 
81
        :return: A generator or list returning True/False for each entry.
 
82
        """
 
83
        for fileid in fileids:
 
84
            if fileid in self:
 
85
                yield True
144
86
            else:
145
 
                af.write(content)
146
 
            af.commit()
147
 
        finally:
148
 
            af.close()
149
 
 
150
 
 
151
 
    def copy_multi(self, other, ids, permit_failure=False):
 
87
                yield False
 
88
 
 
89
    def get(self, fileids, permit_failure=False, pb=None):
 
90
        """Return a set of files, one for each requested entry.
 
91
        
 
92
        :param permit_failure: If true, return None for entries which do not 
 
93
                               exist.
 
94
        :return: A list or generator of file-like objects, one for each id.
 
95
        """
 
96
        for fileid in fileids:
 
97
            try:
 
98
                yield self[fileid]
 
99
            except KeyError:
 
100
                if permit_failure:
 
101
                    yield None
 
102
                else:
 
103
                    raise
 
104
 
 
105
    def copy_multi(self, other, ids, pb=None, permit_failure=False):
152
106
        """Copy texts for ids from other into self.
153
107
 
154
 
        If an id is present in self, it is skipped.
 
108
        If an id is present in self, it is skipped.  A count of copied
 
109
        ids is returned, which may be less than len(ids).
155
110
 
156
 
        Returns (count_copied, failed), where failed is a collection of ids
157
 
        that could not be copied.
 
111
        :param other: Another Store object
 
112
        :param ids: A list of entry ids to be copied
 
113
        :param pb: A ProgressBar object, if none is given, the default will be created.
 
114
        :param permit_failure: Allow missing entries to be ignored
 
115
        :return: (n_copied, [failed]) The number of entries copied successfully,
 
116
            followed by a list of entries which could not be copied (because they
 
117
            were missing)
158
118
        """
159
 
        pb = bzrlib.ui.ui_factory.progress_bar()
160
 
        
 
119
        if pb is None:
 
120
            pb = bzrlib.ui.ui_factory.progress_bar()
 
121
 
 
122
        # XXX: Is there any reason why we couldn't make this accept a generator
 
123
        # and build a list as it finds things to copy?
 
124
        ids = list(ids) # Make sure we don't have a generator, since we iterate 2 times
161
125
        pb.update('preparing to copy')
162
 
        to_copy = [id for id in ids if id not in self]
163
 
        if isinstance(other, ImmutableStore):
164
 
            return self.copy_multi_immutable(other, to_copy, pb, 
165
 
                                             permit_failure=permit_failure)
166
 
        count = 0
 
126
        to_copy = []
 
127
        for file_id, has in zip(ids, self.has(ids)):
 
128
            if not has:
 
129
                to_copy.append(file_id)
 
130
        return self._do_copy(other, to_copy, pb, permit_failure=permit_failure)
 
131
 
 
132
    def _do_copy(self, other, to_copy, pb, permit_failure=False):
 
133
        """This is the standard copying mechanism, just get them one at
 
134
        a time from remote, and store them locally.
 
135
 
 
136
        :param other: Another Store object
 
137
        :param to_copy: A list of entry ids to copy
 
138
        :param pb: A ProgressBar object to display completion status.
 
139
        :param permit_failure: Allow missing entries to be ignored
 
140
        :return: (n_copied, [failed])
 
141
            The number of entries copied, and a list of failed entries.
 
142
        """
 
143
        # This should be updated to use add_multi() rather than
 
144
        # the current methods of buffering requests.
 
145
        # One question, is it faster to queue up 1-10 and then copy 1-10
 
146
        # then queue up 11-20, copy 11-20
 
147
        # or to queue up 1-10, copy 1, queue 11, copy 2, etc?
 
148
        # sort of pipeline versus batch.
 
149
 
 
150
        # We can't use self._transport.copy_to because we don't know
 
151
        # whether the local tree is in the same format as other
167
152
        failed = set()
168
 
        for id in to_copy:
169
 
            count += 1
170
 
            pb.update('copy', count, len(to_copy))
171
 
            if not permit_failure:
172
 
                self.add(other[id], id)
173
 
            else:
 
153
        def buffer_requests():
 
154
            count = 0
 
155
            buffered_requests = []
 
156
            for fileid in to_copy:
174
157
                try:
175
 
                    entry = other[id]
 
158
                    f = other[fileid]
176
159
                except KeyError:
177
 
                    failed.add(id)
178
 
                    continue
179
 
                self.add(entry, id)
180
 
                
181
 
        if not permit_failure:
 
160
                    if permit_failure:
 
161
                        failed.add(fileid)
 
162
                        continue
 
163
                    else:
 
164
                        raise
 
165
 
 
166
                buffered_requests.append((f, fileid))
 
167
                if len(buffered_requests) > self._max_buffered_requests:
 
168
                    yield buffered_requests.pop(0)
 
169
                    count += 1
 
170
                    pb.update('copy', count, len(to_copy))
 
171
 
 
172
            for req in buffered_requests:
 
173
                yield req
 
174
                count += 1
 
175
                pb.update('copy', count, len(to_copy))
 
176
 
182
177
            assert count == len(to_copy)
183
 
        pb.clear()
184
 
        return count, failed
185
 
 
186
 
    def copy_multi_immutable(self, other, to_copy, pb, permit_failure=False):
187
 
        count = 0
188
 
        failed = set()
189
 
        for id in to_copy:
190
 
            p = self._path(id)
191
 
            other_p = other._path(id)
192
 
            try:
193
 
                osutils.link_or_copy(other_p, p)
194
 
            except (IOError, OSError), e:
195
 
                if e.errno == errno.ENOENT:
196
 
                    if not permit_failure:
197
 
                        osutils.link_or_copy(other_p+".gz", p+".gz")
198
 
                    else:
199
 
                        try:
200
 
                            osutils.link_or_copy(other_p+".gz", p+".gz")
201
 
                        except IOError, e:
202
 
                            if e.errno == errno.ENOENT:
203
 
                                failed.add(id)
204
 
                            else:
205
 
                                raise
206
 
                else:
207
 
                    raise
208
 
            
209
 
            count += 1
210
 
            pb.update('copy', count, len(to_copy))
211
 
        assert count == len(to_copy)
212
 
        pb.clear()
213
 
        return count, failed
214
 
 
215
 
    def __contains__(self, fileid):
216
 
        """"""
217
 
        p = self._path(fileid)
218
 
        return (os.access(p, os.R_OK)
219
 
                or os.access(p + '.gz', os.R_OK))
220
 
 
221
 
    def _item_size(self, fid):
222
 
        p = self._path(fid)
223
 
        try:
224
 
            return os.stat(p)[ST_SIZE]
225
 
        except OSError:
226
 
            return os.stat(p + '.gz')[ST_SIZE]
227
 
 
228
 
    # TODO: Guard against the same thing being stored twice,
229
 
    # compressed and uncompressed
230
 
 
231
 
    def __iter__(self):
232
 
        for f in os.listdir(self._basedir):
233
 
            if f[-3:] == '.gz':
234
 
                # TODO: case-insensitive?
235
 
                yield f[:-3]
236
 
            else:
237
 
                yield f
238
 
 
239
 
    def __len__(self):
240
 
        return len(os.listdir(self._basedir))
241
 
 
242
 
    def __getitem__(self, fileid):
243
 
        """Returns a file reading from a particular entry."""
244
 
        p = self._path(fileid)
245
 
        try:
246
 
            return gzip.GzipFile(p + '.gz', 'rb')
247
 
        except IOError, e:
248
 
            if e.errno != errno.ENOENT:
249
 
                raise
250
 
 
251
 
        try:
252
 
            return file(p, 'rb')
253
 
        except IOError, e:
254
 
            if e.errno != errno.ENOENT:
255
 
                raise
256
 
 
257
 
        raise KeyError(fileid)
258
 
 
259
 
 
260
 
class ImmutableScratchStore(ImmutableStore):
261
 
    """Self-destructing test subclass of ImmutableStore.
262
 
 
263
 
    The Store only exists for the lifetime of the Python object.
264
 
 Obviously you should not put anything precious in it.
265
 
    """
266
 
    def __init__(self):
267
 
        super(ImmutableScratchStore, self).__init__(tempfile.mkdtemp())
268
 
 
269
 
    def __del__(self):
270
 
        for f in os.listdir(self._basedir):
271
 
            fpath = os.path.join(self._basedir, f)
272
 
            # needed on windows, and maybe some other filesystems
273
 
            os.chmod(fpath, 0600)
274
 
            os.remove(fpath)
275
 
        os.rmdir(self._basedir)
276
 
        mutter("%r destroyed" % self)
 
178
 
 
179
        self.add_multi(buffer_requests())
 
180
 
 
181
        pb.clear()
 
182
        return len(to_copy), failed
 
183
 
 
184
 
 
185
class TransportStore(Store):
 
186
    """A TransportStore is a Store superclass for Stores that use Transports."""
 
187
 
 
188
    _max_buffered_requests = 10
 
189
 
 
190
    def __init__(self, transport):
 
191
        assert isinstance(transport, bzrlib.transport.Transport)
 
192
        super(TransportStore, self).__init__()
 
193
        self._transport = transport
 
194
 
 
195
    def __repr__(self):
 
196
        if self._transport is None:
 
197
            return "%s(None)" % (self.__class__.__name__)
 
198
        else:
 
199
            return "%s(%r)" % (self.__class__.__name__, self._transport.base)
 
200
 
 
201
    __str__ = __repr__
277
202
 
278
203
 
279
204
class ImmutableMemoryStore(Store):
280
205
    """A memory only store."""
281
206
 
 
207
    def __contains__(self, fileid):
 
208
        return self._contents.has_key(fileid)
 
209
 
282
210
    def __init__(self):
283
211
        super(ImmutableMemoryStore, self).__init__()
284
212
        self._contents = {}
300
228
    def __iter__(self):
301
229
        return iter(self._contents.keys())
302
230
 
303
 
 
304
 
class RemoteStore(object):
305
 
 
306
 
    def __init__(self, baseurl):
307
 
        self._baseurl = baseurl
308
 
 
309
 
    def _path(self, name):
310
 
        if '/' in name:
311
 
            raise ValueError('invalid store id', name)
312
 
        return self._baseurl + '/' + name
313
 
        
314
 
    def __getitem__(self, fileid):
315
 
        # circular import.
316
 
        from bzrlib.remotebranch import get_url
317
 
        p = self._path(fileid)
318
 
        try:
319
 
            return get_url(p, compressed=True)
320
 
        except urllib2.URLError:
321
 
            pass
322
 
        try:
323
 
            return get_url(p, compressed=False)
324
 
        except urllib2.URLError:
325
 
            raise KeyError(fileid)
326
 
 
327
 
    def __contains__(self, fileid):
328
 
        try:
329
 
            self[fileid]
330
 
            return True
331
 
        except KeyError:
332
 
            return False
333
 
        
334
 
 
335
 
class CachedStore:
 
231
    def total_size(self):
 
232
        result = 0
 
233
        count = 0
 
234
        for fileid in self:
 
235
            count += 1
 
236
            result += self._item_size(fileid)
 
237
        return count, result
 
238
        
 
239
 
 
240
class CachedStore(Store):
336
241
    """A store that caches data locally, to avoid repeated downloads.
337
242
    The precacache method should be used to avoid server round-trips for
338
243
    every piece of data.
339
244
    """
340
245
 
341
246
    def __init__(self, store, cache_dir):
 
247
        super(CachedStore, self).__init__()
342
248
        self.source_store = store
343
 
        self.cache_store = ImmutableStore(cache_dir)
 
249
        # This clones the source store type with a locally bound
 
250
        # transport. FIXME: it assumes a constructor is == cloning.
 
251
        # clonable store - it might be nicer to actually have a clone()
 
252
        # or something. RBC 20051003
 
253
        self.cache_store = store.__class__(LocalTransport(cache_dir))
344
254
 
345
255
    def __getitem__(self, id):
346
256
        mutter("Cache add %s" % id)
348
258
            self.cache_store.add(self.source_store[id], id)
349
259
        return self.cache_store[id]
350
260
 
 
261
    def __contains__(self, fileid):
 
262
        if fileid in self.cache_store:
 
263
            return True
 
264
        if fileid in self.source_store:
 
265
            # We could copy at this time
 
266
            return True
 
267
        return False
 
268
 
 
269
    def get(self, fileids, permit_failure=False, pb=None):
 
270
        fileids = list(fileids)
 
271
        hasids = self.cache_store.has(fileids)
 
272
        needs = set()
 
273
        for has, fileid in zip(hasids, fileids):
 
274
            if not has:
 
275
                needs.add(fileid)
 
276
        if needs:
 
277
            self.cache_store.copy_multi(self.source_store, needs,
 
278
                    permit_failure=permit_failure)
 
279
        return self.cache_store.get(fileids,
 
280
                permit_failure=permit_failure, pb=pb)
 
281
 
351
282
    def prefetch(self, ids):
352
283
        """Copy a series of ids into the cache, before they are used.
353
284
        For remote stores that support pipelining or async downloads, this can
354
285
        increase speed considerably.
 
286
 
355
287
        Failures while prefetching are ignored.
356
288
        """
357
289
        mutter("Prefetch of ids %s" % ",".join(ids))
358
 
        self.cache_store.copy_multi(self.source_store, ids,
 
290
        self.cache_store.copy_multi(self.source_store, ids, 
359
291
                                    permit_failure=True)
360
292
 
361
293
 
362
294
def copy_all(store_from, store_to):
363
295
    """Copy all ids from one store to another."""
 
296
    # TODO: Optional progress indicator
364
297
    if not hasattr(store_from, "__iter__"):
365
298
        raise UnlistableStore(store_from)
366
 
    ids = [f for f in store_from]
 
299
    try:
 
300
        ids = [f for f in store_from]
 
301
    except (NotImplementedError, TransportNotPossible):
 
302
        raise UnlistableStore(store_from)
367
303
    store_to.copy_multi(store_from, ids)
 
304