Construct conditional distribution and transform the jointly distribution

Hello,
I’m building a stochastic model with three random variables X,Y, Z that both X, Y depends on Z and X, Y are conditional independent given the value of Z. I wonder that how can I implement the conditional distribution of X | Z given the copula of X, Z and the marginal of Z? Another question is about the transformation of jointly distribution of (X, Y) | Z. If there is another random variable R = \frac{X}{Y}, how can I implement the jointly distribution of (R, Y) | Z? I have knew that the jointly density function under this transformation satisfies the following equation:

f_{R,Y | Z}(r,y|z) = f_{X,Y|Z}(ry, y|z) | y |

Does anyone have some idea to implement this distribution by the above equation using openturns?

Hi Wissen,
In order to ease the definition of complex distributions, OpenTURNS provides the PythonDistribution class. It allows to define your own distribution given its elements such as its PDF. For the first case, you need to know the density f_X of X otherwise you will not be able to compute the density of X given Z=z, as the conditional density is given by:

f_{X|Z}(x|z)=\frac{f_{X,Z}(x,z)}{f_Z(z)}=f_X(x)c_{X,Z}(F_X(x),F_Z(z))

where f_{X,Z} is the joint density function of (X,Z), f_X and f_Z the marginal density functions of resp. X and Z and F_X and F_Z their cumulative distribution functions.

If you know f_X, f_Z and c_{X,Z} then you can use:

import openturns as ot

class DistributionX_Z(ot.PythonDistribution):
    """
    This class implements the distribution of X|Z given the copula of (X,Z)
    and the marginal distribution of Z.
    Its PDF is given by
    f_{X|Z}(x|z)=f_{X,Z}(x,z)/f_Z(z)=f_X(x)f_Z(z)c(F_X(x), F_Z(z))
    """
    def __init__(self, marginalX, marginalZ, copulaXZ, z):
        super(DistributionX_Z, self).__init__(1)
        self.marginalX_ = marginalX
        self.marginalZ_ = marginalZ
        self.copulaXZ_ = copulaXZ
        self.z_ = z
        # Here you should check that self.uZ_ is positive
        self.uZ_ = marginalZ.computeCDF(z)
        self.xMin_ = self.marginalX_.getRange().getLowerBound()[0]
        self.xMax_ = self.marginalX_.getRange().getUpperBound()[0]
        self.integrator_ = ot.GaussKronrod() # ot.GaussLegendre([128])
        
    def getRange(self):
        return ot.Interval(self.xMin_, self.xMax_)

    def computePDF(self, X):
        uX = self.marginalX_.computeCDF(X)
        pdfX = self.marginalX_.computePDF(X)
        if pdfX == 0.0:
            return 0.0
        return pdfX * self.copulaXZ_.computePDF([uX, self.uZ_])

    def computeCDF(self, X):
        x = X[0]
        if x <= self.xMin_:
            return 0.0
        if x >= self.xMax_:
            return 1.0
        def kernel(X):
            return [self.computePDF(X)]
        return self.integrator_.integrate(ot.PythonFunction(1, 1, kernel), ot.Interval(self.xMin_, x))[0]

if __name__ == '__main__':
    dX = ot.Normal()
    dZ = ot.Exponential()
    cXZ = ot.GumbelCopula(5.0)
    dX_Z = ot.Distribution(DistributionX_Z(dX, dZ, cXZ, 1.0))
    from time import time
    size = 100
    t0 = time()
    sample = dX_Z.getSample(size)
    t1 = time()
    print("Sampling speed=", size / (t1 - t0), "rng/s")
    from time import time
    size = 1000000
    inData = dX.getSample(size)
    t0 = time()
    res = dX_Z.computePDF(inData)
    t1 = time()
    print("PDF speed=", size / (t1 - t0), "pdf/s")
    size = 10000
    inData = dX.getSample(size)
    t0 = time()
    res = dX_Z.computeCDF(inData)
    t1 = time()
    print("CDF speed=", size / (t1 - t0), "pdf/s")
    
    ot.Show(dX_Z.drawPDF())
    ot.Show(dX_Z.drawCDF())

and you get

Sampling speed= 58.57735770741991 rng/s
PDF speed= 260990.17873171327 pdf/s
CDF speed= 1469.5815857184682 pdf/s

pdf
cdf

If the conditioning dimension is the first component of the copula, i.e. if you know c_{Z,X}, then the implementation is much easier (and more efficient) using self.copulaXZ_.computeConditionalCDF(). You can also implement an efficient sampling method using self.copulaXZ_.computeConditionalCDF():

import openturns as ot

class DistributionX_Z(ot.PythonDistribution):
    """
    This class implements the distribution of X|Z given the copula of (Z,X)
    and the marginal distribution of Z.
    Its PDF is given by
    f_{X|Z}(x|z)=f_{X,Z}(x,z)/f_Z(z)=f_X(x)f_Z(z)c(F_X(x), F_Z(z))
    """
    def __init__(self, marginalX, marginalZ, copulaZX, z):
        super(DistributionX_Z, self).__init__(1)
        self.marginalX_ = marginalX
        self.marginalZ_ = marginalZ
        self.copulaZX_ = copulaZX
        self.z_ = z
        # Here you should check that self.uZ_ is positive
        self.uZ_ = marginalZ.computeCDF(z)
        self.xMin_ = self.marginalX_.getRange().getLowerBound()[0]
        self.xMax_ = self.marginalX_.getRange().getUpperBound()[0]
        
    def getRange(self):
        return ot.Interval(self.xMin_, self.xMax_)

    def computePDF(self, X):
        uX = self.marginalX_.computeCDF(X)
        pdfX = self.marginalX_.computePDF(X)
        if pdfX == 0.0:
            return 0.0
        return pdfX * self.copulaZX_.computeConditionalPDF(uX, [self.uZ_])

    def computeCDF(self, X):
        uX = self.marginalX_.computeCDF(X)
        return self.copulaZX_.computeConditionalCDF(uX, [self.uZ_])

    def getRealization(self):
        return [self.copulaZX_.computeConditionalQuantile(ot.RandomGenerator.Generate(), [self.uZ_])]

if __name__ == '__main__':
    dX = ot.Normal()
    dZ = ot.Exponential()
    cXZ = ot.GumbelCopula(5.0)
    dX_Z = ot.Distribution(DistributionX_Z(dX, dZ, cXZ, 1.0))
    from time import time
    size = 1000000
    t0 = time()
    sample = dX_Z.getSample(size)
    t1 = time()
    print("Sampling speed=", size / (t1 - t0), "rng/s")
    from time import time
    inData = dX.getSample(size)
    t0 = time()
    res = dX_Z.computePDF(inData)
    t1 = time()
    print("PDF speed=", size / (t1 - t0), "pdf/s")
    t0 = time()
    res = dX_Z.computeCDF(inData)
    t1 = time()
    print("CDF speed=", size / (t1 - t0), "pdf/s")
    
    ot.Show(dX_Z.drawPDF())
    ot.Show(dX_Z.drawCDF())

and you get:

Sampling speed= 442468.86907499697 rng/s
PDF speed= 261378.97640842458 pdf/s
CDF speed= 356770.63131405925 pdf/s

and the same graphs for the PDF and the CDF. Note the huge improvement in the sampling speed and the CDF computation speed.

For the more complex model it works essentially the same way. I hope to find some spare time to give you a possible implementation tomorrow.

Cheers

Régis

is wrong, it should be

getRealization(self):
        return self.marginalX_.computeQuantile(self.copulaZX_.computeConditionalQuantile(ot.RandomGenerator.Generate(), [self.uZ_]))

The sampling speed is impacted, but remains quite efficient:

Sampling speed= 114508.69012645168 rng/s
PDF speed= 269025.7285793848 pdf/s
CDF speed= 371484.36216444726 pdf/s

Dear Régis,

I would like to express my sincere gratitude for your valuable assistance in addressing my query on the forum. Your detailed and practical solution was incredibly helpful, and I truly appreciate the time and effort you put into explaining it.

Moreover, OpenTURNS is so remarkablely efficient that it proved to be an excellent tool for tackling my problem, and I am impressed by its capabilities.

Once again, thank you for your expertise and for sharing your knowledge. Your contribution has significantly enriched my understanding and has been instrumental in finding a solution to my issue.

Best regards,
Wissen

1 Like