Skip to content

Utilities Module

Utility functions used throughout pyERGM.

Random Seed

set_seed

set_seed(seed)

Set random seed for reproducibility across all libraries.

Sets seeds for numpy, Python's random module, and numba-jitted functions.

Parameters:

Name Type Description Default
seed int

Random seed value.

required
Source code in pyERGM/utils.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def set_seed(seed):
    """
    Seed every random number generator pyERGM relies on.

    Propagates the same seed to the standard-library ``random`` module,
    numpy's global RNG, and numba-jitted code, making runs reproducible.

    Parameters
    ----------
    seed : int
        Random seed value.
    """
    random.seed(seed)
    np.random.seed(seed)
    _numba_seed(seed)

Network Conversion

construct_adj_mat_from_int

construct_adj_mat_from_int(int_code: int, num_nodes: int, is_directed: bool) -> np.ndarray

Convert an integer to its corresponding adjacency matrix.

Used for exhaustive enumeration in BruteForceERGM. Each integer represents a unique network configuration via binary encoding.

Parameters:

Name Type Description Default
int_code int

Integer encoding of the network (0 to 2^(n*(n-1)) - 1 for directed networks).

required
num_nodes int

Number of nodes in the network.

required
is_directed bool

Whether the network is directed.

required

Returns:

Type Description
ndarray

Adjacency matrix of shape (num_nodes, num_nodes).

Source code in pyERGM/utils.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def construct_adj_mat_from_int(int_code: int, num_nodes: int, is_directed: bool) -> np.ndarray:
    """
    Convert an integer to its corresponding adjacency matrix.

    Used for exhaustive enumeration in BruteForceERGM. Each integer represents
    a unique network configuration via binary encoding: the bits of the code
    (most significant first) are the off-diagonal entries of the matrix.

    Parameters
    ----------
    int_code : int
        Integer encoding of the network. Valid values are 0 to
        2^(n*(n-1)) - 1 for directed networks, and 0 to 2^(n*(n-1)/2) - 1
        for undirected ones.
    num_nodes : int
        Number of nodes in the network.
    is_directed : bool
        Whether the network is directed.

    Returns
    -------
    np.ndarray
        Adjacency matrix of shape (num_nodes, num_nodes).

    Raises
    ------
    ValueError
        If int_code is negative or too large for the number of possible
        edges (previously this surfaced as an obscure parsing/reshape error).
    """
    num_pos_connects = num_nodes * (num_nodes - 1)
    if not is_directed:
        num_pos_connects //= 2
    # Validate up front: a negative code would inject a '-' character into the
    # binary string, and an oversized one would produce more bits than
    # off-diagonal slots, both failing later with unhelpful messages.
    if not 0 <= int_code < 2 ** num_pos_connects:
        raise ValueError(f"int_code {int_code} is out of range for a network with {num_nodes} nodes "
                         f"(must be in [0, 2^{num_pos_connects}))")
    adj_mat_str = f'{int_code:0{num_pos_connects}b}'
    mat_entries_arr = np.array(list(adj_mat_str), 'uint8')
    return reshape_flattened_off_diagonal_elements_to_square(mat_entries_arr, is_directed)

construct_int_from_adj_mat

construct_int_from_adj_mat(adj_mat: np.ndarray, is_directed: bool) -> int

Convert an adjacency matrix to its integer encoding.

Inverse operation of construct_adj_mat_from_int.

Parameters:

Name Type Description Default
adj_mat ndarray

Adjacency matrix of shape (n, n).

required
is_directed bool

Whether the network is directed.

required

Returns:

Type Description
int

Integer encoding of the network.

Raises:

Type Description
ValueError

If adjacency matrix dimensions are invalid.

Source code in pyERGM/utils.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def construct_int_from_adj_mat(adj_mat: np.ndarray, is_directed: bool) -> int:
    """
    Convert an adjacency matrix to its integer encoding.

    Inverse operation of construct_adj_mat_from_int: the off-diagonal entries
    (row-major for directed networks, strictly-upper triangle for undirected
    ones) are read as the bits of the code, most significant first.

    Parameters
    ----------
    adj_mat : np.ndarray
        Adjacency matrix of shape (n, n).
    is_directed : bool
        Whether the network is directed.

    Returns
    -------
    int
        Integer encoding of the network.

    Raises
    ------
    ValueError
        If adjacency matrix dimensions are invalid.
    """
    if len(adj_mat.shape) != 2 or adj_mat.shape[0] != adj_mat.shape[1]:
        raise ValueError(f"The dimensions of the given adjacency matrix {adj_mat.shape} are not valid for an "
                         f"adjacency matrix (should be a 2D squared matrix)")
    adj_mat_no_diag = flatten_square_matrix_to_edge_list(adj_mat, is_directed)
    # Accumulate bits with Python's arbitrary-precision integers. The previous
    # vectorized form (2 ** np.arange(...) cast to np.ulonglong) silently
    # wraps for exponents >= 64, corrupting the encoding of any directed
    # network with more than 8 nodes.
    int_code = 0
    for entry in adj_mat_no_diag:
        int_code = (int_code << 1) | int(round(float(entry)))
    return int_code

Matrix Operations

flatten_square_matrix_to_edge_list

flatten_square_matrix_to_edge_list(square_mat: np.ndarray, is_directed: bool) -> np.ndarray
Source code in pyERGM/utils.py
1022
1023
1024
1025
1026
1027
1028
1029
1030
def flatten_square_matrix_to_edge_list(square_mat: np.ndarray, is_directed: bool) -> np.ndarray:
    """
    Collapse a square matrix into a 1D vector of its off-diagonal entries.

    Directed matrices yield all n*(n-1) off-diagonal entries in row-major
    order; undirected matrices must be symmetric and yield the n*(n-1)/2
    entries of the strictly-upper triangle.
    """
    if square_mat.ndim != 2 or square_mat.shape[0] != square_mat.shape[1]:
        raise ValueError("The input must be a square matrix")
    if not is_directed:
        if np.any(square_mat != square_mat.T):
            raise ValueError("Got an asymmetric matrix as an undirected network to flatten")
        return square_mat[np.triu_indices_from(square_mat, k=1)]
    off_diagonal_mask = ~np.eye(square_mat.shape[0], dtype=bool)
    return square_mat[off_diagonal_mask].flatten()

reshape_flattened_off_diagonal_elements_to_square

reshape_flattened_off_diagonal_elements_to_square(flattened_array: np.ndarray, is_directed: bool, flat_mask: npt.NDArray[bool] | None = None) -> np.ndarray
Source code in pyERGM/utils.py
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
def reshape_flattened_off_diagonal_elements_to_square(
        flattened_array: np.ndarray,
        is_directed: bool,
        flat_mask: npt.NDArray[bool] | None = None
) -> np.ndarray:
    """
    Place flattened off-diagonal entries back into a square matrix.

    When `flat_mask` is given, `flattened_array` holds only the entries at the
    mask's True positions; the masked-out positions are filled with np.nan
    before the square matrix is assembled.
    """
    if flat_mask is None:
        full_array = flattened_array
        num_nodes = num_edges_to_num_nodes(flattened_array.size, is_directed)
    else:
        if flat_mask.sum() != flattened_array.size:
            raise ValueError(
                f"Received incompatible flattened_array and mask. flat_mask.sum(): "
                f"{flat_mask.sum()}, flattened_array.size: {flattened_array.size}, but should be equal."
            )
        full_array = np.full(flat_mask.size, fill_value=np.nan)
        full_array[flat_mask] = flattened_array
        num_nodes = num_edges_to_num_nodes(flat_mask.size, is_directed)
    square = np.zeros((num_nodes, num_nodes), dtype=flattened_array.dtype)
    _set_off_diagonal_elements_from_array(square, full_array)
    return square

expand_net_dims

expand_net_dims(net: np.ndarray) -> np.ndarray

Ensure network array has 3 dimensions (n, n, num_networks).

Adds a singleton dimension if the input is 2D.

Parameters:

Name Type Description Default
net ndarray

Network array of shape (n, n) or (n, n, num_networks).

required

Returns:

Type Description
ndarray

Network array of shape (n, n, num_networks).

Raises:

Type Description
ValueError

If array is not 2D or 3D.

Source code in pyERGM/utils.py
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
def expand_net_dims(net: np.ndarray) -> np.ndarray:
    """
    Ensure network array has 3 dimensions (n, n, num_networks).

    A 2D input (a single network) gains a trailing singleton axis;
    a 3D input is returned unchanged.

    Parameters
    ----------
    net : np.ndarray
        Network array of shape (n, n) or (n, n, num_networks).

    Returns
    -------
    np.ndarray
        Network array of shape (n, n, num_networks).

    Raises
    ------
    ValueError
        If array is not 2D or 3D.
    """
    if net.ndim == 3:
        return net
    if net.ndim == 2:
        # A single network: append a num_networks axis of length 1.
        return net.reshape(net.shape + (1,))
    raise ValueError("Cannot expand dims to an array that is not 2 or 3 dimensional")

Covariance Estimation

covariance_matrix_estimation

covariance_matrix_estimation(features_of_net_samples: np.ndarray, mean_features_of_net_samples: np.ndarray, method: CovMatrixEstimationMethod = CovMatrixEstimationMethod.NAIVE, num_batches: int = 25) -> np.ndarray

Approximate the covariance matrix of the model's features

Parameters:

Name Type Description Default
features_of_net_samples ndarray

The calculated features of the networks that are used for the approximation. Of dimensions (num_features X sample_size)

required
mean_features_of_net_samples ndarray

The mean of the features across the samples (a vector of size num_features)

required
method CovMatrixEstimationMethod

the method to use for approximating the covariance matrix currently supported options are: CovMatrixEstimationMethod.NAIVE A naive estimation from the sample: E[gi*gj] - E[gi]E[gj] CovMatrixEstimationMethod.BATCH based on difference of means of sample batches from the total mean, as in Geyer's handbook of MCMC (there it is stated for the univariate case, but the generalization is straight forward). CovMatrixEstimationMethod.MULTIVARIATE_INITIAL_SEQUENCE Following Dai and Jones 2017 - the first estimator in section 3.1 (denoted mIS).

NAIVE
num_batches int

The number of batches used by CovMatrixEstimationMethod.BATCH (default is 25).

25

Returns:

Type Description
The covariance matrix estimation (num_features X num_features).
Source code in pyERGM/utils.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
@njit
def covariance_matrix_estimation(
        features_of_net_samples: np.ndarray,
        mean_features_of_net_samples: np.ndarray,
        method: CovMatrixEstimationMethod = CovMatrixEstimationMethod.NAIVE,
        num_batches: int = 25,
) -> np.ndarray:
    """
    Approximate the covariance matrix of the model's features.

    Parameters
    ----------
    features_of_net_samples
        The calculated features of the networks that are used for the approximation. Of dimensions
        (num_features X sample_size)
    mean_features_of_net_samples
        The mean of the features across the samples (a vector of size num_features)
    method
        the method to use for approximating the covariance matrix
        currently supported options are:
            CovMatrixEstimationMethod.NAIVE
                A naive estimation from the sample: E[gi*gj] - E[gi]E[gj]
            CovMatrixEstimationMethod.BATCH
                based on difference of means of sample batches from the total mean, as in Geyer's handbook of
                MCMC (there it is stated for the univariate case, but the generalization is straight forward).
            CovMatrixEstimationMethod.MULTIVARIATE_INITIAL_SEQUENCE
                Following Dai and Jones 2017 - the first estimator in section 3.1 (denoted mIS).
    num_batches
        The number of batches for CovMatrixEstimationMethod.BATCH (default is 25). If it does not divide
        the sample size evenly, it is increased until it does.

    Returns
    -------
    The covariance matrix estimation (num_features X num_features).

    Raises
    ------
    ValueError
        If `method` is not one of the supported estimation methods.
    RuntimeError
        For MULTIVARIATE_INITIAL_SEQUENCE, if no prefix of the capital-gamma
        sums yields a positive-definite estimate.
    """
    num_features = features_of_net_samples.shape[0]
    sample_size = features_of_net_samples.shape[1]
    # NOTE(review): the cases compare against the enum members' .value —
    # presumably `method` arrives here as the underlying value (e.g. under
    # numba); confirm against CovMatrixEstimationMethod's definition.
    match method:
        case CovMatrixEstimationMethod.NAIVE.value:
            # An outer product of the means (E[gi]E[gj])
            cross_prod_mean_features = (mean_features_of_net_samples.reshape(num_features, 1) @
                                        mean_features_of_net_samples.T.reshape(1, num_features))
            # A mean of the outer products of the sample (E[gi*gj]).
            # Explicit per-sample loop rather than a single vectorized
            # contraction — presumably for numba compatibility.
            features_cross_prods = np.zeros((sample_size, num_features, num_features))
            for i in range(sample_size):
                features_cross_prods[i] = np.outer(features_of_net_samples[:, i], features_of_net_samples[:, i])
            mean_features_cross_prod = features_cross_prods.sum(axis=0) / sample_size

            return mean_features_cross_prod - cross_prod_mean_features

        case CovMatrixEstimationMethod.BATCH.value:
            # Grow num_batches until it divides the sample size evenly, so
            # every batch has exactly the same number of samples.
            while sample_size % num_batches != 0:
                num_batches += 1
            batch_size = sample_size // num_batches

            # Divide the sample into batches, and calculate the mean of each one of them
            batches_means = np.zeros((num_features, num_batches))
            for i in range(num_batches):
                batches_means[:, i] = features_of_net_samples[:, i * batch_size:(i + 1) * batch_size].sum(
                    axis=1) / batch_size

            diff_of_global_mean = batches_means - mean_features_of_net_samples.reshape(num_features, 1)

            # Average the outer products of the differences between batch means and the global mean and multiply by the
            # batch size to compensate for the aggregation into batches
            diff_of_global_mean_outer_prods = np.zeros((num_batches, num_features, num_features))
            for i in range(num_batches):
                diff_of_global_mean_outer_prods[i] = np.outer(diff_of_global_mean[:, i], diff_of_global_mean[:, i])
            batches_cov_mat_est = batch_size * diff_of_global_mean_outer_prods.sum(axis=0) / num_batches

            return batches_cov_mat_est

        case CovMatrixEstimationMethod.MULTIVARIATE_INITIAL_SEQUENCE.value:
            features_mean_diff = features_of_net_samples - mean_features_of_net_samples.reshape(num_features, 1)

            # In this method, we sum up capital gammas, and choose where to cut the tail (which corresponds to estimates
            # of auto-correlations with large lags within the chain. Naturally, as the lag increases the estimation
            # becomes worse, so the magic here is to determine where to cut). So we calculate the estimates one by one and
            # evaluate the conditions for cutting.
            # The first condition is to have an index where the estimation is positive-definite, namely all eigen-values
            # are positive. As both gamma_0 (which is auto_corr_funcs[0]) and the capital gammas are symmetric,
            # the sum of them is always symmetric, which ensures real eigen values, and we can simply calculate the
            # eigen value with the smallest algebraic value to determine whether all of them are positive.
            is_positive = False
            first_pos_def_idx = 0
            cur_pos_cov_mat_est = np.zeros((num_features, num_features))
            gamma_hat_0 = approximate_kth_auto_correlation_function(features_mean_diff, 0)
            while not is_positive:
                # Lags are bounded by half the sample size; beyond that no
                # positive-definite prefix exists for this sample.
                if first_pos_def_idx == sample_size // 2:
                    raise RuntimeError("Got a sample with no valid multivariate_initial_sequence covariance matrix "
                                       "estimation (no possibility is positive-definite). Consider increasing sample size"
                                       " or using a different covariance matrix estimation method.")
                cur_capital_gamma = calc_kth_capital_gamma(features_mean_diff, first_pos_def_idx)
                if first_pos_def_idx == 0:
                    cur_pos_cov_mat_est = -gamma_hat_0 + 2 * cur_capital_gamma
                else:
                    cur_pos_cov_mat_est += 2 * cur_capital_gamma
                # eigvalsh returns eigenvalues in ascending order, so index 0
                # is the smallest; all eigenvalues positive <=> positive-definite.
                cur_smallest_eigen_val = np.linalg.eigvalsh(cur_pos_cov_mat_est)[0]
                if cur_smallest_eigen_val > 0:
                    is_positive = True
                else:
                    first_pos_def_idx += 1

            # Now we find the farthest idx after first_pos_def_idx for which the sequence of determinants is strictly
            # monotonically increasing.
            do_dets_increase = True
            cutting_idx = first_pos_def_idx
            cur_det = np.linalg.det(cur_pos_cov_mat_est)
            while do_dets_increase and cutting_idx < sample_size // 2:
                cur_capital_gamma = calc_kth_capital_gamma(features_mean_diff, cutting_idx + 1)
                next_pos_cov_mat_est = cur_pos_cov_mat_est + 2 * cur_capital_gamma
                next_det = np.linalg.det(next_pos_cov_mat_est)
                if next_det <= cur_det:
                    do_dets_increase = False
                else:
                    cutting_idx += 1
                    cur_pos_cov_mat_est = next_pos_cov_mat_est
                    cur_det = next_det

            return cur_pos_cov_mat_est

        case _:
            raise ValueError(f"{method} is an unsupported method for covariance matrix estimation")

Sampling Utilities

sample_from_independent_probabilities_matrix

sample_from_independent_probabilities_matrix(probability_matrix: npt.NDArray[np.floating], sample_size: int, is_directed: bool, replace: bool = True) -> npt.NDArray[np.floating]

Sample connectivity matrices from a matrix representing the independent probability of an edge between nodes (i, j).

Parameters:

Name Type Description Default
probability_matrix ndarray

Matrix where entry (i,j) is the probability of edge i→j. NaN values indicate masked (ignored) edges.

required
sample_size int

Number of networks to sample.

required
is_directed bool

Whether the network is directed.

required
replace bool

If True (default), sample with replacement (networks may repeat). If False, sample without replacement (all networks unique). Uses adaptive batch sampling with uniqueness checking.

True

Returns:

Type Description
ndarray

Sampled networks of shape (n, n, sample_size).

Raises:

Type Description
RuntimeError

If replace=False and unable to generate enough unique networks (typically due to low model entropy).

Notes

When replace=False, uses adaptive batch sampling: - Samples in batches, keeps only unique networks - Adjusts batch size based on observed duplication rate - Stops if duplication rate becomes too high (>95%)

If the probability matrix contains np.nan values (that designate masked entries), the output sample will have np.nans in corresponding coordinates (i.e., if np.isnan(probability_matrix[i, j]) then np.all(np.isnan(returned_sample[i, j, ...]))).

Source code in pyERGM/utils.py
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
def sample_from_independent_probabilities_matrix(
        probability_matrix: npt.NDArray[np.floating],
        sample_size: int,
        is_directed: bool,
        replace: bool = True
) -> npt.NDArray[np.floating]:
    """
    Draw connectivity matrices where each edge is an independent Bernoulli
    variable with probability probability_matrix[i, j].

    Parameters
    ----------
    probability_matrix : np.ndarray
        Entry (i, j) is the probability of the edge i->j. NaN entries mark
        masked (ignored) edges.
    sample_size : int
        How many networks to draw.
    is_directed : bool
        Whether the network is directed.
    replace : bool, optional
        True (default): sample with replacement, so networks may repeat.
        False: sample without replacement, so all networks are unique;
        implemented via adaptive batch sampling with uniqueness checks.

    Returns
    -------
    np.ndarray
        Sampled networks of shape (n, n, sample_size).

    Raises
    ------
    RuntimeError
        When replace=False and not enough unique networks can be generated
        (typically a symptom of low model entropy).

    Notes
    -----
    Sampling without replacement proceeds in batches, keeping only unique
    networks, adapting the batch size to the observed duplication rate and
    aborting once that rate gets too high (>95%).

    NaN entries of the probability matrix propagate to the output: if
    np.isnan(probability_matrix[i, j]) then every sampled network has NaN at
    (i, j), i.e. np.all(np.isnan(returned_sample[i, j, ...])).
    """
    if not replace:
        # Without replacement: adaptively sample and deduplicate.
        return _sample_unique_networks_adaptive(
            probability_matrix, sample_size, is_directed, is_dyadic=False
        )
    return _sample_edges_with_replacement(probability_matrix, sample_size, is_directed)

sample_from_dyads_distribution

sample_from_dyads_distribution(dyads_distributions, sample_size, replace: bool = True)

Sample networks from dyadic state probability distributions.

Used for exact sampling from MPLE_RECIPROCITY models.

Parameters:

Name Type Description Default
dyads_distributions ndarray

Probability distributions over dyadic states, shape (n_choose_2, 4).

required
sample_size int

Number of networks to sample.

required
replace bool

If True (default), sample with replacement (networks may repeat). If False, sample without replacement (all networks unique). Uses adaptive batch sampling with uniqueness checking.

True

Returns:

Type Description
ndarray

Sampled networks of shape (n, n, sample_size).

Raises:

Type Description
RuntimeError

If replace=False and unable to generate enough unique networks (typically due to low model entropy).

Source code in pyERGM/utils.py
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
def sample_from_dyads_distribution(dyads_distributions, sample_size, replace: bool = True):
    """
    Sample networks from dyadic state probability distributions.

    Used for exact sampling from MPLE_RECIPROCITY models: each of the
    n-choose-2 dyads is drawn independently from its distribution over 4
    possible states.

    Parameters
    ----------
    dyads_distributions : np.ndarray
        Probability distributions over dyadic states, shape (n_choose_2, 4).
    sample_size : int
        How many networks to draw.
    replace : bool, optional
        True (default): sample with replacement, so networks may repeat.
        False: sample without replacement, so all networks are unique;
        implemented via adaptive batch sampling with uniqueness checks.

    Returns
    -------
    np.ndarray
        Sampled networks of shape (n, n, sample_size).

    Raises
    ------
    RuntimeError
        When replace=False and not enough unique networks can be generated
        (typically a symptom of low model entropy).
    """
    if not replace:
        # Without replacement: adaptively sample and deduplicate.
        return _sample_unique_networks_adaptive(
            dyads_distributions, sample_size, is_directed=True, is_dyadic=True
        )
    return _sample_dyads_with_replacement(dyads_distributions, sample_size)

Entropy and Information

calc_entropy_independent_probability_matrix

calc_entropy_independent_probability_matrix(prob_mat: np.ndarray, is_directed: bool, reduction: Reduction = Reduction.SUM, eps: float = 1e-10) -> float | np.ndarray
Source code in pyERGM/utils.py
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
def calc_entropy_independent_probability_matrix(
        prob_mat: np.ndarray,
        is_directed: bool,
        reduction: Reduction = Reduction.SUM,
        eps: float = 1e-10
) -> float | np.ndarray:
    """
    Bernoulli entropy (in bits) of each off-diagonal edge probability,
    reduced across edges according to `reduction`.

    NaN entries (masked edges) are dropped before the computation, and the
    probabilities are clipped to [eps, 1 - eps] to keep log2 finite.
    """
    edge_probs = flatten_square_matrix_to_edge_list(prob_mat, is_directed)
    edge_probs = edge_probs[~np.isnan(edge_probs)]
    p = np.clip(edge_probs, eps, 1 - eps)
    q = 1 - p
    entropy_per_entry = -(p * np.log2(p) + q * np.log2(q))
    return reduce_individual_elements(entropy_per_entry, reduction)

calc_entropy_dyads_dists

calc_entropy_dyads_dists(dyads_distributions: np.ndarray, reduction: Reduction = Reduction.SUM, eps: float = 1e-10) -> float | np.ndarray
Source code in pyERGM/utils.py
1129
1130
1131
1132
1133
1134
1135
1136
def calc_entropy_dyads_dists(
        dyads_distributions: np.ndarray,
        reduction: Reduction = Reduction.SUM,
        eps: float = 1e-10
) -> float | np.ndarray:
    """
    Shannon entropy (in bits) of each dyad's state distribution, reduced
    across dyads according to `reduction`.

    Probabilities are clipped to [eps, 1 - eps] to keep log2 finite.
    """
    clipped = np.clip(dyads_distributions, a_min=eps, a_max=1 - eps)
    entropy_per_dyad = -np.sum(clipped * np.log2(clipped), axis=1)
    return reduce_individual_elements(entropy_per_dyad, reduction)