XRootD
Loading...
Searching...
No Matches
XrdCmsRRQ.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d C m s R R Q . c c */
4/* */
5/* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstring>
32#include <sys/types.h>
33#include <netinet/in.h>
34#include <cinttypes>
35
37#include "XrdCms/XrdCmsNode.hh"
38#include "XrdCms/XrdCmsRRQ.hh"
40#include "XrdCms/XrdCmsTrace.hh"
41#include "XrdSys/XrdSysError.hh"
42#include "XrdSys/XrdSysTimer.hh"
43#include <cstdio>
44
45using namespace XrdCms;
46
47// Note: Debugging statements have been commented out. This is time critical
48// code and debugging may only be enabled in standalone testing as the
49// delays introduced by DEBUG() will usually cause timeout failures.
50
51/******************************************************************************/
52/* G l o b a l O b j e c t s & S t a t i c M e m b e r s */
53/******************************************************************************/
54
56
57XrdSysMutex XrdCmsRRQSlot::myMutex;
58XrdCmsRRQSlot *XrdCmsRRQSlot::freeSlot = 0;
59short XrdCmsRRQSlot::initSlot = 0;
60
61/******************************************************************************/
62/* E x t e r n a l F u n c t i o n s */
63/******************************************************************************/
64
65void *XrdCmsRRQ_StartTimeOut(void *parg) {return RRQ.TimeOut();}
66
67void *XrdCmsRRQ_StartRespond(void *parg) {return RRQ.Respond();}
68
69/******************************************************************************/
70/* X r d C m s R R Q C l a s s M e t h o d s */
71/******************************************************************************/
72/******************************************************************************/
73/* A d d */
74/******************************************************************************/
75
77{
78// EPNAME("RRQ Add");
79 XrdCmsRRQSlot *sp;
80
81// Obtain a slot and fill it in
82//
83 if (!(sp = XrdCmsRRQSlot::Alloc(Info))) return 0;
84// DEBUG("adding slot " <<sp->slotNum);
85
86// If a slot number given, check if it's the right slot and it is still queued.
87// If so, piggy-back this request to existing one and make a fast exit
88//
89 myMutex.Lock(); Stats.Add2Q++;
90 if (Snum && Slot[Snum].Info.Key == Info->Key && Slot[Snum].Expire)
91 {if (Info->isLU)
92 {sp->LkUp = Slot[Snum].LkUp;
93 Slot[Snum].LkUp = sp;
94 } else {
95 sp->Cont = Slot[Snum].Cont;
96 Slot[Snum].Cont = sp;
97 }
98 Stats.PBack++;
99 myMutex.UnLock();
100 return Snum;
101 }
102
103// Queue this slot to the pending response queue and tell the timeout scheduler
104//
105 sp->Expire = myClock+1;
106 if (waitQ.Singleton()) isWaiting.Post();
107 waitQ.Prev()->Insert(&sp->Link);
108 myMutex.UnLock();
109 return sp->slotNum;
110}
111
112/******************************************************************************/
113/* D e l */
114/******************************************************************************/
115
116void XrdCmsRRQ::Del(short Snum, const void *Key)
117{
118 Ready(Snum, Key, 0, 0);
119}
120
121/******************************************************************************/
122/* I n i t */
123/******************************************************************************/
124
125int XrdCmsRRQ::Init(int Tint, int Tdly)
126{
127 int rc;
128 pthread_t tid;
129
130// Set values
131//
132 if (Tint) Tslice = Tint;
133 if (Tdly) Tdelay = Tdly;
134 Stats.Reset();
135
136// Fill out the response structure
137//
138 dataResp.Hdr.streamid = 0;
139 dataResp.Hdr.rrCode = kYR_data;
140 dataResp.Hdr.modifier = 0;
141 dataResp.Hdr.datalen = 0;
142 dataResp.Val = 0;
143
144// Fill out the data i/o vector
145//
146 data_iov[0].iov_base = (char *)&dataResp;
147 data_iov[0].iov_len = sizeof(dataResp);
148 data_iov[1].iov_base = databuff;;
149
150// Fill out the response structure
151//
152 redrResp.Hdr.streamid = 0;
153 redrResp.Hdr.rrCode = kYR_redirect;
154 redrResp.Hdr.modifier = 0;
155 redrResp.Hdr.datalen = 0;
156 redrResp.Val = 0;
157
158// Fill out the redirect i/o vector
159//
160 redr_iov[0].iov_base = (char *)&redrResp;
161 redr_iov[0].iov_len = sizeof(redrResp);
162 redr_iov[1].iov_base = hostbuff;;
163
164// Fill out the wait info
165//
166 waitResp.Hdr.streamid = 0;
167 waitResp.Hdr.rrCode = kYR_wait;
168 waitResp.Hdr.modifier = 0;
169 waitResp.Hdr.datalen = htons(static_cast<unsigned short>(sizeof(waitResp.Val)));
170 waitResp.Val = htonl(Tdelay);
171
172// Start the responder thread
173//
174 if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartRespond, (void *)0,
175 0, "Request Responder")))
176 {Say.Emsg("Config", rc, "create request responder thread");
177 return 1;
178 }
179
180// Start the timeout thread
181//
182 if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartTimeOut, (void *)0,
183 0, "Request Timeout")))
184 {Say.Emsg("Config", rc, "create request timeout thread");
185 return 1;
186 }
187
188// All done
189//
190 return 0;
191}
192
193/******************************************************************************/
194/* R e a d y */
195/******************************************************************************/
196
197int XrdCmsRRQ::Ready(int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
198{
199// EPNAME("RRQ Ready");
200 XrdCmsRRQSlot *sp;
201
202// Check if it's the right slot and it is still queued.
203//
204 myMutex.Lock();
205 sp = &Slot[Snum];
206 if (sp->Info.Key != Key || !sp->Expire)
207 {myMutex.UnLock();
208// DEBUG("slot " <<Snum <<" no longer valid");
209 return 1;
210 }
211
212// Update the arguments. The first is the running node mask and the second is
213// a fixed differentiation mask. Accumulate the 1st but replace the 2nd.
214//
215 sp->Arg1 |= mask1; sp->Arg2 = mask2;
216 Stats.Resp++;
217
218// Check if we should still hold on to this slot because the number of actual
219// responders is less than the number needed.
220//
221 if (sp->Info.actR < sp->Info.minR)
222 {sp->Info.actR++; Stats.Multi++;
223 myMutex.UnLock();
224 return 0;
225 }
226
227// Move the element from the waiting queue to the ready queue
228//
229 sp->Link.Remove();
230 if (readyQ.Singleton()) isReady.Post();
231 readyQ.Prev()->Insert(&sp->Link);
232 myMutex.UnLock();
233// DEBUG("readied slot " <<Snum <<" mask " <<mask);
234 return 1;
235}
236
237/******************************************************************************/
238/* R e s p o n d */
239/******************************************************************************/
240
242{
243// EPNAME("RRQ Respond");
244 XrdCmsRRQSlot *sp;
245
246// In an endless loop, process all ready elements
247//
248 do {isReady.Wait(); // DEBUG("responder awoken");
249 do {myMutex.Lock();
250 Stats.rdFast += rdFast; Stats.rdSlow += rdSlow;
251 Stats.luFast += luFast; Stats.luSlow += luSlow;
252 if (readyQ.Singleton()) {myMutex.UnLock(); break;}
253 sp = readyQ.Next()->Item(); sp->Link.Remove(); sp->Expire = 0;
254 myMutex.UnLock();
255
256 // A locate request can be pggy-backed on a select request and vice-versa
257 // We separate the two queues here as each has a different response.
258 //
259 if (sp->Info.isLU)
260 {if (sp->Cont)
261 {sp->Cont->Arg1 = sp->Arg1;
262 sendRedResp(sp->Cont);
263 }
264 sendLocResp(sp);
265 } else {
266 if (sp->LkUp)
267 {sp->LkUp->Arg1 = sp->Arg1; sp->LkUp->Arg2 = sp->Arg2;
268 sendLocResp(sp->LkUp);
269 }
270 sendRedResp(sp);
271 }
272 sp->Recycle();
273 } while(1);
274 } while(1);
275
276// Keep the compiler happy
277//
278 return (void *)0;
279}
280
281/******************************************************************************/
282/* s e n d L o c R e s p */
283/******************************************************************************/
284
285void XrdCmsRRQ::sendLocResp(XrdCmsRRQSlot *lP)
286{
287 static const int ovhd = sizeof(kXR_unt32);
288 XrdCmsSelected *sP;
289 XrdCmsNode *nP;
291 int bytes;
292 bool oksel;
293
294// Send a delay if we timed out
295//
296 if (!(lP->Arg1))
297 {sendLwtResp(lP);
298 return;
299 }
300
301// Get the list of servers that have this file. If none found, then force the
302// client to wait as this should never happen and the long path is called for.
303// ASAP responses always respond in with IPv6 addresses or mapped IPv4 ones.
304//
305 lsopts = static_cast<XrdCmsCluster::CmsLSOpts>(lP->Info.lsLU);
306 if (!(sP = Cluster.List(lP->Arg1, lsopts, oksel))
307 || (!(bytes = XrdCmsNode::do_LocFmt(databuff,sP,lP->Arg2,lP->Info.rwVec))))
308 {sendLwtResp(lP);
309 return;
310 }
311
312// Complete the I/O vector
313//
314 bytes++;
315 data_iov[1].iov_len = bytes;
316 bytes += ovhd;
317 dataResp.Hdr.datalen = htons(static_cast<unsigned short>(bytes));
318 bytes += sizeof(dataResp.Hdr);
319
320// Send the reply to each waiting redirector
321//
322 RTable.Lock();
323 do {if ((nP = RTable.Find(lP->Info.Rnum, lP->Info.Rinst)))
324 {dataResp.Hdr.streamid = lP->Info.ID;
325 nP->Send(data_iov, iov_cnt, bytes);
326 }
327 luFast++;
328 } while((lP = lP->LkUp));
329 RTable.UnLock();
330}
331
332/******************************************************************************/
333/* s e n d L w t R e s p */
334/******************************************************************************/
335
336void XrdCmsRRQ::sendLwtResp(XrdCmsRRQSlot *rP)
337{
338// EPNAME("sendLwtResp");
339 XrdCmsNode *nP;
340
341// For each request, find the redirector and ask it to send a wait
342//
343 RTable.Lock();
344do{if ((nP = RTable.Find(rP->Info.Rnum, rP->Info.Rinst)))
345 {waitResp.Hdr.streamid = rP->Info.ID; luSlow++;
346 nP->Send((char *)&waitResp, sizeof(waitResp));
347// DEBUG("Redirect delay " <<nP->Name() <<' ' <<Tdelay);
348 }
349// else {DEBUG("redirector " <<Info->Rnum <<'.' <<Info->Rinst <<"not found");}
350 } while((rP = rP->LkUp));
351 RTable.UnLock();
352}
353
354/******************************************************************************/
355/* s e n d R e d R e s p */
356/******************************************************************************/
357
358void XrdCmsRRQ::sendRedResp(XrdCmsRRQSlot *rP)
359{
360// EPNAME("sendRedResp");
361 static const int ovhd = sizeof(kXR_unt32);
362 XrdCmsNode *nP;
363 int doredir = 0, port = 0, hlen = 0;
364
365// Determine where the client should be redirected
366//
367 if ((doredir = (rP->Arg1 && Cluster.Select(rP->Arg1, port, hostbuff, hlen,
368 rP->Info.isRW, rP->Info.actR,
369 rP->Info.ifOP))))
370 {redrResp.Val = htonl(port);
371 redrResp.Hdr.datalen = htons(static_cast<unsigned short>(hlen+ovhd));
372 redr_iov[1].iov_len = hlen;
373 hlen += ovhd + sizeof(redrResp.Hdr);
374 }
375
376// For each request, find the redirector and ask it to send the message
377//
378 RTable.Lock();
379do{if ((nP = RTable.Find(rP->Info.Rnum, rP->Info.Rinst)))
380 {if (doredir){redrResp.Hdr.streamid = rP->Info.ID; rdFast++;
381 nP->Send(redr_iov, iov_cnt, hlen);
382// DEBUG("Fast redirect " <<nP->Name() <<" -> " <<hostbuff);
383 }
384 else {waitResp.Hdr.streamid = rP->Info.ID; rdSlow++;
385 nP->Send((char *)&waitResp, sizeof(waitResp));
386// DEBUG("Redirect delay " <<nP->Name() <<' ' <<Tdelay);
387 }
388 }
389// else {DEBUG("redirector " <<Info->Rnum <<'.' <<Info->Rinst <<"not found");}
390 } while((rP = rP->Cont));
391 RTable.UnLock();
392}
393
394/******************************************************************************/
395/* T i m e O u t */
396/******************************************************************************/
397
399{
400// EPNAME("RRQ TimeOut");
401 XrdCmsRRQSlot *sp;
402
403// We measure millisecond intervals to timeout waiting requests. We used to zero
404// out arg1/2 to force expiration, but they would be zero anyway if no responses
405// occurred. Now with qdn we need to leave them alone as we may have deferred
406// a fast dispatch because we were waiting for more than one responder.
407//
408 while(1)
409 {isWaiting.Wait();
410 myMutex.Lock();
411 while(1)
412 {myClock++;
413 myMutex.UnLock();
414 XrdSysTimer::Wait(Tslice);
415 myMutex.Lock();
416 while((sp=waitQ.Next()->Item()) && sp->Expire < myClock)
417 {sp->Link.Remove();
418 if (readyQ.Singleton()) isReady.Post();
419// sp->Arg1 = 0; sp->Arg2 = 0;
420// DEBUG("expired slot " <<sp->slotNum);
421 readyQ.Prev()->Insert(&sp->Link);
422 }
423 if (waitQ.Singleton()) break;
424 }
425 myMutex.UnLock();
426 }
427
428// Keep the compiler happy
429//
430 return (void *)0;
431}
432
433/******************************************************************************/
434/* X r d C m s R R Q S l o t C l a s s M e t h o d s */
435/******************************************************************************/
436/******************************************************************************/
437/* C o n s t r u c t o r */
438/******************************************************************************/
439
440XrdCmsRRQSlot::XrdCmsRRQSlot() : Link(this)
441{
442
443 slotNum = initSlot++;
444 if (slotNum)
445 {Cont = freeSlot;
446 freeSlot = this;
447 } else Cont = 0;
448 Arg1 = Arg2 = 0;
449 Info.Key = 0;
450}
451
452/******************************************************************************/
453/* A l l o c */
454/******************************************************************************/
455
456XrdCmsRRQSlot *XrdCmsRRQSlot::Alloc(XrdCmsRRQInfo *theInfo)
457{
458 XrdCmsRRQSlot *sp;
459
460 myMutex.Lock();
461 if ((sp = freeSlot))
462 {sp->Info = *theInfo;
463 freeSlot = sp->Cont;
464 sp->Cont = 0;
465 sp->LkUp = 0;
466 sp->Arg1 = 0;
467 sp->Arg2 = 0;
468 }
469 myMutex.UnLock();
470 return sp;
471}
472
473/******************************************************************************/
474/* R e c y c l e */
475/******************************************************************************/
476
477void XrdCmsRRQSlot::Recycle()
478{
479 XrdCmsRRQSlot *sp, *np;
480
481 myMutex.Lock();
482 if (!Link.Singleton()) Link.Remove();
483
484// Remove items in the lookup chain first
485//
486 np = LkUp;
487 while((sp = np))
488 {np = sp->LkUp;
489 sp->Cont = freeSlot;
490 freeSlot = sp;
491 sp->Info.Key = 0;
492 }
493
494// Now remove items in the select chain
495//
496 np = Cont;
497 while((sp = np))
498 {np = sp->Cont;
499 sp->Cont = freeSlot;
500 freeSlot = sp;
501 sp->Info.Key = 0;
502 }
503
504// Now put this item in the free chain
505//
506 Info.Key = 0;
507 Cont = freeSlot;
508 freeSlot = this;
509 myMutex.UnLock();
510}
unsigned int kXR_unt32
Definition XPtypes.hh:90
void * XrdCmsRRQ_StartRespond(void *parg)
Definition XrdCmsRRQ.cc:67
void * XrdCmsRRQ_StartTimeOut(void *parg)
Definition XrdCmsRRQ.cc:65
unsigned long long SMask_t
int Select(XrdCmsSelect &Sel)
XrdCmsSelected * List(SMask_t mask, CmsLSOpts opts, bool &oksel)
int Send(const char *buff, int blen=0)
static int do_LocFmt(char *buff, XrdCmsSelected *sP, SMask_t pf, SMask_t wf, bool lsall=false, bool lsuniq=false)
SMask_t rwVec
Definition XrdCmsRRQ.hh:59
kXR_unt32 ID
Definition XrdCmsRRQ.hh:50
int Init(int Tint=0, int Tdly=0)
Definition XrdCmsRRQ.cc:125
void * TimeOut()
Definition XrdCmsRRQ.cc:398
void * Respond()
Definition XrdCmsRRQ.cc:241
short Add(short Snum, XrdCmsRRQInfo *ip)
Definition XrdCmsRRQ.cc:76
void Del(short Snum, const void *Key)
Definition XrdCmsRRQ.cc:116
int Ready(int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
Definition XrdCmsRRQ.cc:197
XrdCmsNode * Find(short Num, int Inst)
void Insert(XrdOucDLlist *Node, T *Item=0)
XrdOucDLlist * Next()
XrdOucDLlist * Prev()
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
XrdCmsRRQ RRQ
Definition XrdCmsRRQ.cc:55
kXR_unt16 datalen
Definition YProtocol.hh:86
kXR_char modifier
Definition YProtocol.hh:85
XrdCmsCluster Cluster
@ kYR_redirect
Definition YProtocol.hh:143
XrdSysError Say
XrdCmsRTable RTable
kXR_char rrCode
Definition YProtocol.hh:84
kXR_unt32 streamid
Definition YProtocol.hh:83
long long luSlow
Definition XrdCmsRRQ.hh:139
long long rdSlow
Definition XrdCmsRRQ.hh:141
long long Resp
Definition XrdCmsRRQ.hh:136
long long luFast
Definition XrdCmsRRQ.hh:138
long long Add2Q
Definition XrdCmsRRQ.hh:134
long long Multi
Definition XrdCmsRRQ.hh:137
long long rdFast
Definition XrdCmsRRQ.hh:140
long long PBack
Definition XrdCmsRRQ.hh:135