Source code for problog.ddnnf_formula

"""
problog.nnf_formula - d-DNNF
----------------------------

Provides access to d-DNNF formulae.

..
    Part of the ProbLog distribution.

    Copyright 2015 KU Leuven, DTAI Research Group

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""
import os
import subprocess
import tempfile
from collections import defaultdict

from . import system_info
from .cnf_formula import CNF
from .constraint import ConstraintAD
from .core import transform
from .errors import CompilationError
from .errors import InconsistentEvidenceError
from .evaluator import Evaluator, EvaluatableDSP
from .formula import LogicDAG
from .util import Timer, subprocess_check_call


[docs]class DSharpError(CompilationError): """DSharp has crashed.""" def __init__(self): msg = "DSharp has encountered an error" if system_info["os"] == "darwin": msg += ". This is a known issue. See KNOWN_ISSUES for details on how to resolve this problem" CompilationError.__init__(self, msg)
[docs]class DDNNF(LogicDAG, EvaluatableDSP): """A d-DNNF formula.""" transform_preference = 20 # noinspection PyUnusedLocal,PyUnusedLocal,PyUnusedLocal def __init__(self, **kwdargs): LogicDAG.__init__(self, auto_compact=False) def _create_evaluator(self, semiring, weights, **kwargs): return SimpleDDNNFEvaluator(self, semiring, weights)
[docs]class SimpleDDNNFEvaluator(Evaluator): """Evaluator for d-DNNFs.""" def __init__(self, formula, semiring, weights=None, **kwargs): Evaluator.__init__(self, formula, semiring, weights, **kwargs) self.cache_intermediate = {} # weights of intermediate nodes def _initialize(self, with_evidence=True): self.weights.clear() model_weights = self.formula.extract_weights(self.semiring, self.given_weights) self.weights = model_weights.copy() if with_evidence: for ev in self.evidence(): self.set_evidence(abs(ev), ev > 0) if self.semiring.is_zero(self._get_z()): raise InconsistentEvidenceError(context=" during evidence evaluation")
[docs] def propagate(self): self._initialize()
def _get_z(self): result = self.get_root_weight() return result def evaluate_evidence(self, recompute=False): return self.semiring.result( self._evaluate_evidence(recompute=recompute), self.formula ) # noinspection PyUnusedLocal def _evaluate_evidence(self, recompute=False): self._initialize(False) for ev in self.evidence(): self._set_value(abs(ev), ev > 0) result = self.get_root_weight() return result
[docs] def evaluate_fact(self, node): return self.evaluate(node)
[docs] def evaluate(self, node): if node == 0: # if query = True if not self.semiring.is_nsp(): result = self.semiring.one() else: result = self.get_root_weight() result = self.semiring.normalize(result, self._get_z()) elif node is None: result = self.semiring.zero() else: p = self._get_weight(abs(node)) n = self._get_weight(-abs(node)) self._set_value(abs(node), (node > 0)) result = self.get_root_weight() self._reset_value(abs(node), p, n) if self.has_evidence() or self.semiring.is_nsp() or self.has_constraints(ignore_type={ConstraintAD}): result = self.semiring.normalize(result, self._get_z()) return self.semiring.result(result, self.formula)
[docs] def has_constraints(self, ignore_type=None): """ Check whether the formula has any constraints that are not of the ignore_type. :param ignore_type: A set of constraint classes to ignore. :type ignore_type: None | Set """ ignore_type = ignore_type or set() return any(type(constraint) not in ignore_type for constraint in self.formula.constraints())
def _reset_value(self, index, pos, neg): self.set_weight(index, pos, neg)
[docs] def get_root_weight(self): """Get the WMC of the root of this formula. :return: The WMC of the root of this formula (WMC of node len(self.formula)), multiplied with weight of True (self.weights.get(0)). """ result = self._get_weight(len(self.formula)) return ( self.semiring.times(result, self.weights.get(0)[0]) if self.weights.get(0) is not None else result )
def _get_weight(self, index): if index == 0: return self.semiring.one() elif index is None: return self.semiring.zero() else: abs_index = abs(index) w = self.weights.get(abs_index) or self.cache_intermediate.get(abs_index) if w is not None: return w[index < 0] else: w = self._calculate_weight(index) self.cache_intermediate[abs_index] = w, w return w
[docs] def set_weight(self, index, pos, neg): # index = index of atom in weights, so atom2var[key] = index self.weights[index] = (pos, neg) self.cache_intermediate.clear()
[docs] def set_evidence(self, index, value): curr_pos_weight, curr_neg_weight = self.weights.get(index) pos, neg = self.semiring.to_evidence( curr_pos_weight, curr_neg_weight, sign=value ) if (value and self.semiring.is_zero(curr_pos_weight)) or ( not value and self.semiring.is_zero(curr_neg_weight) ): raise InconsistentEvidenceError(self._deref_node(index)) self.set_weight(index, pos, neg)
def _deref_node(self, index): return self.formula.get_node(index).name def _set_value(self, index, value): """Set value for given node. :param index: index of node :param value: value """ if value: pos = self._get_weight(index) self.set_weight(index, pos, self.semiring.zero()) else: neg = self._get_weight(-index) self.set_weight(index, self.semiring.zero(), neg) def _calculate_weight(self, key): assert key != 0 assert key is not None # assert(key > 0) node = self.formula.get_node(abs(key)) ntype = type(node).__name__ if ntype == "atom": return self.semiring.one() else: assert key > 0 childprobs = [self._get_weight(c) for c in node.children] if ntype == "conj": p = self.semiring.one() for c in childprobs: p = self.semiring.times(p, c) return p elif ntype == "disj": p = self.semiring.zero() for c in childprobs: p = self.semiring.plus(p, c) return p else: raise TypeError("Unexpected node type: '%s'." % ntype)
[docs]class Compiler(object): """Interface to CNF to d-DNNF compiler tool.""" __compilers = {}
[docs] @classmethod def get_default(cls): """Get default compiler for this system.""" if system_info.get("c2d", False): return _compile_with_c2d else: return _compile_with_dsharp
[docs] @classmethod def get(cls, name): """Get compiler by name (or default if name not found). :param name: name of the compiler :returns: function used to call compiler """ result = cls.__compilers.get(name) if result is None: result = cls.get_default() return result
[docs] @classmethod def add(cls, name, func): """Add a compiler. :param name: name of the compiler :param func: function used to call the compiler """ cls.__compilers[name] = func
if system_info.get("c2d", False): # noinspection PyUnusedLocal @transform(CNF, DDNNF) def _compile_with_c2d(cnf, nnf=None, smooth=True, **kwdargs): fd, cnf_file = tempfile.mkstemp(".cnf") os.close(fd) nnf_file = cnf_file + ".nnf" if smooth: smoothl = ["-smooth_all"] else: smoothl = [] cmd = ["cnf2dDNNF", "-dt_method", "0"] + smoothl + ["-reduce", "-in", cnf_file] try: os.remove(cnf_file) except OSError: pass try: os.remove(nnf_file) except OSError: pass return _compile(cnf, cmd, cnf_file, nnf_file) Compiler.add("c2d", _compile_with_c2d) # noinspection PyUnusedLocal @transform(CNF, DDNNF) def _compile_with_dsharp(cnf, nnf=None, smooth=True, **kwdargs): result = None with Timer("DSharp compilation"): fd1, cnf_file = tempfile.mkstemp(".cnf") fd2, nnf_file = tempfile.mkstemp(".nnf") os.close(fd1) os.close(fd2) if smooth: smoothl = ["-smoothNNF"] else: smoothl = [] cmd = ["dsharp", "-Fnnf", nnf_file] + smoothl + ["-disableAllLits", cnf_file] # try: result = _compile(cnf, cmd, cnf_file, nnf_file) except subprocess.CalledProcessError: raise DSharpError() try: os.remove(cnf_file) except OSError: pass try: os.remove(nnf_file) except OSError: pass return result Compiler.add("dsharp", _compile_with_dsharp) def _compile(cnf, cmd, cnf_file, nnf_file): names = cnf.get_names_with_label() if cnf.is_trivial(): nnf = DDNNF() weights = cnf.get_weights() for i in range(1, cnf.atomcount + 1): nnf.add_atom(i, weights.get(i)) or_nodes = [] for i in range(1, cnf.atomcount + 1): or_nodes.append(nnf.add_or((i, -i))) if or_nodes: nnf.add_and(or_nodes) for name, node, label in names: nnf.add_name(name, node, label) for c in cnf.constraints(): nnf.add_constraint(c.copy()) return nnf else: with open(cnf_file, "w") as f: f.write(cnf.to_dimacs()) attempts_left = 1 success = False while attempts_left and not success: try: with open(os.devnull, "w") as OUT_NULL: subprocess_check_call(cmd, stdout=OUT_NULL) success = True except subprocess.CalledProcessError as err: attempts_left -= 1 if attempts_left == 0: raise err return _load_nnf(nnf_file, cnf) def _load_nnf(filename, cnf): nnf = DDNNF() weights = cnf.get_weights() names_inv = defaultdict(list) for name, node, label in cnf.get_names_with_label(): names_inv[node].append((name, label)) with open(filename) as f: line2node = {} rename = {} lnum = 0 for line in f: line = line.strip().split() if line[0] == "nnf": pass elif line[0] == "L": name = int(line[1]) prob = weights.get(abs(name), True) node = nnf.add_atom(abs(name), prob) rename[abs(name)] = node if name < 0: node = -node line2node[lnum] = node if name in names_inv: for actual_name, label in names_inv[name]: nnf.add_name(actual_name, node, label) del names_inv[name] lnum += 1 elif line[0] == "A": children = map(lambda x: line2node[int(x)], line[2:]) line2node[lnum] = nnf.add_and(children) lnum += 1 elif line[0] == "O": children = map(lambda x: line2node[int(x)], line[3:]) line2node[lnum] = nnf.add_or(children) lnum += 1 else: print("Unknown line type") for name in names_inv: for actual_name, label in names_inv[name]: if name == 0: nnf.add_name(actual_name, 0, label) else: nnf.add_name(actual_name, None, label) for c in cnf.constraints(): nnf.add_constraint(c.copy(rename)) return nnf