Java Integration with InfluxDB

This article describes two ways to integrate InfluxDB into a Java application based on Spring Boot:

  • Calling the InfluxDB HTTP API with RestTemplate
  • Using the “influxdb-java” library

1) Calling the InfluxDB HTTP API with RestTemplate

Drawbacks: the code is costly to maintain, and queries assembled from strings are exposed to SQL-style injection.

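import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

// restTemplate is assumed to be an injected RestTemplate bean
String url = "http://localhost:8086/query";
MultiValueMap<String, String> postParameter = new LinkedMultiValueMap<>();
postParameter.add("db", "my_test_influx");   // target database
postParameter.add("q", "SELECT * FROM cpu"); // InfluxQL statement
Object result = restTemplate.postForObject(url, postParameter, Object.class);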
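
The injection risk mentioned above appears as soon as the `q` parameter is built by concatenating user input. The following is a minimal, JDK-only sketch of the problem; the class `InjectionDemo` and its method names are hypothetical. The escaping helper relies on the InfluxQL rule that a single quote inside a string literal is escaped with a backslash, and it is shown only to make the issue visible, not as a complete defense.

```java
public class InjectionDemo {
    // Naive approach: user input is pasted straight into the InfluxQL text.
    static String naiveQuery(String region) {
        return "SELECT * FROM cpu WHERE region = '" + region + "'";
    }

    // Hypothetical helper: escape backslashes and single quotes, then quote.
    static String quoteString(String value) {
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }

    // Same query, but the input can no longer close the string literal.
    static String escapedQuery(String region) {
        return "SELECT * FROM cpu WHERE region = " + quoteString(region);
    }

    public static void main(String[] args) {
        String hostile = "x' OR region != 'x";
        // The naive version turns the input into an extra OR condition.
        System.out.println(naiveQuery(hostile));
        // The escaped version keeps the whole input inside one literal.
        System.out.println(escapedQuery(hostile));
    }
}
```

Hand-written escaping is easy to get wrong, so parameter binding, as in the second approach of this article, is usually the safer option.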

2) Using the “influxdb-java” library

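Advantages: it simplifies development and converts query result sets into POJOs. Shortcomings: the feature set is still incomplete as of the version used here; in particular, it offers no direct mapping from a POJO to stored data on insert.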

<dependency>
   <groupId>org.influxdb</groupId>
   <artifactId>influxdb-java</artifactId>
   <version>2.11</version>
</dependency>

Under the hood, the library uses the Retrofit framework with OkHttp to handle network requests.

// 1) Plain query
InfluxDB influxDB = InfluxDBFactory.connect(influxUrl);
Query query = new Query("SELECT * FROM cpu", "my_test_influx");
QueryResult queryResult = influxDB.query(query);

// 2) Parameterized query that avoids injection
Query query = BoundParameterQuery.QueryBuilder.newQuery("SELECT * FROM cpu WHERE region = $region")
        .forDatabase("my_test_influx")
        .bind("region", region)
        .create();
QueryResult queryResult = influxDB.query(query);
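
Bound parameters are also available when calling the HTTP API directly: the InfluxDB 1.x `/query` endpoint accepts a `params` form field holding a JSON object, and `$name` placeholders in `q` are resolved from it. The sketch below only builds such a form body with the JDK; the class `BoundParamsForm` and its helpers are hypothetical, and the JSON escaping is deliberately minimal (backslash and double quote only).

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class BoundParamsForm {
    // Encodes one form field; URLEncoder.encode(String, Charset) needs Java 10+.
    static String field(String name, String value) {
        return name + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    // Minimal JSON string escaping (backslash and double quote only).
    static String jsonString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds the form body for POST /query with a single bound string parameter.
    // The query text keeps its $placeholder; the value travels separately.
    static String formBody(String db, String query, String paramName, String paramValue) {
        String params = "{" + jsonString(paramName) + ":" + jsonString(paramValue) + "}";
        return String.join("&", field("db", db), field("q", query), field("params", params));
    }

    public static void main(String[] args) {
        System.out.println(formBody("my_test_influx",
                "SELECT * FROM cpu WHERE region = $region", "region", "us-west"));
    }
}
```

With RestTemplate, the same effect should be achievable by adding a third `params` entry to the `postParameter` map from the first approach, since the map is sent as form data.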

// Use InfluxDBResultMapper to convert the result set into POJOs
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<Cpu> list = resultMapper.toPOJO(queryResult, Cpu.class);
// The entity class needs the corresponding annotations
@Measurement(name = "cpu")
public class Cpu {
    @Column(name = "time")
    private Instant time;
    @Column(name = "hostname", tag = true)
    private String hostname;
    @Column(name = "region", tag = true)
    private String region;
    @Column(name = "idle")
    private Long idle;
    // getters and setters omitted
}
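
To make the annotation-driven mapping less of a black box, here is a conceptual, JDK-only sketch of how a result row can be copied into an annotated object by reflection. It is not the implementation of `InfluxDBResultMapper`: the `Col` annotation, the `RowMapperSketch` class, and the reduced `Cpu` class are hypothetical stand-ins, and no type conversion is performed, whereas a real mapper has to convert the values it receives.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowMapperSketch {
    // Hypothetical stand-in for influxdb-java's @Column annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    public @interface Col {
        String name();
    }

    public static class Cpu {
        @Col(name = "hostname") private String hostname;
        @Col(name = "idle") private Long idle;
        public String getHostname() { return hostname; }
        public Long getIdle() { return idle; }
    }

    // Creates one object per row, copying each column value into the field
    // whose annotation names that column. No type conversion is attempted.
    public static <T> List<T> toPojo(List<String> columns, List<List<Object>> rows, Class<T> type) {
        List<T> result = new ArrayList<>();
        try {
            for (List<Object> row : rows) {
                T item = type.getDeclaredConstructor().newInstance();
                for (Field field : type.getDeclaredFields()) {
                    Col col = field.getAnnotation(Col.class);
                    int index = (col == null) ? -1 : columns.indexOf(col.name());
                    if (index < 0) {
                        continue; // unmapped field, or column absent from the result
                    }
                    field.setAccessible(true);
                    field.set(item, row.get(index));
                }
                result.add(item);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot map rows to " + type.getName(), e);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("hostname", "idle");
        List<List<Object>> rows = Arrays.asList(
                Arrays.<Object>asList("server1", 90L),
                Arrays.<Object>asList("server2", 75L));
        for (Cpu cpu : toPojo(columns, rows, Cpu.class)) {
            System.out.println(cpu.getHostname() + " idle=" + cpu.getIdle());
        }
    }
}
```
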
// Writing data
InfluxDB influxDB = InfluxDBFactory.connect(influxUrl);
influxDB.setRetentionPolicy("2_hours"); // retention policy used for this test; the default policy applies when none is set
influxDB.setDatabase("my_test_influx");
Point.Builder builder = Point.measurement("cpu");
Long currentTime = System.currentTimeMillis();
builder.time(currentTime, TimeUnit.MILLISECONDS);
builder.addField("idle", 90L);
// hostname and region are declared as tags in the Cpu class, so write them as tags
builder.tag("hostname", "server1");
builder.tag("region", region);
builder.addField("happydevop", false);
Point point = builder.build();
influxDB.write(point);
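
A point reaches InfluxDB as one line of line protocol text: the measurement, then comma-separated tags, a space, comma-separated fields, a space, and the timestamp. The sketch below formats a point like the one above, with hostname and region treated as tags as in the Cpu class. It is a simplified illustration, not the library's serializer: `LineProtocolSketch` is a hypothetical class, it assumes keys and tag values contain no characters that need escaping, and the timestamp is written as given, so the write precision has to match its unit.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

public class LineProtocolSketch {
    // Integers carry an "i" suffix, strings are double-quoted,
    // booleans and floating-point numbers are written as-is.
    static String formatFieldValue(Object value) {
        if (value instanceof Long || value instanceof Integer) {
            return value + "i";
        }
        if (value instanceof String) {
            String s = (String) value;
            return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
        }
        return String.valueOf(value);
    }

    // Produces: measurement,tag=value,... field=value,... timestamp
    // Tags and fields are emitted in sorted key order.
    static String toLine(String measurement, Map<String, String> tags,
                         Map<String, Object> fields, long timestamp) {
        StringBuilder line = new StringBuilder(measurement);
        for (Map.Entry<String, String> tag : new TreeMap<>(tags).entrySet()) {
            line.append(',').append(tag.getKey()).append('=').append(tag.getValue());
        }
        StringJoiner fieldSet = new StringJoiner(",");
        for (Map.Entry<String, Object> field : new TreeMap<>(fields).entrySet()) {
            fieldSet.add(field.getKey() + "=" + formatFieldValue(field.getValue()));
        }
        return line.append(' ').append(fieldSet).append(' ').append(timestamp).toString();
    }

    public static void main(String[] args) {
        Map<String, String> tags = new TreeMap<>();
        tags.put("hostname", "server1");
        tags.put("region", "us-west");
        Map<String, Object> fields = new TreeMap<>();
        fields.put("idle", 90L);
        fields.put("happydevop", false);
        System.out.println(toLine("cpu", tags, fields, System.currentTimeMillis()));
    }
}
```

Seeing the text form also shows why tags and fields differ: tag values are always plain strings in the key part of the line, while field values keep their type.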

// Build and insert a Point from the POJO's annotations via reflection

Class<?> clz = obj.getClass();
Measurement measurement = clz.getAnnotation(Measurement.class);
Point.Builder builder = Point.measurement(measurement.name());
Map<String, Object> fieldMaps = new TreeMap<>();
for (Field field : clz.getDeclaredFields()) {
    Column column = field.getAnnotation(Column.class);
    if (column == null) {
        continue; // field is not mapped to InfluxDB
    }
    field.setAccessible(true);
    Object value = field.get(obj); // may throw IllegalAccessException
    if (value == null) {
        continue;
    }
    if ("time".equals(column.name())) {
        // Accept an Instant (as in the Cpu class) or an ISO-8601 string
        Instant timeT = value instanceof Instant
                ? (Instant) value
                : Instant.parse(String.valueOf(value));
        builder.time(timeT.toEpochMilli(), TimeUnit.MILLISECONDS);
    } else if (column.tag()) {
        builder.tag(column.name(), String.valueOf(value));
    } else {
        fieldMaps.put(column.name(), value);
    }
}
builder.fields(fieldMaps);
Point point = builder.build();
influxDB.write(point);