Source code for problog.forward

"""
problog.forward - Forward compilation and evaluation
----------------------------------------------------

Forward compilation using TP-operator.

..
    Part of the ProbLog distribution.

    Copyright 2015 KU Leuven, DTAI Research Group

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""
import copy
import logging
import random
import signal
import time
from collections import defaultdict

from .bdd_formula import BDD
from .core import transform
from .core import transform_create_as
from .dd_formula import DD
from .dd_formula import build_dd
from .evaluator import Evaluator, EvaluatableDSP, InconsistentEvidenceError
from .formula import LogicFormula, OrderedSet, atom
from .sdd_formula import SDD
from .util import UHeap


def timeout_handler(signum, frame):
    raise SystemError("Process timeout (Python) [%s]" % signum)


class ForwardInference(DD):
    def __init__(self, compile_timeout=None, **kwdargs):
        super(ForwardInference, self).__init__(auto_compact=False, **kwdargs)

        self._inodes_prev = None
        self._inodes_old = None
        self._inodes_neg = None

        self._facts = None
        self._atoms_in_rules = None
        self._completed = None

        self.timeout = compile_timeout
        self._update_listeners = []
        self._node_depths = None
        self.evidence_node = 0

    def register_update_listener(self, obj):
        self._update_listeners.append(obj)

    def _create_atom(
        self, identifier, probability, group, name=None, source=None, is_extra=False
    ):
        return atom(identifier, probability, group, name, source, is_extra)

    def is_complete(self, node):
        node = abs(node)
        return self._completed[node - 1]

    def set_complete(self, node):
        self._completed[node - 1] = True

    def init_build(self):
        if self.evidence():
            ev = [n for q, n in self.evidence() if n is None or n != 0]
            if ev:
                if len(ev) == 1:
                    self.evidence_node = ev[0]
                else:
                    self.evidence_node = self.add_and(ev)
            else:
                # Only deterministically true evidence
                self.evidence_node = 0

        self._facts = []  # list of facts
        # lookup of all rules in which an atom is used
        self._atoms_in_rules = defaultdict(OrderedSet)
        self._completed = [False] * len(self)

        self._compute_node_depths()

        for index, node, nodetype in self:
            if self._node_depths[index - 1] is not None:
                # only include nodes that are reachable from a query or evidence
                if nodetype == "atom":
                    # it's a fact
                    self._facts.append(index)
                    self.set_complete(index)
                else:
                    # it's a compound
                    for atom in node.children:
                        self._atoms_in_rules[abs(atom)].add(index)

        self.build_constraint_dd()
        self.inodes = [None] * len(self)
        self._inodes_prev = [None] * len(self)
        self._inodes_old = [None] * len(self)
        self._inodes_neg = [None] * len(self)
        self._compute_minmax_depths()

    def _propagate_complete(self, interrupted=False):
        if not interrupted:
            for i, c in enumerate(self._completed):
                if not c:
                    self._completed[i] = True
                    self.notify_node_completed(i + 1)
        else:
            updated_nodes = set((i + 1) for i, c in enumerate(self._completed) if c)
            while updated_nodes:
                next_updates = set()
                # Find all heads that are affected
                affected_nodes = set()
                for node in updated_nodes:
                    for rule in self._atoms_in_rules[node]:
                        if not self.is_complete(rule):
                            affected_nodes.add(rule)

                for head in affected_nodes:
                    # head must be compound
                    node = self.get_node(head)
                    children = [self.is_complete(c) for c in node.children]
                    if False not in children:
                        # All children are complete, so the head is complete too.
                        self.set_complete(head)
                        self.notify_node_completed(head)
                        next_updates.add(head)
                updated_nodes = next_updates

    def _compute_node_depths(self):
        """Compute node depths in a breadth-first manner."""
        self._node_depths = [None] * len(self)
        self._node_levels = []

        # Start with the labeled (query) nodes and the evidence node.
        current_nodes = set(
            abs(n) for q, n, l in self.labeled() if self.is_probabilistic(n)
        )
        if self.is_probabilistic(self.evidence_node):
            current_nodes.add(abs(self.evidence_node))
        current_level = 0
        while current_nodes:
            self._node_levels.append(current_nodes)
            next_nodes = set()
            for index in current_nodes:
                self._node_depths[index - 1] = current_level
                node = self.get_node(index)
                nodetype = type(node).__name__
                if nodetype != "atom":
                    for c in node.children:
                        if self.is_probabilistic(c):
                            if self._node_depths[abs(c) - 1] is None:
                                next_nodes.add(abs(c))
            current_nodes = next_nodes
            current_level += 1

    def _compute_minmax_depths(self):
        self._node_minmax = [None] * len(self)
        for level, nodes in reversed(list(enumerate(self._node_levels))):
            for index in nodes:
                # Get the current node's minmax
                minmax = self._node_minmax[index - 1]
                if minmax is None:
                    minmax = level
                for rule in self._atoms_in_rules[index]:
                    rule_minmax = self._node_minmax[rule - 1]
                    if rule_minmax is None:
                        self._node_minmax[rule - 1] = minmax
                    else:
                        node = self.get_node(rule)
                        nodetype = type(node).__name__
                        if nodetype == "conj":
                            rule_minmax = max(minmax, rule_minmax)
                        else:  # disj
                            rule_minmax = min(minmax, rule_minmax)
                        self._node_minmax[rule - 1] = rule_minmax

    def _update_minmax_depths(self, index, new_minmax=0):
        """Update the minmax depth data structure when the given node is completed.

        :param index: index of the node that was completed
        :param new_minmax: new minmax value for the node
        """
        current_minmax = self._node_minmax[index - 1]
        self._node_minmax[index - 1] = new_minmax
        for parent in self._atoms_in_rules[index]:
            parent_minmax = self._node_minmax[parent - 1]
            if current_minmax == parent_minmax:
                # Current node is the best child => we need to recompute
                parent_node = self.get_node(parent)
                parent_nodetype = type(parent_node).__name__
                parent_children_minmax = [
                    self._node_minmax[c - 1]
                    for c in parent_node.children
                    if not self.is_complete(c)
                ]
                if not parent_children_minmax:
                    # No incomplete children
                    self.set_complete(parent)
                    parent_minmax = 0
                elif parent_nodetype == "conj":
                    parent_minmax = max(parent_children_minmax)
                else:
                    parent_minmax = min(parent_children_minmax)
                self._update_minmax_depths(parent, parent_minmax)

    def sort_nodes(self, nodes):
        return sorted(nodes, key=lambda i: self._node_depths[i - 1])

    def notify_node_updated(self, node, complete):
        for obj in self._update_listeners:
            obj.node_updated(self, node, complete)

    def notify_node_completed(self, node):
        for obj in self._update_listeners:
            obj.node_completed(self, node)

    def _heuristic_key_depth(self, node):
        # For OR: D(n) is min(D(c) for c in children)
        # For AND: D(n) is max(D(c) for c in children)
        return self._node_minmax[node - 1], self._node_depths[node - 1], random.random()

    def _heuristic_key(self, node):
        return self._heuristic_key_depth(node)

    def build_iteration(self, updated_nodes):
        # to_recompute is an updatable heap without duplicates
        to_recompute = UHeap(key=self._heuristic_key)
        for node in updated_nodes:
            for rule in self._atoms_in_rules[node]:
                to_recompute.push(rule)

        while to_recompute:
            key, node = to_recompute.pop_with_key()
            if self.update_inode(node):
                # The node has changed: find rules that may be affected
                for rule in self._atoms_in_rules[node]:
                    to_recompute.push(rule)
                # Notify listeners that the node was updated
                self.notify_node_updated(node, self.is_complete(node))
            elif self.is_complete(node):
                self.notify_node_completed(node)
            # if self.is_complete(node):
            #     self._update_minmax_depths(node)

    def build_iteration_levelwise(self, updated_nodes):
        while updated_nodes:
            next_updates = OrderedSet()
            # Find all heads that are affected
            affected_nodes = OrderedSet()
            for node in updated_nodes:
                for rule in self._atoms_in_rules[node]:
                    affected_nodes.add(rule)
            affected_nodes = self.sort_nodes(affected_nodes)
            # print(affected_nodes, [self._node_depths[i - 1] for i in affected_nodes])
            for head in affected_nodes:
                if self.update_inode(head):
                    next_updates.add(head)
                    self.notify_node_updated(head, self.is_complete(head))
                elif self.is_complete(head):
                    self.notify_node_completed(head)
            updated_nodes = next_updates

    def build_stratum(self, updated_nodes):
        self.build_iteration(updated_nodes)

        updated_nodes = OrderedSet()
        for i, nodes in enumerate(zip(self.inodes, self._inodes_old)):
            if not self.get_manager().same(*nodes):
                updated_nodes.add(i + 1)
                # self.notify_node_updated(i + 1)

        self.get_manager().ref(*filter(None, self.inodes))
        self.get_manager().deref(*filter(None, self._inodes_prev))
        self.get_manager().deref(*filter(None, self._inodes_neg))

        # Only completed nodes should be used for negation in the next stratum.
        self._inodes_old = self.inodes[:]
        self._inodes_prev = [None] * len(self)
        for i, n in enumerate(self.inodes):
            if self._completed[i]:
                self._inodes_prev[i] = n
        self._inodes_neg = [None] * len(self)
        return updated_nodes
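
    # Stratification note with an illustrative mini-model (made up for this
    # sketch): negation is resolved against the previous stratum. For
    #
    #     0.4::p.
    #     q :- \+ p.
    #
    # the stratum defining p must complete before \+ p can be expanded, which
    # is why build_stratum copies only completed inodes into _inodes_prev (the
    # nodes that get_inode negates for the next stratum).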

    def build_dd(self):
        required_nodes = set(
            abs(n) for q, n, l in self.labeled() if self.is_probabilistic(n)
        )
        required_nodes |= set(
            abs(n) for q, n, v in self.evidence_all() if self.is_probabilistic(n)
        )

        if self.timeout:
            # Install the handler before arming the alarm.
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(self.timeout)
            logging.getLogger("problog").info("Set timeout: %s", self.timeout)

        try:
            self.init_build()
            updated_nodes = OrderedSet(self._facts)
            while updated_nodes:
                # TODO only check nodes that are actually used in negation
                updated_nodes = self.build_stratum(updated_nodes)
            self._propagate_complete(False)
        except SystemError as err:
            self._propagate_complete(True)
            logging.getLogger("problog").warning(err)
        except KeyboardInterrupt as err:
            self._propagate_complete(True)
            logging.getLogger("problog").warning(err)

        signal.alarm(0)
        self.build_constraint_dd()

    def current(self):
        destination = LogicFormula(auto_compact=False)
        source = self

        # TODO maintain a translation table
        for i, n, t in source:
            inode = self.get_inode(i)
            if inode is not None:
                inode = int(inode)
            if t == "atom":
                j = destination.add_atom(
                    n.identifier,
                    n.probability,
                    n.group,
                    name=inode,
                    is_extra=n.is_extra,
                )
            elif t == "conj":
                children = [c for c in n.children if self.get_inode(c) is not None]
                j = destination.add_and(children, name=inode)
            elif t == "disj":
                children = [c for c in n.children if self.get_inode(c) is not None]
                j = destination.add_or(children, name=inode)
            else:
                raise TypeError("Unknown node type")
            assert i == j

        for name, node, label in source.get_names_with_label():
            if label != self.LABEL_NAMED:
                destination.add_name(name, node, label)

        for c in source.constraints():
            if c.is_nontrivial():
                destination.add_constraint(c)
        return destination

    def update_inode(self, index):
        """Recompute the inode at the given index."""
        was_complete = self.is_complete(index)
        oldnode = self.get_inode(index)
        node = self.get_node(index)
        assert index > 0
        nodetype = type(node).__name__
        if nodetype == "conj":
            children = [self.get_inode(c) for c in node.children]
            children_complete = [self.is_complete(c) for c in node.children]
            if None in children:
                # don't compute if some children are still unknown
                newnode = None
            else:
                newnode = self.get_manager().conjoin(*children)
            if False not in children_complete:
                self.set_complete(index)
        elif nodetype == "disj":
            children = [self.get_inode(c) for c in node.children]
            children_complete = [self.is_complete(c) for c in node.children]
            # discard children that are still unknown
            children = list(filter(None, children))
            if children:
                newnode = self.get_manager().disjoin(*children)
            else:
                newnode = None
            if False not in children_complete:
                self.set_complete(index)
        else:
            raise TypeError("Unexpected node type.")

        # Add constraints
        if newnode is not None:
            newernode = self.get_manager().conjoin(newnode, self.get_constraint_inode())
            self.get_manager().deref(newnode)
            newnode = newernode

        if self.get_manager().same(oldnode, newnode):
            # The node itself is unchanged; report a change only if its
            # completion status changed.
            return self.is_complete(index) != was_complete
        else:
            if oldnode is not None:
                self.get_manager().deref(oldnode)
            self.set_inode(index, newnode)
            return True

    def get_evidence_inode(self):
        if not self.is_probabilistic(self.evidence_node):
            return self.get_manager().true()

        inode = self.get_inode(self.evidence_node, final=True)
        if inode:
            return inode
        else:
            return self.get_manager().true()

    def get_inode(self, index, final=False):
        """Get the internal node corresponding to the entry at the given index.

        :param index: index of the node to retrieve
        :param final: if True, negated nodes are taken from the current stratum
                      instead of the previous one
        :return: decision-diagram node corresponding to the given index
        :rtype: SDDNode
        """
        assert self.is_probabilistic(index)
        node = self.get_node(abs(index))
        if type(node).__name__ == "atom":
            av = self.atom2var.get(abs(index))
            if av is None:
                av = self.get_manager().add_variable()
                self.atom2var[abs(index)] = av
                self.var2atom[av] = abs(index)
            result = self.get_manager().literal(av)
            if index < 0:
                return self.get_manager().negate(result)
            else:
                return result
        elif index < 0 and not final:
            # We are requesting a negated node => use the previous stratum's result
            result = self._inodes_neg[-index - 1]
            if result is None and self._inodes_prev[-index - 1] is not None:
                result = self.get_manager().negate(self._inodes_prev[-index - 1])
                self._inodes_neg[-index - 1] = result
            return result
        elif index < 0:
            return self.get_manager().negate(self.inodes[-index - 1])
        else:
            return self.inodes[index - 1]

    def set_inode(self, index, node):
        """Set the internal node for the given index.

        :param index: index at which to set the new node
        :type index: int > 0
        :param node: new node
        """
        assert index is not None
        assert index > 0
        self.inodes[index - 1] = node

    def add_constraint(self, c):
        LogicFormula.add_constraint(self, c)
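

# Illustrative listener sketch (assumption: any object exposing node_updated
# and node_completed can be registered; ForwardEvaluator below relies on
# exactly these two callbacks):
#
#     class PrintProgress(object):
#         def node_updated(self, source, node, complete):
#             print("node %s updated (complete=%s)" % (node, complete))
#
#         def node_completed(self, source, node):
#             print("node %s completed" % node)
#
#     # given fi, a ForwardInference-based formula such as _ForwardSDD:
#     # fi.register_update_listener(PrintProgress())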


class _ForwardSDD(SDD, ForwardInference):
    transform_preference = 1000

    def __init__(self, sdd_auto_gc=False, **kwdargs):
        SDD.__init__(self, sdd_auto_gc=sdd_auto_gc, **kwdargs)
        ForwardInference.__init__(self, **kwdargs)

    @classmethod
    def is_available(cls):
        return SDD.is_available()

    def to_explicit_encoding(self):
        """
        Transform the current implicit encoding into an SDD with an explicit
        encoding. The latter contains indicator variables for inferred literals
        and covers both the true and the false case. For example, while the
        implicit encoding of c :- a, b results in c = a * b, the explicit
        encoding results in (c * a * b) + (-c * (-a + -b)).

        In the explicit encoding, evidence can be incorporated by changing the
        weights, a negation can easily be queried, and the implementation can
        guarantee a circuit with a single root instead of multiple roots.

        :return: The explicit encoding of this formula.
        :rtype: SDDExplicit
        """
        from .sdd_formula_explicit import build_explicit_from_forwardsdd
        from .sdd_formula_explicit import SDDExplicit

        return build_explicit_from_forwardsdd(source=self, destination=SDDExplicit())

    def copy_to_noref(self, destination):
        """
        Copy the relevant data structures without adding any refcounts to the
        new inodes. Because of this, the destination should have auto_gc
        disabled.

        :param destination: The DD to copy the data structures' references to.
        :type destination: DD
        """
        destination.atom2var = self.atom2var.copy()
        destination.var2atom = self.var2atom.copy()
        old = self.inode_manager.nodes
        self.inode_manager.nodes = self.inodes
        destination.inode_manager = self.inode_manager.get_deepcopy_noref()
        self.inode_manager.nodes = old
        destination._atomcount = self._atomcount
        destination._weights = self._weights.copy()
        destination._constraints = self._constraints.copy()
        destination.evidence_node = self.evidence_node
        destination._nodes = self._nodes.copy()
        destination._index_atom = self._index_atom.copy()
        destination._index_next = self._index_next
        destination._index_conj = self._index_conj.copy()
        destination._index_disj = self._index_disj.copy()
        destination._names = copy.deepcopy(self._names)
        destination._constraints_me = copy.deepcopy(self._constraints_me)


class _ForwardBDD(BDD, ForwardInference):
    transform_preference = 1000

    def __init__(self, **kwdargs):
        BDD.__init__(self, **kwdargs)
        ForwardInference.__init__(self, **kwdargs)

    @classmethod
    def is_available(cls):
        return BDD.is_available()


@transform(LogicFormula, _ForwardSDD)
def build_sdd(source, destination, **kwdargs):
    return build_dd(source, destination, **kwdargs)


@transform(LogicFormula, _ForwardBDD)
def build_bdd(source, destination, **kwdargs):
    return build_dd(source, destination, **kwdargs)
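

# Illustrative sketch of the explicit encoding (the variable name fsdd is
# assumed; see _ForwardSDD.to_explicit_encoding above):
#
#     # given a compiled _ForwardSDD instance fsdd:
#     # explicit = fsdd.to_explicit_encoding()  # returns an SDDExplicit
#     # explicit supports querying negations and evidence via weight changes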


class ForwardSDD(LogicFormula, EvaluatableDSP):
    transform_preference = 30

    def __init__(self, **kwargs):
        LogicFormula.__init__(self, **kwargs)
        EvaluatableDSP.__init__(self)
        self.kwargs = kwargs
        self.internal = _ForwardSDD(**self.kwargs)

    @classmethod
    def is_available(cls):
        return SDD.is_available()

    def _create_evaluator(self, semiring, weights, **kwargs):
        return ForwardEvaluator(self, semiring, self.internal, weights, **kwargs)

    def to_formula(self):
        build_dd(self, self.internal)
        return self.internal.to_formula()


class ForwardBDD(LogicFormula, EvaluatableDSP):
    transform_preference = 40

    def __init__(self, **kwargs):
        LogicFormula.__init__(self, **kwargs)
        EvaluatableDSP.__init__(self)
        self.kwargs = kwargs

    @classmethod
    def is_available(cls):
        return BDD.is_available()

    def _create_evaluator(self, semiring, weights, **kwargs):
        return ForwardEvaluator(
            self, semiring, _ForwardBDD(**self.kwargs), weights, **kwargs
        )


# Inform the system that ForwardSDD and ForwardBDD can be created in the same
# way as a LogicFormula.
transform_create_as(ForwardSDD, LogicFormula)
transform_create_as(ForwardBDD, LogicFormula)


class ForwardEvaluator(Evaluator):
    """An evaluator using anytime forward compilation."""

    def __init__(self, formula, semiring, fdd, weights=None, verbose=None, **kwargs):
        Evaluator.__init__(self, formula, semiring, weights, **kwargs)

        self.fsdd = fdd
        self._z = None
        self._verbose = verbose
        self._results = {}
        self._complete = set()
        self._start_time = None
        # Evidence itself is incorporated during compilation; this list only
        # backs the evidence accessors at the bottom of this class.
        self.__evidence = []

    def node_updated(self, source, node, complete):
        is_evidence = node == abs(source.evidence_node)
        name = [
            n
            for n, i, l in self.formula.labeled()
            if source.is_probabilistic(i) and abs(i) == node
        ]
        if is_evidence:
            name = ("evidence",)
        if name:
            name = name[0]
            weights = {}
            for atom, weight in self.weights.items():
                av = source.atom2var.get(atom)
                if av is not None:
                    weights[av] = weight
            inode = source.get_inode(node, final=True)
            if inode is not None:
                enode = source.get_manager().conjoin(
                    source.get_evidence_inode(), source.get_constraint_inode()
                )
                if self.fsdd.get_manager().is_false(enode):
                    raise InconsistentEvidenceError(context=" during compilation")
                qnode = source.get_manager().conjoin(inode, enode)
                tvalue = source.get_manager().wmc(enode, weights, self.semiring)
                value = source.get_manager().wmc(qnode, weights, self.semiring)
                result = self.semiring.normalize(value, tvalue)
                self._results[node] = result

                debug_msg = "update query %s: %s after %ss" % (
                    name,
                    self.semiring.result(result, self.formula),
                    "%.4f" % (time.time() - self._start_time),
                )
                logging.getLogger("problog").debug(debug_msg)
        if complete:
            self._complete.add(node)
        # if is_evidence:
        #     for c in self._complete:
        #         if c != node:
        #             self.node_updated(source, c, complete)

    def node_completed(self, source, node):
        qs = set(
            abs(qi) for qn, qi, ql in source.labeled() if source.is_probabilistic(qi)
        )
        if node in qs:
            self._complete.add(node)

    def initialize(self):
        self.weights = self.formula.extract_weights(self.semiring, self.given_weights)

        # All compilation happens here.
        self.fsdd.register_update_listener(self)
        self._start_time = time.time()
        if len(self.fsdd) == 0:
            if self.fsdd.init_varcount == -1:
                self.fsdd.init_varcount = self.formula.atomcount
            build_dd(self.formula, self.fsdd)

        # Update weights with constraints and evidence
        enode = self.fsdd.get_manager().conjoin(
            self.fsdd.get_evidence_inode(), self.fsdd.get_constraint_inode()
        )
        if self.fsdd.get_manager().is_false(enode):
            raise InconsistentEvidenceError(context=" during compilation")

        # Make sure all atoms exist in atom2var.
        for name, node, label in self.fsdd.labeled():
            if self.fsdd.is_probabilistic(node):
                self.fsdd.get_inode(node)

        weights = {}
        for atom, weight in self.weights.items():
            av = self.fsdd.atom2var.get(atom)
            if av is not None:
                weights[av] = weight
            elif atom == 0:
                weights[0] = weight

        for name, node, label in self.fsdd.labeled():
            if self.fsdd.is_probabilistic(node):
                inode = self.fsdd.get_inode(node)
                qnode = self.fsdd.get_manager().conjoin(inode, enode)
                tvalue = self.fsdd.get_manager().wmc(enode, weights, self.semiring)
                value = self.fsdd.get_manager().wmc(qnode, weights, self.semiring)
                result = self.semiring.normalize(value, tvalue)
            elif self.fsdd.is_true(node):
                result = self.semiring.one()
            else:
                result = self.semiring.zero()
            self._results[node] = result

    def propagate(self):
        self.initialize()

    def evaluate(self, index):
        """Compute the value of the given node."""
        # Results are read from the cache filled during compilation.
        if index is None:
            return 0.0
        elif index == 0:
            if not self.semiring.is_nsp():
                return self.semiring.result(self.semiring.one(), self.formula)
            else:
                weights = {}
                for atom, weight in self.weights.items():
                    av = self.fsdd.atom2var.get(atom)
                    if av is not None:
                        weights[av] = weight
                    elif atom == 0:
                        weights[0] = weight
                enode = self.fsdd.get_manager().conjoin(
                    self.fsdd.get_evidence_inode(), self.fsdd.get_constraint_inode()
                )
                tvalue = self.fsdd.get_manager().wmc(enode, weights, self.semiring)
                result = self.semiring.normalize(tvalue, tvalue)
                return self.semiring.result(result, self.formula)
        else:
            n = self.formula.get_node(abs(index))
            nt = type(n).__name__
            if nt == "atom":
                wp = self._results[index]
                # wp, wn = self.weights.get(abs(index))
                if index < 0:
                    # wn = self.semiring.negate(wp)
                    return self.semiring.result(wp, self.formula)
                else:
                    return self.semiring.result(wp, self.formula)
            else:
                # TODO report correct bounds in case of evidence
                if index < 0:
                    if -index in self._results:
                        if -index in self._complete:
                            return self.semiring.result(
                                self.semiring.negate(self._results[-index]),
                                self.formula,
                            )
                        else:
                            return (
                                self.semiring.result(self.semiring.zero()),
                                self.semiring.result(
                                    self.semiring.negate(self._results[-index]),
                                    self.formula,
                                ),
                            )
                    else:
                        return (
                            self.semiring.result(self.semiring.zero()),
                            self.semiring.result(self.semiring.one()),
                        )
                else:
                    if index in self._results:
                        if index in self._complete:
                            return self.semiring.result(
                                self._results[index], self.formula
                            )
                        else:
                            return (
                                self.semiring.result(
                                    self._results[index], self.formula
                                ),
                                self.semiring.result(self.semiring.one()),
                            )
                    else:
                        return (
                            self.semiring.result(self.semiring.zero()),
                            self.semiring.result(self.semiring.one()),
                        )
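
    # Anytime semantics (illustrative note): for a query whose node is not yet
    # complete, evaluate() returns a (lower, upper) bound pair, e.g.
    # (0.12, 1.0) when only a lower bound is known; once the node is complete,
    # a single normalized value is returned.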

    def evaluate_evidence(self):
        raise NotImplementedError("Evaluator.evaluate_evidence is an abstract method.")

    # def add_evidence(self, node):
    #     """Add evidence"""
    #     warnings.warn('Evidence is not supported by this evaluation method and will be ignored.')

    def has_evidence(self):
        return self.__evidence != []

    def clear_evidence(self):
        self.__evidence = []

    def evidence(self):
        return iter(self.__evidence)