1. Functional Regression
Let the covariate be an at least twice continuously differentiable random function defined wlog. on an interval
and
the corresponding the response. For simplicity we assume centered random variables, i.e.
beside we require bounded second moments
,
. We assume that there exists some
that satisfies
(1)
We define cross-covariance functions of X and Y given by
(2)
Let be the covariance operator of
as defined in this post with ordered eigenvalues
and corresponding eigenfunctions
, then
statisfies (1) if and only if
(3)
(3) has a solution if and only if is invertible, for
the unique solution is then given by
(4)
However this solution is unstable as to be seen by a simple example. Consider some , and define
. Then as
(5)
This means that small pertubations in leads to a large variation (even infinite) in
. There are various ways to tackle this problem, the key idea is to trade the uniqness for the stabilty of the a solution. This is archieved by reducing the dimensionality of the covariance operator. By doing so we do not solve the same problem but another one that is somewhat close to the original problem.
A popular approach is to smooth the regression or to truncate the expansion after some . This truncation approach will be the foundation of our spark implementation.
1.1 Simulation

For our simulation we will again assume a brownian motion on
and
. For the simulation we sample
curves and approximate
as proposed in this post with
at
timepoints
, accordingly
.
1.2 Implementation in Spark
Our implementation is based on the FPCA Algorithm presented in this post. We use the estimated first eigenfunctions
and corresponding eigenvalues
to construct an estimator based on 5 for
for
with
#We start with a sample of brownian motions import matplotlib.pyplot as plt from numpy import * from pyspark.mllib.linalg import * from pyspark.mllib.linalg.distributed import * from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry def spark_FPCA( matrix , L): N = matrix.numRows() T = matrix.numCols() scale =CoordinateMatrix(sc.parallelize(range(N)).map(lambda i: MatrixEntry(i, i, 1./sqrt(T) )), N, N).toBlockMatrix() # Compute the top L singular values and corresponding singular vectors. svd = scale.multiply(matrix).transpose().toIndexedRowMatrix().computeSVD(L) s = svd.s # The singular values are stored in a local dense vector. Va=svd.V.toArray() Vl = list(); for i in range(0, len(Va)): Vl.append( IndexedRow(i, Va[i] )) V=IndexedRowMatrix( sc.parallelize(Vl) ) S=DenseMatrix( L,L, diag(s).flatten().flatten().tolist() ) Si=DenseMatrix( L,L, diag(1./(s)).flatten().flatten().tolist() ) scores=V.multiply(S) components= matrix.transpose().multiply( V.multiply(Si).toBlockMatrix() ) #reduced FPCA decomposition FPCA= components.multiply(scores.toBlockMatrix().transpose() ) return (components, scores, FPCA, s) def spark_FReg( Xmatrix ,Ymatrix , L): fpca=spark_FPCA( Xmatrix , L) T = matrix.numCols() components=fpca[0] s=fpca[3]*fpca[3] #reconstruct beta Si=DenseMatrix( L,L, diag(1./(s*T)).flatten().flatten().tolist() ) beta_est=Ymatrix.transpose().multiply( Xmatrix ).multiply(components).toIndexedRowMatrix().multiply(Si).toBlockMatrix().multiply(components.transpose()) return (beta_est) ##EXAMPLE T=5000 N=800 #define beta(t), the score function time=2*pi*true_divide(arange(0,T),T ) beta= sin( time) #betamat=DenseMatrix( T,N, np.repeat(beta,N) ) betamat=DenseMatrix( T,1, beta ) scaleMat=DenseMatrix( T,T, diag(1/T).flatten().flatten().tolist() ) #plt.plot(time,beta) #plt.show() data = list(); for i in range(0, N): data.append( IndexedRow(i, array( cumsum(random.normal(0,sqrt(true_divide(1,T)),T)) ) ) ) #convert the data to spark sc_data= sc.parallelize( data ) matrix = IndexedRowMatrix(sc_data).toBlockMatrix().cache() scale=CoordinateMatrix(sc.range(N).map(lambda i: MatrixEntry(i, i, 1./T)), N, N).toBlockMatrix() Ymatrix=scale.multiply(IndexedRowMatrix(sc_data).multiply(betamat).toBlockMatrix()).cache() ##Plot the Input data #plt.plot( Ymatrix.toLocalMatrix().toArray().flatten() ) #plt.show() #reconstruct beta beta_est=spark_FReg(matrix, Ymatrix, 12).toLocalMatrix() times=true_divide( arange(0,T),T ) plt.plot(times, beta_est.toArray().flatten()) plt.plot(times,beta ) plt.show()