1. Kernel Regression using Pyspark


In a previous article I presented an implementation of a kernel denisty estimation using pyspark. It is thus not difficult to modify the algorithm to estimate a kernel regression. Suppose that there exits some function , an example for such functions are for instance temperature curves which measure the temperature during a day. In practice, such functions
will often not be directly observed, but one will have to deal with discrete, noisy observations contaminated with some error. The purpose of kernel regression is then to estimate the underlying true function.
In the following I will only consider a simple, standard error model: For design points
there are noisy observations
such that
(1)
for i.i.d. zero mean error terms with finite variance
and
.
To estimate I stick to the Nadaraya-Watson-Estimator given by
where is a kernel as described here, in the implementation again an Epanechnikov kernel
is used. The reader might recognize that
is just the density (scaled by
) already implemented here, while
is just a weighted version of it. Implementation should therefore be very similar leading to the following algortihm:
###A Spark-Function to derive a non-parametric kernel regression from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.feature import StandardScaler import matplotlib.pyplot as plt from numpy import * ##1.0 Simulated Data T=1500 time=sort( random.uniform(0,1,T) ) ##Sorting is not required, i did it to have less trouble with the plots true = sin( true_divide( time, 0.05*pi) ) X= true + random.normal(0,0.1,T) data= sc.parallelize( zip(time,X) ) plt.plot(time,true, 'bo') plt.plot(time,X, 'bo') plt.show() ##2.0 The Function #2.1 Kernel Function def spark_regression(data, Nout, bw): def epan_kernel(x,y,b): u=true_divide( (x-y), b) return max(0, true_divide( 1, b)*true_divide(3,4)*(1-u**2)) #derive the minia and maxi used for interpolation mini=data.map(lambda x: x[0]).takeOrdered(1, lambda x: x ) maxi=data.map(lambda x: x[0]).takeOrdered(1, lambda x: -1*x ) #create an interpolation grid (in fact this time it's random) random_grid = sc.parallelize( random.uniform(maxi,mini,Nout) ) #compute K(x-xi) Matrix density=data.cartesian(random_grid).map(lambda x:( float(x[1]),epan_kernel(array(x[0][0]),array(x[1]),bw) ) ) kernl=data.cartesian(random_grid).map(lambda x:( float(x[1]),x[0][1]*epan_kernel(array(x[0][0]),array(x[1]),bw) ) ) mx= kernl.filter(lambda x: x>0).reduceByKey( lambda y, x: y+x ).zip( density.filter(lambda x: x>0).reduceByKey( lambda y, x: y+x ) ) ##added optional filter() does anyone know if this improves performance? return mx.map(lambda x: (x[0][0],true_divide(x[0][1],x[1][1])) ) ##3.0 Results fitted= spark_regression(data, 128, 0.05).collect() fit=array(fitted).transpose() plt.plot(time,true, color='r') plt.plot(fit[0], fit[1], 'bo') plt.show()
Pingback: Kernel Regression using the Fast Fourier Transform – The Big Data Blog