# Source code for pymor.operators.constructions

# This file is part of the pyMOR project (http://www.pymor.org).

"""Module containing some constructions to obtain new operators from old ones."""

from functools import reduce
from itertools import chain
from numbers import Number

import numpy as np

from pymor.core.defaults import defaults
from pymor.core.exceptions import InversionError
from pymor.core.interfaces import ImmutableInterface
from pymor.operators.basic import OperatorBase
from pymor.operators.interfaces import OperatorInterface
from pymor.parameters.base import Parametric
from pymor.parameters.interfaces import ParameterFunctionalInterface
from pymor.vectorarrays.interfaces import VectorArrayInterface, VectorSpaceInterface, _INDEXTYPES
from pymor.vectorarrays.numpy import NumpyVectorSpace

[docs]class LincombOperator(OperatorBase): """Linear combination of arbitrary |Operators|. This |Operator| represents a (possibly |Parameter| dependent) linear combination of a given list of |Operators|. Parameters ---------- operators List of |Operators| whose linear combination is formed. coefficients A list of linear coefficients. A linear coefficient can either be a fixed number or a |ParameterFunctional|. solver_options The |solver_options| for the operator. name Name of the operator. """ def __init__(self, operators, coefficients, solver_options=None, name=None): assert len(operators) > 0 assert len(operators) == len(coefficients) assert all(isinstance(op, OperatorInterface) for op in operators) assert all(isinstance(c, (ParameterFunctionalInterface, _INDEXTYPES)) for c in coefficients) assert all(op.source == operators[0].source for op in operators[1:]) assert all(op.range == operators[0].range for op in operators[1:]) self.source = operators[0].source self.range = operators[0].range self.operators = tuple(operators) self.linear = all(op.linear for op in operators) self.coefficients = tuple(coefficients) self.solver_options = solver_options self.name = name self.build_parameter_type(*chain(operators, (f for f in coefficients if isinstance(f, ParameterFunctionalInterface)))) @property def H(self): options = {'inverse': self.solver_options.get('inverse_adjoint'), 'inverse_adjoint': self.solver_options.get('inverse')} if self.solver_options else None return self.with_(operators=[op.H for op in self.operators], solver_options=options, name=self.name + '_adjoint')
[docs] def evaluate_coefficients(self, mu): """Compute the linear coefficients for a given |Parameter|. Parameters ---------- mu |Parameter| for which to compute the linear coefficients. Returns ------- List of linear coefficients. """ mu = self.parse_parameter(mu) return [c.evaluate(mu) if hasattr(c, 'evaluate') else c for c in self.coefficients]
[docs] def apply(self, U, mu=None): coeffs = self.evaluate_coefficients(mu) R = self.operators[0].apply(U, mu=mu) R.scal(coeffs[0]) for op, c in zip(self.operators[1:], coeffs[1:]): R.axpy(c, op.apply(U, mu=mu)) return R
[docs] def apply2(self, V, U, mu=None): coeffs = self.evaluate_coefficients(mu) matrices = [op.apply2(V, U, mu=mu) for op in self.operators] coeffs_dtype = reduce(np.promote_types, (type(c) for c in coeffs)) matrices_dtype = reduce(np.promote_types, (m.dtype for m in matrices)) common_dtype = np.promote_types(coeffs_dtype, matrices_dtype) R = coeffs[0] * matrices[0] if R.dtype != common_dtype: R = R.astype(common_dtype) for m, c in zip(matrices[1:], coeffs[1:]): R += c * m return R
[docs] def pairwise_apply2(self, V, U, mu=None): coeffs = self.evaluate_coefficients(mu) vectors = [op.pairwise_apply2(V, U, mu=mu) for op in self.operators] coeffs_dtype = reduce(np.promote_types, (type(c) for c in coeffs)) vectors_dtype = reduce(np.promote_types, (v.dtype for v in vectors)) common_dtype = np.promote_types(coeffs_dtype, vectors_dtype) R = coeffs[0] * vectors[0] if R.dtype != common_dtype: R = R.astype(common_dtype) for v, c in zip(vectors[1:], coeffs[1:]): R += c * v return R
[docs] def apply_adjoint(self, V, mu=None): coeffs = self.evaluate_coefficients(mu) R = self.operators[0].apply_adjoint(V, mu=mu) R.scal(coeffs[0]) for op, c in zip(self.operators[1:], coeffs[1:]): R.axpy(c, op.apply_adjoint(V, mu=mu)) return R
[docs] def assemble(self, mu=None): operators = [op.assemble(mu) for op in self.operators] coefficients = self.evaluate_coefficients(mu) op = operators[0].assemble_lincomb(operators, coefficients, solver_options=self.solver_options, name=self.name + '_assembled') if op: return op else: if self.parametric or operators != self.operators: return LincombOperator(operators, coefficients, solver_options=self.solver_options, name=self.name + '_assembled') else: return self # avoid infinite recursion
[docs] def jacobian(self, U, mu=None): if self.linear: return self.assemble(mu) jacobians = [op.jacobian(U, mu) for op in self.operators] coefficients = self.evaluate_coefficients(mu) options = self.solver_options.get('jacobian') if self.solver_options else None jac = jacobians[0].assemble_lincomb(jacobians, coefficients, solver_options=options, name=self.name + '_jacobian') if jac is None: return LincombOperator(jacobians, coefficients, solver_options=options, name=self.name + '_jacobian') else: return jac
[docs] def apply_inverse(self, V, mu=None, least_squares=False): if len(self.operators) == 1: if self.coefficients[0] == 0.: if least_squares: return self.source.zeros(len(V)) else: raise InversionError else: U = self.operators[0].apply_inverse(V, mu=mu, least_squares=least_squares) U *= (1. / self.coefficients[0]) return U else: return super().apply_inverse(V, mu=mu, least_squares=least_squares)
[docs] def apply_inverse_adjoint(self, U, mu=None, least_squares=False): if len(self.operators) == 1: if self.coefficients[0] == 0.: if least_squares: return self.range.zeros(len(U)) else: raise InversionError else: V = self.operators[0].apply_inverse_adjoint(U, mu=mu, least_squares=least_squares) V *= (1. / self.coefficients[0]) return V else: return super().apply_inverse_adjoint(U, mu=mu, least_squares=least_squares)
def _as_array(self, source, mu): coefficients = np.array(self.evaluate_coefficients(mu)) arrays = [op.as_source_array(mu) if source else op.as_range_array(mu) for op in self.operators] R = arrays[0] R.scal(coefficients[0]) for c, v in zip(coefficients[1:], arrays[1:]): R.axpy(c, v) return R
[docs] def as_range_array(self, mu=None): return self._as_array(False, mu)
[docs] def as_source_array(self, mu=None): return self._as_array(True, mu)
def _add_sub(self, other, sign): if not isinstance(other, OperatorInterface): return NotImplemented if self.name != 'LincombOperator': if isinstance(other, LincombOperator) and other.name == 'LincombOperator': operators = (self,) + other.operators coefficients = (1.,) + (other.coefficients if sign == 1. else tuple(-c for c in other.coefficients)) else: operators, coefficients = (self, other), (1., sign) elif isinstance(other, LincombOperator) and other.name == 'LincombOperator': operators = self.operators + other.operators coefficients = self.coefficients + (other.coefficients if sign == 1. else tuple(-c for c in other.coefficients)) else: operators, coefficients = self.operators + (other,), self.coefficients + (sign,) return LincombOperator(operators, coefficients, solver_options=self.solver_options) def _radd_sub(self, other, sign): if not isinstance(other, OperatorInterface): return NotImplemented # note that 'other' can never be a LincombOperator if self.name != 'LincombOperator': operators, coefficients = (other, self), (1., sign) else: operators = (other,) + self.operators coefficients = (1.,) + (self.coefficients if sign == 1. else tuple(-c for c in self.coefficients)) return LincombOperator(operators, coefficients, solver_options=other.solver_options)
[docs] def __mul__(self, other): assert isinstance(other, (Number, ParameterFunctionalInterface)) if self.name != 'LincombOperator': return LincombOperator((self,), (other,)) else: return self.with_(coefficients=tuple(c * other for c in self.coefficients))
[docs]class Concatenation(OperatorBase): """|Operator| representing the concatenation of two |Operators|. Parameters ---------- operators Tuple of |Operators| to concatenate. `operators[-1]` is the first applied operator, `operators[0]` is the last applied operator. solver_options The |solver_options| for the operator. name Name of the operator. """ def __init__(self, operators, solver_options=None, name=None): assert all(isinstance(op, OperatorInterface) for op in operators) assert all(operators[i].source == operators[i+1].range for i in range(len(operators)-1)) self.operators = tuple(operators) self.build_parameter_type(*operators) self.source = operators[-1].source self.range = operators[0].range self.linear = all(op.linear for op in operators) self.solver_options = solver_options self.name = name @property def H(self): options = {'inverse': self.solver_options.get('inverse_adjoint'), 'inverse_adjoint': self.solver_options.get('inverse')} if self.solver_options else None return type(self)(tuple(op.H for op in self.operators[::-1]), solver_options=options, name=self.name + '_adjoint')
[docs] def apply(self, U, mu=None): mu = self.parse_parameter(mu) for op in self.operators[::-1]: U = op.apply(U, mu=mu) return U
[docs] def apply_adjoint(self, V, mu=None): mu = self.parse_parameter(mu) for op in self.operators: V = op.apply_adjoint(V, mu=mu) return V
[docs] def jacobian(self, U, mu=None): assert len(U) == 1 Us = [U] for op in self.operators[:0:-1]: Us.append(op.apply(Us[-1], mu=mu)) options = self.solver_options.get('jacobian') if self.solver_options else None return Concatenation(tuple(op.jacobian(U, mu=mu) for op, U in zip(self.operators, Us[::-1])), solver_options=options, name=self.name + '_jacobian')
[docs] def restricted(self, dofs): restricted_ops = [] for op in self.operators: rop, dofs = op.restricted(dofs) restricted_ops.append(rop) return Concatenation(restricted_ops), dofs
[docs] def __matmul__(self, other): if not isinstance(other, OperatorInterface): return NotImplemented if self.name != 'Concatenation': if isinstance(other, Concatenation) and other.name == 'Concatenation': operators = (self,) + other.operators else: operators = (self, other) elif isinstance(other, Concatenation) and other.name == 'Concatenation': operators = self.operators + other.operators else: operators = self.operators + (other,) return Concatenation(operators, solver_options=self.solver_options)
def __rmatmul__(self, other): if not isinstance(other, OperatorInterface): return NotImplemented # note that 'other' can never be a Concatenation if self.name != 'Concatenation': operators = (other, self) else: operators = (other,) + self.operators return Concatenation(operators, solver_options=other.solver_options)
[docs]class ComponentProjection(OperatorBase): """|Operator| representing the projection of a |VectorArray| onto some of its components. Parameters ---------- components List or 1D |NumPy array| of the indices of the vector :meth:`~pymor.vectorarrays.interfaces.VectorArrayInterface.components` that are to be extracted by the operator. source Source |VectorSpace| of the operator. name Name of the operator. """ linear = True def __init__(self, components, source, name=None): assert all(0 <= c < source.dim for c in components) self.components = np.array(components, dtype=np.int32) self.range = NumpyVectorSpace(len(components)) self.source = source self.name = name
[docs] def apply(self, U, mu=None): assert U in self.source return self.range.make_array(U.dofs(self.components))
[docs] def restricted(self, dofs): assert all(0 <= c < self.range.dim for c in dofs) source_dofs = self.components[dofs] return IdentityOperator(NumpyVectorSpace(len(source_dofs))), source_dofs
[docs]class IdentityOperator(OperatorBase): """The identity |Operator|. In other words:: op.apply(U) == U Parameters ---------- space The |VectorSpace| the operator acts on. name Name of the operator. """ linear = True def __init__(self, space, name=None): self.source = self.range = space self.name = name @property def H(self): return self
[docs] def apply(self, U, mu=None): assert U in self.source return U.copy()
[docs] def apply_adjoint(self, V, mu=None): assert V in self.range return V.copy()
[docs] def apply_inverse(self, V, mu=None, least_squares=False): assert V in self.range return V.copy()
[docs] def apply_inverse_adjoint(self, U, mu=None, least_squares=False): assert U in self.source return U.copy()
[docs] def assemble(self, mu=None): return self
[docs] def assemble_lincomb(self, operators, coefficients, solver_options=None, name=None): if all(isinstance(op, IdentityOperator) for op in operators): if len(operators) == 1: # avoid infinite recursion return None assert all(op.source == operators[0].source for op in operators) return IdentityOperator(operators[0].source, name=name) * sum(coefficients) else: return operators[1].assemble_lincomb(operators[1:] + [operators[0]], coefficients[1:] + [coefficients[0]], solver_options=solver_options, name=name)
[docs] def restricted(self, dofs): assert all(0 <= c < self.range.dim for c in dofs) return IdentityOperator(NumpyVectorSpace(len(dofs))), dofs
[docs]class ConstantOperator(OperatorBase): """A constant |Operator| always returning the same vector. Parameters ---------- value A |VectorArray| of length 1 containing the vector which is returned by the operator. source Source |VectorSpace| of the operator. name Name of the operator. """ linear = False def __init__(self, value, source, name=None): assert isinstance(value, VectorArrayInterface) assert len(value) == 1 self.source = source self.range = value.space self.name = name self._value = value.copy()
[docs] def apply(self, U, mu=None): assert U in self.source return self._value[[0] * len(U)].copy()
[docs] def jacobian(self, U, mu=None): assert U in self.source assert len(U) == 1 return ZeroOperator(self.range, self.source, name=self.name + '_jacobian')
[docs] def restricted(self, dofs): assert all(0 <= c < self.range.dim for c in dofs) restricted_value = NumpyVectorSpace.make_array(self._value.dofs(dofs)) return ConstantOperator(restricted_value, NumpyVectorSpace(len(dofs))), dofs
[docs] def apply_inverse(self, V, mu=None, least_squares=False): if not least_squares: raise InversionError('ConstantOperator is not invertible.') return self.source.zeros(len(V))
[docs]class ZeroOperator(OperatorBase): """The |Operator| which maps every vector to zero. Parameters ---------- range Range |VectorSpace| of the operator. source Source |VectorSpace| of the operator. name Name of the operator. """ linear = True def __init__(self, range, source, name=None): assert isinstance(range, VectorSpaceInterface) assert isinstance(source, VectorSpaceInterface) self.source = source self.range = range self.name = name @property def H(self): return type(self)(self.source, self.range, name=self.name + '_adjoint')
[docs] def apply(self, U, mu=None): assert U in self.source return self.range.zeros(len(U))
[docs] def apply_adjoint(self, V, mu=None): assert V in self.range return self.source.zeros(len(V))
[docs] def apply_inverse(self, V, mu=None, least_squares=False): assert V in self.range if not least_squares: raise InversionError return self.source.zeros(len(V))
[docs] def apply_inverse_adjoint(self, U, mu=None, least_squares=False): assert U in self.source if not least_squares: raise InversionError return self.range.zeros(len(U))
[docs] def assemble_lincomb(self, operators, coefficients, solver_options=None, name=None): assert operators[0] is self if len(operators) > 1: return operators[1].assemble_lincomb(operators[1:], coefficients[1:], solver_options=solver_options, name=name) else: return self
[docs] def restricted(self, dofs): assert all(0 <= c < self.range.dim for c in dofs) return ZeroOperator(NumpyVectorSpace(len(dofs)), NumpyVectorSpace(0)), np.array([], dtype=np.int32)
[docs]class VectorArrayOperator(OperatorBase): """Wraps a |VectorArray| as an |Operator|. If `adjoint` is `False`, the operator maps from `NumpyVectorSpace(len(array))` to `array.space` by forming linear combinations of the vectors in the array with given coefficient arrays. If `adjoint == True`, the operator maps from `array.space` to `NumpyVectorSpace(len(array))` by forming the inner products of the argument with the vectors in the given array. Parameters ---------- array The |VectorArray| which is to be treated as an operator. adjoint See description above. space_id Id of the `source` (`range`) |VectorSpace| in case `adjoint` is `False` (`True`). name The name of the operator. """ linear = True def __init__(self, array, adjoint=False, space_id=None, name=None): self._array = array.copy() if adjoint: self.source = array.space self.range = NumpyVectorSpace(len(array), space_id) else: self.source = NumpyVectorSpace(len(array), space_id) self.range = array.space self.adjoint = adjoint self.space_id = space_id self.name = name @property def H(self): return VectorArrayOperator(self._array, not self.adjoint, self.space_id, self.name + '_adjoint')
[docs] def apply(self, U, mu=None): assert U in self.source if not self.adjoint: return self._array.lincomb(U.to_numpy()) else: return self.range.make_array(self._array.dot(U).T)
[docs] def apply_adjoint(self, V, mu=None): assert V in self.range if not self.adjoint: return self.source.make_array(self._array.dot(V).T) else: return self._array.lincomb(V.to_numpy())
[docs] def assemble_lincomb(self, operators, coefficients, solver_options=None, name=None): adjoint = operators[0].adjoint if not all(isinstance(op, VectorArrayOperator) and op.adjoint == adjoint and op.source == operators[0].source and op.range == operators[0].range for op in operators): return None assert not solver_options if adjoint: coefficients = np.conj(coefficients) if coefficients[0] == 1: array = operators[0]._array.copy() else: array = operators[0]._array * coefficients[0] for op, c in zip(operators[1:], coefficients[1:]): array.axpy(c, op._array) return VectorArrayOperator(array, adjoint=adjoint, space_id=operators[0].space_id, name=name)
[docs] def as_range_array(self, mu=None): if not self.adjoint: return self._array.copy() else: return super().as_range_array(mu)
[docs] def as_source_array(self, mu=None): if self.adjoint: return self._array.copy() else: return super().as_source_array(mu)
[docs] def restricted(self, dofs): assert all(0 <= c < self.range.dim for c in dofs) if not self.adjoint: restricted_value = NumpyVectorSpace.make_array(self._array.dofs(dofs)) return VectorArrayOperator(restricted_value, False), np.arange(self.source.dim, dtype=np.int32) else: raise NotImplementedError
[docs]class VectorOperator(VectorArrayOperator): """Wrap a vector as a vector-like |Operator|. Given a vector `v` of dimension `d`, this class represents the operator :: op: R^1 ----> R^d x |---> xâ‹…v In particular:: VectorOperator(vector).as_range_array() == vector Parameters ---------- vector |VectorArray| of length 1 containing the vector `v`. name Name of the operator. """ linear = True source = NumpyVectorSpace(1) def __init__(self, vector, name=None): assert isinstance(vector, VectorArrayInterface) assert len(vector) == 1 super().__init__(vector, adjoint=False, name=name)
[docs]class VectorFunctional(VectorArrayOperator): """Wrap a vector as a linear |Functional|. Given a vector `v` of dimension `d`, this class represents the functional :: f: R^d ----> R^1 u |---> (u, v) where `( , )` denotes the inner product given by `product`. In particular, if `product` is `None` :: VectorFunctional(vector).as_source_array() == vector. If `product` is not none, we obtain :: VectorFunctional(vector).as_source_array() == product.apply(vector). Parameters ---------- vector |VectorArray| of length 1 containing the vector `v`. product |Operator| representing the scalar product to use. name Name of the operator. """ linear = True range = NumpyVectorSpace(1) def __init__(self, vector, product=None, name=None): assert isinstance(vector, VectorArrayInterface) assert len(vector) == 1 assert product is None or isinstance(product, OperatorInterface) and vector in product.source if product is None: super().__init__(vector, adjoint=True, name=name) else: super().__init__(product.apply(vector), adjoint=True, name=name)
[docs]class ProxyOperator(OperatorBase): """Forwards all interface calls to given |Operator|. Mainly useful as base class for other |Operator| implementations. Parameters ---------- operator The |Operator| to wrap. name Name of the wrapping operator. """ def __init__(self, operator, name=None): assert isinstance(operator, OperatorInterface) self.source = operator.source self.range = operator.range self.operator = operator self.linear = operator.linear self.name = name self.build_parameter_type(operator) @property def H(self): return self.with_(operator=self.operator.H, name=self.name + '_adjoint')
[docs] def apply(self, U, mu=None): return self.operator.apply(U, mu=mu)
[docs] def apply_inverse(self, V, mu=None, least_squares=False): return self.operator.apply_inverse(V, mu=mu, least_squares=least_squares)
[docs] def jacobian(self, U, mu=None): return self.operator.jacobian(U, mu=mu)
[docs] def restricted(self, dofs): op, source_dofs = self.operator.restricted(dofs) return self.with_(operator=op), source_dofs
[docs]class FixedParameterOperator(ProxyOperator): """Makes an |Operator| |Parameter|-independent by setting a fixed |Parameter|. Parameters ---------- operator The |Operator| to wrap. mu The fixed |Parameter| that will be fed to the :meth:`~pymor.operators.interfaces.OperatorInterface.apply` method (and related methods) of `operator`. """ def __init__(self, operator, mu=None, name=None): super().__init__(operator, name) assert operator.parse_parameter(mu) or True self.mu = mu.copy() self.build_parameter_type()
[docs] def apply(self, U, mu=None): return self.operator.apply(U, mu=self.mu)
[docs] def apply_inverse(self, V, mu=None, least_squares=False): return self.operator.apply_inverse(V, mu=self.mu, least_squares=least_squares)
[docs] def jacobian(self, U, mu=None): return self.operator.jacobian(U, mu=self.mu)
[docs]class LinearOperator(ProxyOperator): """Mark the wrapped |Operator| to be linear.""" def __init__(self, operator, name=None): super().__init__(operator, name) self.linear = True
[docs]class AffineOperator(ProxyOperator): """Decompose an affine |Operator| into affine_shift and linear_part. """ def __init__(self, operator, name=None): if operator.parametric: raise NotImplementedError super().__init__(operator, name) self.affine_shift = ConstantOperator(operator.apply(operator.source.zeros()), source=operator.source) self.linear_part = LinearOperator(operator - self.affine_shift, name=operator.name + '_linear_part')
[docs] def jacobian(self, U, mu=None): return self.linear_part.jacobian(U, mu)
[docs]class InverseOperator(OperatorBase): """Represents the inverse of a given |Operator|. Parameters ---------- operator The |Operator| of which the inverse is formed. name If not `None`, name of the operator. """ def __init__(self, operator, name=None): assert isinstance(operator, OperatorInterface) self.build_parameter_type(operator) self.source = operator.range self.range = operator.source self.operator = operator self.linear = operator.linear self.name = name or operator.name + '_inverse' @property def H(self): return InverseAdjointOperator(self.operator)
[docs] def apply(self, U, mu=None): assert U in self.source return self.operator.apply_inverse(U, mu=mu)
[docs] def apply_adjoint(self, V, mu=None): assert V in self.range return self.operator.apply_inverse_adjoint(V, mu=mu)
[docs] def apply_inverse(self, V, mu=None, least_squares=False): assert V in self.range return self.operator.apply(V, mu=mu)
[docs] def apply_inverse_adjoint(self, U, mu=None, least_squares=False): assert U in self.source return self.operator.apply_adjoint(U, mu=mu)
[docs]class InverseAdjointOperator(OperatorBase): """Represents the inverse adjoint of a given |Operator|. Parameters ---------- operator The |Operator| of which the inverse adjoint is formed. name If not `None`, name of the operator. """ linear = True def __init__(self, operator, name=None): assert isinstance(operator, OperatorInterface) assert operator.linear self.build_parameter_type(operator) self.source = operator.source self.range = operator.range self.operator = operator self.name = name or operator.name + '_inverse_adjoint' @property def H(self): return InverseOperator(self.operator)
[docs] def apply(self, U, mu=None): assert U in self.source return self.operator.apply_inverse_adjoint(U, mu=mu)
[docs] def apply_adjoint(self, V, mu=None): assert V in self.range return self.operator.apply_inverse(V, mu=mu)
[docs] def apply_inverse(self, V, mu=None, least_squares=False): assert V in self.range return self.operator.apply_adjoint(V, mu=mu)
[docs] def apply_inverse_adjoint(self, U, mu=None, least_squares=False): assert U in self.source return self.operator.apply(U, mu=mu)