import math
import networkx as nx
import numpy as np
from itertools import combinations, chain
from typing import Callable
from tqdm import tqdm
import logging
[docs]
class MyersonCalculator():
r"""Calculates the exact Myerson values.
For a game described by a coalition function :math:`v` the Myerson values
attribute the individual players contribution to the payoff of the game. For
a complete graph (every node connected to every other node) the Myerson
value is equal to the Shapley value (:math:`S`: coalition of players,
:math:`N`: grand coalition of all players):
.. math::
\text{Sh}_i\,({v}) = \sum_{S \subseteq N \setminus \{i\}} \frac{|S|! \: (|N| - |S| - 1)!}{|N|!}\big( {v}\,(S \cup \{i\}) - {v}\,(S) \big)
Else, the additional gain players can obthain through coalition is
restricted only to players which are connected by edges in the graph.
Args:
graph (nx.classes.graph.Graph): The coalition structure of the game.
coalition_function (Callable): The coalition for which to calculate
the payoff of the game. Expects a coalition (tuple of node
indices) and a graph which contains additional information on
the players to decide on the payoff.
disable_tqdm (bool, optional): Disables progress bar. Defaults to
True.
"""
def __init__(self,
graph: nx.classes.graph.Graph,
coalition_function: Callable,
disable_tqdm: bool=True) -> None:
"""Instantiate the class.
"""
self.disable_tqdm = disable_tqdm
self.log = logging.getLogger("MyersonCalculator")
self.nx_graph = graph
self.grand_coalition = list(graph.nodes()) # alias: set of players / set of nodes / F
self.coalition_function = coalition_function
[docs]
def calculate_coalitions(self, grand_coalition: list) -> list:
r"""Calculate all possible coalitions for a set of players / all nodes in
a graph.
Args:
grand_coalition (list): Set of players / grand coalition / atoms in
graph as a list of tupels.
Returns:
list: All :math:`2^N` coalitions.
"""
self.log.info(f"Calculating number of coalitions.")
coalitions = [combinations(grand_coalition, len(grand_coalition)-x) for x in \
tqdm(range(len(grand_coalition)), desc="Calculate coalitions", disable=self.disable_tqdm)]
coalitions = list(chain.from_iterable(coalitions)) # chaining removes empty set
coalitions.append(())
self.log.info(f"Number of coalitions: {len(coalitions)}")
return coalitions
[docs]
def calculate_graph_restricted_coalitions(self, coalitions: list,
nx_graph: nx.classes.graph.Graph) -> tuple[set, dict]:
"""Calculate the graph restricted coalitions for each coalition. The
graph restricted coalitions are tuples of nodes which are connected.
Args:
coalitions (list): All coalitions.
nx_graph (nx.classes.graph.Graph): NetworkX Graph for which to
calculate the Myerson values.
Returns:
tuple[set, dict]: Set of all possible graph restricted coalitions as
a tuple of nodes, dictionary mapping each coalition to its
graph restricted coalitions.
"""
self.log.info(f"Calculating number of graph restricted coalitions.")
graph_restricted_coalitions = [self.restrict(coalition, nx_graph) \
for coalition in tqdm(coalitions,
desc="Calculate graph restricted coalitions",
disable=self.disable_tqdm)]
coalitions_to_graph_restricted_coalitions = dict(zip(coalitions, graph_restricted_coalitions))
graph_restricted_coalitions = list(chain.from_iterable(graph_restricted_coalitions))
self.log.info(f"Removing dublicates from {len(graph_restricted_coalitions)} graph restricted coalitions.")
graph_restricted_coalitions = set(x for x in tqdm(graph_restricted_coalitions, desc="Remove duplicates", disable=self.disable_tqdm))
self.log.info(f"Number of graph restricted coalitions: {len(graph_restricted_coalitions)}")
return graph_restricted_coalitions, coalitions_to_graph_restricted_coalitions
[docs]
def calculate_worth_of_single_graph_restricted_coalition(self,
graph_restricted_coalition: tuple,
nx_graph: nx.classes.graph.Graph) -> float:
"""Calculate the worth of a graph restricted coalition, i. e. a single
connected component.
Args:
graph_restricted_coalition (tuple): Graph restricted coalition as
node indices.
nx_graph (nx.classes.graph.Graph): Additional information for the
coalition function, i.e. the entire graph with node parameters.
The result of the coalition function should depend only on the
coalition, however the node parameters might contain necessary
information.
Returns:
float: Worth, the output of the coalition function for the connected
subgraph.
"""
return self.coalition_function(graph_restricted_coalition, nx_graph)
[docs]
def calculate_worth_of_graph_restricted_coalitions(self,
graph_restricted_coalitions: set) -> dict:
"""Calculate the worth of every graph restricted coalition and map it to
its worth.
Args:
graph_restricted_coalitions (set): Set of connected components as
tuples of node indices.
Returns:
dict: Dictionary mapping each connected component to its worth.
"""
self.log.info(f"Calculating worth of graph restricted coalitions.")
graph_restricted_coalitions_to_worth = {}
for coalition in tqdm(graph_restricted_coalitions,
desc="Calculating worth of graph restricted coalitions",
disable=self.disable_tqdm):
worth = self.calculate_worth_of_single_graph_restricted_coalition(coalition,
self.nx_graph)
graph_restricted_coalitions_to_worth.update({coalition: worth})
return graph_restricted_coalitions_to_worth
[docs]
def map_coalition_to_worth(self, coalitions: list[tuple],
coalitions_to_graph_restricted_coalitions: dict,
graph_restricted_coalitions_to_worth: dict) -> dict:
"""Map every coalition to its worth.
Args:
coalitions (list): List of all coalitions (2^{num_nodes}).
coalitions_to_graph_restricted_coalitions (dict): Dictionary mapping
the coalitions to the corresponding graph restricted coalitions.
graph_restricted_coalitions_to_worth (dict): Dictionary mapping the
graph restricted coalitions to their worth.
Returns:
dict: Dictionary mapping each coalition to its worth.
"""
self.log.info(f"Mapping coalitions to worth.")
coalition_to_worth = {}
for coalition in tqdm(coalitions, desc="Mapping coalitions to worth", disable=self.disable_tqdm):
worth = 0.
for graph_restricted_coalition in coalitions_to_graph_restricted_coalitions[coalition]:
worth += graph_restricted_coalitions_to_worth[graph_restricted_coalition]
coalition_to_worth.update({coalition: worth})
return coalition_to_worth
[docs]
def calculate_worth_of_grand_coalition(self, nx_graph: nx.classes.graph.Graph) -> float:
"""Calculate payoff of the game, i.e. the payoff of all players / the
grand coalition.
Args:
coalition_function (Callable): The coalition function associating a
coalition with a payoff.
nx_graph (nx.classes.graph.Graph): Coalition structure of the game
as a graph.
Returns:
float: Payoff of the game / worth of grand coalition.
"""
restricted_grand_coalition = self.restrict(self.grand_coalition, nx_graph)
worth = sum([self.calculate_worth_of_single_graph_restricted_coalition(S, nx_graph) \
for S in restricted_grand_coalition])
return worth
[docs]
def restrict(self, coalition: tuple, nx_graph: nx.classes.graph.Graph) -> list[tuple]:
"""Restricts a graph through a (sub)set of nodes / players. Generate a
list of graph restricted coalitions, i. e. a list of node indices of
connected nodes in the subgraph.
Args:
coalition (tuple): Nodes that remain in the graph.
nx_graph (nx.classes.graph.Graph): Graph from which to generate
subgraphs.
Returns:
list[tuple]: Graph restricted coalitions as tuples of node indices.
"""
remove_nodes = set(nx_graph.nodes)-set(coalition)
if remove_nodes == set(nx_graph.nodes):
return [()] # empty_graph
else:
G_reduced = nx_graph.copy()
G_reduced.remove_nodes_from(remove_nodes)
result = [tuple(comp) for comp in nx.connected_components(G_reduced)]
return result
[docs]
def subgraph_from_coalition(self, graph_restricted_coalition: tuple,
nx_graph: nx.classes.graph.Graph) -> nx.classes.graph.Graph:
"""Generates a subgraph from a graph restricted coalition (a subset of
nodes / players) and a graph.
Args:
graph_restricted_coalition (tuple): Nodes / players which form the
subgraph.
nx_graph (nx.classes.graph.Graph): Subgraph induced in this graph by
the nodes_set.
Returns:
nx.classes.graph.Graph: The new subgraph.
"""
if len(graph_restricted_coalition) == 0:
return nx.Graph()
else:
return nx_graph.subgraph(graph_restricted_coalition)
[docs]
def calculate_single_myerson_value(self, node: int, grand_coalition: tuple,
coalitions: list[tuple], coalition_to_worth: dict) -> float:
"""Calculate a single Myerson value.
Args:
node (int): Node index for which to calculate the Myerson value.
grand_coalition (tuple): Set of all players.
coalitions (list[tuple]): List of all coalitions.
coalition_to_worth (dict): Mapping of every coalition to its worth.
Returns:
float: Myerson value.
"""
my = 0
size_grand_coalition = len(grand_coalition)
factorial_size_grand_coalition = math.factorial(size_grand_coalition)
for coalition in [S for S in coalitions if node not in S]:
size_coalition = len(coalition)
prefactor = ((math.factorial(size_coalition)
*math.factorial(size_grand_coalition-size_coalition-1))
/factorial_size_grand_coalition)
worth_of_coalition = coalition_to_worth[coalition]
worth_of_coalition_with_node = coalition_to_worth[tuple(sorted(coalition+(node,)))]
my += prefactor * (worth_of_coalition_with_node - worth_of_coalition)
return my
[docs]
def calculate_all_myerson_values(self) -> np.ndarray:
"""Calculate the Myerson values for every node / player in the graph.
Returns:
np.ndarray: Myerson values for each node.
"""
self.calculate_all_mappings()
self.log.info(f"Calculating Myerson values.")
my_values = []
for node in tqdm(self.grand_coalition, desc="Calculating Myerson values.", disable=self.disable_tqdm):
my_val = self.calculate_single_myerson_value(node, self.grand_coalition,
self.coalitions, self.coalitions_to_worth)
my_values.append(my_val)
my_values = np.array(my_values)
log_string = "".join([f"\t{node}: {val}\n" for node, val in zip(self.grand_coalition, my_values)])
self.log.info(f"Myerson Values:\n{log_string}")
return my_values
[docs]
def calculate_all_mappings(self) -> None:
"""Calculates all coalitions, graph restricted coalitions, and their
associated worths as class attributes:
* `self.coalitions` (list[tuple])
* `self.graph_restricted_coalitions` (set[tuple])
* `self.coalitions_to_graph_restricted_coalitions` (dict)
* `self.graph_restricted_coalitions_to_worth` (dict)
* `self.coalitions_to_worth` (dict)
"""
self.coalitions = self.calculate_coalitions(self.grand_coalition)
self.graph_restricted_coalitions, self.coalitions_to_graph_restricted_coalitions \
= self.calculate_graph_restricted_coalitions(self.coalitions, self.nx_graph)
self.graph_restricted_coalitions_to_worth \
= self.calculate_worth_of_graph_restricted_coalitions(self.graph_restricted_coalitions)
self.coalitions_to_worth \
= self.map_coalition_to_worth(self.coalitions,
self.coalitions_to_graph_restricted_coalitions,
self.graph_restricted_coalitions_to_worth)
[docs]
class MyersonSampler(MyersonCalculator):
r"""A class approximating the Myerson value using Monte Carlo sampling.
The Myerson values are approximated by randomly sampling from all
permutations needed to calculate the Shapley value:
.. math::
\text{Sh}_i\,({v}) = \frac{1}{|N|!}\; \sum_R \big({v}\,(P_i^R \cup \{i\}) - {v}(P_i^R)\big)
For efficiencies sake, the sampled permutations are transformed into
the corresponding coalitions.
Args:
graph (nx.classes.graph.Graph): The coalition structure of the game.
coalition_function (Callable): The coalition for which to calculate
the payoff of the game. Expects a coalition (tuple of node
indices) and a graph which contains additional information on
the players to decide on the payoff.
seed (None | int, optional): Seed for randomness. Defaults to None.
number_of_samples (int, optional): Number of sampling steps. Defaults to 1000.
disable_tqdm (bool, optional): Disables progress bar. Defaults to True.
"""
def __init__(self,
graph: nx.classes.graph.Graph,
coalition_function: Callable,
seed: None | int = None,
number_of_samples: int = 1000,
disable_tqdm: bool=True) -> None:
self.disable_tqdm = disable_tqdm
self.log = logging.getLogger("MyersonSampler")
self.nx_graph = graph
self.grand_coalition = list(graph.nodes()) # alias: set of players / set of nodes / F
self.coalition_function = coalition_function
self.seed = seed
self.rng = np.random.default_rng(seed)
self.number_of_samples = number_of_samples
"""Instantiates the class.
"""
@staticmethod
def _replace_in_array(array: np.ndarray, value_to_replace, value_to_replace_with) -> np.ndarray:
"""Replace a value in an array with a different value.
Args:
array (np.ndarray): The array.
value_to_replace (int): The value to replace.
value_to_replace_with (int): The value to replace with.
Returns:
np.ndarray: The array with the replace value if it was found, else
the original array.
"""
# Convert to a flat array to work with a single loop for any dimension
flat_array = array.ravel().copy()
# Find the index of the first occurrence of value_to_replace
index = np.where(flat_array == value_to_replace)[0]
if index.size > 0: # Check if the value was found
flat_array[index[0]] = value_to_replace_with # Replace the first occurrence
# Reshape back to original array's shape
return flat_array.reshape(array.shape)
[docs]
def reset_rng(self):
"""Reset random number generator to seed.
"""
self.rng = np.random.default_rng(self.seed)
[docs]
def sample_permutations(self, number_of_samples: int):
"""Uniformly sample permutations from all possible permutations. Samples
`number_of_samples`*2*`number_of_nodes` permutations in total.
Args:
number_of_samples (int): How many sample steps should be carried out.
Returns:
tuple(int, list[np.ndarray], list[np.ndarray]): A randomly chosen
node, the sampled permutations without the random node, all the
sampled permutations.
"""
nodes_array = np.array(self.grand_coalition)
random_node = self.rng.choice(nodes_array)
self.log.info(f"Sampling {number_of_samples} steps.")
permutations_without_random_node = []
for i in tqdm(range(number_of_samples),
desc="Sample permutations without random node",
disable=self.disable_tqdm):
random_permutation_size: int = self.rng.integers(0, len(nodes_array))
nodes_array_without_random_node = nodes_array.copy()
indices_to_delete = np.where(nodes_array_without_random_node == random_node)
nodes_array_without_random_node = np.delete(nodes_array_without_random_node,
indices_to_delete)
sampled_permutation_without_random_node = self.rng.choice(nodes_array_without_random_node,
size=random_permutation_size,
replace=False)
self.rng.shuffle(sampled_permutation_without_random_node)
permutations_without_random_node.append(sampled_permutation_without_random_node)
all_sampled_permutations = []
for permutation in tqdm(permutations_without_random_node,
desc="Sample permutations containing random node",
disable=self.disable_tqdm):
for node_idx, node in enumerate(nodes_array):
sampled_permutation_with_current_swapped_in_random_node = permutation.copy()
sampled_permutation_with_current_swapped_in_random_node = \
self._replace_in_array(sampled_permutation_with_current_swapped_in_random_node,
node, random_node)
all_sampled_permutations.append(sampled_permutation_with_current_swapped_in_random_node)
all_sampled_permutations.append(np.append(sampled_permutation_with_current_swapped_in_random_node, node))
# len(all_sampled_permutations): steps*2*len(self.grand_coalition)
self.log.info(f"Sampled {len(all_sampled_permutations)} of {math.factorial(len(self.grand_coalition))} permutations.")
return random_node, permutations_without_random_node, all_sampled_permutations
[docs]
def get_coalitions_from_permutations(self, permutations: list[np.ndarray]):
"""Get the set of coalitions from the different (sampled) permutations.
Args:
permutations (list[np.ndarray]): The permutations.
Returns:
list[tuple]: The sampled coalitions.
"""
all_sampled_coalitions = list(set([tuple(np.sort(x)) for x in permutations]))
self.log.info(f"Sampled {len(all_sampled_coalitions)} of {2**len(self.grand_coalition)} coalitions.")
return all_sampled_coalitions
[docs]
def sample_all_mappings(self) -> None:
"""Samples permutations, corresponding coalitions, graph restricted
coalitions, and their associated worths as class attributes:
* `self.random_node` (int)
* `self.permutations_without_random_node` (list[np.ndarray])
* `self.all_sampled_permutations` (list[np.ndarray])
* `self.coalitions` (list[tuple])
* `self.graph_restricted_coalitions` (set[tuple])
* `self.coalitions_to_graph_restricted_coalitions` (dict)
* `self.graph_restricted_coalitions_to_worth` (dict)
* `self.coalitions_to_worth` (dict)
"""
self.random_node, self.permutations_without_random_node, self.all_sampled_permutations \
= self.sample_permutations(self.number_of_samples)
self.coalitions = self.get_coalitions_from_permutations(self.all_sampled_permutations)
self.graph_restricted_coalitions, self.coalitions_to_graph_restricted_coalitions \
= self.calculate_graph_restricted_coalitions(self.coalitions, self.nx_graph)
self.graph_restricted_coalitions_to_worth \
= self.calculate_worth_of_graph_restricted_coalitions(self.graph_restricted_coalitions)
self.coalitions_to_worth \
= self.map_coalition_to_worth(self.coalitions,
self.coalitions_to_graph_restricted_coalitions,
self.graph_restricted_coalitions_to_worth)
[docs]
def sample_all_myerson_values(self) -> np.ndarray:
"""Use Monte Carlo sampling to approximate the Myerson values for every
node / player in the graph.
Returns:
np.ndarray: Sampled Myerson values for each node.
"""
self.sample_all_mappings()
nodes_array = np.array(self.grand_coalition)
my_values = np.zeros(len(nodes_array), dtype=float)
self.log.info(f"Calculating sampled Myerson values.")
for permutation in tqdm(self.permutations_without_random_node,
disable=self.disable_tqdm,
desc="Calculate sampled Myerson values"):
for node_idx, node in enumerate(nodes_array):
sampled_permutation_with_current_swapped_in_random_node = permutation.copy()
sampled_permutation_with_current_swapped_in_random_node \
= self._replace_in_array(sampled_permutation_with_current_swapped_in_random_node,
node,
self.random_node)
worth_with_node = self.coalitions_to_worth[tuple(np.sort(np.append(sampled_permutation_with_current_swapped_in_random_node, node)))]
worth_without_node = self.coalitions_to_worth[tuple(np.sort(sampled_permutation_with_current_swapped_in_random_node))]
my_values[node_idx] = (my_values[node_idx] + worth_with_node - worth_without_node)
my_values = my_values / self.number_of_samples
log_string = "".join([f"\t{node}: {val:.4f}\n" for node, val in zip(self.grand_coalition, my_values)])
self.log.info(f"Sampled Myerson Values:\n{log_string}")
return my_values
[docs]
def calculate_all_myerson_values(self) -> None:
"""Not implemented for sampling class.
Raises:
NotImplementedError: The MyersonSampler only has the
`sample_all_myerson_values()` method. To accuratly calculate the
Myerson values, please use the `MyersonCalculator` class.
"""
raise NotImplementedError("""The MyersonSampler only has the `sample_all_myerson_values()` method.
To accuratly calculate the Myerson values,
please use the `MyersonCalculator` class.""")