Source code for ftrack_api.event.expression

# :coding: utf-8
# :copyright: Copyright (c) 2014 ftrack

from builtins import map
from six import string_types
from builtins import object
from operator import eq, ne, ge, le, gt, lt

from pyparsing import (Group, Word, CaselessKeyword, Forward,
                       FollowedBy, Suppress, oneOf, OneOrMore, Optional,
                       alphanums, quotedString, removeQuotes)

import ftrack_api.exception

# Do not enable packrat since it is not thread-safe and will result in parsing
# exceptions in a multi-threaded environment.
# ParserElement.enablePackrat()


class Parser(object):
    '''Parse string based expression into :class:`Expression` instance.'''

    def __init__(self):
        '''Initialise parser.'''
        # Map each textual comparison operator accepted by the grammar to the
        # function that implements it.
        self._operators = {
            '=': eq,
            '!=': ne,
            '>=': ge,
            '<=': le,
            '>': gt,
            '<': lt
        }
        self._parser = self._construct_parser()
        super(Parser, self).__init__()

    def _construct_parser(self):
        '''Construct and return parser.

        The grammar is made of ``field operator value`` conditions that can be
        grouped with parentheses and combined with the case insensitive
        keywords ``not``, ``and`` and ``or``.

        '''
        field = Word(alphanums + '_.')
        operator = oneOf(list(self._operators.keys()))

        # A value is either a bare word or a quoted string whose surrounding
        # quotes are removed by the parse action.
        value = Word(alphanums + '-_,./*@+')
        quoted_value = quotedString('quoted_value').setParseAction(removeQuotes)

        condition = Group(
            field + operator + (quoted_value | value)
        )('condition')

        # The results names given here ('not', 'and', 'or') are later used by
        # :meth:`_process` to select the matching ``_process_*`` method.
        not_ = Optional(Suppress(CaselessKeyword('not')))('not')
        and_ = Suppress(CaselessKeyword('and'))('and')
        or_ = Suppress(CaselessKeyword('or'))('or')

        # Forward declaration so that a parenthesised group can contain a
        # complete expression.
        expression = Forward()
        parenthesis = Suppress('(') + expression + Suppress(')')
        previous = condition | parenthesis

        # Layer the conjunctions in the order not, and, or. Each layer either
        # matches its own conjunction or falls back to the layer before it.
        for conjunction in (not_, and_, or_):
            current = Forward()

            if conjunction in (and_, or_):
                conjunction_expression = (
                    FollowedBy(previous + conjunction + previous)
                    + Group(
                        previous + OneOrMore(conjunction + previous)
                    )(conjunction.resultsName)
                )

            elif conjunction in (not_, ):
                conjunction_expression = (
                    FollowedBy(conjunction.expr + current)
                    + Group(conjunction + current)(conjunction.resultsName)
                )

            else:  # pragma: no cover
                raise ValueError('Unrecognised conjunction.')

            current <<= (conjunction_expression | previous)
            previous = current

        expression <<= previous
        return expression('expression')

    def parse(self, expression):
        '''Parse string *expression* into :class:`Expression`.

        Surrounding whitespace is ignored. If *expression* is empty once
        stripped, return a plain :class:`Expression`, which matches every
        candidate.

        Raise :exc:`ftrack_api.exception.ParseError` if *expression* could
        not be parsed.

        '''
        expression = expression.strip()
        if not expression:
            # There is nothing to hand to the grammar. Previously this fell
            # through to _process(None), which failed with AttributeError.
            return Expression()

        try:
            result = self._parser.parseString(
                expression, parseAll=True
            )
        except Exception as error:
            raise ftrack_api.exception.ParseError(
                'Failed to parse: {0}. {1}'.format(expression, error)
            )

        return self._process(result)

    def _process(self, result):
        '''Process *result* using appropriate method.

        Method called is determined by the name of the result: a result named
        ``and`` is passed to :meth:`_process_and` and so on.

        '''
        method_name = '_process_{0}'.format(result.getName())
        method = getattr(self, method_name)
        return method(result)

    def _process_expression(self, result):
        '''Process *result* as expression.'''
        return self._process(result[0])

    def _process_not(self, result):
        '''Process *result* as NOT operation.'''
        return Not(self._process(result[0]))

    def _process_and(self, result):
        '''Process *result* as AND operation.'''
        return All([self._process(entry) for entry in result])

    def _process_or(self, result):
        '''Process *result* as OR operation.'''
        return Any([self._process(entry) for entry in result])

    def _process_condition(self, result):
        '''Process *result* as condition.'''
        key, operator, value = result
        return Condition(key, self._operators[operator], value)

    def _process_quoted_value(self, result):
        '''Process *result* as quoted value.'''
        return result
class Expression(object):
    '''Base expression that candidates can be tested against.'''

    def match(self, candidate):
        '''Return whether *candidate* satisfies this expression.

        The base implementation accepts every *candidate*.

        '''
        return True

    def __str__(self):
        '''Return string representation.'''
        class_name = type(self).__name__
        return '<{0}>'.format(class_name)
class All(Expression):
    '''Match candidate that matches all of the specified expressions.

    .. note::

        If no expressions are supplied then will always match.

    '''

    def __init__(self, expressions=None):
        '''Initialise with list of *expressions* to match against.'''
        self._expressions = expressions or []
        super(All, self).__init__()

    def __str__(self):
        '''Return string representation.'''
        return '<{0} [{1}]>'.format(
            self.__class__.__name__,
            ' '.join(map(str, self._expressions))
        )

    def match(self, candidate):
        '''Return whether *candidate* satisfies this expression.

        Expressions are evaluated in order and evaluation stops at the first
        one that does not match.

        '''
        # A generator (rather than a list) lets all() stop at the first
        # failing expression instead of evaluating every one up front.
        return all(
            expression.match(candidate)
            for expression in self._expressions
        )
class Any(Expression):
    '''Match candidate that matches any of the specified expressions.

    .. note::

        If no expressions are supplied then will never match.

    '''

    def __init__(self, expressions=None):
        '''Initialise with list of *expressions* to match against.'''
        self._expressions = expressions or []
        super(Any, self).__init__()

    def __str__(self):
        '''Return string representation.'''
        return '<{0} [{1}]>'.format(
            self.__class__.__name__,
            ' '.join(map(str, self._expressions))
        )

    def match(self, candidate):
        '''Return whether *candidate* satisfies this expression.

        Expressions are evaluated in order and evaluation stops at the first
        one that matches.

        '''
        # A generator (rather than a list) lets any() stop at the first
        # matching expression instead of evaluating every one up front.
        return any(
            expression.match(candidate)
            for expression in self._expressions
        )
class Not(Expression):
    '''Invert the outcome of a wrapped expression.'''

    def __init__(self, expression):
        '''Initialise with *expression* whose match result is inverted.'''
        self._expression = expression
        super(Not, self).__init__()

    def __str__(self):
        '''Return string representation.'''
        class_name = self.__class__.__name__
        return '<{0} {1}>'.format(class_name, self._expression)

    def match(self, candidate):
        '''Return whether *candidate* satisfies this expression.

        That is the case when the wrapped expression does not match
        *candidate*.

        '''
        wrapped_matches = self._expression.match(candidate)
        return not wrapped_matches
class Condition(Expression):
    '''Represent condition.'''

    def __init__(self, key, operator, value):
        '''Initialise condition.

        *key* is the key to check on the data when matching. It can be a nested
        key represented by dots. For example, 'data.eventType' would attempt to
        match candidate['data']['eventType']. If the candidate is missing any
        of the requested keys then the match fails immediately.

        *operator* is the operator function to use to perform the match between
        the retrieved candidate value and the conditional *value*.

        If *value* is a string, it can use a wildcard '*' at the end. When
        matching equality only, any candidate value that contains the portion
        of *value* before the wildcard is then considered a match.

        '''
        self._key = key
        self._operator = operator
        self._value = value
        self._wildcard = '*'
        # Reverse lookup used by __str__ to display the operator symbol.
        self._operatorMapping = {
            eq: '=',
            ne: '!=',
            ge: '>=',
            le: '<=',
            gt: '>',
            lt: '<'
        }
        super(Condition, self).__init__()

    def __str__(self):
        '''Return string representation.'''
        return '<{0} {1}{2}{3}>'.format(
            self.__class__.__name__,
            self._key,
            self._operatorMapping.get(self._operator, self._operator),
            self._value
        )

    def match(self, candidate):
        '''Return whether *candidate* satisfies this expression.

        Return False if the key cannot be resolved on *candidate*, or if a
        wildcard comparison is attempted against a value that does not support
        containment tests.

        '''
        key_parts = self._key.split('.')

        # Walk down the nested key; any missing or non-indexable level means
        # the candidate cannot match.
        try:
            value = candidate
            for key_part in key_parts:
                value = value[key_part]
        except (KeyError, TypeError):
            return False

        # endswith (rather than indexing [-1]) keeps an empty string value
        # from raising IndexError.
        if (
            self._operator is eq
            and isinstance(self._value, string_types)
            and self._value.endswith(self._wildcard)
        ):
            fragment = self._value[:-len(self._wildcard)]
            try:
                return fragment in value
            except TypeError:
                # Retrieved value (for example None or a number) does not
                # support containment, so it cannot match the wildcard.
                return False

        return self._operator(value, self._value)