"""
problog.engine_builtin - Grounding engine builtins
--------------------------------------------------
Implementation of Prolog / ProbLog builtins.
..
Part of the ProbLog distribution.
Copyright 2015 KU Leuven, DTAI Research Group
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
import typing
from .clausedb import ClauseDB
from .engine import UnknownClauseInternal, UnknownClause, ClauseDBEngine
from .engine_unify import unify_value, UnifyError, substitute_simple
from .errors import GroundingError, UserError
from .logic import (
term2str,
Term,
Clause,
Constant,
term2list,
list2term,
is_ground,
is_variable,
Var,
AnnotatedDisjunction,
Object,
)
class builtin(object):
registry = None
@classmethod
def register(cls, tp, name, arity, func):
if cls.registry is None:
cls.registry = []
cls.registry.append((tp, name, arity, func))
@classmethod
def _add_builtin(cls, engine: ClauseDBEngine, bltn, b=None, s=None, sp=None):
if bltn[0] == "bool":
engine.add_builtin(bltn[1], bltn[2], b(bltn[3]))
elif bltn[0] == "det":
engine.add_builtin(bltn[1], bltn[2], s(bltn[3]))
elif bltn[0] == "prob":
engine.add_builtin(bltn[1], bltn[2], sp(bltn[3]))
elif bltn[0] == "raw":
engine.add_builtin(bltn[1], bltn[2], bltn[3])
else:
raise ValueError("Unknown builtin type '%s'." % bltn[0])
@classmethod
def add_builtins(cls, engine: ClauseDBEngine, b=None, s=None, sp=None):
if cls.registry is not None:
for bltn in cls.registry:
cls._add_builtin(engine, bltn, b, s, sp)
def __init__(self, builtin_type, builtin_name, builtin_arity):
self.type = builtin_type
self.name = builtin_name
self.arity = builtin_arity
def __call__(self, func):
builtin.register(self.type, self.name, self.arity, func)
return func
class builtin_boolean(builtin):
def __init__(self, *args):
builtin.__init__(self, "bool", *args)
class builtin_simple(builtin):
def __init__(self, *args):
builtin.__init__(self, "det", *args)
class builtin_probabilistic(builtin):
def __init__(self, *args):
builtin.__init__(self, "prob", *args)
class builtin_raw(builtin):
def __init__(self, *args):
builtin.__init__(self, "raw", *args)
[docs]def add_standard_builtins(engine, b=None, s=None, sp=None):
"""Adds standard builtins to the given engine.
:param engine: engine to add builtins to
:type engine: ClauseDBEngine
:param b: wrapper for boolean builtins (returning True/False)
:param s: wrapper for simple builtins (return deterministic results)
:param sp: wrapper for probabilistic builtins (return probabilistic results)
"""
# SPECIAL CASES NEED TO BE IN ORDER
engine.add_builtin("true", 0, b(_builtin_true)) # -1
engine.add_builtin("fail", 0, b(_builtin_fail)) # -2
engine.add_builtin("false", 0, b(_builtin_fail)) # -3
engine.add_builtin("=", 2, s(_builtin_eq)) # -4
engine.add_builtin("\\=", 2, b(_builtin_neq)) # -5
engine.add_builtin("findall", 3, sp(_builtin_findall)) # -6
engine.add_builtin("all", 3, sp(_builtin_all)) # -7
engine.add_builtin("all_or_none", 3, sp(_builtin_all_or_none)) # -8
engine.add_builtin("==", 2, b(_builtin_same))
engine.add_builtin("\\==", 2, b(_builtin_notsame))
engine.add_builtin("is", 2, s(_builtin_is))
engine.add_builtin(">", 2, b(_builtin_gt))
engine.add_builtin("<", 2, b(_builtin_lt))
engine.add_builtin("=<", 2, b(_builtin_le))
engine.add_builtin(">=", 2, b(_builtin_ge))
engine.add_builtin("=\\=", 2, b(_builtin_val_neq))
engine.add_builtin("=:=", 2, b(_builtin_val_eq))
engine.add_builtin("var", 1, b(_builtin_var))
engine.add_builtin("atom", 1, b(_builtin_atom))
engine.add_builtin("atomic", 1, b(_builtin_atomic))
engine.add_builtin("compound", 1, b(_builtin_compound))
engine.add_builtin("float", 1, b(_builtin_float))
engine.add_builtin("rational", 1, b(_builtin_rational))
engine.add_builtin("integer", 1, b(_builtin_integer))
engine.add_builtin("nonvar", 1, b(_builtin_nonvar))
engine.add_builtin("number", 1, b(_builtin_number))
engine.add_builtin("simple", 1, b(_builtin_simple))
engine.add_builtin("callable", 1, b(_builtin_callable))
engine.add_builtin("dbreference", 1, b(_builtin_dbreference))
engine.add_builtin("primitive", 1, b(_builtin_primitive))
engine.add_builtin("ground", 1, b(_builtin_ground))
engine.add_builtin("is_list", 1, b(_builtin_is_list))
engine.add_builtin("=..", 2, s(_builtin_split_call))
engine.add_builtin("arg", 3, s(_builtin_arg))
engine.add_builtin("functor", 3, s(_builtin_functor))
engine.add_builtin("@>", 2, b(_builtin_struct_gt))
engine.add_builtin("@<", 2, b(_builtin_struct_lt))
engine.add_builtin("@>=", 2, b(_builtin_struct_ge))
engine.add_builtin("@=<", 2, b(_builtin_struct_le))
engine.add_builtin("compare", 3, s(_builtin_compare))
engine.add_builtin("length", 2, s(_builtin_length))
# engine.add_builtin('call_external', 2, s(_builtin_call_external))
engine.add_builtin("sort", 2, s(_builtin_sort))
engine.add_builtin("between", 3, s(_builtin_between))
engine.add_builtin("succ", 2, s(_builtin_succ))
engine.add_builtin("plus", 3, s(_builtin_plus))
engine.add_builtin("consult", 1, b(_builtin_consult))
engine.add_builtin(".", 2, b(_builtin_consult_as_list))
# engine.add_builtin('load_external', 1, b(_builtin_load_external))
engine.add_builtin("unknown", 1, b(_builtin_unknown))
engine.add_builtin("use_module", 1, b(_builtin_use_module))
engine.add_builtin("use_module", 2, b(_builtin_use_module2))
engine.add_builtin("module", 2, b(_builtin_module))
engine.add_builtin("once", 1, _builtin_call)
engine.add_builtin("call", 1, _builtin_call)
engine.add_builtin("call_nc", 1, _builtin_call_nc)
engine.add_builtin("try_call", 1, _builtin_try_call)
for i in range(2, 10):
engine.add_builtin("call", i, _builtin_calln)
engine.add_builtin("call_nc", i, _builtin_calln_nc)
engine.add_builtin("try_call", i, _builtin_try_calln)
engine.add_builtin("subquery", 2, s(_builtin_subquery))
engine.add_builtin("subquery", 3, s(_builtin_subquery))
engine.add_builtin("subquery", 5, s(_builtin_subquery))
engine.add_builtin("sample_uniform1", 3, sp(_builtin_sample_uniform))
for i in range(1, 10):
engine.add_builtin("debugprint", i, b(_builtin_debugprint))
for i in range(1, 10):
engine.add_builtin("write", i, b(_builtin_write))
for i in range(1, 10):
engine.add_builtin("writenl", i, b(_builtin_writenl))
engine.add_builtin("writeln", i, b(_builtin_writenl))
for i in range(1, 10):
engine.add_builtin("error", i, b(_builtin_error))
engine.add_builtin("nl", 0, b(_builtin_nl))
engine.add_builtin("cmd_args", 1, s(_builtin_cmdargs))
engine.add_builtin("atom_number", 2, s(_builtin_atom_number))
engine.add_builtin("nocache", 2, b(_builtin_nocache))
engine.add_builtin("numbervars", 2, s(_builtin_numbervars_0))
engine.add_builtin("numbervars", 3, s(_builtin_numbervars))
engine.add_builtin("varnumbers", 2, s(_builtin_varnumbers))
engine.add_builtin("subsumes_term", 2, b(_builtin_subsumes_term))
engine.add_builtin("subsumes_chk", 2, b(_builtin_subsumes_term))
engine.add_builtin("possible", 1, s(_builtin_possible))
engine.add_builtin("clause", 2, s(_builtin_clause))
engine.add_builtin("clause", 3, s(_builtin_clause3))
engine.add_builtin("create_scope", 2, s(_builtin_create_scope))
engine.add_builtin("subquery_in_scope", 3, s(_builtin_subquery_in_scope))
engine.add_builtin("subquery_in_scope", 4, s(_builtin_subquery_in_scope))
engine.add_builtin("subquery_in_scope", 6, s(_builtin_subquery_in_scope))
engine.add_builtin("call_in_scope", 2, _builtin_call_in_scope)
for i in range(2, 10):
engine.add_builtin("call_in_scope", i + 1, _builtin_calln_in_scope)
engine.add_builtin("find_scope", 2, s(_builtin_find_scope))
#engine.add_builtin("forall", 2, s(_builtin_forall)) TODO: Add forall as built-in here
builtin.add_builtins(engine, b, s, sp)
# @builtin_boolean('nocache', 1)
def _builtin_nocache(functor, arity, database=None, **kwd):
check_mode((functor, arity), ["ai"], **kwd)
database.dont_cache.add((str(functor), int(arity)))
return True
def _builtin_clause(head, body, database=None, **kwd):
mode = check_mode((head, body), ["c*", "v*"], **kwd)
if mode == 0:
clause_def = database.find(head)
if clause_def is None:
clauses = []
else:
clause_ids = database.get_node(clause_def).children
clauses = [database.to_clause(c) for c in clause_ids]
else:
clauses = list(database)
result = []
for clause in clauses:
if isinstance(clause, Clause):
h = clause.head
b = clause.body
elif isinstance(clause, AnnotatedDisjunction):
h = list2term(clause.heads)
b = clause.body
else:
h = clause
b = Term("true")
try:
# Perform unifying checks before storing the results
# can't do unify_value with Vars, first replace them with ints (database._create_ints)
unify_value(head, database._create_ints(h), {})
unify_value(body, database._create_ints(b), {})
result.append((h, b))
except UnifyError:
pass
return result
def _builtin_clause3(head, body, prob, database=None, **kwd):
mode = check_mode((head, body, prob), ["c**", "v**"], **kwd)
if mode == 0:
clause_def = database.find(head)
if clause_def is None:
clauses = []
else:
clause_ids = database.get_node(clause_def).children
clauses = [database.to_clause(c) for c in clause_ids]
else:
clauses = list(database)
result = []
for clause in clauses:
if isinstance(clause, Clause):
h = clause.head
b = clause.body
p = clause.head.probability
elif isinstance(clause, AnnotatedDisjunction):
h = list2term(clause.heads)
b = clause.body
# p = clause.heads.probability
p = list2term([hh.probability for hh in clause.heads])
else:
h = clause
b = Term("true")
p = clause.probability
if p is None:
p = Constant(1.0)
try:
unify_value(head, h, {})
unify_value(body, b, {})
unify_value(prob, p, {})
result.append((h, b, p))
except UnifyError:
pass
return result
def _builtin_cmdargs(lst, engine=None, **kwd):
m = check_mode((lst,), ["v", "L"], **kwd)
args = engine.args
if args is None:
args = []
args = list2term(list(map(Term, args)))
if m == 0:
return [(args,)]
else:
try:
value = unify_value(args, lst, {})
return [(value,)]
except UnifyError:
return []
def _builtin_atom_number(atom, number, **kwd):
mode = check_mode((atom, number), ["vf", "vi", "av", "af", "ai"], **kwd)
if mode in (0, 1):
return [(Term(str(number)), number)]
elif mode == 2:
try:
v = float(atom.functor)
except ValueError:
return [] # fail silently
# raise GroundingError('Atom does not represent a number: \'%s\'' % atom)
if round(v) == v:
v = Constant(int(v))
else:
v = Constant(v)
return [(atom, v)]
else:
if atom == str(number):
return [(atom, number)]
else:
return []
# noinspection PyUnusedLocal
def _builtin_debugprint(*args, **kwd):
print(" ".join(map(term2str, args)), file=sys.stderr)
return True
def term2str_noquote(term):
res = term2str(term)
if res[0] == res[-1] == "'":
res = res[1:-1]
return res
def _builtin_write(*args, **kwd):
print(" ".join(map(term2str_noquote, args)), end="")
return True
def _builtin_error(*args, **kwd):
location = kwd.get("call_origin", (None, None))[1]
database = kwd["database"]
location = database.lineno(location)
message = "".join(map(term2str_noquote, args))
raise UserError(message, location=location)
def _builtin_writenl(*args, **kwd):
print(" ".join(map(term2str_noquote, args)))
return True
def _builtin_nl(**kwd):
print()
return True
[docs]class CallModeError(GroundingError):
"""
Represents an error in builtin argument types.
"""
def __init__(self, functor, args, accepted=None, message=None, location=None):
if accepted is None:
accepted = []
if functor:
self.scope = "%s/%s" % (functor, len(args))
else:
self.scope = None
self.received = ", ".join(map(self._show_arg, args))
self.expected = [", ".join(map(self._show_mode, mode)) for mode in accepted]
msg = "Invalid argument types for call"
if self.scope:
msg += " to '%s'" % self.scope
msg += ": arguments: (%s)" % self.received
if accepted:
msg += ", expected: (%s)" % ") or (".join(self.expected)
else:
msg += ", expected: " + message
GroundingError.__init__(self, msg, location)
def _show_arg(self, x):
return term2str(x)
def _show_mode(self, t):
return mode_types[t][0]
[docs]class StructSort(object):
"""
Comparator of terms based on structure.
"""
# noinspection PyUnusedLocal
def __init__(self, obj, *args):
self.obj = obj
def __lt__(self, other):
return struct_cmp(self.obj, other.obj) < 0
def __gt__(self, other):
return struct_cmp(self.obj, other.obj) > 0
def __eq__(self, other):
return struct_cmp(self.obj, other.obj) == 0
def __le__(self, other):
return struct_cmp(self.obj, other.obj) <= 0
def __ge__(self, other):
return struct_cmp(self.obj, other.obj) >= 0
def __ne__(self, other):
return struct_cmp(self.obj, other.obj) != 0
def _is_var(term):
return is_variable(term) or term.is_var() or isinstance(term, Var)
def _is_nonvar(term):
return not _is_var(term)
def _is_term(term):
return not _is_var(term) and not _is_constant(term)
def _is_float_pos(term):
return _is_constant(term) and term.is_float()
def _is_float_neg(term):
return (
_is_term(term)
and term.arity == 1
and term.functor == "'-'"
and _is_float_pos(term.args[0])
)
def _is_float(term):
return _is_float_pos(term) or _is_float_neg(term)
def _is_integer_pos(term):
return _is_constant(term) and term.is_integer()
def _is_integer_neg(term):
return (
_is_term(term)
and term.arity == 1
and term.functor == "'-'"
and _is_integer_pos(term.args[0])
)
def _is_integer(term):
return _is_integer_pos(term) or _is_integer_neg(term)
def _is_string(term):
return _is_constant(term) and term.is_string()
def _is_number(term):
return _is_float(term) or _is_integer(term)
def _is_constant(term):
return not _is_var(term) and term.is_constant()
def _is_atom(term):
return _is_term(term) and term.arity == 0
def _is_atomic(term):
return _is_nonvar(term) and not _is_compound(term)
# noinspection PyUnusedLocal
def _is_rational(term):
return False
# noinspection PyUnusedLocal
def _is_dbref(term):
return False
def _is_compound(term):
return _is_term(term) and term.arity > 0
def _is_list_maybe(term):
"""
Check whether the term looks like a list (i.e. of the form '.'(_,_)).
:param term:
:return:
"""
return _is_compound(term) and term.functor == "." and term.arity == 2
def _is_list_nonempty(term):
if _is_list_maybe(term):
tail = list_tail(term)
return _is_list_empty(tail) or _is_var(tail)
return False
def _is_fixed_list(term):
return _is_list_empty(term) or _is_fixed_list_nonempty(term)
def _is_fixed_list_nonempty(term):
if _is_list_maybe(term):
tail = list_tail(term)
return _is_list_empty(tail)
return False
def _is_list_empty(term):
return _is_atom(term) and term.functor == "[]"
def _is_list(term):
return _is_list_empty(term) or _is_list_nonempty(term)
def _is_compare(term):
return _is_atom(term) and term.functor in ("'<'", "'='", "'>'")
def _is_object(term):
return isinstance(term, Object)
mode_types = {
"i": ("integer", _is_integer),
"I": ("positive_integer", _is_integer_pos),
"f": ("float", _is_float),
"v": ("var", _is_var),
"n": ("nonvar", _is_nonvar),
"l": ("list", _is_list),
"L": ("fixed_list", _is_fixed_list), # List of fixed length (i.e. tail is [])
"*": ("any", lambda x: True),
"<": ("compare", _is_compare), # < = >
"g": ("ground", is_ground),
"a": ("atom", _is_atom),
"c": ("callable", _is_term),
"o": ("object", _is_object),
}
# noinspection PyUnusedLocal
[docs]def check_mode(args, accepted, functor=None, location=None, database=None, **kwdargs):
"""Checks the arguments against a list of accepted types.
:param args: arguments to check
:type args: tuple of Term
:param accepted: list of accepted combination of types (see mode_types)
:type accepted: list of str
:param functor: functor of the call (used for error message)
:param location: location of the call (used for error message)
:param database: database (used for error message)
:param kwdargs: additional arguments (not used)
:return: the index of the first mode in accepted that matches the arguments
:rtype: int
"""
for i, mode in enumerate(accepted):
correct = True
for a, t in zip(args, mode):
name, test = mode_types[t]
if not test(a):
correct = False
break
if correct:
return i
if database and location:
location = database.lineno(location)
else:
location = None
raise CallModeError(functor, args, accepted, location=location)
[docs]def list_elements(term):
"""Extract elements from a List term.
Ignores the list tail.
:param term: term representing a list
:type term: Term
:return: elements of the list
:rtype: list of Term
"""
elements = []
tail = term
while _is_list_maybe(tail):
elements.append(tail.args[0])
tail = tail.args[1]
return elements, tail
[docs]def list_tail(term):
"""Extract the tail of the list.
:param term: Term representing a list
:type term: Term
:return: tail of the list
:rtype: Term
"""
tail = term
while _is_list_maybe(tail):
tail = tail.args[1]
return tail
def _builtin_split_call(term, parts, database=None, location=None, **kwdargs):
"""Implements the '=..' builtin operator.
:param term:
:param parts:
:param database:
:param location:
:param kwdargs:
:return:
"""
functor = "=.."
# modes:
# <v> =.. list => list has to be fixed length and non-empty
# IF its length > 1 then first element should be an atom
# <n> =.. <list or var>
#
mode = check_mode((term, parts), ["vL", "nv", "nl"], functor=functor, **kwdargs)
if mode == 0:
elements, tail = list_elements(parts)
if len(elements) == 0:
raise CallModeError(
functor,
(term, parts),
message="non-empty list for arg #2 if arg #1 is a variable",
location=database.lineno(location),
)
elif len(elements) > 1 and not _is_atom(elements[0]):
raise CallModeError(
functor,
(term, parts),
message="atom as first element in list if arg #1 is a variable",
location=database.lineno(location),
)
elif len(elements) == 1:
# Special case => term == parts[0]
return [(elements[0], parts)]
else:
term_part = elements[0](*elements[1:])
return [(term_part, parts)]
else:
part_list = (term.with_args(),) + term.args
current = Term("[]")
for t in reversed(part_list):
current = Term(".", t, current)
try:
local_values = {}
list_part = unify_value(current, parts, local_values)
elements, tail = list_elements(list_part)
term_new = elements[0](*elements[1:])
term_part = unify_value(term, term_new, local_values)
return [(term_part, list_part)]
except UnifyError:
return []
def _builtin_arg(index, term, arguments, **kwdargs):
check_mode((index, term, arguments), ["In*"], functor="arg", **kwdargs)
index_v = int(index) - 1
if 0 <= index_v < len(term.args):
try:
arg = term.args[index_v]
res = unify_value(arg, arguments, {})
return [(index, term, res)]
except UnifyError:
pass
return []
def _builtin_functor(term, functor, arity, **kwdargs):
mode = check_mode(
(term, functor, arity), ["vaI", "n**"], functor="functor", **kwdargs
)
if mode == 0:
return [(Term(functor, *((None,) * int(arity))), functor, arity)]
else:
try:
values = {}
func_out = unify_value(functor, Term(term.functor), values)
arity_out = unify_value(arity, Constant(term.arity), values)
return [(term, func_out, arity_out)]
except UnifyError:
pass
return []
# noinspection PyUnusedLocal
def _builtin_true(**kwdargs):
"""``true``"""
return True
# noinspection PyUnusedLocal
def _builtin_fail(**kwdargs):
"""``fail``"""
return False
# noinspection PyUnusedLocal
def _builtin_eq(arg1, arg2, **kwdargs):
"""``A = B``
A and B not both variables
"""
try:
result = unify_value(arg1, arg2, {})
return [(result, result)]
except UnifyError:
return []
# except VariableUnification:
# raise VariableUnification(location = database.lineno(location))
# noinspection PyUnusedLocal
def _builtin_neq(arg1, arg2, **kwdargs):
"""``A \\= B``
A and B not both variables
"""
try:
unify_value(arg1, arg2, {})
return False
except UnifyError:
return True
# noinspection PyUnusedLocal
def _builtin_notsame(arg1, arg2, **kwdargs):
"""``A \\== B``"""
return not arg1 == arg2
# noinspection PyUnusedLocal
def _builtin_same(arg1, arg2, **kwdargs):
"""``A == B``"""
return arg1 == arg2
def _builtin_gt(arg1, arg2, engine=None, **kwdargs):
"""``A > B``
A and B are ground
"""
check_mode((arg1, arg2), ["gg"], functor=">", **kwdargs)
a_value = arg1.compute_value(engine.functions)
b_value = arg2.compute_value(engine.functions)
if a_value is None or b_value is None:
return False
else:
return a_value > b_value
def _builtin_lt(arg1, arg2, engine=None, **kwdargs):
"""``A > B``
A and B are ground
"""
check_mode((arg1, arg2), ["gg"], functor="<", **kwdargs)
a_value = arg1.compute_value(engine.functions)
b_value = arg2.compute_value(engine.functions)
if a_value is None or b_value is None:
return False
else:
return a_value < b_value
def _builtin_le(arg1, arg2, engine=None, **k):
"""``A =< B``
A and B are ground
"""
check_mode((arg1, arg2), ["gg"], functor="=<", **k)
a_value = arg1.compute_value(engine.functions)
b_value = arg2.compute_value(engine.functions)
if a_value is None or b_value is None:
return False
else:
return a_value <= b_value
def _builtin_ge(arg1, arg2, engine=None, **k):
"""``A >= B``
A and B are ground
"""
check_mode((arg1, arg2), ["gg"], functor=">=", **k)
a_value = arg1.compute_value(engine.functions)
b_value = arg2.compute_value(engine.functions)
if a_value is None or b_value is None:
return False
else:
return a_value >= b_value
def _builtin_val_neq(a, b, engine=None, **k):
"""``A =\\= B``
A and B are ground
"""
check_mode((a, b), ["gg"], functor="=\\=", **k)
a_value = a.compute_value(engine.functions)
b_value = b.compute_value(engine.functions)
if a_value is None or b_value is None:
return False
else:
return a_value != b_value
def _builtin_val_eq(a, b, engine=None, **k):
"""``A =:= B``
A and B are ground
"""
check_mode((a, b), ["gg"], functor="=:=", **k)
a_value = a.compute_value(engine.functions)
b_value = b.compute_value(engine.functions)
if a_value is None or b_value is None:
return False
else:
return a_value == b_value
def _builtin_is(a, b, engine=None, **k):
"""``A is B``
B is ground
@param a:
@param b:
@param engine:
@param k:
"""
check_mode((a, b), ["*g"], functor="is", **k)
try:
b_value = b.compute_value(engine.functions)
if b_value is None:
return []
else:
r = Constant(b_value)
unify_value(a, r, {})
return [(r, b)]
except UnifyError:
return []
# noinspection PyUnusedLocal
def _builtin_var(term, **k):
return _is_var(term)
# noinspection PyUnusedLocal
def _builtin_atom(term, **k):
return _is_atom(term)
# noinspection PyUnusedLocal
def _builtin_atomic(term, **k):
return _is_atom(term) or _is_number(term)
# noinspection PyUnusedLocal
def _builtin_compound(term, **k):
return _is_compound(term)
# noinspection PyUnusedLocal
def _builtin_float(term, **k):
return _is_float(term)
# noinspection PyUnusedLocal
def _builtin_integer(term, **k):
return _is_integer(term)
# noinspection PyUnusedLocal
def _builtin_nonvar(term, **k):
return not _is_var(term)
# noinspection PyUnusedLocal
def _builtin_number(term, **k):
return _is_number(term)
# noinspection PyUnusedLocal
def _builtin_simple(term, **k):
return _is_var(term) or _is_atomic(term)
# noinspection PyUnusedLocal
def _builtin_callable(term, **k):
return _is_term(term)
# noinspection PyUnusedLocal
def _builtin_rational(term, **k):
return _is_rational(term)
# noinspection PyUnusedLocal
def _builtin_dbreference(term, **k):
return _is_dbref(term)
# noinspection PyUnusedLocal
def _builtin_primitive(term, **k):
return _is_atomic(term) or _is_dbref(term)
# noinspection PyUnusedLocal
def _builtin_ground(term, **k):
return is_ground(term)
# noinspection PyUnusedLocal
def _builtin_is_list(term, **k):
return _is_list(term)
def compare(a, b):
if a < b:
return -1
elif a > b:
return 1
else:
return 0
def struct_cmp(a, b):
# Note: structural comparison
# 1) Var < Num < Str < Atom < Compound
# 2) Var by address
# 3) Number by value, if == between int and float => float is smaller
# (iso prolog: Float always < Integer )
# 4) String alphabetical
# 5) Atoms alphabetical
# 6) Compound: arity / functor / arguments
# 1) Variables are smallest
if _is_var(a):
if _is_var(b):
# 2) Variable by address or name
if isinstance(a, Term):
a = a.functor
if isinstance(b, Term):
b = b.functor
return compare(a, b)
else:
return -1
elif _is_var(b):
return 1
# assert( not is_var(A) and not is_var(B) )
# 2) Numbers are second smallest
if _is_number(a):
if _is_number(b):
# Just compare numbers on float value
res = compare(float(a), float(b))
if res == 0:
# If the same, float is smaller.
if _is_float(a) and _is_integer(b):
return -1
elif _is_float(b) and _is_integer(a):
return 1
else:
return 0
else:
return -1
elif _is_number(b):
return 1
# 3) Strings are third
if _is_string(a):
if _is_string(b):
return compare(str(a), str(b))
else:
return -1
elif _is_string(b):
return 1
# 4) Atoms / terms come next
# 4.1) By arity
res = compare(a.arity, b.arity)
if res != 0:
return res
# 4.2) By functor
fa = str(a.functor)
fb = str(b.functor)
res = compare(fa, fb)
if res != 0:
return res
# 4.3) By arguments (recursively)
for a1, b1 in zip(a.args, b.args):
res = struct_cmp(a1, b1)
if res != 0:
return res
return 0
# noinspection PyUnusedLocal
def _builtin_struct_lt(a, b, **k):
return struct_cmp(a, b) < 0
# noinspection PyUnusedLocal
def _builtin_struct_le(a, b, **k):
return struct_cmp(a, b) <= 0
# noinspection PyUnusedLocal
def _builtin_struct_gt(a, b, **k):
return struct_cmp(a, b) > 0
# noinspection PyUnusedLocal
def _builtin_struct_ge(a, b, **k):
return struct_cmp(a, b) >= 0
def _builtin_compare(c, a, b, **k):
mode = check_mode((c, a, b), ["<**", "v**"], functor="compare", **k)
compares = "'>'", "'='", "'<'"
cp = struct_cmp(a, b)
c_token = compares[1 - cp]
if mode == 0: # Given compare
if c_token == c.functor:
return [(c, a, b)]
else: # Unknown compare
return [(Term(c_token), a, b)]
# numbervars(T,+N1,-Nn) number the variables TBD?
def build_list(elements, tail):
current = tail
for el in reversed(elements):
current = Term(".", el, current)
return current
def _builtin_numbervars_0(term, output, **k):
res = _builtin_numbervars(term, Constant(0), output)[0]
return [(res[0], res[2])]
def _builtin_numbervars(term, start, output, **k):
mode = check_mode((term, start, output), ["*i*"], functor="numbervars", **k)
class NumberVars(object):
def __init__(self, start):
self._n = start
self._table = {}
def __getitem__(self, item):
if item in self._table:
return self._table[item]
else:
r = Term("$Var", Constant(self._n))
self._table[item] = r
self._n += 1
return r
out = unify_value(term.apply(NumberVars(int(start))), output, {})
return [(term, start, out)]
def _builtin_varnumbers(term, output, engine=None, context=None, **k):
check_mode((term, output), ["cv", "cc"], functor="varnumbers", **k)
start = engine.context_min_var(context)
class VarNumbers(object):
def __init__(self, start):
self._n = start
self._table = {}
def __contains__(self, item):
return isinstance(item, Term) and item.functor == "$Var"
def __getitem__(self, item):
assert isinstance(item, Term) and item.functor == "$Var"
item = int(item.args[0])
if item in self._table:
return self._table[item]
else:
self._n -= 1
self._table[item] = self._n
return self._n
xx = term.apply_term(VarNumbers(start))
out = unify_value(output, xx, {})
return [(term, out)]
# class UnknownExternal(GroundingError):
# """Undefined clause in call."""
#
# def __init__(self, signature, location):
# GroundingError.__init__(self, "Unknown external function '%s'" % signature, location)
# def _builtin_call_external(call, result, database=None, location=None, **k):
# from . import pypl
# check_mode((call, result), ['gv'], function='call_external', database=database,
# location=location, **k)
#
# func = k['engine'].get_external_call(call.functor)
# if func is None:
# raise UnknownExternal(call.functor, database.lineno(location))
#
# values = [pypl.pl2py(arg) for arg in call.args]
# computed_result = func(*values)
#
# return [(call, pypl.py2pl(computed_result))]
def _builtin_length(l, n, **k):
mode = check_mode((l, n), ["LI", "Lv", "lI", "vI"], functor="length", **k)
# Note that Prolog also accepts 'vv' and 'lv', but these are unbounded.
# Note that lI is a subset of LI, but only first matching mode is returned.
if mode == 0 or mode == 1: # Given fixed list and maybe length
elements, tail = list_elements(l)
list_size = len(elements)
try:
n = unify_value(n, Constant(list_size), {})
return [(l, n)]
except UnifyError:
return []
else: # Unbounded list or variable list and fixed length.
if mode == 2:
elements, tail = list_elements(l)
else:
elements, tail = [], l
remain = int(n) - len(elements)
if remain < 0:
raise UnifyError()
else:
min_var = k.get("engine").context_min_var(k.get("context"))
extra = list(range(min_var, min_var - remain, -1)) # [None] * remain
new_l = build_list(elements + extra, Term("[]"))
return [(new_l, n)]
def _builtin_sort(l, s, **k):
# TODO doesn't work properly with variables e.g. gives sort([X,Y,Y],[_])
# should be sort([X,Y,Y],[X,Y])
check_mode((l, s), ["L*"], functor="sort", **k)
elements, tail = list_elements(l)
# assert( is_list_empty(tail) )
try:
sorted_list = build_list(sorted(set(elements), key=StructSort), Term("[]"))
s_out = unify_value(s, sorted_list, {})
return [(l, s_out)]
except UnifyError:
return []
def _builtin_between(low, high, value, **k):
"""
Implements the between/3 builtin.
:param low:
:param high:
:param value:
:param k:
:return:
"""
mode = check_mode((low, high, value), ["iii", "iiv"], functor="between", **k)
low_v = int(low)
high_v = int(high)
if mode == 0: # Check
value_v = int(value)
if low_v <= value_v <= high_v:
return [(low, high, value)]
else: # Enumerate
results = []
for value_v in range(low_v, high_v + 1):
results.append((low, high, Constant(value_v)))
return results
def _builtin_succ(a, b, **kwdargs):
"""
Implements the succ/2 builtin.
:param a: input argument
:param b: output argument
:param kwdargs: additional arguments
:return:
"""
mode = check_mode((a, b), ["vI", "Iv", "II"], functor="succ", **kwdargs)
if mode == 0:
b_v = int(b)
return [(Constant(b_v - 1), b)]
elif mode == 1:
a_v = int(a)
return [(a, Constant(a_v + 1))]
else:
a_v = int(a)
b_v = int(b)
if b_v == a_v + 1:
return [(a, b)]
return []
def _builtin_plus(a, b, c, **kwdargs):
"""
Implements the plus/3 builtin.
:param a: first argument
:param b: second argument
:param c: result argument
:param kwdargs: additional arguments
:return:
"""
mode = check_mode(
(a, b, c), ["iii", "iiv", "ivi", "vii"], functor="plus", **kwdargs
)
if mode == 0:
a_v = int(a)
b_v = int(b)
c_v = int(c)
if a_v + b_v == c_v:
return [(a, b, c)]
elif mode == 1:
a_v = int(a)
b_v = int(b)
return [(a, b, Constant(a_v + b_v))]
elif mode == 2:
a_v = int(a)
c_v = int(c)
return [(a, Constant(c_v - a_v), c)]
else:
b_v = int(b)
c_v = int(c)
return [(Constant(c_v - b_v), b, c)]
return []
def _atom_to_filename(atom):
"""Translate an atom to a filename.
:param atom: filename as atom
:type atom: Term
:return: filename as string
:rtype: str
"""
atomstr = str(atom)
if atomstr[0] == atomstr[-1] == "'":
atomstr = atomstr[1:-1]
return atomstr
def _builtin_consult_as_list(op1, op2, **kwdargs):
"""Implementation of consult/1 using list notation.
:param op1: first element in the list
:param op2: tail of the list
:param kwdargs: additional arugments
:return: True
"""
# TODO make non-recursive
check_mode((op1, op2), ["*L"], functor="consult", **kwdargs)
_builtin_consult(op1, **kwdargs)
if _is_list_nonempty(op2):
_builtin_consult_as_list(op2.args[0], op2.args[1], **kwdargs)
return True
def _builtin_consult(filename, database=None, engine=None, **kwdargs):
"""
Implementation of consult/1 builtin.
A file will be loaded only once.
:param filename: filename to load into the database
:type filename: Term
:param database: database containing the current logic program.
:param kwdargs: additional arguments
:return: True
"""
check_mode((filename,), ["a"], functor="consult", **kwdargs)
database.consult(filename, location=kwdargs.get("location"))
return True
# noinspection PyUnusedLocal
def _builtin_unknown(arg, engine=None, **kwdargs):
check_mode((arg,), ["a"], functor="unknown")
if arg.functor == "fail":
engine.unknown = engine.UNKNOWN_FAIL
else:
engine.unknown = engine.UNKNOWN_ERROR
return True
def _select_sublist(lst, target):
"""
Enumerate all possible selection of elements from a list.
This function is used to generate all solutions to findall/3.
An element must be selected if it is TRUE in the target formula.
:param lst: list to select elements from
:type lst: list of tuple
:param target: data structure containing truth value of nodes
:type target: LogicFormula
:return: generator of sublists
"""
ln = len(lst)
# Generate an array that indicates the decision bit for each element in the list.
# If an element is deterministically true, then no decision bit is needed.
choice_bits = [None] * ln
x = 0
for i in range(0, ln):
if lst[i][1] not in (target.TRUE, target.FALSE):
choice_bits[i] = x
x += 1
# We have 2^x distinct lists. Each can be represented as a number between 0 and 2^x-1=n.
n = (1 << x) - 1
while n >= 0:
# Generate the list of positive values and node identifiers
# noinspection PyTypeChecker
sublist = [
lst[i]
for i in range(0, ln)
if (choice_bits[i] is None and lst[i][1] == target.TRUE)
or (choice_bits[i] is not None and n & 1 << choice_bits[i])
]
# Generate the list of negative node identifiers
# noinspection PyTypeChecker
sublist_no = tuple(
[
target.negate(lst[i][1])
for i in range(0, ln)
if (choice_bits[i] is None and lst[i][1] == target.FALSE)
or (choice_bits[i] is not None and not n & 1 << choice_bits[i])
]
)
if sublist:
terms, nodes = zip(*sublist)
else:
# Empty list.
terms, nodes = (), ()
yield terms, nodes + sublist_no + (0,)
n -= 1
def _builtin_all_or_none(pattern, goal, result, **kwargs):
return _builtin_all(pattern, goal, result, allow_none=True, **kwargs)
def _builtin_all(
pattern,
goal,
result,
allow_none=False,
database=None,
target=None,
engine=None,
context=None,
**kwdargs
):
"""
Implementation of all/3 builtin.
:param pattern: pattern to extract
:type pattern: Term
:param goal: goal to evaluate
:type goal: Term
:param result: list to store results
:type result: Term
:param database: database holding logic program
:type database: ClauseDB
:param target: logic formula in which to store the result
:type target: LogicFormula
:param engine: engine that is used for evaluation
:type engine: ClauseDBEngine
:param kwdargs: additional arguments from engine
:return: list results (tuple of lists and node identifiers)
"""
# Check the modes.
mode = check_mode(
(pattern, goal, result), ["*cv", "*cl"], database=database, **kwdargs
)
findall_head = Term(engine.get_non_cache_functor(), pattern, *goal.variables())
findall_clause = Clause(findall_head, goal)
findall_db = database.extend()
findall_db += findall_clause
class _TranslateToNone(object):
# noinspection PyUnusedLocal
def __getitem__(self, item):
return None
findall_head = substitute_simple(findall_head, _TranslateToNone())
results = engine.call(
findall_head, subcall=True, database=findall_db, target=target, **kwdargs
)
results = [(res[0], n) for res, n in results]
output = []
for l, n in _select_sublist(results, target):
if not l and not allow_none:
continue
node = target.add_and(n)
if node is not None:
res = build_list(l, Term("[]"))
if mode == 0: # var
args = (pattern, goal, res)
output.append((args, node))
else:
try:
res = unify_value(res, result, {})
args = (pattern, goal, res)
output.append((args, node))
except UnifyError:
pass
return output
def _builtin_findall_base(
pattern,
goal,
result,
database=None,
target=None,
engine=None,
context=None,
**kwdargs
):
"""
Implementation of findall/3 builtin.
:param pattern: pattern to extract
:type pattern: Term
:param goal: goal to evaluate
:type goal: Term
:param result: list to store results
:type result: Term
:param database: database holding logic program
:type database: ClauseDB
:param target: logic formula in which to store the result
:type target: LogicFormula
:param engine: engine that is used for evaluation
:type engine: ClauseDBEngine
:param kwdargs: additional arguments from engine
:return: list results (tuple of lists and node identifiers)
"""
# Check the modes.
mode = check_mode(
(pattern, goal, result), ["*cv", "*cl"], database=database, **kwdargs
)
findall_head = Term(engine.get_non_cache_functor(), pattern, *goal.variables())
findall_clause = Clause(findall_head, goal)
findall_db = database.extend()
findall_db += findall_clause
class _TranslateToNone(object):
# noinspection PyUnusedLocal
def __getitem__(self, item):
return None
findall_head = substitute_simple(findall_head, _TranslateToNone())
findall_target = target.__class__(
keep_order=True, keep_all=True, keep_duplicates=True
)
try:
results = engine.call(
findall_head,
subcall=True,
database=findall_db,
target=findall_target,
**kwdargs
)
except RuntimeError:
raise IndirectCallCycleError(
database.lineno(kwdargs.get("call_origin", (None, None))[1])
)
new_results = []
keep_all_restore = target.keep_all
target.keep_all = False
for res, n in results:
for mx, b in findall_target.enumerate_branches(n):
b = list(b)
b_renamed = [findall_target.copy_node(target, c) for c in b]
if b_renamed:
proof_node = target.add_and(b_renamed)
else:
proof_node = target.FALSE
# TODO order detection mechanism is too fragile?
if b:
new_results.append((mx, res[0], proof_node))
else:
new_results.append((mx, res[0], proof_node))
target.keep_all = keep_all_restore
new_results = [(b, c) for a, b, c in sorted(new_results, key=lambda s: s[0])]
output = []
for l, n in _select_sublist(new_results, target):
node = target.add_and(n)
if node is not None:
res = build_list(l, Term("[]"))
if mode == 0: # var
args = (pattern, goal, res)
output.append((args, node))
else:
try:
res = unify_value(res, result, {})
args = (pattern, goal, res)
output.append((args, node))
except UnifyError:
pass
return output
def _builtin_possible(goal, engine=None, **kwdargs):
"""Returns all grounding of goal that are possibly true.
This inference ignores weight values (so 0.0::a is still possibly true).
:param goal: goal for which to compute groundings
:type goal: Term
:param engine:
:param kwdargs:
:return:
"""
try:
results = engine.call(goal, subcall=True, **kwdargs)
output = []
for g, _ in results:
output.append((unify_value(goal, goal(*g), {}),))
return output
except UnifyError:
return []
except RuntimeError:
raise IndirectCallCycleError(
kwdargs["database"].lineno(kwdargs.get("call_origin", (None, None))[1])
)
# noinspection PyUnusedLocal
def _builtin_sample_all(pattern, goal, result, database=None, target=None, **kwdargs):
# Like findall.
pass
def _builtin_sample_uniform(key, lst, result, database=None, target=None, **kwdargs):
"""Implements the sample_uniform(+Key,+List,-Result) builtin.
This predicate succeeds once for each element in the list as result, and with probability \
1/(length of the list).
The first argument is used as an identifier such that calls with the same key enforce mutual \
exclusivity on the results, that is, the probability of
sample_uniform(K,L,R1), sample_uniform(K,L,R2), R1 \\== R2
is 0.
:param key:
:param lst:
:param result:
:param database:
:type database: StackBasedEngine
:param target:
:type target: LogicFormula
:param kwdargs:
:return:
"""
mode = check_mode((key, lst, result), ["gLv", "gLn"], database=database, **kwdargs)
identifier = "_uniform_%s" % key
elements, tail = list_elements(lst)
if len(elements) == 0:
return []
else:
prob = Constant(1 / float(len(elements)))
results = []
if mode == 0:
for i, elem in enumerate(elements):
elem_identifier = (identifier, i)
# res = unify_value(result, elem)
results.append(
(
(key, lst, elem),
target.add_atom(
identifier=elem_identifier,
probability=prob,
group=identifier,
),
)
)
else:
res = None
for el in elements:
try:
res = unify_value(el, result, {})
break
except UnifyError:
pass
if res is not None:
results.append(
(
(key, lst, res),
target.add_atom(identifier=identifier, probability=prob),
)
)
return results
def _builtin_findall(pattern, goal, result, **kwdargs):
return _builtin_findall_base(pattern, goal, result, **kwdargs)
# noinspection PyUnusedLocal
def _builtin_module(name, predicates, **kwargs):
return True
# noinspection PyUnusedLocal
def _builtin_use_module2(filename, predicates, database=None, location=None, **kwdargs):
database.use_module(filename, predicates, location=location)
return True
def _builtin_use_module(filename, database=None, location=None, **kwdargs):
database.use_module(filename, None, location=location)
return True
@builtin_boolean("_use_module", 3)
def _builtin_use_module2_scope(
scope, filename, predicates, database=None, location=None, **kwdargs
):
scope = str(scope)
if scope == "None":
scope = None
database.use_module(filename, predicates, my_scope=scope, location=location)
return True
@builtin_boolean("_use_module", 2)
def _use_module(scope, filename, database=None, location=None, **kwdargs):
scope = str(scope)
if scope == "None":
scope = None
database.use_module(filename, None, my_scope=scope, location=location)
return True
@builtin_boolean("_consult", 2)
def _consult(scope, filename, database=None, engine=None, **kwdargs):
check_mode((filename,), ["a"], functor="consult", **kwdargs)
scope = str(scope)
if scope == "None":
scope = None
database.consult(filename, location=kwdargs.get("location"), my_scope=scope)
return True
@builtin_boolean("dbg_printdb", 0)
def _dbg_printdb(database=None, **kwargs):
print(database, file=sys.stderr)
return True
def _builtin_try_call(term, **kwdargs):
try:
return _builtin_call(term, **kwdargs)
except UnknownClause:
return True, kwdargs["callback"].notifyComplete()
except:
return True, kwdargs["callback"].notifyComplete()
def _builtin_call(
term, args=(), engine=None, callback=None, transform=None, context=None, **kwdargs
):
check_mode((term,), ["c"], functor="call")
# Find the define node for the given query term.
term_call = term.with_args(*(term.args + args))
from .engine_stack import get_state
try:
if transform is None:
from .engine_stack import Transformations
transform = Transformations()
def _trans(result):
n = len(term.args)
res1 = result[:n]
res2 = result[n:]
return engine.create_context(
[term.with_args(*res1)] + list(res2), state=get_state(result)
)
transform.addFunction(_trans)
actions = engine.call_intern(
term_call, transform=transform, parent_context=context, **kwdargs
)
except UnknownClauseInternal:
raise UnknownClause(
term_call.signature, kwdargs["database"].lineno(kwdargs["location"])
)
return True, actions
def _builtin_call_nc(*args, **kwdargs):
return _builtin_call(*args, dont_cache=True, **kwdargs)
def _builtin_subquery(term, prob, evidence=None, semiring=None, evaluator=None,
engine: ClauseDBEngine = None, database: ClauseDB = None, **kwdargs):
if evaluator:
check_mode((term, prob, evidence, semiring, evaluator), ["cvLgg"], functor="subquery")
elif evidence:
check_mode((term, prob, evidence), ["cvL"], functor="subquery")
else:
check_mode((term, prob), ["cv"], functor="subquery")
eng = engine.__class__()
target = eng.ground(database, term, label="query")
if evidence:
for ev in term2list(evidence):
target = eng.ground(
database, ev, target=target, label=target.LABEL_EVIDENCE_POS
)
kc, semiring = _create_evaluator_and_semiring(semiring=semiring, evaluator=evaluator,
database=database, engine=engine,
**kwdargs)
results = kc.create_from(target).evaluate(semiring=semiring)
if evaluator:
return [(t, Constant(p), evidence, evaluator, semiring) for t, p in results.items()]
if evidence:
return [(t, Constant(p), evidence) for t, p in results.items()]
else:
return [(t, Constant(p)) for t, p in results.items()]
def _create_evaluator_and_semiring(*, semiring: typing.Optional[Term], evaluator: typing.Optional[Term],
database: ClauseDB, engine: ClauseDBEngine, **kwargs):
"""Helper to create evaluator and semiring.
:param semiring: Semiring name to use.
:param evaluator: Evaluator name to use.
:param database: Database with program.
:param engine: Engine executing the program.
:param kwargs: Keyword arguments sent to the builtin calling this.
"""
from . import get_evaluatable, get_semiring
evaluator_name = None
semiring_name = None
if evaluator:
if not isinstance(evaluator, Constant) or not evaluator.is_string():
raise GroundingError("subquery: evaluator must be a string")
evaluator_name = evaluator.functor.strip('"')
if semiring:
if not isinstance(semiring, Constant) or not semiring.is_string():
raise GroundingError("subquery: semiring must be a string")
semiring_name = semiring.functor.strip('"')
semiring = get_semiring(name=semiring_name).create(engine=engine, database=database, **kwargs)
kc = get_evaluatable(
name=evaluator_name,
semiring=semiring
)
return kc, semiring
def _builtin_calln(term, *args, **kwdargs):
return _builtin_call(term, args, **kwdargs)
def _builtin_try_calln(term, *args, **kwdargs):
return _builtin_try_call(term, args, **kwdargs)
def _builtin_calln_nc(term, *args, **kwdargs):
return _builtin_call(term, args, dont_cache=True, **kwdargs)
def _builtin_subsumes_term(generic, specific, **kwargs):
check_mode((generic, specific), ["**"], functor="subsumes_term")
from .engine_unify import subsumes
return subsumes(generic, specific)
[docs]class IndirectCallCycleError(GroundingError):
"""Cycle should not pass through indirect calls (e.g. call/1, findall/3)."""
def __init__(self, location=None):
GroundingError.__init__(
self, "Indirect cycle detected (passing through findall/3)", location
)
def _build_scope(term):
if term.functor == "'&'":
a = _build_scope(term.args[0])
b = _build_scope(term.args[1])
print(a, b, a & b)
return a & b
elif term.functor == "'|'":
a = _build_scope(term.args[0])
b = _build_scope(term.args[1])
return a | b
elif term.functor == "'-'":
a = _build_scope(term.args[0])
b = _build_scope(term.args[1])
return a - b
elif _is_list(term):
return frozenset(term2list(term))
elif isinstance(term, Object):
if isinstance(term.functor, frozenset) or isinstance(term.functor, set):
return term.functor
else:
raise GroundingError("Unknown object type in set operation")
else:
raise GroundingError("Unknown set construction")
def _builtin_create_scope(term, scope, **kwargs):
mode = check_mode((term, scope), ["Lv", "gv"], **kwargs)
if mode in (0, 1):
result = Object(_build_scope(term))
else:
raise NotImplemented
return [(term, result)]
def _builtin_subquery_in_scope(
scope, term, prob, evidence=None, semiring=None, evaluator=None, engine=None, database=None, **kwdargs
):
if evaluator:
check_mode((scope, term, prob, evidence, semiring, evaluator), ["gcvLgg"], functor="subquery")
elif evidence:
check_mode((scope, term, prob, evidence), ["gcvL"], functor="subquery")
else:
check_mode((scope, term, prob), ["gcv"], functor="subquery")
scopel = _build_scope(scope)
eng = engine.__class__()
target = eng.ground(database, term, label="query", include=scopel)
if evidence:
for ev in term2list(evidence):
target = eng.ground(
database,
ev,
target=target,
label=target.LABEL_EVIDENCE_POS,
include=scopel,
)
kc, semiring = _create_evaluator_and_semiring(semiring=semiring, evaluator=evaluator,
database=database, engine=engine,
**kwdargs)
results = kc.create_from(target).evaluate(semiring=semiring)
if evaluator:
return [(scope, t, Constant(p), evidence, semiring, evaluator) for t, p in results.items()]
elif evidence:
return [(scope, t, Constant(p), evidence) for t, p in results.items()]
else:
return [(scope, t, Constant(p)) for t, p in results.items()]
def _builtin_call_in_scope(
scope,
term,
args=(),
engine=None,
callback=None,
transform=None,
context=None,
**kwdargs
):
check_mode((term,), ["c"], functor="call")
# Find the define node for the given query term.
term_call = term.with_args(*(term.args + args))
scopel = _build_scope(scope)
try:
if transform is None:
from .engine_stack import Transformations
transform = Transformations()
def _trans(result):
n = len(term.args)
res1 = result[:n]
res2 = result[n:]
return [scope, term.with_args(*res1)] + list(res2)
transform.addFunction(_trans)
actions = engine.call_intern(
term_call,
transform=transform,
dont_cache=True,
no_cache=True,
include=scopel,
parent_context=context,
**kwdargs
)
except UnknownClauseInternal:
raise UnknownClause(
term_call.signature, kwdargs["database"].lineno(kwdargs["location"])
)
return True, actions
def _builtin_calln_in_scope(scope, term, *args, **kwdargs):
return _builtin_call_in_scope(scope, term, args, **kwdargs)
def _builtin_find_scope(term, scope, engine=None, database=None, **kwargs):
check_mode((term, scope), ["cv"], functor="find_scope")
if term.functor == "*":
nodes = range(0, len(database))
else:
define = database.find(term)
if define is None:
nodes = 0
else:
define_node = database.get_node(define).children
nodes = define_node.find(term.args)
nodes = Object(frozenset(nodes))
return [(term, nodes)]
@builtin_simple("set_state", 1)
def _builtin_set_state(term, **kwargs):
from engine_stack import Context
return [Context([term], state=term)]
@builtin_simple("reset_state", 0)
def _builtin_reset_state(**kwargs):
from engine_stack import Context
return [Context()]
@builtin_boolean("check_state", 1)
def _builtin_check_state(term, context=None, **kwargs):
from engine_stack import get_state
if get_state(context) == term:
return True
else:
return False
@builtin_boolean("print_state", 0)
def _builtin_print_state(context=None, **kwargs):
if hasattr(context, "state") and context.state is not None:
print("State:", context.state)
else:
print("State not set")
return True
@builtin_boolean("probabilityX", 1)
def _builtin_probability(term, context=None, database=None, **kwargs):
check_mode([term], ["c"], functor="probability")
database.queries.append((term, context.state.get("conditions", ())))
print(database.queries, file=sys.stderr)
return True
@builtin_simple("condition", 1)
def _builtin_condition(term, context=None, database=None, **kwargs):
check_mode([term], ["c"], functor="condition")
from engine_stack import Context
return [Context([term], state=context.state | {"conditions": [term]})]
@builtin_simple("seq", 1)
def seq(term, database=None, **kwargs):
check_mode((term,), ["v"], functor="seq", database=None, **kwargs)
s = database.get_data("__seq__", 0)
s += 1
database.set_data("__seq__", s)
return [(Constant(s),)]
@builtin_boolean("trace", 0)
def trace(engine=None, **kwargs):
if engine.debugger:
engine.debugger.interactive = True
return True
@builtin_boolean("notrace", 0)
def notrace(engine=None, **kwargs):
if engine.debugger:
engine.debugger.interactive = False
return True