Multiprocessing

sample_many_runs(params, initial_states, t_max, t_eval, num_runs, n_jobs=None, collective_variable=None, rng=default_rng(), progress_bar=False)

Sample multiple runs of the model specified by params.

Parameters:

  • params (Parameters) –

    Either CNVM or CNTM Parameters. If a NetworkGenerator is used, a new network will be sampled for every run.

  • initial_states (ArrayLike) –

    Array of initial states, shape = (num_initial_states, num_agents), or single initial state, shape = (num_agents,). num_runs simulations are executed for each initial state.

  • t_max (float) –

    End time.

  • t_eval (ArrayLike) –

    Array of time points where the solution should be saved, or an integer n, in which case the solution is saved at n equidistant time points.

  • num_runs (int) –

    Number of runs (samples) per initial state.

  • n_jobs (int, default: None ) –

    If None or 1, no multiprocessing is used. If -1, all available CPUs are used.

  • collective_variable (CollectiveVariable, default: None ) –

    If a collective variable is specified, the projected trajectory will be returned instead of the full trajectory.

  • rng (Generator, default: default_rng() ) –

    Random number generator.

  • progress_bar (bool, default: False ) –

    Whether to print the progress. If multiprocessing is used, only the progress of the first subprocess is printed.
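
The two accepted forms of t_eval can be illustrated with a small stand-in helper. This is a sketch, not sponet's own conversion routine: the name normalize_t_eval is hypothetical, and it assumes that an integer n means n equidistant points from 0 to t_max with both endpoints included.

```python
import numpy as np


def normalize_t_eval(t_eval, t_max):
    """Hypothetical helper: turn t_eval into a 1D array of save times.

    Assumption: an integer n means n equidistant points on [0, t_max],
    endpoints included. The library's actual convention may differ.
    """
    if isinstance(t_eval, (int, np.integer)):
        return np.linspace(0.0, t_max, int(t_eval))
    # Otherwise treat t_eval as an explicit list of time points.
    return np.asarray(t_eval, dtype=float)


print(normalize_t_eval(5, 2.0))
print(normalize_t_eval([0.0, 0.5, 2.0], 2.0))
```

Either form yields a one-dimensional array whose length corresponds to num_timesteps in the return shapes.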

Returns:

  • t_out, x_out : tuple[NDArray, NDArray]

    (t_out, x_out), t_out.shape = (num_timesteps,), x_out.shape = (num_initial_states, num_runs, num_timesteps, num_agents), or x_out.shape = (num_runs, num_timesteps, num_agents) if only a single initial state was given.
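
The shape convention can be checked with a stand-in sampler that mimics only the array bookkeeping. fake_sample_many_runs is hypothetical and simulates nothing: it repeats the initial state at every time point, which is enough to show how a single initial state drops the leading axis.

```python
import numpy as np


def fake_sample_many_runs(initial_states, t_eval, num_runs):
    """Hypothetical stand-in: reproduces only the output shapes."""
    initial_states = np.array(initial_states, ndmin=1)
    is_1d = initial_states.ndim == 1
    if is_1d:
        # Promote a single state to a batch of one.
        initial_states = initial_states[np.newaxis, :]

    t_out = np.asarray(t_eval, dtype=float)
    num_states, num_agents = initial_states.shape
    # Placeholder "trajectories": the initial state repeated in time.
    x_out = np.broadcast_to(
        initial_states[:, np.newaxis, np.newaxis, :],
        (num_states, num_runs, t_out.shape[0], num_agents),
    ).copy()
    if is_1d:
        # Drop the batch axis again for a single initial state.
        x_out = x_out[0]
    return t_out, x_out


t, x = fake_sample_many_runs(np.zeros((3, 10)), [0.0, 1.0, 2.0, 3.0], num_runs=5)
print(t.shape, x.shape)  # (4,) (3, 5, 4, 10)

t, x = fake_sample_many_runs(np.zeros(10), [0.0, 1.0, 2.0, 3.0], num_runs=5)
print(x.shape)  # (5, 4, 10)
```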

Source code in sponet/multiprocessing.py
def sample_many_runs(
    params: Parameters,
    initial_states: ArrayLike,
    t_max: float,
    t_eval: ArrayLike,
    num_runs: int,
    n_jobs: int | None = None,
    collective_variable: CollectiveVariable | None = None,
    rng: Generator = default_rng(),
    progress_bar: bool = False,
) -> tuple[NDArray, NDArray]:
    """
    Sample multiple runs of the model specified by params.

    Parameters
    ----------
    params : Parameters
        Either CNVM or CNTM Parameters.
        If a NetworkGenerator is used, a new network will be sampled for every run.
    initial_states : ArrayLike
        Array of initial states, shape = (num_initial_states, num_agents),
        or single initial state, shape = (num_agents,).
        num_runs simulations are executed for each initial state.
    t_max : float
        End time.
    t_eval : ArrayLike
        Array of time points where the solution should be saved,
        or an integer n, in which case the solution is saved at n equidistant time points.
    num_runs : int
        Number of runs (samples) per initial state.
    n_jobs : int, optional
        If None or 1, no multiprocessing is used. If -1, all available CPUs are used.
    collective_variable : CollectiveVariable, optional
        If a collective variable is specified, the projected trajectory will be returned
        instead of the full trajectory.
    rng : Generator, optional
        Random number generator.
    progress_bar : bool, optional
        Whether to print the progress.
        If multiprocessing is used, only the progress of the first subprocess is printed.

    Returns
    -------
    t_out, x_out : tuple[NDArray, NDArray]
        (t_out, x_out), t_out.shape = (num_timesteps,),
        x_out.shape = (num_initial_states, num_runs, num_timesteps, num_agents),
        or x_out.shape = (num_runs, num_timesteps, num_agents) if only a single initial state was given.
    """
    initial_states = np.array(initial_states, ndmin=1)

    # handle 1D initial state
    is_1d = initial_states.ndim == 1
    if is_1d:
        initial_states = np.expand_dims(initial_states, 0)

    worker = _Worker(params, t_max, t_eval, collective_variable)
    t_out = t_eval_to_ndarray(t_eval, t_max)

    # no multiprocessing
    if n_jobs is None or n_jobs == 1:
        x_out = worker(
            initial_states,
            num_runs,
            rng,
            progress_bar,
        )
    else:  # multiprocessing
        if n_jobs == -1:  # determine number of CPUs
            n_jobs = os.cpu_count()
            if n_jobs is None:
                raise RuntimeError("Could not determine number of available CPUs.")

        rngs = rng.spawn(n_jobs)

        progress_bars = [False] * n_jobs
        if progress_bar:
            progress_bars[0] = True

        if num_runs >= initial_states.shape[0]:  # parallelization along runs
            states_chunks = [initial_states] * n_jobs
            runs_chunks = _split_runs(num_runs, n_jobs)
            concat_axis = 1
        else:  # parallelization along initial states
            states_chunks = np.array_split(initial_states, n_jobs)
            runs_chunks = [num_runs] * n_jobs
            concat_axis = 0

        with ProcessPoolExecutor(max_workers=n_jobs) as executor:
            x_out = list(
                executor.map(worker, states_chunks, runs_chunks, rngs, progress_bars)
            )
        x_out = np.concatenate(x_out, axis=concat_axis)

    if is_1d:
        x_out = x_out[0]
    return t_out, x_out
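
The chunking logic in the listing can be traced without starting any processes. The sketch below rests on several assumptions: split_runs is a hypothetical replacement for the private _split_runs helper, whose implementation is not shown here; fake_worker only produces zero arrays of the right shape; and the per-job generators are derived with SeedSequence.spawn instead of Generator.spawn so that the example also runs on older NumPy versions.

```python
import numpy as np


def split_runs(num_runs, n_jobs):
    """Hypothetical: split num_runs into n_jobs near-equal chunk sizes."""
    base, rem = divmod(num_runs, n_jobs)
    return [base + 1] * rem + [base] * (n_jobs - rem)


def fake_worker(states, runs, rng, num_timesteps=4):
    """Hypothetical worker: returns zeros shaped like a real result."""
    return np.zeros((states.shape[0], runs, num_timesteps, states.shape[1]))


def chunked_sample(initial_states, num_runs, n_jobs, seed=0):
    # One statistically independent generator per job.
    seeds = np.random.SeedSequence(seed).spawn(n_jobs)
    rngs = [np.random.default_rng(s) for s in seeds]

    if num_runs >= initial_states.shape[0]:
        # Every job sees all states but only a share of the runs.
        states_chunks = [initial_states] * n_jobs
        runs_chunks = split_runs(num_runs, n_jobs)
        concat_axis = 1
    else:
        # Every job performs all runs but only for a share of the states.
        states_chunks = np.array_split(initial_states, n_jobs)
        runs_chunks = [num_runs] * n_jobs
        concat_axis = 0

    # Sequential stand-in for executor.map(worker, ...).
    parts = [
        fake_worker(s, r, g)
        for s, r, g in zip(states_chunks, runs_chunks, rngs)
    ]
    return np.concatenate(parts, axis=concat_axis)


print(split_runs(10, 4))  # [3, 3, 2, 2]
print(chunked_sample(np.zeros((2, 5)), num_runs=10, n_jobs=4).shape)  # (2, 10, 4, 5)
print(chunked_sample(np.zeros((8, 5)), num_runs=3, n_jobs=4).shape)  # (8, 3, 4, 5)
```

Splitting along the larger of the two axes keeps the chunks balanced, and concatenating along that same axis restores the documented output shape.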