I had a look on how to implement a censored distribution using Dirac components inside a Mixture:
import openturns as ot
class CensoredDistribution(ot.PythonDistribution):
def __init__(self, distribution, bounds):
dim = distribution.getDimension()
super(CensoredDistribution, self).__init__(dim)
if dim != bounds.getDimension():
raise ValueError("distribution/bounds dimension do not match")
w0 = distribution.computeProbability(bounds)
if w0 <= 0.0:
raise ValueError("bounds contain a null probability")
self.distribution = distribution
self.bounds = bounds
coll = [ot.TruncatedDistribution(distribution, bounds)]
weights = [w0]
flb = self.bounds.getFiniteLowerBound()
fub = self.bounds.getFiniteUpperBound()
lb = self.bounds.getLowerBound()
ub = self.bounds.getUpperBound()
# TODO: arbitrary dim
if distribution.isContinuous():
dist_lb = distribution.getRange().getLowerBound()[0]
dist_ub = distribution.getRange().getUpperBound()[0]
else:
dist_lb = distribution.getSupport()[0, 0]
dist_ub = distribution.getSupport()[-1, 0]
for i in range(dim):
if flb[i] and flb[i] < dist_lb:
li = ot.Interval([dist_lb], [lb[i]], [False], [True])
w = distribution.computeProbability(li)
coll += [ot.Dirac(distribution.computePDF(lb[i]))]
weights += [w]
if fub[i] and fub[i] > dist_ub:
ui = ot.Interval([ub[i]], [dist_ub], [True], [False])
w = distribution.computeProbability(ui)
coll += [ot.Dirac(distribution.computePDF(ub[i]))]
weights += [w]
print(coll, weights)
self.mixture = ot.Mixture(coll, weights)
def getRange(self):
return self.distribution.getRange()
def getRealization(self):
return self.mixture.getRealization()
def computeCDF(self, X):
return self.mixture.computeCDF(X)
def computePDF(self, X):
return self.mixture.computePDF(X)
def isDiscrete(self):
return self.mixture.isDiscrete()
def getMarginal(self, indices):
py_dist = CensoredDistribution(self.distribution.getMarginal(indices), self.bounds.getMarginal(indices))
return ot.Distribution(py_dist)
py_dist = CensoredDistribution(ot.Normal(0.0, 1.0), ot.Interval(-1.2, 1.1))
dist = ot.Distribution(py_dist)
print(dist.getSample(100))
py_dist = CensoredDistribution(ot.Geometric(0.5), ot.Interval(-1.0, 4.0))
dist = ot.Distribution(py_dist)
print(dist.getSample(100))