/home/tarsproto/TestApp/HelloClient
cd /home/muten/module/TARS/TarsFramework/build
cmake .. -DCMAKE_BUILD_TYPE=debug
make -j8
make install
本文中的大量篇幅取自 中的内容,感谢TARS基金会!
本节主要关注异步调用逻辑,注意在2.1.5中调用的同步方法prx->testHello(sReq, sRsp)要改成prx->async_testHello.
在客户端程序中,一开始会执行下面的代码进行整个客户端代理的初始化:
Communicator comm;
HelloPrx prx;
comm.stringToProxy("TestApp.HelloServer.HelloObj@tcp -h 1.1.1.1 -p 20001" , prx);
先声明一个Communicator变量comm(其实不建议这么做)以及一个ServantProxy类的指针变量prx,在此
处,服务为Hello,因此声明一个HelloPrx prx。注意一个客户端只能拥有一个Communicator。为了能够获
得RPC的服务句柄,我们调用Communicator::stringToProxy(),并传入服务端的信息与prx变量,函数返
回后,prx就是RPC服务的句柄。
进入Communicator::stringToProxy()函数中,我们通过Communicator::getServantProxy()来依据
objectName与setName获取服务代理ServantProxy:
/**
* 生成代理
* @param T
* @param objectName
* @param setName 指定set调用的setid
* @param proxy
*/
template<class T> void stringToProxy(const string& objectName, T& proxy,const string& setName="")
{
ServantProxy * pServantProxy = getServantProxy(objectName,setName);
proxy = (typename T::element_type*)(pServantProxy);
}
进入Communicator::getServantProxy(),首先会执行Communicator::initialize()来初始化
Communicator,需要注意一点,Communicator:: initialize()只会被执行一次,下一次执行
Communicator::getServantProxy()将不会再次执行Communicator:: initialize()函数:
void Communicator::initialize()
{
TC_LockT<TC_ThreadRecMutex> lock(*this);
if (_initialized) //已经被初始化则直接返回
return;
......
}
进入Communicator::initialize()函数中,在这里,将会new出上文介绍的与Communicator密切相关的类
ServantProxyFactory与n个CommunicatorEpoll,n为客户端的网络线程数,最小为1,最大为
MAX_CLIENT_THREAD_NUM:
void Communicator::initialize()
{
......
_servantProxyFactory = new ServantProxyFactory(this);
......
for(size_t i = 0; i < _clientThreadNum; ++i)
{
_communicatorEpoll[i] = new CommunicatorEpoll(this, i);
_communicatorEpoll[i]->start(); //启动网络线程
}
......
}
在CommunicatorEpoll的构造函数中,ObjectProxyFactory被创建出来,这是构造图(1-2)关系的前提。
除此之外,还可以看到获取相应配置,创建并启动若干个异步回调后的处理线程。创建完成后,调用
CommunicatorEpoll::start()启动网络线程。至此,Communicator::initialize()顺利执行。
说明:本段摘自「TARS基金会」的原创文章
————————————————
版权声明:本文为CSDN博主「TARS基金会」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https:///TARSFoundation/article/details/107208534
进入ServantProxyFactory::getServantProxy(),首先会加锁,从map<string, ServantPrx>
_servantProxy中查找目标,若查找成功直接返回.若查找失败,TARS需要构造出相应的ServantProxy,
ServantProxy的构造需要如图(1-3)所示的相对应的ObjectProxy作为构造函数的参数,由此可见,在
ServantProxyFactory::getServantProxy()中有如下获取ObjectProxy指针数组的代码:
ObjectProxy ** ppObjectProxy = new ObjectProxy * [_comm->getClientThreadNum()];
assert(ppObjectProxy != NULL);
for(size_t i = 0; i < _comm->getClientThreadNum(); ++i)
{
ppObjectProxy[i] = _comm->getCommunicatorEpoll(i)->getObjectProxy(name, setName);
}
看看如何获取ObjectProxy:
代码来到ObjectProxyFactory::getObjectProxy(),同样,会首先加锁,再从map<string,ObjectProxy*>
_objectProxys中查找是否已经拥有目标ObjectProxy,若查找成功直接返回。若查找失败,需要新建一个新的
ObjectProxy.
通过类图可知,ObjectProxy需要一个CommunicatorEpoll对象进行初始化,由此关联管理自己
的CommunicatorEpoll,CommunicatorEpoll之后便可以通过getObjectProxy()接口获取属于自己的
ObjectProxy.
、
新建ObjectProxy的过程同样非常值得关注,在ObjectProxy::ObjectProxy()中,关键代码是:
_endpointManger.reset(new EndpointManager(this, _communicatorEpoll->getCommunicator(),
sObjectProxyName, pCommunicatorEpoll->isFirstNetThread(), setName));
用ObjectProxy构造中 _endpointManger.reset来实现ObjectProxy与AdapterProxy的关系的建立.
逻辑说明:
每个ObjectProxy都有属于自己的EndpointManager负责管理到服务端的所有socket连接AdapterProxy,每个
AdapterProxy连接到一个提供相应服务的服务端物理机socket上。通过EndpointManager还可以以不同的负载
均衡方式获取与服务器的socket连接AdapterProxy。
ObjectProxy:: ObjectProxy()是图(1-6)或者图(1-8)中的略1,具体的代码流程如图(1-9)所示。
ObjectProxy创建一个EndpointManager对象,在EndpointManager的初始化过程中,依据客户端提供的信息,
直接创建连接到服务端物理机的TCP/UDP连接AdapterProxy或者从代理中获取服务端物理机socket列表后再创
建TCP/UDP连接AdapterProxy。
————————————————
版权声明:本文为CSDN博主「TARS基金会」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https:///TARSFoundation/article/details/107208534
退出层层的函数调用栈,代码再次回 ServantProxyFactory::getServantProxy()--本文的2.1.2图,此时,
ServantProxyFactory已经获得相应的ObjectProxy数组ObjectProxy** ppObjectProxy,接着便可以调
用:
ServantPrx sp = new ServantProxy(_comm, ppObjectProxy, _comm->getClientThreadNum());
进行ServantProxy的构造。构造完成便可以呈现出如图(2-1)的关系。在ServantProxy的构造函数中可以看
到,ServantProxy在新建一个EndpointManagerThread变量,这是对外获取路由请求的类,是TARS为调用逻辑
而提供的多种解决跨地区调用等问题的方案。同时可以看到:
for(size_t i = 0;i < _objectProxyNum; ++i)
{
(*(_objectProxy + i))->setServantProxy(this);
}
建立了ServantProxy与ObjectProxy的相互关联关系. 剩下的是读取配置文件,获取相应的信息.
构造ServantProxy变量完成后,ServantProxyFactory::getServantProxy()获取一些超时参数,赋值给
ServantProxy变量,同时将其放进map<string, ServantPrx> _servantProxy中,方便下次直接查找获
取。
ServantProxyFactory::getServantProxy()将刚刚构造的ServantProxy指针变量返回给调用他的
Communicator::getServantProxy(),在Communicator::getServantProxy()中:
ServantProxy * Communicator::getServantProxy(const string& objectName,const string& setName)
{
……
return _servantProxyFactory->getServantProxy(objectName,setName);
}
直接将返回值返回给调用起Communicator::getServantProxy()的Communicator::stringToProxy()。可
以看到:
template<class T> void stringToProxy(const string& objectName, T& proxy,const string&
setName="")
{
ServantProxy * pServantProxy = getServantProxy(objectName,setName);
proxy = (typename T::element_type*)(pServantProxy);
}
Communicator::stringToProxy()将返回值强制转换为客户端代码中与HelloPrx prx同样的类型
HelloPrx。由于函数参数proxy就是prx的引用。那么实际就是将句柄prx成功初始化了,用户可以利用句柄
prx进行RPC调用了.
————————————————
版权声明:本段为CSDN博主「TARS基金会」的原创文章的摘选,遵循CC 4.0 BY-SA版权协议,转载请附上原
文出处链接及本声明。
原文链接:https:///TARSFoundation/article/details/107208534
当我们获得一个ServantProxy句柄后,便可以进行RPC调用了.Tars提供四种RPC调用方式,分别是同步调用,
异步调用,promise调用与协程调用. 其中最简单最常见的RPC调用方式是同步调用,接下来,将简单分析Tars
的同步调用.
现假设有一个MyDemo.StringServer.StringServantObj的服务,提供一个RPC接口是append,传入两个
string类型的变量,返回两个string类型变量的拼接结果。而且假设有两台服务器,socket标识分别是
192.168.106.129:34132与192.168.106.130:34132,设置客户端的网络线程数为3,那么执行如下代码:
Communicator _comm;
StringServantPrx _proxy;
_comm.stringToProxy("MyDemo.StringServer.StringServantObj@tcp -h 192.168.106.129 -p 34132",
_proxy);
_comm.stringToProxy("MyDemo.StringServer.StringServantObj@tcp -h 192.168.106.130 -p 34132",
_proxy);
————————————————
版权声明:本文为CSDN博主「TARS基金会」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https:///TARSFoundation/article/details/107208534
经过上文关于客户端初始化的分析介绍可知,可以得出如下图所示的关系图:
获取StringServantPrx _proxy后,直接调用:
string str1(abc-), str2(defg), rStr;
int retCode = _proxy->append(str1, str2, rStr);
成功进行RPC同步调用后,返回的结果是rStr = “abc-defg”. 同样,我们先看看与同步调用相关的类图,如下图所示:
StringServantProxy是继承自ServantProxy的,StringServantProxy提供了RPC同步调用的接口Int32 append(),当用户发起同步调用_proxy->append(str1, str2, rStr)时,
所进行的函数调用过程如下图所示:
在函数StringServantProxy::append()中,程序会先构造ServantProxy::tars_invoke()所需要的参数,如请求包类型,RPC方法名,方法参数等,需要值得注意的是,
传递参数中有一个ResponsePacket类型的变量,在同步调用中,最终的返回结果会放置在这个变量上。接下来便直接调用了ServantProxy::tars_invoke()方法:
tars_invoke(tars::TARSNORMAL, "append", _os.getByteBuffer(), context, _mStatus, rep);
在ServantProxy::tars_invoke()方法中,先创建一个ReqMessage变量msg,初始化msg变量,给变量赋值,如Tars版本号,数据包类型,服务名,RPC方法名,Tars的上下文容器,同步调用的超时时间(单位为毫秒)等。最后,调用ServantProxy::invoke()进行远程方法调用。
无论同步调用还是各种异步调用,ServantProxy::invoke()都是RPC调用的必经之地。在ServantProxy::invoke()中,继续填充传递进来的变量ReqMessage msg。此外,还需要获取调用者caller线程的线程私有数据ServantProxyThreadData,用来指导RPC调用。客户端的每个caller线程都有属于自己的维护调用上下文的线程私有数据,如hash属性,消息染色信息。最关键的还是每条caller线程与每条客户端网络线程CommunicatorEpoll进行信息交互的桥梁——通信队列ReqInfoQueue数组,数组中的每个ReqInfoQueue元素负责与一条网络线程进行交互,如图(1-13)所示,图中橙色阴影代表数组ReqInfoQueue[],阴影内的圆柱体代表数组元素ReqInfoQueue。假如客户端create两条线程(下称caller线程)发起StringServant服务的RPC请求,且客户端网络线程数设置为2,那么两条caller线程各自有属于自己的线程私有数据请求队列数组ReqInfoQueue[],数组里面的ReqInfoQueue元素便是该数组对应的caller线程与两条网络线程的通信桥梁,一条网络线程对应着数组里面的一个元素,通过网络线程ID进行数组索引。整个关系有点像生产者消费者模型,生产者Caller线程向自己的线程私有数据**ReqInfoQueue[]**中的第N个元素ReqInfoQueue[N] push请求包,消费者客户端第N个网络线程就会从这个队列中pop请求包。
阅读代码可能会发现几个常量值,如MAX_CLIENT_THREAD_NUM=64,这是最大网络线程数,在图(1-13)中为单个请求队列数组ReqInfoQueue[]的最大size;
MAX_CLIENT_NOTIFYEVENT_NUM=2048,在图(1-13)中,可以看作caller线程的最大数量,或者请求队列数组ReqInfoQueue[]的最大数量
(反正两者一一对应,每个caller线程都有自己的线程私有数据ReqInfoQueue[])。
接着依据caller线程的线程私有数据进行第一次的负载均衡——选取ObjectProxy(即选择网络线程CommunicatorEpoll)和与之相对应的ReqInfoQueue:
ObjectProxy * pObjProxy = NULL;
ReqInfoQueue * pReqQ = NULL;
//选择网络线程
selectNetThreadInfo(pSptd, pObjProxy, pReqQ);
在ServantProxy::selectNetThreadInfo()中,通过轮询的形式来选取ObjectProxy与ReqInfoQueue。
退出ServantProxy::selectNetThreadInfo()后,便得到ObjectProxy类型的pObjProxy及其对应的ReqInfoQueue类型的ReqInfoQueue,稍后通过pObjectProxy来发送RPC请求,请求信息会暂存在ReqInfoQueue中。
由于是同步调用,需要新建一个条件变量去监听RPC的完成,可见:
//同步调用 new 一个ReqMonitor
assert(msg->pMonitor == NULL);
if(msg->eType == ReqMessage::SYNC_CALL)
{
msg->bMonitorFin = false;
if(pSptd->_sched)
{
msg->bCoroFlag = true;
msg->sched = pSptd->_sched;
msg->iCoroId = pSptd->_sched->getCoroutineId();
}
else
{
msg->pMonitor = new ReqMonitor;
}
}
创建完条件变量,接下来往ReqInfoQueue中push_back()请求信息包msg,并通知pObjProxy所属的CommunicatorEpoll进行数据发送:
if(!pReqQ->push_back(msg,bEmpty))
{
TLOGERROR("[TARS][ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl);
delete msg;
msg = NULL;
pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
throw TarsClientQueueException("client queue full");
}
pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
来到CommunicatorEpoll::notify()中,往请求事件通知数组NotifyInfo _notify[]中添加请求事件,通知CommunicatorEpoll进行请求包的发送。注意了,这个函数的作用仅仅是通知网络线程准备发送数据,通过TC_Epoller::mod()或者TC_Epoller::add()触发一个EPOLLIN事件,从而促使阻塞在TC_Epoller::wait()(在CommunicatorEpoll::run()中阻塞)的网络线程CommunicatorEpoll被唤醒,并设置唤醒后的epoll_event中的联合体epoll_data变量为&_notify[iSeq].stFDInfo:
void CommunicatorEpoll::notify(size_t iSeq,ReqInfoQueue * msgQueue)
{
assert(iSeq < MAX_CLIENT_NOTIFYEVENT_NUM);
if(_notify[iSeq].bValid)
{
_ep.mod(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
assert(_notify[iSeq].stFDInfo.p == (void*)msgQueue);
}
else
{
_notify[iSeq].stFDInfo.iType = FDInfo::ET_C_NOTIFY;
_notify[iSeq].stFDInfo.p = (void*)msgQueue;
_notify[iSeq].stFDInfo.fd = _notify[iSeq].eventFd;
_notify[iSeq].stFDInfo.iSeq = iSeq;
_notify[iSeq].notify.createSocket();
_notify[iSeq].bValid = true;
_ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
}
}
就是经过这么一个操作,网络线程就可以被唤醒,唤醒后通过epoll_event变量可获得&_notify[iSeq].stFDInfo。接下来的请求发送与响应的接收会在后面会详细介绍。
随后,代码再次回到ServantProxy::invoke(),阻塞于:
if(!msg->bMonitorFin)
{
TC_ThreadLock::Lock lock(*(msg->pMonitor));
//等待直到网络线程通知过来
if(!msg->bMonitorFin)
{
msg->pMonitor->wait();
}
}
等待网络线程接收到数据后,对其进行唤醒。
接收到响应后,检查是否成功获取响应,是则直接退出函数即可,响应信息在传入的参数msg中:
if(msg->eStatus == ReqMessage::REQ_RSP && msg->response.iRet == TARSSERVERSUCCESS)
{
snprintf(pSptd->_szHost, sizeof(pSptd->_szHost), "%s", msg->adapter->endpoint().desc().c_str());
//成功
return;
}
若接收失败,会抛出异常,并删除msg:
若接收成功,退出ServantProxy::invoke()后,回到ServantProxy::tars_invoke(),获取ResponsePacket类型的响应信息,并删除msg包:
rsp = msg->response;
delete msg;
msg = NULL;
代码回到StringServantProxy::append(),此时经过同步调用,可以直接获取RPC返回值并回到客户端中.
上面提到,当在ServantProxy::invoke()中,调用CommunicatorEpoll::notify()通知网络线程进行请求发送后,接下来,网络线程的具体执行流程如下图所示:
由于CommunicatorEpoll继承自TC_Thread,在上文1.2.2节中的第2小点的初始化CommunicatorEpoll之后便调用其CommunicatorEpoll::start()函数启动网络线程,
网络线程在CommunicatorEpoll::run()中一直等待_ep.wait(iTimeout)。由于在上一节的描述中,在CommunicatorEpoll::notify(),caller线程发起了通知notify,网络
线程在CommunicatorEpoll::run()就会调用CommunicatorEpoll::handle()处理通知:
void CommunicatorEpoll::run()
{
......
try
{
int iTimeout = ((_waitTimeout < _timeoutCheckInterval) ? _waitTimeout : _timeoutCheckInterval);
int num = _ep.wait(iTimeout);
if(_terminate)
{
break;
}
//先处理epoll的网络事件
for (int i = 0; i < num; ++i)
{
//获取epoll_event变量的data,就是1.3.1节中提过的&_notify[iSeq].stFDInfo
const epoll_event& ev = _ep.get(i);
uint64_t data = ev.data.u64;
if(data == 0)
{
continue; //data非指针, 退出循环
}
handle((FDInfo*)data, ev.events);
}
}
......
}
在CommunicatorEpoll::handle()中,通过传递进来的epoll_event中的data成员变量获取前面被选中的ObjectProxy并调用其ObjectProxy::invoke()函数:
void CommunicatorEpoll::handle(FDInfo * pFDInfo, uint32_t events)
{
try
{
assert(pFDInfo != NULL);
//队列有消息通知过来
if(FDInfo::ET_C_NOTIFY == pFDInfo->iType)
{
ReqInfoQueue * pInfoQueue=(ReqInfoQueue*)pFDInfo->p;
ReqMessage * msg = NULL;
try
{
while(pInfoQueue->pop_front(msg))
{
......
try
{
msg->pObjectProxy->invoke(msg);
}
......
}
}
......
}
......
}
在ObjectProxy::invoke()中将进行第二次的负载均衡,每个ObjectProxy通过EndpointManager可以以不同的负载均衡方式对AdapterProxy进行选取选择:
void ObjectProxy::invoke(ReqMessage * msg)
{
......
//选择一个远程服务的Adapter来调用
AdapterProxy * pAdapterProxy = NULL;
bool bFirst = _endpointManger->selectAdapterProxy(msg, pAdapterProxy);
......
}
在EndpointManager:: selectAdapterProxy()中,有多种负载均衡的方式来选取AdapterProxy,如getHashProxy(),getWeightedProxy(),getNextValidProxy()等。
获取AdapterProxy之后,便将选择到的AdapterProxy赋值给EndpointManager:: selectAdapterProxy()函数中的引用参数pAdapterProxy,随后执行:
void ObjectProxy::invoke(ReqMessage * msg)
{
......
msg->adapter = pAdapterProxy;
pAdapterProxy->invoke(msg);
}
调用pAdapterProxy将请求信息发送出去。而在AdapterProxy::invoke()中,AdapterProxy将调用Transceiver::sendRequset()进行请求的发送。
至此,对应同步调用的网络线程发送请求的工作就结束了,网络线程会回到CommunicatorEpoll::run()中,继续等待数据的收发。
当网络线程CommunicatorEpoll接收到响应数据之后,如同之前发送请求那样, 在CommunicatorEpoll::run()中,程序获取活跃的epoll_event的变量,并将其中的epoll_data_t data传递给CommunicatorEpoll::handle():
//先处理epoll的网络事件
for (int i = 0; i < num; ++i)
{
const epoll_event& ev = _ep.get(i);
uint64_t data = ev.data.u64;
if(data == 0)
{
continue; //data非指针, 退出循环
}
handle((FDInfo*)data, ev.events);
}
接下来的程序流程如下图所示:
在CommunicatorEpoll::handle()中,从epoll_data::data中获取Transceiver指针,并调用CommunicatorEpoll::handleInputImp():
Transceiver *pTransceiver = (Transceiver*)pFDInfo->p;
//先收包
if (events & EPOLLIN)
{
try
{
handleInputImp(pTransceiver);
}
catch(exception & e)
{
TLOGERROR("[TARS]CommunicatorEpoll::handle exp:"<<e.what()<<" ,line:"<<__LINE__<<endl);
}
catch(...)
{
TLOGERROR("[TARS]CommunicatorEpoll::handle|"<<__LINE__<<endl);
}
}
在CommunicatorEpoll::handleInputImp()中,除了对连接的判断外,主要做两件事,调用Transceiver::doResponse()以及AdapterProxy::finishInvoke(ResponsePacket&),前者的工作是从socket连接中获取响应数据并判断接收的数据是否为一个完整的RPC响应包。后者的作用是将响应结果返回给客户端,同步调用的会唤醒阻塞等待在条件变量中的caller线程,异步调用的会在异步回调处理线程中执行回调函数。
在AdapterProxy::finishInvoke(ResponsePacket&)中,需要注意一点,假如是同步调用的,需要获取响应包rsp对应的ReqMessage信息,在Tars中,执行:
ReqMessage * msg = NULL;
// 获取响应包rsp对应的msg信息,并在超时队列中剔除该msg
bool retErase = _timeoutQueue->erase(rsp.iRequestId, msg);
在找回响应包对应的请求信息msg的同时,将其在超时队列中剔除出来。接着执行:
msg->eStatus = ReqMessage::REQ_RSP;
msg->response = rsp;
finishInvoke(msg);
程序调用另一个重载函数AdapterProxy::finishInvoke(ReqMessage*),在AdapterProxy::finishInvoke(ReqMessage*)中,
不同的RPC调用方式会执行不同的动作,例如同步调用会唤醒对应的caller线程:
//同步调用,唤醒ServantProxy线程
if(msg->eType == ReqMessage::SYNC_CALL)
{
if(!msg->bCoroFlag)
{
assert(msg->pMonitor);
TC_ThreadLock::Lock sync(*(msg->pMonitor));
msg->pMonitor->notify();
msg->bMonitorFin = true;
}
else
{
msg->sched->put(msg->iCoroId);
}
return ;
}
至此,对应同步调用的网络线程接收响应的工作就结束了,网络线程会回到CommunicatorEpoll::run()中,继续等待数据的收发。
综上,客户端同步调用的过程如下图所示。
在Tars中,除了最常见的同步调用之外,还可以进行异步调用,异步调用可分三种:普通的异步调用,promise异步调用与协程异步调用,这里简单介绍普通的异步调用,看看其与上文介绍的同步调用有何异同。
异步调用不会阻塞整个客户端程序,调用完成(请求发送)之后,用户可以继续处理其他事情,等接收到响应之后,Tars会在异步处理线程当中执行用户实现好的回调函数。在这里,会用到《Effective C++》中条款35所介绍的“藉由Non-Virtual Interface手法实现Template Method模式”,用户需要继承一个XXXServantPrxCallback基类,并实现里面的虚函数,异步回调线程会在收到响应包之后回调这些虚函数,具体的异步调用客户端示例这里不作详细介绍,在Tars的Example中会找到相应的示例代码。
本文第一章已经详细介绍了客户端的初始化,这里再简单提一下,在第一章的“1.2.2初始化代码跟踪- 2.执行Communicator的初始化函数”中,已经提到说,在每一个网络线程CommunicatorEpoll的初始化过程中,会创建_asyncThreadNum条异步线程,等待异步调用的时候处理响应数据:
CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThreadSeq)
{
......
//异步线程数
_asyncThreadNum = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncthread", "3"));
if(_asyncThreadNum == 0)
{
_asyncThreadNum = 3;
}
if(_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM)
{
_asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM;
}
......
//异步队列的大小
size_t iAsyncQueueCap = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncqueuecap", "10000"));
if(iAsyncQueueCap < 10000)
{
iAsyncQueueCap = 10000;
}
......
//创建异步线程
for(size_t i = 0; i < _asyncThreadNum; ++i)
{
_asyncThread[i] = new AsyncProcThread(iAsyncQueueCap);
_asyncThread[i]->start();
}
......
}
在开始讲述异步调用与接收响应之前,先看看大致的调用过程,与同步调用来个对比:
跟同步调用的示例一样,现在有一MyDemo.StringServer.StringServantObj的服务,提供一个RPC接口是append,传入两个string类型的变量,返回两个string类型变量的拼接结果。在执行tars2cpp而生成的文件中,定义了回调函数基类StringServantPrxCallback,用户需要public继承这个基类并实现自己的方法,例如:
class asyncClientCallback : public StringServantPrxCallback {
public:
void callback_append(Int32 ret, const string& rStr) {
cout << "append: async callback success and retCode is " << ret << " ,rStr is " << rStr << "\n";
}
void callback_append_exception(Int32 ret) {
cout << "append: async callback fail and retCode is " << ret << "\n";
}
};
然后,用户就可以通过proxy->async_append(new asyncClientCallback(), str1, str2)进行异步调用了,调用过程与上文的同步调用差不多,函数调用流程如下图所示,可以与同步调用的进行比较,看看同步调用与异步调用的异同。
在异步调用中,客户端发起异步调用_proxy->async_append(new asyncClientCallback(), str1, str2)后,在函数StringServantProxy::async_append()中,程序同样会先构造ServantProxy::tars_invoke_async()所需要的参数,如请求包类型,RPC方法名,方法参数等,与同步调用的一个区别是,还传递了承载回调函数的派生类实例。接下来便直接调用了ServantProxy::tars_invoke_async()方法:
tars_invoke_async(tars::TARSNORMAL,"append", _os.getByteBuffer(), context, _mStatus, callback)
在ServantProxy::tars_invoke_async()方法中,先创建一个ReqMessage变量msg,初始化msg变量,给变量赋值,如Tars版本号,数据包类型,服务名,RPC方法名,Tars的上下文容器,异步调用的超时时间(单位为毫秒)以及异步调用后的回调函数ServantProxyCallbackPtr callback(等待异步调用返回响应后回调里面的函数)等。最后,与同步调用一样,调用ServantProxy::invoke()进行远程方法调用。
在ServantProxy::invoke()中,继续填充传递进来的变量ReqMessage msg。此外,还需要获取调用者caller线程的线程私有数据ServantProxyThreadData,用来指导RPC调用。与同步调用一样,利用ServantProxy::selectNetThreadInfo()来轮询选取ObjectProxy(网络线程CommunicatorEpoll)与对应的ReqInfoQueue,详细可看同步调用中的介绍,注意区分客户端中的调用者caller线程与网络线程,以及之间的通信桥梁。
退出ServantProxy::selectNetThreadInfo()后,便得到ObjectProxy类型的pObjProxy及其对应的ReqInfoQueue类型的ReqInfoQueue,在异步调用中,不需要建立条件变量来阻塞进程,直接通过pObjectProxy来发送RPC请求,请求信息会暂存在ReqInfoQueue中:
if(!pReqQ->push_back(msg,bEmpty))
{
TLOGERROR("[TARS][ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl);
delete msg;
msg = NULL;
pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
throw TarsClientQueueException("client queue full");
}
pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
在之后,就不需要做任何的工作,退出层层函数调用,回到客户端中,程序可以继续执行其他动作.
Communicator根据client配置的线程数_clientThreadNum来决定创建CommunicatorEpoll的个数,_clientThreadNum是多少,就创建多少个CommunicatorEpoll(取值逻辑在Communicator::initialize()中,可看本文2.1.4中的源码或自己机器上的源码)
异步调用的请求发送过程与同步调用的一致,都是在网络线程中通过ObjectProxy去调用AdapterProxy来发送数据。但是在接收到响应之后,通过图(1-15)可以看到,在函数AdapterProxy::finishInvoke(ReqMessage*)中,同步调用会通过msg->pMonitor->notify()唤醒客户端的caller线程来接收响应包,而在异步调用中,则是如图(1-19)所示,CommunicatorEpoll与AsyncProcThread的关系如图(1-20)所示。
在函数AdapterProxy::finishInvoke(ReqMessage*)中,程序通过:
//异步回调,放入回调处理线程中
_objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);
将信息包msg(带响应信息)放到异步回调处理线程中,在CommunicatorEpoll::pushAsyncThreadQueue()中,通过轮询的方式选择异步回调处理线程处理接收到的响应包,异步处理线程数默认是3,最大是1024。
void CommunicatorEpoll::pushAsyncThreadQueue(ReqMessage * msg)
{
//先不考虑每个线程队列数目不一致的情况
_asyncThread[_asyncSeq]->push_back(msg);
_asyncSeq ++;
if(_asyncSeq == _asyncThreadNum)
{
_asyncSeq = 0;
}
}
选取之后,通过AsyncProcThread::push_back(),将msg包放在响应包队列AsyncProcThread::_msgQueue中,然后通过AsyncProcThread:: notify()函数通知本异步回调处理线程进行处理,AsyncProcThread:: notify()函数可以令阻塞在AsyncProcThread:: run()中的AsyncProcThread::timedWait()的异步处理线程被唤醒。
在AsyncProcThread::run()中,主要执行下面的程序进行函数回调:
if (_msgQueue->pop_front(msg))
{
......
try
{
ReqMessagePtr msgPtr = msg;
msg->callback->onDispatch(msgPtr);
}
catch (exception& e)
{
TLOGERROR("[TARS][AsyncProcThread exception]:" << e.what() << endl);
}
catch (...)
{
TLOGERROR("[TARS][AsyncProcThread exception.]" << endl);
}
}
通过msg->callback,程序可以调用回调函数基类StringServantPrxCallback里面的onDispatch()函数。在StringServantPrxCallback:: onDispatch()中,分析此次响应所对应的RPC方法名,获取响应结果,并通过动态多态,执行用户所定义好的派生类的虚函数。通过ReqMessagePtr的引用计数,还可以将ReqNessage* msg删除掉,与同步调用不同,同步调用的msg的新建与删除都在caller线程中,而异步调用的msg在caller线程上构造,在异步回调处理线程中析构.
不同的网络线程CommunicatorEpoll均可以发起同一RPC请求,对于同一RPC服务,选取不同的ObjectProxy(或可认为选取不同的网络线程CommunicatorEpoll)是第一层的负载均衡,
而对于同一个被选中的ObjectProxy,通过EndpointManager选择不同的socket连接AdapterProxy(假如ObjectProxy有大于1个的AdapterProxy,如图(1-4)的fooObjectProxy)是第二层的负载均衡.
可回顾本文5.1.2中的内容.
因篇幅问题不能全部显示,请点此查看更多更全内容