利用python现成的tornado包封装http,调用http接口服务

tornado就不过多做介绍了,它是一个轻量级的HTTP服务框架
如需安装直接

pip install tornado pymysql

我这里是简单的获取mysql数据,做简单处理,封装成get请求的服务

#!/usr/bin/env python
from __future__ import print_function

import json
import pymysql
import subprocess
import tornado.escape
from tornado import gen
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define, options

# Command-line options. Each one can be overridden at start-up, e.g.
#   python server.py --port=8888 --mysql_host=127.0.0.1 --mysql_port=3306
define("port", default=8888, help="run on the given port", type=int)
define("mysql_host", default=None, help="mysql database host")
# type=int is required here: with a None default Tornado would otherwise
# parse the flag as a string, and PyMySQL only accepts an integer port.
define("mysql_port", default=None, help="mysql database port", type=int)
define("mysql_database", default=None, help="mysql database db")
define("mysql_user", default=None, help="mysql database user")
define("mysql_password", default=None, help="mysql database password")

class Application(tornado.web.Application):
    """Tornado application that maps URL paths to their request handlers."""

    def __init__(self):
        # Route table: "/" returns a greeting, "/GetMysql" queries MySQL.
        handlers = [
            (r"/", MainHandler),
            (r"/GetMysql", MysqlInfoHandler),
        ]
        super(Application, self).__init__(handlers)


class MysqlInfoHandler(tornado.web.RequestHandler):
    """Serve rows of ``mysql_table`` selected by the ``id`` URL parameter."""

    def get(self):
        """Handle ``GET /GetMysql?id=<value>``.

        Looks up rows of ``mysql_table`` whose ``dt`` column equals the
        given value and writes them to the response as a JSON string, or
        writes ``"No Mysql Result"`` when nothing matches. If the query
        fails, the error text is written with HTTP status 500.
        """
        # Required URL parameter; Tornado replies 400 when it is missing.
        ids = self.get_argument("id")
        # Earlier clients had to wrap the value in SQL quotes
        # (id='2021-11-05') because it was pasted into the statement.
        # Strip them so those URLs keep working with the bound parameter.
        ids = ids.strip("'\"")

        # Open a connection for this request using the command-line options.
        conn = pymysql.Connection(
            host=options.mysql_host,
            user=options.mysql_user,
            password=options.mysql_password,
            port=options.mysql_port,
            database=options.mysql_database,
        )
        try:
            with conn.cursor() as cur:
                # Bind the value as a parameter instead of formatting it
                # into the SQL text, so request input cannot inject SQL.
                cur.execute("SELECT * FROM mysql_table WHERE dt=%s", (ids,))
                res = cur.fetchall()
            if res:
                self.write(json.dumps(str(res)))
            else:
                self.write("No Mysql Result")
        except Exception as e:
            # Report the failure to the client, flagged as a server error.
            self.set_status(500)
            self.write(str(e))
        finally:
            # Always release the connection, on success and on error alike.
            # No commit is needed: the handler only reads.
            conn.close()
        
        
class MainHandler(tornado.web.RequestHandler):
    """Root endpoint that answers with a fixed greeting."""

    def get(self):
        """Handle ``GET /`` by writing the greeting to the response."""
        greeting = "Hello, world"
        self.write(greeting)

def main():
    """Parse command-line options, start the HTTP server and run the loop."""
    # Options must be parsed before the server reads ``options.port``.
    tornado.options.parse_command_line()

    # Build the application object and wrap it in an HTTP server.
    app = Application()
    server = tornado.httpserver.HTTPServer(app)
    server.listen(options.port)

    # Start the event loop and begin serving connections on the port.
    loop = tornado.ioloop.IOLoop.current()
    loop.start()

# Start the server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

接下来可以直接用 requests 库去调用服务

import requests

# The service listens on port 8888 unless --port is given at start-up,
# so the URL must name that host and port explicitly.
r = "http://localhost:8888/GetMysql?id='2021-11-05'"
res = requests.get(r)
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐