# :coding: utf-8
# :copyright: Copyright (c) 2015 ftrack
import ftrack_api.entity.base
[docs]class ProjectSchema(ftrack_api.entity.base.Entity):
'''Class representing ProjectSchema.'''
[docs] def get_statuses(self, schema, type_id=None):
'''Return statuses for *schema* and optional *type_id*.
*type_id* is the id of the Type for a TypedContext and can be used to
get statuses where the workflow has been overridden.
'''
# Task has overrides and need to be handled separately.
if schema == 'Task':
if type_id is not None:
overrides = self['_overrides']
for override in overrides:
if override['type_id'] == type_id:
return override['workflow_schema']['statuses'][:]
return self['_task_workflow']['statuses'][:]
elif schema == 'AssetVersion':
return self['_version_workflow']['statuses'][:]
else:
try:
EntityTypeClass = self.session.types[schema]
except KeyError:
raise ValueError('Schema {0} does not exist.'.format(schema))
object_type_id_attribute = EntityTypeClass.attributes.get(
'object_type_id'
)
try:
object_type_id = object_type_id_attribute.default_value
except AttributeError:
raise ValueError(
'Schema {0} does not have statuses.'.format(schema)
)
for _schema in self['_schemas']:
if _schema['type_id'] == object_type_id:
result = self.session.query(
'select task_status from SchemaStatus '
'where schema_id is {0}'.format(_schema['id'])
)
return [
schema_type['task_status'] for schema_type in result
]
raise ValueError(
'No valid statuses were found for schema {0}.'.format(schema)
)
[docs] def get_types(self, schema):
'''Return types for *schema*.'''
# Task need to be handled separately.
if schema == 'Task':
return self['_task_type_schema']['types'][:]
else:
try:
EntityTypeClass = self.session.types[schema]
except KeyError:
raise ValueError('Schema {0} does not exist.'.format(schema))
object_type_id_attribute = EntityTypeClass.attributes.get(
'object_type_id'
)
try:
object_type_id = object_type_id_attribute.default_value
except AttributeError:
raise ValueError(
'Schema {0} does not have types.'.format(schema)
)
for _schema in self['_schemas']:
if _schema['type_id'] == object_type_id:
result = self.session.query(
'select task_type from SchemaType '
'where schema_id is {0}'.format(_schema['id'])
)
return [schema_type['task_type'] for schema_type in result]
raise ValueError(
'No valid types were found for schema {0}.'.format(schema)
)