ITPub博客

首页 > 应用开发 > Java > Akka之WordCount

Akka之WordCount

原创 Java 作者:541732025 时间:2016-04-27 10:42:43 0 删除 编辑






注意:在实际处理过程中,MapActor的结果直接发送给ReduceActor,ReduceActor的处理结果直接发送给AggregateActor,中间结果都不经过MasterActor;MasterActor只负责把输入的句子转发给MapActor,并把Result请求转发给AggregateActor。


MasterActor:

点击(此处)折叠或打开

  1. /**
  2.  * MasterActor管理各个Actor,以及作为消息的入口、进行消息的转发。
  3.  *
  4.  */
  5. public class MasterActor extends UntypedActor {
  6.     //通过ActorContext创建Actor
  7.     private ActorRef aggregateActor = getContext().actorOf(new Props(AggregateActor.class),"aggregate");
  8.     private ActorRef reduceActor = getContext().actorOf(new Props(new UntypedActorFactory(){public Actor create() {return new ReduceActor(aggregateActor);}}),"reduce");
  9.     private ActorRef mapActor = getContext().actorOf(new Props(new UntypedActorFactory(){public Actor create() {return new MapActor(reduceActor);}}),"map");

  10.     @Override
  11.     public void onReceive(Object message) throws Exception {
  12.         if(message instanceof String){
  13.             mapActor.tell(message);
  14.         }else if(message instanceof Result){
  15.             aggregateActor.tell(message); //返回最终结果
  16.         }else{
  17.             unhandled(message);
  18.         }
  19.     }
  20. }

MapActor:

点击(此处)折叠或打开

  1. /**接收MasterActor消息,以及将计算结果发送给ReduceActor
  2.  *
  3.  */
  4. public class MapActor extends UntypedActor {
  5.     private ActorRef reduceActor = null;
  6.     String[] STOP_WORDS = {"a","is"};
  7.     private List<String> STOP_WORD_LIST = Arrays.asList(STOP_WORDS);

  8.     public MapActor(ActorRef reduceActor) {
  9.         this.reduceActor = reduceActor;
  10.     }

  11.     @Override
  12.     public void onReceive(Object message) throws Exception {
  13.         if(message instanceof String){
  14.             String work = (String)message;

  15.             //map the words in the sentence
  16.             MapData data = evaluateExpression(work);

  17.             //send the result to ReduceActor
  18.             reduceActor.tell(data);
  19.         }else{
  20.             unhandled(message);
  21.         }
  22.     }

  23.     private MapData evaluateExpression(String line){
  24.         List<WordCount> dataList = new ArrayList<WordCount>();
  25.         StringTokenizer parser = new StringTokenizer(line);
  26.         while(parser.hasMoreTokens()){
  27.             String word = parser.nextToken().toLowerCase();
  28.             if(!STOP_WORD_LIST.contains(word)){
  29.                 dataList.add(new WordCount(word, Integer.valueOf(1)));
  30.             }
  31.         }
  32.         return new MapData(dataList);
  33.     }
  34. }

ReduceActor:

点击(此处)折叠或打开

  1. /**接收MapActor消息,以及将计算结果发送给AggregateActor
  2.  *
  3.  */
  4. public class ReduceActor extends UntypedActor {
  5.     private ActorRef aggregateActor = null;

  6.     public ReduceActor(ActorRef aggregateActor) {
  7.         this.aggregateActor = aggregateActor;
  8.     }

  9.     @Override
  10.     public void onReceive(Object message) throws Exception {
  11.         if(message instanceof MapData){
  12.             MapData mapData = (MapData)message;

  13.             //reduce the incoming data
  14.             ReduceData reduceData = reduce(mapData.getDataList());

  15.             //forward the result to aggregate actor
  16.             aggregateActor.tell(reduceData);
  17.         }else{
  18.             unhandled(message);
  19.         }
  20.     }

  21.     private ReduceData reduce(List<WordCount> dataList){
  22.         HashMap<String,Integer> reduceMap = new HashMap<String, Integer>();
  23.         for(WordCount wordCount : dataList){
  24.             if(reduceMap.containsKey(wordCount.getWord())){
  25.                 Integer value = (Integer) reduceMap.get(wordCount.getWord());
  26.                 value++;
  27.                 reduceMap.put(wordCount.getWord(), value);
  28.             }else{
  29.                 reduceMap.put(wordCount.getWord(),Integer.valueOf(1));
  30.             }
  31.         }
  32.         return new ReduceData(reduceMap);
  33.     }
  34. }

AggregateActor:

点击(此处)折叠或打开

  1. /**接收ReduceActor消息,进行数据汇总;接收MasterActor消息,返回最终结果
  2.  *
  3.  */
  4. public class AggregateActor extends UntypedActor {
  5.     private Map<String, Integer> finalReducedMap = new ConcurrentHashMap<String, Integer>();

  6.     @Override
  7.     public void onReceive(Object message) throws Exception {
  8.         if(message instanceof ReduceData){
  9.             ReduceData reduceData = (ReduceData)message;
  10.             aggregateInMemoryReduce(reduceData.getReduceDataMap());
  11.         }else if(message instanceof Result){
  12.             System.out.println(finalReducedMap.toString());
  13.         }else{
  14.             unhandled(message);
  15.         }
  16.     }

  17.     private void aggregateInMemoryReduce(Map<String,Integer> reducedMap){
  18.         Integer count = null;
  19.         for(String key : reducedMap.keySet()){
  20.             if(finalReducedMap.containsKey(key)){
  21.                 count = reducedMap.get(key) + finalReducedMap.get(key);
  22.                 finalReducedMap.put(key, count);
  23.             }else{
  24.                 finalReducedMap.put(key, reducedMap.get(key));
  25.             }
  26.         }
  27.     }
  28. }

入口:

点击(此处)折叠或打开

  1. /**入口
  2.  *
  3.  */
  4. public class HelloAkka {
  5.     public static void main(String[] args) throws Exception{
  6.         ActorSystem _system = ActorSystem.create("helloAkka");
  7.         //通过ActorSystem创建Actor
  8.         ActorRef master = _system.actorOf(new Props(MasterActor.class), "master");
  9.        
  10.         //main所在的Actor给masterActor发消息,异步,非阻塞
  11.         master.tell("Hi! My name is Rocky. I'm so so so happy to be here.");
  12.         master.tell("Today, I'm going to read a news article for you.");
  13.         master.tell("I hope I hope you'll like it.");

  14.         Thread.sleep(500);
  15.         master.tell(new Result());
  16.         Thread.sleep(500);
  17.         _system.shutdown();
  18.     }
  19. }

运行结果:
{rocky.=1, you.=1, hope=2, here.=1, my=1, article=1, be=1, so=3, i'm=2, hi!=1, happy=1, name=1, read=1, going=1, it.=1, like=1, you'll=1, i=2, for=1, today,=1, news=1, to=2}

Process finished with exit code 0


来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/28912557/viewspace-2089144/,如需转载,请注明出处,否则将追究法律责任。

下一篇: MasterActor详解
请登录后发表评论 登录
全部评论

注册时间:2013-05-23

  • 博文量
    127
  • 访问量
    479253