Source code for ftrack_api.event.expression

# :coding: utf-8
# :copyright: Copyright (c) 2014 ftrack

from builtins import map
from builtins import object
from operator import eq, ne, ge, le, gt, lt

from pyparsing import (
    Group,
    Word,
    CaselessKeyword,
    Forward,
    FollowedBy,
    Suppress,
    oneOf,
    OneOrMore,
    Optional,
    alphanums,
    quotedString,
    removeQuotes,
)

import ftrack_api.exception

# Do not enable packrat since it is not thread-safe and will result in parsing
# exceptions in a multi threaded environment.
# ParserElement.enablePackrat()


[docs]class Parser(object): """Parse string based expression into :class:`Expression` instance."""
[docs] def __init__(self): """Initialise parser.""" self._operators = {"=": eq, "!=": ne, ">=": ge, "<=": le, ">": gt, "<": lt} self._parser = self._construct_parser() super(Parser, self).__init__()
def _construct_parser(self): """Construct and return parser.""" field = Word(alphanums + "_.") operator = oneOf(list(self._operators.keys())) value = Word(alphanums + "-_,./*@+") quoted_value = quotedString("quoted_value").setParseAction(removeQuotes) condition = Group(field + operator + (quoted_value | value))("condition") not_ = Optional(Suppress(CaselessKeyword("not")))("not") and_ = Suppress(CaselessKeyword("and"))("and") or_ = Suppress(CaselessKeyword("or"))("or") expression = Forward() parenthesis = Suppress("(") + expression + Suppress(")") previous = condition | parenthesis for conjunction in (not_, and_, or_): current = Forward() if conjunction in (and_, or_): conjunction_expression = FollowedBy( previous + conjunction + previous ) + Group(previous + OneOrMore(conjunction + previous))( conjunction.resultsName ) elif conjunction in (not_,): conjunction_expression = FollowedBy(conjunction.expr + current) + Group( conjunction + current )(conjunction.resultsName) else: # pragma: no cover raise ValueError("Unrecognised conjunction.") current <<= conjunction_expression | previous previous = current expression <<= previous return expression("expression")
[docs] def parse(self, expression): """Parse string *expression* into :class:`Expression`. Raise :exc:`ftrack_api.exception.ParseError` if *expression* could not be parsed. """ result = None expression = expression.strip() if expression: try: result = self._parser.parseString(expression, parseAll=True) except Exception as error: raise ftrack_api.exception.ParseError( "Failed to parse: {0}. {1}".format(expression, error) ) return self._process(result)
def _process(self, result): """Process *result* using appropriate method. Method called is determined by the name of the result. """ method_name = "_process_{0}".format(result.getName()) method = getattr(self, method_name) return method(result) def _process_expression(self, result): """Process *result* as expression.""" return self._process(result[0]) def _process_not(self, result): """Process *result* as NOT operation.""" return Not(self._process(result[0])) def _process_and(self, result): """Process *result* as AND operation.""" return All([self._process(entry) for entry in result]) def _process_or(self, result): """Process *result* as OR operation.""" return Any([self._process(entry) for entry in result]) def _process_condition(self, result): """Process *result* as condition.""" key, operator, value = result return Condition(key, self._operators[operator], value) def _process_quoted_value(self, result): """Process *result* as quoted value.""" return result
[docs]class Expression(object): """Represent a structured expression to test candidates against.""" def __str__(self): """Return string representation.""" return "<{0}>".format(self.__class__.__name__)
[docs] def match(self, candidate): """Return whether *candidate* satisfies this expression.""" return True
[docs]class All(Expression): """Match candidate that matches all of the specified expressions. .. note:: If no expressions are supplied then will always match. """
[docs] def __init__(self, expressions=None): """Initialise with list of *expressions* to match against.""" self._expressions = expressions or [] super(All, self).__init__()
def __str__(self): """Return string representation.""" return "<{0} [{1}]>".format( self.__class__.__name__, " ".join(map(str, self._expressions)) )
[docs] def match(self, candidate): """Return whether *candidate* satisfies this expression.""" return all([expression.match(candidate) for expression in self._expressions])
[docs]class Any(Expression): """Match candidate that matches any of the specified expressions. .. note:: If no expressions are supplied then will never match. """
[docs] def __init__(self, expressions=None): """Initialise with list of *expressions* to match against.""" self._expressions = expressions or [] super(Any, self).__init__()
def __str__(self): """Return string representation.""" return "<{0} [{1}]>".format( self.__class__.__name__, " ".join(map(str, self._expressions)) )
[docs] def match(self, candidate): """Return whether *candidate* satisfies this expression.""" return any([expression.match(candidate) for expression in self._expressions])
[docs]class Not(Expression): """Negate expression."""
[docs] def __init__(self, expression): """Initialise with *expression* to negate.""" self._expression = expression super(Not, self).__init__()
def __str__(self): """Return string representation.""" return "<{0} {1}>".format(self.__class__.__name__, self._expression)
[docs] def match(self, candidate): """Return whether *candidate* satisfies this expression.""" return not self._expression.match(candidate)
[docs]class Condition(Expression): """Represent condition."""
[docs] def __init__(self, key, operator, value): """Initialise condition. *key* is the key to check on the data when matching. It can be a nested key represented by dots. For example, 'data.eventType' would attempt to match candidate['data']['eventType']. If the candidate is missing any of the requested keys then the match fails immediately. *operator* is the operator function to use to perform the match between the retrieved candidate value and the conditional *value*. If *value* is a string, it can use a wildcard '*' at the end to denote that any values matching the substring portion are valid when matching equality only. """ self._key = key self._operator = operator self._value = value self._wildcard = "*" self._operatorMapping = { eq: "=", ne: "!=", ge: ">=", le: "<=", gt: ">", lt: "<", }
def __str__(self): """Return string representation.""" return "<{0} {1}{2}{3}>".format( self.__class__.__name__, self._key, self._operatorMapping.get(self._operator, self._operator), self._value, )
[docs] def match(self, candidate): """Return whether *candidate* satisfies this expression.""" key_parts = self._key.split(".") try: value = candidate for keyPart in key_parts: value = value[keyPart] except (KeyError, TypeError): return False if ( self._operator is eq and isinstance(self._value, str) and self._value[-1] == self._wildcard ): return self._value[:-1] in value else: return self._operator(value, self._value)