Censored distribution

I had a look on how to implement a censored distribution using Dirac components inside a Mixture:


import openturns as ot

class CensoredDistribution(ot.PythonDistribution):
    def __init__(self, distribution, bounds):
        dim = distribution.getDimension()
        super(CensoredDistribution, self).__init__(dim)
        if dim != bounds.getDimension():
            raise ValueError("distribution/bounds dimension do not match")
        w0 = distribution.computeProbability(bounds)
        if w0 <= 0.0:
            raise ValueError("bounds contain a null probability")
        self.distribution = distribution
        self.bounds = bounds
        coll = [ot.TruncatedDistribution(distribution, bounds)]
        weights = [w0]
        flb = self.bounds.getFiniteLowerBound()
        fub = self.bounds.getFiniteUpperBound()
        lb = self.bounds.getLowerBound()
        ub = self.bounds.getUpperBound()
        # TODO: arbitrary dim
        if distribution.isContinuous():
            dist_lb = distribution.getRange().getLowerBound()[0]
            dist_ub = distribution.getRange().getUpperBound()[0]
        else:
            dist_lb = distribution.getSupport()[0, 0]
            dist_ub = distribution.getSupport()[-1, 0]
        for i in range(dim):
            if flb[i] and flb[i] < dist_lb:
                li = ot.Interval([dist_lb], [lb[i]], [False], [True])
                w = distribution.computeProbability(li)
                coll += [ot.Dirac(distribution.computePDF(lb[i]))]
                weights += [w]
            if fub[i] and fub[i] > dist_ub:
                ui = ot.Interval([ub[i]], [dist_ub], [True], [False])
                w = distribution.computeProbability(ui)
                coll += [ot.Dirac(distribution.computePDF(ub[i]))]
                weights += [w]
        print(coll, weights)
        self.mixture = ot.Mixture(coll, weights)

    def getRange(self):
        return self.distribution.getRange()

    def getRealization(self):
        return self.mixture.getRealization()

    def computeCDF(self, X):
        return self.mixture.computeCDF(X)

    def computePDF(self, X):
        return self.mixture.computePDF(X)

    def isDiscrete(self):
        return self.mixture.isDiscrete()

    def getMarginal(self, indices):
        py_dist = CensoredDistribution(self.distribution.getMarginal(indices), self.bounds.getMarginal(indices))
        return ot.Distribution(py_dist)

py_dist = CensoredDistribution(ot.Normal(0.0, 1.0), ot.Interval(-1.2, 1.1))
dist = ot.Distribution(py_dist)
print(dist.getSample(100))

py_dist = CensoredDistribution(ot.Geometric(0.5), ot.Interval(-1.0, 4.0))
dist = ot.Distribution(py_dist)
print(dist.getSample(100))

Neat, but this implementation is buggy. Here is a correction:

import openturns as ot
from math import inf

class CensoredDistribution(ot.PythonDistribution):
    def __init__(self, distribution, bounds):
        dim = distribution.getDimension()
        super(CensoredDistribution, self).__init__(dim)
        if dim != bounds.getDimension():
            raise ValueError("distribution/bounds dimension do not match")
        w0 = distribution.computeProbability(bounds)
        if w0 <= 0.0:
            raise ValueError("bounds contain a null probability")
        self.distribution = distribution
        self.bounds = bounds
        coll = [ot.TruncatedDistribution(distribution, bounds)]
        weights = [w0]
        flb = self.bounds.getFiniteLowerBound()
        fub = self.bounds.getFiniteUpperBound()
        lb = self.bounds.getLowerBound()
        ub = self.bounds.getUpperBound()
        if not flb[0]:
            lb[0] = - inf
        if not fub[0]:
            ub[0] = inf
        # TODO: arbitrary dim
        if distribution.isContinuous():
            dist_flb = distribution.getRange().getFiniteLowerBound()
            dist_fub = distribution.getRange().getFiniteUpperBound()
            dist_lb = distribution.getRange().getLowerBound()[0]
            dist_ub = distribution.getRange().getUpperBound()[0]
            if not dist_flb[0]:
                dist_lb = - inf
            if not dist_fub[0]:
                dist_fub = inf
        else:
            dist_lb = distribution.getSupport()[0, 0] - 0.5
            dist_ub = distribution.getSupport()[-1, 0] + 0.5
        for i in range(dim):
            if lb[i] > dist_lb:
                #li = ot.Interval([dist_lb], [lb[i]])
                w = distribution.computeCDF(lb[i]) - distribution.computeCDF(dist_lb)
                coll += [ot.Dirac(lb[i])]
                weights += [w]
            if ub[i] < dist_ub:
                #ui = ot.Interval([ub[i]], [dist_ub])
                w = distribution.computeCDF(dist_ub) - distribution.computeCDF(ub[i])
                coll += [ot.Dirac(ub[i])]
                weights += [w]
        print(coll, weights)
        self.mixture = ot.Mixture(coll, weights)

    def getRange(self):
        return self.distribution.getRange()

    def getRealization(self):
        return self.mixture.getRealization()

    def computeCDF(self, X):
        return self.mixture.computeCDF(X)

    def computePDF(self, X):
        return self.mixture.computePDF(X)

    def isDiscrete(self):
        return self.mixture.isDiscrete()

    def getMarginal(self, indices):
        py_dist = CensoredDistribution(self.distribution.getMarginal(indices), self.bounds.getMarginal(indices))
        return ot.Distribution(py_dist)

py_dist = CensoredDistribution(ot.Normal(0.0, 1.0), ot.Interval(-0.5, 0.5))
dist = ot.Distribution(py_dist)
sample = dist.getSample(100)

py_dist = CensoredDistribution(ot.Geometric(0.5), ot.Interval(-1.0, 4.0))
dist = ot.Distribution(py_dist)
print(dist.getSample(100))

In the latter part, I had to work around the fact that Normal::coputeProbability has a bug: Normal::computeProbability does not account for infinite boundaries in dimension 1 · Issue #2521 · openturns/openturns · GitHub

I dont think we need inf here, we should just use the numerical range values
the ±0.5 trick wont work with non-integral distribution, better use the support epsilon
(censored_dist.py · GitHub)