04 November 2015

stream grouping

  1. all grouping

         a.1 -> b.1 100%
         a.1 -> b.2 100%
         a.2 -> b.1 100%
         a.2 -> b.2 100%
    
         +--------------------+      +--------------------+
         |       bolt a       |      |       bolt b       |
         |   +------------+   |      |   +------------+   |
         |   | instance 1 |   |      |   | instance 1 |   |
         |   +------------+   |      |   +------------+   |
         |   +------------+   |      |   +------------+   |
         |   | instance 2 |   |      |   | instance 2 |   |
         |   +------------+   |      |   +------------+   |
         +--------------------+      +--------------------+
    
    1. sends every tuple to all instances (tasks) of the receiving bolt

    2. used to broadcast signals to bolts, e.g. telling every word-counter task to clear its cache

       // bolt execute (WordCounter): react to tuples arriving on the "signals" stream
       public void execute(Tuple input){
           try{
               if ("signals".equals(input.getSourceStreamId())){
                   String action = input.getStringByField("action");
                   if ("refreshCache".equals(action)){
                       counters.clear();   // counters: this task's word -> count map
                   }
               }
               // (handling of normal word tuples omitted)
           }catch (IllegalArgumentException e){
               // getStringByField throws this when the tuple has no "action" field; ignore it
           }
       }

       // topology builder
       builder.setBolt("word-counter", new WordCounter(), 2)
           .fieldsGrouping("word-normalizer", new Fields("word"))
           .allGrouping("signals-spout", "signals");
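       To see why all grouping suits signals, here is a plain-Java simulation (no Storm dependency) of two word-counter tasks receiving a "refreshCache" signal. The class names and task IDs are hypothetical; the sketch only mimics the delivery rule (every task gets the tuple), not Storm's internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of all grouping; task IDs and class names are hypothetical.
class AllGroupingSketch {

    // All grouping: every tuple is delivered to every target task.
    static List<Integer> chooseTasks(List<Integer> targetTasks) {
        return new ArrayList<>(targetTasks);
    }

    // Stand-in for one task of the word-counter bolt, holding its own counters.
    static class WordCounterTask {
        final Map<String, Integer> counters = new HashMap<>();

        void onWord(String word) {
            counters.merge(word, 1, Integer::sum);
        }

        void onSignal(String action) {
            if ("refreshCache".equals(action)) {
                counters.clear();
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, WordCounterTask> tasks = new HashMap<>();
        tasks.put(1, new WordCounterTask());
        tasks.put(2, new WordCounterTask());

        // Each task has already counted some words.
        tasks.get(1).onWord("apple");
        tasks.get(2).onWord("banana");

        // Broadcast the signal: every task receives it, so every cache is cleared.
        for (int taskId : chooseTasks(new ArrayList<>(tasks.keySet()))) {
            tasks.get(taskId).onSignal("refreshCache");
        }

        for (Map.Entry<Integer, WordCounterTask> e : tasks.entrySet()) {
            System.out.println("task " + e.getKey() + " counters: " + e.getValue().counters);
        }
    }
}
```

       Both tasks end up with empty counters. With shuffle grouping only one of the two tasks would have seen the signal, leaving the other task's cache stale.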
      
  2. custom grouping

    1. implement the backtype.storm.grouping.CustomStreamGrouping interface (the signatures below follow Storm 0.8/0.9; earlier releases used different ones)

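       import java.io.Serializable;
       import java.util.ArrayList;
       import java.util.List;
       import backtype.storm.generated.GlobalStreamId;
       import backtype.storm.grouping.CustomStreamGrouping;
       import backtype.storm.task.WorkerTopologyContext;

       public class ModuleGrouping implements CustomStreamGrouping, Serializable
       {
           private List<Integer> targetTasks;

           @Override
           public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
               // IDs of the tasks that can receive tuples from this stream
               this.targetTasks = targetTasks;
           }

           @Override
           public List<Integer> chooseTasks(int taskId, List<Object> values) {
               List<Integer> boltIds = new ArrayList<Integer>();
               if(values.size()>0){
                   String str = values.get(0).toString();
                   if(str.isEmpty()){
                       boltIds.add(targetTasks.get(0));
                   }
                   else{
                       // key line: the first character picks the task; chooseTasks must
                       // return task IDs, so the index is mapped through targetTasks
                       boltIds.add(targetTasks.get(str.charAt(0) % targetTasks.size()));
                   }
               }
               return boltIds;
           }
       }

       // topology definition
       builder.setBolt("word-counter", new WordCounter(), 2)
           .customGrouping("word-normalizer", new ModuleGrouping());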
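       The first-character rule can be checked without a cluster. Below is a plain-Java copy of that routing rule, written against a list of task IDs (hypothetical values 7 and 8) instead of Storm's interfaces, so it is an illustration of the logic rather than a drop-in grouping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java copy of the first-character routing rule; task IDs are hypothetical.
class ModuleRoutingSketch {
    private final List<Integer> targetTasks;

    ModuleRoutingSketch(List<Integer> targetTasks) {
        this.targetTasks = new ArrayList<>(targetTasks);
    }

    // The first character of the first value picks the task;
    // empty strings go to the first target task.
    List<Integer> chooseTasks(List<Object> values) {
        if (values.isEmpty()) {
            return Collections.emptyList();
        }
        String str = String.valueOf(values.get(0));
        int index = str.isEmpty() ? 0 : str.charAt(0) % targetTasks.size();
        return Collections.singletonList(targetTasks.get(index));
    }

    public static void main(String[] args) {
        ModuleRoutingSketch grouping = new ModuleRoutingSketch(Arrays.asList(7, 8));
        for (String word : Arrays.asList("apple", "avocado", "banana", "")) {
            System.out.println("'" + word + "' -> task " + grouping.chooseTasks(Arrays.<Object>asList(word)));
        }
    }
}
```

       With two tasks, 'a' (97) is odd, so "apple" and "avocado" both go to task 8, while 'b' (98) and the empty string go to task 7. The rule is simple, but load follows the distribution of first letters in the data, so one task can end up much busier than the other.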
      
  3. direct grouping

    1. the source decides which task (instance) of the receiving bolt gets each tuple; the emitting stream must be declared as a direct stream

       // 1. execute
       public void execute(Tuple input){
           // ...
           for (String word: words){
               if(!word.isEmpty()){
                   // ...
                   // key line: the emitter picks the receiving task ID itself
                   collector.emitDirect(getWordCountTask(word), new Values(word));
               }
           }
           // acknowledge the tuple
           collector.ack(input);
       }

       // 2. getWordCountTask: map the word to one of the word-counter task IDs
       public Integer getWordCountTask(String word){
           word = word.trim().toUpperCase();
           if(word.isEmpty())
           {
               return counterTasks.get(0);
           }
           else{
               return counterTasks.get(word.charAt(0) % counterTasks.size());
           }
       }

       // 3. look up the target task IDs in the `prepare` method
       public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){
           this.collector = collector;
           // getComponentTasks returns the task IDs (a List<Integer>), not a count
           this.counterTasks = context.getComponentTasks("word-counter");
       }

       // 4. declare the output stream as direct; emitDirect requires it
       public void declareOutputFields(OutputFieldsDeclarer declarer){
           declarer.declare(true, new Fields("word"));
       }

       // 5. topology definition
       builder.setBolt("word-counter", new WordCounter(), 2)
           .directGrouping("word-normalizer");
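       The point of direct grouping is that the emitter addresses a concrete task ID. The plain-Java simulation below stands in for the word-normalizer: the task IDs 3 and 4 replace what context.getComponentTasks("word-counter") would return and are hypothetical, and emitDirect here just appends to an in-memory inbox per task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of direct grouping; task IDs are hypothetical.
class DirectGroupingSketch {
    private final List<Integer> counterTasks;
    // One inbox per receiving task, standing in for the word-counter tasks.
    final Map<Integer, List<String>> inboxes = new LinkedHashMap<>();

    DirectGroupingSketch(List<Integer> counterTasks) {
        this.counterTasks = counterTasks;
        for (int id : counterTasks) {
            inboxes.put(id, new ArrayList<>());
        }
    }

    // Upper-cased first letter modulo the task count, mapped to a task ID.
    int chooseTask(String word) {
        word = word.trim().toUpperCase();
        int index = word.isEmpty() ? 0 : word.charAt(0) % counterTasks.size();
        return counterTasks.get(index);
    }

    // Stand-in for collector.emitDirect(taskId, new Values(word)).
    void emitDirect(int taskId, String word) {
        inboxes.get(taskId).add(word);
    }

    public static void main(String[] args) {
        DirectGroupingSketch emitter = new DirectGroupingSketch(Arrays.asList(3, 4));
        for (String word : "apple Avocado banana cherry".split(" ")) {
            emitter.emitDirect(emitter.chooseTask(word), word);
        }
        System.out.println(emitter.inboxes); // prints {3=[banana], 4=[apple, Avocado, cherry]}
    }
}
```

       Upper-casing first means "apple" and "Avocado" share 'A' (65) and land together on task 4, as does 'C' (67); 'B' (66) goes to task 3.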
      
  4. field grouping

         a.1 -> b.1 field p
         a.2 -> b.1 field p
         a.2 -> b.2 field q
    
         +--------------------+      +--------------------+
         |       bolt a       |      |       bolt b       |
         |   +------------+   |      |   +------------+   |
         |   | instance 1 |   |      |   | instance 1 |   |
         |   +------------+   |      |   +------------+   |
         |   +------------+   |      |   +------------+   |
         |   | instance 2 |   |      |   | instance 2 |   |
         |   +------------+   |      |   +------------+   |
         +--------------------+      +--------------------+
    
    1. based on one or more fields of the tuple: tuples with the same values in the grouping fields always go to the same task

       builder.setBolt("word-counter", new WordCounter(), 2)
           .fieldsGrouping("word-normalizer", new Fields("word"));
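       A minimal sketch of how hash-based fields grouping can guarantee that property: hash the values of the grouping fields and take the result modulo the number of target tasks. This illustrates the idea only; Storm's exact hash function may differ, and the task IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of hash-based fields grouping; not Storm's actual code. Task IDs are hypothetical.
class FieldsGroupingSketch {
    private final List<Integer> targetTasks;

    FieldsGroupingSketch(List<Integer> targetTasks) {
        this.targetTasks = new ArrayList<>(targetTasks);
    }

    // groupingValues holds only the tuple values selected by new Fields(...).
    int chooseTask(List<Object> groupingValues) {
        int hash = groupingValues.hashCode();
        // floorMod keeps the index non-negative even when the hash is negative.
        return targetTasks.get(Math.floorMod(hash, targetTasks.size()));
    }

    public static void main(String[] args) {
        FieldsGroupingSketch grouping = new FieldsGroupingSketch(Arrays.asList(1, 2));
        Map<String, Integer> route = new TreeMap<>();
        for (String word : "storm bolt spout storm bolt storm".split(" ")) {
            int task = grouping.chooseTask(Arrays.<Object>asList(word));
            // Every occurrence of a word must map to the same task.
            Integer previous = route.putIfAbsent(word, task);
            if (previous != null && previous != task) {
                throw new IllegalStateException("word split across tasks: " + word);
            }
        }
        System.out.println(route);
    }
}
```

       Because the task depends only on the hash of the grouping values, all "storm" tuples reach the same word-counter task, which is what lets that task keep a correct count for the word.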
      
  5. global grouping

         a.1 -> b.1 100%
         a.2 -> b.1 100%
    
         +--------------------+      +--------------------+
         |       bolt a       |      |       bolt b       |
         |   +------------+   |      |   +------------+   |
         |   | instance 1 |   |      |   | instance 1 |   |
         |   +------------+   |      |   +------------+   |
         |   +------------+   |      |   +------------+   |
         |   | instance 2 |   |      |   | instance 2 |   |
         |   +------------+   |      |   +------------+   |
         +--------------------+      +--------------------+
    
    1. sends tuples from all instances of the source to a single instance of the target

    2. specifically, the target task with the lowest id
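       In a topology this grouping is declared with globalGrouping(componentId). The selection rule itself is just a minimum over the target task IDs, as this small sketch shows (the IDs are hypothetical):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of the global grouping rule; task IDs are hypothetical.
class GlobalGroupingSketch {
    // Every tuple, from every source task, goes to the target task with the lowest ID.
    static int chooseTask(List<Integer> targetTasks) {
        return Collections.min(targetTasks);
    }

    public static void main(String[] args) {
        List<Integer> targetTasks = Arrays.asList(9, 5, 6);
        // Source instances a.1 and a.2 both resolve to the same receiver.
        for (int sourceTask : Arrays.asList(1, 2)) {
            System.out.println("a." + sourceTask + " -> task " + chooseTask(targetTasks));
        }
        // prints "a.1 -> task 5" then "a.2 -> task 5"
    }
}
```

       Since one task receives the whole stream, the other instances of the target bolt sit idle for that stream, so global grouping fits final aggregation steps rather than heavy per-tuple work.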

  6. local grouping

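    1. declared with localOrShuffleGrouping: if the receiving bolt has tasks in the same worker process, tuples are shuffled among those local tasks only; otherwise it behaves like shuffle grouping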
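       A plain-Java sketch of the local-or-shuffle decision, assuming we already know which target tasks share the emitter's worker process (passed in by hand here; in Storm it depends on how tasks are scheduled). The random pick among candidates is a simplification, and the task IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Sketch of local-or-shuffle routing; worker placement and task IDs are hypothetical.
class LocalOrShuffleSketch {
    private final List<Integer> candidates;
    private final Random random;

    LocalOrShuffleSketch(List<Integer> targetTasks, Set<Integer> tasksInThisWorker, Random random) {
        List<Integer> local = new ArrayList<>();
        for (int task : targetTasks) {
            if (tasksInThisWorker.contains(task)) {
                local.add(task);
            }
        }
        // No local target task: fall back to all target tasks, like shuffle grouping.
        this.candidates = local.isEmpty() ? new ArrayList<>(targetTasks) : local;
        this.random = random;
    }

    int chooseTask() {
        return candidates.get(random.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        List<Integer> targets = Arrays.asList(4, 5, 6);
        LocalOrShuffleSketch withLocal =
                new LocalOrShuffleSketch(targets, new HashSet<>(Arrays.asList(5)), new Random(1));
        LocalOrShuffleSketch noLocal =
                new LocalOrShuffleSketch(targets, new HashSet<Integer>(), new Random(1));
        System.out.println("with a local task: " + withLocal.chooseTask()); // always 5
        System.out.println("no local task: " + noLocal.chooseTask());       // one of 4, 5, 6
    }
}
```

       Keeping tuples inside the worker avoids serializing them over the network, at the cost of uneven load when only a few target tasks are local.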

  7. none grouping

    1. currently equivalent to shuffle grouping; use it when you don't care how the stream is grouped

  8. shuffle grouping

         a.1 -> b.1 50%
         a.1 -> b.2 50%
         a.2 -> b.1 50%
         a.2 -> b.2 50%
    
         +--------------------+      +--------------------+
         |       bolt a       |      |       bolt b       |
         |   +------------+   |      |   +------------+   |
         |   | instance 1 |   |      |   | instance 1 |   |
         |   +------------+   |      |   +------------+   |
         |   +------------+   |      |   +------------+   |
         |   | instance 2 |   |      |   | instance 2 |   |
         |   +------------+   |      |   +------------+   |
         +--------------------+      +--------------------+
    
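    1. takes the source component id (optionally also a stream id)

    2. sends each tuple emitted by the source to a randomly chosen task of the receiving bolt

    3. load balancing is taken care of automatically: each task receives roughly the same number of tuples

    4. best suited to atomic, stateless operations (e.g. a math operation on one tuple) where it does not matter which task processes a tuple; stateful work such as counting words needs fields grouping instead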
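       Random choice alone only balances load on average. One way to make shuffle routing both random and exactly even is to shuffle the task list, deal tasks out in that order, and reshuffle after each full pass. The sketch below illustrates that idea in plain Java; it is not Storm's code, and the task IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Sketch of a random-but-balanced shuffle grouping; task IDs are hypothetical.
class ShuffleGroupingSketch {
    private final List<Integer> order;
    private final Random random;
    private int next = 0;

    ShuffleGroupingSketch(List<Integer> targetTasks, Random random) {
        this.order = new ArrayList<>(targetTasks);
        this.random = random;
        Collections.shuffle(order, random);
    }

    int chooseTask() {
        // Once every task has had one tuple, reshuffle and start a new pass.
        if (next == order.size()) {
            Collections.shuffle(order, random);
            next = 0;
        }
        return order.get(next++);
    }

    public static void main(String[] args) {
        ShuffleGroupingSketch grouping = new ShuffleGroupingSketch(Arrays.asList(1, 2), new Random());
        int[] received = new int[3]; // index = task ID
        for (int i = 0; i < 1000; i++) {
            received[grouping.chooseTask()]++;
        }
        // Each full pass hands every task exactly one tuple, so both get 500.
        System.out.println("b.1: " + received[1] + ", b.2: " + received[2]);
    }
}
```

       This matches the 50% / 50% split in the diagram above: the order within a pass is random, but no task can fall behind by more than one tuple.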


