Skip to content

Sampling Module

This module provides MCMC sampling algorithms for generating networks from ERGM models.

Sampler

Base class for samplers.

Sampler

Bases: ABC

Abstract base class for ERGM network samplers.

Parameters:

Name Type Description Default
thetas ndarray

ERGM model parameters (coefficients).

required
metrics_collection MetricsCollection

Collection of metrics defining the ERGM model.

required
Source code in pyERGM/sampling.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Sampler(ABC):
    """
    Abstract base class for ERGM network samplers.

    Parameters
    ----------
    thetas : np.ndarray
        ERGM model parameters (coefficients).
    metrics_collection : MetricsCollection
        Collection of metrics defining the ERGM model.
    """

    def __init__(self, thetas, metrics_collection: MetricsCollection):
        self.thetas = deepcopy(thetas)
        self.metrics_collection = metrics_collection

    @abstractmethod
    def sample(self, initial_state, n_iterations):
        """Generate network samples. Must be implemented by subclasses."""
        pass

__init__

__init__(thetas, metrics_collection: MetricsCollection)
Source code in pyERGM/sampling.py
24
25
26
def __init__(self, thetas, metrics_collection: MetricsCollection):
    self.thetas = deepcopy(thetas)
    self.metrics_collection = metrics_collection

sample abstractmethod

sample(initial_state, n_iterations)

Generate network samples. Must be implemented by subclasses.

Source code in pyERGM/sampling.py
28
29
30
31
@abstractmethod
def sample(self, initial_state, n_iterations):
    """Generate network samples. Must be implemented by subclasses."""
    pass

NaiveMetropolisHastings

Implementation of the Metropolis-Hastings algorithm for ERGM sampling.

NaiveMetropolisHastings

Bases: Sampler

Source code in pyERGM/sampling.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
class NaiveMetropolisHastings(Sampler):
    def __init__(self, thetas, metrics_collection: MetricsCollection):
        """
        An implementation for the symmetric proposal Metropolis-Hastings algorithm for ERGMS, using the logit
        of the acceptance rate. See docs for more details.
        Throughout this implementation, networks are represented as adjacency matrices.

        Parameters
        ----------
        thetas : np.ndarray
            Coefficients of the ERGM
        metrics_collection : MetricsCollection
            A MetricsCollection object that can calculate statistics of a network.
        """
        super().__init__(thetas, metrics_collection)

        self._edge_proposal_dists = {}

    def set_thetas(self, thetas):
        """
        Update the model parameters.

        Parameters
        ----------
        thetas : np.ndarray
            New ERGM coefficients.
        """
        self.thetas = deepcopy(thetas)

    def _calculate_weighted_change_score(self, current_network, indices: tuple[int, int]):
        """
        Calculate g(proposed_network)-g(current_network) and then inner product with thetas.
        """
        change_score = self.metrics_collection.calc_change_scores(current_network, indices)
        return np.dot(self.thetas, change_score)

    def _flip_network_edge(self, current_network, i, j):
        """
        Flip the edge between nodes i, j. If it's an undirected network, we flip entries W_i,j and W_j,i.
        NOTE! This function changes the network that is passed by reference
        """
        current_network[i, j] = 1 - current_network[i, j]
        if not self.metrics_collection.is_directed:
            current_network[j, i] = 1 - current_network[j, i]

    def _calc_edge_influence_on_features(self, net_for_change_scores: np.ndarray):
        """
        Calculates the influence of each edge on each one of the features in the MetricsCollection.
        The influence of an edge on a feature is defined as the ratio between the change in the feature that flipping
        the edge would cause divided by the average change over all edges.
        If the average influence is 0 (the feature does not change by flipping any edge), the influence of all edges is
        0.
        For example, each edge has an influence of 1 on NumberOfEdges (no matter what edge we flip, it changes by 1).
        Similarly, an edge (i,j) has an influence of n on OutDegree of node i, and 0 on OutDegree of other nodes (the
        out degree of node 1 would change by 1 as a result of flipping the edge, and there are exactly n-1 such edges,
        out of the n(n-1) edges in the matrix, so the average influence is 1/n).
        Parameters
        ----------
        net_for_change_scores : np.ndarray
            The network using which the change scores for all edges is calculated.
        Returns
        -------
        A n(n-1) vector with the sum of influences of each edge over all features (total influence of each edge).
        """
        change_scores = self.metrics_collection.prepare_mple_regressors(net_for_change_scores)
        mean_importance_per_feature = change_scores.mean(axis=0)
        change_scores[:, mean_importance_per_feature != 0] /= mean_importance_per_feature[
            None, mean_importance_per_feature != 0]
        return change_scores.sum(axis=1)

    def _calc_proposal_dist_features_influence__sum(self, net_for_change_scores: np.ndarray):
        """
        Calculate a proposal distribution over edges such that edges with high influence will get higher probabilities.
        The transformation of total influences into a distribution is done by normalized using the sum of total
        influences.
        Parameters
        ----------
        net_for_change_scores : np.ndarray
            The network using which the change scores for all edges is calculated.
        """
        edge_influence = self._calc_edge_influence_on_features(net_for_change_scores)
        self._edge_proposal_dists[EdgeProposalMethod.FEATURES_INFLUENCE_SUM] = edge_influence / edge_influence.sum()

    def _calc_proposal_dist_features_influence__softmax(self, net_for_change_scores: np.ndarray):
        """
        Calculate a proposal distribution over edges such that edges with high influence will get higher probabilities.
        The transformation of total influences into a distribution is done by softmax.
        Parameters
        ----------
        net_for_change_scores : np.ndarray
            The network using which the change scores for all edges is calculated.
        """
        edge_influence = self._calc_edge_influence_on_features(net_for_change_scores)
        self._edge_proposal_dists[EdgeProposalMethod.FEATURES_INFLUENCE_SOFTMAX] = softmax(edge_influence)

    def _get_node_pair_flip_indices(
            self,
            num_flips: int,
            net_size: int,
            current_network: np.ndarray,
            edge_proposal_method: EdgeProposalMethod,
    ) -> npt.NDArray[int]:
        if edge_proposal_method == EdgeProposalMethod.UNIFORM:
            edges_to_flip = get_uniform_random_edges_to_flip(net_size, num_flips)
        elif edge_proposal_method == EdgeProposalMethod.FEATURES_INFLUENCE_SUM:
            if edge_proposal_method not in self._edge_proposal_dists.keys():
                self._calc_proposal_dist_features_influence__sum(current_network)
            edges_to_flip = get_custom_distribution_random_edges_to_flip(num_flips, self._edge_proposal_dists[
                EdgeProposalMethod.FEATURES_INFLUENCE_SUM], self.metrics_collection.is_directed)
        elif edge_proposal_method == EdgeProposalMethod.FEATURES_INFLUENCE_SOFTMAX:
            if edge_proposal_method not in self._edge_proposal_dists.keys():
                self._calc_proposal_dist_features_influence__softmax(current_network)
            edges_to_flip = get_custom_distribution_random_edges_to_flip(num_flips, self._edge_proposal_dists[
                EdgeProposalMethod.FEATURES_INFLUENCE_SOFTMAX], self.metrics_collection.is_directed)
        else:
            raise ValueError(f"Got an unsupported edge proposal method {edge_proposal_method}")
        return edges_to_flip

    def sample(
            self,
            initial_state,
            num_of_nets,
            replace=True,
            edge_proposal_method: EdgeProposalMethod = EdgeProposalMethod.UNIFORM,
            burn_in: int | None = None,
            steps_per_sample: int | None = None,
    ):
        """
        Sample networks using the Metropolis-Hastings algorithm.

        Parameters
        ----------
        initial_state : np.ndarray
            The initial network to start the Markov Chain from

        num_of_nets : int
            The number of networks to sample

        burn_in : int
            Optional. The number of burn-in steps for the sampler (number of steps in the chain that are discarded
            before the sampler starts to take samples). *Defaults to 100 * net_size**2*.

        steps_per_sample : int
            Optional. The number of steps to advance the chain between samples. *Defaults to net_size**2*.

        replace : bool
            A boolean flag indicating whether we sample with our without replacement. replace=True means networks can be
            duplicated.

        edge_proposal_method : str
            Optional. The method for the MCMC proposal distribution. This is defined as a distribution over the edges
            of the network, which implies how to choose a proposed graph out of all graphs that are 1-edge-away from the
            current graph. *Defaults to "uniform"*.
        """
        current_network = initial_state.copy()

        net_size = current_network.shape[0]

        sampled_networks = np.zeros((net_size, net_size, num_of_nets))

        if burn_in is None:
            burn_in = 100 * (net_size ** 2)

        if steps_per_sample is None:
            steps_per_sample = net_size ** 2

        num_flips = burn_in + (num_of_nets * steps_per_sample)

        # Generate initial batch of edges and random numbers
        edges_to_flip = self._get_node_pair_flip_indices(
            num_flips=num_flips,
            net_size=net_size,
            current_network=current_network,
            edge_proposal_method=edge_proposal_method,
        )
        random_nums_for_change_acceptance = np.random.rand(edges_to_flip.shape[1])
        edges_batch_start = 0  # Iteration count when current batch started

        networks_count = 0
        mcmc_iter_count = 0

        t1 = time.time()
        while networks_count != num_of_nets:
            # Calculate offset within current batch
            offset_in_batch = mcmc_iter_count - edges_batch_start

            # Check if we need a new batch of edges (handles case where non-replacement needs retries)
            if offset_in_batch >= edges_to_flip.shape[1]:
                edges_batch_start = mcmc_iter_count
                remaining_networks = num_of_nets - networks_count
                num_flips_needed = remaining_networks * steps_per_sample
                edges_to_flip = self._get_node_pair_flip_indices(
                    num_flips=num_flips_needed,
                    net_size=net_size,
                    current_network=current_network,
                    edge_proposal_method=edge_proposal_method,
                )
                random_nums_for_change_acceptance = np.random.rand(edges_to_flip.shape[1])
                offset_in_batch = 0

            # Edge flip:
            random_edge_entry = (
                int(edges_to_flip[0, offset_in_batch]),
                int(edges_to_flip[1, offset_in_batch])
            )

            change_score = self._calculate_weighted_change_score(current_network, random_edge_entry)

            rand_num = random_nums_for_change_acceptance[offset_in_batch]
            perform_change = change_score >= 1 or rand_num <= min(1, np.exp(change_score))
            if perform_change:
                self._flip_network_edge(current_network, random_edge_entry[0], random_edge_entry[1])

            if mcmc_iter_count >= burn_in and (mcmc_iter_count - burn_in) % steps_per_sample == 0:
                sampled_networks[:, :, networks_count] = current_network

                if not replace:
                    if np.unique(sampled_networks[:, :, :networks_count + 1], axis=2).shape[2] == networks_count + 1:
                        networks_count += 1
                    else:
                        sampled_networks[:, :, networks_count] = np.zeros((net_size, net_size))
                else:
                    networks_count += 1
                    t2 = time.time()
                    if networks_count % 100 == 0:
                        logger.debug(f"Sampled {networks_count}/{num_of_nets} networks, time taken: {t2 - t1:.2f}s")

            mcmc_iter_count += 1
        return sampled_networks

__init__

__init__(thetas, metrics_collection: MetricsCollection)

An implementation for the symmetric proposal Metropolis-Hastings algorithm for ERGMS, using the logit of the acceptance rate. See docs for more details. Throughout this implementation, networks are represented as adjacency matrices.

Parameters:

Name Type Description Default
thetas ndarray

Coefficients of the ERGM

required
metrics_collection MetricsCollection

A MetricsCollection object that can calculate statistics of a network.

required
Source code in pyERGM/sampling.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def __init__(self, thetas, metrics_collection: MetricsCollection):
    """
    An implementation for the symmetric proposal Metropolis-Hastings algorithm for ERGMS, using the logit
    of the acceptance rate. See docs for more details.
    Throughout this implementation, networks are represented as adjacency matrices.

    Parameters
    ----------
    thetas : np.ndarray
        Coefficients of the ERGM
    metrics_collection : MetricsCollection
        A MetricsCollection object that can calculate statistics of a network.
    """
    super().__init__(thetas, metrics_collection)

    self._edge_proposal_dists = {}

sample

sample(initial_state, num_of_nets, replace=True, edge_proposal_method: EdgeProposalMethod = EdgeProposalMethod.UNIFORM, burn_in: int | None = None, steps_per_sample: int | None = None)

Sample networks using the Metropolis-Hastings algorithm.

Parameters:

Name Type Description Default
initial_state ndarray

The initial network to start the Markov Chain from

required
num_of_nets int

The number of networks to sample

required
burn_in int

Optional. The number of burn-in steps for the sampler (number of steps in the chain that are discarded before the sampler starts to take samples). Defaults to 100 * net_size2.

None
steps_per_sample int

Optional. The number of steps to advance the chain between samples. Defaults to net_size2.

None
replace bool

A boolean flag indicating whether we sample with our without replacement. replace=True means networks can be duplicated.

True
edge_proposal_method str

Optional. The method for the MCMC proposal distribution. This is defined as a distribution over the edges of the network, which implies how to choose a proposed graph out of all graphs that are 1-edge-away from the current graph. Defaults to "uniform".

UNIFORM
Source code in pyERGM/sampling.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def sample(
        self,
        initial_state,
        num_of_nets,
        replace=True,
        edge_proposal_method: EdgeProposalMethod = EdgeProposalMethod.UNIFORM,
        burn_in: int | None = None,
        steps_per_sample: int | None = None,
):
    """
    Sample networks using the Metropolis-Hastings algorithm.

    Parameters
    ----------
    initial_state : np.ndarray
        The initial network to start the Markov Chain from

    num_of_nets : int
        The number of networks to sample

    burn_in : int
        Optional. The number of burn-in steps for the sampler (number of steps in the chain that are discarded
        before the sampler starts to take samples). *Defaults to 100 * net_size**2*.

    steps_per_sample : int
        Optional. The number of steps to advance the chain between samples. *Defaults to net_size**2*.

    replace : bool
        A boolean flag indicating whether we sample with our without replacement. replace=True means networks can be
        duplicated.

    edge_proposal_method : str
        Optional. The method for the MCMC proposal distribution. This is defined as a distribution over the edges
        of the network, which implies how to choose a proposed graph out of all graphs that are 1-edge-away from the
        current graph. *Defaults to "uniform"*.
    """
    current_network = initial_state.copy()

    net_size = current_network.shape[0]

    sampled_networks = np.zeros((net_size, net_size, num_of_nets))

    if burn_in is None:
        burn_in = 100 * (net_size ** 2)

    if steps_per_sample is None:
        steps_per_sample = net_size ** 2

    num_flips = burn_in + (num_of_nets * steps_per_sample)

    # Generate initial batch of edges and random numbers
    edges_to_flip = self._get_node_pair_flip_indices(
        num_flips=num_flips,
        net_size=net_size,
        current_network=current_network,
        edge_proposal_method=edge_proposal_method,
    )
    random_nums_for_change_acceptance = np.random.rand(edges_to_flip.shape[1])
    edges_batch_start = 0  # Iteration count when current batch started

    networks_count = 0
    mcmc_iter_count = 0

    t1 = time.time()
    while networks_count != num_of_nets:
        # Calculate offset within current batch
        offset_in_batch = mcmc_iter_count - edges_batch_start

        # Check if we need a new batch of edges (handles case where non-replacement needs retries)
        if offset_in_batch >= edges_to_flip.shape[1]:
            edges_batch_start = mcmc_iter_count
            remaining_networks = num_of_nets - networks_count
            num_flips_needed = remaining_networks * steps_per_sample
            edges_to_flip = self._get_node_pair_flip_indices(
                num_flips=num_flips_needed,
                net_size=net_size,
                current_network=current_network,
                edge_proposal_method=edge_proposal_method,
            )
            random_nums_for_change_acceptance = np.random.rand(edges_to_flip.shape[1])
            offset_in_batch = 0

        # Edge flip:
        random_edge_entry = (
            int(edges_to_flip[0, offset_in_batch]),
            int(edges_to_flip[1, offset_in_batch])
        )

        change_score = self._calculate_weighted_change_score(current_network, random_edge_entry)

        rand_num = random_nums_for_change_acceptance[offset_in_batch]
        perform_change = change_score >= 1 or rand_num <= min(1, np.exp(change_score))
        if perform_change:
            self._flip_network_edge(current_network, random_edge_entry[0], random_edge_entry[1])

        if mcmc_iter_count >= burn_in and (mcmc_iter_count - burn_in) % steps_per_sample == 0:
            sampled_networks[:, :, networks_count] = current_network

            if not replace:
                if np.unique(sampled_networks[:, :, :networks_count + 1], axis=2).shape[2] == networks_count + 1:
                    networks_count += 1
                else:
                    sampled_networks[:, :, networks_count] = np.zeros((net_size, net_size))
            else:
                networks_count += 1
                t2 = time.time()
                if networks_count % 100 == 0:
                    logger.debug(f"Sampled {networks_count}/{num_of_nets} networks, time taken: {t2 - t1:.2f}s")

        mcmc_iter_count += 1
    return sampled_networks

set_thetas

set_thetas(thetas)

Update the model parameters.

Parameters:

Name Type Description Default
thetas ndarray

New ERGM coefficients.

required
Source code in pyERGM/sampling.py
52
53
54
55
56
57
58
59
60
61
def set_thetas(self, thetas):
    """
    Update the model parameters.

    Parameters
    ----------
    thetas : np.ndarray
        New ERGM coefficients.
    """
    self.thetas = deepcopy(thetas)