API Reference

Module for EverCas class.

EverCas

Bases: object

Content addressable file manager.

Attributes:

Name | Type | Description
root | str | Directory path used as root of storage space.
depth | int | Depth of subfolders to create when saving a file. Defaults to 4.
width | int | Width of each subfolder to create when saving a file. Defaults to 1.
algorithm | str | Hash algorithm to use when computing file hash. Algorithm should be available in hashlib module. Defaults to 'sha256'.
fmode | int | File mode permission to set when adding files to directory. Defaults to 0o664 which allows owner/group to read/write and everyone else to read.
dmode | int | Directory mode permission to set for subdirectories. Defaults to 0o755 which allows the owner to read/write/execute and everyone else to read and execute.
put_strategy | mixed | Default put_strategy for :meth:put method. See :meth:put for more information. Defaults to :attr:PutStrategies.copy.
lowercase_extensions | bool | Normalize all file extensions to lower case when adding files. Defaults to False.
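
The octal defaults for fmode and dmode can be decoded with the standard library. The following is a small standalone sketch, not part of EverCas; the helper name describe is made up for illustration. It renders both modes in ls -l notation:

```python
import stat

# Default modes quoted in the attribute descriptions above.
FMODE = 0o664
DMODE = 0o755


def describe(mode: int, is_dir: bool = False) -> str:
    """Render a permission mode the way ``ls -l`` would (hypothetical helper)."""
    type_bits = stat.S_IFDIR if is_dir else stat.S_IFREG
    return stat.filemode(type_bits | mode)


print(describe(FMODE))               # -rw-rw-r--
print(describe(DMODE, is_dir=True))  # drwxr-xr-x
```

The effective permissions of newly created directories may still be reduced by the process umask, depending on the platform.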

Source code in evercas/evercas.py
class EverCas(object):
    """Content addressable file manager.

    Attributes:
        root (str): Directory path used as root of storage space.
        depth (int, optional): Depth of subfolders to create when saving a
            file.
        width (int, optional): Width of each subfolder to create when saving a
            file.
        algorithm (str): Hash algorithm to use when computing file hash.
            Algorithm should be available in ``hashlib`` module. Defaults to
            ``'sha256'``.
        fmode (int, optional): File mode permission to set when adding files to
            directory. Defaults to ``0o664`` which allows owner/group to
            read/write and everyone else to read.
        dmode (int, optional): Directory mode permission to set for
            subdirectories. Defaults to ``0o755`` which allows the owner to
            read/write/execute and everyone else to read and execute.
        put_strategy (mixed, optional): Default ``put_strategy`` for
            :meth:`put` method. See :meth:`put` for more information. Defaults
            to :attr:`PutStrategies.copy`.
        lowercase_extensions (bool, optional): Normalize all file extensions
            to lower case when adding files. Defaults to ``False``.
    """

    def __init__(
        self,
        root: str,
        depth: int = 4,
        width: int = 1,
        algorithm: str = "sha256",
        fmode: int = 0o664,
        dmode: int = 0o755,
        put_strategy: str | None = None,
        lowercase_extensions: bool = False,
    ):
        self.root = os.path.realpath(root)
        self.depth = depth
        self.width = width
        self.algorithm = algorithm
        self.fmode = fmode
        self.dmode = dmode
        self.put_strategy = PutStrategies.get(put_strategy) or PutStrategies.copy
        self.lowercase_extensions = lowercase_extensions

    def put(
        self,
        file: BinaryIO | str,
        extension: str | None = None,
        put_strategy: str | None = None,
        simulate: bool = False,
    ):
        """Store contents of `file` on disk using its content hash for the
        address.

        Args:
            file (mixed): Readable object or path to file.
            extension (str, optional): Optional extension to append to file
                when saving.
            put_strategy (mixed, optional): The strategy to use for adding
                files; may be a function or the string name of one of the
                built-in put strategies declared in :class:`PutStrategies`
                class. Defaults to :attr:`PutStrategies.copy`.
            simulate (bool, optional): Return the :class:`HashAddress` of the
                file that would be appended but don't do anything.

        Put strategies are functions ``(evercas, stream, filepath)`` where
        ``evercas`` is the :class:`EverCas` instance from which :meth:`put` was
        called; ``stream`` is the :class:`Stream` object representing the
        data to add; and ``filepath`` is the string absolute file path inside
        the EverCas where it needs to be saved. The put strategy function should
        create the path ``filepath`` containing the data in ``stream``.

        There are currently two built-in put strategies: "copy" (the default)
        and "link". "link" attempts to hard link the file into the EverCas if
        the platform and underlying filesystem support it, and falls back to
        "copy" behavior.

        Returns:
            HashAddress: File's hash address.
        """
        stream = Stream(file)

        if extension and self.lowercase_extensions:
            extension = extension.lower()

        with closing(stream):
            id = self.computehash(stream)
            filepath = self.idpath(id, extension)

            # Only move file if it doesn't already exist.
            if not os.path.isfile(filepath):
                is_duplicate = False
                if not simulate:
                    self.makepath(os.path.dirname(filepath))
                    put_strategy_callable = (
                        PutStrategies.get(put_strategy)
                        or self.put_strategy
                        or PutStrategies.copy
                    )
                    put_strategy_callable(self, stream, filepath)
            else:
                is_duplicate = True

        return HashAddress(id, self.relpath(filepath), filepath, is_duplicate)

    def putdir(
        self,
        root: str,
        extensions: bool = True,
        recursive: bool = False,
        put_strategy: str | None = None,
        simulate: bool = False,
    ):
        """Put all files from a directory.

        Args:
            root (str): Path to the directory to add.
            extensions (bool, optional): Whether to add extensions when
                saving (extension will be taken from input file). Defaults to
                ``True``.
            recursive (bool, optional): Find files recursively in ``root``.
                Defaults to ``False``.
            put_strategy (mixed, optional): same as :meth:`put`.
            simulate (bool, optional): same as :meth:`put`.

        Yields ``(file, address)`` tuples, where ``address`` is the
        :class:`HashAddress` of each added file.
        """
        for file in find_files(root, recursive=recursive):
            extension = os.path.splitext(file)[1] if extensions else None
            address = self.put(
                file, extension=extension, put_strategy=put_strategy, simulate=simulate
            )
            yield (file, address)

    def mktempfile(self, stream: Stream):
        """Create a named temporary file from a :class:`Stream` object and
        return its filename.
        """
        tmp = NamedTemporaryFile(delete=False)

        oldmask = os.umask(0)

        try:
            os.chmod(tmp.name, self.fmode)
        finally:
            os.umask(oldmask)

        for data in stream:
            tmp.write(to_bytes(data))

        tmp.close()

        return tmp.name

    def get(self, file: str):
        """Return :class:`HashAddress` from given id or path. If `file` does not
        refer to a valid file, then ``None`` is returned.

        Args:
            file (str): Address ID or path of file.

        Returns:
            HashAddress: File's hash address.
        """
        realpath = self.realpath(file)

        if realpath is None:
            return None
        else:
            return HashAddress(self.unshard(realpath), self.relpath(realpath), realpath)

    def open(self, file: str, mode: str = "rb"):
        """Return open buffer object from given id or path.

        Args:
            file (str): Address ID or path of file.
            mode (str, optional): Mode to open file in. Defaults to ``'rb'``.

        Returns:
            Buffer: An ``io`` buffer dependent on the `mode`.

        Raises:
            IOError: If file doesn't exist.
        """
        realpath = self.realpath(file)
        if realpath is None:
            raise IOError("Could not locate file: {0}".format(file))

        return io.open(realpath, mode)

    def delete(self, file: str):
        """Delete file using id or path. Remove any empty directories after
        deleting. No exception is raised if file doesn't exist.

        Args:
            file (str): Address ID or path of file.
        """
        realpath = self.realpath(file)
        if realpath is None:
            return

        try:
            os.remove(realpath)
        except OSError:  # pragma: no cover
            pass
        else:
            self.remove_empty(os.path.dirname(realpath))

    def remove_empty(self, subpath: str):
        """Successively remove all empty folders starting with `subpath` and
        proceeding "up" through directory tree until reaching the :attr:`root`
        folder.
        """
        # Don't attempt to remove any folders if subpath is not a
        # subdirectory of the root directory.
        if not self.haspath(subpath):
            return

        while subpath != self.root:
            if len(os.listdir(subpath)) > 0 or os.path.islink(subpath):
                break
            os.rmdir(subpath)
            subpath = os.path.dirname(subpath)

    def files(self):
        """Return generator that yields all files in the :attr:`root`
        directory.
        """
        for file in find_files(self.root, recursive=True):
            yield os.path.abspath(file)

    def folders(self):
        """Return generator that yields all folders in the :attr:`root`
        directory that contain files.
        """
        for folder, _, files in os.walk(self.root):
            if files:
                yield folder

    def count(self):
        """Return count of the number of files in the :attr:`root` directory."""
        count = 0
        for _ in self:
            count += 1
        return count

    def size(self):
        """Return the total size in bytes of all files in the :attr:`root`
        directory.
        """
        total = 0

        for path in self.files():
            total += os.path.getsize(path)

        return total

    def exists(self, file: str):
        """Check whether a given file id or path exists on disk."""
        return bool(self.realpath(file))

    def haspath(self, path: str):
        """Return whether `path` is a subdirectory of the :attr:`root`
        directory.
        """
        return issubdir(path, self.root)

    def makepath(self, path: str):
        """Physically create the folder path on disk."""
        try:
            os.makedirs(path, self.dmode)
        except FileExistsError:
            assert os.path.isdir(path), "expected {} to be a directory".format(path)

    def relpath(self, path: str):
        """Return `path` relative to the :attr:`root` directory."""
        return os.path.relpath(path, self.root)

    def realpath(self, file: str):
        """Attempt to determine the real path of a file id or path through
        successive checking of candidate paths. If the real path is stored with
        an extension, the path is considered a match if the basename matches
        the expected file path of the id.
        """

        # Check for absolute path.
        if os.path.isfile(file):
            return file

        # Check for relative path.
        relpath = os.path.join(self.root, file)
        if os.path.isfile(relpath):
            return relpath

        # Check for sharded path.
        filepath = self.idpath(file)
        if os.path.isfile(filepath):
            return filepath

        # Check for sharded path with any extension.
        paths = glob.glob("{0}.*".format(filepath))
        if paths:
            return paths[0]

        # Could not determine a match.
        return None

    def idpath(self, id: str, extension: str | None = ""):
        """Build the file path for a given hash id. Optionally, append a
        file extension.
        """
        paths = self.shard(id)

        if extension and not extension.startswith(os.extsep):
            extension = os.extsep + extension
        elif not extension:
            extension = ""

        return os.path.join(self.root, *paths) + extension

    def computehash(self, stream: Stream):
        """Compute hash of file using :attr:`algorithm`."""
        hashobj = hashlib.new(self.algorithm)
        for data in stream:
            hashobj.update(to_bytes(data))
        return hashobj.hexdigest()

    def shard(self, id: str):
        """Shard content ID into subfolders."""
        return shard(id, self.depth, self.width)

    def unshard(self, path: str):
        """Unshard path to determine hash value."""
        if not self.haspath(path):
            raise ValueError(
                "Cannot unshard path. The path {0!r} is not "
                "a subdirectory of the root directory {1!r}".format(path, self.root)
            )

        return os.path.splitext(self.relpath(path))[0].replace(os.sep, "")

    def repair(self, extensions: bool = True):
        """Repair any file locations whose content address doesn't match its
        file path.
        """
        repaired: list[tuple[str, HashAddress]] = []
        corrupted = tuple(self.corrupted(extensions=extensions))
        oldmask = os.umask(0)

        try:
            for path, address in corrupted:
                if os.path.isfile(address.abspath):
                    # File already exists so just delete corrupted path.
                    os.remove(path)
                else:
                    # File doesn't exist so move it.
                    self.makepath(os.path.dirname(address.abspath))
                    shutil.move(path, address.abspath)

                os.chmod(address.abspath, self.fmode)
                repaired.append((path, address))
        finally:
            os.umask(oldmask)

        return repaired

    def corrupted(self, extensions: bool = True):
        """Return generator that yields corrupted files as ``(path, address)``
        where ``path`` is the path of the corrupted file and ``address`` is
        the :class:`HashAddress` of the expected location.
        """
        for path in self.files():
            stream = Stream(path)

            with closing(stream):
                id = self.computehash(stream)

            extension = os.path.splitext(path)[1] if extensions else None
            expected_path = self.idpath(id, extension)

            if expected_path != path:
                yield (
                    path,
                    HashAddress(id, self.relpath(expected_path), expected_path),
                )

    def __contains__(self, file: str):
        """Return whether a given file id or path is contained in the
        :attr:`root` directory.
        """
        return self.exists(file)

    def __iter__(self):
        """Iterate over all files in the :attr:`root` directory."""
        return self.files()

    def __len__(self):
        """Return count of the number of files in the :attr:`root` directory."""
        return self.count()

__contains__(file)

Return whether a given file id or path is contained in the :attr:root directory.

Source code in evercas/evercas.py
def __contains__(self, file: str):
    """Return whether a given file id or path is contained in the
    :attr:`root` directory.
    """
    return self.exists(file)

__iter__()

Iterate over all files in the :attr:root directory.

Source code in evercas/evercas.py
def __iter__(self):
    """Iterate over all files in the :attr:`root` directory."""
    return self.files()

__len__()

Return count of the number of files in the :attr:root directory.

Source code in evercas/evercas.py
def __len__(self):
    """Return count of the number of files in the :attr:`root` directory."""
    return self.count()

computehash(stream)

Compute hash of file using :attr:algorithm.

Source code in evercas/evercas.py
def computehash(self, stream: Stream):
    """Compute hash of file using :attr:`algorithm`."""
    hashobj = hashlib.new(self.algorithm)
    for data in stream:
        hashobj.update(to_bytes(data))
    return hashobj.hexdigest()
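
As a rough illustration of the same idea outside the library, the sketch below hashes a file object in fixed-size chunks with hashlib.new. It is a standalone approximation: compute_hash and chunk_size are hypothetical names, and it reads from a plain file object rather than the library's Stream class.

```python
import hashlib
import io


def compute_hash(fileobj, algorithm="sha256", chunk_size=64 * 1024):
    """Hash a binary file object incrementally (illustrative helper)."""
    hashobj = hashlib.new(algorithm)
    # Read until the file object returns an empty bytes object.
    for chunk in iter(lambda: fileobj.read(chunk_size), b""):
        hashobj.update(chunk)
    return hashobj.hexdigest()


digest = compute_hash(io.BytesIO(b"hello world"))
print(digest)
```

Feeding the data in chunks keeps memory use bounded regardless of file size, which is presumably why the library iterates over the stream as well.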

corrupted(extensions=True)

Return generator that yields corrupted files as (path, address) where path is the path of the corrupted file and address is the :class:HashAddress of the expected location.

Source code in evercas/evercas.py
def corrupted(self, extensions: bool = True):
    """Return generator that yields corrupted files as ``(path, address)``
    where ``path`` is the path of the corrupted file and ``address`` is
    the :class:`HashAddress` of the expected location.
    """
    for path in self.files():
        stream = Stream(path)

        with closing(stream):
            id = self.computehash(stream)

        extension = os.path.splitext(path)[1] if extensions else None
        expected_path = self.idpath(id, extension)

        if expected_path != path:
            yield (
                path,
                HashAddress(id, self.relpath(expected_path), expected_path),
            )
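
The check can be pictured with a standalone sketch that does not import EverCas. It assumes SHA-256, depth 4 and width 1, and a shard helper that splits the digest into depth prefixes plus the remainder; the real shard function is not shown on this page, so that behaviour is an assumption. Unlike the library, it reads each file fully into memory for brevity.

```python
import hashlib
import os
import tempfile


def shard(digest, depth=4, width=1):
    """Assumed sharding scheme: `depth` prefixes of `width` chars, then the rest."""
    parts = [digest[i * width:(i + 1) * width] for i in range(depth)]
    parts.append(digest[depth * width:])
    return [p for p in parts if p]


def find_misplaced(root):
    """Yield (path, expected_path) for files not stored at their content address."""
    for folder, _, names in os.walk(root):
        for name in names:
            path = os.path.join(folder, name)
            with open(path, "rb") as fh:
                digest = hashlib.sha256(fh.read()).hexdigest()
            extension = os.path.splitext(name)[1]
            expected = os.path.join(root, *shard(digest)) + extension
            if expected != path:
                yield path, expected


with tempfile.TemporaryDirectory() as tmp:
    root = os.path.realpath(tmp)

    # A file stored at its correct content address.
    good = os.path.join(root, *shard(hashlib.sha256(b"good").hexdigest()))
    os.makedirs(os.path.dirname(good))
    with open(good, "wb") as fh:
        fh.write(b"good")

    # A file dropped directly into the root under an arbitrary name.
    stray = os.path.join(root, "stray.txt")
    with open(stray, "wb") as fh:
        fh.write(b"stray")

    misplaced = list(find_misplaced(root))
    print(misplaced)
```

Only the stray file is reported, together with the path it would have if it were stored under its own digest.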

count()

Return count of the number of files in the :attr:root directory.

Source code in evercas/evercas.py
def count(self):
    """Return count of the number of files in the :attr:`root` directory."""
    count = 0
    for _ in self:
        count += 1
    return count

delete(file)

Delete file using id or path. Remove any empty directories after deleting. No exception is raised if file doesn't exist.

Parameters:

Name | Type | Description | Default
file | str | Address ID or path of file. | required

Source code in evercas/evercas.py
def delete(self, file: str):
    """Delete file using id or path. Remove any empty directories after
    deleting. No exception is raised if file doesn't exist.

    Args:
        file (str): Address ID or path of file.
    """
    realpath = self.realpath(file)
    if realpath is None:
        return

    try:
        os.remove(realpath)
    except OSError:  # pragma: no cover
        pass
    else:
        self.remove_empty(os.path.dirname(realpath))

exists(file)

Check whether a given file id or path exists on disk.

Source code in evercas/evercas.py
def exists(self, file: str):
    """Check whether a given file id or path exists on disk."""
    return bool(self.realpath(file))

files()

Return generator that yields all files in the :attr:root directory.

Source code in evercas/evercas.py
def files(self):
    """Return generator that yields all files in the :attr:`root`
    directory.
    """
    for file in find_files(self.root, recursive=True):
        yield os.path.abspath(file)

folders()

Return generator that yields all folders in the :attr:root directory that contain files.

Source code in evercas/evercas.py
def folders(self):
    """Return generator that yields all folders in the :attr:`root`
    directory that contain files.
    """
    for folder, _, files in os.walk(self.root):
        if files:
            yield folder

get(file)

Return :class:HashAddress from given id or path. If file does not refer to a valid file, then None is returned.

Parameters:

Name | Type | Description | Default
file | str | Address ID or path of file. | required

Returns:

Type | Description
HashAddress | File's hash address, or None if file does not refer to a valid file.

Source code in evercas/evercas.py
def get(self, file: str):
    """Return :class:`HashAddress` from given id or path. If `file` does not
    refer to a valid file, then ``None`` is returned.

    Args:
        file (str): Address ID or path of file.

    Returns:
        HashAddress: File's hash address.
    """
    realpath = self.realpath(file)

    if realpath is None:
        return None
    else:
        return HashAddress(self.unshard(realpath), self.relpath(realpath), realpath)

haspath(path)

Return whether path is a subdirectory of the :attr:root directory.

Source code in evercas/evercas.py
def haspath(self, path: str):
    """Return whether `path` is a subdirectory of the :attr:`root`
    directory.
    """
    return issubdir(path, self.root)

idpath(id, extension='')

Build the file path for a given hash id. Optionally, append a file extension.

Source code in evercas/evercas.py
def idpath(self, id: str, extension: str | None = ""):
    """Build the file path for a given hash id. Optionally, append a
    file extension.
    """
    paths = self.shard(id)

    if extension and not extension.startswith(os.extsep):
        extension = os.extsep + extension
    elif not extension:
        extension = ""

    return os.path.join(self.root, *paths) + extension
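
The extension handling can be tried in isolation. The sketch below mirrors the branch logic shown in the listing using two hypothetical helpers, normalize_extension and build_path; the list of path parts is passed in directly instead of being produced by shard.

```python
import os


def normalize_extension(extension):
    """Return an extension with a leading separator, or '' if none was given."""
    if not extension:
        return ""
    if not extension.startswith(os.extsep):
        return os.extsep + extension
    return extension


def build_path(root, parts, extension=None):
    """Join root and sharded parts, then append the normalized extension."""
    return os.path.join(root, *parts) + normalize_extension(extension)


print(build_path("store", ["a", "b", "cdef"], "txt"))
print(build_path("store", ["a", "b", "cdef"], ".txt"))
print(build_path("store", ["a", "b", "cdef"]))
```

Passing "txt" or ".txt" yields the same path, and None or an empty string leaves the path without an extension.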

makepath(path)

Physically create the folder path on disk.

Source code in evercas/evercas.py
def makepath(self, path: str):
    """Physically create the folder path on disk."""
    try:
        os.makedirs(path, self.dmode)
    except FileExistsError:
        assert os.path.isdir(path), "expected {} to be a directory".format(path)

mktempfile(stream)

Create a named temporary file from a :class:Stream object and return its filename.

Source code in evercas/evercas.py
def mktempfile(self, stream: Stream):
    """Create a named temporary file from a :class:`Stream` object and
    return its filename.
    """
    tmp = NamedTemporaryFile(delete=False)

    oldmask = os.umask(0)

    try:
        os.chmod(tmp.name, self.fmode)
    finally:
        os.umask(oldmask)

    for data in stream:
        tmp.write(to_bytes(data))

    tmp.close()

    return tmp.name

open(file, mode='rb')

Return open buffer object from given id or path.

Parameters:

Name | Type | Description | Default
file | str | Address ID or path of file. | required
mode | str | Mode to open file in. Defaults to 'rb'. | 'rb'

Returns:

Type | Description
Buffer | An io buffer dependent on the mode.

Raises:

Type | Description
IOError | If file doesn't exist.

Source code in evercas/evercas.py
def open(self, file: str, mode: str = "rb"):
    """Return open buffer object from given id or path.

    Args:
        file (str): Address ID or path of file.
        mode (str, optional): Mode to open file in. Defaults to ``'rb'``.

    Returns:
        Buffer: An ``io`` buffer dependent on the `mode`.

    Raises:
        IOError: If file doesn't exist.
    """
    realpath = self.realpath(file)
    if realpath is None:
        raise IOError("Could not locate file: {0}".format(file))

    return io.open(realpath, mode)

put(file, extension=None, put_strategy=None, simulate=False)

Store contents of file on disk using its content hash for the address.

Parameters:

Name | Type | Description | Default
file | mixed | Readable object or path to file. | required
extension | str | Optional extension to append to file when saving. | None
put_strategy | mixed | The strategy to use for adding files; may be a function or the string name of one of the built-in put strategies declared in :class:PutStrategies class. When None, the instance's put_strategy is used, which itself defaults to :attr:PutStrategies.copy. | None
simulate | bool | Return the :class:HashAddress of the file that would be added, without writing anything to disk. | False

Put strategies are functions (evercas, stream, filepath) where evercas is the :class:EverCas instance from which :meth:put was called; stream is the :class:Stream object representing the data to add; and filepath is the string absolute file path inside the EverCas where it needs to be saved. The put strategy function should create the path filepath containing the data in stream.

There are currently two built-in put strategies: "copy" (the default) and "link". "link" attempts to hard link the file into the EverCas if the platform and underlying filesystem support it, and falls back to "copy" behavior.

Returns:

Type | Description
HashAddress | File's hash address.

Source code in evercas/evercas.py
def put(
    self,
    file: BinaryIO | str,
    extension: str | None = None,
    put_strategy: str | None = None,
    simulate: bool = False,
):
    """Store contents of `file` on disk using its content hash for the
    address.

    Args:
        file (mixed): Readable object or path to file.
        extension (str, optional): Optional extension to append to file
            when saving.
        put_strategy (mixed, optional): The strategy to use for adding
            files; may be a function or the string name of one of the
            built-in put strategies declared in :class:`PutStrategies`
            class. Defaults to :attr:`PutStrategies.copy`.
        simulate (bool, optional): Return the :class:`HashAddress` of the
            file that would be appended but don't do anything.

    Put strategies are functions ``(evercas, stream, filepath)`` where
    ``evercas`` is the :class:`EverCas` instance from which :meth:`put` was
    called; ``stream`` is the :class:`Stream` object representing the
    data to add; and ``filepath`` is the string absolute file path inside
    the EverCas where it needs to be saved. The put strategy function should
    create the path ``filepath`` containing the data in ``stream``.

    There are currently two built-in put strategies: "copy" (the default)
    and "link". "link" attempts to hard link the file into the EverCas if
    the platform and underlying filesystem support it, and falls back to
    "copy" behavior.

    Returns:
        HashAddress: File's hash address.
    """
    stream = Stream(file)

    if extension and self.lowercase_extensions:
        extension = extension.lower()

    with closing(stream):
        id = self.computehash(stream)
        filepath = self.idpath(id, extension)

        # Only move file if it doesn't already exist.
        if not os.path.isfile(filepath):
            is_duplicate = False
            if not simulate:
                self.makepath(os.path.dirname(filepath))
                put_strategy_callable = (
                    PutStrategies.get(put_strategy)
                    or self.put_strategy
                    or PutStrategies.copy
                )
                put_strategy_callable(self, stream, filepath)
        else:
            is_duplicate = True

    return HashAddress(id, self.relpath(filepath), filepath, is_duplicate)
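
To make the put strategy signature concrete, here is a minimal sketch of a custom strategy. Everything in it is illustrative: write_chunks_strategy and FakeStore are hypothetical names, the stream is assumed to iterate over bytes chunks (the listing above iterates over it and converts each item to bytes), and whether a callable is accepted by PutStrategies.get is taken from the docstring rather than verified.

```python
import os
import tempfile


def write_chunks_strategy(evercas, stream, filepath):
    """Hypothetical put strategy: write every chunk of `stream` to `filepath`."""
    with open(filepath, "wb") as out:
        for chunk in stream:
            out.write(chunk)
    # Apply the store's file mode if the instance exposes one.
    fmode = getattr(evercas, "fmode", None)
    if fmode is not None:
        os.chmod(filepath, fmode)


class FakeStore:
    """Stand-in for an EverCas instance; only `fmode` is needed here."""

    fmode = 0o664


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "blob")
    write_chunks_strategy(FakeStore(), iter([b"abc", b"def"]), target)
    with open(target, "rb") as fh:
        written = fh.read()

print(written)  # b'abcdef'
```

According to the docstring, such a function would be passed as put_strategy to put or to the constructor; by the time it is called, put has already created the parent directory of filepath.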

putdir(root, extensions=True, recursive=False, put_strategy=None, simulate=False)

Put all files from a directory.

Parameters:

Name | Type | Description | Default
root | str | Path to the directory to add. | required
extensions | bool | Whether to add extensions when saving (extension will be taken from input file). Defaults to True. | True
recursive | bool | Find files recursively in root. Defaults to False. | False
put_strategy | mixed | Same as :meth:put. | None
simulate | bool | Same as :meth:put. | False

Yields (file, address) tuples, where address is the :class:HashAddress of each added file.

Source code in evercas/evercas.py
def putdir(
    self,
    root: str,
    extensions: bool = True,
    recursive: bool = False,
    put_strategy: str | None = None,
    simulate: bool = False,
):
    """Put all files from a directory.

    Args:
        root (str): Path to the directory to add.
        extensions (bool, optional): Whether to add extensions when
            saving (extension will be taken from input file). Defaults to
            ``True``.
        recursive (bool, optional): Find files recursively in ``root``.
            Defaults to ``False``.
        put_strategy (mixed, optional): same as :meth:`put`.
        simulate (bool, optional): same as :meth:`put`.

    Yields ``(file, address)`` tuples, where ``address`` is the
    :class:`HashAddress` of each added file.
    """
    for file in find_files(root, recursive=recursive):
        extension = os.path.splitext(file)[1] if extensions else None
        address = self.put(
            file, extension=extension, put_strategy=put_strategy, simulate=simulate
        )
        yield (file, address)

realpath(file)

Attempt to determine the real path of a file id or path through successive checking of candidate paths. If the real path is stored with an extension, the path is considered a match if the basename matches the expected file path of the id.

Source code in evercas/evercas.py
def realpath(self, file: str):
    """Attempt to determine the real path of a file id or path through
    successive checking of candidate paths. If the real path is stored with
    an extension, the path is considered a match if the basename matches
    the expected file path of the id.
    """

    # Check for absolute path.
    if os.path.isfile(file):
        return file

    # Check for relative path.
    relpath = os.path.join(self.root, file)
    if os.path.isfile(relpath):
        return relpath

    # Check for sharded path.
    filepath = self.idpath(file)
    if os.path.isfile(filepath):
        return filepath

    # Check for sharded path with any extension.
    paths = glob.glob("{0}.*".format(filepath))
    if paths:
        return paths[0]

    # Could not determine a match.
    return None
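
The last two lookup steps (exact sharded path, then the same path with any extension) can be sketched on their own. The helper resolve below is hypothetical and takes an already sharded relative path. It differs from the listing in two deliberate ways: it escapes glob metacharacters in the path, and it sorts the matches so the result is deterministic when several extensions exist.

```python
import glob
import os
import tempfile


def resolve(root, relative_id_path):
    """Return the stored file for a sharded id path, with or without extension."""
    exact = os.path.join(root, relative_id_path)
    if os.path.isfile(exact):
        return exact
    # Fall back to "<path>.<anything>"; sorted() makes the choice repeatable.
    matches = sorted(glob.glob(glob.escape(exact) + ".*"))
    return matches[0] if matches else None


with tempfile.TemporaryDirectory() as tmp:
    root = os.path.realpath(tmp)
    os.makedirs(os.path.join(root, "ab"))
    stored = os.path.join(root, "ab", "cdef.txt")
    with open(stored, "wb") as fh:
        fh.write(b"data")

    found = resolve(root, os.path.join("ab", "cdef"))
    missing = resolve(root, os.path.join("zz", "none"))

print(found == stored, missing)  # True None
```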

relpath(path)

Return path relative to the :attr:root directory.

Source code in evercas/evercas.py
def relpath(self, path: str):
    """Return `path` relative to the :attr:`root` directory."""
    return os.path.relpath(path, self.root)

remove_empty(subpath)

Successively remove all empty folders starting with subpath and proceeding "up" through directory tree until reaching the :attr:root folder.

Source code in evercas/evercas.py
def remove_empty(self, subpath: str):
    """Successively remove all empty folders starting with `subpath` and
    proceeding "up" through directory tree until reaching the :attr:`root`
    folder.
    """
    # Don't attempt to remove any folders if subpath is not a
    # subdirectory of the root directory.
    if not self.haspath(subpath):
        return

    while subpath != self.root:
        if len(os.listdir(subpath)) > 0 or os.path.islink(subpath):
            break
        os.rmdir(subpath)
        subpath = os.path.dirname(subpath)
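
The upward pruning can be tried in isolation. The sketch below is a standalone approximation: `prune_upward` is a hypothetical name, and the `os.path.commonpath` check stands in for `haspath`, whose implementation is not shown in this section.

```python
import os
import tempfile


def prune_upward(root: str, subpath: str) -> None:
    """Remove empty directories from subpath up to, but excluding, root."""
    root = os.path.abspath(root)
    subpath = os.path.abspath(subpath)
    # Refuse to touch anything outside root (stand-in for haspath).
    if os.path.commonpath([root, subpath]) != root:
        return
    while subpath != root:
        # Stop at the first directory that has entries or is a symlink.
        if os.listdir(subpath) or os.path.islink(subpath):
            break
        os.rmdir(subpath)
        subpath = os.path.dirname(subpath)


root = tempfile.mkdtemp()
pruned = os.path.join(root, "ab", "cd")
kept = os.path.join(root, "ab", "ee")
os.makedirs(pruned)
os.makedirs(kept)
with open(os.path.join(kept, "data"), "wb") as fh:
    fh.write(b"x")

prune_upward(root, pruned)
```

Here `ab/cd` is removed, while `ab` survives because it still contains `ee`.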

repair(extensions=True)

Repair any file locations whose content address doesn't match its file path.

Source code in evercas/evercas.py
def repair(self, extensions: bool = True):
    """Repair any file locations whose content address doesn't match its
    file path.
    """
    repaired: list[tuple[str, HashAddress]] = []
    corrupted = tuple(self.corrupted(extensions=extensions))
    oldmask = os.umask(0)

    try:
        for path, address in corrupted:
            if os.path.isfile(address.abspath):
                # File already exists so just delete corrupted path.
                os.remove(path)
            else:
                # File doesn't exist so move it.
                self.makepath(os.path.dirname(address.abspath))
                shutil.move(path, address.abspath)

            os.chmod(address.abspath, self.fmode)
            repaired.append((path, address))
    finally:
        os.umask(oldmask)

    return repaired
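
The move-or-delete decision can be sketched for a single file without EverCas. Both helpers below are hypothetical, and the choice of SHA-256 with a depth 2, width 2 layout is an assumption for the example; the sketch also omits the umask and `chmod` handling shown above.

```python
import hashlib
import os
import shutil
import tempfile


def content_path(root: str, path: str, depth: int = 2, width: int = 2) -> str:
    """Hypothetical helper: where a file belongs given its SHA-256 digest."""
    with open(path, "rb") as fh:
        digest = hashlib.sha256(fh.read()).hexdigest()
    parts = [digest[i * width:(i + 1) * width] for i in range(depth)]
    parts.append(digest[depth * width:])
    return os.path.join(root, *parts)


def repair_one(root: str, path: str) -> str:
    """Move a misplaced file to its content path, or drop it if a copy exists."""
    target = content_path(root, path)
    if target == path:
        return target
    if os.path.isfile(target):
        # The correct location is already populated; the stray copy is redundant.
        os.remove(path)
    else:
        os.makedirs(os.path.dirname(target), exist_ok=True)
        shutil.move(path, target)
    return target


root = tempfile.mkdtemp()
misplaced = os.path.join(root, "00", "00", "wrong-name")
os.makedirs(os.path.dirname(misplaced))
with open(misplaced, "wb") as fh:
    fh.write(b"payload")

fixed = repair_one(root, misplaced)
```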

shard(id)

Shard content ID into subfolders.

Source code in evercas/evercas.py
def shard(self, id: str):
    """Shard content ID into subfolders."""
    return shard(id, self.depth, self.width)
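
The module-level `shard` helper called here is not shown in this section. The following is a sketch of what such a function plausibly does, assuming it cuts `depth` prefixes of `width` characters and keeps the remainder as the file name; treat the body as an assumption rather than the library's code.

```python
def shard(digest: str, depth: int, width: int) -> list[str]:
    """Split digest into `depth` prefixes of `width` chars plus the remainder."""
    prefixes = [digest[i * width:(i + 1) * width] for i in range(depth)]
    remainder = digest[depth * width:]
    # Drop empty pieces so short digests do not yield empty path segments.
    return [part for part in prefixes + [remainder] if part]


parts = shard("abcdef0123456789", depth=2, width=3)
# parts == ["abc", "def", "0123456789"]
```

Joining the pieces with the path separator gives the sharded location, and concatenating them recovers the original ID.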

size()

Return the total size in bytes of all files in the :attr:root directory.

Source code in evercas/evercas.py
def size(self):
    """Return the total size in bytes of all files in the :attr:`root`
    directory.
    """
    total = 0

    for path in self.files():
        total += os.path.getsize(path)

    return total
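
The `files()` iterator used above is documented elsewhere. As a rough standalone equivalent, assuming it yields every regular file beneath the root, the same total can be computed with `os.walk` (`total_size` is a hypothetical name):

```python
import os
import tempfile


def total_size(root: str) -> int:
    """Sum the sizes in bytes of all files found beneath root."""
    total = 0
    for folder, _dirs, names in os.walk(root):
        for name in names:
            total += os.path.getsize(os.path.join(folder, name))
    return total


root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "ab"))
with open(os.path.join(root, "ab", "one"), "wb") as fh:
    fh.write(b"12345")
with open(os.path.join(root, "two"), "wb") as fh:
    fh.write(b"123")

size_in_bytes = total_size(root)
```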

unshard(path)

Unshard path to determine hash value.

Source code in evercas/evercas.py
def unshard(self, path: str):
    """Unshard path to determine hash value."""
    if not self.haspath(path):
        raise ValueError(
            "Cannot unshard path. The path {0!r} is not "
            "a subdirectory of the root directory {1!r}".format(path, self.root)
        )

    return os.path.splitext(self.relpath(path))[0].replace(os.sep, "")
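
The final expression can be traced with plain `os.path` calls. The root and file names below are made up for the example.

```python
import os

root = os.path.join(os.sep, "srv", "cas")
stored = os.path.join(root, "ab", "cd", "ef0123.txt")

# Strip the root, drop the extension, then remove the separators.
relative = os.path.relpath(stored, root)
file_id = os.path.splitext(relative)[0].replace(os.sep, "")

# splitext only removes the last suffix, so a double extension
# leaves ".tar" inside the recovered value.
double_ext = os.path.splitext("ef0123.tar.gz")[0]
```

Files stored with compound extensions such as `.tar.gz` would therefore not unshard to a clean hexdigest.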

HashAddress dataclass

File address containing file's path on disk and its content hash ID.

Attributes:

Name Type Description
id str

Hash ID (hexdigest) of file contents.

relpath str

Relative path location to :attr:EverCas.root.

abspath str

Absolute path location of file on disk.

is_duplicate boolean

Whether the hash address created was a duplicate of a previously existing file. Can only be True after a put operation. Defaults to False.

Source code in evercas/evercas.py
@dataclass
class HashAddress:
    """File address containing file's path on disk and its content hash ID.

    Attributes:
        id (str): Hash ID (hexdigest) of file contents.
        relpath (str): Relative path location to :attr:`EverCas.root`.
        abspath (str): Absolute path location of file on disk.
        is_duplicate (boolean, optional): Whether the hash address created was
            a duplicate of a previously existing file. Can only be ``True``
            after a put operation. Defaults to ``False``.
    """

    id: str
    relpath: str
    abspath: str
    is_duplicate: bool = False
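
Since `HashAddress` is a plain dataclass, it gets field-based equality and works with `dataclasses.asdict`. The sketch below redefines the class locally so that it runs without EverCas installed; the field values are invented.

```python
from dataclasses import asdict, dataclass


@dataclass
class HashAddress:
    """Local copy of the dataclass shown above, for illustration only."""

    id: str
    relpath: str
    abspath: str
    is_duplicate: bool = False


address = HashAddress(
    id="abcdef0123",
    relpath="ab/cd/ef0123",
    abspath="/srv/cas/ab/cd/ef0123",
)

# is_duplicate falls back to False when it is not supplied.
as_mapping = asdict(address)
same = address == HashAddress("abcdef0123", "ab/cd/ef0123", "/srv/cas/ab/cd/ef0123")
```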

PutStrategies

Namespace for built-in put strategies.

Should not be instantiated. Use the :meth:get static method to look up a strategy by name, or directly reference one of the included class methods.

Source code in evercas/evercas.py
class PutStrategies:
    """Namespace for built-in put strategies.

    Should not be instantiated. Use the :meth:`get` static method to look up a
    strategy by name, or directly reference one of the included class methods.
    """

    @classmethod
    def get(cls, method: str | None) -> Callable[[EverCas, Stream, str], None] | None:
        """Look up a strategy by name string. You can also pass a function
        which will be returned as is."""
        if method:
            if method == "get":
                raise ValueError("invalid put strategy name, 'get'")
            if callable(method):
                return method
            elif callable(getattr(cls, method)):
                return getattr(cls, method)

    @staticmethod
    def copy(evercas: EverCas, src_stream: Stream, dst_path: str) -> None:
        """The default copy put strategy, writes the file object to a
        temporary file on disk and then moves it into place."""
        shutil.move(evercas.mktempfile(src_stream), dst_path)

    @classmethod
    def link(cls, evercas: EverCas, src_stream: Stream, dst_path: str) -> None:
        """Use os.link if available to create a hard link to the original
        file if the EverCas and the original file reside on the same
        filesystem and the filesystem supports hard links."""

        if not hasattr(os, "link"):
            return PutStrategies.copy(evercas, src_stream, dst_path)

        # Get the original file path exposed by the Stream instance
        src_path = src_stream.name
        # No path available because e.g. a StringIO was used
        if not src_path:
            # Just copy
            return cls.copy(evercas, src_stream, dst_path)

        try:
            # Try to create the hard link
            os.link(src_path, dst_path)
        except EnvironmentError as e:
            # These are link specific errors. If any of these 3 are raised
            # we try to copy instead
            # EMLINK - src already has the maximum number of links to it
            # EXDEV - invalid cross-device link
            # EPERM - the dst filesystem does not support hard links
            # (note EPERM could also be another permissions error; these
            # will be raised again when we try to copy)
            if e.errno not in (errno.EMLINK, errno.EXDEV, errno.EPERM):
                raise
            return cls.copy(evercas, src_stream, dst_path)
        else:
            # After creating the hard link, make sure it has the correct
            # file permissions
            os.chmod(dst_path, evercas.fmode)

copy(evercas, src_stream, dst_path) staticmethod

The default copy put strategy, writes the file object to a temporary file on disk and then moves it into place.

Source code in evercas/evercas.py
@staticmethod
def copy(evercas: EverCas, src_stream: Stream, dst_path: str) -> None:
    """The default copy put strategy, writes the file object to a
    temporary file on disk and then moves it into place."""
    shutil.move(evercas.mktempfile(src_stream), dst_path)

get(method) classmethod

Look up a strategy by name string. You can also pass a function which will be returned as is.

Source code in evercas/evercas.py
@classmethod
def get(cls, method: str | None) -> Callable[[EverCas, Stream, str], None] | None:
    """Look up a strategy by name string. You can also pass a function
    which will be returned as is."""
    if method:
        if method == "get":
            raise ValueError("invalid put strategy name, 'get'")
        if callable(method):
            return method
        elif callable(getattr(cls, method)):
            return getattr(cls, method)
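
The lookup pattern can be exercised with a toy namespace. `Strategies` and its methods are hypothetical stand-ins, not the real `PutStrategies`. One deliberate difference: the sketch passes a default to `getattr`, so an unknown name yields `None`, whereas the code above would raise `AttributeError` for a name that is not an attribute of the class.

```python
class Strategies:
    """Toy namespace mirroring the name-or-callable lookup."""

    @staticmethod
    def copy(src, dst):
        return ("copy", src, dst)

    @classmethod
    def get(cls, method):
        if not method:
            return None
        # Callables are passed through untouched.
        if callable(method):
            return method
        if method == "get":
            raise ValueError("invalid put strategy name, 'get'")
        candidate = getattr(cls, method, None)
        return candidate if callable(candidate) else None


def custom(src, dst):
    return ("custom", src, dst)


by_name = Strategies.get("copy")
passthrough = Strategies.get(custom)
unknown = Strategies.get("missing")
```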

link(evercas, src_stream, dst_path) classmethod

Use os.link if available to create a hard link to the original file if the EverCas and the original file reside on the same filesystem and the filesystem supports hard links.

Source code in evercas/evercas.py
@classmethod
def link(cls, evercas: EverCas, src_stream: Stream, dst_path: str) -> None:
    """Use os.link if available to create a hard link to the original
    file if the EverCas and the original file reside on the same
    filesystem and the filesystem supports hard links."""

    if not hasattr(os, "link"):
        return PutStrategies.copy(evercas, src_stream, dst_path)

    # Get the original file path exposed by the Stream instance
    src_path = src_stream.name
    # No path available because e.g. a StringIO was used
    if not src_path:
        # Just copy
        return cls.copy(evercas, src_stream, dst_path)

    try:
        # Try to create the hard link
        os.link(src_path, dst_path)
    except EnvironmentError as e:
        # These are link specific errors. If any of these 3 are raised
        # we try to copy instead
        # EMLINK - src already has the maximum number of links to it
        # EXDEV - invalid cross-device link
        # EPERM - the dst filesystem does not support hard links
        # (note EPERM could also be another permissions error; these
        # will be raised again when we try to copy)
        if e.errno not in (errno.EMLINK, errno.EXDEV, errno.EPERM):
            raise
        return cls.copy(evercas, src_stream, dst_path)
    else:
        # After creating the hard link, make sure it has the correct
        # file permissions
        os.chmod(dst_path, evercas.fmode)
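
The link-then-fall-back pattern works on plain paths as well. In this sketch `link_or_copy` is a hypothetical helper, and it falls back to `shutil.copyfile` rather than the temporary-file move used by the copy strategy above.

```python
import errno
import os
import shutil
import tempfile


def link_or_copy(src_path: str, dst_path: str) -> str:
    """Hard-link src to dst, falling back to a copy for link-specific errors."""
    if not hasattr(os, "link"):
        shutil.copyfile(src_path, dst_path)
        return "copy"
    try:
        os.link(src_path, dst_path)
    except OSError as exc:
        # Only link-specific failures trigger the fallback; anything else
        # (for example a missing source file) is re-raised.
        if exc.errno not in (errno.EMLINK, errno.EXDEV, errno.EPERM):
            raise
        shutil.copyfile(src_path, dst_path)
        return "copy"
    return "link"


workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "source.bin")
dst = os.path.join(workdir, "stored.bin")
with open(src, "wb") as fh:
    fh.write(b"payload")

how = link_or_copy(src, dst)
with open(dst, "rb") as fh:
    stored = fh.read()
```

A hard link shares its inode with the source, so later in-place writes to the original file also change the stored content; a copy does not have that coupling.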

Stream

Bases: object

Common interface for file-like objects.

The input obj can be a file-like object or a path to a file. If obj is a path to a file, then it will be opened until :meth:close is called. If obj is a file-like object, then its original position will be restored when :meth:close is called instead of closing the object automatically. Closing of the stream is deferred to whatever process passed the stream in.

Successive readings of the stream are supported without having to manually set its position back to 0.

Source code in evercas/evercas.py
class Stream(object):
    """Common interface for file-like objects.

    The input `obj` can be a file-like object or a path to a file. If `obj` is
    a path to a file, then it will be opened until :meth:`close` is called.
    If `obj` is a file-like object, then its original position will be
    restored when :meth:`close` is called instead of closing the object
    automatically. Closing of the stream is deferred to whatever process passed
    the stream in.

    Successive readings of the stream are supported without having to manually
    set its position back to ``0``.
    """

    def __init__(self, obj: BinaryIO | str):
        if isinstance(obj, str) and os.path.isfile(obj):
            obj = io.open(obj, "rb")
            pos = None
        elif isinstance(obj, BinaryIO):
            pos = obj.tell()
        else:
            raise ValueError("Object must be a valid file path or a BinaryIO object")

        try:
            file_stat = os.stat(obj.name)
            buffer_size = file_stat.st_blksize
        except Exception:
            buffer_size = 8192

        try:
            # Expose the original file path if available.
            # This allows put strategies to use OS functions, working with
            # paths, instead of being limited to the API provided by Python
            # file-like objects
            # name property can also hold int fd, so we make it None in that
            # case
            self.name: str | None = None if isinstance(obj.name, int) else obj.name
        except AttributeError:
            self.name = None

        self._obj = obj
        self._pos = pos
        self._buffer_size = buffer_size

    def __iter__(self):
        """Read underlying IO object and yield results. Return object to
        original position if we didn't open it originally.
        """
        self._obj.seek(0)

        while True:
            data = self._obj.read(self._buffer_size)

            if not data:
                break

            yield data

        if self._pos is not None:
            self._obj.seek(self._pos)

    def close(self):
        """Close underlying IO object if we opened it, else return it to
        original position.
        """
        if self._pos is None:
            self._obj.close()
        else:
            self._obj.seek(self._pos)
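
The read-from-the-start, then restore-the-position behaviour can be reproduced with a small generator over an in-memory buffer. `iter_chunks` is a hypothetical function, not part of EverCas, and unlike `__iter__` above it restores the position in a `finally` clause.

```python
import io
from typing import BinaryIO, Iterator


def iter_chunks(obj: BinaryIO, size: int = 4) -> Iterator[bytes]:
    """Yield the whole stream in chunks, then restore the caller's position."""
    pos = obj.tell()
    obj.seek(0)
    try:
        while True:
            data = obj.read(size)
            if not data:
                break
            yield data
    finally:
        obj.seek(pos)


buf = io.BytesIO(b"0123456789")
buf.seek(3)  # the caller has already consumed three bytes

first = list(iter_chunks(buf))
second = list(iter_chunks(buf))
```

Both passes read the full content from offset 0, and the buffer ends up back at offset 3 each time.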

__iter__()

Read underlying IO object and yield results. Return object to original position if we didn't open it originally.

Source code in evercas/evercas.py
def __iter__(self):
    """Read underlying IO object and yield results. Return object to
    original position if we didn't open it originally.
    """
    self._obj.seek(0)

    while True:
        data = self._obj.read(self._buffer_size)

        if not data:
            break

        yield data

    if self._pos is not None:
        self._obj.seek(self._pos)
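
Chunked iteration of this kind pairs naturally with incremental hashing. How EverCas computes a content ID is not shown in this section, so the following is only a sketch of the general technique using `hashlib`; `digest_stream` is a hypothetical name.

```python
import hashlib
import io
from typing import BinaryIO


def digest_stream(obj: BinaryIO, algorithm: str = "sha256", size: int = 8192) -> str:
    """Hash a binary stream chunk by chunk without loading it whole."""
    hasher = hashlib.new(algorithm)
    obj.seek(0)
    # iter() with a sentinel stops once read() returns an empty bytes object.
    for chunk in iter(lambda: obj.read(size), b""):
        hasher.update(chunk)
    return hasher.hexdigest()


hexdigest = digest_stream(io.BytesIO(b"abc"))
```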

close()

Close underlying IO object if we opened it, else return it to original position.

Source code in evercas/evercas.py
def close(self):
    """Close underlying IO object if we opened it, else return it to
    original position.
    """
    if self._pos is None:
        self._obj.close()
    else:
        self._obj.seek(self._pos)