gRPC实战总结
最近在工作中的Java项目里用到了gRPC,为防遗忘,特在此记录。
关于gRPC是什么、从何而来、有哪些使用场景,这里就不再赘述,相信能看到这篇文章的朋友已经对gRPC有了一定了解,下面直接进入主题。
添加依赖
<!-- Version properties; referenced further down as ${grpc.version} and ${protoc.version}. -->
<properties>
<grpc.version>1.12.0</grpc.version>
<protoc.version>3.5.1-1</protoc.version>
</properties>
<!-- The three io.grpc artifacts used by this project; all are pinned to the same ${grpc.version}. -->
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
</dependencies>
<!-- Build section: registers the protobuf-maven-plugin so code is generated from the .proto files. -->
<build>
<extensions>
<!-- NOTE(review): presumably this extension is what defines ${os.detected.classifier} used below; confirm against the os-maven-plugin documentation. -->
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.1</version>
<configuration>
<!-- protoc itself, resolved as an OS-specific executable artifact at ${protoc.version}. -->
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
</protocArtifact>
<!-- The grpc-java protoc plugin, resolved the same way at ${grpc.version}. -->
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<!-- Both goals are bound: compile and compile-custom (the latter presumably drives the grpc-java plugin configured above; confirm in the plugin docs). -->
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
创建描述文件
syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.**.**.**.**.hydrologicgrpcsvr";
option java_outer_classname = "HydrologicProto";
option objc_class_prefix = "HGS";

package hydrologicgrpcsvr;

// Service definition. A service block may only contain rpc declarations
// (and options), so it is closed before any message type is declared.
service HydrologicService {
  // Get the device clock: takes a DownwardOrder51H argument and returns an
  // UpwardRespond51H result.
  rpc getSZ51H (DownwardOrder51H) returns (UpwardRespond51H) {
  }

  // ... N further rpc methods omitted here ...
}

// Message types are declared at file scope, outside the service block.

// Downward (command) message for 51H.
message DownwardOrder51H {
  string devid = 1;
  string centeraddr = 2;
  string flownum = 3;
  string pwd = 4;
  string afn = 5;
}

// Upward (response) message for 51H.
message UpwardRespond51H {
  string devid = 1;
  string centeraddr = 2;
  string flownum = 3;
  string pwd = 4;
  string afn = 5;
  string sendtime = 6;
  bool issuccess = 7;
  string returnmsg = 8;
}

// ... N further message types omitted here ...
maven生成代码并更新目录
实现服务类
/**
 * Server-side implementation of {@code HydrologicService}.
 *
 * <p>For each call the response observer is stored in
 * {@code GrpcCache.responseObservercache_51H} keyed by device id, a 51H frame is
 * encoded and sent to the device, and the call is only answered here when the
 * send fails. On a successful send this class leaves the observer in the cache
 * without completing it (presumably another component answers it when the
 * device replies; verify against the code that reads the cache).
 */
public class HydrologicServiceImplBaseImpl extends HydrologicServiceGrpc.HydrologicServiceImplBase {
    private static final Logger logger = LoggerFactory.getLogger(HydrologicServiceImplBaseImpl.class);

    /** The only {@code send2dev} result that is treated as a successful send. */
    private static final String SEND_OK = "true";

    /**
     * Handles the "get device clock" (51H) request.
     *
     * @param request          the downward order; only its device id is read here
     * @param responseObserver observer used to answer the call; answered immediately
     *                         only when sending to the device fails
     */
    @Override
    public void getSZ51H(DownwardOrder51H request, StreamObserver<UpwardRespond51H> responseObserver) {
        String devid = request.getDevid();
        // Register the observer before sending so it is already cached when the send returns.
        GrpcCache.responseObservercache_51H.put(devid, responseObserver);

        HydrBaseBean hydrBaseBean = new HydrBaseBean();
        hydrBaseBean.setYczdz(devid);
        hydrBaseBean.setZxzdz("01");
        hydrBaseBean.setLsh("0001");
        hydrBaseBean.setPwd("1234");
        hydrBaseBean.setGnm("51");
        hydrBaseBean.setEncodeType(Global.ControlCharacters.ENQ.getcch());

        HandlerManager hm = SpringUtil.getBean(HandlerManager.class);
        // The function code is a hex string, hence radix 16.
        IHandler handler = hm.getHandler(Integer.parseInt(hydrBaseBean.getGnm(), 16));
        // Build the frame according to the bean's encodeType.
        String reshex = handler.encode(hydrBaseBean);
        String sendResult = new ProtocalAdapterImpl().send2dev(reshex, devid);

        // Constant-first comparison: a null result is treated as a failure instead of
        // throwing a NullPointerException and leaving the observer stuck in the cache.
        if (SEND_OK.equals(sendResult)) {
            return;
        }

        logger.warn("51H send to device " + devid + " failed: " + sendResult);
        // Sending failed: answer right away. The message class is built through its builder.
        UpwardRespond51H upwardRespond51H = UpwardRespond51H.newBuilder()
                .setDevid(devid)
                .setAfn("51")
                .setIssuccess(false)
                // String.valueOf keeps a null result from reaching the builder's setter.
                .setReturnmsg(String.valueOf(sendResult))
                .build();
        // Deliver the result.
        responseObserver.onNext(upwardRespond51H);
        // Tell the client the call has finished.
        responseObserver.onCompleted();
        // The call is finished, so the cached observer must not be used again.
        GrpcCache.responseObservercache_51H.invalidate(devid);
    }
}
服务器实现
/**
 * Wraps a gRPC {@code Server} that exposes {@code HydrologicServiceImplBaseImpl}
 * and periodically calls {@code cleanUp()} on {@code GrpcCache.responseObservercache_51H}.
 */
public class HydrologicGrpcServer {
    private static final Logger logger = Logger.getLogger(HydrologicGrpcServer.class.getName());
    public static final int DEFAULT_PORT = 3135;
    /** Pause between two cache clean-up passes, in milliseconds. */
    private static final long CACHE_CLEANUP_INTERVAL_MS = 3000L;

    /** Port the server listens on. */
    private final int port;
    private final Server server;
    /** Background clean-up thread; set by {@link #start()}, interrupted by {@link #stop()}. */
    private volatile Thread cacheRefresher;

    /**
     * Creates a server listening on {@code port}.
     *
     * @param port port to listen on
     */
    public HydrologicGrpcServer(int port) {
        this(port, ServerBuilder.forPort(port));
    }

    /**
     * Creates a server from a caller-supplied builder.
     *
     * @param port          port reported in the start-up log message
     * @param serverBuilder builder the service is registered on
     */
    public HydrologicGrpcServer(int port, ServerBuilder<?> serverBuilder) {
        this.port = port;
        // Build the server and register the actual service implementation.
        server = serverBuilder.addService(new HydrologicServiceImplBaseImpl()).build();
    }

    /**
     * Starts the server, the cache clean-up thread, and registers a JVM shutdown
     * hook that calls {@link #stop()}.
     *
     * @throws IOException if the underlying server fails to start
     */
    public void start() throws IOException {
        server.start();
        logger.info("gRPC server has started, listening on " + port);

        Thread refresher = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(CACHE_CLEANUP_INTERVAL_MS);
                        GrpcCache.responseObservercache_51H.cleanUp();
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition ends the thread.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }, "grpc-cache-cleanup");
        // Daemon: a housekeeping loop must not keep the JVM alive once the server is down.
        refresher.setDaemon(true);
        cacheRefresher = refresher;
        refresher.start();

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                HydrologicGrpcServer.this.stop();
            }
        });
    }

    /** Stops the cache clean-up thread and initiates shutdown of the server. */
    public void stop() {
        Thread refresher = cacheRefresher;
        if (refresher != null) {
            refresher.interrupt();
        }
        if (server != null) {
            server.shutdown();
        }
    }

    /**
     * Blocks until the server has terminated.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }
}
客户端实现
/**
 * Blocking client for {@code HydrologicService}.
 */
public class HydrologicGrpcClient {
    private static final Logger logger = Logger.getLogger(HydrologicGrpcClient.class.getName());
    private static final String DEFAULT_HOST = "localhost";
    public static final int DEFAULT_PORT = 3135;

    private final ManagedChannel managedChannel;
    /** Service stub used for local, blocking calls. */
    private final HydrologicServiceGrpc.HydrologicServiceBlockingStub simpleServiceBlockingStub;

    /** Creates a client for {@code localhost} on {@link #DEFAULT_PORT}. */
    public HydrologicGrpcClient() {
        this(DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * Creates a client over a plaintext (non-TLS) channel.
     *
     * @param host server host
     * @param port server port
     */
    public HydrologicGrpcClient(String host, int port) {
        // usePlaintext() replaces the deprecated usePlaintext(boolean).
        this(ManagedChannelBuilder.forAddress(host, port).usePlaintext().build());
    }

    /**
     * Creates a client on an existing channel.
     *
     * @param managedChannel channel to issue calls on; shut down by {@link #shutdown()}
     */
    public HydrologicGrpcClient(ManagedChannel managedChannel) {
        this.managedChannel = managedChannel;
        this.simpleServiceBlockingStub = HydrologicServiceGrpc.newBlockingStub(managedChannel);
    }

    /**
     * Shuts the channel down and waits up to 5 seconds for termination.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    /**
     * Requests the clock of a device (51H).
     *
     * @param devId device id placed in the request
     * @return the server's response, or {@code null} when the RPC fails with a
     *         {@code StatusRuntimeException}
     */
    public UpwardRespond51H getDevSZ51H(String devId) {
        DownwardOrder51H downwardOrder = DownwardOrder51H.newBuilder()
                .setDevid(devId)
                .setAfn("51")
                .build();
        try {
            // NOTE(review): no deadline is set on the stub, so this call blocks until the
            // server answers; consider withDeadlineAfter(...) if that can take unbounded time.
            UpwardRespond51H upwardRespond = simpleServiceBlockingStub.getSZ51H(downwardOrder);
            if (upwardRespond.getIssuccess()) {
                // Success is informational, not a warning.
                logger.log(Level.INFO, devId+": 51下发请求成功");
            } else {
                logger.log(Level.WARNING, devId+": 51下发请求失败");
            }
            return upwardRespond;
        } catch (StatusRuntimeException e) {
            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
            return null;
        }
    }
}
完结
到此,一个完整的gRPC应用示例就完成了,朋友们动起手来吧。
-- Jiay 2019/7/15 00:23
欢迎来访
- 有问题欢迎留言或加交流qq:825121848
- 转载请注明出处
- 请小编喝茶~