import cvxpy
import sympy
import numpy as np
from paganini.utils import phi, partition_sequences
from paganini.expressions import *
import sys
from enum import Enum
from math import factorial
# define the namespace to avoid polluting with foreign packages
__all__ = ('Seq', 'UCyc', 'MSet', 'Set', 'Cyc', 'Operator', 'Constraint',
'Type', 'Params', 'Specification', 'leq', 'geq', 'eq')
[docs]class Seq(Variable):
""" Sequence variables."""
def __init__(self, expression, constraint = None):
super(Seq,self).__init__()
expression = Polynomial.cast(expression)
self.inner_expressions = expression
self.type = VariableType.TYPE # make sure its a type variable
self.constraint = Constraint.normalise(constraint)
[docs] def register(self, spec):
""" Unfolds the Seq definition and registers it in the given system."""
if self.constraint.operator == Operator.UNBOUNDED:
# note: Seq(expr) = 1 + expr * Seq(expr).
spec.add(self, 1 + self.inner_expressions * self)
elif self.constraint.operator == Operator.LEQ:
# note: Seq(expr)_{<= k} = 1 + expr + expr^2 + ... + expr^k.
expressions = Expr(1)
for k in range(1, self.constraint.value + 1):
expressions = expressions + self.inner_expressions ** k
spec.add(self, expressions)
elif self.constraint.operator == Operator.EQ:
# note: Seq(expr)_{= k} = expr ** k
spec.add(self, self.inner_expressions ** self.constraint.value)
else: # constraint.operator == Operator.GEQ
# note: Seq(expr)_{>= k} = expr^k + expr^{k+1} + ...
# = expr^k (1 + expr^2 + expr^3 + ...)
# = expr^k Seq(expr).
seq = Seq(self.inner_expressions)
spec.add(seq, 1 + self.inner_expressions * seq)
spec.add(self, self.inner_expressions ** self.constraint.value * seq)
[docs]class MSet(Variable):
""" MSet variables."""
def __init__(self, expression, constraint = None):
super(MSet,self).__init__()
expression = Polynomial.cast(expression)
self.inner_expressions = expression
self.type = VariableType.TYPE # make sure its a type variable
self.constraint = Constraint.normalise(constraint)
if self.constraint.operator == Operator.LEQ\
or self.constraint.operator == Operator.GEQ:
raise AttributeError("Unsupported constraint.")
[docs] def register(self, spec):
""" Unfolds the MSet definition and registers it in the given system."""
spec._diagonal_variable(self, 1)
[docs]class UCyc(Variable):
""" Unlabelled Cyc variables."""
def __init__(self, expression, constraint = None):
super(UCyc,self).__init__()
expression = Polynomial.cast(expression)
self.inner_expressions = expression
self.type = VariableType.TYPE # make sure its a type variable
self.constraint = Constraint.normalise(constraint)
if self.constraint.operator != Operator.EQ:
raise AttributeError("Unsupported constraint.")
[docs] def register(self, spec):
""" Unfolds the UCyc definition and registers it in the given system."""
spec._diagonal_variable(self, 1)
[docs]class Set(Variable):
""" Labelled Set variables."""
def __init__(self, expression, constraint = None):
super(Set,self).__init__()
expression = Polynomial.cast(expression)
self.inner_expressions = expression
self.type = VariableType.TYPE # make sure its a type variable
self.constraint = Constraint.normalise(constraint)
if self.constraint.operator == Operator.LEQ\
or self.constraint.operator == Operator.GEQ:
raise AttributeError("Unsupported constraint.")
[docs] def register(self, spec):
""" Unfolds the Set definition and registers it in the given system."""
spec._diagonal_variable(self, 1)
[docs]class Cyc(Variable):
""" Labelled Cyc variables."""
def __init__(self, expression, constraint = None):
super(Cyc,self).__init__()
expression = Polynomial.cast(expression)
self.inner_expressions = expression
self.type = VariableType.TYPE # make sure its a type variable
self.constraint = Constraint.normalise(constraint)
if self.constraint.operator == Operator.LEQ\
or self.constraint.operator == Operator.GEQ:
raise AttributeError("Unsupported constraint.")
[docs] def register(self, spec):
""" Unfolds the Cyc definition and registers it in the given system."""
spec._diagonal_variable(self, 1)
[docs]class Operator(Enum):
""" Enumeration of supported constraint signs."""
LEQ = 1 # less or equal
EQ = 2 # equal
GEQ = 3 # greater or equal
UNBOUNDED = 4 # unbounded operator
[docs]class Constraint:
""" Supported constraints for classes such as SEQ."""
def __init__(self, operator, value):
self.operator = operator
self.value = value
[docs] @staticmethod
def normalise(constraint = None):
if constraint is None:
return Constraint(Operator.UNBOUNDED, 0)
else:
return constraint
[docs]def leq(n):
""" Creates a less or equal constraint for the given input."""
assert n >= 0, "Negative constraints are not supported."
return Constraint(Operator.LEQ, n)
[docs]def eq(n):
""" Creates an equal constraint for the given input."""
assert n > 0, "Non-positive constraints are not supported."
return Constraint(Operator.EQ, n)
[docs]def geq(n):
""" Creates a greater or equal constraint for the given input."""
assert n >= 0, "Negative constraints are not supported."
return Constraint(Operator.GEQ, n)
[docs]class Type(Enum):
""" Enumeration of supported system types."""
ALGEBRAIC = 1
RATIONAL = 2
[docs]class Params:
""" CVXPY solver parameters initalised with some defaults."""
def __init__(self, sys_type):
self.verbose = False
if sys_type == Type.RATIONAL:
self.sys_type = Type.RATIONAL
self.solver = cvxpy.SCS
self.max_iters = 2500
self.eps = 1.e-20
self.norm = 40
else:
self.sys_type = Type.ALGEBRAIC
self.solver = cvxpy.ECOS
self.max_iters = 250
self.feastol = 1.e-20
self.abstol = 1.e-20
self.reltol = 1.e-20
[docs]class Specification:
""" Symbolic system specifications."""
def __init__(self, series_truncate = 20):
""" Creates a new specification. The optional `series_truncate`
parameter controls the truncation threshold for infinite series,
intrinsic to some more involved constructions, such as multisets or
cycles."""
self._index_counter = 0
self._equations = {}
self._all_variables = {}
self._tuned_variables = set()
self._seq_variables = set()
self._mset_variables = set()
self._ucyc_variables = set()
self._set_variables = set()
self._cyc_variables = set()
# diagonals.
self._diag = {}
self._msets = {}
self._sets = {}
self._series_truncate = series_truncate
def __repr__(self):
return '\n'.join([
v.__repr__() + ' = ' + self._equations[v].__repr__()
for v in self._equations
])
def _next_idx(self):
n = self._index_counter
self._index_counter += 1
return n
@property
def discharged_variables(self):
""" Number of variables discharged in the system."""
return self._index_counter
def _register_variable(self, v):
""" Registers the given variable in the specification."""
if v.idx is not None:
return # nothing to do.
v.idx = self._next_idx()
self._all_variables[v.idx] = v
if v.tuning_param is not None:
self._tuned_variables.add(v)
# case over the variable's type
if isinstance(v, Seq):
self._seq_variables.add(v)
self._register_expressions(v.inner_expressions)
elif isinstance(v, MSet):
self._mset_variables.add(v)
self._register_expressions(v.inner_expressions)
elif isinstance(v, UCyc):
self._ucyc_variables.add(v)
self._register_expressions(v.inner_expressions)
elif isinstance(v, Set):
self._set_variables.add(v)
self._register_expressions(v.inner_expressions)
elif isinstance(v, Cyc):
self._cyc_variables.add(v)
self._register_expressions(v.inner_expressions)
def _register_expression(self, expression):
assert isinstance(expression, Expr), 'Expected expression.'
for v in expression.variables:
self._register_variable(v)
def _register_expressions(self, expressions):
if isinstance(expressions, Expr):
self._register_expression(expressions)
else:
for expr in expressions:
self._register_expression(expr)
def _type_variable(self):
""" Discharges a fresh variable."""
v = Variable()
v.type = VariableType.TYPE
self._register_variable(v)
return v
[docs] def check_type(self):
""" Checks if the system is algebraic or rational.
Note: the current method is heuristic."""
if len(self._seq_variables) > 0 or\
len(self._mset_variables) > 0 or\
len(self._ucyc_variables) > 0 or\
len(self._cyc_variables) > 0 or\
len(self._set_variables) > 0:
return Type.ALGEBRAIC
for expressions in self._equations.values():
for exp in expressions:
if isinstance(exp, Expr):
for v, e in exp.variables.items():
if v.is_type_variable and e > 1:
return Type.ALGEBRAIC
return Type.RATIONAL
def _init_params(self, params = None):
if params is None:
# some defaults
sys_type = self.check_type()
return Params(sys_type)
else:
return params
def _diagonal_expr(self, expr, d = 1):
""" Extends self._diag_variable to expressions."""
assert isinstance(expr, Expr), 'Expression required.'
variables = {}
for v in expr.variables:
if v.is_type_variable:
# substitute the power variable.
x = self._diagonal_variable(v, d)
variables[x] = expr.variables[v]
else:
# increase the exponent.
variables[v] = d * expr.variables[v]
return Expr(expr.coeff, variables)
def _diagonal_variable(self, var, d = 1):
""" Given :math:`T(z)` creates its kth diagonal, i.e. :math:`T(z^k)`."""
assert d > 0, 'Non-positive diagonal parameter.'
assert var.is_type_variable, 'Requested diagonal of non-type variable.'
if var not in self._diag:
self._diag[var] = {} # degree -> expression
# check the variable cache.
if d in self._diag[var]:
return self._diag[var][d]
# trivial diagonal case.
if d == 1 and var in self._equations:
self._diag[var][d] = var
return var
var_d = self._type_variable() if d > 1 else var
self._diag[var][d] = var_d # memorise var[d]
if var in self._equations:
monomials = self._equations[var]
self.add(var_d, [self._diagonal_expr(e, d) for e in monomials])
return var_d
if var in self._mset_variables:
if var.constraint.operator == Operator.EQ:
# note: constrained MSet_{ = k}.
series, k = [], var.constraint.value
for n in partition_sequences(k):
product = Polynomial(Expr(1))
for i in range(1, k + 1): # note the truncation.
if i * d <= self._series_truncate:
c = 1 / (factorial(n[i - 1]) * (i ** n[i - 1]))
expr = Polynomial([self._diagonal_expr(e, i * d)\
for e in var.inner_expressions])
product *= (c * expr ** n[i - 1])
if not product.is_one():
series.append(product)
self.add(var_d, Polynomial.sum(series))
return var_d
else:
self._msets[var_d] = []
for k in range(d, self._series_truncate + 1, d):
self._msets[var_d].append(Polynomial([self._diagonal_expr(e, k)\
for e in var.inner_expressions]))
return var_d
elif var in self._ucyc_variables:
if var.constraint.operator == Operator.EQ:
# note: constrained Cyc_{ = k}.
series, k = [], var.constraint.value
for i in range(1, k + 1):
if k % i == 0:
expr = Polynomial([self._diagonal_expr(e, i * d)\
for e in var.inner_expressions])
series.append((phi(i) / k) * expr ** (k // i))
self.add(var_d, Polynomial.sum(series))
return var_d
elif var in self._set_variables:
if var.constraint.operator == Operator.EQ:
# note: constrained Set_{ = k}.
k = var.constraint.value
expr = Polynomial([self._diagonal_expr(e, d)\
for e in var.inner_expressions])
self.add(var_d, (1 / (factorial(k))) * expr ** k)
return var_d
else:
self._sets[var_d] = [Polynomial([self._diagonal_expr(e, d)\
for e in var.inner_expressions])]
return var_d
elif var in self._cyc_variables:
if var.constraint.operator == Operator.EQ:
# note: constrained Cyc_{ = k}.
k = var.constraint.value
expr = Polynomial([self._diagonal_expr(e, d)\
for e in var.inner_expressions])
self.add(var_d, (1 / k) * expr ** k)
return var_d
else:
series = []
for k in range(1, self._series_truncate + 1):
series.append(1/k * Polynomial([self._diagonal_expr(e, d)\
for e in var.inner_expressions]) ** k)
self.add(var_d, Polynomial.sum(series))
return var_d
[docs] def add(self, var, expression):
""" Includes the given definition in the specification."""
expression = Polynomial.cast(expression)
var.type = VariableType.TYPE # make var a type variable
self._equations[var] = expression
# register variables in the system.
self._register_variable(var)
self._register_expressions(expression)
def _compose_constraints(self, variables, n):
""" Composes optimisation constraints."""
constraints = []
for v in self._equations:
rhs = self._equations[v] # right-hand side.
matrix, coeffs, constant_term = rhs.specification(n)
exponents = matrix * variables + coeffs
constraints.append(variables[v.idx] >=
cvxpy.log_sum_exp(exponents) + constant_term)
# MSet variable constraints.
for v in self._msets:
xs, rhs = [], self._msets[v]
for i, e in enumerate(rhs):
matrix, coeffs, constant_term = e.specification(n)
exponents = matrix * variables + coeffs
# xs.append(1/(i+1) * cvxpy.exp(cvxpy.sum(exponents)))
# cvxpy.sum is not supported in Python2
xs.append(1/(i+1) * cvxpy.exp(sum(exponents)))
# constraints.append(variables[v.idx] >= cvxpy.sum(xs))
# cvxpy.sum is not supported in Python2
constraints.append(variables[v.idx] >= sum(xs))
# Set variable constraints.
for v in self._sets:
xs, rhs = [], self._sets[v]
for i, e in enumerate(rhs):
matrix, coeffs, constant_term = e.specification(n)
exponents = matrix * variables + coeffs
xs.append(sum(exponents))
constraints.append(variables[v.idx] >= cvxpy.exp(sum(xs)))
return constraints
def _unfold_variables(self):
""" Unfolds some more involved constructors.
Note: this method might generate new variables in the system."""
for v in self._seq_variables.copy():
# note: since unfolding Seq might produce new sequence
# variables, we cannot iterate over a set of dynamic
# size. Instead, we iterate over its copy.
v.register(self)
for v in self._mset_variables:
v.register(self)
for v in self._ucyc_variables.copy():
# same argument as for seq.
v.register(self)
for v in self._set_variables:
v.register(self)
for v in self._cyc_variables:
v.register(self)
def _run_solver(self, var, problem, params):
""" Invokes the CVXPY solver."""
if params.sys_type == Type.RATIONAL:
solution = problem.solve(solver = params.solver, verbose =
params.verbose, eps = params.eps, max_iters =
params.max_iters)
else:
solution = problem.solve(solver = params.solver, verbose =
params.verbose, feastol = params.feastol, max_iters =
params.max_iters, abstol = params.abstol, reltol =
params.reltol)
# decorate system variables
for idx, expr in enumerate(var.value):
self._all_variables[idx].value = sympy.exp(expr).evalf()
return solution
[docs] def run_tuner(self, t, params = None):
""" Given the type variable and a set of tuning parameters, composes a
(tuning) optimisation problem corresponding to an approximate sampler
meant for structures of the given type. Variables are tuned so to
achieve (in expectation) the marked variable values.
Consider the following example:
>>> sp = Specification()
>>> z, u, M = Variable(1000), Variable(200), Variable()
>>> sp.add(M, z + u * z * M + z * M **2)
>>> params = Params(Type.ALGEBRAIC)
>>> sp.run_tuner(M, params)
Here, the variables z and u are marked with *absolute* values 1000 and
200, respectively. The input type represents the type of Motzkin trees,
i.e. plane unary-binary trees. Variable `z` marks their size, whereas
`u` marks the occurrences of unary nodes. The tuning goal is to obtain
specific values of z, u, and M, such that the induced branching
probabilities lead to a sampler which generates Motzkin trees of size
1000 with around 200 unary nodes (both in expectation).
Respective variables (including type variables) are decorated with a
proper 'value'. The method returns the CVXPY solution (i.e. the optimal
value for the problem, or a string indicating why the problem could not
be solved)."""
assert len(self._tuned_variables) > 0,\
'No variables with tuning parameters.'
# get some default parameters if none given.
params = self._init_params(params)
# register unfoldable variables
self._unfold_variables()
n = self.discharged_variables
variables = cvxpy.Variable(n)
# compose the constraints
constraints = self._compose_constraints(variables, n)
# compose the objective
obj = np.zeros(n, dtype='double')
obj[t.idx] = 1.0
for v in self._tuned_variables:
obj[v.idx] = - v.tuning_param
objective = cvxpy.Minimize(obj * variables)
problem = cvxpy.Problem(objective, constraints)
return self._run_solver(variables, problem, params)
[docs] def run_singular_tuner(self, z, params = None):
""" Given a (size) variable and a set of tuning parameters, composes an
optimisation problem corresponding to an approximate sampler meant for
structures of the given type. Variables are tuned so to achieve (in
expectation) the marked variable frequencies.
Consider the following example:
>>> sp = Specification()
>>> z, u, M = Variable(), Variable(0.4), Variable()
>>> sp.add(M, z + u * z * M + z * M **2)
>>>
>>> params = Params(Type.ALGEBRAIC)
>>> sp.run_singular_tuner(z, params)
Here, the variable u is marked with a *frequency* 0.4. The type M
represents the type of Motzkin trees, i.e. unary-binary plane trees.
Variable z marks their size, whereas u marks the occurrences of unary
nodes. The tuning goal is to obtain specific values of `z`, `u`, and `M`, such
that the induced branching probabilities lead to a sampler which
generates, in expectation, Motzkin trees of infinite (i.e. unbounded)
size and around 40% of unary nodes.
Respective variables (including type variables) are decorated with a
proper 'value'. The method returns the CVXPY solution (i.e. the optimal
value for the problem, or a string indicating why the problem could not
be solved)."""
# get some default parameters if none given.
params = self._init_params(params)
# register unfoldable variables
self._unfold_variables()
n = self.discharged_variables
variables = cvxpy.Variable(n)
# compose the constraints
constraints = self._compose_constraints(variables, n)
if params.sys_type == Type.RATIONAL:
# for rational systems the optimisation problem becomes unbounded,
# hence we need to artificially bound the vector norm.
constraints.append(cvxpy.norm(variables, 2) <= params.norm)
# compose the objective
obj = np.zeros(n, dtype='double')
obj[z.idx] = 1.0
for v in self._tuned_variables:
obj[v.idx] = v.tuning_param
objective = cvxpy.Maximize(obj * variables)
problem = cvxpy.Problem(objective, constraints)
return self._run_solver(variables, problem, params)