Storm Streaming 2.0 New Features
Stream API
The Stream API is an alternative interface to Storm's classic spout/bolt API. It provides a typed API for expressing streaming computations and supports functional-style operations.
Quick Start
StreamBuilder builder = new StreamBuilder();
KafkaSpout<String, String> spout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
        "topic01");
// Wrap the spout as a typed stream: TupleValueMappers.of(1, 3, 4) picks the tuple
// values at indexes 1, 3 and 4 into a Tuple3; the trailing 3 is the parallelism.
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .peek(new Consumer<Tuple3<Long, String, String>>() {
            @Override
            public void accept(Tuple3<Long, String, String> input) {
                System.out.println(input._1 + " " + input._2 + " " + input._3);
            }
        });
Config conf = new Config();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("stream", conf, builder.build());
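KafkaSpoutUtils is not part of Storm; it is the author's convenience wrapper around storm-kafka-client, used throughout this article but never shown. A minimal sketch of what buildKafkaSpout might look like (the consumer group id here is an assumption):
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class KafkaSpoutUtils {
    public static KafkaSpout<String, String> buildKafkaSpout(String bootstrapServers, String topic) {
        // KafkaSpoutConfig.builder(bootstrapServers, topics) defaults to String
        // key/value deserializers; only the consumer group is set explicitly here.
        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, topic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "stream-group") // assumed group id
                .build();
        return new KafkaSpout<>(spoutConfig);
    }
}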
Stream APIs
Basic Transformations
// filter: keep only tuples whose value contains "error"
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .filter(t -> t._3.contains("error"))
        .peek(t -> System.out.println(t));
// map: project each Tuple3 down to its value field
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .map(t -> t._3)
        .peek(t -> System.out.println(t));
// flatMap: split each line into words, then map each word to a (word, 1) pair
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(t -> Arrays.asList(t._3.split("\\s")))
        .map(t -> Pair.<String, Integer>of(t, 1))
        .peek(t -> System.out.println(t));
Window Operations
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(t -> Arrays.asList(t._3.split("\\s")))
        .map(t -> Pair.<String, Integer>of(t, 1))
        // sliding window: 5-second window length, evaluated every 2 seconds
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5), BaseWindowedBolt.Duration.seconds(2)))
        .peek(input -> System.out.println(input));
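SlidingWindows produces overlapping windows; for non-overlapping windows the same pipeline can swap in TumblingWindows (the same class is used again in the Join section below):
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(t -> Arrays.asList(t._3.split("\\s")))
        .map(t -> Pair.<String, Integer>of(t, 1))
        // tumbling window: each tuple belongs to exactly one 5-second window
        .window(TumblingWindows.of(BaseWindowedBolt.Duration.seconds(5)))
        .peek(input -> System.out.println(input));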
Key-Value Pair Transformations
- flatMapToPair (equivalent to flatMap + mapToPair)
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMapToPair(t -> {
            String[] tokens = t._3.split("\\s");
            List<Pair<String, Integer>> pairList = new ArrayList<>();
            for (String token : tokens) {
                pairList.add(Pair.<String, Integer>of(token, 1));
            }
            return pairList;
        })
        .peek(t -> System.out.println(t));
- mapToPair
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v -> Arrays.asList(v._3.split("\\s+")))
        .mapToPair(t -> Pair.<String, Integer>of(t, 1))
        .peek(t -> System.out.println(t));
Aggregations
- Single-value aggregation
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v -> Arrays.asList(v._3.split("\\s+")))
        .map(t -> Integer.parseInt(t)) // assumes the input tokens are numbers
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5), BaseWindowedBolt.Duration.seconds(2)))
        //.reduce((v1, v2) -> v1 + v2)
        // aggregate(initialValue, accumulator, combiner): the accumulator folds each
        // element into a partial result; the combiner merges partial results
        .aggregate(0, (v1, v2) -> v1 + v2, (v1, v2) -> v1 + v2)
        .peek(t -> System.out.println(t));
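Because aggregate separates the accumulator from the combiner, the partial result type can differ from the element type. A minimal sketch (not from the original article) that computes a windowed average by carrying a (sum, count) pair:
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v -> Arrays.asList(v._3.split("\\s+")))
        .map(t -> Integer.parseInt(t))
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5), BaseWindowedBolt.Duration.seconds(2)))
        .aggregate(Pair.<Integer, Integer>of(0, 0),              // initial (sum, count)
                (acc, v) -> Pair.of(acc._1 + v, acc._2 + 1),     // accumulator: fold one element
                (a, b) -> Pair.of(a._1 + b._1, a._2 + b._2))     // combiner: merge partials
        .map(p -> p._2 == 0 ? 0.0 : (double) p._1 / p._2)        // average = sum / count
        .peek(t -> System.out.println(t));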
- Key-value aggregation
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v -> Arrays.asList(v._3.split("\\s+")))
        .mapToPair(t -> Pair.<String, Integer>of(t, 1))
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5), BaseWindowedBolt.Duration.seconds(2)))
        .reduceByKey((v1, v2) -> v1 + v2)
        //.aggregateByKey(0, (v1, v2) -> v1 + v2, (v1, v2) -> v1 + v2)
        .peek(t -> System.out.println(t));
- groupByKey
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v -> Arrays.asList(v._3.split("\\s+")))
        .mapToPair(t -> Pair.<String, Integer>of(t, 1))
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5), BaseWindowedBolt.Duration.seconds(2)))
        // groupByKey yields Pair<word, Iterable<Integer>>; sum the grouped values manually
        .groupByKey()
        .map(t -> {
            int total = 0;
            for (Integer integer : t._2) {
                total += integer;
            }
            return Pair.<String, Integer>of(t._1, total);
        })
        .peek(t -> System.out.println(t));
- countByKey
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v -> Arrays.asList(v._3.split("\\s+")))
        .mapToPair(t -> Pair.<String, Integer>of(t, 1))
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5), BaseWindowedBolt.Duration.seconds(2)))
        .countByKey()
        .peek(t -> System.out.println(t));
Repartitioning
The repartition operation redistributes the current stream and returns a new stream with the specified number of partitions. Further operations on the resulting stream execute at that parallelism level, so repartitioning can be used to either increase or decrease the parallelism of downstream operations.
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v -> Arrays.asList(v._3.split("\\s+")))
        .repartition(4) // raise the parallelism of the following operations to 4
        .mapToPair(t -> Pair.<String, Integer>of(t, 1))
        .repartition(2) // then lower it to 2
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5), BaseWindowedBolt.Duration.seconds(2)))
        .countByKey()
        .peek(t -> System.out.println(t));
Note: each repartition triggers a network shuffle.
Output Operators - Sinks
print and peek
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v -> Arrays.asList(v._3.split("\\s+")))
        .repartition(4)
        .mapToPair(t -> Pair.<String, Integer>of(t, 1))
        .repartition(2)
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5), BaseWindowedBolt.Duration.seconds(2)))
        .countByKey()
        .print();
print returns void and terminates the stream, so no further operators can be appended after it. peek, by contrast, acts as a probe into the pipeline: it is meant for debugging and does not affect the normal flow of the program.
forEach
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v -> Arrays.asList(v._3.split("\\s+")))
        .repartition(4)
        .mapToPair(t -> Pair.<String, Integer>of(t, 1))
        .repartition(2)
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5), BaseWindowedBolt.Duration.seconds(2)))
        .countByKey()
        .forEach(t -> System.out.println(t));
to
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
        .setHost("CentOSA")
        .setPort(6379)
        .build();
KafkaSpout<String, String> spout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
        "topic01");
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v -> Arrays.asList(v._3.split("\\s+")))
        .repartition(4)
        .mapToPair(t -> Pair.<String, Integer>of(t, 1))
        .repartition(2)
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5), BaseWindowedBolt.Duration.seconds(2)))
        .countByKey()
        // to(bolt) sinks the stream into an arbitrary bolt, here storm-redis's RedisStoreBolt
        .to(new RedisStoreBolt(jedisPoolConfig, new WordCountRedisStoreMapper()));
WordCountRedisStoreMapper
public class WordCountRedisStoreMapper implements RedisStoreMapper {
    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        // store the word counts in a Redis hash named "swc"
        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "swc");
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        System.out.println(tuple.getFields()); // default fields: key, value
        return tuple.getString(0);
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getLong(1) + "";
    }
}
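To verify the sink, the "swc" hash can be read back directly. A minimal check using the Jedis client (not part of the original code):
// Read the stored word counts back from the "swc" hash.
try (Jedis jedis = new Jedis("CentOSA", 6379)) {
    Map<String, String> counts = jedis.hgetAll("swc");
    counts.forEach((word, count) -> System.out.println(word + " -> " + count));
}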
Branch Operators
branch
KafkaSpout<String, String> spout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
        "topic01");
Stream<Tuple3<Long, String, String>>[] streams =
        builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
                .branch(
                        t -> t._3.contains("info"),  // each tuple goes to the first matching predicate
                        t -> t._3.contains("error"),
                        t -> true                    // catch-all for everything else
                );
Stream<Tuple3<Long, String, String>> infoStream = streams[0];
Stream<Tuple3<Long, String, String>> errorStream = streams[1];
Stream<Tuple3<Long, String, String>> otherStream = streams[2];
infoStream.peek(t -> System.out.println("info:" + t));
errorStream.peek(t -> System.out.println("error:" + t));
otherStream.peek(t -> System.out.println("other:" + t));
Join
The join operation joins the values of one stream with the values from another stream that have the same key. For example, given the user record 001 zhangsan and the order record 001 apple 100 below, the join emits a pair like (001, (zhangsan, apple:100)).
KafkaSpout<String, String> userSpout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
        "usertopic");
KafkaSpout<String, String> orderSpout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
        "ordertopic");
// sample user record: 001 zhangsan
PairStream<String, String> userPair = builder.newStream(userSpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
        .mapToPair(t -> {
            String[] tokens = t._3.split("\\s");
            return Pair.<String, String>of(tokens[0], tokens[1]);
        });
// sample order record: 001 apple 100
PairStream<String, String> orderPair = builder.newStream(orderSpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
        .mapToPair(t -> {
            String[] tokens = t._3.split("\\s");
            return Pair.<String, String>of(tokens[0], tokens[1] + ":" + tokens[2]);
        });
// joins require a windowed stream; leftOuterJoin keeps users without matching orders
userPair.window(TumblingWindows.of(BaseWindowedBolt.Duration.seconds(5)))
        .leftOuterJoin(orderPair)
        .peek(t -> System.out.println(t));
CoGroupByKey
coGroupByKey groups the values of this stream with the values having the same key from the other stream. With the sample records below, each key yields a pair of iterables, e.g. (001, ([zhangsan], [apple:100])).
KafkaSpout<String, String> userSpout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
        "usertopic");
KafkaSpout<String, String> orderSpout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
        "ordertopic");
// sample user record: 001 zhangsan
PairStream<String, String> userPair = builder.newStream(userSpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
        .mapToPair(t -> {
            String[] tokens = t._3.split("\\s");
            return Pair.<String, String>of(tokens[0], tokens[1]);
        });
// sample order record: 001 apple 100
PairStream<String, String> orderPair = builder.newStream(orderSpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
        .mapToPair(t -> {
            String[] tokens = t._3.split("\\s");
            return Pair.<String, String>of(tokens[0], tokens[1] + ":" + tokens[2]);
        });
userPair.coGroupByKey(orderPair).peek(t -> System.out.println(t));
State
updateStateByKey
builder.newStream(userSpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
        .map(t -> t._3)
        .flatMap(line -> Arrays.asList(line.split("\\s+")))
        .mapToPair(word -> Pair.<String, Integer>of(word, 1))
        // fold each new value into the state kept per key (initial value 0)
        .updateStateByKey(0, (v1, v2) -> v1 + v2)
        .toPairStream()
        .peek(t -> System.out.println(t));
Config conf = new Config();
// back the keyed state with Redis instead of the default in-memory store
conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
Map<String, Object> stateConfig = new HashMap<String, Object>();
Map<String, Object> redisConfig = new HashMap<String, Object>();
redisConfig.put("host", "CentOSA");
redisConfig.put("port", 6379);
stateConfig.put("jedisPoolConfig", redisConfig);
ObjectMapper objectMapper = new ObjectMapper();
conf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, objectMapper.writeValueAsString(stateConfig));
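State updates are persisted on Storm's checkpoint cycle, so if needed the interval can be tuned through the topology config as well; a hedged sketch, assuming Storm's TOPOLOGY_STATE_CHECKPOINT_INTERVAL setting (1000 ms is an arbitrary example value):
// Optional: control how often keyed state is checkpointed, in milliseconds.
conf.put(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, 1000);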
stateQuery
KafkaSpout<String, String> userSpout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
        "topic01");
// build up per-word counts as queryable stream state
StreamState<String, Integer> streamState = builder.newStream(userSpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
        .map(t -> t._3)
        .flatMap(line -> Arrays.asList(line.split("\\s+")))
        .mapToPair(word -> Pair.<String, Integer>of(word, 1))
        .updateStateByKey(0, (v1, v2) -> v1 + v2);
KafkaSpout<String, String> querySpout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
        "topic02");
// use messages on topic02 as keys to look up the current state
builder.newStream(querySpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
        .map(t -> t._3)
        .stateQuery(streamState)
        .peek(t -> System.out.println(t));
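A quick manual test: publish a few sentences to topic01 to build up the counts, then publish a single word to topic02; the query stream should print a pair of that word and its current count.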