大数据技术全面解读 scala入门(二)并发编程Akka
沉沙 2018-10-10 来源 : 阅读 1444 评论 0

摘要:本篇教程探讨了大数据技术全面解读 scala入门(二)并发编程Akka,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术全面解读 scala入门(二)并发编程Akka,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

一、概述

  1.什么是akka

        

      Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。



  2.Actor模型

  Actor模型:在计算机科学领域,Actor模型是一个并行计算(Concurrent Computation)模型,它把actor作为并行计算的基本元素来对待:为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。

  

  

  Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:

  1.提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发

  2.提供了异步非阻塞的、高性能的事件驱动编程模型

  3.超级轻量级事件处理(每GB堆内存几百万Actor)

  3.Actor角色

  Akka中角色主要分为ActorSystem和Actor,这和Hadoop中有点儿类似。一个老大负责监管,下面小弟负责干活

  ActorSystem

  在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor。

  Actor 

  在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。

    1.preStart()方法:该方法在Actor对象构造方法执行后执行,整个Actor生命周期中仅执行一次。

    2.receive()方法:该方法在Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行。

二、使用akka实现简单的RPC框架

  1.使用maven创建工程(或者SBT)

    使用maven创建工程:https://www.cns.com/hd-zg/p/5951185.html

    引入akka依赖


 
        
            com.typesafe.akka
            akka-actor_2.10
            2.3.14
        


        
            com.typesafe.akka
            akka-remote_2.10
            2.3.14
        




 pom如下:(build等可以修改)
View Code

 打包可以使用shade这个插件
View Code

 

  2.架构

  

 

   3.master程序示例


package cn.jiangbei.akka

import java.io.FileInputStream
import java.util.Properties

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory


class Master extends Actor {

  println("构造器被调用!")

  // 生命周期方法(类似的还有postStop)
  override def preStart(): Unit = {
    println("preStart生命周期方法被调用!")
  }

  // 用于接收消息
  override def receive: Receive = {
    case "connect" => println("已连接!")
    case "hello" => println("hello!")
  }
}

object Master {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    val path = Thread.currentThread().getContextClassLoader.getResource("master.properties").getPath //文件要放到resource文件夹下
    properties.load(new FileInputStream(path))
    val config = ConfigFactory.parseProperties(properties)
    // 准备配置
   /* val host = args(0)
    val port = args(1).toInt
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)*/
    // 创建和监控
    val actorSystem = ActorSystem("MasterSystem", config)
    // 创建Actor(以后Actor可以发送消息了)
    val master = actorSystem.actorOf(Props(new Master), "Master")
    master ! "hello"
    actorSystem.awaitTermination()
  }
}



   4.worker和master通信


package cn.jiangbei.akka

import java.io.FileInputStream
import java.util.Properties

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory


class Master extends Actor {

  // 生命周期方法(类似的还有postStop)
  override def preStart(): Unit = {
    println("preStart生命周期方法被调用!")
  }

  // 用于接收消息
  override def receive: Receive = {
    case "connect" => {
      println("已连接!")
      // 使用sender发送消息给worker
      sender ! "reply"
    }
    case "hello" => println("hello!")
  }
}

object Master {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    val path = Thread.currentThread().getContextClassLoader.getResource("master.properties").getPath //文件要放到resource文件夹下
    properties.load(new FileInputStream(path))
    val config = ConfigFactory.parseProperties(properties)
    // 准备配置
    /* val host = args(0)
     val port = args(1).toInt
     val configStr =
       s"""
          |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
          |akka.remote.netty.tcp.hostname = "$host"
          |akka.remote.netty.tcp.port = "$port"
        """.stripMargin
     val config = ConfigFactory.parseString(configStr)*/
    // 创建和监控
    val actorSystem = ActorSystem("MasterSystem", config)
    // 创建Actor(以后Actor可以发送消息了)
    val master = actorSystem.actorOf(Props(new Master), "Master")
    actorSystem.awaitTermination()
  }
}



akka.actor.provider=akka.remote.RemoteActorRefProvider
akka.remote.netty.tcp.hostname=127.0.0.1
akka.remote.netty.tcp.port=8888



package cn.jiangbei.akka

import java.io.FileInputStream
import java.util.Properties

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class Worker extends Actor {

  var master: ActorSelection = _

  override def preStart(): Unit = {
    // 先与master建立连接(参数在Master运行时会打印在日志),context为继承Actor所提供
    // 同样,下面的地址端口也可以分离出来
    val masterHost = "127.0.0.1"
    val masterPort = 8888
    // /user/Master进行指定与哪个master通信
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
    // 得到master引用后即可发送消息
    master ! "connect"
  }

  override def receive: Receive = { // 返回的是一个偏函数
    case "reply" => println("收到master的回复!")

  }
}

object Worker {
  def main(args: Array[String]): Unit = {
    // 以下与Master类似
    val properties = new Properties()
    val path = Thread.currentThread().getContextClassLoader.getResource("worker.properties").getPath //文件要放到resource文件夹下
    properties.load(new FileInputStream(path))
    val config = ConfigFactory.parseProperties(properties)
    val actorSystem = ActorSystem("MasterSystem", config)
    // 创建Actor,再进行new Worker后会调用preStart()进行消息的发送
    actorSystem.actorOf(Props(new Worker), "Worker")
    actorSystem.awaitTermination()
  }


}



akka.actor.provider=akka.remote.RemoteActorRefProvider
akka.remote.netty.tcp.hostname=127.0.0.1
akka.remote.netty.tcp.port=8889

   

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程