Spark MLlib - LFW (Labeled Faces in the Wild): dimensionality reduction and image similarity with PCA/SVD

时间: 2023-03-09 08:41:32
// Load the LFW-A image set; wholeTextFiles yields (fileURI, content) pairs.
val path = "/usr/data/lfw-a/*"
val rdd = sc.wholeTextFiles(path)
val first = rdd.first
println(first)
// Strip the "file:" URI scheme so paths can be re-read with plain Java IO.
val files = rdd.map { case (fileName, content) => fileName.replace("file:", "") }
println(files.first)

println(files.count)
%pyspark
import matplotlib.pyplot as plt
path = "/usr/data/lfw-a/Aaron_Eckhart/Aaron_Eckhart_0001.jpg"
ae = plt.imread(path)
plt.imshow(ae)
plt.show()
import java.awt.image.BufferedImage
import javax.imageio.ImageIO
import java.io.File

/** Read an image from disk into an AWT BufferedImage. */
def loadImageFromFile(path: String): BufferedImage = ImageIO.read(new File(path))

val aePath = "/usr/data/lfw-a/Aaron_Eckhart/Aaron_Eckhart_0001.jpg"
val aeImage = loadImageFromFile(aePath)
import java.awt.image

/**
  * Downscale an image to width x height and convert it to grayscale.
  * Grayscale + fixed size gives every image the same feature dimension.
  */
def processImage(image: BufferedImage, width: Int, height: Int): BufferedImage = {
  val bwImage = new BufferedImage(width, height, BufferedImage.TYPE_BYTE_GRAY)
  val g = bwImage.getGraphics()
  // drawImage(src, dstX, dstY, dstW, dstH, observer): scale into the top-left corner.
  // NOTE(review): the 0, 0 origin literals were stripped by the scrape; restored
  // from the standard form of this example.
  g.drawImage(image, 0, 0, width, height, null)
  g.dispose()
  bwImage
}

// 100x50 test conversion of the sample image (literals reconstructed — confirm).
val grayImage = processImage(aeImage, 100, 50)
import javax.imageio.ImageIO
import java.io.File
ImageIO.write(grayImage, "jpg", new File("/tmp/aeGray.jpg"))
%pyspark
import matplotlib.pyplot as plt
tmpPath = "/tmp/aeGray.jpg"
aeGary = plt.imread(tmpPath)
plt.imshow(aeGary, cmap=plt.cm.gray)
plt.show()
/** Flatten an image into a row-major array of grayscale pixel intensities. */
def getPixelsFromImage(image: BufferedImage): Array[Double] = {
  val width = image.getWidth
  val height = image.getHeight
  val pixels = Array.ofDim[Double](width * height)
  // Read the full raster starting at origin (0, 0). The origin literals were
  // stripped by the scrape; restored from the standard form of this example.
  image.getData.getPixels(0, 0, width, height, pixels)
}

/** Full per-file pipeline: load, resize/grayscale, flatten to a feature vector. */
def extractPixels(path: String, width: Int, height: Int): Array[Double] = {
  val raw = loadImageFromFile(path)
  val processed = processImage(raw, width, height)
  getPixelsFromImage(processed)
}

// 50x50 = 2500-dimensional feature vectors (reconstructed literals — confirm).
val pixels = files.map(f => extractPixels(f, 50, 50))
println(pixels.take(10).map(_.take(10).mkString
("", ",", ", ...")).mkString("\n"))
import org.apache.spark.mllib.linalg.Vectors
val vectors = pixels.map(p => Vectors.dense(p))
vectors.setName("image-vectors")
vectors.cache
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.feature.StandardScaler
// Center each pixel dimension (PCA requires mean subtraction); no scaling to
// unit variance since all dimensions share the same units.
val scaler = new StandardScaler(withMean = true, withStd = false).fit(vectors)
val scaledVectors = vectors.map(v => scaler.transform(v))
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val matrix = new RowMatrix(scaledVectors)
// Number of principal components ("eigenfaces") to keep.
// NOTE(review): literal stripped by the scrape; K = 10 per the book's example.
val K = 10
val pc = matrix.computePrincipalComponents(K)
val rows = pc.numRows
val cols = pc.numCols
println(rows, cols)
import breeze.linalg.DenseMatrix
// Principal components as a Breeze matrix so they can be written out as CSV.
val pcBreeze = new DenseMatrix(rows, cols, pc.toArray)
import breeze.linalg.csvwrite
csvwrite(new File("/tmp/pc.csv"), pcBreeze)
%pyspark
import numpy as np
import matplotlib.pyplot as plt

# Load the principal components written by the Scala cell above;
# shape should be (pixels-per-image, K).
pcs = np.loadtxt("/tmp/pc.csv", delimiter=",")
print(pcs.shape)
%pyspark
import numpy as np
import matplotlib.pyplot as plt

def plot_gallery(images, h, w, n_row=, n_col=):
    plt.figure(figsize=(1.8 * n_col, 2.4 * n_row))
    plt.subplots_adjust(bottom=, left=., right=., top=.,hspace=.)
    for i in range(n_row * n_col):
        plt.subplot(n_row, n_col, i + )
        plt.imshow(images[:, i].reshape((h, w)), cmap=plt.cm.gray)
        plt.title(), size=)
        plt.xticks(())
        plt.yticks(())
%pyspark
plot_gallery(pcs, , )
plt.show()
// Project every image into the K-dimensional eigenface space.
val projected = matrix.multiply(pc)
println(projected.numRows, projected.numCols)
println(projected.rows.take(5).mkString("\n"))
// SVD with the same rank; PCA projection should equal U * S (up to sign).
// NOTE(review): rank literal stripped by the scrape; 10 per the book's example.
val svd = matrix.computeSVD(10, computeU = true)
println(s"U dimension: (${svd.U.numRows}, ${svd.U.numCols})")
println(s"S dimension: (${svd.s.size}, )")
println(s"V dimension: (${svd.V.numRows}, ${svd.V.numCols})")

/**
  * Element-wise comparison of two arrays up to sign.
  * Fix: the original body compared against a hard-coded literal instead of
  * the `tolerance` parameter, making the parameter dead.
  */
def approxEqual(array1: Array[Double], array2: Array[Double],
    tolerance: Double = 1e-6): Boolean = {
  val bools = array1.zip(array2).map { case (v1, v2) =>
    math.abs(math.abs(v1) - math.abs(v2)) <= tolerance }
  bools.fold(true)(_ & _)
}

val breezeS = breeze.linalg.DenseVector(svd.s.toArray)
val projectedSVD = svd.U.rows.map { v =>
  val breezeV = breeze.linalg.DenseVector(v.toArray)
  val multV = breezeV :* breezeS
  Vectors.dense(multV.data)
}
// Count rows where PCA projection matches U*S. Fix: the original filtered on
// `b => true` (keeps everything); filter on the comparison result itself.
projected.rows.zip(projectedSVD).map { case (v1, v2) =>
  approxEqual(v1.toArray, v2.toArray) }.filter(b => b).count

// Singular values are stable as k grows (reconstructed range 1 to 5 — confirm).
val sValues = (1 to 5).map { i => matrix.computeSVD(i, computeU = false).s }
sValues.foreach(println)
// Top 300 singular values, written out for plotting in Python.
val svd300 = matrix.computeSVD(300, computeU = false)
val sMatrix = new DenseMatrix(1, 300, svd300.s.toArray)
csvwrite(new File("/tmp/s.csv"), sMatrix)
%pyspark
import numpy as np
import matplotlib.pyplot as plt

# Plot the raw singular values written out by the Scala cell above.
s = np.loadtxt("/tmp/s.csv", delimiter=",")
print(s.shape)
plt.plot(s)
plt.show()
%pyspark
import numpy as np
import matplotlib.pyplot as plt

# Cumulative sum of singular values on a log scale: shows how much of the
# total variation the top-k components capture.
s = np.loadtxt("/tmp/s.csv", delimiter=",")
plt.plot(np.cumsum(s))
plt.yscale('log')
plt.show()
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import java.awt.image.BufferedImage
import java.awt.image
import javax.imageio.ImageIO
import java.io.File
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import breeze.linalg.DenseMatrix
import org.jblas.DoubleMatrix
import breeze.linalg.csvwrite
import org.apache.spark.mllib.recommendation.Rating

/**
  * Created by ysp on 16-10-30.
  *
  * Eigenface-style image similarity on the LFW-A face set: load images,
  * project them onto the top principal components, then rank all images by
  * cosine similarity against a query image.
  *
  * NOTE(review): the numeric literals in this file were stripped by the
  * scrape; the values below (50x50 pixels, K = 10, top-10 output) are
  * reconstructed from the book example this code follows and should be
  * confirmed against the original post.
  */
object LFW {
    def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("lfw"))
        val path = "/usr/data/lfw-a/*"
        val rdd = sc.wholeTextFiles(path)
        // Keep only the on-disk path of every image file.
        val files = rdd.map { case (fileName, content) => fileName.replace("file:", "") }

        /** Read one image from disk. */
        def loadImageFromFile(path: String): BufferedImage = ImageIO.read(new File(path))

        /** Downscale to width x height and convert to grayscale. */
        def processImage(image: BufferedImage, width: Int, height: Int): BufferedImage = {
            val bwImage = new BufferedImage(width, height, BufferedImage.TYPE_BYTE_GRAY)
            val g = bwImage.getGraphics()
            g.drawImage(image, 0, 0, width, height, null)
            g.dispose()
            bwImage
        }

        /** Flatten an image into a row-major array of pixel intensities. */
        def getPixelsFromImage(image: BufferedImage): Array[Double] = {
            val width = image.getWidth
            val height = image.getHeight
            val pixels = Array.ofDim[Double](width * height)
            image.getData.getPixels(0, 0, width, height, pixels)
        }

        /** Full per-file pipeline: load, normalize, flatten. */
        def extractPixels(path: String, width: Int, height: Int): Array[Double] = {
            val raw = loadImageFromFile(path)
            val processed = processImage(raw, width, height)
            getPixelsFromImage(processed)
        }

        val pixels = files.map(f => extractPixels(f, 50, 50))
        val vectors = pixels.map(p => Vectors.dense(p))
        vectors.setName("image-vectors")
        vectors.cache
        // Mean-center each pixel dimension before PCA.
        val scaler = new StandardScaler(withMean = true, withStd = false).fit(vectors)
        val scaledVectors = vectors.map(v => scaler.transform(v))
        val matrix = new RowMatrix(scaledVectors)
        val K = 10
        val pc = matrix.computePrincipalComponents(K)
        // Project every image into the K-dimensional eigenface space.
        val projected = matrix.multiply(pc)
        println(projected.numRows, projected.numCols)
        println(projected.rows.take(5).mkString("\n"))

        // Give every projected row a stable 1-based id. (The original mutated a
        // driver-side `var i` inside an RDD map, which silently breaks on a real
        // cluster because each executor mutates its own serialized copy.)
        val projectedRows = projected.rows.collect()
        val ps = projectedRows.zipWithIndex.map { case (v, idx) => (idx + 1, v.toArray) }

        // Query vector: the first projected image.
        val itemVector = new DoubleMatrix(projectedRows.head.toArray)

        /** Cosine similarity between two column vectors. */
        def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
            vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
        }

        val sims = ps.map { case (id, factor) =>
            val factorVector = new DoubleMatrix(factor)
            (id, cosineSimilarity(factorVector, itemVector))
        }
        println(sims.take(10).mkString("\n"))

        // Most similar images first.
        val sortedSims = sims.sortBy(-_._2)
        println(sortedSims.take(10).mkString("\n"))

        // Map each id back to its file path for display.
        val titles = files.zipWithIndex.map { case (add, idx) =>
            ((idx + 1).toInt, add) }.collectAsMap()
} }
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import java.awt.image.BufferedImage
import java.awt.image
import javax.imageio.ImageIO
import java.io.File
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import breeze.linalg.DenseMatrix
import org.jblas.DoubleMatrix
import breeze.linalg.csvwrite
import org.apache.spark.mllib.recommendation.Rating

/**
  * Created by ysp on 2016-12-30.
  *
  * Second revision of the eigenface similarity program: identical PCA
  * pipeline, but each ranked result also carries the image's file path.
  *
  * NOTE(review): numeric literals were stripped by the scrape; 50x50 pixels,
  * K = 10 and top-10 output are reconstructed values — confirm against the
  * original post.
  */
object LFW {
    def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("lfw"))
        val path = "/usr/data/lfw-a/*"
        val rdd = sc.wholeTextFiles(path)
        val files = rdd.map { case (fileName, content) => fileName.replace("file:", "") }

        /** Read one image from disk. */
        def loadImageFromFile(path: String): BufferedImage = ImageIO.read(new File(path))

        /** Downscale to width x height and convert to grayscale. */
        def processImage(image: BufferedImage, width: Int, height: Int): BufferedImage = {
            val bwImage = new BufferedImage(width, height, BufferedImage.TYPE_BYTE_GRAY)
            val g = bwImage.getGraphics()
            g.drawImage(image, 0, 0, width, height, null)
            g.dispose()
            bwImage
        }

        /** Flatten an image into a row-major array of pixel intensities. */
        def getPixelsFromImage(image: BufferedImage): Array[Double] = {
            val width = image.getWidth
            val height = image.getHeight
            val pixels = Array.ofDim[Double](width * height)
            image.getData.getPixels(0, 0, width, height, pixels)
        }

        /** Full per-file pipeline: load, normalize, flatten. */
        def extractPixels(path: String, width: Int, height: Int): Array[Double] = {
            val raw = loadImageFromFile(path)
            val processed = processImage(raw, width, height)
            getPixelsFromImage(processed)
        }

        val pixels = files.map(f => extractPixels(f, 50, 50))
        val vectors = pixels.map(p => Vectors.dense(p))
        vectors.setName("image-vectors")
        vectors.cache
        val scaler = new StandardScaler(withMean = true, withStd = false).fit(vectors)
        val scaledVectors = vectors.map(v => scaler.transform(v))
        val matrix = new RowMatrix(scaledVectors)
        val K = 10
        val pc = matrix.computePrincipalComponents(K)
        val projected = matrix.multiply(pc)
        println(projected.numRows, projected.numCols)
        println(projected.rows.take(5).mkString("\n"))

        // Stable 1-based ids via zipWithIndex instead of the original's
        // driver-side mutable counters (`i`, `j`), which break on a cluster.
        val projectedRows = projected.rows.collect()
        val ps = projectedRows.zipWithIndex.map { case (v, idx) => (idx + 1, v.toArray) }

        // Lookup table: id (as string) -> image file path.
        val t = files.zipWithIndex.map { case (add, idx) =>
            ((idx + 1).toString, add) }.collectAsMap()
        println(t(1.toString))

        // Query vector: the first projected image.
        val itemVector = new DoubleMatrix(projectedRows.head.toArray)

        /** Cosine similarity between two column vectors. */
        def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
            vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
        }

        val sims = ps.map { case (id, factor) =>
            val factorVector = new DoubleMatrix(factor)
            val sim = cosineSimilarity(factorVector, itemVector)
            val add = t(id.toString)
            (id, sim, add)
        }
        println(sims.take(10).mkString("\n"))

        val sortedSims = sims.sortBy(-_._2)
        println(sortedSims.take(10).mkString("\n"))
    }
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import java.awt.image.BufferedImage
import java.awt.image
import javax.imageio.ImageIO
import java.io.File
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import breeze.linalg.DenseMatrix
import org.jblas.DoubleMatrix
import breeze.linalg.csvwrite
import org.apache.spark.mllib.recommendation.Rating

/**
  * Created by ysp on 2016-12-30.
  *
  * Third revision of the eigenface similarity program. Same pipeline as the
  * previous revision; the final output slices past index 0 so the query
  * image's trivial self-match (similarity 1.0) is excluded.
  *
  * NOTE(review): numeric literals were stripped by the scrape; 50x50 pixels,
  * K = 10 and slice(1, 11) are reconstructed values — confirm against the
  * original post.
  */
object LFW {
    def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("lfw"))
        val path = "/usr/data/lfw-a/*"
        val rdd = sc.wholeTextFiles(path)
        val files = rdd.map { case (fileName, content) => fileName.replace("file:", "") }

        /** Read one image from disk. */
        def loadImageFromFile(path: String): BufferedImage = ImageIO.read(new File(path))

        /** Downscale to width x height and convert to grayscale. */
        def processImage(image: BufferedImage, width: Int, height: Int): BufferedImage = {
            val bwImage = new BufferedImage(width, height, BufferedImage.TYPE_BYTE_GRAY)
            val g = bwImage.getGraphics()
            g.drawImage(image, 0, 0, width, height, null)
            g.dispose()
            bwImage
        }

        /** Flatten an image into a row-major array of pixel intensities. */
        def getPixelsFromImage(image: BufferedImage): Array[Double] = {
            val width = image.getWidth
            val height = image.getHeight
            val pixels = Array.ofDim[Double](width * height)
            image.getData.getPixels(0, 0, width, height, pixels)
        }

        /** Full per-file pipeline: load, normalize, flatten. */
        def extractPixels(path: String, width: Int, height: Int): Array[Double] = {
            val raw = loadImageFromFile(path)
            val processed = processImage(raw, width, height)
            getPixelsFromImage(processed)
        }

        val pixels = files.map(f => extractPixels(f, 50, 50))
        val vectors = pixels.map(p => Vectors.dense(p))
        vectors.setName("image-vectors")
        vectors.cache
        val scaler = new StandardScaler(withMean = true, withStd = false).fit(vectors)
        val scaledVectors = vectors.map(v => scaler.transform(v))
        val matrix = new RowMatrix(scaledVectors)
        val K = 10
        val pc = matrix.computePrincipalComponents(K)
        val projected = matrix.multiply(pc)
        println(projected.numRows, projected.numCols)
        println(projected.rows.take(5).mkString("\n"))

        // Stable 1-based ids via zipWithIndex instead of driver-side counters.
        val projectedRows = projected.rows.collect()
        val ps = projectedRows.zipWithIndex.map { case (v, idx) => (idx + 1, v.toArray) }

        // Lookup table: id (as string) -> image file path.
        val t = files.zipWithIndex.map { case (add, idx) =>
            ((idx + 1).toString, add) }.collectAsMap()

        // Query vector: the first projected image.
        val itemVector = new DoubleMatrix(projectedRows.head.toArray)

        /** Cosine similarity between two column vectors. */
        def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
            vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
        }

        val sims = ps.map { case (id, factor) =>
            val factorVector = new DoubleMatrix(factor)
            val sim = cosineSimilarity(factorVector, itemVector)
            val add = t(id.toString)
            (id, sim, add)
        }
        println(sims.take(10).mkString("\n"))

        val sortedSims = sims.sortBy(-_._2)
        // Skip element 0: the query image matched against itself.
        println(sortedSims.slice(1, 11).mkString("\n"))
    }
}
// Load the pipe-delimited "id|path" catalogue and build an id -> path map.
// NOTE(review): the split pattern, array indices, and closing paren were
// stripped by the scrape; reconstructed from the identical code at the end
// of this transcript.
val item = sc.textFile("/usr/data/Art.item")
val t = item.map(line => line.split("\\|")).map(array =>
    (array(0).toInt, array(1))).collectAsMap()
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import java.awt.image.BufferedImage
import java.awt.image
import javax.imageio.ImageIO
import java.io.File
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import breeze.linalg.DenseMatrix
import org.jblas.DoubleMatrix
import breeze.linalg.csvwrite
import org.apache.spark.mllib.recommendation.Rating

// Art-image similarity script: same eigenface pipeline, driven by a
// pipe-delimited "id|path" catalogue file instead of a directory glob.
// NOTE(review): numeric literals were stripped by the scrape; 50x50 pixels,
// K = 10 and top-10/slice(1, 11) output are reconstructed values — confirm.
val item = sc.textFile("/home/ysp/Art.item")
// id -> file path lookup table.
val t = item.map(line => line.split("\\|")).map(array =>
    (array(0).toInt, array(1))).collectAsMap()
val fs = item.map(line => line.split("\\|"))

// Just the file-path column.
val file = fs.map(array => array(1))
println(file.count)
file.first()
t(1)

/** Read one image from disk. */
def loadImageFromFile(path: String): BufferedImage = ImageIO.read(new File(path))

/** Downscale to width x height and convert to grayscale. */
def processImage(image: BufferedImage, width: Int, height: Int): BufferedImage = {
    val bwImage = new BufferedImage(width, height, BufferedImage.TYPE_BYTE_GRAY)
    val g = bwImage.getGraphics()
    g.drawImage(image, 0, 0, width, height, null)
    g.dispose()
    bwImage
}

/** Flatten an image into a row-major array of pixel intensities. */
def getPixelsFromImage(image: BufferedImage): Array[Double] = {
    val width = image.getWidth
    val height = image.getHeight
    val pixels = Array.ofDim[Double](width * height)
    image.getData.getPixels(0, 0, width, height, pixels)
}

/** Full per-file pipeline: load, normalize, flatten. */
def extractPixels(path: String, width: Int, height: Int): Array[Double] = {
    val raw = loadImageFromFile(path)
    val processed = processImage(raw, width, height)
    getPixelsFromImage(processed)
}

file.first()

val pixels = file.map(f => extractPixels(f, 50, 50))
val vectors = pixels.map(p => Vectors.dense(p))
vectors.setName("image-vectors")
vectors.cache
// Mean-center each pixel dimension before PCA.
val scaler = new StandardScaler(withMean = true, withStd = false).fit(vectors)
val scaledVectors = vectors.map(v => scaler.transform(v))
val matrix = new RowMatrix(scaledVectors)
val K = 10
val pc = matrix.computePrincipalComponents(K)
val projected = matrix.multiply(pc)
println(projected.numRows, projected.numCols)
println(projected.rows.take(5).mkString("\n"))

// 1-based ids via zipWithIndex (the original mutated a driver-side `var i`
// inside the map, which breaks on a cluster); attach each image's path.
val projectedRows = projected.rows.collect()
val ps = projectedRows.zipWithIndex.map { case (v, idx) =>
    val id = idx + 1
    (id, v.toArray, t(id))
}

// Query vector: the first projected image.
val itemVector = new DoubleMatrix(projectedRows.head.toArray)

/** Cosine similarity between two column vectors. */
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
    vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}
val sims = ps.map { case (id, factor, add) =>
    val factorVector = new DoubleMatrix(factor)
    (id, cosineSimilarity(factorVector, itemVector), add)
}

println(sims.take(10).mkString("\n"))
val sortedSims = sims.sortBy(-_._2)
println()
println("推荐:")
// Skip element 0: the query image matched against itself.
println(sortedSims.slice(1, 11).mkString("\n"))