gRPC实战总结

最近工作中在java项目中使用到gRPC,为防遗忘特在此留痕。
关于gRPC是什么,从何而来,什么使用场景这里就不再赘述,相信能看到这篇文章的朋友既是对gRPC有相关了解了,下面直接进入主题。

添加依赖

<properties>
	<grpc.version>1.12.0</grpc.version>
	<protoc.version>3.5.1-1</protoc.version>
</properties>
<dependencies>
    <dependency>
		<groupId>io.grpc</groupId>
		<artifactId>grpc-netty</artifactId>
		<version>${grpc.version}</version>
	</dependency>
	<dependency>
		<groupId>io.grpc</groupId>
		<artifactId>grpc-protobuf</artifactId>
		<version>${grpc.version}</version>
	</dependency>
	<dependency>
		<groupId>io.grpc</groupId>
		<artifactId>grpc-stub</artifactId>
		<version>${grpc.version}</version>
	</dependency>
</dependencies>
<build>
	<extensions>
		<extension>
			<groupId>kr.motd.maven</groupId>
			<artifactId>os-maven-plugin</artifactId>
			<version>1.5.0.Final</version>
		</extension>
	</extensions>
	<plugins>
        <plugin>
			<groupId>org.xolstice.maven.plugins</groupId>
			<artifactId>protobuf-maven-plugin</artifactId>
			<version>0.5.1</version>
			<configuration>
				<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
				</protocArtifact>
				<pluginId>grpc-java</pluginId>
				<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
				</pluginArtifact>
			</configuration>
			<executions>
				<execution>
					<goals>
						<goal>compile</goal>
						<goal>compile-custom</goal>
					</goals>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>

创建描述文件

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.**.**.**.**.hydrologicgrpcsvr";
option java_outer_classname = "HydrologicProto";
option objc_class_prefix = "HGS";

package hydrologicgrpcsvr;

//定义服务
service HydrologicService {
    //服务中的方法,根据DownwardOrder类型的参数获取UpwardRespond类型的结果
    //获取时钟
    rpc getSZ51H (DownwardOrder51H) returns (UpwardRespond51H) {
    }
    ...
    ...
    ...
    //此处省略N个方法

    //定义51H下行命令消息类型
    message DownwardOrder51H {
        string devid = 1;
        string centeraddr = 2;
        string flownum = 3;
        string pwd = 4;
        string afn = 5;
    }

    //定义51H上行响应消息类型
    message UpwardRespond51H {
        string devid = 1;
        string centeraddr = 2;
        string flownum = 3;
        string pwd = 4;
        string afn = 5;
        string sendtime = 6;
        bool issuccess = 7;
        string returnmsg = 8 ;
    }
    ...
    ...
    ...
    //此处省略N个对象
}

maven生成代码并更新目录

maven_ganerated

实现服务类

public class HydrologicServiceImplBaseImpl extends HydrologicServiceGrpc.HydrologicServiceImplBase {

    private static Logger logger = LoggerFactory.getLogger(HydrologicServiceImplBaseImpl.class);

    @Override
    public void getSZ51H(DownwardOrder51H request, StreamObserver<UpwardRespond51H> responseObserver) {
        String devid = request.getDevid();
        GrpcCache.responseObservercache_51H.put(devid, responseObserver);
        HydrBaseBean hydrBaseBean = new HydrBaseBean();
        hydrBaseBean.setYczdz(devid);
        hydrBaseBean.setZxzdz("01");
        hydrBaseBean.setLsh("0001");
        hydrBaseBean.setPwd("1234");
        hydrBaseBean.setGnm("51");
        hydrBaseBean.setEncodeType(Global.ControlCharacters.ENQ.getcch());
        HandlerManager hm = SpringUtil.getBean(HandlerManager.class);
        IHandler handler = hm.getHandler(Integer.parseInt(hydrBaseBean.getGnm(), 16));
        //根据encodeType进行组包
        String reshex = handler.encode(hydrBaseBean);
        String issuccess = new ProtocalAdapterImpl().send2dev(reshex, devid);
        if (!issuccess.equals("true")) {
            //下发失败立即返回
            //只有私有构造方法,所以只能通过builder来构造
            UpwardRespond51H upwardRespond51H = UpwardRespond51H.newBuilder()
                    .setDevid(devid)
                    .setAfn("51")
                    .setIssuccess(false)
                    .setReturnmsg(issuccess)
                    .build();
            //用于返回结果
            responseObserver.onNext(upwardRespond51H);
            //用于告诉客户端调用已结束
            responseObserver.onCompleted();
            GrpcCache.responseObservercache_51H.invalidate(devid);
        }
    }
}    

服务器实现

public class HydrologicGrpcServer {
    private Logger logger = Logger.getLogger(HydrologicGrpcServer.class.getName());
    public static final int DEFAULT_PORT = 3135;

    private int port;//服务端口号

    private Server server;

    public HydrologicGrpcServer(int port) {
        this(port, ServerBuilder.forPort(port));
    }

    public HydrologicGrpcServer(int port, ServerBuilder<?> serverBuilder) {
        this.port = port;

        //构造服务器,添加我们实际的服务
        server = serverBuilder.addService(new HydrologicServiceImplBaseImpl()).build();
    }

    public void start() throws IOException {
        server.start();
        logger.info("gRPC server has started, listening on " + port);
        Thread gcacherefresh = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(3000);
                        GrpcCache.responseObservercache_51H.cleanUp();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        gcacherefresh.start();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {

                HydrologicGrpcServer.this.stop();

            }
        });
    }

    public void stop() {

        if (server != null)
            server.shutdown();

    }

    //阻塞到应用停止
    public void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }
}

客户端实现

public class HydrologicGrpcClient {
    private Logger logger = Logger.getLogger(HydrologicGrpcClient.class.getName());
    private static final String DEFAULT_HOST = "localhost";

    public static final int DEFAULT_PORT = 3135;

    private ManagedChannel managedChannel;

    //服务存根,用于客户端本地调用
    private HydrologicServiceGrpc.HydrologicServiceBlockingStub simpleServiceBlockingStub;

    public HydrologicGrpcClient(String host, int port){
        this(ManagedChannelBuilder.forAddress(host,port).usePlaintext(true).build());
    }

    public HydrologicGrpcClient(ManagedChannel managedChannel) {
        this.managedChannel = managedChannel;
        this.simpleServiceBlockingStub = HydrologicServiceGrpc.newBlockingStub(managedChannel);
    }

    public void shutdown() throws InterruptedException {
        managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    //获取终端时钟
    public UpwardRespond51H getDevSZ51H(String devId){

        DownwardOrder51H downwardOrder = DownwardOrder51H.newBuilder()
                .setDevid(devId)
                .setAfn("51").build();

        UpwardRespond51H upwardRespond;
        try {
            upwardRespond = simpleServiceBlockingStub.getSZ51H(downwardOrder);
            if(upwardRespond.getIssuccess()){
                logger.log(Level.WARNING, devId+": 51下发请求成功");
            }else {
                logger.log(Level.WARNING, devId+": 51下发请求失败");
            }
        } catch (StatusRuntimeException e) {
            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
            return null;
        }

        return upwardRespond;
    }
}    

完结

到此一个完整的gRPC应用示例就完成了,朋友们动起手来吧。。

-- Jiay 2019/7/15 00:23

欢迎来访

  • 有问题欢迎留言或加交流qq:825121848
  • 转载请注明出处
  • 请小编喝茶~
Last Updated: 4/16/2022, 11:05:56 AM