Source code for ftrack_api.accessor.disk

# :coding: utf-8
# :copyright: Copyright (c) 2013 ftrack

import os
import sys
import errno
import contextlib

import ftrack_api._python_ntpath as ntpath
import ftrack_api.accessor.base
import ftrack_api.data
from ftrack_api.exception import (
    AccessorFilesystemPathError,
    AccessorUnsupportedOperationError,
    AccessorResourceNotFoundError,
    AccessorOperationFailedError,
    AccessorPermissionDeniedError,
    AccessorResourceInvalidError,
    AccessorContainerNotEmptyError,
    AccessorParentResourceNotFoundError,
)


[docs]class DiskAccessor(ftrack_api.accessor.base.Accessor): """Provide disk access to a location. Expect resource identifiers to refer to relative filesystem paths. """
[docs] def __init__(self, prefix, **kw): """Initialise location accessor. *prefix* specifies the base folder for the disk based structure and will be prepended to any path. It should be specified in the syntax of the current OS. """ if prefix: prefix = os.path.expanduser(os.path.expandvars(prefix)) prefix = os.path.abspath(prefix) self.prefix = prefix super(DiskAccessor, self).__init__(**kw)
[docs] def list(self, resource_identifier): """Return list of entries in *resource_identifier* container. Each entry in the returned list should be a valid resource identifier. Raise :exc:`~ftrack_api.exception.AccessorResourceNotFoundError` if *resource_identifier* does not exist or :exc:`~ftrack_api.exception.AccessorResourceInvalidError` if *resource_identifier* is not a container. """ filesystem_path = self.get_filesystem_path(resource_identifier) with error_handler(operation="list", resource_identifier=resource_identifier): listing = [] for entry in os.listdir(filesystem_path): listing.append(os.path.join(resource_identifier, entry)) return listing
[docs] def exists(self, resource_identifier): """Return if *resource_identifier* is valid and exists in location.""" filesystem_path = self.get_filesystem_path(resource_identifier) return os.path.exists(filesystem_path)
[docs] def is_file(self, resource_identifier): """Return whether *resource_identifier* refers to a file.""" filesystem_path = self.get_filesystem_path(resource_identifier) return os.path.isfile(filesystem_path)
[docs] def is_container(self, resource_identifier): """Return whether *resource_identifier* refers to a container.""" filesystem_path = self.get_filesystem_path(resource_identifier) return os.path.isdir(filesystem_path)
[docs] def is_sequence(self, resource_identifier): """Return whether *resource_identifier* refers to a file sequence.""" raise AccessorUnsupportedOperationError(operation="is_sequence")
[docs] def open(self, resource_identifier, mode="rb"): """Return :class:`~ftrack_api.Data` for *resource_identifier*.""" filesystem_path = self.get_filesystem_path(resource_identifier) with error_handler(operation="open", resource_identifier=resource_identifier): data = ftrack_api.data.File(filesystem_path, mode) return data
[docs] def remove(self, resource_identifier): """Remove *resource_identifier*. Raise :exc:`~ftrack_api.exception.AccessorResourceNotFoundError` if *resource_identifier* does not exist. """ filesystem_path = self.get_filesystem_path(resource_identifier) if self.is_file(resource_identifier): with error_handler( operation="remove", resource_identifier=resource_identifier ): os.remove(filesystem_path) elif self.is_container(resource_identifier): with error_handler( operation="remove", resource_identifier=resource_identifier ): os.rmdir(filesystem_path) else: raise AccessorResourceNotFoundError(resource_identifier=resource_identifier)
[docs] def make_container(self, resource_identifier, recursive=True): """Make a container at *resource_identifier*. If *recursive* is True, also make any intermediate containers. """ filesystem_path = self.get_filesystem_path(resource_identifier) with error_handler( operation="makeContainer", resource_identifier=resource_identifier ): try: if recursive: os.makedirs(filesystem_path) else: try: os.mkdir(filesystem_path) except OSError as error: if error.errno == errno.ENOENT: raise AccessorParentResourceNotFoundError( resource_identifier=resource_identifier ) else: raise except OSError as error: if error.errno != errno.EEXIST: raise
[docs] def get_container(self, resource_identifier): """Return resource_identifier of container for *resource_identifier*. Raise :exc:`~ftrack_api.exception.AccessorParentResourceNotFoundError` if container of *resource_identifier* could not be determined. """ filesystem_path = self.get_filesystem_path(resource_identifier) container = os.path.dirname(filesystem_path) if self.prefix: if not container.startswith(self.prefix): raise AccessorParentResourceNotFoundError( resource_identifier=resource_identifier, message="Could not determine container for " "{resource_identifier} as container falls outside " "of configured prefix.", ) # Convert container filesystem path into resource identifier. container = container[len(self.prefix) :] if ntpath.isabs(container): # Ensure that resulting path is relative by stripping any # leftover prefixed slashes from string. # E.g. If prefix was '/tmp' and path was '/tmp/foo/bar' the # result will be 'foo/bar'. container = container.lstrip("\\/") return container
[docs] def get_filesystem_path(self, resource_identifier): """Return filesystem path for *resource_identifier*. For example:: >>> accessor = DiskAccessor('my.location', '/mountpoint') >>> print accessor.get_filesystem_path('test.txt') /mountpoint/test.txt >>> print accessor.get_filesystem_path('/mountpoint/test.txt') /mountpoint/test.txt Raise :exc:`ftrack_api.exception.AccessorFilesystemPathError` if filesystem path could not be determined from *resource_identifier*. """ filesystem_path = resource_identifier if filesystem_path: filesystem_path = os.path.normpath(filesystem_path) if self.prefix: if not os.path.isabs(filesystem_path): filesystem_path = os.path.normpath( os.path.join(self.prefix, filesystem_path) ) if not filesystem_path.startswith(self.prefix): raise AccessorFilesystemPathError( resource_identifier=resource_identifier, message="Could not determine access path for " "resource_identifier outside of configured prefix: " "{resource_identifier}.", ) return filesystem_path
[docs]@contextlib.contextmanager def error_handler(**kw): """Conform raised OSError/IOError exception to appropriate FTrack error.""" try: yield except (OSError, IOError) as error: (exception_type, exception_value, traceback) = sys.exc_info() kw.setdefault("error", error) error_code = getattr(error, "errno") if not error_code: raise AccessorOperationFailedError(**kw) if error_code == errno.ENOENT: raise AccessorResourceNotFoundError(**kw) elif error_code == errno.EPERM: raise AccessorPermissionDeniedError(**kw) elif error_code == errno.ENOTEMPTY: raise AccessorContainerNotEmptyError(**kw) elif error_code in (errno.ENOTDIR, errno.EISDIR, errno.EINVAL): raise AccessorResourceInvalidError(**kw) else: raise AccessorOperationFailedError(**kw) except Exception: raise