Spark机器学习· 实时机器学习

时间:2021-10-31 13:23:51

Spark机器学习

1 在线学习

模型随着接收的新消息,不断更新自己;而不是像离线训练一次次重新训练。

2 Spark Streaming

3 MLib+Streaming应用

3.0 build.sbt

依赖Spark MLlib和Spark Streaming

name := "scala-spark-streaming-app"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.5.1"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.5.1"
使用国内镜像仓库

~/.sbt/repositories

[repositories]
local
osc: http://maven.oschina.net/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots

3.1 生产消息

object StreamingProducer {

  def main(args: Array[String]) {

    val random = new Random()

    // Maximum number of events per second
    val MaxEvents = 6

    // Read the list of possible names
    val namesResource = this.getClass.getResourceAsStream("/names.csv")
    val names = scala.io.Source.fromInputStream(namesResource)
      .getLines()
      .toList
      .head
      .split(",")
      .toSeq

    // Generate a sequence of possible products
    val products = Seq(
      "iPhone Cover" -> 9.99,
      "Headphones" -> 5.49,
      "Samsung Galaxy Cover" -> 8.95,
      "iPad Cover" -> 7.49
    )

    /** Generate a number of random product events */
    def generateProductEvents(n: Int) = {
      (1 to n).map { i =>
        val (product, price) = products(random.nextInt(products.size))
        val user = random.shuffle(names).head
        (user, product, price)
      }
    }

    // create a network producer
    val listener = new ServerSocket(9999)
    println("Listening on port: 9999")

    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)

          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val productEvents = generateProductEvents(num)
            productEvents.foreach{ event =>
              out.write(event.productIterator.mkString(","))
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events...")
          }
          socket.close()
        }
      }.start()
    }
  }
}
sbt run

Multiple main classes detected, select one to run:

 [1] MonitoringStreamingModel
 [2] SimpleStreamingApp
 [3] SimpleStreamingModel
 [4] StreamingAnalyticsApp
 [5] StreamingModelProducer
 [6] StreamingProducer
 [7] StreamingStateApp

Enter number: 6

3.2 打印消息

阅读全文请点击:http://click.aliyun.com/m/8713/

Spark机器学习· 实时机器学习的更多相关文章

  1. Spark机器学习 Day1 机器学习概述

    Spark机器学习 Day1 机器学习概述 今天主要讨论个问题:Spark机器学习的本质是什么,其内部构成到底是什么. 简单来说,机器学习是数据+算法. 数据 在Spark中做机器学习,肯定有数据来源 ...

  2. Spark MLBase分布式机器学习系统入门:以MLlib实现Kmeans聚类算法

    1.什么是MLBaseMLBase是Spark生态圈的一部分,专注于机器学习,包含三个组件:MLlib.MLI.ML Optimizer. ML Optimizer: This layer aims ...

  3. Spark 中的机器学习库及示例

    MLlib 是 Spark 的机器学习库,旨在简化机器学习的工程实践工作,并方便扩展到更大规模.MLlib 由一些通用的学习算法和工具组成,包括分类.回归.聚类.协同过滤.降维等,同时还包括底层的优化 ...

  4. 【Streaming】30分钟概览Spark Streaming 实时计算

    本文主要介绍四个问题: 什么是Spark Streaming实时计算? Spark实时计算原理流程是什么? Spark 2.X下一代实时计算框架Structured Streaming Spark S ...

  5. 机器学习五 -- 机器学习的“Hello World”,感知机

    机器学习五 -- 机器学习的“Hello World”,感知机 感知机是二类分类的线性分类模型,是神经网络和支持向量机的基础.其输入为实例的特征向量,输出为实例的类别,取+1和-1二值之一,即二类分类 ...

  6. Spark Streaming实时计算框架介绍

    随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐.用户行为分析等. Spark Streaming是建立在 ...

  7. 【转】Spark Streaming 实时计算在甜橙金融监控系统中的应用及优化

    系统架构介绍 整个实时监控系统的架构是先由 Flume 收集服务器产生的日志 Log 和前端埋点数据, 然后实时把这些信息发送到 Kafka 分布式发布订阅消息系统,接着由 Spark Streami ...

  8. Spark练习之通过Spark Streaming实时计算wordcount程序

    Spark练习之通过Spark Streaming实时计算wordcount程序 Java版本 Scala版本 pom.xml Java版本 import org.apache.spark.Spark ...

  9. Spark机器学习9· 实时机器学习(scala with sbt)

    1 在线学习 模型随着接收的新消息,不断更新自己:而不是像离线训练一次次重新训练. 2 Spark Streaming 离散化流(DStream) 输入源:Akka actors.消息队列.Flume ...

随机推荐

  1. elk平台搭建

    很多时候我们需要对日志做一个集中式的处理,但是通常情况下这些日志都分布到n台机器上面,导致一个结果就是效率比较低,而ELK平台可以帮助我们解决这么一件事情: ELK下载:https://www.ela ...

  2. HTML与HTML5笔记

    doctype作用 1.告诉浏览器使用什么样的html或者xhtml规范来解析html文档 2.影响浏览器的渲染模式. 浏览器解析css的两种渲染模式:标准模式strict mode和怪异模式quir ...

  3. JAVA-面向对象-特性

    1.封装 1.定义方式 1修饰符class类名 2类名首字母大写 2.类的成员 1属性 成员变量 可以设置默认值 第一个单词首字母小写,后面首字母大写 一般把属性设置成private 提供属性对应的g ...

  4. javascript笔记——js的阻塞特性[转载]

    JS具有阻塞特性,当浏览器在执行js代码时,不能同时做其它事情,即<script>每次出现都会让页面等待脚本的解析和执行(不论JS是内嵌的还是外链的),JS代码执行完成后,才继续渲染页面. ...

  5. Ubuntu 16安装GPU版本tensorflow

    pre { direction: ltr; color: rgb(0, 0, 0) } pre.western { font-family: "Liberation Mono", ...

  6. 火狐兼容window&period;event&period;returnValue&equals;false&semi;

    火狐中window.event是未定义的,可用e.preventDefault();替代window.event.returnValue=false; 直接上图

  7. Python3实战系列之九(获取印度售后数据项目)

    项目现状:已经部署在服务器上并正常运行了. 1.服务器上的部署 2.下载到服务器的文件列表 3.转存在到数据库SQL Server中的数据 项目总结:这次项目采用python来实现,刚开始还是有点担忧 ...

  8. BZOJ1821 &lbrack;JSOI2010&rsqb;Group 部落划分 Group Kruskal

    欢迎访问~原文出处——博客园-zhouzhendong 去博客园看该题解 题目传送门 - BZOJ1821 题意概括 平面上有n个点,现在把他们划分成k个部分,求不同部分之间最近距离的最大值. 两个部 ...

  9. kettle的输入输出组件和脚本组件

    一. 输入组件 1.1表输入 从指定的数据库中,通过sql语句来查询数据加载到内存. 允许简易转换:勾选后可以避免不必要的字段的数据类型转换,从而提高性能. 替换sql语句里的变量:勾选后可以通过${ ...

  10. C&num;&colon; 获取当前路径不要用Environment&period;CurrentDirectory

    网上大把文章写到C#获取当前路径的方法如下: // 获取程序的基目录. System.AppDomain.CurrentDomain.BaseDirectory // 获取模块的完整路径. Syste ...