File: /usr/src/linux/net/sunrpc/xprt.c
1 /*
2 * linux/net/sunrpc/xprt.c
3 *
4 * This is a generic RPC call interface supporting congestion avoidance,
5 * and asynchronous calls.
6 *
7 * The interface works like this:
8 *
9 * - When a process places a call, it allocates a request slot if
10 * one is available. Otherwise, it sleeps on the backlog queue
11 * (xprt_reserve).
12 * - Next, the caller puts together the RPC message, stuffs it into
13 * the request struct, and calls xprt_call().
14 * - xprt_call transmits the message and installs the caller on the
15 * socket's wait list. At the same time, it installs a timer that
16 * is run after the packet's timeout has expired.
17 * - When a packet arrives, the data_ready handler walks the list of
18 * pending requests for that socket. If a matching XID is found, the
19 * caller is woken up, and the timer removed.
20 * - When no reply arrives within the timeout interval, the timer is
21 * fired by the kernel and runs xprt_timer(). It either adjusts the
22 * timeout values (minor timeout) or wakes up the caller with a status
23 * of -ETIMEDOUT.
24 * - When the caller receives a notification from RPC that a reply arrived,
25 * it should release the RPC slot, and process the reply.
26 * If the call timed out, it may choose to retry the operation by
27 * adjusting the initial timeout value, and simply calling rpc_call
28 * again.
29 *
30 * Support for async RPC is done through a set of RPC-specific scheduling
31 * primitives that `transparently' work for processes as well as async
32 * tasks that rely on callbacks.
33 *
34 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
35 *
36 * TCP callback race fixes (C) 1998 Red Hat Software <alan@redhat.com>
37 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
38 * TCP NFS related read + write fixes
39 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
40 *
41 * Rewrite of large parts of the code in order to stabilize TCP stuff.
42 * Fix behaviour when socket buffer is full.
43 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
44 */
45
46 #define __KERNEL_SYSCALLS__
47
48 #include <linux/version.h>
49 #include <linux/types.h>
50 #include <linux/slab.h>
51 #include <linux/capability.h>
52 #include <linux/sched.h>
53 #include <linux/errno.h>
54 #include <linux/socket.h>
55 #include <linux/in.h>
56 #include <linux/net.h>
57 #include <linux/mm.h>
58 #include <linux/udp.h>
59 #include <linux/unistd.h>
60 #include <linux/sunrpc/clnt.h>
61 #include <linux/file.h>
62
63 #include <net/sock.h>
64 #include <net/checksum.h>
65 #include <net/udp.h>
66
67 #include <asm/uaccess.h>
68
69 /* Following value should be > 32k + RPC overhead */
/*
 * udp_write_space() only wakes a blocked sender once at least
 * min(sk->sndbuf, XPRT_MIN_WRITE_SPACE) bytes of send space are free.
 */
70 #define XPRT_MIN_WRITE_SPACE (35000 + SOCK_MIN_WRITE_SPACE)
71
/*
 * Defined outside this file.  Taken here around walks of xprt->pending
 * (xprt_lookup_rqst) and around every access to the rpc_xprt_pending list.
 */
72 extern spinlock_t rpc_queue_lock;
73
74 /*
75 * Local variables
76 */
77
/*
 * With RPC_DEBUG, messages from this file are tagged with the XPRT
 * facility, and RPC_DEBUG_DATA is explicitly undefined so that
 * xprt_pktdump() compiles to its no-op stub.
 */
78 #ifdef RPC_DEBUG
79 # undef RPC_DEBUG_DATA
80 # define RPCDBG_FACILITY RPCDBG_XPRT
81 #endif
82
83 /*
84 * Local functions
85 */
/* Forward declarations of static helpers defined later in this file. */
86 static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
87 static void do_xprt_transmit(struct rpc_task *);
88 static void xprt_reserve_status(struct rpc_task *task);
89 static void xprt_disconnect(struct rpc_xprt *);
90 static void xprt_reconn_status(struct rpc_task *task);
91 static struct socket *xprt_create_socket(int, struct rpc_timeout *);
92 static int xprt_bind_socket(struct rpc_xprt *, struct socket *);
93 static void xprt_remove_pending(struct rpc_xprt *);
94
95 #ifdef RPC_DEBUG_DATA
96 /*
97 * Print the buffer contents (first 128 bytes only--just enough for
98 * diropres return).
99 */
100 static void
101 xprt_pktdump(char *msg, u32 *packet, unsigned int count)
102 {
103 u8 *buf = (u8 *) packet;
104 int j;
105
106 dprintk("RPC: %s\n", msg);
107 for (j = 0; j < count && j < 128; j += 4) {
108 if (!(j & 31)) {
109 if (j)
110 dprintk("\n");
111 dprintk("0x%04x ", j);
112 }
113 dprintk("%02x%02x%02x%02x ",
114 buf[j], buf[j+1], buf[j+2], buf[j+3]);
115 }
116 dprintk("\n");
117 }
118 #else
119 static inline void
120 xprt_pktdump(char *msg, u32 *packet, unsigned int count)
121 {
122 /* NOP */
123 }
124 #endif
125
126 /*
127 * Look up RPC transport given an INET socket
128 */
129 static inline struct rpc_xprt *
130 xprt_from_sock(struct sock *sk)
131 {
132 return (struct rpc_xprt *) sk->user_data;
133 }
134
/*
 * Advance a message's iovec array past its first @amount bytes.
 *
 * @msg:    message whose msg_iov/msg_iovlen describe the data
 * @niv:    caller-supplied scratch array with room for msg->msg_iovlen
 *          entries; receives the adjusted vector
 * @amount: number of leading bytes already sent/received
 *
 * Entries that are completely covered by @amount are dropped, and
 * msg_iovlen is decremented once per dropped entry.  The first partially
 * covered entry is trimmed and copied into niv[0]; the remaining entries
 * are copied unchanged after it.  On return msg->msg_iov points at @niv.
 * The original iovec array is never written.
 *
 * If @amount covers every byte in the array, msg_iovlen drops to 0 and
 * we stop there instead of walking off the end of the array.
 *
 * static (not GNU89 "extern inline"): that form emits no out-of-line
 * body, so an un-inlined call would fail to link.
 */
static inline void
xprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount)
{
	struct iovec *iv = msg->msg_iov;
	size_t i;

	/* Eat any fully consumed iovecs, but never run past the array. */
	while (msg->msg_iovlen && iv->iov_len <= amount) {
		amount -= iv->iov_len;
		iv++;
		msg->msg_iovlen--;
	}

	msg->msg_iov = niv;
	if (!msg->msg_iovlen)
		return;

	/* Chew down the partially consumed entry ... */
	niv[0].iov_len = iv->iov_len - amount;
	niv[0].iov_base = ((unsigned char *) iv->iov_base) + amount;
	iv++;

	/* ... and copy the untouched remainder verbatim. */
	for (i = 1; i < msg->msg_iovlen; i++)
		niv[i] = *iv++;
}
169
170 /*
171 * Serialize write access to sockets, in order to prevent different
172 * requests from interfering with each other.
173 * Also prevents TCP socket reconnections from colliding with writes.
174 */
175 static int
176 xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
177 {
178 int retval;
179 spin_lock_bh(&xprt->sock_lock);
180 if (!xprt->snd_task)
181 xprt->snd_task = task;
182 else if (xprt->snd_task != task) {
183 dprintk("RPC: %4d TCP write queue full (task %d)\n",
184 task->tk_pid, xprt->snd_task->tk_pid);
185 task->tk_timeout = 0;
186 task->tk_status = -EAGAIN;
187 rpc_sleep_on(&xprt->sending, task, NULL, NULL);
188 }
189 retval = xprt->snd_task == task;
190 spin_unlock_bh(&xprt->sock_lock);
191 return retval;
192 }
193
194 /*
195 * Releases the socket for use by other requests.
196 */
197 static void
198 xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
199 {
200 spin_lock_bh(&xprt->sock_lock);
201 if (xprt->snd_task == task) {
202 xprt->snd_task = NULL;
203 rpc_wake_up_next(&xprt->sending);
204 }
205 spin_unlock_bh(&xprt->sock_lock);
206 }
207
/*
 * Write the unsent remainder of @req to the transport socket without
 * blocking.
 *
 * Returns the number of bytes accepted by the socket (0 if nothing is
 * left to send), or a negative errno:
 *   -ENOTCONN  no socket, or a stream connection is broken
 *              (a stream's -EPIPE is also reported as -ENOTCONN)
 *   -ENOMEM    socket buffer full (-EAGAIN with SOCK_NOSPACE set)
 *   other      passed through; unexpected codes are logged
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	struct socket *sock = xprt->sock;
	struct iovec niv[MAX_IOVEC];
	struct msghdr msg;
	mm_segment_t saved_fs;
	int remaining = req->rq_slen - req->rq_bytes_sent;
	int sent;

	if (remaining <= 0)
		return 0;
	if (sock == NULL)
		return -ENOTCONN;

	xprt_pktdump("packet data:",
		     req->rq_svec->iov_base,
		     req->rq_svec->iov_len);

	msg.msg_name = (struct sockaddr *) &xprt->addr;
	msg.msg_namelen = sizeof(xprt->addr);
	msg.msg_iov = req->rq_svec;
	msg.msg_iovlen = req->rq_snr;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL;

	/* Skip whatever an earlier partial send already pushed out. */
	if (req->rq_bytes_sent)
		xprt_move_iov(&msg, niv, req->rq_bytes_sent);

	saved_fs = get_fs();
	set_fs(get_ds());
	sent = sock_sendmsg(sock, &msg, remaining);
	set_fs(saved_fs);

	dprintk("RPC: xprt_sendmsg(%d) = %d\n", remaining, sent);

	/*
	 * ECONNREFUSED: when the server has died, an ICMP port
	 * unreachable message prompts it.  Pass it straight up.
	 */
	if (sent >= 0 || sent == -ECONNREFUSED)
		return sent;

	if (sent == -EAGAIN)
		return test_bit(SOCK_NOSPACE, &sock->flags) ? -ENOMEM : sent;

	if (sent == -ENOTCONN || sent == -EPIPE)
		return xprt->stream ? -ENOTCONN : sent;	/* connection broken */

	printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -sent);
	return sent;
}
273
274 /*
275 * Read data from socket
276 */
277 static int
278 xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
279 {
280 struct socket *sock = xprt->sock;
281 struct msghdr msg;
282 mm_segment_t oldfs;
283 struct iovec niv[MAX_IOVEC];
284 int result;
285
286 if (!sock)
287 return -ENOTCONN;
288
289 msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL;
290 msg.msg_iov = iov;
291 msg.msg_iovlen = nr;
292 msg.msg_name = NULL;
293 msg.msg_namelen = 0;
294 msg.msg_control = NULL;
295 msg.msg_controllen = 0;
296
297 /* Adjust the iovec if we've already filled it */
298 if (shift)
299 xprt_move_iov(&msg, niv, shift);
300
301 oldfs = get_fs(); set_fs(get_ds());
302 result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
303 set_fs(oldfs);
304
305 dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
306 iov, len, result);
307 return result;
308 }
309
310
311 /*
312 * Adjust RPC congestion window
313 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
314 */
315 static void
316 xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
317 {
318 unsigned long cwnd;
319
320 if (xprt->nocong)
321 return;
322 /*
323 * Note: we're in a BH context
324 */
325 spin_lock(&xprt->xprt_lock);
326 cwnd = xprt->cwnd;
327 if (result >= 0) {
328 if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
329 goto out;
330 /* The (cwnd >> 1) term makes sure
331 * the result gets rounded properly. */
332 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
333 if (cwnd > RPC_MAXCWND)
334 cwnd = RPC_MAXCWND;
335 else
336 pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
337 xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
338 dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
339 "time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
340 (xprt->congtime-jiffies)*1000/HZ);
341 } else if (result == -ETIMEDOUT) {
342 if ((cwnd >>= 1) < RPC_CWNDSCALE)
343 cwnd = RPC_CWNDSCALE;
344 xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
345 dprintk("RPC: cong %ld, cwnd was %ld, now %ld, "
346 "time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
347 (xprt->congtime-jiffies)*1000/HZ);
348 pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
349 }
350
351 xprt->cwnd = cwnd;
352 out:
353 spin_unlock(&xprt->xprt_lock);
354 }
355
356 /*
357 * Adjust timeout values etc for next retransmit
358 */
359 int
360 xprt_adjust_timeout(struct rpc_timeout *to)
361 {
362 if (to->to_retries > 0) {
363 if (to->to_exponential)
364 to->to_current <<= 1;
365 else
366 to->to_current += to->to_increment;
367 if (to->to_maxval && to->to_current >= to->to_maxval)
368 to->to_current = to->to_maxval;
369 } else {
370 if (to->to_exponential)
371 to->to_initval <<= 1;
372 else
373 to->to_initval += to->to_increment;
374 if (to->to_maxval && to->to_initval >= to->to_maxval)
375 to->to_initval = to->to_maxval;
376 to->to_current = to->to_initval;
377 }
378
379 if (!to->to_current) {
380 printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
381 to->to_current = 5 * HZ;
382 }
383 pprintk("RPC: %lu %s\n", jiffies,
384 to->to_retries? "retrans" : "timeout");
385 return to->to_retries-- > 0;
386 }
387
388 /*
389 * Close down a transport socket
390 */
391 static void
392 xprt_close(struct rpc_xprt *xprt)
393 {
394 struct socket *sock = xprt->sock;
395 struct sock *sk = xprt->inet;
396
397 if (!sk)
398 return;
399
400 xprt->inet = NULL;
401 xprt->sock = NULL;
402
403 sk->user_data = NULL;
404 sk->data_ready = xprt->old_data_ready;
405 sk->state_change = xprt->old_state_change;
406 sk->write_space = xprt->old_write_space;
407
408 xprt_disconnect(xprt);
409 sk->no_check = 0;
410
411 sock_release(sock);
412 /*
413 * TCP doesnt require the rpciod now - other things may
414 * but rpciod handles that not us.
415 */
416 if(xprt->stream)
417 rpciod_down();
418 }
419
420 /*
421 * Mark a transport as disconnected
422 */
423 static void
424 xprt_disconnect(struct rpc_xprt *xprt)
425 {
426 dprintk("RPC: disconnected transport %p\n", xprt);
427 xprt_clear_connected(xprt);
428 xprt_remove_pending(xprt);
429 rpc_wake_up_status(&xprt->pending, -ENOTCONN);
430 }
431
432 /*
433 * Reconnect a broken TCP connection.
434 *
435 * Note: This cannot collide with the TCP reads, as both run from rpciod
 *
 * Does nothing for shut-down or datagram transports, and fails the task
 * with -EIO if no server port is known.  Otherwise it takes the socket
 * write lock; if another task holds it, @task is queued on xprt->sending
 * by xprt_lock_write() and we return.  If the transport has no socket,
 * a fresh one is created and bound.  The transport is then marked
 * disconnected, the TCP record parser state is reset, and a
 * non-blocking connect is started.
 *
 * Outcomes:
 *  - already connected, or connect() returned 0: write lock released.
 *  - connect in progress (-EINPROGRESS/-EALREADY) and not yet
 *    connected: @task sleeps on xprt->sending for up to
 *    xprt->timeout.to_maxval, STILL HOLDING the write lock.
 *    xprt_reconn_status() releases it when the task wakes.
 *    tcp_state_change() wakes xprt->snd_task on TCP_ESTABLISHED.
 *  - -EISCONN/-EPIPE: the socket is closed (xprt->inet becomes NULL,
 *    so the next attempt creates a new one); lock released, no delay.
 *  - socket creation failure or any other connect error: the task is
 *    delayed 5 seconds with tk_status = -ENOTCONN; lock released.
436 */
437 void
438 xprt_reconnect(struct rpc_task *task)
439 {
440 struct rpc_xprt *xprt = task->tk_xprt;
441 struct socket *sock = xprt->sock;
442 struct sock *inet = xprt->inet;
443 int status;
444
445 dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
446 task->tk_pid, xprt, xprt_connected(xprt));
447 if (xprt->shutdown)
448 return;
449
450 if (!xprt->stream)
451 return;
452
453 if (!xprt->addr.sin_port) {
454 task->tk_status = -EIO;
455 return;
456 }
457
/* On failure xprt_lock_write() has already queued us on xprt->sending. */
458 if (!xprt_lock_write(xprt, task))
459 return;
460 if (xprt_connected(xprt))
461 goto out_write;
462
/* Error reported by "defer" if socket creation fails. */
463 status = -ENOTCONN;
464 if (!inet) {
465 /* Create an unconnected socket */
466 if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))
467 goto defer;
/* NOTE(review): the return value of xprt_bind_socket() is ignored. */
468 xprt_bind_socket(xprt, sock);
469 inet = sock->sk;
470 }
471
472 xprt_disconnect(xprt);
473
474 /* Reset TCP record info */
475 xprt->tcp_offset = 0;
476 xprt->tcp_reclen = 0;
477 xprt->tcp_copied = 0;
478 xprt->tcp_more = 0;
479
480 /* Now connect it asynchronously. */
481 dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
482 status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
483 sizeof(xprt->addr), O_NONBLOCK);
484
485 if (status < 0) {
486 switch (status) {
487 case -EALREADY:
488 case -EINPROGRESS:
/* Connect pending: wait for tcp_state_change() below. */
489 status = 0;
490 break;
491 case -EISCONN:
492 case -EPIPE:
/* Drop this socket; status 0 means no delay at "defer". */
493 status = 0;
494 xprt_close(xprt);
495 goto defer;
496 default:
497 printk("RPC: TCP connect error %d!\n", -status);
498 xprt_close(xprt);
499 goto defer;
500 }
501
502 dprintk("RPC: %4d connect status %d connected %d\n",
503 task->tk_pid, status, xprt_connected(xprt));
504
/*
 * Test and sleep under sock_lock.  tcp_state_change() takes the same
 * lock before waking snd_task, so a connect completing right now
 * cannot be missed between the test and rpc_sleep_on().
 */
505 spin_lock_bh(&xprt->sock_lock);
506 if (!xprt_connected(xprt)) {
507 task->tk_timeout = xprt->timeout.to_maxval;
/* The write lock stays held; xprt_reconn_status() releases it. */
508 rpc_sleep_on(&xprt->sending, task, xprt_reconn_status, NULL);
509 spin_unlock_bh(&xprt->sock_lock);
510 return;
511 }
512 spin_unlock_bh(&xprt->sock_lock);
513 }
514 defer:
/* Only a real failure (status < 0) backs off and reports -ENOTCONN. */
515 if (status < 0) {
516 rpc_delay(task, 5*HZ);
517 task->tk_status = -ENOTCONN;
518 }
519 out_write:
520 xprt_release_write(xprt, task);
521 }
522
523 /*
524 * Reconnect timeout. We just mark the transport as not being in the
525 * process of reconnecting, and leave the rest to the upper layers.
526 */
527 static void
528 xprt_reconn_status(struct rpc_task *task)
529 {
530 struct rpc_xprt *xprt = task->tk_xprt;
531
532 dprintk("RPC: %4d xprt_reconn_timeout %d\n",
533 task->tk_pid, task->tk_status);
534
535 xprt_release_write(xprt, task);
536 }
537
538 /*
539 * Look up the RPC request corresponding to a reply, and then lock it.
 *
 * Walks the circular list of tasks waiting on xprt->pending, under
 * rpc_queue_lock, for a request whose rq_xid equals @xid.  @xid is
 * compared exactly as given, with no byte swapping.  The walk gives up
 * after 100 hops in case the list is corrupted.
 *
 * Returns the matching request only if __rpc_lock_task() succeeds on
 * its task; the caller must later call rpc_unlock_task() on
 * req->rq_task.  Returns NULL if nothing matches or the task could not
 * be locked.
540 */
541 static inline struct rpc_rqst *
542 xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
543 {
544 struct rpc_task *head, *task;
545 struct rpc_rqst *req;
546 int safe = 0;
547
548 spin_lock_bh(&rpc_queue_lock);
549 if ((head = xprt->pending.task) != NULL) {
550 task = head;
551 do {
/* Tasks without a request (tk_rqstp == NULL) are skipped. */
552 if ((req = task->tk_rqstp) && req->rq_xid == xid)
553 goto out;
554 task = task->tk_next;
/* Guard against a corrupted (non-terminating) ring. */
555 if (++safe > 100) {
556 printk("xprt_lookup_rqst: loop in Q!\n");
557 goto out_bad;
558 }
559 } while (task != head);
560 }
/* Empty queue or full lap without a match. */
561 dprintk("RPC: unknown XID %08x in reply.\n", xid);
562 out_bad:
563 req = NULL;
564 out:
/* Lock the task while still under rpc_queue_lock; failure is a miss. */
565 if (req && !__rpc_lock_task(req->rq_task))
566 req = NULL;
567 spin_unlock_bh(&rpc_queue_lock);
568 return req;
569 }
570
571 /*
572 * Complete reply received.
573 * The TCP code relies on us to remove the request from xprt->pending.
574 */
575 static inline void
576 xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
577 {
578 struct rpc_task *task = req->rq_task;
579
580 /* Adjust congestion window */
581 xprt_adjust_cwnd(xprt, copied);
582
583 #ifdef RPC_PROFILE
584 /* Profile only reads for now */
585 if (copied > 1024) {
586 static unsigned long nextstat = 0;
587 static unsigned long pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;
588
589 pkt_cnt++;
590 pkt_len += req->rq_slen + copied;
591 pkt_rtt += jiffies - req->rq_xtime;
592 if (time_before(nextstat, jiffies)) {
593 printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
594 printk("RPC: %ld %ld %ld %ld stat\n",
595 jiffies, pkt_cnt, pkt_len, pkt_rtt);
596 pkt_rtt = pkt_len = pkt_cnt = 0;
597 nextstat = jiffies + 5 * HZ;
598 }
599 }
600 #endif
601
602 dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
603 task->tk_status = copied;
604 req->rq_received = 1;
605
606 /* ... and wake up the process. */
607 rpc_wake_up_task(task);
608 return;
609 }
610
611 /*
612 * We have set things up such that we perform the checksum of the UDP
613 * packet in parallel with the copies into the RPC client iovec. -DaveM
614 */
615 static int csum_partial_copy_to_page_cache(struct iovec *iov,
616 struct sk_buff *skb,
617 int copied)
618 {
619 int offset = sizeof(struct udphdr);
620 __u8 *cur_ptr = iov->iov_base;
621 __kernel_size_t cur_len = iov->iov_len;
622 unsigned int csum = skb->csum;
623 int need_csum = (skb->ip_summed != CHECKSUM_UNNECESSARY);
624 int slack = skb->len - copied - sizeof(struct udphdr);
625
626 if (need_csum)
627 csum = csum_partial(skb->data, sizeof(struct udphdr), csum);
628 while (copied > 0) {
629 if (cur_len) {
630 int to_move = cur_len;
631 if (to_move > copied)
632 to_move = copied;
633 if (need_csum)
634 csum = skb_copy_and_csum_bits(skb, offset, cur_ptr,
635 to_move, csum);
636 else
637 skb_copy_bits(skb, offset, cur_ptr, to_move);
638 offset += to_move;
639 copied -= to_move;
640 cur_ptr += to_move;
641 cur_len -= to_move;
642 }
643 if (cur_len <= 0) {
644 iov++;
645 cur_len = iov->iov_len;
646 cur_ptr = iov->iov_base;
647 }
648 }
649 if (need_csum) {
650 if (slack > 0)
651 csum = skb_checksum(skb, offset, slack, csum);
652 if ((unsigned short)csum_fold(csum))
653 return -1;
654 }
655 return 0;
656 }
657
658 /*
659 * Input handler for RPC replies. Called from a bottom half and hence
660 * atomic.
661 */
662 static void
663 udp_data_ready(struct sock *sk, int len)
664 {
665 struct rpc_task *task;
666 struct rpc_xprt *xprt;
667 struct rpc_rqst *rovr;
668 struct sk_buff *skb;
669 int err, repsize, copied;
670
671 dprintk("RPC: udp_data_ready...\n");
672 if (!(xprt = xprt_from_sock(sk))) {
673 printk("RPC: udp_data_ready request not found!\n");
674 goto out;
675 }
676
677 dprintk("RPC: udp_data_ready client %p\n", xprt);
678
679 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
680 goto out;
681
682 if (xprt->shutdown)
683 goto dropit;
684
685 repsize = skb->len - sizeof(struct udphdr);
686 if (repsize < 4) {
687 printk("RPC: impossible RPC reply size %d!\n", repsize);
688 goto dropit;
689 }
690
691 /* Look up and lock the request corresponding to the given XID */
692 rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
693 if (!rovr)
694 goto dropit;
695 task = rovr->rq_task;
696
697 dprintk("RPC: %4d received reply\n", task->tk_pid);
698 xprt_pktdump("packet data:",
699 (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);
700
701 if ((copied = rovr->rq_rlen) > repsize)
702 copied = repsize;
703
704 /* Suck it into the iovec, verify checksum if not done by hw. */
705 if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
706 goto out_unlock;
707
708 /* Something worked... */
709 dst_confirm(skb->dst);
710
711 xprt_complete_rqst(xprt, rovr, copied);
712
713 out_unlock:
714 rpc_unlock_task(task);
715
716 dropit:
717 skb_free_datagram(sk, skb);
718 out:
719 if (sk->sleep && waitqueue_active(sk->sleep))
720 wake_up_interruptible(sk->sleep);
721 }
722
723 /*
724 * TCP read fragment marker
725 */
726 static inline int
727 tcp_read_fraghdr(struct rpc_xprt *xprt)
728 {
729 struct iovec riov;
730 int want, result;
731
732 if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
733 goto done;
734
735 want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
736 dprintk("RPC: reading header (%d bytes)\n", want);
737 do {
738 riov.iov_base = ((u8*) &xprt->tcp_recm) + xprt->tcp_offset;
739 riov.iov_len = want;
740 result = xprt_recvmsg(xprt, &riov, 1, want, 0);
741 if (result < 0)
742 return result;
743 xprt->tcp_offset += result;
744 want -= result;
745 } while (want);
746
747 /* Get the record length and mask out the last fragment bit */
748 xprt->tcp_reclen = ntohl(xprt->tcp_recm);
749 xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
750 xprt->tcp_reclen &= 0x7fffffff;
751
752 dprintk("RPC: New record reclen %d morefrags %d\n",
753 xprt->tcp_reclen, xprt->tcp_more);
754 done:
755 return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
756 }
757
758 /*
759 * TCP read xid
760 */
761 static inline int
762 tcp_read_xid(struct rpc_xprt *xprt, int avail)
763 {
764 struct iovec riov;
765 int want, result;
766
767 if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
768 goto done;
769 want = min_t(unsigned int, sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
770 do {
771 dprintk("RPC: reading xid (%d bytes)\n", want);
772 riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied;
773 riov.iov_len = want;
774 result = xprt_recvmsg(xprt, &riov, 1, want, 0);
775 if (result < 0)
776 return result;
777 xprt->tcp_copied += result;
778 xprt->tcp_offset += result;
779 want -= result;
780 avail -= result;
781 } while (want);
782 done:
783 return avail;
784 }
785
786 /*
787 * TCP read and complete request
788 */
789 static inline int
790 tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
791 {
792 int want, result;
793
794 if (req->rq_rlen <= xprt->tcp_copied || !avail)
795 goto done;
796 want = min_t(unsigned int, req->rq_rlen - xprt->tcp_copied, avail);
797 do {
798 dprintk("RPC: %4d TCP receiving %d bytes\n",
799 req->rq_task->tk_pid, want);
800
801 result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
802 if (result < 0)
803 return result;
804 xprt->tcp_copied += result;
805 xprt->tcp_offset += result;
806 avail -= result;
807 want -= result;
808 } while (want);
809
810 done:
811 if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
812 return avail;
813 dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
814 xprt_complete_rqst(xprt, req, xprt->tcp_copied);
815
816 return avail;
817 }
818
819 /*
820 * TCP discard extra bytes from a short read
821 */
822 static inline int
823 tcp_read_discard(struct rpc_xprt *xprt, int avail)
824 {
825 struct iovec riov;
826 static u8 dummy[64];
827 int want, result = 0;
828
829 while (avail) {
830 want = min_t(unsigned int, avail, sizeof(dummy));
831 riov.iov_base = dummy;
832 riov.iov_len = want;
833 dprintk("RPC: TCP skipping %d bytes\n", want);
834 result = xprt_recvmsg(xprt, &riov, 1, want, 0);
835 if (result < 0)
836 return result;
837 xprt->tcp_offset += result;
838 avail -= result;
839 }
840 return avail;
841 }
842
843 /*
844 * TCP record receive routine
845 * This is not the most efficient code since we call recvfrom thrice--
846 * first receiving the record marker, then the XID, then the data.
847 *
848 * The optimal solution would be a RPC support in the TCP layer, which
849 * would gather all data up to the next record marker and then pass us
850 * the list of all TCP segments ready to be copied.
851 */
852 static int
853 tcp_input_record(struct rpc_xprt *xprt)
854 {
855 struct rpc_rqst *req = NULL;
856 struct rpc_task *task = NULL;
857 int avail, result;
858
859 dprintk("RPC: tcp_input_record\n");
860
861 if (xprt->shutdown)
862 return -EIO;
863 if (!xprt_connected(xprt))
864 return -ENOTCONN;
865
866 /* Read in a new fragment marker if necessary */
867 /* Can we ever really expect to get completely empty fragments? */
868 if ((result = tcp_read_fraghdr(xprt)) < 0)
869 return result;
870 avail = result;
871
872 /* Read in the xid if necessary */
873 if ((result = tcp_read_xid(xprt, avail)) < 0)
874 return result;
875 if (!(avail = result))
876 goto out_ok;
877
878 /* Find and lock the request corresponding to this xid */
879 req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
880 if (req) {
881 task = req->rq_task;
882 /* Read in the request data */
883 result = tcp_read_request(xprt, req, avail);
884 rpc_unlock_task(task);
885 if (result < 0)
886 return result;
887 avail = result;
888 }
889
890 /* Skip over any trailing bytes on short reads */
891 if ((result = tcp_read_discard(xprt, avail)) < 0)
892 return result;
893
894 out_ok:
895 dprintk("RPC: tcp_input_record done (off %d reclen %d copied %d)\n",
896 xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
897 result = xprt->tcp_reclen;
898 xprt->tcp_reclen = 0;
899 xprt->tcp_offset = 0;
900 if (!xprt->tcp_more)
901 xprt->tcp_copied = 0;
902 return result;
903 }
904
905 /*
906 * TCP task queue stuff
907 */
/*
 * Transports whose TCP sockets signalled data_ready and are waiting for
 * rpciod to read them.  Entries are linked through xprt->rx_pending;
 * every access in this file holds rpc_queue_lock.
 * NOTE(review): not static; confirm whether other files reference it
 * before narrowing its linkage.
 */
908 LIST_HEAD(rpc_xprt_pending); /* List of xprts having pending tcp requests */
909
/*
 * Wake the RPC I/O daemon (rpciod).  It runs __rpciod_tcp_dispatcher()
 * to drain the transports queued on rpc_xprt_pending.
 */
static inline void
tcp_rpciod_queue(void)
{
	rpciod_wake_up();
}
915
916 int xprt_tcp_pending(void)
917 {
918 int retval;
919
920 spin_lock_bh(&rpc_queue_lock);
921 retval = !list_empty(&rpc_xprt_pending);
922 spin_unlock_bh(&rpc_queue_lock);
923 return retval;
924 }
925
926 static inline
927 void xprt_append_pending(struct rpc_xprt *xprt)
928 {
929 spin_lock_bh(&rpc_queue_lock);
930 if (list_empty(&xprt->rx_pending)) {
931 list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
932 dprintk("RPC: xprt queue %p\n", xprt);
933 tcp_rpciod_queue();
934 }
935 spin_unlock_bh(&rpc_queue_lock);
936 }
937
938 static
939 void xprt_remove_pending(struct rpc_xprt *xprt)
940 {
941 spin_lock_bh(&rpc_queue_lock);
942 if (!list_empty(&xprt->rx_pending)) {
943 list_del(&xprt->rx_pending);
944 INIT_LIST_HEAD(&xprt->rx_pending);
945 }
946 spin_unlock_bh(&rpc_queue_lock);
947 }
948
949 static inline
950 struct rpc_xprt *xprt_remove_pending_next(void)
951 {
952 struct rpc_xprt *xprt = NULL;
953
954 spin_lock_bh(&rpc_queue_lock);
955 if (!list_empty(&rpc_xprt_pending)) {
956 xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
957 list_del(&xprt->rx_pending);
958 INIT_LIST_HEAD(&xprt->rx_pending);
959 }
960 spin_unlock_bh(&rpc_queue_lock);
961 return xprt;
962 }
963
964 /*
965 * This is protected from tcp_data_ready and the stack as its run
966 * inside of the RPC I/O daemon
967 */
968 void
969 __rpciod_tcp_dispatcher(void)
970 {
971 struct rpc_xprt *xprt;
972 int safe_retry = 0, result;
973
974 dprintk("rpciod_tcp_dispatcher: Queue Running\n");
975
976 /*
977 * Empty each pending socket
978 */
979 while ((xprt = xprt_remove_pending_next()) != NULL) {
980 dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
981
982 do {
983 result = tcp_input_record(xprt);
984 } while (result >= 0);
985
986 if (safe_retry++ > 200) {
987 schedule();
988 safe_retry = 0;
989 }
990 }
991 }
992
993 /*
994 * data_ready callback for TCP. We can't just jump into the
995 * tcp recvmsg functions inside of the network receive bh or
996 * bad things occur. We queue it to pick up after networking
997 * is done.
998 */
999
1000 static void tcp_data_ready(struct sock *sk, int len)
1001 {
1002 struct rpc_xprt *xprt;
1003
1004 dprintk("RPC: tcp_data_ready...\n");
1005 if (!(xprt = xprt_from_sock(sk)))
1006 {
1007 printk("Not a socket with xprt %p\n", sk);
1008 goto out;
1009 }
1010
1011 if (xprt->shutdown)
1012 goto out;
1013
1014 xprt_append_pending(xprt);
1015
1016 dprintk("RPC: tcp_data_ready client %p\n", xprt);
1017 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
1018 sk->state, xprt_connected(xprt),
1019 sk->dead, sk->zapped);
1020 out:
1021 if (sk->sleep && waitqueue_active(sk->sleep))
1022 wake_up_interruptible(sk->sleep);
1023 }
1024
1025
1026 static void
1027 tcp_state_change(struct sock *sk)
1028 {
1029 struct rpc_xprt *xprt;
1030
1031 if (!(xprt = xprt_from_sock(sk)))
1032 goto out;
1033 dprintk("RPC: tcp_state_change client %p...\n", xprt);
1034 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
1035 sk->state, xprt_connected(xprt),
1036 sk->dead, sk->zapped);
1037
1038 switch (sk->state) {
1039 case TCP_ESTABLISHED:
1040 if (xprt_test_and_set_connected(xprt))
1041 break;
1042 spin_lock(&xprt->sock_lock);
1043 if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
1044 rpc_wake_up_task(xprt->snd_task);
1045 spin_unlock(&xprt->sock_lock);
1046 break;
1047 case TCP_SYN_SENT:
1048 case TCP_SYN_RECV:
1049 break;
1050 default:
1051 xprt_disconnect(xprt);
1052 break;
1053 }
1054 out:
1055 if (sk->sleep && waitqueue_active(sk->sleep))
1056 wake_up_interruptible_all(sk->sleep);
1057 }
1058
1059 /*
1060 * The following 2 routines allow a task to sleep while socket memory is
1061 * low.
1062 */
1063 static void
1064 tcp_write_space(struct sock *sk)
1065 {
1066 struct rpc_xprt *xprt;
1067 struct socket *sock;
1068
1069 if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
1070 return;
1071 if (xprt->shutdown)
1072 return;
1073
1074 /* Wait until we have enough socket memory */
1075 if (!sock_writeable(sk))
1076 return;
1077
1078 if (!xprt_test_and_set_wspace(xprt)) {
1079 spin_lock(&xprt->sock_lock);
1080 if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
1081 rpc_wake_up_task(xprt->snd_task);
1082 spin_unlock(&xprt->sock_lock);
1083 }
1084
1085 if (test_bit(SOCK_NOSPACE, &sock->flags)) {
1086 if (sk->sleep && waitqueue_active(sk->sleep)) {
1087 clear_bit(SOCK_NOSPACE, &sock->flags);
1088 wake_up_interruptible(sk->sleep);
1089 }
1090 }
1091 }
1092
1093 static void
1094 udp_write_space(struct sock *sk)
1095 {
1096 struct rpc_xprt *xprt;
1097
1098 if (!(xprt = xprt_from_sock(sk)))
1099 return;
1100 if (xprt->shutdown)
1101 return;
1102
1103
1104 /* Wait until we have enough socket memory */
1105 if (sock_wspace(sk) < min_t(int, sk->sndbuf,XPRT_MIN_WRITE_SPACE))
1106 return;
1107
1108 if (!xprt_test_and_set_wspace(xprt)) {
1109 spin_lock(&xprt->sock_lock);
1110 if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
1111 rpc_wake_up_task(xprt->snd_task);
1112 spin_unlock(&xprt->sock_lock);
1113 }
1114
1115 if (sk->sleep && waitqueue_active(sk->sleep))
1116 wake_up_interruptible(sk->sleep);
1117 }
1118
1119 /*
1120 * RPC receive timeout handler.
1121 */
1122 static void
1123 xprt_timer(struct rpc_task *task)
1124 {
1125 struct rpc_rqst *req = task->tk_rqstp;
1126
1127 if (req)
1128 xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);
1129
1130 dprintk("RPC: %4d xprt_timer (%s request)\n",
1131 task->tk_pid, req ? "pending" : "backlogged");
1132
1133 task->tk_status = -ETIMEDOUT;
1134 task->tk_timeout = 0;
1135 rpc_wake_up_task(task);
1136 }
1137
1138 /*
1139 * Place the actual RPC call.
1140 * We have to copy the iovec because sendmsg fiddles with its contents.
1141 */
1142 void
1143 xprt_transmit(struct rpc_task *task)
1144 {
1145 struct rpc_rqst *req = task->tk_rqstp;
1146 struct rpc_xprt *xprt = req->rq_xprt;
1147
1148 dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
1149 *(u32 *)(req->rq_svec[0].iov_base));
1150
1151 if (xprt->shutdown)
1152 task->tk_status = -EIO;
1153
1154 if (!xprt_connected(xprt))
1155 task->tk_status = -ENOTCONN;
1156
1157 if (task->tk_status < 0)
1158 return;
1159
1160 if (task->tk_rpcwait)
1161 rpc_remove_wait_queue(task);
1162
1163 /* set up everything as needed. */
1164 /* Write the record marker */
1165 if (xprt->stream) {
1166 u32 *marker = req->rq_svec[0].iov_base;
1167
1168 *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
1169 }
1170
1171 if (!xprt_lock_write(xprt, task))
1172 return;
1173
1174 #ifdef RPC_PROFILE
1175 req->rq_xtime = jiffies;
1176 #endif
1177 do_xprt_transmit(task);
1178 }
1179
/*
 * Send the request of @task on the socket.  xprt_transmit() only calls
 * this after xprt_lock_write() succeeded, so @task owns the write lock.
 *
 * The task is queued for its reply (xprt_receive) before the first
 * send.  Stream sends may be partial: rq_bytes_sent accumulates, and up
 * to ~50 immediate retries are made.  A short datagram send is treated
 * as -ENOMEM.
 *
 * On success the receive timer (xprt_timer, rq_timeout.to_current) is
 * armed and the write lock released.  If the reply already arrived or
 * the task is no longer running, the lock is just released.  Otherwise
 * tk_status is set to the error and:
 *   -ENOMEM   sleep on xprt->sending until write space is reported
 *             (unless already reported); write lock KEPT
 *   -EAGAIN   short rpc_delay(); write lock KEPT
 *   -ECONNREFUSED / -ENOTCONN on a datagram transport: write lock KEPT
 *   anything else: streams are disconnected, rq_bytes_sent is reset
 *             and the write lock released
 * A kept lock is re-granted to the same task by xprt_lock_write()
 * (snd_task == task), so the retry can proceed.
 */
1180 static void
1181 do_xprt_transmit(struct rpc_task *task)
1182 {
1183 struct rpc_rqst *req = task->tk_rqstp;
1184 struct rpc_xprt *xprt = req->rq_xprt;
1185 int status, retry = 0;
1186
1187
1188 /* For fast networks/servers we have to put the request on
1189 * the pending list now:
1190 * Note that we don't want the task timing out during the
1191 * call to xprt_sendmsg(), so we initially disable the timeout,
1192 * and then reset it later...
 * NOTE(review): the timeout is not touched in this function before
 * the send; presumably xprt_receive() disables it. It also appears
 * to lock the task, which the rpc_unlock_task() calls below pair
 * with. Confirm against xprt_receive().
1193 */
1194 xprt_receive(task);
1195
1196 /* Continue transmitting the packet/record. We must be careful
1197 * to cope with writespace callbacks arriving _after_ we have
1198 * called xprt_sendmsg().
1199 */
1200 while (1) {
/* Clear before sending so a write_space callback during the send is seen. */
1201 xprt_clear_wspace(xprt);
1202 status = xprt_sendmsg(xprt, req);
1203
1204 if (status < 0)
1205 break;
1206
1207 if (xprt->stream) {
1208 req->rq_bytes_sent += status;
1209
1210 if (req->rq_bytes_sent >= req->rq_slen)
1211 goto out_receive;
1212 } else {
/* A datagram is all-or-nothing: a short send counts as no buffer space. */
1213 if (status >= req->rq_slen)
1214 goto out_receive;
1215 status = -ENOMEM;
1216 break;
1217 }
1218
1219 dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
1220 task->tk_pid, req->rq_slen - req->rq_bytes_sent,
1221 req->rq_slen);
1222
1223 status = -EAGAIN;
1224 if (retry++ > 50)
1225 break;
1226 }
1227 rpc_unlock_task(task);
1228
1229 /* Note: at this point, task->tk_sleeping has not yet been set,
1230 * hence there is no danger of the waking up task being put on
1231 * schedq, and being picked up by a parallel run of rpciod().
1232 */
1233 rpc_wake_up_task(task);
/* Task no longer running, or the reply beat us to it: nothing to report. */
1234 if (!RPC_IS_RUNNING(task))
1235 goto out_release;
1236 if (req->rq_received)
1237 goto out_release;
1238
1239 task->tk_status = status;
1240
1241 switch (status) {
1242 case -ENOMEM:
1243 /* Protect against (udp|tcp)_write_space */
1244 spin_lock_bh(&xprt->sock_lock);
1245 if (!xprt_wspace(xprt)) {
1246 task->tk_timeout = req->rq_timeout.to_current;
1247 rpc_sleep_on(&xprt->sending, task, NULL, NULL);
1248 }
1249 spin_unlock_bh(&xprt->sock_lock);
1250 return;
1251 case -EAGAIN:
1252 /* Keep holding the socket if it is blocked */
1253 rpc_delay(task, HZ>>4);
1254 return;
1255 case -ECONNREFUSED:
1256 case -ENOTCONN:
/* Datagram transports keep the lock and let the caller retry. */
1257 if (!xprt->stream)
1258 return;
/* fall through: a stream transport disconnects below */
1259 default:
1260 if (xprt->stream)
1261 xprt_disconnect(xprt);
1262 req->rq_bytes_sent = 0;
1263 goto out_release;
1264 }
1265
1266 out_receive:
1267 dprintk("RPC: %4d xmit complete\n", task->tk_pid);
1268 /* Set the task's receive timeout value */
1269 task->tk_timeout = req->rq_timeout.to_current;
1270 rpc_add_timer(task, xprt_timer);
1271 rpc_unlock_task(task);
1272 out_release:
1273 xprt_release_write(xprt, task);
1274 }
1275
1276 /*
1277 * Queue the task for a reply to our call.
1278 * When the callback is invoked, the congestion window should have
1279 * been updated already.
1280 */
1281 void
1282 xprt_receive(struct rpc_task *task)
1283 {
1284 struct rpc_rqst *req = task->tk_rqstp;
1285 struct rpc_xprt *xprt = req->rq_xprt;
1286
1287 dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
1288
1289 req->rq_received = 0;
1290 task->tk_timeout = 0;
1291 rpc_sleep_locked(&xprt->pending, task, NULL, NULL);
1292 }
1293
1294 /*
1295 * Reserve an RPC call slot.
1296 */
1297 int
1298 xprt_reserve(struct rpc_task *task)
1299 {
1300 struct rpc_xprt *xprt = task->tk_xprt;
1301
1302 /* We already have an initialized request. */
1303 if (task->tk_rqstp)
1304 return 0;
1305
1306 dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
1307 task->tk_pid, xprt->cong, xprt->cwnd);
1308 spin_lock_bh(&xprt->xprt_lock);
1309 xprt_reserve_status(task);
1310 if (task->tk_rqstp) {
1311 task->tk_timeout = 0;
1312 } else if (!task->tk_timeout) {
1313 task->tk_status = -ENOBUFS;
1314 } else {
1315 dprintk("RPC: xprt_reserve waiting on backlog\n");
1316 task->tk_status = -EAGAIN;
1317 rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
1318 }
1319 spin_unlock_bh(&xprt->xprt_lock);
1320 dprintk("RPC: %4d xprt_reserve returns %d\n",
1321 task->tk_pid, task->tk_status);
1322 return task->tk_status;
1323 }
1324
1325 /*
1326 * Reservation callback
1327 */
1328 static void
1329 xprt_reserve_status(struct rpc_task *task)
1330 {
1331 struct rpc_xprt *xprt = task->tk_xprt;
1332 struct rpc_rqst *req;
1333
1334 if (xprt->shutdown) {
1335 task->tk_status = -EIO;
1336 } else if (task->tk_status < 0) {
1337 /* NOP */
1338 } else if (task->tk_rqstp) {
1339 /* We've already been given a request slot: NOP */
1340 } else {
1341 if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free))
1342 goto out_nofree;
1343 /* OK: There's room for us. Grab a free slot and bump
1344 * congestion value */
1345 xprt->free = req->rq_next;
1346 req->rq_next = NULL;
1347 xprt->cong += RPC_CWNDSCALE;
1348 task->tk_rqstp = req;
1349 xprt_request_init(task, xprt);
1350
1351 if (xprt->free)
1352 xprt_clear_backlog(xprt);
1353 }
1354
1355 return;
1356
1357 out_nofree:
1358 task->tk_status = -EAGAIN;
1359 }
1360
1361 /*
1362 * Initialize RPC request
1363 */
1364 static void
1365 xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
1366 {
1367 struct rpc_rqst *req = task->tk_rqstp;
1368 static u32 xid = 0;
1369
1370 if (!xid)
1371 xid = CURRENT_TIME << 12;
1372
1373 dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
1374 task->tk_status = 0;
1375 req->rq_timeout = xprt->timeout;
1376 req->rq_task = task;
1377 req->rq_xprt = xprt;
1378 req->rq_xid = xid++;
1379 if (!xid)
1380 xid++;
1381 }
1382
/*
 * Release an RPC call slot
 *
 * If the task still holds the transport write lock (xprt->snd_task), a
 * stream transport is disconnected and the lock is released.  Then the
 * task's request slot, if it has one, is detached from the task, zeroed,
 * and pushed back onto xprt->free under xprt->xprt_lock.  The congestion
 * value is reduced by RPC_CWNDSCALE and xprt_clear_backlog() may wake a
 * task waiting for a slot.
 */
void
xprt_release(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst	*req;

	/* Presumably a record may have been left half-sent, so the stream
	 * is dropped together with the write lock — TODO confirm intent. */
	if (xprt->snd_task == task) {
		if (xprt->stream)
			xprt_disconnect(xprt);
		xprt_release_write(xprt, task);
	}
	if (!(req = task->tk_rqstp))
		return;
	task->tk_rqstp = NULL;
	memset(req, 0, sizeof(*req));	/* mark unused */

	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

	/* remove slot from queue of pending */
	if (task->tk_rpcwait) {
		printk("RPC: task of released request still queued!\n");
		rpc_remove_wait_queue(task);
	}

	/* Return the slot to the head of the free list. */
	spin_lock_bh(&xprt->xprt_lock);
	req->rq_next = xprt->free;
	xprt->free = req;

	/* Decrease congestion value. */
	xprt->cong -= RPC_CWNDSCALE;

	xprt_clear_backlog(xprt);
	spin_unlock_bh(&xprt->xprt_lock);
}
1420
1421 /*
1422 * Set default timeout parameters
1423 */
1424 void
1425 xprt_default_timeout(struct rpc_timeout *to, int proto)
1426 {
1427 if (proto == IPPROTO_UDP)
1428 xprt_set_timeout(to, 5, 5 * HZ);
1429 else
1430 xprt_set_timeout(to, 5, 60 * HZ);
1431 }
1432
1433 /*
1434 * Set constant timeout
1435 */
1436 void
1437 xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
1438 {
1439 to->to_current =
1440 to->to_initval =
1441 to->to_increment = incr;
1442 to->to_maxval = incr * retr;
1443 to->to_resrvval = incr * retr;
1444 to->to_retries = retr;
1445 to->to_exponential = 0;
1446 }
1447
1448 /*
1449 * Initialize an RPC client
1450 */
1451 static struct rpc_xprt *
1452 xprt_setup(struct socket *sock, int proto,
1453 struct sockaddr_in *ap, struct rpc_timeout *to)
1454 {
1455 struct rpc_xprt *xprt;
1456 struct rpc_rqst *req;
1457 int i;
1458
1459 dprintk("RPC: setting up %s transport...\n",
1460 proto == IPPROTO_UDP? "UDP" : "TCP");
1461
1462 if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
1463 return NULL;
1464 memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
1465
1466 xprt->addr = *ap;
1467 xprt->prot = proto;
1468 xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
1469 if (xprt->stream) {
1470 xprt->cwnd = RPC_MAXCWND;
1471 xprt->nocong = 1;
1472 } else
1473 xprt->cwnd = RPC_INITCWND;
1474 xprt->congtime = jiffies;
1475 spin_lock_init(&xprt->sock_lock);
1476 spin_lock_init(&xprt->xprt_lock);
1477 init_waitqueue_head(&xprt->cong_wait);
1478
1479 /* Set timeout parameters */
1480 if (to) {
1481 xprt->timeout = *to;
1482 xprt->timeout.to_current = to->to_initval;
1483 xprt->timeout.to_resrvval = to->to_maxval << 1;
1484 } else
1485 xprt_default_timeout(&xprt->timeout, xprt->prot);
1486
1487 xprt->pending = RPC_INIT_WAITQ("xprt_pending");
1488 xprt->sending = RPC_INIT_WAITQ("xprt_sending");
1489 xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
1490
1491 /* initialize free list */
1492 for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
1493 req->rq_next = req + 1;
1494 req->rq_next = NULL;
1495 xprt->free = xprt->slot;
1496
1497 INIT_LIST_HEAD(&xprt->rx_pending);
1498
1499 dprintk("RPC: created transport %p\n", xprt);
1500
1501 xprt_bind_socket(xprt, sock);
1502 return xprt;
1503 }
1504
1505 /*
1506 * Bind to a reserved port
1507 */
1508 static inline int
1509 xprt_bindresvport(struct socket *sock)
1510 {
1511 struct sockaddr_in myaddr;
1512 int err, port;
1513
1514 memset(&myaddr, 0, sizeof(myaddr));
1515 myaddr.sin_family = AF_INET;
1516 port = 800;
1517 do {
1518 myaddr.sin_port = htons(port);
1519 err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
1520 sizeof(myaddr));
1521 } while (err == -EADDRINUSE && --port > 0);
1522
1523 if (err < 0)
1524 printk("RPC: Can't bind to reserved port (%d).\n", -err);
1525
1526 return err;
1527 }
1528
/*
 * Attach a socket to the transport.
 *
 * Saves the socket's original data_ready, state_change and write_space
 * callbacks in the transport and installs the RPC ones:
 *   - UDP gets udp_data_ready and udp_write_space, sets
 *     sk->no_check = UDP_CSUM_NORCV, and is marked connected at once.
 *   - TCP gets tcp_data_ready, tcp_state_change and tcp_write_space, and
 *     starts out not connected.
 * Stream transports also call rpciod_up().
 *
 * Returns 0, or -EBUSY if the transport already has a socket (xprt->inet
 * is set).
 */
static int
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock	*sk = sock->sk;

	if (xprt->inet)
		return -EBUSY;

	/* Let the socket callbacks find their transport; keep the original
	 * callbacks (presumably restored on close — TODO confirm). */
	sk->user_data = xprt;
	xprt->old_data_ready = sk->data_ready;
	xprt->old_state_change = sk->state_change;
	xprt->old_write_space = sk->write_space;
	if (xprt->prot == IPPROTO_UDP) {
		sk->data_ready = udp_data_ready;
		sk->write_space = udp_write_space;
		sk->no_check = UDP_CSUM_NORCV;
		/* Datagram sockets need no connection setup. */
		xprt_set_connected(xprt);
	} else {
		sk->data_ready = tcp_data_ready;
		sk->state_change = tcp_state_change;
		sk->write_space = tcp_write_space;
		xprt_clear_connected(xprt);
	}

	/* Reset to new socket */
	xprt->sock = sock;
	xprt->inet = sk;
	/*
	 *	TCP requires the rpc I/O daemon is present
	 */
	if(xprt->stream)
		rpciod_up();

	return 0;
}
1564
1565 /*
1566 * Create a client socket given the protocol and peer address.
1567 */
1568 static struct socket *
1569 xprt_create_socket(int proto, struct rpc_timeout *to)
1570 {
1571 struct socket *sock;
1572 int type, err;
1573
1574 dprintk("RPC: xprt_create_socket(%s %d)\n",
1575 (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
1576
1577 type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
1578
1579 if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
1580 printk("RPC: can't create socket (%d).\n", -err);
1581 goto failed;
1582 }
1583
1584 /* If the caller has the capability, bind to a reserved port */
1585 if (capable(CAP_NET_BIND_SERVICE) && xprt_bindresvport(sock) < 0)
1586 goto failed;
1587
1588 return sock;
1589
1590 failed:
1591 sock_release(sock);
1592 return NULL;
1593 }
1594
1595 /*
1596 * Create an RPC client transport given the protocol and peer address.
1597 */
1598 struct rpc_xprt *
1599 xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
1600 {
1601 struct socket *sock;
1602 struct rpc_xprt *xprt;
1603
1604 dprintk("RPC: xprt_create_proto called\n");
1605
1606 if (!(sock = xprt_create_socket(proto, to)))
1607 return NULL;
1608
1609 if (!(xprt = xprt_setup(sock, proto, sap, to)))
1610 sock_release(sock);
1611
1612 return xprt;
1613 }
1614
1615 /*
1616 * Prepare for transport shutdown.
1617 */
1618 void
1619 xprt_shutdown(struct rpc_xprt *xprt)
1620 {
1621 xprt->shutdown = 1;
1622 rpc_wake_up(&xprt->sending);
1623 rpc_wake_up(&xprt->pending);
1624 rpc_wake_up(&xprt->backlog);
1625 if (waitqueue_active(&xprt->cong_wait))
1626 wake_up(&xprt->cong_wait);
1627 }
1628
1629 /*
1630 * Clear the xprt backlog queue
1631 */
1632 int
1633 xprt_clear_backlog(struct rpc_xprt *xprt) {
1634 if (RPCXPRT_CONGESTED(xprt))
1635 return 0;
1636 rpc_wake_up_next(&xprt->backlog);
1637 if (waitqueue_active(&xprt->cong_wait))
1638 wake_up(&xprt->cong_wait);
1639 return 1;
1640 }
1641
/*
 * Destroy an RPC transport, killing off all requests.
 *
 * Marks the transport shut down (waking all of its waiters), closes it
 * and frees the structure.  The transport must not be used afterwards.
 * Always returns 0.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC: destroying transport %p\n", xprt);

	/* Wake every waiter with the shutdown flag set before closing. */
	xprt_shutdown(xprt);
	xprt_close(xprt);

	kfree(xprt);
	return 0;
}
1655