Source code for mrg32k3a.mrg32k3a

#!/usr/bin/env python
"""
Summary
-------
Provide a subclass of ``random.Random`` using mrg32k3a as the generator
with stream/substream/subsubstream support.
"""

# Code largely adopted from PyMOSO repository (https://github.com/pymoso/PyMOSO).

import numpy as np
import random
from math import log, ceil, sqrt, exp
from copy import deepcopy

from .matmodops import mat33_mat31_mult, mat31_mod, mat33_power_mod

# Constants used in mrg32k3a and in substream generation.
# P. L'Ecuyer, ``Good Parameter Sets for Combined Multiple Recursive Random Number Generators'',
# Operations Research, 47, 1 (1999), 159--164.
# P. L'Ecuyer, R. Simard, E. J. Chen, and W. D. Kelton,
# ``An Objected-Oriented Random-Number Package with Many Long Streams and Substreams'',
# Operations Research, 50, 6 (2002), 1073--1075.

mrgnorm = 2.328306549295727688e-10
mrgm1 = 4294967087
mrgm2 = 4294944443
mrga12 = 1403580
mrga13n = 810728
mrga21 = 527612
mrga23n = 1370589

A1p0 = [[0, 1, 0],
        [0, 0, 1],
        [-mrga13n, mrga12, 0]
        ]

A2p0 = [[0, 1, 0],
        [0, 0, 1],
        [-mrga23n, 0, mrga21]
        ]

# A1p47 = mat33_power_mod(A1p0, 2**47, mrgm1).
A1p47 = [[1362557480, 3230022138, 4278720212],
         [3427386258, 3848976950, 3230022138],
         [2109817045, 2441486578, 3848976950]
         ]

# A2p47 = mat33_power_mod(A2p0, 2**47, mrgm2).
A2p47 = [[2920112852, 1965329198, 1177141043],
         [2135250851, 2920112852, 969184056],
         [296035385, 2135250851, 4267827987]
         ]

# A1p94 = mat33_power_mod(A1p0, 2**94, mrgm1).
A1p94 = [[2873769531, 2081104178, 596284397],
         [4153800443, 1261269623, 2081104178],
         [3967600061, 1830023157, 1261269623]
         ]

# A2p94 = mat33_power_mod(A2p0, 2**94, mrgm2).
A2p94 = [[1347291439, 2050427676, 736113023],
         [4102191254, 1347291439, 878627148],
         [1293500383, 4102191254, 745646810]
         ]

# A1p141 = mat33_power_mod(A1p0, 2**141, mrgm1).
A1p141 = [[3230096243, 2131723358, 3262178024],
          [2882890127, 4088518247, 2131723358],
          [3991553306, 1282224087, 4088518247]
          ]

# A2p141 = mat33_power_mod(A2p0, 2**141, mrgm2).
A2p141 = [[2196438580, 805386227, 4266375092],
          [4124675351, 2196438580, 2527961345],
          [94452540, 4124675351, 2825656399]
          ]

# Constants used in Beasley-Springer-Moro algorithm for approximating
# the inverse cdf of the standard normal distribution.
bsma = [2.50662823884, -18.61500062529, 41.39119773534, -25.44106049637]
bsmb = [-8.47351093090, 23.08336743743, -21.06224101826, 3.13082909833]
bsmc = [0.3374754822726147, 0.9761690190917186, 0.1607979714918209, 0.0276438810333863, 0.0038405729373609, 0.0003951896511919, 0.0000321767881768, 0.0000002888167364, 0.0000003960315187]


# Adapted to pure Python from the P. L'Ecuyer code referenced above.
[docs]def mrg32k3a(state): """Generate a random number between 0 and 1 from a given state. Parameters ---------- state : tuple [int] Current state of the generator. Returns ------- new_state : tuple [int] Next state of the generator. u : float Pseudo uniform random variate. """ # Component 1. p1 = mrga12 * state[1] - mrga13n * state[0] k1 = int(p1 / mrgm1) p1 -= k1 * mrgm1 if p1 < 0.0: p1 += mrgm1 # Component 2. p2 = mrga21 * state[5] - mrga23n * state[3] k2 = int(p2 / mrgm2) p2 -= k2 * mrgm2 if p2 < 0.0: p2 += mrgm2 # Combination. if p1 <= p2: u = (p1 - p2 + mrgm1) * mrgnorm else: u = (p1 - p2) * mrgnorm new_state = (state[1], state[2], int(p1), state[4], state[5], int(p2)) return new_state, u
[docs]def bsm(u): """Approximate a quantile of the standard normal distribution via the Beasley-Springer-Moro algorithm. Parameters ---------- u : float Probability value for the desired quantile (between 0 and 1). Returns ------- z : float Corresponding quantile of the standard normal distribution. """ y = u - 0.5 if abs(y) < 0.42: # Approximate from the center (Beasly-Springer 1977). r = pow(y, 2) r2 = pow(r, 2) r3 = pow(r, 3) r4 = pow(r, 4) asum = sum([bsma[0], bsma[1] * r, bsma[2] * r2, bsma[3] * r3]) bsum = sum([1, bsmb[0] * r, bsmb[1] * r2, bsmb[2] * r3, bsmb[3] * r4]) z = y * (asum / bsum) else: # Approximate from the tails (Moro 1995). if y < 0.0: signum = -1 r = u else: signum = 1 r = 1 - u s = log(-log(r)) s0 = pow(s, 2) s1 = pow(s, 3) s2 = pow(s, 4) s3 = pow(s, 5) s4 = pow(s, 6) s5 = pow(s, 7) s6 = pow(s, 8) clst = [bsmc[0], bsmc[1] * s, bsmc[2] * s0, bsmc[3] * s1, bsmc[4] * s2, bsmc[5] * s3, bsmc[6] * s4, bsmc[7] * s5, bsmc[8] * s6] t = sum(clst) z = signum * t return z
[docs]class MRG32k3a(random.Random): """Implements mrg32k3a as the generator for a ``random.Random`` object. Attributes ---------- _current_state : tuple [int] Current state of mrg32k3a generator. ref_seed : tuple [int] Seed from which to start the generator. Streams/substreams/subsubstreams are referenced w.r.t. ``ref_seed``. s_ss_sss_index : list [int] Triplet of the indices of the current stream-substream-subsubstream. stream_start : list [int] State corresponding to the start of the current stream. substream_start: list [int] State corresponding to the start of the current substream. subsubstream_start: list [int] State corresponding to the start of the current subsubstream. Parameters ---------- ref_seed : tuple [int], optional Seed from which to start the generator. s_ss_sss_index : list [int], optional Triplet of the indices of the stream-substream-subsubstream to start at. See also -------- random.Random """ def __init__(self, ref_seed=(12345, 12345, 12345, 12345, 12345, 12345), s_ss_sss_index=None): assert(len(ref_seed) == 6) self.version = 2 self.generate = mrg32k3a self.ref_seed = ref_seed super().__init__(ref_seed) if s_ss_sss_index is None: s_ss_sss_index = [0, 0, 0] self.start_fixed_s_ss_sss(s_ss_sss_index) def __deepcopy__(self, memo): cls = self.__class__ result = cls.__new__(cls) memo[id(self)] = result for k, v in self.__dict__.items(): setattr(result, k, deepcopy(v, memo)) return result
[docs] def seed(self, new_state): """Set the state (or seed) of the generator and update the generator state. Parameters ---------- new_state : tuple [int] New state to which to advance the generator. """ assert(len(new_state) == 6) self._current_state = new_state
# super().seed(new_state)
[docs] def getstate(self): """Return the state of the generator. Returns ------- tuple [int] Current state of the generator, ``_current_state``. tuple [int] Ouptput of ``random.Random.getstate()``. See also -------- random.Random """ return self.get_current_state(), super().getstate()
[docs] def setstate(self, state): """Set the internal state of the generator. Parameters ---------- state : tuple ``state[0]`` is new state for the generator. ``state[1]`` is ``random.Random.getstate()``. See also -------- random.Random """ self.seed(state[0]) super().setstate(state[1])
[docs] def random(self): """Generate a standard uniform variate and advance the generator state. Returns ------- u : float Pseudo uniform random variate. """ state = self._current_state new_state, u = self.generate(state) self.seed(new_state) return u
[docs] def get_current_state(self): """Return the current state of the generator. Returns ------- _current_state : tuple [int] Current state of the generator. """ return self._current_state
[docs] def normalvariate(self, mu=0, sigma=1): """Generate a normal random variate. Parameters ---------- mu : float Expected value of the normal distribution from which to generate. sigma : float Standard deviation of the normal distribution from which to generate. Returns ------- float A normal random variate from the specified distribution. """ u = self.random() z = bsm(u) return mu + sigma * z
[docs] def lognormalvariate(self, lq, uq): """Generate a Lognormal random variate using 2.5% and 97.5% quantiles Parameters ---------- lq : float 2.5% quantile of the lognormal distribution from which to generate. uq : float 97.5% quantile of the lognormal distribution from which to generate. Returns ------- float A lognormal random variate from the specified distribution. """ mu = (log(lq) + log(uq)) / 2 sigma = (log(uq) - mu) / 1.96 x = self.normalvariate(mu, sigma) return exp(x)
[docs] def mvnormalvariate(self, mean_vec, cov, factorized=False): """Generate a normal random vector. Parameters --------- mean_vec : list [float] Location parameters of the multivariate normal distribution from which to generate. cov : list [list [float]] Covariance matrix of the multivariate normal distribution from which to generate. factorized : bool, default=False True if we do not need to calculate Cholesky decomposition, i.e., if Cholesky decomposition is given as ``cov``; False otherwise. Returns ------- list [float] Multivariate normal random variate from the specified distribution. """ n_cols = len(cov) if not factorized: Chol = np.linalg.cholesky(cov) else: Chol = cov observations = [self.normalvariate(0, 1) for _ in range(n_cols)] return Chol.dot(observations).transpose() + mean_vec
[docs] def poissonvariate(self, lmbda): """Generate a Poisson random variate. Parameters --------- lmbda : float Expected value of the Poisson distribution from which to generate. Returns ------- float Poisson random variate from the specified distribution. """ if lmbda < 35: n = 0 p = self.random() threshold = exp(-lmbda) while p >= threshold: u = self.random() p = p * u n = n + 1 else: z = self.normalvariate() n = max(ceil(lmbda + sqrt(lmbda) * z - 0.5), 0) return n
[docs] def gumbelvariate(self, mu, beta): """Generate a Gumbel random variate. Parameters --------- mu : float Location of the mode of the Gumbel distribution from which to generate. beta : float Scale parameter of the Gumbel distribution from which to generate; > 0. Returns ------- float Gumbel random variate from the specified distribution. """ u = self.random() q = mu - beta * np.log(-np.log(u)) return q
[docs] def binomialvariate(self, n, p): """Generate a Binomial(n, p) random variate. Parameters ---------- n : int Number of i.i.d. Bernoulli trials; > 0. p : float Success probability of i.i.d. Bernoulli trials; in (0, 1). Returns ------- x : int Binomial random variate from the specified distribution. """ x = sum(self.choices(population=[0, 1], weights=[1 - p, p], k=n)) return x
[docs] def integer_random_vector_from_simplex(self, n_elements, summation, with_zero=False): """Generate a random vector with a specified number of non-negative integer elements that sum up to a specified number. Parameters --------- n_elements : float Number of elements in the requested vector. summation : int Number to which the integer elements of the vector must sum. with_zero: bool True if zeros in the vector are permitted; False otherwise. Returns ------- vec: list [int] A non-negative integer vector of length n_elements that sum to n_elements. """ if with_zero is False: if n_elements > summation: print("The sum cannot be greater than the number of positive integers requested.") return else: # Generate a vector of length n_elements by sampling without replacement from # the set {1, 2, 3, ..., n_elements-1}. Sort it in ascending order, pre-append # "0", and post-append "summation". temp_x = self.sample(population=range(1, summation), k=n_elements - 1) temp_x.sort() x = [0] + temp_x + [summation] # Take differences between consecutive terms. Result will sum to summation. vec = [x[idx + 1] - x[idx] for idx in range(n_elements)] else: temp_vec = self.integer_random_vectors_from_simplex(self, summation=summation + n_elements, n_elements=n_elements, with_zero=False ) vec = [tv - 1 for tv in temp_vec] return vec
[docs] def continuous_random_vector_from_simplex(self, n_elements, summation=1.0, exact_sum=True): """Generate a random vector with a specified number of non-negative real-valued elements that sum up to (or less than or equal to) a specified number. Parameters ---------- n_elements : float Number of elements in the requested vector. summation : float, optional Number to which the elements of the vector must sum. exact_sum : bool, optional True if the sum should be equal to summation; False if the sum should be less than or equal to summation. Returns ------- vec : list [float] Vector of ``n_elements`` non-negative real-valued numbers that sum up to (or less than or equal to) ``summation``. """ if exact_sum is True: # Generate a vector of length n_elements of i.i.d. Exponential(1) # random variates. Normalize all values by the sum and multiply by # "summation". exp_rvs = [self.expovariate(lambd=1) for _ in range(n_elements)] exp_sum = sum(exp_rvs) vec = [summation * x / exp_sum for x in exp_rvs] else: # Sum must equal summation. # Follows Theorem 2.1 of "Non-Uniform Random Variate Generation" by DeVroye. # Chapter 11, page 568. # Generate a vector of length n_elements of i.i.d. Uniform(0, 1) # random variates. Sort it in ascending order, pre-append # "0", and post-append "summation". unif_rvs = [self.random() for _ in range(n_elements)] unif_rvs.sort() x = [0] + unif_rvs + [1] # Take differences between consecutive terms. Result will sum to 1. diffs = np.array([x[idx + 1] - x[idx] for idx in range(n_elements + 1)]) # Construct a matrix of the vertices of the simplex in R^d in regular position. # Includes zero vector and d unit vectors in R^d. vertices = np.concatenate((np.zeros((1, n_elements)), np.identity(n=n_elements)), axis=0) # Multiply each vertex by the corresponding term in diffs. # Then multiply each component by "summation" and sum the vectors # to get the convex combination of the vertices (scaled up to "summation"). vec = list(summation * np.sum(np.multiply(vertices, diffs[:, np.newaxis]), axis=0)) return vec
[docs] def advance_stream(self): """Advance the state of the generator to the start of the next stream. Streams are of length 2**141. """ state = self.stream_start # Split the state into 2 components of length 3. st1 = state[0:3] st2 = state[3:6] # Efficiently advance state -> A*s % m for both state parts. nst1m = mat33_mat31_mult(A1p141, st1) nst2m = mat33_mat31_mult(A2p141, st2) nst1 = mat31_mod(nst1m, mrgm1) nst2 = mat31_mod(nst2m, mrgm2) nstate = tuple(nst1 + nst2) self.seed(nstate) # Increment the stream index. self.s_ss_sss_index[0] += 1 # Reset index for substream and subsubstream. self.s_ss_sss_index[1] = 0 self.s_ss_sss_index[2] = 0 # Update state referencing. self.stream_start = nstate self.substream_start = nstate self.subsubstream_start = nstate
[docs] def advance_substream(self): """Advance the state of the generator to the start of the next substream. Substreams are of length 2**94. """ state = self.substream_start # Split the state into 2 components of length 3. st1 = state[0:3] st2 = state[3:6] # Efficiently advance state -> A*s % m for both state parts. nst1m = mat33_mat31_mult(A1p94, st1) nst2m = mat33_mat31_mult(A2p94, st2) nst1 = mat31_mod(nst1m, mrgm1) nst2 = mat31_mod(nst2m, mrgm2) nstate = tuple(nst1 + nst2) self.seed(nstate) # Increment the substream index. self.s_ss_sss_index[1] += 1 # Reset index for subsubstream. self.s_ss_sss_index[2] = 0 # Update state referencing. self.substream_start = nstate self.subsubstream_start = nstate
[docs] def advance_subsubstream(self): """Advance the state of the generator to the start of the next subsubstream. Subsubstreams are of length 2**47. """ state = self.subsubstream_start # Split the state into 2 components of length 3. st1 = state[0:3] st2 = state[3:6] # Efficiently advance state -> A*s % m for both state parts. nst1m = mat33_mat31_mult(A1p47, st1) nst2m = mat33_mat31_mult(A2p47, st2) nst1 = mat31_mod(nst1m, mrgm1) nst2 = mat31_mod(nst2m, mrgm2) nstate = tuple(nst1 + nst2) self.seed(nstate) # Increment the subsubstream index. self.s_ss_sss_index[2] += 1 # Update state referencing. self.subsubstream_start = nstate
[docs] def reset_stream(self): """Reset the state of the generator to the start of the current stream. """ nstate = self.stream_start self.seed(nstate) # Update state referencing. self.substream_start = nstate self.subsubstream_start = nstate # Reset index for substream and subsubstream. self.s_ss_sss_index[1] = 0 self.s_ss_sss_index[2] = 0
[docs] def reset_substream(self): """Reset the state of the generator to the start of the current substream. """ nstate = self.substream_start self.seed(nstate) # Update state referencing. self.subsubstream_start = nstate # Reset index for subsubstream. self.s_ss_sss_index[2] = 0
[docs] def reset_subsubstream(self): """Reset the state of the generator to the start of the current subsubstream. """ nstate = self.subsubstream_start self.seed(nstate)
[docs] def start_fixed_s_ss_sss(self, s_ss_sss_triplet): """Set the rng to the start of a specified (stream, substream, subsubstream) triplet. Parameters ---------- s_ss_sss_triplet : list [int] Triplet of the indices of the current stream-substream-subsubstream. """ state = self.ref_seed # Split the reference seed into 2 components of length 3. st1 = state[0:3] st2 = state[3:6] # Advance to start of specified stream. # Efficiently advance state -> A*s % m for both state parts. nst1m = mat33_mat31_mult(mat33_power_mod(A1p141, s_ss_sss_triplet[0], mrgm1), st1) nst2m = mat33_mat31_mult(mat33_power_mod(A2p141, s_ss_sss_triplet[0], mrgm2), st2) st1 = mat31_mod(nst1m, mrgm1) st2 = mat31_mod(nst2m, mrgm2) self.stream_start = tuple(st1 + st2) # Advance to start of specified substream. # Efficiently advance state -> A*s % m for both state parts. nst1m = mat33_mat31_mult(mat33_power_mod(A1p94, s_ss_sss_triplet[1], mrgm1), st1) nst2m = mat33_mat31_mult(mat33_power_mod(A2p94, s_ss_sss_triplet[1], mrgm2), st2) st1 = mat31_mod(nst1m, mrgm1) st2 = mat31_mod(nst2m, mrgm2) self.substream_start = tuple(st1 + st2) # Advance to start of specified subsubstream. # Efficiently advance state -> A*s % m for both state parts. nst1m = mat33_mat31_mult(mat33_power_mod(A1p47, s_ss_sss_triplet[2], mrgm1), st1) nst2m = mat33_mat31_mult(mat33_power_mod(A2p47, s_ss_sss_triplet[2], mrgm2), st2) st1 = mat31_mod(nst1m, mrgm1) st2 = mat31_mod(nst2m, mrgm2) self.subsubstream_start = tuple(st1 + st2) nstate = tuple(st1 + st2) self.seed(nstate) # Update index referencing. self.s_ss_sss_index = s_ss_sss_triplet