Stream API

The Stream API is an alternative interface to Storm. It provides a typed API for expressing streaming computations and supports functional-style operations.

Quick Start

StreamBuilder builder = new StreamBuilder();

KafkaSpout<String, String> spout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
                                                                   "topic01");
builder.newStream(spout, TupleValueMappers.<Long,String,String>of(1,3,4),3)
    .peek(new Consumer<Tuple3<Long, String, String>>() {
        @Override
        public void accept(Tuple3<Long, String, String> input) {
            System.out.println(input._1+" "+input._2+" "+input._3);
        }
    });

Config conf = new Config();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("stream",conf,
                            builder.build());

Stream APIs

Basic Transformations

builder.newStream(spout, TupleValueMappers.<Long,String,String>of(1,3,4),3)
    .filter((t)-> t._3.contains("error"))
    .peek((t)-> System.out.println(t));
builder.newStream(spout, TupleValueMappers.<Long,String,String>of(1,3,4),3)
    .map((t)-> t._3)
    .peek((t)-> System.out.println(t));
builder.newStream(spout, TupleValueMappers.<Long,String,String>of(1,3,4),3)
    .flatMap(t-> Arrays.asList(t._3.split("\\s")))
    .map(t-> Pair.<String,Integer>of(t,1))
    .peek(t -> System.out.println(t));

Window Operations

builder.newStream(spout, TupleValueMappers.<Long,String,String>of(1,3,4),3)
        .flatMap(t-> Arrays.asList(t._3.split("\\s")))
        .map(t-> Pair.<String,Integer>of(t,1))
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5),BaseWindowedBolt.Duration.seconds(2)))
        .peek(input->System.out.println(input));
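
To make the two window parameters concrete, the plain-Java sketch below (it does not use Storm) lists which event timestamps a sliding window with a 5-second length and a 2-second sliding interval would cover. The class name SlidingWindowSketch, the alignment of window ends to multiples of the sliding interval, and the half-open interval (end - length, end] are assumptions made for illustration; the boundaries Storm actually uses depend on when the topology starts and on how timestamps are assigned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; not part of the Storm API.
class SlidingWindowSketch {

    // Returns the timestamps (in seconds) inside the window that ends at windowEnd.
    // Assumption: a window covers the half-open interval (windowEnd - length, windowEnd].
    static List<Integer> windowContents(List<Integer> timestamps, int windowEnd, int length) {
        List<Integer> contents = new ArrayList<>();
        for (int ts : timestamps) {
            if (ts > windowEnd - length && ts <= windowEnd) {
                contents.add(ts);
            }
        }
        return contents;
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(1, 2, 3, 4, 6, 7, 9);
        int length = 5; // window length in seconds
        int slide = 2;  // sliding interval in seconds
        // A window is evaluated every `slide` seconds, so consecutive windows overlap
        // and one event can be reported in more than one window.
        for (int end = slide; end <= 10; end += slide) {
            System.out.println("window ending at " + end + "s: "
                    + windowContents(events, end, length));
        }
    }
}
```

Because the sliding interval is shorter than the window length, an event such as the one at second 4 is counted in several consecutive windows. A tumbling window is the special case in which the two values are equal.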

Transformations on Key-Value Pairs

  • flatMapToPair (equivalent to flatMap followed by mapToPair)
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
                .flatMapToPair(t -> {
                    String[] tokens = t._3.split("\\s");
                    List<Pair<String,Integer>> pairList=new ArrayList<>();
                    for (String token : tokens) {
                        pairList.add(Pair.<String, Integer>of(token, 1));
                    }
                    return pairList;
                }).peek(t -> System.out.println(t));
  • mapToPair
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
                .flatMap(v->Arrays.asList(v._3.split("\\s+")))
                .mapToPair(t -> Pair.<String, Integer>of(t, 1))
                .peek(t -> System.out.println(t));

Aggregations

  • Aggregating single values
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
    .flatMap(v->Arrays.asList(v._3.split("\\s+")))
    .map(t-> Integer.parseInt(t))
    .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5),BaseWindowedBolt.Duration.seconds(2)))
    //.reduce((v1,v2)->v1+v2)
    .aggregate(0,(v1,v2)->v1+v2,(v1,v2)->v1+v2)
    .peek(t -> System.out.println(t));
  • Aggregating key-value pairs
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v->Arrays.asList(v._3.split("\\s+")))
        .mapToPair(t-> Pair.<String,Integer>of(t,1))
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5),BaseWindowedBolt.Duration.seconds(2)))
        .reduceByKey((v1,v2)->v1+v2)
        //.aggregateByKey(0,(v1,v2)->v1+v2,(v1,v2)->v1+v2)
        .peek(t -> System.out.println(t));
  • groupByKey
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v->Arrays.asList(v._3.split("\\s+")))
        .mapToPair(t-> Pair.<String,Integer>of(t,1))
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5),BaseWindowedBolt.Duration.seconds(2)))
        .groupByKey()
        .map(t-> {
            int total=0;
            for (Integer integer : t._2) {
                total+=integer;
            }
            return Pair.<String,Integer>of(t._1,total);
        }).peek(t-> System.out.println(t));
  • countByKey
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v->Arrays.asList(v._3.split("\\s+")))
        .mapToPair(t-> Pair.<String,Integer>of(t,1))
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5),BaseWindowedBolt.Duration.seconds(2)))
        .countByKey().peek(t-> System.out.println(t));
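
What the keyed aggregations above compute for the contents of a single window can be sketched in plain Java. The class KeyedAggregationSketch and its method names are made up for this illustration and do not use Storm; the only detail taken from the API is that countByKey yields a Long per key, while reduceByKey keeps the value type of the pairs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; not part of the Storm API.
class KeyedAggregationSketch {

    // Mirrors mapToPair(word -> (word, 1)) followed by reduceByKey((v1, v2) -> v1 + v2):
    // the 1s of each word are summed up.
    static Map<String, Integer> sumOnesByWord(List<String> words) {
        Map<String, Integer> result = new TreeMap<>(); // sorted keys for stable output
        for (String word : words) {
            result.merge(word, 1, Integer::sum);
        }
        return result;
    }

    // Mirrors countByKey(): the number of pairs per word, reported as a Long.
    static Map<String, Long> countByWord(List<String> words) {
        Map<String, Long> result = new TreeMap<>();
        for (String word : words) {
            result.merge(word, 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // The words that happen to fall into one window.
        List<String> window = Arrays.asList("this is a demo this is".split("\\s+"));
        System.out.println(sumOnesByWord(window));
        System.out.println(countByWord(window));
    }
}
```

For (word, 1) pairs both operations produce the same numbers; they differ only in the result type, which matters when the values are read back later, for example with getLong.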

Repartitioning

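A repartition operation redistributes the current stream and returns a new stream with the specified number of partitions. Further operations on the resulting stream execute at that level of parallelism. Repartitioning can be used to increase or decrease the parallelism of the operations in a stream.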

builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
    .flatMap(v->Arrays.asList(v._3.split("\\s+")))
    .repartition(4)
    .mapToPair(t-> Pair.<String,Integer>of(t,1))
    .repartition(2)
    .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5),BaseWindowedBolt.Duration.seconds(2)))
    .countByKey().peek(t-> System.out.println(t));

Note: repartition moves data across the network (a shuffle).
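
The effect of changing the partition count can be pictured with a small plain-Java sketch. It does not use Storm: RepartitionSketch is a hypothetical class, and the round-robin assignment is an assumption chosen to keep the output predictable. Storm decides for itself which task receives each element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; not part of the Storm API.
class RepartitionSketch {

    // Spreads the elements over `partitions` buckets in round-robin order.
    // Every element ends up in exactly one bucket, so nothing is lost or duplicated.
    static <T> List<List<T>> repartition(List<T> elements, int partitions) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            buckets.get(i % partitions).add(elements.get(i));
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "c", "d", "e", "f");
        System.out.println(repartition(words, 4)); // four buckets: more parallelism
        System.out.println(repartition(words, 2)); // two buckets: less parallelism
    }
}
```

In a real topology the buckets live in different worker processes, which is why each repartition step costs a network transfer.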

Output Operators (Sinks)

print and peek
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
                .flatMap(v->Arrays.asList(v._3.split("\\s+")))
                .repartition(4)
                .mapToPair(t-> Pair.<String,Integer>of(t,1))
                .repartition(2)
                .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5),BaseWindowedBolt.Duration.seconds(2)))
                .countByKey().print();

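print returns void, which terminates the stream: no further operators can be appended after it. peek, by contrast, is a probe for debugging; it does not affect the normal flow of the program.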
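
The same split between a pass-through probe and a terminal operation exists in java.util.stream, which makes for a compact analogy. The sketch below uses only the JDK, not Storm, and the class name PeekVersusTerminalSketch is made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// JDK-only analogy; java.util.stream is not the Storm Stream API.
class PeekVersusTerminalSketch {

    // Records every element seen by the probe, then keeps transforming the stream.
    static List<String> upperCaseWithProbe(List<String> words, List<String> seen) {
        return words.stream()
                .peek(seen::add)          // probe: observes each element and passes it on unchanged
                .map(String::toUpperCase) // further operators can still follow peek
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        List<String> result = upperCaseWithProbe(Arrays.asList("info", "error"), seen);
        System.out.println("probe saw: " + seen);
        // forEach returns void, so the chain ends here, just as it does after print.
        result.forEach(System.out::println);
    }
}
```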

forEach
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
        .flatMap(v->Arrays.asList(v._3.split("\\s+")))
        .repartition(4)
        .mapToPair(t-> Pair.<String,Integer>of(t,1))
        .repartition(2)
        .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5),BaseWindowedBolt.Duration.seconds(2)))
        .countByKey()
        .forEach(t-> System.out.println(t));
to
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder().setHost("CentOSA").setPort(6379).build();
KafkaSpout<String, String> spout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
                                                                   "topic01");
builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
    .flatMap(v->Arrays.asList(v._3.split("\\s+")))
    .repartition(4)
    .mapToPair(t-> Pair.<String,Integer>of(t,1))
    .repartition(2)
    .window(SlidingWindows.of(BaseWindowedBolt.Duration.seconds(5),BaseWindowedBolt.Duration.seconds(2)))
    .countByKey()
    .to(new RedisStoreBolt(jedisPoolConfig,new WordCountRedisStoreMapper()));

WordCountRedisStoreMapper

public class WordCountRedisStoreMapper implements RedisStoreMapper {
    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH,"swc");
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        System.out.println(tuple.getFields()); // default field names: key, value
        return tuple.getString(0);
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getLong(1)+"";
    }
}

Branch Operators

branch
KafkaSpout<String, String> spout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
                                                                   "topic01");
Stream<Tuple3<Long, String, String>>[] streams = builder.newStream(spout, TupleValueMappers.<Long, String, String>of(1, 3, 4), 3)
    .branch(
        t -> t._3.contains("info"),
        t -> t._3.contains("error"),
        t -> true
    );
Stream<Tuple3<Long, String, String>> infoStream = streams[0];
Stream<Tuple3<Long, String, String>> errorStream = streams[1];
Stream<Tuple3<Long, String, String>> otherStream = streams[2];

infoStream.peek(t -> System.out.println("info:"+t));
errorStream.peek(t -> System.out.println("error:"+t));
otherStream.peek(t -> System.out.println("other:"+t));
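
The order of the predicates matters, because a value goes only to the branch of the first predicate it satisfies, and a value that satisfies none is dropped. That is why the example ends with a catch-all t -> true. The plain-Java sketch below imitates this routing without Storm; BranchSketch and the sample log lines are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java illustration only; not part of the Storm API.
class BranchSketch {

    // Routes each value to the bucket of the first predicate it satisfies.
    // Values that satisfy no predicate are dropped.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> values, Predicate<T>... predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (T value : values) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(value)) {
                    branches.get(i).add(value);
                    break; // first match wins; later predicates are not consulted
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "info: started", "error: disk full", "info after error", "debug: tick");
        List<List<String>> result = branch(lines,
                s -> s.contains("info"),
                s -> s.contains("error"),
                s -> true);
        System.out.println("info:  " + result.get(0));
        System.out.println("error: " + result.get(1));
        System.out.println("other: " + result.get(2));
    }
}
```

A line containing both "info" and "error" lands in the info branch only, since that predicate is listed first.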

Join

The join operation joins the values of one stream with the values from another stream that have the same key.

KafkaSpout<String, String> userSpout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
                                                                       "usertopic");
KafkaSpout<String, String> orderSpout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
                                                                        "ordertopic");
//001 zhangsan
PairStream<String, String> userPair = builder.newStream(userSpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
    .mapToPair(t -> {
        String[] tokens = t._3.split("\\s");
        return  Pair.<String, String>of(tokens[0], tokens[1]);
    });
//001 apple 100
PairStream<String, String> orderPair = builder.newStream(orderSpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
    .mapToPair(t -> {
        String[] tokens = t._3.split("\\s");
        return  Pair.<String, String>of(tokens[0], tokens[1]+":"+tokens[2]);
    });
userPair.window(TumblingWindows.of(BaseWindowedBolt.Duration.seconds(5)))
    .leftOuterJoin(orderPair).peek(t -> System.out.println(t));
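
The result of a left outer join over the records of one window can be sketched in plain Java. The sketch does not use Storm: LeftOuterJoinSketch is a hypothetical class, records are modelled as two-element String arrays, and the user "002 lisi" and the order "001 pear 5" are invented sample data added next to the "001 zhangsan" and "001 apple 100" records from the comments above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; not part of the Storm API.
class LeftOuterJoinSketch {

    // Each record is a two-element array: {key, value}.
    // Every left record appears at least once; a missing right side is shown as null.
    static List<String> leftOuterJoin(List<String[]> left, List<String[]> right) {
        Map<String, List<String>> rightByKey = new HashMap<>();
        for (String[] r : right) {
            rightByKey.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        List<String> joined = new ArrayList<>();
        for (String[] l : left) {
            List<String> matches =
                    rightByKey.getOrDefault(l[0], Collections.<String>singletonList(null));
            for (String match : matches) {
                joined.add(l[0] + " -> (" + l[1] + ", " + match + ")");
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> users = Arrays.asList(
                new String[]{"001", "zhangsan"},
                new String[]{"002", "lisi"});       // invented user without orders
        List<String[]> orders = Arrays.asList(
                new String[]{"001", "apple:100"},
                new String[]{"001", "pear:5"});     // invented second order
        for (String row : leftOuterJoin(users, orders)) {
            System.out.println(row);
        }
    }
}
```

An inner join would simply omit the rows whose right side is null.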

CoGroupByKey

coGroupByKey groups the values of this stream with the values from another stream that have the same key.

KafkaSpout<String, String> userSpout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
                                                                       "usertopic");
KafkaSpout<String, String> orderSpout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
                                                                        "ordertopic");
//001 zhangsan
PairStream<String, String> userPair = builder.newStream(userSpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
    .mapToPair(t -> {
        String[] tokens = t._3.split("\\s");
        return  Pair.<String, String>of(tokens[0], tokens[1]);
    });
//001 apple 100
PairStream<String, String> orderPair = builder.newStream(orderSpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
    .mapToPair(t -> {
        String[] tokens = t._3.split("\\s");
        return  Pair.<String, String>of(tokens[0], tokens[1]+":"+tokens[2]);
    });
userPair.coGroupByKey(orderPair).peek(t-> System.out.println(t));
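
Unlike a join, which emits one row per matching combination, co-grouping emits one entry per key that holds all values from each side. A plain-Java sketch of that shape follows. It does not use Storm: CoGroupSketch is a hypothetical class, the sample records other than "001 zhangsan" and "001 apple 100" are invented, and the sketch assumes that a key present on only one side still appears in the result with an empty list for the other side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; not part of the Storm API.
class CoGroupSketch {

    // Each record is a two-element array: {key, value}.
    // Result: key -> [values from the left side, values from the right side].
    static Map<String, List<List<String>>> coGroup(List<String[]> left, List<String[]> right) {
        Map<String, List<List<String>>> grouped = new TreeMap<>(); // sorted keys for stable output
        for (String[] l : left) {
            slot(grouped, l[0]).get(0).add(l[1]);
        }
        for (String[] r : right) {
            slot(grouped, r[0]).get(1).add(r[1]);
        }
        return grouped;
    }

    // Returns the pair of lists for a key, creating two empty lists on first use.
    private static List<List<String>> slot(Map<String, List<List<String>>> grouped, String key) {
        return grouped.computeIfAbsent(key, k -> {
            List<List<String>> pair = new ArrayList<>();
            pair.add(new ArrayList<>());
            pair.add(new ArrayList<>());
            return pair;
        });
    }

    public static void main(String[] args) {
        List<String[]> users = Arrays.asList(
                new String[]{"001", "zhangsan"},
                new String[]{"002", "lisi"});      // invented user without orders
        List<String[]> orders = Arrays.asList(
                new String[]{"001", "apple:100"},
                new String[]{"001", "pear:5"});    // invented second order
        coGroup(users, orders).forEach((key, sides) -> System.out.println(key + " -> " + sides));
    }
}
```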

State

updateStateByKey
builder.newStream(userSpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
    .map(t->t._3)
    .flatMap(line-> Arrays.asList(line.split("\\s+")))
    .mapToPair(word-> Pair.<String,Integer>of(word,1))
    .updateStateByKey(0,(v1,v2)->v1+v2)
    .toPairStream()
    .peek( t -> System.out.println(t));

Config conf = new Config();
conf.put(Config.TOPOLOGY_STATE_PROVIDER,"org.apache.storm.redis.state.RedisKeyValueStateProvider");
Map<String,Object> stateConfig=new HashMap<String,Object>();
Map<String,Object> redisConfig=new HashMap<String,Object>();
redisConfig.put("host","CentOSA");
redisConfig.put("port",6379);
stateConfig.put("jedisPoolConfig",redisConfig);
ObjectMapper objectMapper=new ObjectMapper();
conf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG,objectMapper.writeValueAsString(stateConfig));
stateQuery
KafkaSpout<String, String> userSpout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
                                                                       "topic01");

StreamState<String, Integer> streamState = builder.newStream(userSpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
    .map(t -> t._3)
    .flatMap(line -> Arrays.asList(line.split("\\s+")))
    .mapToPair(word -> Pair.<String, Integer>of(word, 1))
    .updateStateByKey(0, (v1, v2) -> v1 + v2);


KafkaSpout<String, String> querySpout = KafkaSpoutUtils.buildKafkaSpout("CentOSA:9092,CentOSB:9092,CentOSC:9092",
                                                                        "topic02");

builder.newStream(querySpout, TupleValueMappers.<Long, String, String>of(1, 3, 4))
    .map(t -> t._3)
    .stateQuery(streamState).peek(t-> System.out.println(t));
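
The interplay of updateStateByKey and stateQuery can be pictured as a per-key map that one stream keeps folding values into while another stream looks keys up. The plain-Java sketch below shows only that idea. It does not use Storm, UpdateStateSketch is a hypothetical class, and the state lives in an in-memory HashMap instead of the Redis-backed store configured above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java illustration only; not part of the Storm API.
class UpdateStateSketch<K, V, S> {
    private final Map<K, S> state = new HashMap<>();
    private final S initialValue;
    private final BiFunction<S, V, S> updater;

    UpdateStateSketch(S initialValue, BiFunction<S, V, S> updater) {
        this.initialValue = initialValue;
        this.updater = updater;
    }

    // Mirrors updateStateByKey(initialValue, updater): folds one incoming value
    // into the state of its key and returns the new state.
    S update(K key, V value) {
        S current = state.getOrDefault(key, initialValue);
        S next = updater.apply(current, value);
        state.put(key, next);
        return next;
    }

    // Mirrors stateQuery: looks up the current state of a key (null if never seen).
    S query(K key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        UpdateStateSketch<String, Integer, Integer> counts =
                new UpdateStateSketch<String, Integer, Integer>(0, (v1, v2) -> v1 + v2);
        for (String word : "hello storm hello".split("\\s+")) {
            counts.update(word, 1);
        }
        System.out.println("hello -> " + counts.query("hello"));
        System.out.println("kafka -> " + counts.query("kafka"));
    }
}
```

Unlike the windowed aggregations shown earlier, the state here is never reset, so the counts keep growing for as long as the topology runs.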