如何使APP成为服务端?并通过http调用

2020年开年第一篇!!!继续加油

快照版本git源码地址

简介
  1. 编程语言:Java
  2. JDK版本:JDK1.8
  3. 构建工具:Maven
  4. 项目描述:整体设计思路为,使用netty作为serverSocket服务器,移动端利用socket协议与服务端建立连接,使用自定义通信协议解决大字符串传输问题,使用sun提供的HttpHandler提供Http服务,通过继承NettyDefaultChannelGroup实现对移动端的管理,通过Channel与消息id的绑定进行异步通信,通过label标签 (自定义,通过唯一不同的标签来确定接口,分布式情况下如果不是唯一会随机分配) 来定义当前移动端所具备的功能,使用zookeeper实现分布式协调服务,当单机模式下,http请求会分为公平和非公平两种请求方式,以达到均衡请求和随机请求,当分布式模式下会优先使用本地所连接的移动端,如需要的label在其他移动端上将会调用rpc方式去请求(减少额外网络开销

项目结构树图:
在这里插入图片描述
工作流程图:
在这里插入图片描述
单机,三个客户端,并发测试图:
线程组配置:
在这里插入图片描述
吞吐量(感觉时间还是慢):
在这里插入图片描述
响应时间图(不太理想还得优化,最慢超过了10s):
在这里插入图片描述

部分代码:

启动类源码

import com.sun.net.httpserver.HttpServer;
import config.DispatchConfig;
import core.NettyStart;
import core.ZookeeperClient;
import http.DispatchHandler;
import http.MonitorHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 万物开始的地方
 */
public class DispatchBootStrap {
    private static final Logger LOGGER = LoggerFactory.getLogger(DispatchBootStrap.class);
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 1000, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("dispatch-");
            return thread;
        });
        if(DispatchConfig.ZOOKEEPER_ENABLE){
            ZookeeperClient.init(DispatchConfig.ZOOKEEPER_HOST);
        }
//        ExecutorService threadPoolExecutor = Executors.newCachedThreadPool();
        InetSocketAddress address = new InetSocketAddress(DispatchConfig.DISPATCH_PORT);
        try {
            HttpServer httpServer = HttpServer.create(address, 0);
            httpServer.createContext("/get",new DispatchHandler(DispatchConfig.WAIT_TIME));
            httpServer.createContext("/info",new MonitorHandler());
            httpServer.setExecutor(threadPoolExecutor);
            httpServer.start();
            LOGGER.info("====== HTTP START port is {} ======",DispatchConfig.DISPATCH_PORT);
            threadPoolExecutor.execute(new DispatchNetty());
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                if(LOGGER.isWarnEnabled()){
                    LOGGER.warn("shut down server !");
                }
                ZookeeperClient.deleteLocal();
                threadPoolExecutor.shutdown();
            }));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    static class DispatchNetty implements Runnable{
        @Override
        public void run() {
            LOGGER.info("====== NETTY START port is {} ======",DispatchConfig.NETTY_PORT);
            NettyStart nettyStart = new NettyStart(DispatchConfig.NETTY_PORT);
            nettyStart.start();
        }
    }
}

zk客户端代码

package core;

import beans.RemoteChannel;
import config.DispatchConfig;
import org.apache.commons.collections.CollectionUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import utils.IpUtils;
import utils.JsonUtils;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;

import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;

/**
 * zk客户端
 */
public class ZookeeperClient  {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperClient.class);
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    /**
     * 客户端节点
     */
    private static final String PATH = "/channels";
    /**
     * 服务端节点
     */
    private static final String REMOTES_PATH = "/remotes";
    private static final ConnectionRepository CHANNEL = ConnectionRepository.get();
    private static ZooKeeper zooKeeper = null;
    private static final Stat stat = new Stat();

    public ZookeeperClient() { LOGGER.info("load ZookeeperInstance ..."); }

    public static void init(String host){
        LOGGER.info("wait zk callback ...");
        try {
            zooKeeper = new ZooKeeper(host,500000,watchedEvent -> {
                List<String> children = null;
                if(Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()){
                    if(Watcher.Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()){
                        connectedSemaphore.countDown();
                        try {
                            //每次都新加一个watch监听channel下节点变化
                            children = zooKeeper.getChildren(PATH, true);
                        } catch (KeeperException | InterruptedException e) {
                            e.printStackTrace();
                        }
                        if(CollectionUtils.isNotEmpty(children)){
                            LOGGER.info("add old channel size {}",children.size());
                            addRemoteNodeChannels(children);
                        }
                    }else if(Watcher.Event.EventType.NodeChildrenChanged == watchedEvent.getType()){
                        try {
                            //每次都新加一个watch监听channel下节点变化
                            children = zooKeeper.getChildren(PATH, true);
                        } catch (KeeperException | InterruptedException e) {
                            e.printStackTrace();
                        }
                        if(Objects.isNull(children)){
                            LOGGER.warn("children node is null  .." );
                            return;
                        }
                        LOGGER.info("add node changes remote size {} ..", children.size());
                        addRemoteNodeChannels(children);
                    }
                }
            });

            connectedSemaphore.await();
            LOGGER.info("zookeeper successful connected !");

            Stat rootExists = zooKeeper.exists(PATH, false);
            if(Objects.isNull(rootExists)){
                LOGGER.info("[  init create channels root  node !  ]");
                zooKeeper.create(PATH,"".getBytes(), OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }
            assert IpUtils.getRealIp() != null;
            Stat remoteExists = zooKeeper.exists(REMOTES_PATH, false);
            if(Objects.isNull(remoteExists)){
                LOGGER.info("[  init create remotes root node!  ]");
                zooKeeper.create(REMOTES_PATH , "".getBytes(), OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                Thread.sleep(100);
                createZkRemotesNode();
            }else{
                Stat exists = zooKeeper.exists(REMOTES_PATH + "/" + IpUtils.getRealIp(), false);
                if(Objects.isNull(exists)){
                    createZkRemotesNode();
                }else{
                    LOGGER.warn("{} already exists !",REMOTES_PATH + "/" + IpUtils.getRealIp());
                }
            }


        } catch (Exception e) {
            LOGGER.error("zookeeper connection exception !" + e.getMessage());
        }
    }
    public static void addRemoteNodeChannels(List<String>  children){
        for (String child : children) {
            try {
                byte[] data = zooKeeper.getData(PATH+"/"+child, false ,stat);
                RemoteChannel remoteChannel = JsonUtils.get().readValue(data, RemoteChannel.class);
                List<String> label = remoteChannel.getLabel();
                // 127.0.0.0:8080
                CHANNEL.addRemoteChannel(label,child.split("#")[0] + ":" + DispatchConfig.DISPATCH_PORT);
            } catch (KeeperException | InterruptedException | IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void createZkRemotesNode(){
        try {
            zooKeeper.create(REMOTES_PATH + "/" + IpUtils.getRealIp(), IpUtils.getRealIp().getBytes(), OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static boolean createChannel(String remoteIp, String info){
        try {
            synchronized (zooKeeper){
                zooKeeper.create(PATH + remoteIp , info.getBytes(), OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            }
            //    /channel/127.0.0.1#0
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("create remote channel error !");
            e.printStackTrace();
        }
        return true;
    }
    public static void deleteLocal(){
        try {
            zooKeeper.delete(REMOTES_PATH + "/" + IpUtils.getRealIp(),stat.getVersion());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }
    public static void deleteNode(String ip) {
        try {
            zooKeeper.delete(PATH + ip,0);
            LOGGER.info("Delete remote ip : {}",ip);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
}

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐