如何使移动端成为服务端?并通过Http调用
如何使APP成为服务端?并通过http调用编程语言:JavaJDK版本:JDK1.8构建工具:Maven项目描述:整体设计思路为,使用netty作为serverSocket服务器,移动端利用socket协议与服务端建立连接,使用自定义通信协议解决大字符串传输问题,使用sun提供的HttpHandler提供Http服务,通过继承NettyDefaultChannelGroup实现对移动端的...
·
如何使APP成为服务端?并通过http调用
2020年开年第一篇!!!继续加油
快照版本git源码地址
简介
- 编程语言:Java
- JDK版本:JDK1.8
- 构建工具:Maven
- 项目描述:整体设计思路为,使用netty作为serverSocket服务器,移动端利用socket协议与服务端建立连接,使用自定义通信协议解决大字符串传输问题,使用sun提供的HttpHandler提供Http服务,通过继承NettyDefaultChannelGroup实现对移动端的管理,通过Channel与消息id的绑定进行异步通信,通过label标签 (自定义,通过唯一不同的标签来确定接口,分布式情况下如果不是唯一会随机分配) 来定义当前移动端所具备的功能,使用zookeeper实现分布式协调服务,当单机模式下,http请求会分为公平和非公平两种请求方式,以达到均衡请求和随机请求,当分布式模式下会优先使用本地所连接的移动端,如需要的label在其他移动端上将会调用rpc方式去请求(减少额外网络开销)
项目结构树图:
工作流程图:
单机,三个客户端,并发测试图:
线程组配置:
吞吐量(感觉时间还是慢):
响应时间图(不太理想还得优化,最慢超过了10s):
部分代码:
启动类源码
import com.sun.net.httpserver.HttpServer;
import config.DispatchConfig;
import core.NettyStart;
import core.ZookeeperClient;
import http.DispatchHandler;
import http.MonitorHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 万物开始的地方
*/
public class DispatchBootStrap {
private static final Logger LOGGER = LoggerFactory.getLogger(DispatchBootStrap.class);
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 1000, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), r -> {
Thread thread = new Thread(r);
thread.setName("dispatch-");
return thread;
});
if(DispatchConfig.ZOOKEEPER_ENABLE){
ZookeeperClient.init(DispatchConfig.ZOOKEEPER_HOST);
}
// ExecutorService threadPoolExecutor = Executors.newCachedThreadPool();
InetSocketAddress address = new InetSocketAddress(DispatchConfig.DISPATCH_PORT);
try {
HttpServer httpServer = HttpServer.create(address, 0);
httpServer.createContext("/get",new DispatchHandler(DispatchConfig.WAIT_TIME));
httpServer.createContext("/info",new MonitorHandler());
httpServer.setExecutor(threadPoolExecutor);
httpServer.start();
LOGGER.info("====== HTTP START port is {} ======",DispatchConfig.DISPATCH_PORT);
threadPoolExecutor.execute(new DispatchNetty());
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if(LOGGER.isWarnEnabled()){
LOGGER.warn("shut down server !");
}
ZookeeperClient.deleteLocal();
threadPoolExecutor.shutdown();
}));
} catch (IOException e) {
e.printStackTrace();
}
}
static class DispatchNetty implements Runnable{
@Override
public void run() {
LOGGER.info("====== NETTY START port is {} ======",DispatchConfig.NETTY_PORT);
NettyStart nettyStart = new NettyStart(DispatchConfig.NETTY_PORT);
nettyStart.start();
}
}
}
zk客户端代码
package core;
import beans.RemoteChannel;
import config.DispatchConfig;
import org.apache.commons.collections.CollectionUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import utils.IpUtils;
import utils.JsonUtils;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;
/**
* zk客户端
*/
public class ZookeeperClient {
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperClient.class);
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
/**
* 客户端节点
*/
private static final String PATH = "/channels";
/**
* 服务端节点
*/
private static final String REMOTES_PATH = "/remotes";
private static final ConnectionRepository CHANNEL = ConnectionRepository.get();
private static ZooKeeper zooKeeper = null;
private static final Stat stat = new Stat();
public ZookeeperClient() { LOGGER.info("load ZookeeperInstance ..."); }
public static void init(String host){
LOGGER.info("wait zk callback ...");
try {
zooKeeper = new ZooKeeper(host,500000,watchedEvent -> {
List<String> children = null;
if(Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()){
if(Watcher.Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()){
connectedSemaphore.countDown();
try {
//每次都新加一个watch监听channel下节点变化
children = zooKeeper.getChildren(PATH, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
if(CollectionUtils.isNotEmpty(children)){
LOGGER.info("add old channel size {}",children.size());
addRemoteNodeChannels(children);
}
}else if(Watcher.Event.EventType.NodeChildrenChanged == watchedEvent.getType()){
try {
//每次都新加一个watch监听channel下节点变化
children = zooKeeper.getChildren(PATH, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
if(Objects.isNull(children)){
LOGGER.warn("children node is null .." );
return;
}
LOGGER.info("add node changes remote size {} ..", children.size());
addRemoteNodeChannels(children);
}
}
});
connectedSemaphore.await();
LOGGER.info("zookeeper successful connected !");
Stat rootExists = zooKeeper.exists(PATH, false);
if(Objects.isNull(rootExists)){
LOGGER.info("[ init create channels root node ! ]");
zooKeeper.create(PATH,"".getBytes(), OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
assert IpUtils.getRealIp() != null;
Stat remoteExists = zooKeeper.exists(REMOTES_PATH, false);
if(Objects.isNull(remoteExists)){
LOGGER.info("[ init create remotes root node! ]");
zooKeeper.create(REMOTES_PATH , "".getBytes(), OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
Thread.sleep(100);
createZkRemotesNode();
}else{
Stat exists = zooKeeper.exists(REMOTES_PATH + "/" + IpUtils.getRealIp(), false);
if(Objects.isNull(exists)){
createZkRemotesNode();
}else{
LOGGER.warn("{} already exists !",REMOTES_PATH + "/" + IpUtils.getRealIp());
}
}
} catch (Exception e) {
LOGGER.error("zookeeper connection exception !" + e.getMessage());
}
}
public static void addRemoteNodeChannels(List<String> children){
for (String child : children) {
try {
byte[] data = zooKeeper.getData(PATH+"/"+child, false ,stat);
RemoteChannel remoteChannel = JsonUtils.get().readValue(data, RemoteChannel.class);
List<String> label = remoteChannel.getLabel();
// 127.0.0.0:8080
CHANNEL.addRemoteChannel(label,child.split("#")[0] + ":" + DispatchConfig.DISPATCH_PORT);
} catch (KeeperException | InterruptedException | IOException e) {
e.printStackTrace();
}
}
}
public static void createZkRemotesNode(){
try {
zooKeeper.create(REMOTES_PATH + "/" + IpUtils.getRealIp(), IpUtils.getRealIp().getBytes(), OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static boolean createChannel(String remoteIp, String info){
try {
synchronized (zooKeeper){
zooKeeper.create(PATH + remoteIp , info.getBytes(), OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
// /channel/127.0.0.1#0
} catch (KeeperException | InterruptedException e) {
LOGGER.error("create remote channel error !");
e.printStackTrace();
}
return true;
}
public static void deleteLocal(){
try {
zooKeeper.delete(REMOTES_PATH + "/" + IpUtils.getRealIp(),stat.getVersion());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
public static void deleteNode(String ip) {
try {
zooKeeper.delete(PATH + ip,0);
LOGGER.info("Delete remote ip : {}",ip);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)