RPC实现伪个性化推荐方案——python

一、相关概念

RPC
远程过程调用(英语:Remote Procedure Call,缩写为 RPC,也叫远程程序调用)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。
GRPC
  • gRPC是由Google公司开源的高性能RPC框架。gRPC支持多语言。
  • gRPC原生使用C、Java、Go进行了三种实现,而C语言实现的版本进行封装后又支持C++、C#、Node、ObjC、Python、Ruby、PHP等开发语言
  • gRPC支持多平台。支持的平台包括:Linux、Android、iOS、MacOS、Windows
  • gRPC的消息协议使用Google自家开源的Protocol Buffers协议机制(proto3) 序列化
  • gRPC的传输使用HTTP/2标准,支持双向流和连接多路复用
Protocol Buffers
Protocol Buffers 是一种与语言无关,平台无关的可扩展机制,用于序列化结构化数据。使用Protocol Buffers 可以一次定义结构化的数据,然后可以使用特殊生成的源代码轻松地在各种数据流中使用各种语言编写和读取结构化数据。

现在有许多框架等在使用Protocol Buffers。gRPC也是基于Protocol Buffers。 Protocol Buffers 目前有2和3两个版本号。

在gRPC中推荐使用proto3版本。

二、推荐系统接口定义

接口名称: user_recommend
调用参数:
UserRequest:
    user_id       # 用户id
    channel_id    # 频道id
    article_num   # 推荐的文章数量
    time_stamp    # 推荐的时间戳
返回数据:
ArticleResponse:
    expousre         # 曝光埋点数据
    time_stamp       # 推荐的时间戳
    recommends:      # 推荐结果
        article_id   # 文章id
        track:         # 关于文章的埋点数据
            click    # 用户点击行为的埋点参数
            collect  # 用户收藏的埋点参数
            share    # 用户分享的埋点参数
            read     # 用户进入文章详情的埋点参数
使用Protobuf 定义接口

新建并编辑reco.proto

syntax = "proto3";

message UserRequest {
    string user_id=1;
    int32 channel_id=2;
    int32 article_num=3;
    int64 time_stamp=4;
}

message Track {
    string click=1;
    string collect=2;
    string share=3;
    string read=4;
}

message Article {
    int64 article_id=1;
    Track track=2;
}

message ArticleResponse {
    string exposure=1;
    int64 time_stamp=2;
    repeated Article recommends=3;
}

service UserRecommend {
    rpc user_recommend(UserRequest) returns(ArticleResponse) {}
}

三、代码生成

安装protobuf编译器和grpc库
pip install grpcio-tools
编译生成代码
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. reco.proto
  • -I表示搜索proto文件中被导入文件的目录
  • –python_out表示保存生成Python文件的目录,生成的文件中包含接口定义中的数据类型
  • –grpc_python_out表示保存生成Python文件的目录,生成的文件中包含接口定义中的服务类型

会自动生成如下两个rpc调用辅助代码模块:

reco_pb2.py 保存根据接口定义文件中的数据类型生成的python类

reco_pb2_grpc.py 保存根据接口定义文件中的服务方法类型生成的python调用RPC方法

四、服务端

新建并编辑server.py

# -*- coding: utf-8 -*-

import reco_pb2
import reco_pb2_grpc
import grpc
from concurrent.futures import ThreadPoolExecutor
import time


# rpc接口定义中服务对应成Python的类
class UserRecommendService(reco_pb2_grpc.UserRecommendServicer):

    # 在接口定义的同名方法中补全,被调用时应该执行的逻辑
    def user_recommend(self, request, context):
        # request是调用的请求数据对象
        user_id = request.user_id
        channel_id = request.channel_id
        article_num = request.article_num
        time_stamp = request.time_stamp

        response = reco_pb2.ArticleResponse()
                
        # 手动构造推荐结果,后续对接真实推荐代码
        response.exposure = 'exposure param'
        response.time_stamp = round(time.time() * 1000)
        recommends = []
        for i in range(article_num):
            article = reco_pb2.Article()
            article.track.click = 'click param {}'.format(i + 1)
            article.track.collect = 'collect param {}'.format(i + 1)
            article.track.share = 'share param {}'.format(i + 1)
            article.track.read = 'read param {}'.format(i + 1)
            article.article_id = i + 1
            recommends.append(article)
        response.recommends.extend(recommends)

        # 最终要返回一个调用结果
        return response


def serve():
    """
    rpc服务端启动方法
    """
    # 创建一个rpc服务器
    server = grpc.server(ThreadPoolExecutor(max_workers=10))

    # 向服务器中添加被调用的服务方法
    reco_pb2_grpc.add_UserRecommendServicer_to_server(UserRecommendService(), server)

    # 微服务器绑定ip地址和端口
    server.add_insecure_port('127.0.0.1:8888')

    # 启动rpc服务
    server.start()

    # start()不会阻塞,此处需要加上循环睡眠 防止程序退出
    while True:
        time.sleep(10)


if __name__ == '__main__':
    serve()

五、客户端

新建并编辑client.py

# -*- coding: utf-8 -*-

import grpc
import reco_pb2
import reco_pb2_grpc
import time


def feed_articles(stub):
    # 构建rpc调用的调用参数
    user_request = reco_pb2.UserRequest()
    user_request.user_id = '1'
    user_request.channel_id = 1
    user_request.article_num = 10
    user_request.time_stamp = round(time.time() * 1000)

    # 通过stub进行方法调用,并接收调用返回值
    ret = stub.user_recommend(user_request)
    print('ret={}'.format(ret))


def run():
    """
    rpc客户端调用的方法
    """
    # 使用with语句连接rpc服务器
    with grpc.insecure_channel('127.0.0.1:8888') as channel:
        # 创建调用rpc远端服务的辅助对象stub
        stub = reco_pb2_grpc.UserRecommendStub(channel)
        # 通过stub进行rpc调用
        feed_articles(stub)


if __name__ == '__main__':
    run()

六、运行与结果

1、先运行服务端python server.py

2、再运行客户端python client.py

3、结果:

ret=exposure: "exposure param"
time_stamp: 1593676322597
recommends {
  article_id: 1
  track {
    click: "click param 1"
    collect: "collect param 1"
    share: "share param 1"
    read: "read param 1"
  }
}

。。。省略。。。

recommends {
  article_id: 10
  track {
    click: "click param 10"
    collect: "collect param 10"
    share: "share param 10"
    read: "read param 10"
  }
}

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐