Source code for src.auxiliary_functions

"""

==============================================================
Auxiliary Functions, (:mod:`f_abm.src.auxiliary_functions`)
==============================================================

Description
-----------

    This module contains auxiliary functions that are use throughout the program.

Functions
---------

    - matrix_exp
    - digraph2topology
    - add_random_edges
    - add_rs_weights2matrix
    - add_signs2matrix
    - matrix2digraph
    - create_random_numbers
    - opinion2color
    - modify_opinions_method_1
    - modify_opinions_method_2
    - histogram_classification
    - modify_mean
    - make_row_stochastic


"""

import matplotlib.pyplot as plt
import numpy as np
import random
from math import factorial
import igraph as ig
# COMMENTED TO FIX TEMPORARILY CIRCULAR DEPENDENCIES WITH THE plot_functions MODULE
# from src.plot_functions import plot_histogram


[docs]def matrix_exp(matrix, order=10): """ This is a function to approximate a matrix exponential to the order 'order' Parameters ---------- matrix: matrix to calculate the exponential order: the order of the approximation, by default it is 10 Returns ------- returns the approximation of the matrix exponential """ matrix_exp_approx = np.eye(np.shape(matrix)[0]) + matrix # matrix_power(matrix, 0) matrix_product = matrix for local_order in range(2, order): matrix_product = np.matmul(matrix_product, matrix) matrix_exp_approx += matrix_product*(1/factorial(local_order)) return matrix_exp_approx
[docs]def digraph2topology(adjacency_matrix=None, default_type=0): """ Function to convert from any adjacency matrix to the corresponding topology (that is, the associated adjacency matrix without signs or weights) Parameters ---------- adjacency_matrix: the adjacency matrix default_type: ID of the default digraph Returns ------- A matrix of the same size as 'adjacency_matrix' but with only 0 or 1 corresponding to the topology. """ # if adjacency_matrix is None: # adjacency_matrix = default_digraph(default_type=default_type) CIRCULAR DEPENDENCY TO FIX num_agents = adjacency_matrix.shape[0] adjacency_matrix = adjacency_matrix != 0 topology = np.zeros((num_agents, num_agents)) local_dictionary = {True: 1, False: 0} for id_row in range(0, num_agents): for id_col in range(0, num_agents): topology[id_row][id_col] = local_dictionary[adjacency_matrix[id_row][id_col]] return topology
[docs]def add_random_edges(adjacency_matrix=None, num_iterations=10, default_type=0): """ Function to add random edges to the adjacency matrix 'adjacency_matrix', the edges have no weight or sign. The function does not guarantee that these are new edges, it randomly selects cells of the adjacency matrix and adds edges Parameters ---------- adjacency_matrix: the adjacency matrix to be modified num_iterations: the number of iterations default_type: the ID of the default digraph Returns ------- The same adjacency matrix with random edges. This is a side effect function. """ # if adjacency_matrix is None: # adjacency_matrix = default_digraph(default_type=default_type) CIRCULAR DEPENDENCY TO FIX # Get the number of agents num_agents = adjacency_matrix.shape[0] for _ in range(0, num_iterations): randon_row = random.randint(0, num_agents-1) randon_col = random.randint(0, num_agents-1) adjacency_matrix[randon_row][randon_col] = 1.0
[docs]def add_rs_weights2matrix(adjacency_matrix): """ Function that adds the stochastic weights to an adjacency matrix The result is a row-stochastic matrix A row stochastic matrix is one with all non-negative weights and the sum of elements along a row is always 1 Parameters ---------- adjacency_matrix: original adjacency matrix Returns ------- there is no need to return anything, as the adjacency matrix is transformed in the function """ # Get the number of agents num_agents = adjacency_matrix.shape[0] # Multiply the adjacency matrix by random numbers for id_row in range(0, adjacency_matrix.shape[0]): adjacency_matrix[id_row, :] = adjacency_matrix[id_row, :] * np.random.uniform(low=0.0, high=1.0, size=(1, num_agents)) # Make the matrix row-stochastic make_row_stochastic(adjacency_matrix)
[docs]def add_signs2matrix(adjacency_matrix, positive_edge_ratio): """ Function that adds the signs to the adjacency matrix of a signed digraph Parameters ---------- adjacency_matrix: current adjacency matrix, presumably with only non-negative weights positive_edge_ratio: ratio of positive edges Returns ------- There is no need to return anything, since the function modifies the adjacency matrix as desired """ # Get the number of agents num_agents = adjacency_matrix.shape[0] # Total number of edges (excluding self-loops) num_edges = (adjacency_matrix != 0).sum() - num_agents # Approximate the number of negative edges neg_edges = int(np.floor(positive_edge_ratio * num_edges)) # List all the non self-loop edges edges = [[id_row, id_col] for id_row in range(num_agents) for id_col in range(num_agents) if (id_row != id_col and adjacency_matrix[id_row, id_col] != 0)] # Sort them randomly rng = np.random.default_rng() rng.shuffle(edges) edges = np.array(edges)[:neg_edges, :] # Change the sign of the edge for id_row, id_col in edges: adjacency_matrix[id_row, id_col] *= -1
[docs]def matrix2digraph(adjacency_matrix=None, default_type=0): """ Function that converts from an adjacency matrix to a digraph object it is mainly used to plot Parameters ---------- adjacency_matrix: the adjacency matrix, by default it is a simple ring digraph default_type: ID of the default digraph Returns ------- ID of the default digraph """ # if adjacency_matrix is None: # adjacency_matrix = default_digraph(default_type=default_type) CIRCULAR DEPENDENCY TO FIX return ig.Graph.Weighted_Adjacency(adjacency_matrix)
[docs]def create_random_numbers(num_agents=100, number_parameters=None, limits=(-1, 1)): """ This function creates and returns a list of random 'num_agents' numbers. This function is used to create initial opinions and also to create agent parameters. Its default use is to create initial opinions. This function can also be used for the creation of agent parameters. Parameters ---------- num_agents: number of opinions that are returned, by default 100 number_parameters: a lists of lists, every element corresponds to a different set of initial opinions to be created. Each element of this list contains 4 elements: [0] signals the type of distribution to create. '0' is a uniform distribution 'any non 0' is a normal distribution [1] if the distribution is uniform, this is the minimum value. If the distribution is normal, this is the mean [2] if the distribution is uniform, this is the maximum value. If the distribution is normal, this is the variance [3] the fraction of all the agents. The sum does not have to add to one, as it will be normalized limits: a tuple of two numbers, the first is the lower bound and the second the upper bound Returns ------- numpy array of 'num_agents' rows and 1 column (a list of lists) of opinions """ rng = np.random.default_rng() # this is for the random numbers creation if number_parameters is None: number_parameters = [[0, -1.0, 1.0, 1]] # for ease, transform the list of lists to a numpy 2d array number_parameters = np.array(number_parameters) # the first thing to do is to compute the number of agents each sub-distribution will have, start by normalizing the # fractions fractions = number_parameters[:, 3] fractions = fractions/fractions.sum() fractions = fractions[0] # print(num_agents) # print(type(num_agents)) # print(fractions) # print(type(fractions)) # print(fractions*num_agents) sub_num_agents = np.floor(fractions*num_agents) # compute the number of agents this assignation corresponds to missing_agents = num_agents-sub_num_agents.sum() # number of agents that are left to assign while missing_agents > 0: # randomly assign one agent to one subgroup random_index = random.randint(0, (len(sub_num_agents)-1)) sub_num_agents[random_index] += 1 missing_agents = num_agents - sub_num_agents.sum() # number of agents that are left to assign # replace the fraction, for the actual number of agents number_parameters[:, 3] = sub_num_agents initial_opinions = None for subgroup_info in number_parameters: if subgroup_info[0] == 0: # create a uniform distribution min_op = subgroup_info[1] max_op = subgroup_info[2] local_opinions = (max_op - min_op) * rng.random((int(subgroup_info[3]), 1)) + min_op else: # create a normal distribution dist_mean = subgroup_info[1] dist_variance = subgroup_info[2] local_opinions = rng.normal(dist_mean, dist_variance, (int(subgroup_info[3]), 1)) if initial_opinions is None: initial_opinions = local_opinions else: initial_opinions = np.concatenate((initial_opinions, local_opinions)) # Truncate the opinion values initial_opinions = np.maximum(np.minimum(initial_opinions, limits[1]), limits[0]) return initial_opinions
[docs]def opinion2color(opinion_model, agent_parameter): """ This function transforms agent parameter to colors, depending on the model. Parameters ---------- opinion_model: the model name or identifier agent_parameter: the agent parameters Returns ------- a color in a form of an RGB triplet """ if opinion_model == 'CB': b_value = agent_parameter[0] # Conformist trait r_value = agent_parameter[1] # Radical trait g_value = 1 - (b_value + r_value) # Stubborn trait # Return the value rounded return r_value.round(7), g_value.round(7), b_value.round(7)
[docs]def modify_opinions_method_1(opinions, des_mean, des_abs_mean, epsilon=None, max_counter=100, show_process=False, limits=(-1, 1)): """ This function modifies a given opinion distribution to create an opinion distribution with the desired mean and absolute value mean using method 1 Parameters ---------- opinions: the original opinions des_mean: the desired opinion mean des_abs_mean: the desired opinion mean absolute value epsilon: the tolerance for the infinity norm max_counter: the maximum number of iterations to find the desired opinions show_process: boolean determining whether to show the creation process or not limits: a tuple with the upper and lower limits of the opinions Returns ------- the new, modified opinions """ if epsilon is None: epsilon=0.05 if show_process: fig = plt.figure(figsize=(10, 7)) ax1 = fig.add_subplot(121) ax2 = fig.add_subplot(122) ax2.plot([0, 1, 1, 0], [0, -1, 1, 0]) ax2.plot(des_abs_mean, des_mean, 's', linewidth=2, color=(0.5, 0.1, 0.2)) ax2.grid() opinion_mean = opinions.mean() opinion_abs_mean = np.absolute(opinions).mean() mean_difference = np.absolute(opinion_mean - des_mean) mean_abs_difference = np.absolute(opinion_abs_mean - des_abs_mean) switching_control = 0 counter = 0 while ((mean_difference > epsilon) or (mean_abs_difference > epsilon)) and (counter < max_counter): counter += 1 if switching_control == 0: switching_control = 1 # Primary mean modification (vertical) xi_value = np.minimum(mean_difference, 0.5) if opinion_mean > des_mean: # Reduce the opinion mean opinions -= xi_value for _ in range(5): opinions[np.argmax(opinions)] = -opinions[np.argmax(opinions)] else: # Increase the opinion mean opinions += xi_value for _ in range(5): opinions[np.argmin(opinions)] = -opinions[np.argmin(opinions)] else: switching_control = 0 # Primary abs mean modification (horizontal) mu_value = np.minimum(mean_abs_difference, 0.3) if opinion_abs_mean > des_abs_mean: # Reduce the opinion abs mean opinions -= np.sign(opinions) * mu_value opinions *= (1-mu_value) else: # Increase the opinion abs mean opinions += np.sign(opinions) * mu_value opinions *= (1+mu_value) # Truncate the opinion values opinions = np.maximum(np.minimum(opinions, limits[1]), limits[0]) # COMMENTED TO FIX TEMPORARILY CIRCULAR DEPENDENCIES WITH THE plot_functions MODULE # if show_process: # ax1.clear() # plot_histogram(ax1, opinions) # ax2.plot(opinion_abs_mean, opinion_mean, 'o', linewidth=2, color=(0.2, 0.5, 0.1)) # plt.gcf().canvas.draw() # plt.pause(0.01) opinion_mean = opinions.mean() opinion_abs_mean = np.absolute(opinions).mean() mean_difference = np.absolute(opinion_mean - des_mean) mean_abs_difference = np.absolute(opinion_abs_mean - des_abs_mean) return opinions
[docs]def modify_opinions_method_2(opinions, des_mean, des_abs_mean, epsilon=None, max_counter=100, show_process=False, limits=(-1, 1)): """ This function modifies a given opinion distribution to create an opinion distribution with the desired mean and absolute value mean using method 2 Parameters ---------- opinions: the original opinions des_mean: the desired opinion mean des_abs_mean: the desired opinion mean absolute value epsilon: the tolerance for the infinity norm max_counter: the maximum number of iterations to find the desired opinions show_process: boolean determining whether to show the creation process or not limits: a tuple with the upper and lower limits of the opinions Returns ------- the new, modified opinions """ if epsilon is None: epsilon=0.05 if show_process: fig = plt.figure(figsize=(10, 7)) ax1 = fig.add_subplot(121) ax2 = fig.add_subplot(122) ax2.plot([0, 1, 1, 0], [0, -1, 1, 0]) ax2.plot(des_abs_mean, des_mean, 's', linewidth=2, color=(0.5, 0.1, 0.2)) ax2.grid() opinion_mean = opinions.mean() opinion_abs_mean = np.absolute(opinions).mean() mean_difference = np.absolute(opinion_mean - des_mean) mean_abs_difference = np.absolute(opinion_abs_mean - des_abs_mean) switching_control = 0 counter = 0 while ((mean_difference > epsilon) or (mean_abs_difference > epsilon)) and (counter < max_counter): counter += 1 if switching_control == 0: switching_control = 1 # Primary mean modification (vertical) m1 = (des_mean + 1) / (opinion_mean + 1) m2 = (des_mean - 1) / (opinion_mean - 1) for count, old_opinion in enumerate(opinions): if old_opinion <= opinion_mean: opinions[count] = ((old_opinion + 1)*m1) - 1 else: opinions[count] = ((old_opinion - 1) * m2) + 1 else: switching_control = 0 # Primary abs mean modification (horizontal) mu_value = np.minimum(mean_abs_difference, 0.3) if opinion_abs_mean > des_abs_mean: # Reduce the opinion abs mean by contracting the opinions # remove the mean local_opinion_mean = opinions.mean() opinions -= local_opinion_mean # contract the opinions opinions *= (1-mu_value) # restore the mean opinions += local_opinion_mean else: # Increase the opinion abs mean by expanding the opinions # remove the mean local_opinion_mean = opinions.mean() opinions -= local_opinion_mean # expand the opinions opinions *= (1 + mu_value) # restore the mean opinions += local_opinion_mean # Truncate the opinion values opinions = np.maximum(np.minimum(opinions, limits[1]), limits[0]) # COMMENTED TO FIX TEMPORARILY CIRCULAR DEPENDENCIES WITH THE plot_functions MODULE # if show_process: # ax1.clear() # plot_histogram(ax1, opinions) # ax2.plot(opinion_abs_mean, opinion_mean, 'o', linewidth=2, color=(0.2, 0.5, 0.1)) # plt.gcf().canvas.draw() # plt.pause(0.01) opinion_mean = opinions.mean() opinion_abs_mean = np.absolute(opinions).mean() mean_difference = np.absolute(opinion_mean - des_mean) mean_abs_difference = np.absolute(opinion_abs_mean - des_abs_mean) return opinions
[docs]def histogram_classification(opinion_distribution, classification_parameters=(10, 3, 4, 6, 50, 12, 40)): """ A function that classifies opinion distributions with any number of agents. Parameters ---------- opinion_distribution: the opinion distribution to be classified. classification_parameters: the classification parameters. Returns ------- A number that encodes the classification, 0 is perfect consensus, 1 is consensus, 2 is polaristion, 3 is clustering, and 4 is dissensus. """ # Get each of the parameter values m_value, b_value, k_value, u_value, t1_value, t2_value, t3_value = classification_parameters # First, get the histogram counts hist_counts = np.histogram(opinion_distribution, bins=np.linspace(-1.0, 1.0, m_value+1)) # Normalize the bins, so they add up to 100 hist_counts = hist_counts[0]*(100/hist_counts[0].sum()) # If the histogram contains only two non-empty bins with at least U_value empty bins in between and each of these # two non-empty bins has normalised group count larger than T3_value non_empty_bins = [count for count, value in enumerate(hist_counts) if value > 0] if len(non_empty_bins) == 2: if ((non_empty_bins[1] - non_empty_bins[0]) > u_value) and (hist_counts[non_empty_bins[0]] > t3_value) \ and (hist_counts[non_empty_bins[1]] > t3_value): return 2 # Returns polarisation # If the height of one element is greater than t1_value, then it is perfect consensus if hist_counts.max() > t1_value: return 0 # Returns perfect consensus # Otherwise, we have to do more computations normalised_group_count = [] number_of_bins = [] group_distance = [] local_group_count = 0 local_bin_count = 0 local_group_distance = 0 count_group_distance = False for bin_value in hist_counts: # go bin by bin if bin_value > t2_value: # A new group starts, or continues, i.e. green or red bins local_group_count += bin_value # increase the count of the group local_bin_count += 1 # increase the number of bins contained in that group if count_group_distance and (local_group_distance > 0): # Ie we are in a group, but the previous group distance was not stored, store that information # and initialize the group distance to zero group_distance.append(local_group_distance) local_group_distance = 0 else: # if this is a blue bin if local_group_count > 0: # means that a group is closed behind, so information needs to be stored about that group normalised_group_count.append(local_group_count) number_of_bins.append(local_bin_count) # also, it means that it is necessary to start counting group distance count_group_distance = True # start counting bin distance local_group_distance = 0 # initialize at zero if count_group_distance: local_group_distance += 1 # increase the group distance by one # reset the group and bin count local_group_count = 0 local_bin_count = 0 # if there was a final group, it is necessary to add it if local_group_count > 0: normalised_group_count.append(local_group_count) number_of_bins.append(local_bin_count) # group_distance.append(local_group_distance) # no distance needs to be added # Now, with this new information, we can further classify the histogram if (len(normalised_group_count) == 1) and (number_of_bins[0] <= b_value) and (normalised_group_count[0] > 50): return 1 # Consensus if (len(normalised_group_count) == 2) and (number_of_bins[0] <= b_value) and (number_of_bins[1] <= b_value) \ and (group_distance[0] >= k_value) and ((normalised_group_count[0] + normalised_group_count[1]) > 50): return 2 # Polarisation normalised_group_count = np.array(normalised_group_count) number_of_bins = np.array(number_of_bins) if number_of_bins.shape[0] == 0: max_bin_count = 0 else: max_bin_count = number_of_bins.max() if (len(normalised_group_count) >= 2) and (max_bin_count <= b_value) and (normalised_group_count.sum() > 50): # COMMENTED (in part) TO FIX TEMPORARILY CIRCULAR DEPENDENCIES WITH THE plot_functions MODULE # fig = plt.figure(figsize=(10, 7)) # ax = fig.add_subplot(111) # plot_histogram(ax, opinion_distribution, num_bins=10) # ax.plot([-1, 1], [12, 12], color=(1, 0, 0)) return 3 # Clustering return 4 # Dissensus
[docs]def modify_mean(weights, des_mean, epsilon=0.05, max_counter=100, limits=(0, 1)): """ Parameters ---------- weights: initial weights. des_mean: desired weight mean. epsilon: tolerance. max_counter: maximum number of iterations. limits: the limits of the weights. Returns ------- modified weights with the desired mean. """ weights_mean = weights.mean() mean_difference = np.absolute(weights_mean - des_mean) counter = 0 while (mean_difference > epsilon) and (counter < max_counter): counter += 1 m1 = (des_mean + 1) / (weights_mean + 1) m2 = (des_mean - 1) / (weights_mean - 1) for count, old_weight in enumerate(weights): if old_weight <= weights_mean: weights[count] = ((old_weight + 1) * m1) - 1 else: weights[count] = ((old_weight - 1) * m2) + 1 # Truncate the weight values weights = np.maximum(np.minimum(weights, limits[1]), limits[0]) weights_mean = weights.mean() mean_difference = np.absolute(weights_mean - des_mean) return weights
[docs]def make_row_stochastic(matrix): """ Function that takes a matrix and makes it row-stochastic Parameters ---------- matrix: original matrix Returns ------- This function does not return anything, as it modifies the passed matrix """ for id_row in range(0, matrix.shape[0]): denominator = matrix[id_row, :].sum() if denominator == 0: # If the denominator is zero, then amke every element in the row have the same weight matrix[id_row, :] = np.ones(matrix.shape[1])*(1/matrix.shape[1]) else: matrix[id_row, :] = matrix[id_row, :] * (1 / denominator)