Reputation: 109
I need to implement a distributed database, and I based my implementation on the gRPC example. The following are my code snippets.
sundial_sync_server.h
#ifndef SUNDIAL_GRPC_SYNC_SERVER_H
#define SUNDIAL_GRPC_SYNC_SERVER_H
#endif //SUNDIAL_GRPC_SYNC_SERVER_H
#include "sundial_grpc.grpc.pb.h"
#include "sundial_grpc.pb.h"
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
using grpc::Channel;
using grpc::ServerContext;
using grpc::Status;
using sundial_rpc::SundialRequest;
using sundial_rpc::SundialResponse;
using sundial_rpc::Sundial_GRPC_SYNC;
#ifndef ABC
#define ABC
class SundialServiceImp final : public Sundial_GRPC_SYNC::Service
{
public:
Status contactRemote(::grpc::ServerContext* context, const ::sundial_rpc::SundialRequest* request, ::sundial_rpc::SundialResponse* response) override;
void run();
};
#endif
sundial_sync_server.cpp
#include "sundial_grpc.grpc.pb.h"
#include "sundial_grpc.pb.h"
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "txn.h"
#include "global.h"
#include "manager.h"
#include "stats.h"
#include "helper.h"
#include "grpc_sync_server.h"
#include "txn_table.h"
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
using grpc::ServerContext;
using grpc::Status;
using sundial_rpc::SundialRequest;
using sundial_rpc::SundialResponse;
using sundial_rpc::Sundial_GRPC_SYNC;
using grpc::Server;
using grpc::ServerBuilder;
// Server-side handler for the contactRemote RPC.
//
// SYS_REQ requests are forwarded to glob_manager and answered with SYS_RESP.
// Every other request is routed to the TxnManager registered in txn_table
// under the request's txn_id; a new TxnManager is created and registered when
// none exists yet. Once the response indicates that the sub-transaction is
// finished, the TxnManager is removed from txn_table and destroyed.
// Always returns Status::OK; the outcome is carried in *response.
Status SundialServiceImp::contactRemote(::grpc::ServerContext* context, const ::sundial_rpc::SundialRequest* request, ::sundial_rpc::SundialResponse* response){
    const auto req_type = request->request_type();

    // System-level synchronization message: no transaction is involved.
    if (req_type == SundialRequest::SYS_REQ) {
        glob_manager->receive_sync_request();
        response->set_response_type( SundialResponse::SYS_RESP );
        return Status::OK;
    }

    uint64_t remote_txn_id = request->txn_id();
    TxnManager * handler = txn_table->get_txn(remote_txn_id);
    if (handler == nullptr) {
        // No manager registered for this transaction yet: the assertion
        // expects the first message of a transaction to be a READ_REQ.
        assert( req_type == SundialRequest::READ_REQ );
        handler = new TxnManager();
        handler->set_txn_id( remote_txn_id );
        txn_table->add_txn( handler );
    }

    // The transaction manager handles the RPC and fills in the response.
    handler->process_remote_request(request, response);

    // These response types mean the sub-transaction is no longer required.
    const auto outcome = response->response_type();
    const bool sub_txn_finished =
           outcome == SundialResponse::RESP_ABORT
        || outcome == SundialResponse::PREPARED_OK_RO
        || outcome == SundialResponse::PREPARED_ABORT
        || outcome == SundialResponse::ACK;
    if (sub_txn_finished) {
        txn_table->remove_txn( handler );
        delete handler;
    }
    return Status::OK;
}
// Starts the synchronous gRPC server for this node and blocks while serving.
//
// The listen address is the g_node_id-th line of ifconfig_string, not counting
// lines that start with '#', with sync_port appended. This object is
// registered as the service implementation.
//
// If the server cannot be built or started, BuildAndStart() yields a null
// pointer; in that case an error is written to stderr and the function
// returns instead of dereferencing the null server.
void SundialServiceImp::run(){
    std::istringstream in(ifconfig_string);
    std::string line;
    std::string address;
    uint32_t node_index = 0;
    while (std::getline(in, line)) {
        // Lines beginning with '#' are comments and do not count as nodes.
        if (!line.empty() && line.front() == '#')
            continue;
        if (node_index == g_node_id) {
            address = line;
            break;
        }
        ++node_index;
    }
    address.append(sync_port);

    grpc::EnableDefaultHealthCheckService(true);
    grpc::reflection::InitProtoReflectionServerBuilderPlugin();

    ServerBuilder builder;
    builder.AddListeningPort(address, grpc::InsecureServerCredentials());
    builder.RegisterService(this);

    std::unique_ptr<Server> server(builder.BuildAndStart());
    if (!server) {
        // Startup failed (for example the address could not be bound).
        std::cerr << "Failed to start server on " << address << std::endl;
        return;
    }
    std::cout << "Server listening on " << address << std::endl;
    server->Wait();
}
sundial_sync_client.h
#endif //SUNDIAL_GRPC_CLIENT_H
#include "sundial_grpc.grpc.pb.h"
#include "sundial_grpc.pb.h"
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using sundial_rpc::SundialRequest;
using sundial_rpc::SundialResponse;
using sundial_rpc::Sundial_GRPC_SYNC;
//toDo: now assume we only have 2 nodes
#ifndef SSC
#define SSC
// Client-side wrapper around a Sundial_GRPC_SYNC stub.
class Sundial_Sync_Client{
    // The single stub that outgoing contactRemote calls go through.
    std::unique_ptr<Sundial_GRPC_SYNC::Stub> stub_;

public:
    // channel: pointer to the first element of an array of channels indexed
    // by node id.
    Sundial_Sync_Client(std::shared_ptr<Channel>* channel);

    // Issues the contactRemote RPC with `request`, stores the reply in
    // *response, and returns the resulting gRPC status.
    Status contactRemote(uint64_t node_id,
                         SundialRequest& request,
                         SundialResponse* response);
};
#endif
sundial_sync_client.cpp
#include "sundial_grpc.grpc.pb.h"
#include "sundial_grpc.pb.h"
#include "grpc_sync_client.h"
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "txn.h"
#include "global.h"
#include "helper.h"
#include "manager.h"
#include "stats.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using sundial_rpc::SundialRequest;
using sundial_rpc::SundialResponse;
using sundial_rpc::Sundial_GRPC_SYNC;
//toDo: add more nodes to it
// Creates the RPC stub from the supplied array of per-node channels.
//
// The entry for the local node (g_node_id) is skipped. Every other entry
// overwrites stub_, so after the loop only the stub for the last non-local
// node is retained (sufficient for the two-node setup this code assumes).
Sundial_Sync_Client::Sundial_Sync_Client(std::shared_ptr<Channel>* channel){
    for (int node = 0; node < g_num_nodes; ++node) {
        if (node != g_node_id) {
            // Per-node variant, not enabled yet:
            //   stub_[node] = Sundial_GRPC_SYNC::NewStub(channel[node]);
            stub_ = Sundial_GRPC_SYNC::NewStub(channel[node]);
        }
    }
}
// Sends `request` through the stub and stores the reply in *response.
//
// On success the per-thread response counters in glob_stats are updated for
// the received response type. On failure the status code and message are
// printed to stdout. The gRPC status is returned in both cases.
// node_id is currently unused: every call goes through the single stub_.
Status
Sundial_Sync_Client::contactRemote(uint64_t node_id, SundialRequest& request, SundialResponse* response){
    ClientContext context;
    printf("Client sends request\n");
    // Per-node variant, not enabled yet:
    //   stub_[node_id]->contactRemote(&context, request, &response);
    Status status = stub_->contactRemote(&context, request, response);

    if (!status.ok()) {
        std::cout << status.error_code() << ": " << status.error_message()
                  << std::endl;
        return status;
    }

    // Successful call: account for the response in this thread's statistics.
    const auto resp_type = response->response_type();
    glob_stats->_stats[GET_THD_ID]->_resp_msg_count[ resp_type ] ++;
    glob_stats->_stats[GET_THD_ID]->_resp_msg_size[ resp_type ] += response->SpaceUsedLong();
    return status;
}
Setup runs smoothly: I obtained the IP addresses and the servers start on the right addresses. However, when the client sends a request, the status it returns always has error code 12 with no error message.
Upvotes: 3
Views: 8093
Reputation: 116
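Error code 12 is "UNIMPLEMENTED", which means the server has no handler for the method the client called. Check your proto and make sure the server implements exactly the RPC method the client invokes (same service and method name, including upper/lower case, and things like that).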
Upvotes: 5