Hi Wissen,
In order to ease the definition of complex distributions, OpenTURNS provides the PythonDistribution class. It allows to define your own distribution given its elements such as its PDF. For the first case, you need to know the density f_X of X otherwise you will not be able to compute the density of X given Z=z, as the conditional density is given by:
f_{X|Z}(x|z)=\frac{f_{X,Z}(x,z)}{f_Z(z)}=f_X(x)c_{X,Z}(F_X(x),F_Z(z))
where f_{X,Z} is the joint density function of (X,Z), f_X and f_Z the marginal density functions of resp. X and Z and F_X and F_Z their cumulative distribution functions.
If you know f_X, f_Z and c_{X,Z} then you can use:
import openturns as ot
class DistributionX_Z(ot.PythonDistribution):
"""
This class implements the distribution of X|Z given the copula of (X,Z)
and the marginal distribution of Z.
Its PDF is given by
f_{X|Z}(x|z)=f_{X,Z}(x,z)/f_Z(z)=f_X(x)f_Z(z)c(F_X(x), F_Z(z))
"""
def __init__(self, marginalX, marginalZ, copulaXZ, z):
super(DistributionX_Z, self).__init__(1)
self.marginalX_ = marginalX
self.marginalZ_ = marginalZ
self.copulaXZ_ = copulaXZ
self.z_ = z
# Here you should check that self.uZ_ is positive
self.uZ_ = marginalZ.computeCDF(z)
self.xMin_ = self.marginalX_.getRange().getLowerBound()[0]
self.xMax_ = self.marginalX_.getRange().getUpperBound()[0]
self.integrator_ = ot.GaussKronrod() # ot.GaussLegendre([128])
def getRange(self):
return ot.Interval(self.xMin_, self.xMax_)
def computePDF(self, X):
uX = self.marginalX_.computeCDF(X)
pdfX = self.marginalX_.computePDF(X)
if pdfX == 0.0:
return 0.0
return pdfX * self.copulaXZ_.computePDF([uX, self.uZ_])
def computeCDF(self, X):
x = X[0]
if x <= self.xMin_:
return 0.0
if x >= self.xMax_:
return 1.0
def kernel(X):
return [self.computePDF(X)]
return self.integrator_.integrate(ot.PythonFunction(1, 1, kernel), ot.Interval(self.xMin_, x))[0]
if __name__ == '__main__':
dX = ot.Normal()
dZ = ot.Exponential()
cXZ = ot.GumbelCopula(5.0)
dX_Z = ot.Distribution(DistributionX_Z(dX, dZ, cXZ, 1.0))
from time import time
size = 100
t0 = time()
sample = dX_Z.getSample(size)
t1 = time()
print("Sampling speed=", size / (t1 - t0), "rng/s")
from time import time
size = 1000000
inData = dX.getSample(size)
t0 = time()
res = dX_Z.computePDF(inData)
t1 = time()
print("PDF speed=", size / (t1 - t0), "pdf/s")
size = 10000
inData = dX.getSample(size)
t0 = time()
res = dX_Z.computeCDF(inData)
t1 = time()
print("CDF speed=", size / (t1 - t0), "pdf/s")
ot.Show(dX_Z.drawPDF())
ot.Show(dX_Z.drawCDF())
and you get
Sampling speed= 58.57735770741991 rng/s
PDF speed= 260990.17873171327 pdf/s
CDF speed= 1469.5815857184682 pdf/s


If the conditioning dimension is the first component of the copula, i.e. if you know c_{Z,X}, then the implementation is much easier (and more efficient) using self.copulaXZ_.computeConditionalCDF()
. You can also implement an efficient sampling method using self.copulaXZ_.computeConditionalCDF()
:
import openturns as ot
class DistributionX_Z(ot.PythonDistribution):
"""
This class implements the distribution of X|Z given the copula of (Z,X)
and the marginal distribution of Z.
Its PDF is given by
f_{X|Z}(x|z)=f_{X,Z}(x,z)/f_Z(z)=f_X(x)f_Z(z)c(F_X(x), F_Z(z))
"""
def __init__(self, marginalX, marginalZ, copulaZX, z):
super(DistributionX_Z, self).__init__(1)
self.marginalX_ = marginalX
self.marginalZ_ = marginalZ
self.copulaZX_ = copulaZX
self.z_ = z
# Here you should check that self.uZ_ is positive
self.uZ_ = marginalZ.computeCDF(z)
self.xMin_ = self.marginalX_.getRange().getLowerBound()[0]
self.xMax_ = self.marginalX_.getRange().getUpperBound()[0]
def getRange(self):
return ot.Interval(self.xMin_, self.xMax_)
def computePDF(self, X):
uX = self.marginalX_.computeCDF(X)
pdfX = self.marginalX_.computePDF(X)
if pdfX == 0.0:
return 0.0
return pdfX * self.copulaZX_.computeConditionalPDF(uX, [self.uZ_])
def computeCDF(self, X):
uX = self.marginalX_.computeCDF(X)
return self.copulaZX_.computeConditionalCDF(uX, [self.uZ_])
def getRealization(self):
return [self.copulaZX_.computeConditionalQuantile(ot.RandomGenerator.Generate(), [self.uZ_])]
if __name__ == '__main__':
dX = ot.Normal()
dZ = ot.Exponential()
cXZ = ot.GumbelCopula(5.0)
dX_Z = ot.Distribution(DistributionX_Z(dX, dZ, cXZ, 1.0))
from time import time
size = 1000000
t0 = time()
sample = dX_Z.getSample(size)
t1 = time()
print("Sampling speed=", size / (t1 - t0), "rng/s")
from time import time
inData = dX.getSample(size)
t0 = time()
res = dX_Z.computePDF(inData)
t1 = time()
print("PDF speed=", size / (t1 - t0), "pdf/s")
t0 = time()
res = dX_Z.computeCDF(inData)
t1 = time()
print("CDF speed=", size / (t1 - t0), "pdf/s")
ot.Show(dX_Z.drawPDF())
ot.Show(dX_Z.drawCDF())
and you get:
Sampling speed= 442468.86907499697 rng/s
PDF speed= 261378.97640842458 pdf/s
CDF speed= 356770.63131405925 pdf/s
and the same graphs for the PDF and the CDF. Note the huge improvement in the sampling speed and the CDF computation speed.
For the more complex model it works essentially the same way. I hope to find some spare time to give you a possible implementation tomorrow.
Cheers
Régis