stream grouping
-
all grouping
    a.1 -> b.1  100%
    a.1 -> b.2  100%
    a.2 -> b.1  100%
    a.2 -> b.2  100%

    +--------------------+     +--------------------+
    |       bolt a       |     |       bolt b       |
    |  +------------+    |     |  +------------+    |
    |  | instance 1 |    |     |  | instance 1 |    |
    |  +------------+    |     |  +------------+    |
    |  +------------+    |     |  +------------+    |
    |  | instance 2 |    |     |  | instance 2 |    |
    |  +------------+    |     |  +------------+    |
    +--------------------+     +--------------------+
-
sends a copy of every tuple to all instances (tasks) of the receiving bolt
-
used to send signals to bolts, e.g. telling every word-counter task to clear its cache
    // bolt execute
    public void execute(Tuple input) {
        String str = null;
        try {
            // tuples arriving on the "signals" stream carry control commands
            if (input.getSourceStreamId().equals("signals")) {
                str = input.getStringByField("action");
                if ("refreshCache".equals(str)) {
                    counters.clear();
                }
            }
        } catch (IllegalArgumentException e) {
            // do nothing: the tuple has no "action" field
        }
        // ... regular word-counting logic
    }

    // topology builder
    builder.setBolt("word-counter", new WordCounter(), 2)
           .fieldsGrouping("word-normalizer", new Fields("word"))
           .allGrouping("signals-spout", "signals");
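The broadcast behaviour can be sketched without a Storm cluster. This is a plain-Java simulation, not Storm's API: the hypothetical CounterTask class stands in for one word-counter task, and the hypothetical AllGroupingDemo.broadcast method delivers the same signal to every task, which is what all grouping does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one task of the "word-counter" bolt; not a Storm class.
class CounterTask {
    final Map<String, Integer> counters = new HashMap<>();

    void count(String word) {
        counters.merge(word, 1, Integer::sum);
    }

    // Mirrors the "signals" branch of the execute() method above.
    void onSignal(String action) {
        if ("refreshCache".equals(action)) {
            counters.clear();
        }
    }
}

class AllGroupingDemo {
    // All grouping: each task gets its own copy of the same tuple.
    static void broadcast(List<CounterTask> tasks, String action) {
        for (CounterTask task : tasks) {
            task.onSignal(action);
        }
    }

    public static void main(String[] args) {
        List<CounterTask> tasks = new ArrayList<>();
        tasks.add(new CounterTask());
        tasks.add(new CounterTask());
        tasks.get(0).count("storm"); // task 0 has seen "storm"
        tasks.get(1).count("bolt");  // task 1 has seen "bolt"

        broadcast(tasks, "refreshCache");

        for (CounterTask task : tasks) {
            System.out.println(task.counters.isEmpty()); // prints true (once per task)
        }
    }
}
```

With a shuffle or fields grouping only one task would see the signal, and the other task would keep its stale counters.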
-
-
custom grouping
-
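implement the backtype.storm.grouping.CustomStreamGrouping interface:

    import backtype.storm.grouping.CustomStreamGrouping;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.tuple.Fields;

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    public class ModuleGrouping implements CustomStreamGrouping, Serializable {
        // task ids of the receiving bolt, filled in by prepare()
        List<Integer> targetTasks;

        @Override
        public List<Integer> chooseTasks(List<Object> values) {
            List<Integer> boltIds = new ArrayList<Integer>();
            if (values.size() > 0) {
                String str = values.get(0).toString();
                if (str.isEmpty()) {
                    boltIds.add(targetTasks.get(0));
                } else {
                    // route by first character: same first letter -> same task;
                    // map the computed position to an actual task id
                    boltIds.add(targetTasks.get(str.charAt(0) % targetTasks.size()));
                }
            }
            return boltIds;
        }

        @Override
        public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
        }
    }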
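The first-letter rule used by ModuleGrouping can be tried in isolation. This sketch re-implements only the routing logic in plain Java, so it runs without Storm; ModuleRoutingDemo is a hypothetical name and the task ids 7 and 8 are made-up values standing in for what prepare() would receive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ModuleRoutingDemo {
    // Same rule as ModuleGrouping.chooseTasks: first character modulo the number
    // of tasks, translated into an actual task id taken from targetTasks.
    static List<Integer> chooseTasks(List<Integer> targetTasks, List<Object> values) {
        List<Integer> chosen = new ArrayList<>();
        if (!values.isEmpty()) {
            String str = values.get(0).toString();
            int index = str.isEmpty() ? 0 : str.charAt(0) % targetTasks.size();
            chosen.add(targetTasks.get(index));
        }
        return chosen;
    }

    public static void main(String[] args) {
        // Hypothetical task ids of a bolt running 2 tasks.
        List<Integer> targetTasks = Arrays.asList(7, 8);
        System.out.println(chooseTasks(targetTasks, Arrays.<Object>asList("apple")));  // prints [8]
        System.out.println(chooseTasks(targetTasks, Arrays.<Object>asList("banana"))); // prints [7]
    }
}
```

'a' is 97 and 97 % 2 = 1, so "apple" goes to the second task id; 'b' is 98, so "banana" goes to the first. Every word starting with the same character always lands on the same task.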
-
-
direct grouping
-
the source decides which task of the receiving bolt gets each tuple, by emitting with emitDirect; the stream must be declared as a direct stream
    // 1. execute
    public void execute(Tuple input) {
        // ...
        for (String word : words) {
            if (!word.isEmpty()) {
                // ...
                // emitDirect sends the tuple to exactly the task id given
                collector.emitDirect(getWordCountIndex(word), new Values(word));
            }
        }
        // acknowledge the tuple
        collector.ack(input);
    }

    // 2. getWordCountIndex: map a word to a word-counter task id
    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        int index;
        if (word.isEmpty()) {
            index = 0;
        } else {
            index = word.charAt(0) % counterTasks.size();
        }
        // emitDirect expects a task id, not a position, so look it up
        return counterTasks.get(index);
    }

    // 3. look up the target task ids in the `prepare` method
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counterTasks = context.getComponentTasks("word-counter"); // List<Integer>
    }

    // 4. declare the output stream as direct (required for emitDirect)
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(true, new Fields("word"));
    }

    // 5. topology definition
    builder.setBolt("word-counter", new WordCounter(), 2)
           .directGrouping("word-normalizer");
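Direct grouping moves the routing decision into the emitting bolt. The plain-Java sketch below simulates that without Storm: the list of task ids plays the role of what context.getComponentTasks("word-counter") would return (the values 3 and 4 are hypothetical), and route() imitates emitDirect by putting each word into the inbox of the task id the emitter picked. Because emitDirect takes a task id, the position computed from the first letter is mapped through that list. DirectRoutingDemo and getWordCountTask are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DirectRoutingDemo {
    // Task ids of the receiving bolt (hypothetical values in main()).
    private final List<Integer> counterTasks;

    DirectRoutingDemo(List<Integer> counterTasks) {
        this.counterTasks = counterTasks;
    }

    // Same first-letter rule as getWordCountIndex, returning a task id.
    int getWordCountTask(String word) {
        word = word.trim().toUpperCase();
        int index = word.isEmpty() ? 0 : word.charAt(0) % counterTasks.size();
        return counterTasks.get(index);
    }

    // Simulates emitDirect: the emitter alone decides which task gets each word.
    Map<Integer, List<String>> route(List<String> words) {
        Map<Integer, List<String>> inbox = new LinkedHashMap<>();
        for (int task : counterTasks) {
            inbox.put(task, new ArrayList<>());
        }
        for (String word : words) {
            if (!word.isEmpty()) {
                inbox.get(getWordCountTask(word)).add(word);
            }
        }
        return inbox;
    }

    public static void main(String[] args) {
        DirectRoutingDemo demo = new DirectRoutingDemo(Arrays.asList(3, 4));
        System.out.println(demo.route(Arrays.asList("apple", "Avocado", "banana")));
        // prints {3=[banana], 4=[apple, Avocado]}
    }
}
```

The upper-casing makes the routing case-insensitive, which is why "apple" and "Avocado" end up on the same task.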
-
-
fields grouping
    a.1 -> b.1  field p
    a.2 -> b.1  field p
    a.2 -> b.2  field q

    +--------------------+     +--------------------+
    |       bolt a       |     |       bolt b       |
    |  +------------+    |     |  +------------+    |
    |  | instance 1 |    |     |  | instance 1 |    |
    |  +------------+    |     |  +------------+    |
    |  +------------+    |     |  +------------+    |
    |  | instance 2 |    |     |  | instance 2 |    |
    |  +------------+    |     |  +------------+    |
    +--------------------+     +--------------------+
-
tuples are partitioned by the values of one or more fields: all tuples with the same values in the grouping fields go to the same task
    builder.setBolt("word-counter", new WordCounter(), 2)
           .fieldsGrouping("word-normalizer", new Fields("word"));
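The partitioning idea can be sketched as "hash the grouping values, then take the result modulo the number of tasks". This is a simplified stand-in, not Storm's internal code: the exact hash Storm uses may differ, and FieldsGroupingDemo is a hypothetical name. The property that matters is that equal field values always produce the same task index.

```java
import java.util.Arrays;
import java.util.List;

class FieldsGroupingDemo {
    // Simplified fields grouping: hash the selected field values, then map the
    // hash into [0, numTasks). floorMod keeps the result non-negative.
    static int chooseTaskIndex(List<Object> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 2;
        for (String word : new String[] {"storm", "bolt", "storm"}) {
            int index = chooseTaskIndex(Arrays.<Object>asList(word), numTasks);
            System.out.println(word + " -> task index " + index);
        }
        // both "storm" lines print the same task index
    }
}
```

This is why a word counter can use fields grouping on "word": every occurrence of a word reaches the one task that holds its counter.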
-
-
global grouping
    a.1 -> b.1  100%
    a.2 -> b.1  100%

    +--------------------+     +--------------------+
    |       bolt a       |     |       bolt b       |
    |  +------------+    |     |  +------------+    |
    |  | instance 1 |    |     |  | instance 1 |    |
    |  +------------+    |     |  +------------+    |
    |  +------------+    |     |  +------------+    |
    |  | instance 2 |    |     |  | instance 2 |    |
    |  +------------+    |     |  +------------+    |
    +--------------------+     +--------------------+
-
sends the tuples from all instances of the source to a single target instance
-
specifically, the task with the lowest id
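The selection rule is simple enough to state as code. In this plain-Java sketch (no Storm needed; GlobalGroupingDemo is a hypothetical name and the task ids are made up), every tuple from every source task is sent to the target task with the lowest id.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class GlobalGroupingDemo {
    // Global grouping: the whole stream goes to the target task with the lowest id.
    static int chooseTask(List<Integer> targetTasks) {
        return Collections.min(targetTasks);
    }

    public static void main(String[] args) {
        List<Integer> targetTasks = Arrays.asList(9, 5, 7); // hypothetical task ids
        System.out.println(chooseTask(targetTasks)); // prints 5
    }
}
```

Since the choice does not depend on the tuple, the other tasks of the receiving bolt get nothing from this stream.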
-
-
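local or shuffle grouping
-
same as shuffle grouping, except that if the receiving bolt has one or more tasks in the same worker process as the source task, tuples are shuffled only among those in-process tasks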
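The "prefer local tasks, otherwise shuffle" rule can be sketched in plain Java. Assumptions, all hypothetical: taskWorker maps each target task id to the name of the worker it runs in, localWorker is the emitter's worker, and the pick among candidates is a plain random choice for simplicity (Storm's own implementation may balance differently).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

class LocalOrShuffleDemo {
    // Prefer target tasks running in the emitter's own worker; if there are
    // none, fall back to all target tasks (a normal shuffle).
    static int chooseTask(List<Integer> targetTasks, Map<Integer, String> taskWorker,
                          String localWorker, Random random) {
        List<Integer> local = new ArrayList<>();
        for (int task : targetTasks) {
            if (localWorker.equals(taskWorker.get(task))) {
                local.add(task);
            }
        }
        List<Integer> candidates = local.isEmpty() ? targetTasks : local;
        return candidates.get(random.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        List<Integer> targetTasks = Arrays.asList(1, 2, 3);
        Map<Integer, String> taskWorker = new HashMap<>();
        taskWorker.put(1, "worker-A");
        taskWorker.put(2, "worker-B");
        taskWorker.put(3, "worker-A");
        Random random = new Random();

        // An emitter in worker-A only ever picks task 1 or 3.
        System.out.println(chooseTask(targetTasks, taskWorker, "worker-A", random));
        // An emitter in worker-C has no local target, so any of 1, 2, 3 may be picked.
        System.out.println(chooseTask(targetTasks, taskWorker, "worker-C", random));
    }
}
```

Keeping tuples inside one worker avoids serializing them over the network, at the cost of possibly uneven load across workers.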
-
none grouping
-
you don't care how the stream is grouped; currently equivalent to shuffle grouping
-
shuffle grouping
    a.1 -> b.1  50%
    a.1 -> b.2  50%
    a.2 -> b.1  50%
    a.2 -> b.2  50%

    +--------------------+     +--------------------+
    |       bolt a       |     |       bolt b       |
    |  +------------+    |     |  +------------+    |
    |  | instance 1 |    |     |  | instance 1 |    |
    |  +------------+    |     |  +------------+    |
    |  +------------+    |     |  +------------+    |
    |  | instance 2 |    |     |  | instance 2 |    |
    |  +------------+    |     |  +------------+    |
    +--------------------+     +--------------------+
-
takes a single parameter: the id of the source component
-
sends each tuple emitted by the source to a randomly chosen task (instance) of the receiving bolt
-
load balancing is taken care of automatically: every task receives roughly the same number of tuples
-
useful only for atomic operations, where it does not matter which task processes a given tuple
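One way to get "random but balanced" delivery is to shuffle the list of target tasks, hand tasks out in that order, and reshuffle once every task has been used. This plain-Java sketch shows that approach; it is an assumption for illustration (ShuffleGroupingDemo is a hypothetical name) and Storm's internal implementation may differ in detail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

class ShuffleGroupingDemo {
    private final List<Integer> order;
    private final Random random;
    private int next = 0;

    ShuffleGroupingDemo(List<Integer> targetTasks, Random random) {
        this.order = new ArrayList<>(targetTasks);
        this.random = random;
        Collections.shuffle(order, random);
    }

    // Returns the next task in the shuffled order; reshuffles after each full pass,
    // so every task is used exactly once per pass.
    int chooseTask() {
        if (next == order.size()) {
            Collections.shuffle(order, random);
            next = 0;
        }
        return order.get(next++);
    }

    public static void main(String[] args) {
        ShuffleGroupingDemo grouping = new ShuffleGroupingDemo(Arrays.asList(1, 2), new Random());
        Map<Integer, Integer> received = new TreeMap<>();
        for (int i = 0; i < 10; i++) {
            received.merge(grouping.chooseTask(), 1, Integer::sum);
        }
        System.out.println(received); // prints {1=5, 2=5}
    }
}
```

The order in which tasks are picked is random, but the counts stay even, which is the load-balancing property described above.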
-