from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Optional
from typing import Tuple
from typing import Dict
from typing import List
from typing import Any
from typing import Self
from typing import Callable
from typing import Sequence
from typing import TypeVar
from itertools import chain
from ..core import Variator
import functools
import random
import typing
from inspect import signature
from typing import Generic
from ..core import Evaluator, Individual
T = TypeVar("T")
_EXPR_PARAM_PREFIX: str = "x"
def _get_arity(fun: Callable | Expression | Symbol | Any) -> int:
"""Return the arity of an object.
If the argument is callable, return the length of its signature.
Otherwise, return 0.
Does not work with built-in functions and other objects that do not
work with :meth:`.inspect.signature`.
Args:
fun: An object
"""
if (callable(fun)):
return len(signature(fun).parameters)
elif isinstance(fun, Expression):
# Specialised code for programs
return fun.arity
elif isinstance(fun, Symbol):
# Specialised code for programs
return 0
else:
return 0
[docs]
class Expression(Generic[T]):
"""A node in an expression tree.
Recursive data structure that implements program trees. An ``Expression``
is also a :class:`Callable` that only accepts arguments passed by position.
The attribute :attr:`.value`
"""
[docs]
def __init__(self: Self,
arity: int,
value: T | Callable[..., T] | Symbol,
children: List[Expression[T]],
factory: Optional[ExpressionFactory] = None):
#! Arity of the expression node.
self.arity: int = arity
#! Value of the expression node.
self.value: T | typing.Callable[..., T] | Symbol = value
#! Children of the expression node.
self.children = children
self._factory = factory
@property
def factory(self: Self) -> ExpressionFactory[T]:
"""The :class:`.ExpressionFactory`, if any, that built this object.
The :class:`.ExpressionFactory` maintains hyperparameters that
instruct the construction of this object.
"""
if self._factory is not None:
return self._factory
else:
raise ValueError("This expression is not associated with a factory.")
@factory.setter
def factory(self: Self, factory: ExpressionFactory[T]) -> None:
self._factory = factory
[docs]
def __call__(self: Self, *args: T) -> T:
"""Evaluate the expression tree with arguments.
Recursively evaluate expression nodes in :attr:`children`. Then,
apply :attr:`value` to the results, in the same order as the
:attr:`children` they are resolved from.
Args:
*args: Arguments to the program.
"""
self_arity: int = self.arity
params_arity: int = len(args)
if (self_arity != params_arity):
raise ValueError(f"The expression expects"
f"{self_arity} parameters, "
f"{params_arity} given.")
value_arity: int = _get_arity(self.value)
children_arity: int = len(self.children)
if (value_arity != children_arity):
raise ValueError(f"The node may be improperly configured. The value expects"
f"{value_arity} arguments, while "
f"{value_arity} children are given.")
# Evaluate children, pack results into a generator
results = (x(*args) for x in self.children)
if callable(self.value):
return self.value(*results)
if isinstance(self.value, Symbol):
return args[self.value.pos]
else:
return self.value
[docs]
def copy(self: Self) -> Self:
"""Return a deep copy.
Call the `python`:copy(self, ...): method on :attr:`value`,
each item in :attr:`children`, and :attr:`value` (if :attr:`value`
implements a method named ``copy``). Use the results to create
a new :class:`Expression`
"""
new_value: T | Callable[..., T] | Symbol
if (hasattr(self.value, "copy") and callable(getattr(self.value, 'copy'))):
new_value = getattr(self.value, 'copy')()
else:
new_value = self.value
new_children: List[Expression[T]] = [x.copy() for x in self.children]
return self.__class__(self.arity, new_value, new_children, self.factory)
[docs]
def nodes(self: Self) -> Tuple[Expression[T], ...]:
"""Return a flat list view of all nodes and subnodes.
Note that operations performed on items in the returned list affect
the original objects.
"""
return (self, *(chain.from_iterable((x.nodes() for x in self.children))))
def __str__(self: Self) -> str:
delimiter = ", "
my_name: str = self.value.__name__\
if callable(self.value) else str(self.value)
children_name: str
if len(self.children) < 1:
children_name = ""
else:
children_name = f"({functools.reduce(
lambda x, y: str(x) + delimiter + str(y),
[str(x) for x in self.children])})"
return (f"{my_name}{children_name}")
[docs]
class Symbol():
"""Dummy object used by :class:`.ExpressionFactory`.
See:
??? TODO
"""
__slots__ = ['pos']
[docs]
def __init__(self: Self, pos: int):
self.pos: int = pos
def __str__(self: Self) -> str:
global _EXPR_PARAM_PREFIX
return _EXPR_PARAM_PREFIX + str(self.pos)
[docs]
class ExpressionFactory(Generic[T]):
"""Factory class for :class:`Expression`.
Build :class:`.Expression` instances with supplied hyperparameters.
Register the factory itself with each expression built by setting
:attr:`Expression.factory`.
Please see :mod:`.evolvables.funcs` for a set of primitives, or
define custom functions.
Note:
If ``arity = 0``, then ``primitives`` must include at least one literal.
Otherwise, the tree cannot be built, as no terminal node can be drawn.
See:
:attr:`Expression.factory`
"""
[docs]
def __init__(self: Self,
primitives: Tuple[T | Callable[..., T], ...],
arity: int):
"""
Args:
primitives: instructions and terminals that occupy nodes of the
expression tree. Listing a primitive more than once increases its
chance of being selected.
arity: arity of constructed :class:`Expression` instances.
Raise:
ValueError if ``arity=0`` and ``primitives`` does not contain nullary
values. The tree cannot be built without terminals.
"""
self.primitive_pool: Dict[int, List[T | Callable[..., T] | Symbol]] = {}
self.arity = arity
self.primitive_pool[0] = []
for item in primitives:
item_arity: int = _get_arity(item)
if item_arity not in self.primitive_pool:
self.primitive_pool[item_arity] = []
self.primitive_pool[item_arity].append(item)
for i in range(arity):
self.primitive_pool[0].append(Symbol(i))
if not self.primitive_pool[0]:
# Remember to test it
raise ValueError("Factory is initialised with no terminal node.")
def _build_is_node_overbudget(self: Self) -> bool:
return self._temp_node_budget_used > self._temp_node_budget_cap
def _build_cost_node_budget(self: Self, cost: int) -> None:
self._temp_node_budget_used += cost
def _build_initialise_node_budget(self, node_budget: int) -> None:
self._temp_node_budget_cap: int = node_budget
self._temp_node_budget_used: int = 0
[docs]
def build(self: Self,
node_budget: int,
layer_budget: int,
nullary_ratio: Optional[float] = None) -> Expression:
"""Build an expression tree to specifications.
The parameters ``node_budget`` and ``layer_budget`` are not constraints.
Rather, if the tree exceeds these budgets, then only nullary values
can be drawn. This ensures that the tree does not exceed these budgets by
too much.
Costs are incurred after a batch nodes are drawn.
Args:
node_budget: Total number of nodes in the tree.
layer_budget: Depth of the tree.
nullary_ratio: Probability of drawing a nullary node.
Raise:
``ValueError`` if ``nullary_ratio`` lies outside of range ``[0,1]``.
"""
if (nullary_ratio is not None
and (nullary_ratio < 0 or nullary_ratio > 1)):
raise ValueError(f"Probability of drawing nullary values must be"
f"between 1 and 0. Got: {nullary_ratio}")
self._build_initialise_node_budget(node_budget)
return self._build_recurse(layer_budget, nullary_ratio)
def _build_recurse(self: Self,
layer_budget: int,
nullary_ratio: Optional[float] = None) -> Expression[T]:
target_primitive: T | Callable[..., T] | Symbol =\
self.draw_primitive(1) if layer_budget < 1\
else self.draw_primitive(nullary_ratio)
inferred_value_arity = _get_arity(target_primitive)
return Expression(arity=self.arity,
value=target_primitive,
children=[*(self._build_recurse(layer_budget - 1, nullary_ratio)
for _ in range(inferred_value_arity))],
factory=self)
[docs]
def draw_primitive(self: Self,
nullary_ratio: Optional[float] = None,
free_draw: bool = False) -> T | Callable[..., T] | Symbol:
"""Return an item from :attr:`primitive_pool`
Args:
nullary_ratio: probability of drawing terminals. If set, non-terminals
are drawn with probability (:python:`1-nullary_ratio`).
free_draw: if ``True``, then the call does not affect or respect
constraints on node counts. For example, it can still draw non-terminal
nodes, even while exceeding node count and depth constraints.
"""
if (self._build_is_node_overbudget() and not free_draw):
nullary_ratio = 1
value_pool: List[T | Callable[..., T] | Symbol]
if (nullary_ratio is None):
value_pool = list(
chain.from_iterable(self.primitive_pool.values()))
else:
nullary_random = random.random()
if (nullary_random < nullary_ratio):
value_pool = self.primitive_pool[0]
else:
value_pool = list(chain.from_iterable(
self.primitive_pool[x] for x in self.primitive_pool.keys() if x != 0))
if not free_draw:
self._build_cost_node_budget(1)
return random.choice(value_pool)
[docs]
def primitive_by_arity(self: Self,
arity: int) -> T | Callable[..., T] | Symbol:
"""Draw a instruction or terminal of the given arity.
If no primitive of the given arity exists, return an empty list.
"""
return random.choice(self.primitive_pool[arity])
[docs]
class Program(Individual[Expression[T]]):
"""A tree-based genetic program.
Tutorial: :doc:`../guides/examples/gp`.
"""
[docs]
def __init__(self, expr: Expression[T]):
self.genome: Expression[T] = expr
def __str__(self) -> str:
return f"Program:{str(self.genome)}"
[docs]
def copy(self) -> Self:
return self.__class__(self.genome.copy())
[docs]
class ProgramFactory(Generic[T]):
"""Convenience factory class for :class:`Program`.
Contain an :class`ExpressionFactory` instance. Delete storage of
hyperparameters and :meth:`ProgramFactory.build` to the internal
:class`ExpressionFactory`.
"""
[docs]
def __init__(self: Self, primitives: Tuple[T | Callable[..., T], ...], arity: int):
self.exprfactory = ExpressionFactory[T](primitives=primitives,
arity=arity)
[docs]
def build(self: Self,
node_budget: int,
layer_budget: int,
nullary_ratio: Optional[float] = None) -> Program:
# new_deposit = [x.copy() for x in self.symbol_deposit]
return Program(self.exprfactory.build(node_budget, layer_budget, nullary_ratio))
[docs]
class CrossoverSubtree(Variator[Program[float]]):
"""Crossover operator that randomly exchange subtrees of parents.
Select an internal node from each parent. Then, select one
child of each internal node and exchange these child nodes. Doing
so also exchanges subtrees that begin at these child nodes.
"""
[docs]
def __init__(self, shuffle: bool = False):
"""
Args:
shuffle: If ``True``: collect all child nodes of both
internal nodes into one list, shuffle that list, then assign
items back to respective parents.
"""
self.arity = 2
self.coarity = 2
self.shuffle = shuffle
[docs]
def vary(self,
parents: Sequence[Program[float]]) -> Tuple[Program[float], ...]:
root1: Program = parents[0].copy()
root2: Program = parents[1].copy()
root1_pass: Program = parents[0].copy()
root2_pass: Program = parents[1].copy()
# print(f"root 1: {str(root1)}")
# print(f"root 2: {str(root2)}")
internal_nodes_from_root_1 =\
tuple(x for x in root1.genome.nodes() if len(x.children) > 0)
internal_nodes_from_root_2 =\
tuple(x for x in root2.genome.nodes() if len(x.children) > 0)
# print(f"root 1 i.nodes: {str([str(x) for x in internal_nodes_from_root_1])}")
# print(f"root 2 i.nodes: {str([str(x) for x in internal_nodes_from_root_2])}")
# If both expression trees have valid internal nodes, their
# children can be exchanged.
if (internal_nodes_from_root_1 and internal_nodes_from_root_2):
if (not self.shuffle):
self.__class__._swap_children(
random.choice(internal_nodes_from_root_1),
random.choice(internal_nodes_from_root_2))
else:
self.__class__._shuffle_children(
random.choice(internal_nodes_from_root_1),
random.choice(internal_nodes_from_root_2))
# expression_node_from_root_1_to_swap =\
# random.choice(internal_nodes_from_root_1)
# expression_node_from_root_2_to_swap =\
# random.choice(internal_nodes_from_root_2)
return (root1, root2, root1_pass, root2_pass)
@staticmethod
def _swap_children(expr1: Expression[float], expr2: Expression[float]) -> None:
r1_children = expr1.children
r2_children = expr2.children
r1_index_to_swap = random.randint(0, len(expr1.children) - 1)
r2_index_to_swap = random.randint(0, len(expr2.children) - 1)
r2_index_hold = r2_children[r2_index_to_swap].copy()
r2_children[r2_index_to_swap] = r1_children[r1_index_to_swap].copy()
r1_children[r1_index_to_swap] = r2_index_hold.copy()
@staticmethod
def _shuffle_children(expr1: Expression[float], expr2: Expression[float]) -> None:
child_nodes = list(expr1.children + expr2.children)
random.shuffle(child_nodes)
for i in range(0, len(expr1.children)):
expr1.children[i] = child_nodes[i].copy()
for i in range(-1, -(len(expr2.children) + 1), -1):
expr2.children[i] = child_nodes[i].copy()
# class CrossoverSubtree(Variator[Program[float]]):
# def __init__(self):
# self.arity = 1
# self.coarity = 1
# def vary(self,
# parents: Sequence[Program[float]]) -> Tuple[Program[float], ...]:
# root1: Program = parents[0].copy()
# random_node = random.choice(root1.genome.nodes())
[docs]
class MutateNode(Variator[Program]):
"""Mutator that changes the primitive in a uniformly random node to
a uniformly selected primitive of the same arity.
"""
[docs]
def __init__(self: Self) -> None:
self.arity = 1
self.coarity = 1
[docs]
def vary(self: Self,
parents: Sequence[Program]) -> Tuple[Program, ...]:
"""
Args:
parents: collection where the 0th item is the parent.
Raise:
``ValueError`` if the parent's :attr:`Program.genome`
does not have :attr:`Expression.factory` set.
"""
root1: Program = parents[0].copy()
if root1.genome.factory is None:
raise ValueError("")
root_pass: Program = parents[0].copy()
random_node = random.choice(root1.genome.nodes())
random_node.value = root1.genome.factory.primitive_by_arity(
_get_arity(random_node.value))
random_node.value = root1.genome.factory.primitive_by_arity(
_get_arity(random_node.value))
return (root1, root_pass)
[docs]
class MutateSubtree(Variator[Program]):
"""Mutation operator that randomly mutates subtrees.
Uniformly select an internal node, then uniformly select a child of
that node. Replace that child with a subtree, constructed by calling
:meth:`ExpressionFactory.build` of the associated :class:`ExpressionFactory`
found in :attr:`Expression.factory`.
"""
[docs]
def __init__(self: Self,
node_budget: int,
layer_budget: int,
nullary_ratio: Optional[float] = None) -> None:
self.arity = 1
self.coarity = 2
self.node_budget = node_budget
self.layer_budget = layer_budget
self.nullary_ratio = nullary_ratio
[docs]
def vary(self: Self,
parents: Sequence[Program]) -> Tuple[Program, ...]:
root1: Program = parents[0].copy()
root_pass: Program = parents[0].copy()
internal_nodes: Tuple[Expression, ...] =\
tuple(x for x in root1.genome.nodes() if len(x.children) > 0)
if (internal_nodes):
random_internal_node = random.choice(internal_nodes)
index_for_replacement = \
random.randint(0, len(random_internal_node.children) - 1)
random_internal_node.children[index_for_replacement] = \
random_internal_node.factory.build(self.node_budget,
self.layer_budget,
self.nullary_ratio)
return (root1, root_pass)
[docs]
class SymbolicEvaluator(Evaluator[Program[float]]):
"""Evaluator for symbolic regression.
Compare the output of a program tree against a given
objective function, over a fixed set of points. Assign higher
fitness to programs whose output is closer to that of
the objective function.
"""
[docs]
def __init__(self,
objective: Callable[..., float],
support: Tuple[Tuple[float, ...], ...]):
"""
Args:
objective: function that
support: collection of points on which the program
is compared against ``objective``.
Raise:
TypeError if the first item in ``support`` does not
match the arity of ``objective``.
"""
self.objective: Callable[..., float] = objective
self.support: Tuple[Tuple[float, ...], ...] = support
self.arity = _get_arity(objective)
if self.arity != len(support[0]):
raise TypeError(f"The objective function has arity "
f"{self.arity}, the first item in support has arity "
f"{support[0]}; they are not the same.")
[docs]
def evaluate(self, program: Program[float]) -> tuple[float]:
return (-sum([abs(self.objective(*sup) - program.genome(*sup))
for sup in self.support]),)
[docs]
class PenaliseNodeCount(Evaluator[Program[float]]):
"""Evaluator that favours smaller program trees.
For each node in the given program tree,
incur a penalty of ``coefficient``.
"""
[docs]
def __init__(self, coefficient: float):
"""
Args:
coefficient: penalty coefficient for each node in the
program tree.
"""
self.coefficient = coefficient
[docs]
def evaluate(self, program: Program[float]) -> tuple[float]:
return (-(self.coefficient * len(program.genome.nodes())),)