Skip to content

Continuous-time noisy voter model (CNVM)

Detailed information about the CNVM can be found here.

CNVMParameters(num_opinions=None, num_agents=None, network=None, network_generator=None, alpha=1.0, r=None, r_tilde=None, r_imit=None, r_noise=None, prob_imit=1, prob_noise=1)

Container for the parameters of the Continuous-time Noisy Voter Model (CNVM).

A node i transitions from its current opinion m to a different opinion n at rate r[m,n] * d(i,n) / (d(i)^alpha) + r_tilde[m,n], where d(i,n) is the count of opinion n in the neighborhood of agent i, and d(i) the degree of node i.

Either a network has to specified, or a NetworkGenerator, or num_agents, in which case a complete network is used. If multiple are given, NetworkGenerator overrules network, and network overrules num_agents. A network can either be given as a nx.Graph or as an adjacency list, where the i-th element is the list of neighbors of node i. (The most efficient way to give a network is via an adjacency list of type list[NDArray].)

The rate parameters r and r_tilde can be given as arrays of shape (num_opinions, num_opinions), or as floats, in which case all rates are set to this value.

(Internally, the CNVM uses an equivalent set of rate parameters: r_imit, r_noise, prob_imit and prob_noise. A node i transitions from its current opinion m to a different opinion n at rate r_imit * d(i,n) / (d(i)^alpha) * prob_imit[m, n] + r_noise * (1/num_opinions) * prob_noise[m, n]. These parameters can be provided instead of the usual r and r_tilde.)

Source code in sponet/cnvm/parameters.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def __init__(
    self,
    num_opinions: int | None = None,
    num_agents: int | None = None,
    network: nx.Graph | Iterable | None = None,
    network_generator: NetworkGenerator | None = None,
    alpha: float = 1.0,
    # rate parameters in style 1
    r: ArrayLike | None = None,
    r_tilde: ArrayLike | None = None,
    # rate parameters in style 2
    r_imit: float | None = None,
    r_noise: float | None = None,
    prob_imit: ArrayLike = 1,
    prob_noise: ArrayLike = 1,
):
    self.alpha = alpha

    # network
    self.num_agents, self.network, self.network_generator = _sanitize_network_input(
        num_agents, network, network_generator
    )

    (
        self.num_opinions,
        self.r,
        self.r_tilde,
        self.r_imit,
        self.r_noise,
        self.prob_imit,
        self.prob_noise,
    ) = _sanitize_rates_input(
        num_opinions, r, r_tilde, r_imit, r_noise, prob_imit, prob_noise
    )

get_network()

If self.network exists, returns the network. Else, if a NetworkGenerator was given, returns a network generated by it. Else, returns the complete graph.

Returns:

  • Graph
Source code in sponet/cnvm/parameters.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def get_network(self) -> nx.Graph:
    """
    If self.network exists, returns the network.
    Else, if a NetworkGenerator was given, returns a network generated by it.
    Else, returns the complete graph.

    Returns
    -------
    nx.Graph
    """
    if self.network is not None:
        return nx.from_dict_of_lists(
            {i: nbrs for i, nbrs in enumerate(self.network)}
        )
    elif self.network_generator is not None:
        return self.network_generator()
    else:
        return nx.complete_graph(self.num_agents)

set_network(network)

Set new network.

Only works if no NetworkGenerator was given.

Parameters:

  • network (Graph | Iterable | None) –

    Graph, or adjacency list, or None (=complete graph).

Source code in sponet/cnvm/parameters.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def set_network(self, network: nx.Graph | Iterable | None) -> None:
    """
    Set new network.

    Only works if no NetworkGenerator was given.

    Parameters
    ----------
    network : nx.Graph | Iterable | None
        Graph, or adjacency list, or None (=complete graph).
    """
    if self.network_generator is not None:
        raise ValueError("Cannot set network when there is a NetworkGenerator.")

    self.num_agents, self.network, _ = _sanitize_network_input(
        self.num_agents, network, None
    )

update_network_by_generator()

Update self.network via the NetworkGenerator.

Only works if a NetworkGenerator was given.

Source code in sponet/cnvm/parameters.py
108
109
110
111
112
113
114
115
116
def update_network_by_generator(self) -> None:
    """
    Update self.network via the NetworkGenerator.

    Only works if a NetworkGenerator was given.
    """
    if self.network_generator is None:
        raise ValueError("No NetworkGenerator was given.")
    self.network = calculate_neighbor_list(self.network_generator())

change_rates(r=None, r_tilde=None)

Change one or both rate parameters.

If only one argument is given, the other rate parameter stays the same.

Parameters:

  • r (float | ndarray, default: None ) –
  • r_tilde (float | ndarray, default: None ) –
Source code in sponet/cnvm/parameters.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def change_rates(
    self,
    r: ArrayLike | None = None,
    r_tilde: ArrayLike | None = None,
):
    """
    Change one or both rate parameters.

    If only one argument is given, the other rate parameter stays the same.

    Parameters
    ----------
    r : float | np.ndarray, optional
    r_tilde : float | np.ndarray
    """
    r = r if r is not None else self.r
    r_tilde = r_tilde if r_tilde is not None else self.r_tilde
    (
        self.num_opinions,
        self.r,
        self.r_tilde,
        self.r_imit,
        self.r_noise,
        self.prob_imit,
        self.prob_noise,
    ) = _sanitize_rates_input(self.num_opinions, r, r_tilde, None, None, 1, 1)

CNVM(params)

Continuous-time Noisy Voter Model.

Parameters:

Source code in sponet/cnvm/model.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def __init__(self, params: CNVMParameters):
    """
    Continuous-time Noisy Voter Model.

    Parameters
    ----------
    params : CNVMParameters
    """
    self.params = params

    # set self.degree_alpha, an array containing d(i)^(1 - alpha)
    self._calculate_degree_alpha()

    self.neighbor_list = (
        NumbaList() if params.network is None else NumbaList(params.network)
    )

update_network()

Update network from NetworkGenerator in params.

Source code in sponet/cnvm/model.py
44
45
46
47
48
49
50
def update_network(self) -> None:
    """
    Update network from NetworkGenerator in params.
    """
    self.params.update_network_by_generator()
    self.neighbor_list = NumbaList(self.params.network)
    self._calculate_degree_alpha()

update_rates(r=None, r_tilde=None)

Update one or both rate parameters.

If only one argument is given, the other rate parameter stays the same.

Parameters:

  • r (ArrayLike, default: None ) –
  • r_tilde (ArrayLike, default: None ) –
Source code in sponet/cnvm/model.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def update_rates(
    self,
    r: ArrayLike | None = None,
    r_tilde: ArrayLike | None = None,
) -> None:
    """
    Update one or both rate parameters.

    If only one argument is given, the other rate parameter stays the same.

    Parameters
    ----------
    r : ArrayLike, optional
    r_tilde : ArrayLike, optional
    """
    self.params.change_rates(r, r_tilde)

simulate(t_max, x_init=None, t_eval=None, rng=default_rng())

Simulate the model from t=0 to t=t_max.

Parameters:

  • t_max (float) –
  • x_init (ArrayLike, default: None ) –

    Initial state, shape=(num_agents,). If no x_init is given, a random one is generated.

  • t_eval (ArrayLike, default: None ) –

    Array of time points where the solution should be saved, or number "n" in which case the solution is stored equidistantly at "n" time points. If None, store every snapshot.

  • rng (Generator, default: default_rng() ) –

    random number generator

Returns:

  • tuple[NDArray, NDArray]

    t_traj (shape=(?,)), x_traj (shape=(?,num_agents))

Source code in sponet/cnvm/model.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def simulate(
    self,
    t_max: float,
    x_init: ArrayLike | None = None,
    t_eval: ArrayLike | None = None,
    rng: Generator = default_rng(),
) -> tuple[NDArray, NDArray]:
    """
    Simulate the model from t=0 to t=t_max.

    Parameters
    ----------
    t_max : float
    x_init : ArrayLike, optional
        Initial state, shape=(num_agents,). If no x_init is given, a random one is generated.
    t_eval : ArrayLike, optional
        Array of time points where the solution should be saved,
        or number "n" in which case the solution is stored equidistantly at "n" time points. If None, store every snapshot.
    rng : Generator, optional
        random number generator

    Returns
    -------
    tuple[NDArray, NDArray]
        t_traj (shape=(?,)), x_traj (shape=(?,num_agents))
    """
    if self.params.network_generator is not None:
        self.update_network()

    opinion_dtype = np.min_scalar_type(self.params.num_opinions - 1)
    if x_init is None:
        x = rng.choice(
            np.arange(self.params.num_opinions, dtype=opinion_dtype),
            size=self.params.num_agents,
        )
    else:
        x = np.array(x_init, dtype=opinion_dtype)

    if t_eval is not None:
        t_eval = t_eval_to_ndarray(t_eval, t_max)

    # Call the correct simulation loop
    t_traj, x_traj = self._call_simulation_loop(x, t_max, t_eval, rng)

    if t_eval is not None and t_traj.shape[0] != t_eval.shape[0]:
        # there might be less samples than len(t_eval)
        # -> fill with duplicates
        t_ind = argmatch(t_eval, t_traj)
        t_traj = t_eval
        x_traj = x_traj[t_ind]

    return t_traj, x_traj

CNVM Approximations

calc_rre_traj(params, initial_states, t_max, t_eval=None)

Solve the RRE given by parameters, starting from c_0, up to time t_max.

Solves the ODE using scipy's "solve_ivp".

Parameters:

  • params (CNVMParameters) –
  • initial_states (ArrayLike) –

    Either shape = (num_opinions,) or (num_states, num_opinions)

  • t_max (float) –

    End time.

  • t_eval (ArrayLike, default: None ) –

    Array of time points where the solution should be saved, or number "n" in which case the solution is stored equidistantly at "n" time points.

Returns:

  • tuple[NDArray, NDArray]

    (t, c), t.shape=(num_timesteps,), c.shape = (num_states, num_timesteps, num_opinions), or c.shape = (num_timesteps, num_opinions) if a single initial state was given.

Source code in sponet/cnvm/approximations/reaction_rate_equation.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def calc_rre_traj(
    params: CNVMParameters,
    initial_states: ArrayLike,
    t_max: float,
    t_eval: ArrayLike | None = None,
) -> tuple[NDArray, NDArray]:
    """
    Solve the RRE given by parameters, starting from c_0, up to time t_max.

    Solves the ODE using scipy's "solve_ivp".

    Parameters
    ----------
    params : CNVMParameters
    initial_states : ArrayLike
        Either shape = (num_opinions,) or (num_states, num_opinions)
    t_max : float
        End time.
    t_eval : ArrayLike, optional
        Array of time points where the solution should be saved,
        or number "n" in which case the solution is stored equidistantly at "n" time points.

    Returns
    -------
    tuple[NDArray, NDArray]
        (t, c),
        t.shape=(num_timesteps,),
        c.shape = (num_states, num_timesteps, num_opinions), or c.shape = (num_timesteps, num_opinions) if a single initial state was given.
    """
    if t_eval is not None:
        t_eval = t_eval_to_ndarray(t_eval, t_max)

    def rhs(_, c):
        out = np.zeros_like(c)
        for m in range(params.num_opinions):
            for n in range(params.num_opinions):
                if n == m:
                    continue

                state_change = np.zeros_like(c)
                state_change[m] = -1
                state_change[n] = 1

                prop = c[m] * (
                    params.r_imit * params.prob_imit[m, n] * c[n]
                    + params.r_noise * params.prob_noise[m, n] / params.num_opinions
                )

                out += prop * state_change
        return out

    initial_states = np.array(initial_states, ndmin=1)
    if initial_states.ndim == 1:
        sol = solve_ivp(
            rhs, (0, t_max), initial_states, rtol=1e-8, atol=1e-8, t_eval=t_eval
        )
        return sol.t, sol.y.T

    c = []
    for initial_state in initial_states:
        sol = solve_ivp(
            rhs, (0, t_max), initial_state, rtol=1e-8, atol=1e-8, t_eval=t_eval
        )
        t = sol.t
        c.append(sol.y.T)
    return t, np.array(c)  # type: ignore

calc_modified_rre_traj(params, initial_states, t_max, alpha=1.0, t_eval=None)

Solve the RRE with modified parameters, starting from c_0, up to time t_max.

The parameters are modified by multiplying the imitation rates r with the factor alpha. For instance, if alpha < 1, this effectively slows the dynamics.

Parameters:

  • params (CNVMParameters) –
  • initial_states (ArrayLike) –

    Either shape = (num_opinions,) or (num_states, num_opinions)

  • t_max (float) –

    End time.

  • alpha (float, default: 1.0 ) –

    Factor for modification of imitation rates.

  • t_eval (ArrayLike, default: None ) –

    Array of time points where the solution should be saved, or number "n" in which case the solution is stored equidistantly at "n" time points.

Returns:

  • tuple[NDArray, NDArray]

    (t, c), t.shape=(num_time_steps,), c.shape = (num_states, num_time_steps, num_opinions), or c.shape = (num_time_steps, num_opinions) if a single initial state was given.

Source code in sponet/cnvm/approximations/reaction_rate_equation.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def calc_modified_rre_traj(
    params: CNVMParameters,
    initial_states: ArrayLike,
    t_max: float,
    alpha: float = 1.0,
    t_eval: ArrayLike | None = None,
) -> tuple[NDArray, NDArray]:
    """
    Solve the RRE with modified parameters, starting from c_0, up to time t_max.

    The parameters are modified by multiplying the imitation rates `r` with the factor `alpha`.
    For instance, if alpha < 1, this effectively slows the dynamics.

    Parameters
    ----------
    params : CNVMParameters
    initial_states : ArrayLike
        Either shape = (num_opinions,) or (num_states, num_opinions)
    t_max : float
        End time.
    alpha : float
        Factor for modification of imitation rates.
    t_eval : ArrayLike, optional
        Array of time points where the solution should be saved,
        or number "n" in which case the solution is stored equidistantly at "n" time points.


    Returns
    -------
    tuple[NDArray, NDArray]
        (t, c),
        t.shape=(num_time_steps,),
        c.shape = (num_states, num_time_steps, num_opinions), or c.shape = (num_time_steps, num_opinions) if a single initial state was given.
    """
    modified_params = deepcopy(params)
    modified_params.change_rates(r=alpha * params.r)
    return calc_rre_traj(modified_params, initial_states, t_max, t_eval)

calc_pair_approximation_traj(params, c_0, s_0, d, t_max, t_eval=None)

For a CNVM with 2 opinions, calculate the pair approximation.

The opinions are referred to as "0" and "1". The pair approximation is a system of two ODEs. The first ODE describes the evolution of the share of nodes with state 1, which is called "c". The second ODE describes the evolution of the variable "s", which can be interpreted has 0.5 times the share of active edges. So in this package, s can be computed as s = 0.5 * sponet.collective_variables.Interfaces.

Parameters:

  • params (CNVMParameters) –
  • c_0 (float) –

    Initial state of c.

  • s_0 (float) –

    Initial state of s.

  • t_max (float) –

    End time.

  • t_eval (ArrayLike, default: None ) –

    Array of time points where the solution should be saved, or number "n" in which case the solution is stored equidistantly at "n" time points.

Returns:

  • tuple[ndarray, ndarray]
    1. timepoints, shape=(?,).
    2. c, shape=(?, 2).
Source code in sponet/cnvm/approximations/pair_approximation.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def calc_pair_approximation_traj(
    params: CNVMParameters,
    c_0: float,
    s_0: float,
    d: float,
    t_max: float,
    t_eval: ArrayLike | None = None,
) -> tuple[np.ndarray, np.ndarray]:
    """
    For a CNVM with 2 opinions, calculate the pair approximation.

    The opinions are referred to as "0" and "1".
    The pair approximation is a system of two ODEs.
    The first ODE describes the evolution of the share of nodes with state 1, which is called "c".
    The second ODE describes the evolution of the variable "s",
    which can be interpreted has 0.5 times the share of active edges.
    So in this package, s can be computed as s = 0.5 * sponet.collective_variables.Interfaces.

    Parameters
    ----------
    params : CNVMParameters
    c_0 : float
        Initial state of c.
    s_0 : float
        Initial state of s.
    t_max : float
        End time.
    t_eval : ArrayLike, optional
        Array of time points where the solution should be saved,
        or number "n" in which case the solution is stored equidistantly at "n" time points.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        1. timepoints, shape=(?,).
        2. c, shape=(?, 2).
    """

    assert params.num_opinions == 2

    if t_eval is not None:
        t_eval = t_eval_to_ndarray(t_eval, t_max)

    def rhs(_, c_pa):
        # assert isinstance(params.r, np.ndarray)
        # assert isinstance(params.r_tilde, np.ndarray)

        c, s = c_pa

        dc = (
            s * (params.r[0, 1] - params.r[1, 0])
            - c * (params.r_tilde[0, 1] + params.r_tilde[1, 0])
            + params.r_tilde[0, 1]
        )

        ds = -2 * (d - 1) / d * s * s / (1 - c) * params.r[0, 1]
        ds += -2 * (d - 1) / d * s * s / c * params.r[1, 0]
        ds += s * (
            (d - 2) / d * (params.r[0, 1] + params.r[1, 0])
            - 2 * params.r_tilde[0, 1]
            - 2 * params.r_tilde[1, 0]
        )
        ds += c * (params.r_tilde[1, 0] - params.r_tilde[0, 1])
        ds += params.r_tilde[0, 1]

        return np.array([dc, ds])

    sol = solve_ivp(
        rhs, (0, t_max), np.array([c_0, s_0]), rtol=1e-8, atol=1e-8, t_eval=t_eval
    )
    return sol.t, sol.y.T

sample_stochastic_approximation(params, initial_states, t_max, num_samples, t_eval, rng=None, seed=None)

Simulate the opinion shares directly.

Assumes well-mixedness in the sense that the propensity of reaction m -> n is given by c[m] * (r[m, n] * c[n] + r_tilde[m, n]).

This is only true for complete networks. For other networks the quality of this approximation may vary.

Parameters:

  • params (CNVMParameters) –
  • initial_states (NDArray) –

    Either shape = (num_opinions,) or (num_states, num_opinions)

  • t_max (float) –
  • num_samples (int) –
  • t_eval (ArrayLike) –

    Array of time points where the solution should be saved, or number "n" in which case the solution is stored equidistantly at "n" time points.

  • rng (Generator, default: None ) –

    Random number generator.

  • seed (int, default: None ) –

    Seed for random numbers. If both rng and seed are given, rng takes precedence.

Returns:

  • tuple[NDArray, NDArray]

    (t, c), t.shape=(num_timesteps), c.shape = (num_states, num_samples, num_timesteps, num_opinions), or c.shape = (num_samples, num_timesteps, num_opinions) if a single initial state was given.

Source code in sponet/cnvm/approximations/stochastic_approximation.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def sample_stochastic_approximation(
    params: CNVMParameters,
    initial_states: ArrayLike,
    t_max: float,
    num_samples: int,
    t_eval: ArrayLike,
    rng: Generator | None = None,
    seed: int | None = None,
) -> tuple[NDArray, NDArray]:
    """
    Simulate the opinion shares directly.

    Assumes well-mixedness in the sense that the propensity of
    reaction m -> n is given by
    c[m] * (r[m, n] * c[n] + r_tilde[m, n]).

    This is only true for complete networks.
    For other networks the quality of this approximation may vary.

    Parameters
    ----------
    params : CNVMParameters
    initial_states : NDArray
        Either shape = (num_opinions,) or (num_states, num_opinions)
    t_max : float
    num_samples: int
    t_eval : ArrayLike
        Array of time points where the solution should be saved,
        or number "n" in which case the solution is stored equidistantly at "n" time points.
    rng : Generator, optional
        Random number generator.
    seed : int, optional
        Seed for random numbers. If both `rng` and `seed` are given, `rng` takes precedence.

    Returns
    -------
    tuple[NDArray, NDArray]
        (t, c),
        t.shape=(num_timesteps),
        c.shape = (num_states, num_samples, num_timesteps, num_opinions), or c.shape = (num_samples, num_timesteps, num_opinions) if a single initial state was given.
    """
    if rng is not None:
        seed = int(rng.integers(1, 2**24))
    elif seed is None:
        seed = int(default_rng().integers(1, 2**24))

    t_eval = t_eval_to_ndarray(t_eval, t_max)

    initial_states = np.array(initial_states, ndmin=1)
    is_1d = initial_states.ndim == 1
    if is_1d:
        initial_states = np.expand_dims(initial_states, 0)

    num_states = initial_states.shape[0]
    num_timesteps = t_eval.shape[0]
    c = np.zeros(
        (
            num_states,
            num_samples,
            num_timesteps,
            initial_states.shape[1],
        )
    )

    for i in range(num_states):
        c[i] = _sample_many(
            initial_states[i],
            params.num_agents,
            params.r,
            params.r_tilde,
            t_eval,
            num_samples,
            seed,
        )

    if is_1d:
        c = c[0]
    return t_eval, c

sample_cle(params, initial_states, t_max, num_samples, delta_t=None, t_eval=None, boundary_process='clipping', rng=None, seed=None)

Sample Chemical Langevin Equation (CLE) approximation for the CNVM.

The Euler-Maruyama method is used to integrate the SDE.

Either delta_t or t_eval or both have to be provided.

Parameters:

  • params (CNVMParameters) –
  • initial_states (ArrayLike) –

    Either shape = (num_opinions,) or (num_states, num_opinions)

  • t_max (float) –
  • num_samples (int) –
  • delta_t (float, default: None ) –

    Step size.

  • t_eval (ArrayLike, default: None ) –

    Array of time points where the solution should be saved, or number "n" in which case the solution is stored equidistantly at "n" time points.

  • boundary_process (str, default: 'clipping' ) –

    Kind of process used to deal with the approximation leaving the simplex boundary. Possible values: "clipping", "jump", "normal-reflection" Defaults to "clipping".

  • rng (Generator, default: None ) –

    Random number generator.

  • seed (int, default: None ) –

    Seed for random numbers. If both rng and seed are given, rng takes precedence.

Returns:

  • tuple[NDArray, NDArray]

    (t, c), t.shape=(num_timesteps), c.shape = (num_states, num_samples, num_timesteps, num_opinions), or c.shape = (num_samples, num_timesteps, num_opinions) if a single initial state was given. (If saving_offset > 1, the number of time steps will be smaller.)

Source code in sponet/cnvm/approximations/chemical_langevin_equation/chemical_langevin_equation.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def sample_cle(
    params: CNVMParameters,
    initial_states: ArrayLike,
    t_max: float,
    num_samples: int,
    delta_t: float | None = None,
    t_eval: ArrayLike | None = None,
    boundary_process: str = "clipping",
    rng: Generator | None = None,
    seed: int | None = None,
) -> tuple[NDArray, NDArray]:
    """
    Sample Chemical Langevin Equation (CLE) approximation for the CNVM.

    The Euler-Maruyama method is used to integrate the SDE.

    Either `delta_t` or `t_eval` or both have to be provided.

    Parameters
    ----------
    params : CNVMParameters
    initial_states : ArrayLike
        Either shape = (num_opinions,) or (num_states, num_opinions)
    t_max : float
    num_samples : int
    delta_t : float, optional
        Step size.
    t_eval : ArrayLike, optional
        Array of time points where the solution should be saved,
        or number "n" in which case the solution is stored equidistantly at "n" time points.
    boundary_process : str, optional
        Kind of process used to deal with the approximation leaving the simplex boundary.
        Possible values: "clipping", "jump", "normal-reflection"
        Defaults to "clipping".
    rng : Generator, optional
        Random number generator.
    seed : int, optional
        Seed for random numbers. If both `rng` and `seed` are given, `rng` takes precedence.

    Returns
    -------
    tuple[NDArray, NDArray]
        (t, c),
        t.shape=(num_timesteps),
        c.shape = (num_states, num_samples, num_timesteps, num_opinions), or c.shape = (num_samples, num_timesteps, num_opinions) if a single initial state was given.
        (If saving_offset > 1, the number of time steps will be smaller.)
    """
    if rng is not None:
        seed = int(rng.integers(1, 2**24))
    elif seed is None:
        seed = int(default_rng().integers(1, 2**24))

    delta_t, t_eval = _sanitize_delta_t_and_t_eval(delta_t, t_eval, t_max)

    initial_states = np.array(initial_states, ndmin=1)
    is_1d = initial_states.ndim == 1
    if is_1d:
        initial_states = np.expand_dims(initial_states, 0)

    num_states = initial_states.shape[0]
    num_time_steps = t_eval.shape[0]
    c = np.zeros(
        (
            num_states,
            num_samples,
            num_time_steps,
            initial_states.shape[1],
        )
    )

    boundary_process = get_boundary_process_from_alias(boundary_process)

    for i in range(num_states):
        t, c[i] = _numba_sample_cle(
            initial_states[i],
            delta_t,
            t_eval,
            params.num_agents,
            params.r,
            params.r_tilde,
            num_samples,
            boundary_process,
            seed,
        )

    if is_1d:
        c = c[0]
    return t, c  # type: ignore