ITPub博客

首页 > 应用开发 > Java > Akka之WordCount with SBT

Akka之WordCount with SBT

原创 Java 作者:541732025 时间:2016-05-04 16:05:39 0 删除 编辑
build.sbt:

点击(此处)折叠或打开

  // sbt build definition for the Akka word-count example.
  // Settings are kept separated by blank lines, which older sbt releases require in .sbt files.
  name := "HelloAkkaWithSBT"

  version := "1.0"

  scalaVersion := "2.9.2"

  // Resolve over HTTPS so dependency downloads are not fetched over an unauthenticated channel.
  resolvers += "Typesafe Repository" at "https://repo.typesafe.com/typesafe/releases"

  libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0.3"

MasterActor

点击(此处)折叠或打开

  /**
   * Entry actor of the word-count pipeline: creates the aggregate, reduce and map
   * child actors and routes incoming messages to them.
   */
  class MasterActor extends Actor {
    // Children are created downstream-first because each stage takes the ActorRef
    // of the stage it sends to as a constructor argument.
    val aggregateActor: ActorRef = context.actorOf(Props[AggregateActor], "aggregate")
    val reduceActor: ActorRef = context.actorOf(Props(new ReduceActor(aggregateActor)), "reduce")
    val mapActor: ActorRef = context.actorOf(Props(new MapActor(reduceActor)), name = "map")

    /**
     * Sends String messages to the map actor and Result messages to the
     * aggregate actor; no action is taken for any other message.
     */
    override protected def receive: Receive = {
      case line: String => mapActor ! line
      case request: Result => aggregateActor ! request
      case _ => // no action
    }
  }

MapActor

点击(此处)折叠或打开

  /**
   * Map stage: turns each incoming line into a MapData of Word entries and
   * sends it to the reduce actor supplied at construction.
   */
  class MapActor(reduceActor: ActorRef) extends Actor {
    // Tokens equal to one of these (after lower-casing) are left out of the result.
    val STOP_WORDS_LIST = List("a", "is")

    /** Maps String messages and sends the result on; no action for any other message. */
    override protected def receive: Receive = {
      case line: String => reduceActor ! map(line)
      case _ => // no action
    }

    /**
     * Splits `line` with a default-delimiter StringTokenizer, lower-cases every
     * token and records each non-stop-word token as a Word with a count of 1.
     *
     * @param line the text to tokenize
     * @return a MapData wrapping the Word entries in token order
     */
    def map(line: String): MapData = {
      val tokens = new StringTokenizer(line)
      val words = new util.ArrayList[Word]
      val one: Integer = 1

      Iterator
        .continually(tokens)
        .takeWhile(_.hasMoreTokens)
        .map(_.nextToken().toLowerCase())
        .filterNot(token => STOP_WORDS_LIST.contains(token))
        .foreach(token => words.add(new Word(token, one)))

      new MapData(words)
    }
  }

ReduceActor

点击(此处)折叠或打开

  /**
   * Reduce stage: collapses the Word list of a MapData message into per-word
   * counts and sends the resulting ReduceData to the aggregate actor.
   */
  class ReduceActor(aggregateActor: ActorRef) extends Actor {

    /** Reduces MapData messages and sends the result on; no action for any other message. */
    override protected def receive: Receive = {
      case mapped: MapData => aggregateActor ! reduce(mapped.dataList)
      case _ => // no action
    }

    /**
     * Counts how many times each word occurs in `dataList`.
     *
     * Every list entry contributes exactly 1 to its word's total; only the
     * `word` field of each entry is read here.
     *
     * @param dataList the Word entries to count
     * @return a ReduceData wrapping a word -> occurrence-count map
     */
    def reduce(dataList: util.ArrayList[Word]): ReduceData = {
      val counts = new util.HashMap[String, Integer]
      val entries = dataList.iterator()

      while (entries.hasNext) {
        val key = entries.next().word
        val updated: Integer =
          if (counts.containsKey(key)) Integer.valueOf(counts.get(key).intValue + 1)
          else Integer.valueOf(1)
        counts.put(key, updated)
      }

      new ReduceData(counts)
    }
  }

AggregateActor

点击(此处)折叠或打开

  /**
   * Final stage: accumulates the per-message word counts it receives into one
   * running map and prints that map when a Result message arrives.
   */
  class AggregateActor extends Actor {
    // Running totals across all ReduceData messages received so far.
    val finalReduceMap = new util.HashMap[String, Integer]

    /** Merges ReduceData into the running totals; prints the totals on Result. */
    override protected def receive: Receive = {
      case partial: ReduceData => aggregate(partial.reduceMap)
      case _: Result => println(finalReduceMap.toString)
    }

    /**
     * Adds every count in `reduceMap` to the matching entry of `finalReduceMap`,
     * inserting the entry as-is when the word has not been seen before.
     *
     * @param reduceMap word -> count map to merge into the running totals
     */
    def aggregate(reduceMap: util.HashMap[String, Integer]): Unit = {
      val entries = reduceMap.entrySet().iterator()

      while (entries.hasNext) {
        val entry = entries.next()
        val word = entry.getKey
        val merged: Integer =
          if (finalReduceMap.containsKey(word))
            Integer.valueOf(entry.getValue.intValue + finalReduceMap.get(word).intValue)
          else
            entry.getValue
        finalReduceMap.put(word, merged)
      }
    }
  }

入口

点击(此处)折叠或打开

  /** Command-line entry point that runs the word-count actors over three sample sentences. */
  object MapReduceApplication {

    /**
     * Starts the "HelloAkka" actor system, sends three lines of text to the
     * master actor, asks for the result and then shuts the system down.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      val _system = ActorSystem("HelloAkka")
      try {
        val master = _system.actorOf(Props[MasterActor], name = "master")

        master ! "Hi! My name is Rocky. I'm so so so happy to be here."
        master ! "Today, I'm going to read a news article for you."
        master ! "I hope I hope you'll like it."

        // Fixed pause before requesting the result; presumably meant to let the
        // asynchronous pipeline finish processing the lines above.
        Thread.sleep(500)
        master ! new Result

        // Second pause before shutdown; presumably so the result can be printed first.
        Thread.sleep(500)
      } finally {
        // In a finally block so the actor system is also shut down when actor
        // creation or a sleep throws (e.g. InterruptedException).
        _system.shutdown()
      }
    }
  }

SBT运行结果:

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/28912557/viewspace-2093556/,如需转载,请注明出处,否则将追究法律责任。

上一篇: MasterActor详解
下一篇: 没有了~
请登录后发表评论 登录
全部评论

注册时间:2013-05-23

  • 博文量
    127
  • 访问量
    479174