Commit d24a83e1 authored by fangzongwu's avatar fangzongwu

.

parent 6c244646
client/target/*
daemon/third-party
rpc_sys/bindings
daemon/target/*
common_lib/target/*
mercury-sys/target/*
rootdir/*
mountdir/*
target/*
Cargo.lock
*.lock
target
*.rs.bk
*.rs.fmt
.vscode
.idea
*.o
rocksdb_tmp/
mount_path/
fs_data/
/mount_path/
/fs_data/
rpc_sys/rpc/librpc.a
rpc_sys/rpc/librpc.so
rpc_sys/rpc/librpc.a-bak
rpc_sys/rpc/librpc.so-bak
config/burstfs_hosts.yml
person.MD
temp.rs
rpc_sys/bindings
\ No newline at end of file
[package]
name = "registry"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]] # Bin to run the HelloWorld gRPC server
name = "server"
path = "src/server.rs"
[[bin]] # Bin to run the HelloWorld gRPC client
name = "client"
path = "src/client.rs"
[dependencies]
tonic = "0.7"
prost = "0.10"
tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net",] }
tokio-stream = { version = "0.1", features = ["net"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
h2 = "0.3"
[build-dependencies]
tonic-build = "0.7"
\ No newline at end of file
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/MetadataOps.proto")?;
Ok(())
}
/*
*
* Copyright beikunyun filesystem team.
*
*
*/
syntax = "proto3";
package bky.registry.api;
message Metadata{
string kv = 1;
}
//client向server拉取数据请求创建文件或目录
message PullCreateReq {
//0: 客户端向服务端发送请求; 1: 服务端向客户端发送请求; 其他:未定义行为
int32 sendtype = 1;
//Tenent及用户各使用16bits标识码,称为tcode;
//在rocksdb里面每个key都会该信息,可以把这部分信息抽离出来,再在数据接收端进行key值的拼接,以减少网络层数据传输的压力
string t_code = 2;
}
message PullCreateResp {
string t_code = 1;
// repeated Metadata md = 2;
// 0:成功; -1: 失败; 其他:未定义行为
int32 ret_code = 3;
}
// //client向server推送数据请求创建文件或目录
// message PushCreateReq {
// //0: 客户端向服务端发送请求; 1: 服务端向客户端发送请求; 其他:未定义行为
// int32 type = 1;
// //Tenent及用户各使用16bits标识码,称为tcode;
// string tcode = 2;
// repeated Metadata metadata = 3;
// }
// message PushCreateResp {
// string tcode = 1;
// //数据推送是否更新成功
// int32 retCode = 2;
// }
// //文件路径,对应rocksdb的key值
// message AbsPath{
// string k = 1;
// }
// //删除
// message RemoveReq{
// string tcode = 1;
// repeated AbsPath path = 2;
// }
// message RemoveResp{
// string tcode = 1;
// int32 retCode = 2;
// }
// //更新
// message UpdateReq{
// string tcode = 1;
// repeated Metadata md = 2;
// }
// message UpdateResp{
// string tcode = 1;
// int32 retCode = 2;
// }
// //目录查询
// message QueryDirdentsReq{
// string tcode = 1;
// repeated AbsPath path= 2;
// }
// message QueryDirdentsResp{
// string tcode = 1;
// repeated Metadata md = 2;
// }
service MetadataOps {
rpc CreateByPull(stream PullCreateReq) returns (stream PullCreateResp) {}
// rpc CreateByPush(stream PushCreateReq) returns (stream PushCreateResp) {}
// rpc Remove(stream RemoveReq) returns(stream RemoveResp){}
// rpc Update(stream UpdateReq) returns(stream UpdateResp){}
// rpc QueryDirdents(stream QueryDirdentsReq) returns(stream QueryDirdentsResp){}
}
\ No newline at end of file
pub mod registry {
tonic::include_proto!("bky.registry.api");
}
use futures::stream::Stream;
use std::time::Duration;
use tokio_stream::StreamExt;
use tonic::{server::ClientStreamingService, transport::Channel};
use registry::{metadata_ops_client::MetadataOpsClient, PullCreateReq};
fn generate_request() -> impl Stream<Item = PullCreateReq> {
tokio_stream::iter(1..u8::MAX).map(|i| PullCreateReq {
t_code: format!("t_code:{:02}", i),
sendtype: i as i32,
})
}
async fn create_by_pull(client: &mut MetadataOpsClient<Channel>, dur: Duration) {
// let in_stream = generate_request().throttle(dur);
let in_stream = generate_request();
let respone = client.create_by_pull(in_stream).await.unwrap();
let mut resp_stream = respone.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("received :{}", received.t_code);
}
let in_stream_2 = generate_request();
let respone = client.create_by_pull(in_stream_2).await.unwrap();
let mut resp_stream = respone.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("received :{}", received.t_code);
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = MetadataOpsClient::connect("http://192.168.110.183:50050")
// let mut client = MetadataOpsClient::connect("http://192.168.110.183:50050")
.await
.unwrap();
println!("start bidirectional stream echo ");
create_by_pull(&mut client, Duration::from_micros(1)).await;
Ok(())
}
pub mod registry {
tonic::include_proto!("bky.registry.api");
}
use futures::Stream;
use registry::{PullCreateReq, PullCreateResp};
use std::{error::Error, io::ErrorKind, net::ToSocketAddrs, pin::Pin, result, time::Duration};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::{transport::Server, Request, Response, Status, Streaming};
type PushCreateResult<T> = Result<Response<T>, Status>;
#[derive(Debug)]
pub struct RegistryServer {}
type RegistryResult<T> = Result<Response<T>, Status>;
type ResponseStream = Pin<Box<dyn Stream<Item = Result<PullCreateResp, Status>> + Send>>;
fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
let mut err: &(dyn Error + 'static) = err_status;
loop {
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
return Some(io_err);
}
// h2::Error do not expose std::io::Error with `source()`
// https://github.com/hyperium/h2/pull/462
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
if let Some(io_err) = h2_err.get_io() {
return Some(io_err);
}
}
err = match err.source() {
Some(err) => err,
None => return None,
};
}
}
#[tonic::async_trait]
impl registry::metadata_ops_server::MetadataOps for RegistryServer {
type CreateByPullStream = ResponseStream;
async fn create_by_pull(
&self,
req: Request<Streaming<PullCreateReq>>,
) -> RegistryResult<Self::CreateByPullStream> {
println!("create_by_pull ");
println!("req={:?}", req);
let mut in_stream = req.into_inner();
println!("in_stream={:?}", in_stream);
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
println!("tokio spawn..");
while let Some(result) = in_stream.next().await {
println!("result = {:?}", result);
match result {
Ok(v) => tx
.send(Ok(PullCreateResp {
t_code: v.t_code,
ret_code: 0,
}))
.await
.expect("working rx"),
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
eprintln!("\tclient disconnected: broken pipe");
break;
}
}
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break,
}
}
}
println!("\tstream ended");
}
});
let out_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(out_stream)))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("main...");
let server = RegistryServer {};
Server::builder()
.add_service(registry::metadata_ops_server::MetadataOpsServer::new(
server,
))
.serve(
"192.168.110.183:50050"
.to_socket_addrs()
.unwrap()
.next()
.unwrap(),
)
.await
.unwrap();
Ok(())
}
[package]
name = "bi"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]] # Bin to run the HelloWorld gRPC server
name = "helloworld-server"
path = "src/server.rs"
[[bin]] # Bin to run the HelloWorld gRPC client
name = "helloworld-client"
path = "src/client.rs"
[dependencies]
tonic = "0.7"
prost = "0.10"
tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net",] }
tokio-stream = { version = "0.1", features = ["net"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
h2 = "0.3"
[build-dependencies]
tonic-build = "0.7"
\ No newline at end of file
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/echo.proto")?;
Ok(())
}
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
syntax = "proto3";
package grpc.examples.echo;
// EchoRequest is the request for echo.
message EchoRequest {
string message = 1;
string req = 2;
}
// EchoResponse is the response for echo.
message EchoResponse {
string message = 1;
string resp = 2;
}
// Echo is the echo service.
service Echo {
// // UnaryEcho is unary echo.
// rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
// // ServerStreamingEcho is server side streaming.
// rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
// // ClientStreamingEcho is client side streaming.
// rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
// // BidirectionalStreamingEcho is bidi streaming.
rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}
\ No newline at end of file
pub mod pb {
tonic::include_proto!("grpc.examples.echo");
}
use futures::stream::Stream;
use std::time::Duration;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use pb::{echo_client::EchoClient, EchoRequest};
fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
message: format!("msg {:02}", i),
req: "good".to_string(),
})
}
// async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
// let stream = client
// .server_streaming_echo(EchoRequest {
// message: "foo".into(),
// req: "world".to_string(),
// })
// .await
// .unwrap()
// .into_inner();
// // stream is infinite - take just 5 elements and then disconnect
// let mut stream = stream.take(num);
// while let Some(item) = stream.next().await {
// println!("item = {:?}", item);
// println!("\treceived: {}", item.unwrap().message);
// }
// // stream is droped here and the disconnect info is send to server
// }
// async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
// let in_stream = echo_requests_iter().take(num);
// let response = client
// .bidirectional_streaming_echo(in_stream)
// .await
// .unwrap();
// let mut resp_stream = response.into_inner();
// while let Some(received) = resp_stream.next().await {
// let received = received.unwrap();
// println!("\treceived message: `{}`", received.message);
// }
// }
async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
println!("bidirectional_streaming_echo_throttle come");
let in_stream = echo_requests_iter().throttle(dur);
let response = client
.bidirectional_streaming_echo(in_stream)
.await
.unwrap();
let mut resp_stream = response.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("\treceived message: `{}`", received.message);
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
let mut client = EchoClient::connect("http://192.168.110.183:50051")
.await
.unwrap();
// println!("Streaming echo:");
// streaming_echo(&mut client, 5).await;
// tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
// // tokio::time::sleep(Duration::from_micros(1)).await;
// // Echo stream that sends 17 requests then graceful end that connection
// println!("\r\nBidirectional stream echo:");
// bidirectional_streaming_echo(&mut client, 17).await;
// Echo stream that sends up to `usize::MAX` requets. One request each 2s.
// Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
//graceful client disconnection (above example) on the server side.
println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
println!("client ={:?}", client);
// bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
// bidirectional_streaming_echo_throttle(&mut client, Duration::from_micros(1)).await;
Ok(())
}
pub mod pb {
tonic::include_proto!("grpc.examples.echo");
}
use futures::Stream;
use std::{error::Error, io::ErrorKind, net::ToSocketAddrs, pin::Pin, time::Duration};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::{transport::Server, Request, Response, Status, Streaming};
use pb::{EchoRequest, EchoResponse};
type EchoResult<T> = Result<Response<T>, Status>;
type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>;
fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
let mut err: &(dyn Error + 'static) = err_status;
loop {
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
return Some(io_err);
}
// h2::Error do not expose std::io::Error with `source()`
// https://github.com/hyperium/h2/pull/462
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
if let Some(io_err) = h2_err.get_io() {
return Some(io_err);
}
}
err = match err.source() {
Some(err) => err,
None => return None,
};
}
}
#[derive(Debug)]
pub struct EchoServer {}
#[tonic::async_trait]
impl pb::echo_server::Echo for EchoServer {
// async fn unary_echo(&self, _: Request<EchoRequest>) -> EchoResult<EchoResponse> {
// Err(Status::unimplemented("not implemented"))
// }
// type ServerStreamingEchoStream = ResponseStream;
// async fn server_streaming_echo(
// &self,
// req: Request<EchoRequest>,
// ) -> EchoResult<Self::ServerStreamingEchoStream> {
// println!("EchoServer::server_streaming_echo");
// println!("\tclient connected from: {:?}", req.remote_addr());
// // creating infinite stream with requested message
// let repeat = std::iter::repeat(EchoResponse {
// message: req.into_inner().message,
// resp: String::from("resp morning"),
// });
// let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));
// // spawn and channel are required if you want handle "disconnect" functionality
// // the `out_stream` will not be polled after client disconnect
// let (tx, rx) = mpsc::channel(128);
// tokio::spawn(async move {
// while let Some(item) = stream.next().await {
// match tx.send(Result::<_, Status>::Ok(item)).await {
// Ok(_) => {
// println!("server loop item send");
// // item (server response) was queued to be send to client
// }
// Err(_item) => {
// // output_stream was build from rx and both are dropped
// break;
// }
// }
// }
// println!("\tclient disconnected");
// });
// let output_stream = ReceiverStream::new(rx);
// Ok(Response::new(
// Box::pin(output_stream) as Self::ServerStreamingEchoStream
// ))
// }
// async fn client_streaming_echo(
// &self,
// _: Request<Streaming<EchoRequest>>,
// ) -> EchoResult<EchoResponse> {
// Err(Status::unimplemented("not implemented"))
// }
type BidirectionalStreamingEchoStream = ResponseStream;
async fn bidirectional_streaming_echo(
&self,
req: Request<Streaming<EchoRequest>>,
) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
println!("EchoServer::bidirectional_streaming_echo");
println!("req={:?}", req);
let mut in_stream = req.into_inner();
println!("in_stream={:?}", in_stream);
let (tx, rx) = mpsc::channel(128);
// this spawn here is required if you want to handle connection error.
// If we just map `in_stream` and write it back as `out_stream` the `out_stream`
// will be drooped when connection error occurs and error will never be propagated
// to mapped version of `in_stream`.
tokio::spawn(async move {
while let Some(result) = in_stream.next().await {
println!("result = {:?}", result);
match result {
Ok(v) => tx
.send(Ok(EchoResponse {
message: v.message,
resp: v.req,
}))
.await
.expect("working rx"),
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
// here you can handle special case when client
// disconnected in unexpected way
eprintln!("\tclient disconnected: broken pipe");
break;
}
}
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break, // response was droped
}
}
}
}
println!("\tstream ended");
});
// echo just write the same data that was received
let out_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream
))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let server = EchoServer {};
Server::builder()
.add_service(pb::echo_server::EchoServer::new(server))
// .serve("[::1]:50051".to_socket_addrs().unwrap().next().unwrap())
.serve(
"192.168.110.183:50051"
.to_socket_addrs()
.unwrap()
.next()
.unwrap(),
)
.await
.unwrap();
Ok(())
}
[package]
name = "registry"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]] # Bin to run the HelloWorld gRPC server
name = "server"
path = "src/server.rs"
[[bin]] # Bin to run the HelloWorld gRPC client
name = "client"
path = "src/client.rs"
[[bin]] # Bin to run the HelloWorld gRPC client
name = "test"
path = "src/test.rs"
[dependencies]
tonic = "0.7"
prost = "0.10"
tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net",] }
tokio-stream = { version = "0.1", features = ["net"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
h2 = "0.3"
[build-dependencies]
tonic-build = "0.7"
futures = "0.3"
futures-util = "0.3"
\ No newline at end of file
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/MetadataOps.proto")?;
Ok(())
}
/*
*
* Copyright beikunyun filesystem team.
*
*
*/
syntax = "proto3";
package bky.registry.api;
message Metadata{
string kv = 1;
}
//client向server拉取数据请求创建文件或目录
message PullCreateReq {
//0: 客户端向服务端发送请求; 1: 服务端向客户端发送请求; 其他:未定义行为
int32 sendtype = 1;
//Tenent及用户各使用16bits标识码,称为tcode;
//在rocksdb里面每个key都会该信息,可以把这部分信息抽离出来,再在数据接收端进行key值的拼接,以减少网络层数据传输的压力
string t_code = 2;
}
message PullCreateResp {
string t_code = 1;
// repeated Metadata md = 2;
// 0:成功; -1: 失败; 其他:未定义行为
int32 ret_code = 3;
}
//client向server推送数据请求创建文件或目录
message PushCreateReq {
//0: 客户端向服务端发送请求; 1: 服务端向客户端发送请求; 其他:未定义行为
int32 sendtype = 1;
//Tenent及用户各使用16bits标识码,称为tcode;
string tcode = 2;
repeated Metadata metadata = 3;
}
message PushCreateResp {
string tcode = 1;
//数据推送是否更新成功
int32 retCode = 2;
}
// //文件路径,对应rocksdb的key值
// message AbsPath{
// string k = 1;
// }
// //删除
// message RemoveReq{
// string tcode = 1;
// repeated AbsPath path = 2;
// }
// message RemoveResp{
// string tcode = 1;
// int32 retCode = 2;
// }
// //更新
// message UpdateReq{
// string tcode = 1;
// repeated Metadata md = 2;
// }
// message UpdateResp{
// string tcode = 1;
// int32 retCode = 2;
// }
// //目录查询
// message QueryDirdentsReq{
// string tcode = 1;
// repeated AbsPath path= 2;
// }
// message QueryDirdentsResp{
// string tcode = 1;
// repeated Metadata md = 2;
// }
service MetadataOps {
rpc CreateByPull(stream PullCreateReq) returns (stream PullCreateResp) {}
rpc CreateByPush(stream PushCreateReq) returns (stream PushCreateResp) {}
// rpc Remove(stream RemoveReq) returns(stream RemoveResp){}
// rpc Update(stream UpdateReq) returns(stream UpdateResp){}
// rpc QueryDirdents(stream QueryDirdentsReq) returns(stream QueryDirdentsResp){}
}
\ No newline at end of file
pub mod registry {
tonic::include_proto!("bky.registry.api");
}
use futures::stream::Stream;
use std::time::Duration;
use tokio_stream::StreamExt;
use tonic::{server::ClientStreamingService, transport::Channel};
use registry::{metadata_ops_client::MetadataOpsClient, PullCreateReq};
fn generate_request() -> impl Stream<Item = PullCreateReq> {
tokio_stream::iter(1..3).map(|i| PullCreateReq {
t_code: format!("t_code:{:02}", i),
sendtype: i as i32,
})
}
async fn create_by_pull(client: &mut MetadataOpsClient<Channel>, dur: Duration) {
// let in_stream = generate_request().throttle(dur);
let in_stream = generate_request();
let respone = client.create_by_pull(in_stream).await.unwrap();
let mut resp_stream = respone.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("received :{}", received.t_code);
}
let in_stream_2 = generate_request();
let respone = client.create_by_pull(in_stream_2).await.unwrap();
let mut resp_stream = respone.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("received :{}", received.t_code);
}
}
use std::fs;
use crate::registry::PushCreateReq;
async fn create_by_push(client: &mut MetadataOpsClient<Channel>) {
let rdata = fs::read("./test.txt");
// String::from_utf8_lossy(&fs::read("address.txt").unwrap());
match rdata {
Ok(data) => {
println!("data = {}", std::str::from_utf8(&data).unwrap());
let req_data = PushCreateReq {
sendtype: 1,
tcode: "hello".to_string(),
metadata: Vec::new(),
};
// let messages = vec![req_data.clone(), req_data];
let b = tokio_stream::iter(1..3).map(|i| PullCreateReq {
t_code: format!("t_code:{:02}", i),
sendtype: i as i32,
});
// client.record_route(Request::new(stream::iter(messages.clone())));
let respone = client
.create_by_push(b as impl Stream<Item = PullCreateReq>)
.await
.unwrap();
}
Err(e) => println!("d={:?}", e),
}
// let respone = client.create_by_push()
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = MetadataOpsClient::connect("http://192.168.110.183:50050")
// let mut client = MetadataOpsClient::connect("http://192.168.110.183:50050")
.await
.unwrap();
println!("start bidirectional stream echo ");
// create_by_pull(&mut client, Duration::from_micros(1)).await;
create_by_push(&mut client).await;
Ok(())
}
pub mod registry {
tonic::include_proto!("bky.registry.api");
}
use futures::Stream;
use registry::{PullCreateReq, PullCreateResp, PushCreateReq, PushCreateResp};
use std::{error::Error, io::ErrorKind, net::ToSocketAddrs, pin::Pin, result, time::Duration};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::{transport::Server, Request, Response, Status, Streaming};
type PushCreateResult<T> = Result<Response<T>, Status>;
#[derive(Debug)]
pub struct RegistryServer {}
type RegistryResult<T> = Result<Response<T>, Status>;
type ResponseStream = Pin<Box<dyn Stream<Item = Result<PullCreateResp, Status>> + Send>>;
fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
let mut err: &(dyn Error + 'static) = err_status;
loop {
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
return Some(io_err);
}
// h2::Error do not expose std::io::Error with `source()`
// https://github.com/hyperium/h2/pull/462
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
if let Some(io_err) = h2_err.get_io() {
return Some(io_err);
}
}
err = match err.source() {
Some(err) => err,
None => return None,
};
}
}
#[tonic::async_trait]
impl registry::metadata_ops_server::MetadataOps for RegistryServer {
type CreateByPullStream = ResponseStream;
async fn create_by_pull(
&self,
req: Request<Streaming<PullCreateReq>>,
) -> RegistryResult<Self::CreateByPullStream> {
println!("create_by_pull ");
println!("req={:?}", req);
let mut in_stream = req.into_inner();
println!("in_stream={:?}", in_stream);
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
println!("tokio spawn..");
while let Some(result) = in_stream.next().await {
println!("result = {:?}", result);
match result {
Ok(v) => tx
.send(Ok(PullCreateResp {
t_code: v.t_code,
ret_code: 0,
}))
.await
.expect("working rx"),
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
eprintln!("\tclient disconnected: broken pipe");
break;
}
}
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break,
}
}
}
println!("\tstream ended");
}
});
let out_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(out_stream)))
}
//======================================
type CreateByPushStream = Pin<Box<dyn Stream<Item = Result<PushCreateResp, Status>> + Send>>;
async fn create_by_push(
&self,
req: Request<Streaming<PushCreateReq>>,
) -> RegistryResult<Self::CreateByPushStream> {
println!("create_by_pull ");
println!("req={:?}", req);
let mut in_stream = req.into_inner();
println!("in_stream={:?}", in_stream);
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
println!("tokio spawn..");
while let Some(result) = in_stream.next().await {
println!("result = {:?}", result);
match result {
Ok(v) => tx
.send(Ok(PushCreateResp {
tcode: v.tcode,
ret_code: 0,
}))
.await
.expect("working rx"),
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
eprintln!("\tclient disconnected: broken pipe");
break;
}
}
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break,
}
}
}
println!("\tstream ended");
}
});
let out_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(out_stream)))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("main...");
let server = RegistryServer {};
Server::builder()
.add_service(registry::metadata_ops_server::MetadataOpsServer::new(
server,
))
.serve(
"192.168.110.183:50050"
.to_socket_addrs()
.unwrap()
.next()
.unwrap(),
)
.await
.unwrap();
Ok(())
}
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
let mut stream = stream::iter(vec![0, 1, 2]);
while let Some(value) = stream.next().await {
println!("Got {}", value);
}
}
// use std::pin::Pin;
// use tokio::sync::mpsc;
// use tokio_stream::{Stream, StreamExt, StreamMap};
// #[tokio::main]
// async fn main() {
// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
// // Convert the channels to a `Stream`.
// let rx1 = Box::pin(async_stream::stream! {
// while let Some(item) = rx1.recv().await {
// yield item;
// }
// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
// let rx2 = Box::pin(async_stream::stream! {
// while let Some(item) = rx2.recv().await {
// yield item;
// }
// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
// tokio::spawn(async move {
// tx1.send(1).await.unwrap();
// // This value will never be received. The send may or may not return
// // `Err` depending on if the remote end closed first or not.
// let _ = tx1.send(2).await;
// });
// tokio::spawn(async move {
// tx2.send(3).await.unwrap();
// let _ = tx2.send(4).await;
// });
// let mut map = StreamMap::new();
// // Insert both streams
// map.insert("one", rx1);
// map.insert("two", rx2);
// // Read twice
// for _ in 0..2 {
// let (key, val) = map.next().await.unwrap();
// if key == "one" {
// assert_eq!(val, 1);
// } else {
// assert_eq!(val, 3);
// }
// // Remove the stream to prevent reading the next value
// map.remove(key);
// }
// }
"\nfe\nfe\n":"33206|0|1000|1000|1653640181|1653640181|1653640181|795538473|795538473|795538473|1|0"|"/good""16895|0|1000|1000|1653640372|1653640372|1653640372|154554408|154554408|154554408|2|0"
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment