Commit ba0245c0 authored by fangzongwu's avatar fangzongwu

add CRUD function

parent fd9806de
...@@ -9,25 +9,31 @@ edition = "2021" ...@@ -9,25 +9,31 @@ edition = "2021"
name = "server" name = "server"
path = "src/server.rs" path = "src/server.rs"
[[bin]] # Bin to run the HelloWorld gRPC server
name = "client"
path = "src/client.rs"
[[bin]] # Bin to run the HelloWorld gRPC client [[bin]] # Bin to run the HelloWorld gRPC client
name = "client-push" name = "test-client"
path = "src/client-push.rs" path = "src/test-client.rs"
# [[bin]] # Bin to run the HelloWorld gRPC client
# name = "client-pull"
# path = "src/client-pull.rs"
[[bin]] # Bin to run the HelloWorld gRPC client [[bin]] # Bin to run the HelloWorld gRPC client
name = "test" name = "test-server"
path = "src/test.rs" path = "src/test-server.rs"
# [[bin]] # Bin to run the HelloWorld gRPC client
# name = "test"
# path = "src/test.rs"
[dependencies] [dependencies]
tonic = "0.7" tonic = "0.7"
prost = "0.10" prost = "0.10"
tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net",] } tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net",] }
tokio-stream = { version = "0.1", features = ["net"] } tokio-stream = { version = "0.1", features = ["net"] }
rand = "0.8.4"
# futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures = { version = "0.3" } futures = { version = "0.3" }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
//client向server拉取数据请求创建文件或目录 //client向server拉取数据请求创建文件或目录
message PullCreateReq { message PullCreateReq {
//0: 客户端向服务端发送请求; 1: 服务端向客户端发送请求; 其他:未定义行为 //0: 客户端向服务端发送请求; 1: 服务端向客户端发送请求; 其他:未定义行为
int32 sendtype = 1; int32 send_type = 1;
//Tenent及用户各使用16bits标识码,称为tcode; //Tenent及用户各使用16bits标识码,称为tcode;
//在rocksdb里面每个key都会该信息,可以把这部分信息抽离出来,再在数据接收端进行key值的拼接,以减少网络层数据传输的压力 //在rocksdb里面每个key都会该信息,可以把这部分信息抽离出来,再在数据接收端进行key值的拼接,以减少网络层数据传输的压力
string tcode = 2; string tcode = 2;
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
//client向server推送数据请求创建文件或目录 //client向server推送数据请求创建文件或目录
message PushCreateReq { message PushCreateReq {
//0: 客户端向服务端发送请求; 1: 服务端向客户端发送请求; 其他:未定义行为 //0: 客户端向服务端发送请求; 1: 服务端向客户端发送请求; 其他:未定义行为
int32 sendtype = 1; int32 send_type = 1;
//Tenent及用户各使用16bits标识码,称为tcode; //Tenent及用户各使用16bits标识码,称为tcode;
string tcode = 2; string tcode = 2;
repeated Metadata metadata = 3; repeated Metadata metadata = 3;
...@@ -38,10 +38,44 @@ ...@@ -38,10 +38,44 @@
message PushCreateResp { message PushCreateResp {
string tcode = 1; string tcode = 1;
//数据推送是否更新成功 //数据推送是否更新成功
int32 retCode = 2; int32 ret_code = 2;
} }
message WatchUpdateReq{
//文件路径,对应rocksdb的key值
message AbsPath{
string k = 1;
}
//删除
message RemoveReq{
string tcode = 1;
repeated AbsPath path = 2;
}
message RemoveResp{
string tcode = 1;
int32 retCode = 2;
}
// 更新
message UpdateReq{
string tcode = 1;
repeated Metadata md = 2;
}
message UpdateResp{
string tcode = 1;
int32 retCode = 2;
}
//目录查询
message QueryDirdentsReq{
string tcode = 1;
repeated AbsPath path= 2;
}
message QueryDirdentsResp{
string tcode = 1;
repeated Metadata md = 2;
}
message WatchUpdateReq{
int32 clientId = 1; int32 clientId = 1;
string tcode = 2; string tcode = 2;
} }
...@@ -50,45 +84,15 @@ message WatchUpdateResp{ ...@@ -50,45 +84,15 @@ message WatchUpdateResp{
int32 clientId = 1; int32 clientId = 1;
string notifyMsg = 2; string notifyMsg = 2;
} }
service MetadataOps {
rpc CreateByPull( PullCreateReq) returns ( PullCreateResp) {}
rpc CreateByPush( PushCreateReq) returns ( PushCreateResp) {}
rpc Remove( RemoveReq) returns( RemoveResp){}
rpc Update( UpdateReq) returns( UpdateResp){}
rpc QueryDirdents( QueryDirdentsReq) returns( QueryDirdentsResp){}
rpc WatchUpdate( WatchUpdateReq) returns (stream WatchUpdateResp){}
}
// //文件路径,对应rocksdb的key值
// message AbsPath{
// string k = 1;
// }
// //删除
// message RemoveReq{
// string tcode = 1;
// repeated AbsPath path = 2;
// }
// message RemoveResp{
// string tcode = 1;
// int32 retCode = 2;
// }
//更新
// message UpdateReq{
// string tcode = 1;
// repeated Metadata md = 2;
// }
// message UpdateResp{
// string tcode = 1;
// int32 retCode = 2;
// }
// //目录查询
// message QueryDirdentsReq{
// string tcode = 1;
// repeated AbsPath path= 2;
// }
// message QueryDirdentsResp{
// string tcode = 1;
// repeated Metadata md = 2;
// }
service MetadataOps {
rpc CreateByPull(stream PullCreateReq) returns (stream PullCreateResp) {}
rpc CreateByPush(stream PushCreateReq) returns (stream PushCreateResp) {}
rpc WatchUpdate(stream WatchUpdateReq) returns (stream WatchUpdateResp){}
// rpc Remove(stream RemoveReq) returns(stream RemoveResp){}
// rpc Update(stream UpdateReq) returns(stream UpdateResp){}
// rpc QueryDirdents(stream QueryDirdentsReq) returns(stream QueryDirdentsResp){}
}
\ No newline at end of file
pub mod registry {
tonic::include_proto!("bky.registry.api");
}
use futures::stream::Stream;
use std::time::Duration;
use tokio_stream::StreamExt;
use tonic::{codegen::http::response, server::ClientStreamingService, transport::Channel};
use registry::{metadata_ops_client::MetadataOpsClient, Metadata, PullCreateReq};
fn generate_request() -> impl Stream<Item = PullCreateReq> {
tokio_stream::iter(1..3).map(|i| PullCreateReq {
tcode: format!("tcode:{:02}", i),
sendtype: i as i32,
})
}
fn my_request() -> impl Stream<Item = PushCreateReq> {
let rdata = fs::read("./test.txt");
// String::from_utf8_lossy(&fs::read("address.txt").unwrap());
let mut Send_metadata = vec![];
match rdata {
Ok(data) => {
let c_data = std::str::from_utf8(&data).unwrap();
// println!("data = {}", c_data.clone());
let d = Metadata {
kv: c_data.to_string(),
};
Send_metadata.push(d);
return tokio_stream::iter(1..2).map(move |i| PushCreateReq {
tcode: "my_tcode".to_string(),
sendtype: 1,
metadata: Send_metadata.clone(),
});
}
Err(e) => {
panic!("dfdf");
// println!("d={:?}", e);
}
}
}
fn pull_request() -> impl Stream<Item = PullCreateReq> {
tokio_stream::iter(1..2).map(|i| PullCreateReq {
tcode: format!("tcode:{:02}", i),
sendtype: i as i32,
})
}
async fn create_by_pull(client: &mut MetadataOpsClient<Channel>) {
// let in_stream = generate_request().throttle(dur);
let in_stream = pull_request();
let respone = client.create_by_pull(in_stream).await.unwrap();
let mut resp_stream = respone.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("received :{:?}", received.md[0].kv);
}
// let in_stream_2 = generate_request();
// let respone = client.create_by_pull(in_stream_2).await.unwrap();
// let mut resp_stream = respone.into_inner();
// while let Some(received) = resp_stream.next().await {
// let received = received.unwrap();
// println!("received :{}", received.tcode);
// }
}
use std::fs;
use crate::registry::PushCreateReq;
async fn create_by_push(client: &mut MetadataOpsClient<Channel>) {
// let rdata = fs::read("./test.txt");
// // String::from_utf8_lossy(&fs::read("address.txt").unwrap());
// match rdata {
// Ok(data) => {
// println!("data = {}", std::str::from_utf8(&data).unwrap());
let in_stream = my_request();
let respone = client.create_by_push(in_stream).await.unwrap();
let mut resp_stream = respone.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("received: {}", received.tcode);
}
// let req_data = PushCreateReq {
// sendtype: 1,
// tcode: "hello".to_string(),
// metadata: Vec::new(),
// };
// let b = tokio_stream::iter(1..3).map(|i| PullCreateReq {
// tcode: format!("tcode:{:02}", i),
// sendtype: i as i32,
// });
// // client.record_route(Request::new(stream::iter(messages.clone())));
// let respone = client
// .create_by_push(b as impl Stream<Item = PullCreateReq>)
// .await
// .unwrap();
// }
// Err(e) => println!("d={:?}", e),
// }
// let respone = client.create_by_push()
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = MetadataOpsClient::connect("http://192.168.110.183:50050")
// let mut client = MetadataOpsClient::connect("http://192.168.110.183:50050")
.await
.unwrap();
println!("start bidirectional stream echo ");
create_by_pull(&mut client).await;
// create_by_push(&mut client).await;
Ok(())
}
pub mod registry {
tonic::include_proto!("bky.registry.api");
}
use futures::{executor::block_on, stream::Stream};
use registry::{metadata_ops_client::MetadataOpsClient, Metadata, PullCreateReq, WatchUpdateReq};
use std::time::Duration;
use tokio::runtime::Builder;
use tokio::{self, runtime::Runtime, task, time};
use tokio_stream::StreamExt;
use tonic::{codegen::http::response, server::ClientStreamingService, transport::Channel};
fn generate_request() -> impl Stream<Item = PullCreateReq> {
tokio_stream::iter(1..3).map(|i| PullCreateReq {
tcode: format!("tcode:{:02}", i),
sendtype: i as i32,
})
}
fn my_request() -> impl Stream<Item = PushCreateReq> {
let rdata = fs::read("./test.txt");
// String::from_utf8_lossy(&fs::read("address.txt").unwrap());
let mut Send_metadata = vec![];
match rdata {
Ok(data) => {
let c_data = std::str::from_utf8(&data).unwrap();
// println!("data = {}", c_data.clone());
let d = Metadata {
kv: c_data.to_string(),
};
Send_metadata.push(d);
return tokio_stream::iter(1..2).map(move |i| PushCreateReq {
tcode: "my_tcode".to_string(),
sendtype: 1,
metadata: Send_metadata.clone(),
});
}
Err(e) => {
println!("d={:?}", e);
panic!("dfdf");
}
}
}
fn watch_update_request() -> impl Stream<Item = WatchUpdateReq> {
tokio_stream::iter(1..2).map(move |i| WatchUpdateReq {
tcode: "my_tcode".to_string(),
client_id: 2,
})
}
async fn watch_update(client: &mut MetadataOpsClient<Channel>) {
let in_stream = watch_update_request();
let respone = client.watch_update(in_stream).await.unwrap();
let mut resp_stream = respone.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("received:{:?}", received);
}
}
async fn create_by_pull(client: &mut MetadataOpsClient<Channel>) {
// let in_stream = generate_request().throttle(dur);
let in_stream = generate_request();
let respone = client.create_by_pull(in_stream).await.unwrap();
let mut resp_stream = respone.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("received :{}", received.tcode);
}
let in_stream_2 = generate_request();
let respone = client.create_by_pull(in_stream_2).await.unwrap();
let mut resp_stream = respone.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("received :{}", received.tcode);
}
}
use std::fs;
use crate::registry::PushCreateReq;
async fn create_by_push(client: &mut MetadataOpsClient<Channel>) {
// let rdata = fs::read("./test.txt");
// // String::from_utf8_lossy(&fs::read("address.txt").unwrap());
// match rdata {
// Ok(data) => {
// println!("data = {}", std::str::from_utf8(&data).unwrap());
let in_stream = my_request();
let respone = client.create_by_push(in_stream).await.unwrap();
let mut resp_stream = respone.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("received: {:?}", received);
}
// let req_data = PushCreateReq {
// sendtype: 1,
// tcode: "hello".to_string(),
// metadata: Vec::new(),
// };
// let b = tokio_stream::iter(1..3).map(|i| PullCreateReq {
// tcode: format!("tcode:{:02}", i),
// sendtype: i as i32,
// });
// // client.record_route(Request::new(stream::iter(messages.clone())));
// let respone = client
// .create_by_push(b as impl Stream<Item = PullCreateReq>)
// .await
// .unwrap();
// }
// Err(e) => println!("d={:?}", e),
// }
// let respone = client.create_by_push()
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let rt = Runtime::new().unwrap();
let guard1 = rt.enter();
task::spawn(async {
let mut client2 = MetadataOpsClient::connect("http://192.168.110.183:50050")
.await
.unwrap();
create_by_push(&mut client2).await;
println!("create_by_push done");
});
drop(guard1);
loop {
let guard2 = rt.enter();
task::spawn(async {
let mut client = MetadataOpsClient::connect("http://192.168.110.183:50050")
.await
.unwrap();
// watch_update(&mut client).await;
let in_stream = watch_update_request();
let respone = client.watch_update(in_stream).await.unwrap();
let mut resp_stream = respone.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("received:{:?}", received);
}
println!("watch_update done");
});
drop(guard2);
}
// Ok(())
}
pub mod pb {
tonic::include_proto!("bky.registry.api");
}
use pb::{
metadata_ops_client::MetadataOpsClient, PullCreateReq, PullCreateResp, PushCreateReq,
PushCreateResp, QueryDirdentsReq, QueryDirdentsResp, RemoveReq, RemoveResp, UpdateReq,
UpdateResp, WatchUpdateReq, WatchUpdateResp,
};
use tonic::transport::Channel;
async fn create_by_pull(client: &mut MetadataOpsClient<Channel>) {
let req = PullCreateReq {
send_type: 1,
tcode: 1.to_string(),
};
let resp = client.create_by_pull(req).await.unwrap().into_inner();
println!("resp={:?}", resp);
}
async fn create_by_push(client: &mut MetadataOpsClient<Channel>) {
let req = PullCreateReq {
send_type: 1,
tcode: 1.to_string(),
};
let resp = client.create_by_pull(req).await.unwrap().into_inner();
println!("resp={:?}", resp);
}
async fn remove(client: &mut MetadataOpsClient<Channel>) {
let req = PullCreateReq {
send_type: 1,
tcode: 1.to_string(),
};
let resp = client.create_by_pull(req).await.unwrap().into_inner();
println!("resp={:?}", resp);
}
async fn update(client: &mut MetadataOpsClient<Channel>) {
let req = PullCreateReq {
send_type: 1,
tcode: 1.to_string(),
};
let resp = client.create_by_pull(req).await.unwrap().into_inner();
println!("resp={:?}", resp);
}
async fn query_dirdents(client: &mut MetadataOpsClient<Channel>) {
let req = PullCreateReq {
send_type: 1,
tcode: 1.to_string(),
};
let resp = client.create_by_pull(req).await.unwrap().into_inner();
println!("resp={:?}", resp);
}
async fn watch_update(client: &mut MetadataOpsClient<Channel>) {
let req = PullCreateReq {
send_type: 1,
tcode: 1.to_string(),
};
let resp = client.create_by_pull(req).await.unwrap().into_inner();
println!("resp={:?}", resp);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = MetadataOpsClient::connect("http://[::1]:50051")
.await
.unwrap();
create_by_pull(&mut client).await;
create_by_push(&mut client).await;
remove(&mut client).await;
update(&mut client).await;
query_dirdents(&mut client).await;
watch_update(&mut client).await;
println!("fdfd");
Ok(())
}
This diff is collapsed.
pub mod pb {
tonic::include_proto!("bky.registry.api");
}
use pb::{
metadata_ops_client::MetadataOpsClient, PullCreateReq, PullCreateResp, PushCreateReq,
PushCreateResp, QueryDirdentsReq, QueryDirdentsResp, RemoveReq, RemoveResp, UpdateReq,
UpdateResp, WatchUpdateReq, WatchUpdateResp,
};
use tonic::transport::Channel;
async fn create_by_pull(client: &mut MetadataOpsClient<Channel>) {
let req = PullCreateReq {
send_type: 1,
tcode: 1.to_string(),
};
let resp = client.create_by_pull(req).await.unwrap().into_inner();
println!("resp={:?}", resp);
}
async fn create_by_push(client: &mut MetadataOpsClient<Channel>) {
let req = PullCreateReq {
send_type: 1,
tcode: 1.to_string(),
};
let resp = client.create_by_pull(req).await.unwrap().into_inner();
println!("resp={:?}", resp);
}
async fn remove(client: &mut MetadataOpsClient<Channel>) {
let req = PullCreateReq {
send_type: 1,
tcode: 1.to_string(),
};
let resp = client.create_by_pull(req).await.unwrap().into_inner();
println!("resp={:?}", resp);
}
async fn update(client: &mut MetadataOpsClient<Channel>) {
let req = PullCreateReq {
send_type: 1,
tcode: 1.to_string(),
};
let resp = client.create_by_pull(req).await.unwrap().into_inner();
println!("resp={:?}", resp);
}
async fn query_dirdents(client: &mut MetadataOpsClient<Channel>) {
let req = PullCreateReq {
send_type: 1,
tcode: 1.to_string(),
};
let resp = client.create_by_pull(req).await.unwrap().into_inner();
println!("resp={:?}", resp);
}
async fn watch_update(client: &mut MetadataOpsClient<Channel>) {
let req = PullCreateReq {
send_type: 1,
tcode: 1.to_string(),
};
let resp = client.create_by_pull(req).await.unwrap().into_inner();
println!("resp={:?}", resp);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = MetadataOpsClient::connect("http://[::1]:50051")
.await
.unwrap();
create_by_pull(&mut client).await;
create_by_push(&mut client).await;
remove(&mut client).await;
update(&mut client).await;
query_dirdents(&mut client).await;
watch_update(&mut client).await;
println!("fdfd");
Ok(())
}
pub mod pb {
tonic::include_proto!("bky.registry.api");
}
use std::{error::Error, io::ErrorKind, net::ToSocketAddrs, pin::Pin, time::Duration};
use futures::Stream;
use pb::{
Metadata, PullCreateReq, PullCreateResp, PushCreateReq, PushCreateResp, QueryDirdentsReq,
QueryDirdentsResp, RemoveReq, RemoveResp, UpdateReq, UpdateResp, WatchUpdateReq,
WatchUpdateResp,
};
use tonic::{transport::Server, Request, Response, Status, Streaming};
#[derive(Debug)]
pub struct MetadataOpsServer {}
type MetadataResult<T> = Result<Response<T>, Status>;
type ResponseStream = Pin<Box<dyn Stream<Item = Result<WatchUpdateResp, Status>> + Send>>;
#[tonic::async_trait]
impl pb::metadata_ops_server::MetadataOps for MetadataOpsServer {
async fn create_by_pull(&self, _: Request<PullCreateReq>) -> MetadataResult<PullCreateResp> {
// Err(Status::unimplemented("..."))
let md: Metadata = Metadata {
kv: "hello".to_string(),
};
let mut mdv = vec![];
mdv.push(md);
let reply = PullCreateResp {
ret_code: 1,
tcode: 1.to_string(),
md: mdv,
};
Ok(Response::new(reply))
}
async fn create_by_push(&self, _: Request<PushCreateReq>) -> MetadataResult<PushCreateResp> {
Err(Status::unimplemented("..."))
}
async fn remove(&self, _: Request<RemoveReq>) -> MetadataResult<RemoveResp> {
Err(Status::unimplemented("..."))
}
async fn update(&self, _: Request<UpdateReq>) -> MetadataResult<UpdateResp> {
Err(Status::unimplemented("..."))
}
async fn query_dirdents(
&self,
_: Request<QueryDirdentsReq>,
) -> MetadataResult<QueryDirdentsResp> {
Err(Status::unimplemented("..."))
}
type WatchUpdateStream = ResponseStream;
async fn watch_update(
&self,
_: Request<WatchUpdateReq>,
) -> MetadataResult<Self::WatchUpdateStream> {
Err(Status::unimplemented("..."))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let server = MetadataOpsServer {};
Server::builder()
.add_service(pb::metadata_ops_server::MetadataOpsServer::new(server))
.serve("[::1]:50051".to_socket_addrs().unwrap().next().unwrap())
.await
.unwrap();
println!("fdfd");
Ok(())
}
...@@ -8,59 +8,3 @@ async fn main() { ...@@ -8,59 +8,3 @@ async fn main() {
println!("Got {}", value); println!("Got {}", value);
} }
} }
// use std::pin::Pin;
// use tokio::sync::mpsc;
// use tokio_stream::{Stream, StreamExt, StreamMap};
// #[tokio::main]
// async fn main() {
// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
// // Convert the channels to a `Stream`.
// let rx1 = Box::pin(async_stream::stream! {
// while let Some(item) = rx1.recv().await {
// yield item;
// }
// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
// let rx2 = Box::pin(async_stream::stream! {
// while let Some(item) = rx2.recv().await {
// yield item;
// }
// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
// tokio::spawn(async move {
// tx1.send(1).await.unwrap();
// // This value will never be received. The send may or may not return
// // `Err` depending on if the remote end closed first or not.
// let _ = tx1.send(2).await;
// });
// tokio::spawn(async move {
// tx2.send(3).await.unwrap();
// let _ = tx2.send(4).await;
// });
// let mut map = StreamMap::new();
// // Insert both streams
// map.insert("one", rx1);
// map.insert("two", rx2);
// // Read twice
// for _ in 0..2 {
// let (key, val) = map.next().await.unwrap();
// if key == "one" {
// assert_eq!(val, 1);
// } else {
// assert_eq!(val, 3);
// }
// // Remove the stream to prevent reading the next value
// map.remove(key);
// }
// }
This diff is collapsed.
"\nfe\nfe\n":"33206|0|1000|1000|1653640181|1653640181|1653640181|795538473|795538473|795538473|1|0"|"/good""16895|0|1000|1000|1653640372|1653640372|1653640372|154554408|154554408|154554408|2|0"
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment