File: /usr/src/linux/net/sunrpc/xprt.c

1     /*
2      *  linux/net/sunrpc/xprt.c
3      *
4      *  This is a generic RPC call interface supporting congestion avoidance,
5      *  and asynchronous calls.
6      *
7      *  The interface works like this:
8      *
9      *  -	When a process places a call, it allocates a request slot if
10      *	one is available. Otherwise, it sleeps on the backlog queue
11      *	(xprt_reserve).
12      *  -	Next, the caller puts together the RPC message, stuffs it into
13      *	the request struct, and calls xprt_call().
14      *  -	xprt_call transmits the message and installs the caller on the
15      *	socket's wait list. At the same time, it installs a timer that
16      *	is run after the packet's timeout has expired.
17      *  -	When a packet arrives, the data_ready handler walks the list of
18      *	pending requests for that socket. If a matching XID is found, the
19      *	caller is woken up, and the timer removed.
20      *  -	When no reply arrives within the timeout interval, the timer is
21      *	fired by the kernel and runs xprt_timer(). It either adjusts the
22      *	timeout values (minor timeout) or wakes up the caller with a status
23      *	of -ETIMEDOUT.
24      *  -	When the caller receives a notification from RPC that a reply arrived,
25      *	it should release the RPC slot, and process the reply.
26      *	If the call timed out, it may choose to retry the operation by
27      *	adjusting the initial timeout value, and simply calling rpc_call
28      *	again.
29      *
30      *  Support for async RPC is done through a set of RPC-specific scheduling
31      *  primitives that `transparently' work for processes as well as async
32      *  tasks that rely on callbacks.
33      *
34      *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
35      *
36      *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
37      *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
38      *  TCP NFS related read + write fixes
39      *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
40      *
41      *  Rewrite of large parts of the code in order to stabilize TCP stuff.
42      *  Fix behaviour when socket buffer is full.
43      *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
44      */
45     
46     #define __KERNEL_SYSCALLS__
47     
48     #include <linux/version.h>
49     #include <linux/types.h>
50     #include <linux/slab.h>
51     #include <linux/capability.h>
52     #include <linux/sched.h>
53     #include <linux/errno.h>
54     #include <linux/socket.h>
55     #include <linux/in.h>
56     #include <linux/net.h>
57     #include <linux/mm.h>
58     #include <linux/udp.h>
59     #include <linux/unistd.h>
60     #include <linux/sunrpc/clnt.h>
61     #include <linux/file.h>
62     
63     #include <net/sock.h>
64     #include <net/checksum.h>
65     #include <net/udp.h>
66     
67     #include <asm/uaccess.h>
68     
69     /* Following value should be > 32k + RPC overhead */
70     #define XPRT_MIN_WRITE_SPACE (35000 + SOCK_MIN_WRITE_SPACE)
71     
72     extern spinlock_t rpc_queue_lock;
73     
74     /*
75      * Local variables
76      */
77     
78     #ifdef RPC_DEBUG
79     # undef  RPC_DEBUG_DATA
80     # define RPCDBG_FACILITY	RPCDBG_XPRT
81     #endif
82     
83     /*
84      * Local functions
85      */
86     static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
87     static void	do_xprt_transmit(struct rpc_task *);
88     static void	xprt_reserve_status(struct rpc_task *task);
89     static void	xprt_disconnect(struct rpc_xprt *);
90     static void	xprt_reconn_status(struct rpc_task *task);
91     static struct socket *xprt_create_socket(int, struct rpc_timeout *);
92     static int	xprt_bind_socket(struct rpc_xprt *, struct socket *);
93     static void	xprt_remove_pending(struct rpc_xprt *);
94     
95     #ifdef RPC_DEBUG_DATA
96     /*
97      * Print the buffer contents (first 128 bytes only--just enough for
98      * diropres return).
99      */
100     static void
101     xprt_pktdump(char *msg, u32 *packet, unsigned int count)
102     {
103     	u8	*buf = (u8 *) packet;
104     	int	j;
105     
106     	dprintk("RPC:      %s\n", msg);
107     	for (j = 0; j < count && j < 128; j += 4) {
108     		if (!(j & 31)) {
109     			if (j)
110     				dprintk("\n");
111     			dprintk("0x%04x ", j);
112     		}
113     		dprintk("%02x%02x%02x%02x ",
114     			buf[j], buf[j+1], buf[j+2], buf[j+3]);
115     	}
116     	dprintk("\n");
117     }
118     #else
119     static inline void
120     xprt_pktdump(char *msg, u32 *packet, unsigned int count)
121     {
122     	/* NOP */
123     }
124     #endif
125     
126     /*
127      * Look up RPC transport given an INET socket
128      */
129     static inline struct rpc_xprt *
130     xprt_from_sock(struct sock *sk)
131     {
132     	return (struct rpc_xprt *) sk->user_data;
133     }
134     
/*
 * Advance an I/O vector by 'amount' bytes.
 *
 *   msg     message header whose msg_iov/msg_iovlen describe the vector
 *   niv     scratch array that receives the adjusted vector; it must
 *           have room for as many entries as the original vector
 *   amount  number of leading bytes to skip
 *
 * Segments that are entirely covered by 'amount' are dropped, the
 * first remaining segment is trimmed at the front, and the rest are
 * copied unchanged.  On return msg->msg_iov points at 'niv' and
 * msg->msg_iovlen holds the number of segments left.  If 'amount'
 * covers the whole vector, msg->msg_iovlen is 0 and 'niv' is left
 * untouched.
 *
 * Declared static inline: the function is only used in this file, and
 * plain "extern inline" does not reliably provide a definition when
 * the compiler chooses not to inline the call.
 */
static inline void
xprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount)
{
	struct iovec	*iv = msg->msg_iov;
	size_t		i;

	/*
	 *	Eat any fully consumed iovecs, but never walk off the end
	 *	of the vector.
	 */
	while (msg->msg_iovlen > 0 && iv->iov_len <= amount) {
		amount -= iv->iov_len;
		iv++;
		msg->msg_iovlen--;
	}

	if (msg->msg_iovlen > 0) {
		/*
		 *	Chew down the partially consumed one
		 */
		niv[0].iov_len  = iv->iov_len - amount;
		niv[0].iov_base = ((unsigned char *) iv->iov_base) + amount;
		iv++;

		/*
		 *	And copy any others
		 */
		for (i = 1; i < msg->msg_iovlen; i++)
			niv[i] = *iv++;
	}

	msg->msg_iov = niv;
}
169     
170     /*
171      * Serialize write access to sockets, in order to prevent different
172      * requests from interfering with each other.
173      * Also prevents TCP socket reconnections from colliding with writes.
174      */
175     static int
176     xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
177     {
178     	int retval;
179     	spin_lock_bh(&xprt->sock_lock);
180     	if (!xprt->snd_task)
181     		xprt->snd_task = task;
182     	else if (xprt->snd_task != task) {
183     		dprintk("RPC: %4d TCP write queue full (task %d)\n",
184     			task->tk_pid, xprt->snd_task->tk_pid);
185     		task->tk_timeout = 0;
186     		task->tk_status = -EAGAIN;
187     		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
188     	}
189     	retval = xprt->snd_task == task;
190     	spin_unlock_bh(&xprt->sock_lock);
191     	return retval;
192     }
193     
194     /*
195      * Releases the socket for use by other requests.
196      */
197     static void
198     xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
199     {
200     	spin_lock_bh(&xprt->sock_lock);
201     	if (xprt->snd_task == task) {
202     		xprt->snd_task = NULL;
203     		rpc_wake_up_next(&xprt->sending);
204     	}
205     	spin_unlock_bh(&xprt->sock_lock);
206     }
207     
/*
 * Write the unsent part of a request to the transport socket.
 *
 * Returns the number of bytes accepted by the socket (0 if nothing was
 * left to send), or a negative errno:
 *   -ENOTCONN  no socket, or a stream transport whose connection broke
 *   -ENOMEM    the send would block and the socket has SOCK_NOSPACE set
 * Other errors from sock_sendmsg() are passed through unchanged.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	struct socket	*sock = xprt->sock;
	struct iovec	niv[MAX_IOVEC];
	struct msghdr	msg;
	mm_segment_t	oldfs;
	int		remaining = req->rq_slen - req->rq_bytes_sent;
	int		err;

	if (remaining <= 0)
		return 0;
	if (sock == NULL)
		return -ENOTCONN;

	xprt_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	msg.msg_name	   = (struct sockaddr *) &xprt->addr;
	msg.msg_namelen	   = sizeof(xprt->addr);
	msg.msg_iov	   = req->rq_svec;
	msg.msg_iovlen	   = req->rq_snr;
	msg.msg_control	   = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	   = MSG_DONTWAIT|MSG_NOSIGNAL;

	/* Skip whatever an earlier call already sent */
	if (req->rq_bytes_sent != 0)
		xprt_move_iov(&msg, niv, req->rq_bytes_sent);

	/* The buffers live in kernel space */
	oldfs = get_fs();
	set_fs(get_ds());
	err = sock_sendmsg(sock, &msg, remaining);
	set_fs(oldfs);

	dprintk("RPC:      xprt_sendmsg(%d) = %d\n", remaining, err);

	if (err >= 0)
		return err;

	if (err == -EAGAIN) {
		if (test_bit(SOCK_NOSPACE, &sock->flags))
			err = -ENOMEM;
	} else if (err == -ENOTCONN || err == -EPIPE) {
		/* connection broken */
		if (xprt->stream)
			err = -ENOTCONN;
	} else if (err != -ECONNREFUSED) {
		/* ECONNREFUSED is expected when the server has died: an
		 * ICMP port unreachable message prompts it.  Anything
		 * else is worth a notice.
		 */
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -err);
	}
	return err;
}
273     
274     /*
275      * Read data from socket
276      */
277     static int
278     xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
279     {
280     	struct socket	*sock = xprt->sock;
281     	struct msghdr	msg;
282     	mm_segment_t	oldfs;
283     	struct iovec	niv[MAX_IOVEC];
284     	int		result;
285     
286     	if (!sock)
287     		return -ENOTCONN;
288     
289     	msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
290     	msg.msg_iov	= iov;
291     	msg.msg_iovlen	= nr;
292     	msg.msg_name	= NULL;
293     	msg.msg_namelen = 0;
294     	msg.msg_control = NULL;
295     	msg.msg_controllen = 0;
296     
297     	/* Adjust the iovec if we've already filled it */
298     	if (shift)
299     		xprt_move_iov(&msg, niv, shift);
300     
301     	oldfs = get_fs(); set_fs(get_ds());
302     	result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
303     	set_fs(oldfs);
304     
305     	dprintk("RPC:      xprt_recvmsg(iov %p, len %d) = %d\n",
306     						iov, len, result);
307     	return result;
308     }
309     
310     
311     /*
312      * Adjust RPC congestion window
313      * We use a time-smoothed congestion estimator to avoid heavy oscillation.
314      */
315     static void
316     xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
317     {
318     	unsigned long	cwnd;
319     
320     	if (xprt->nocong)
321     		return;
322     	/*
323     	 * Note: we're in a BH context
324     	 */
325     	spin_lock(&xprt->xprt_lock);
326     	cwnd = xprt->cwnd;
327     	if (result >= 0) {
328     		if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
329     			goto out;
330     		/* The (cwnd >> 1) term makes sure
331     		 * the result gets rounded properly. */
332     		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
333     		if (cwnd > RPC_MAXCWND)
334     			cwnd = RPC_MAXCWND;
335     		else
336     			pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
337     		xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
338     		dprintk("RPC:      cong %08lx, cwnd was %08lx, now %08lx, "
339     			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
340     			(xprt->congtime-jiffies)*1000/HZ);
341     	} else if (result == -ETIMEDOUT) {
342     		if ((cwnd >>= 1) < RPC_CWNDSCALE)
343     			cwnd = RPC_CWNDSCALE;
344     		xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
345     		dprintk("RPC:      cong %ld, cwnd was %ld, now %ld, "
346     			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
347     			(xprt->congtime-jiffies)*1000/HZ);
348     		pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
349     	}
350     
351     	xprt->cwnd = cwnd;
352      out:
353     	spin_unlock(&xprt->xprt_lock);
354     }
355     
356     /*
357      * Adjust timeout values etc for next retransmit
358      */
359     int
360     xprt_adjust_timeout(struct rpc_timeout *to)
361     {
362     	if (to->to_retries > 0) {
363     		if (to->to_exponential)
364     			to->to_current <<= 1;
365     		else
366     			to->to_current += to->to_increment;
367     		if (to->to_maxval && to->to_current >= to->to_maxval)
368     			to->to_current = to->to_maxval;
369     	} else {
370     		if (to->to_exponential)
371     			to->to_initval <<= 1;
372     		else
373     			to->to_initval += to->to_increment;
374     		if (to->to_maxval && to->to_initval >= to->to_maxval)
375     			to->to_initval = to->to_maxval;
376     		to->to_current = to->to_initval;
377     	}
378     
379     	if (!to->to_current) {
380     		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
381     		to->to_current = 5 * HZ;
382     	}
383     	pprintk("RPC: %lu %s\n", jiffies,
384     			to->to_retries? "retrans" : "timeout");
385     	return to->to_retries-- > 0;
386     }
387     
388     /*
389      * Close down a transport socket
390      */
391     static void
392     xprt_close(struct rpc_xprt *xprt)
393     {
394     	struct socket	*sock = xprt->sock;
395     	struct sock	*sk = xprt->inet;
396     
397     	if (!sk)
398     		return;
399     
400     	xprt->inet = NULL;
401     	xprt->sock = NULL;
402     
403     	sk->user_data    = NULL;
404     	sk->data_ready   = xprt->old_data_ready;
405     	sk->state_change = xprt->old_state_change;
406     	sk->write_space  = xprt->old_write_space;
407     
408     	xprt_disconnect(xprt);
409     	sk->no_check	 = 0;
410     
411     	sock_release(sock);
412     	/*
413     	 *	TCP doesnt require the rpciod now - other things may
414     	 *	but rpciod handles that not us.
415     	 */
416     	if(xprt->stream)
417     		rpciod_down();
418     }
419     
420     /*
421      * Mark a transport as disconnected
422      */
423     static void
424     xprt_disconnect(struct rpc_xprt *xprt)
425     {
426     	dprintk("RPC:      disconnected transport %p\n", xprt);
427     	xprt_clear_connected(xprt);
428     	xprt_remove_pending(xprt);
429     	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
430     }
431     
432     /*
433      * Reconnect a broken TCP connection.
434      *
435      * Note: This cannot collide with the TCP reads, as both run from rpciod
436      */
437     void
438     xprt_reconnect(struct rpc_task *task)
439     {
440     	struct rpc_xprt	*xprt = task->tk_xprt;
441     	struct socket	*sock = xprt->sock;
442     	struct sock	*inet = xprt->inet;
443     	int		status;
444     
445     	dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
446     				task->tk_pid, xprt, xprt_connected(xprt));
447     	if (xprt->shutdown)
448     		return;
449     
450     	if (!xprt->stream)
451     		return;
452     
453     	if (!xprt->addr.sin_port) {
454     		task->tk_status = -EIO;
455     		return;
456     	}
457     
458     	if (!xprt_lock_write(xprt, task))
459     		return;
460     	if (xprt_connected(xprt))
461     		goto out_write;
462     
463     	status = -ENOTCONN;
464     	if (!inet) {
465     		/* Create an unconnected socket */
466     		if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))
467     			goto defer;
468     		xprt_bind_socket(xprt, sock);
469     		inet = sock->sk;
470     	}
471     
472     	xprt_disconnect(xprt);
473     
474     	/* Reset TCP record info */
475     	xprt->tcp_offset = 0;
476     	xprt->tcp_reclen = 0;
477     	xprt->tcp_copied = 0;
478     	xprt->tcp_more = 0;
479     
480     	/* Now connect it asynchronously. */
481     	dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
482     	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
483     				sizeof(xprt->addr), O_NONBLOCK);
484     
485     	if (status < 0) {
486     		switch (status) {
487     		case -EALREADY:
488     		case -EINPROGRESS:
489     			status = 0;
490     			break;
491     		case -EISCONN:
492     		case -EPIPE:
493     			status = 0;
494     			xprt_close(xprt);
495     			goto defer;
496     		default:
497     			printk("RPC: TCP connect error %d!\n", -status);
498     			xprt_close(xprt);
499     			goto defer;
500     		}
501     
502     		dprintk("RPC: %4d connect status %d connected %d\n",
503     				task->tk_pid, status, xprt_connected(xprt));
504     
505     		spin_lock_bh(&xprt->sock_lock);
506     		if (!xprt_connected(xprt)) {
507     			task->tk_timeout = xprt->timeout.to_maxval;
508     			rpc_sleep_on(&xprt->sending, task, xprt_reconn_status, NULL);
509     			spin_unlock_bh(&xprt->sock_lock);
510     			return;
511     		}
512     		spin_unlock_bh(&xprt->sock_lock);
513     	}
514     defer:
515     	if (status < 0) {
516     		rpc_delay(task, 5*HZ);
517     		task->tk_status = -ENOTCONN;
518     	}
519      out_write:
520     	xprt_release_write(xprt, task);
521     }
522     
523     /*
524      * Reconnect timeout. We just mark the transport as not being in the
525      * process of reconnecting, and leave the rest to the upper layers.
526      */
527     static void
528     xprt_reconn_status(struct rpc_task *task)
529     {
530     	struct rpc_xprt	*xprt = task->tk_xprt;
531     
532     	dprintk("RPC: %4d xprt_reconn_timeout %d\n",
533     				task->tk_pid, task->tk_status);
534     
535     	xprt_release_write(xprt, task);
536     }
537     
538     /*
539      * Look up the RPC request corresponding to a reply, and then lock it.
540      */
541     static inline struct rpc_rqst *
542     xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
543     {
544     	struct rpc_task	*head, *task;
545     	struct rpc_rqst	*req;
546     	int		safe = 0;
547     
548     	spin_lock_bh(&rpc_queue_lock);
549     	if ((head = xprt->pending.task) != NULL) {
550     		task = head;
551     		do {
552     			if ((req = task->tk_rqstp) && req->rq_xid == xid)
553     				goto out;
554     			task = task->tk_next;
555     			if (++safe > 100) {
556     				printk("xprt_lookup_rqst: loop in Q!\n");
557     				goto out_bad;
558     			}
559     		} while (task != head);
560     	}
561     	dprintk("RPC:      unknown XID %08x in reply.\n", xid);
562      out_bad:
563     	req = NULL;
564      out:
565     	if (req && !__rpc_lock_task(req->rq_task))
566     		req = NULL;
567     	spin_unlock_bh(&rpc_queue_lock);
568     	return req;
569     }
570     
571     /*
572      * Complete reply received.
573      * The TCP code relies on us to remove the request from xprt->pending.
574      */
575     static inline void
576     xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
577     {
578     	struct rpc_task	*task = req->rq_task;
579     
580     	/* Adjust congestion window */
581     	xprt_adjust_cwnd(xprt, copied);
582     
583     #ifdef RPC_PROFILE
584     	/* Profile only reads for now */
585     	if (copied > 1024) {
586     		static unsigned long	nextstat = 0;
587     		static unsigned long	pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;
588     
589     		pkt_cnt++;
590     		pkt_len += req->rq_slen + copied;
591     		pkt_rtt += jiffies - req->rq_xtime;
592     		if (time_before(nextstat, jiffies)) {
593     			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
594     			printk("RPC: %ld %ld %ld %ld stat\n",
595     					jiffies, pkt_cnt, pkt_len, pkt_rtt);
596     			pkt_rtt = pkt_len = pkt_cnt = 0;
597     			nextstat = jiffies + 5 * HZ;
598     		}
599     	}
600     #endif
601     
602     	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
603     	task->tk_status = copied;
604     	req->rq_received = 1;
605     
606     	/* ... and wake up the process. */
607     	rpc_wake_up_task(task);
608     	return;
609     }
610     
611     /*
612      * We have set things up such that we perform the checksum of the UDP
613      * packet in parallel with the copies into the RPC client iovec.  -DaveM
614      */
615     static int csum_partial_copy_to_page_cache(struct iovec *iov,
616     					   struct sk_buff *skb,
617     					   int copied)
618     {
619     	int offset = sizeof(struct udphdr);
620     	__u8 *cur_ptr = iov->iov_base;
621     	__kernel_size_t cur_len = iov->iov_len;
622     	unsigned int csum = skb->csum;
623     	int need_csum = (skb->ip_summed != CHECKSUM_UNNECESSARY);
624     	int slack = skb->len - copied - sizeof(struct udphdr);
625     
626     	if (need_csum)
627     		csum = csum_partial(skb->data, sizeof(struct udphdr), csum);
628     	while (copied > 0) {
629     		if (cur_len) {
630     			int to_move = cur_len;
631     			if (to_move > copied)
632     				to_move = copied;
633     			if (need_csum)
634     				csum = skb_copy_and_csum_bits(skb, offset, cur_ptr,
635     							      to_move, csum);
636     			else
637     				skb_copy_bits(skb, offset, cur_ptr, to_move);
638     			offset += to_move;
639     			copied -= to_move;
640     			cur_ptr += to_move;
641     			cur_len -= to_move;
642     		}
643     		if (cur_len <= 0) {
644     			iov++;
645     			cur_len = iov->iov_len;
646     			cur_ptr = iov->iov_base;
647     		}
648     	}
649     	if (need_csum) {
650     		if (slack > 0)
651     			csum = skb_checksum(skb, offset, slack, csum);
652     		if ((unsigned short)csum_fold(csum))
653     			return -1;
654     	}
655     	return 0;
656     }
657     
658     /*
659      * Input handler for RPC replies. Called from a bottom half and hence
660      * atomic.
661      */
662     static void
663     udp_data_ready(struct sock *sk, int len)
664     {
665     	struct rpc_task	*task;
666     	struct rpc_xprt	*xprt;
667     	struct rpc_rqst *rovr;
668     	struct sk_buff	*skb;
669     	int		err, repsize, copied;
670     
671     	dprintk("RPC:      udp_data_ready...\n");
672     	if (!(xprt = xprt_from_sock(sk))) {
673     		printk("RPC:      udp_data_ready request not found!\n");
674     		goto out;
675     	}
676     
677     	dprintk("RPC:      udp_data_ready client %p\n", xprt);
678     
679     	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
680     		goto out;
681     
682     	if (xprt->shutdown)
683     		goto dropit;
684     
685     	repsize = skb->len - sizeof(struct udphdr);
686     	if (repsize < 4) {
687     		printk("RPC: impossible RPC reply size %d!\n", repsize);
688     		goto dropit;
689     	}
690     
691     	/* Look up and lock the request corresponding to the given XID */
692     	rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
693     	if (!rovr)
694     		goto dropit;
695     	task = rovr->rq_task;
696     
697     	dprintk("RPC: %4d received reply\n", task->tk_pid);
698     	xprt_pktdump("packet data:",
699     		     (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);
700     
701     	if ((copied = rovr->rq_rlen) > repsize)
702     		copied = repsize;
703     
704     	/* Suck it into the iovec, verify checksum if not done by hw. */
705     	if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
706     		goto out_unlock;
707     
708     	/* Something worked... */
709     	dst_confirm(skb->dst);
710     
711     	xprt_complete_rqst(xprt, rovr, copied);
712     
713      out_unlock:
714     	rpc_unlock_task(task);
715     
716      dropit:
717     	skb_free_datagram(sk, skb);
718      out:
719     	if (sk->sleep && waitqueue_active(sk->sleep))
720     		wake_up_interruptible(sk->sleep);
721     }
722     
723     /*
724      * TCP read fragment marker
725      */
726     static inline int
727     tcp_read_fraghdr(struct rpc_xprt *xprt)
728     {
729     	struct iovec	riov;
730     	int		want, result;
731     
732     	if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
733     		goto done;
734     
735     	want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
736     	dprintk("RPC:      reading header (%d bytes)\n", want);
737     	do {
738     		riov.iov_base = ((u8*) &xprt->tcp_recm) + xprt->tcp_offset;
739     		riov.iov_len  = want;
740     		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
741     		if (result < 0)
742     			return result;
743     		xprt->tcp_offset += result;
744     		want -= result;
745     	} while (want);
746     
747     	/* Get the record length and mask out the last fragment bit */
748     	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
749     	xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
750     	xprt->tcp_reclen &= 0x7fffffff;
751     
752     	dprintk("RPC:      New record reclen %d morefrags %d\n",
753     				   xprt->tcp_reclen, xprt->tcp_more);
754      done:
755     	return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
756     }
757     
758     /*
759      * TCP read xid
760      */
761     static inline int
762     tcp_read_xid(struct rpc_xprt *xprt, int avail)
763     {
764     	struct iovec	riov;
765     	int		want, result;
766     
767     	if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
768     		goto done;
769     	want = min_t(unsigned int, sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
770     	do {
771     		dprintk("RPC:      reading xid (%d bytes)\n", want);
772     		riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied;
773     		riov.iov_len  = want;
774     		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
775     		if (result < 0)
776     			return result;
777     		xprt->tcp_copied += result;
778     		xprt->tcp_offset += result;
779     		want  -= result;
780     		avail -= result;
781     	} while (want);
782      done:
783     	return avail;
784     }
785     
786     /*
787      * TCP read and complete request
788      */
789     static inline int
790     tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
791     {
792     	int	want, result;
793     
794     	if (req->rq_rlen <= xprt->tcp_copied || !avail)
795     		goto done;
796     	want = min_t(unsigned int, req->rq_rlen - xprt->tcp_copied, avail);
797     	do {
798     		dprintk("RPC: %4d TCP receiving %d bytes\n",
799     			req->rq_task->tk_pid, want);
800     
801     		result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
802     		if (result < 0)
803     			return result;
804     		xprt->tcp_copied += result;
805     		xprt->tcp_offset += result;
806     		avail  -= result;
807     		want   -= result;
808     	} while (want);
809     
810      done:
811     	if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
812     		return avail;
813     	dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
814     	xprt_complete_rqst(xprt, req, xprt->tcp_copied);
815     
816     	return avail;
817     }
818     
819     /*
820      * TCP discard extra bytes from a short read
821      */
822     static inline int
823     tcp_read_discard(struct rpc_xprt *xprt, int avail)
824     {
825     	struct iovec	riov;
826     	static u8	dummy[64];
827     	int		want, result = 0;
828     
829     	while (avail) {
830     		want = min_t(unsigned int, avail, sizeof(dummy));
831     		riov.iov_base = dummy;
832     		riov.iov_len  = want;
833     		dprintk("RPC:      TCP skipping %d bytes\n", want);
834     		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
835     		if (result < 0)
836     			return result;
837     		xprt->tcp_offset += result;
838     		avail  -= result;
839     	}
840     	return avail;
841     }
842     
843     /*
844      * TCP record receive routine
845      * This is not the most efficient code since we call recvfrom thrice--
846      * first receiving the record marker, then the XID, then the data.
847      * 
848      * The optimal solution would be a RPC support in the TCP layer, which
849      * would gather all data up to the next record marker and then pass us
850      * the list of all TCP segments ready to be copied.
851      */
852     static int
853     tcp_input_record(struct rpc_xprt *xprt)
854     {
855     	struct rpc_rqst	*req = NULL;
856     	struct rpc_task	*task = NULL;
857     	int		avail, result;
858     
859     	dprintk("RPC:      tcp_input_record\n");
860     
861     	if (xprt->shutdown)
862     		return -EIO;
863     	if (!xprt_connected(xprt))
864     		return -ENOTCONN;
865     
866     	/* Read in a new fragment marker if necessary */
867     	/* Can we ever really expect to get completely empty fragments? */
868     	if ((result = tcp_read_fraghdr(xprt)) < 0)
869     		return result;
870     	avail = result;
871     
872     	/* Read in the xid if necessary */
873     	if ((result = tcp_read_xid(xprt, avail)) < 0)
874     		return result;
875     	if (!(avail = result))
876     		goto out_ok;
877     
878     	/* Find and lock the request corresponding to this xid */
879     	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
880     	if (req) {
881     		task = req->rq_task;
882     		/* Read in the request data */
883     		result = tcp_read_request(xprt,  req, avail);
884     		rpc_unlock_task(task);
885     		if (result < 0)
886     			return result;
887     		avail = result;
888     	}
889     
890     	/* Skip over any trailing bytes on short reads */
891     	if ((result = tcp_read_discard(xprt, avail)) < 0)
892     		return result;
893     
894      out_ok:
895     	dprintk("RPC:      tcp_input_record done (off %d reclen %d copied %d)\n",
896     			xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
897     	result = xprt->tcp_reclen;
898     	xprt->tcp_reclen = 0;
899     	xprt->tcp_offset = 0;
900     	if (!xprt->tcp_more)
901     		xprt->tcp_copied = 0;
902     	return result;
903     }
904     
/*
 *	TCP task queue handling
 */

/* List of xprts having pending tcp requests */
LIST_HEAD(rpc_xprt_pending);

/*
 *	Wake up rpciod so that it processes the pending transports.
 */
static inline void
tcp_rpciod_queue(void)
{
	rpciod_wake_up();
}
915     
916     int xprt_tcp_pending(void)
917     {
918     	int retval;
919     
920     	spin_lock_bh(&rpc_queue_lock);
921     	retval = !list_empty(&rpc_xprt_pending);
922     	spin_unlock_bh(&rpc_queue_lock);
923     	return retval;
924     }
925     
926     static inline
927     void xprt_append_pending(struct rpc_xprt *xprt)
928     {
929     	spin_lock_bh(&rpc_queue_lock);
930     	if (list_empty(&xprt->rx_pending)) {
931     		list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
932     		dprintk("RPC:     xprt queue %p\n", xprt);
933     		tcp_rpciod_queue();
934     	}
935     	spin_unlock_bh(&rpc_queue_lock);
936     }
937     
938     static
939     void xprt_remove_pending(struct rpc_xprt *xprt)
940     {
941     	spin_lock_bh(&rpc_queue_lock);
942     	if (!list_empty(&xprt->rx_pending)) {
943     		list_del(&xprt->rx_pending);
944     		INIT_LIST_HEAD(&xprt->rx_pending);
945     	}
946     	spin_unlock_bh(&rpc_queue_lock);
947     }
948     
949     static inline
950     struct rpc_xprt *xprt_remove_pending_next(void)
951     {
952     	struct rpc_xprt	*xprt = NULL;
953     
954     	spin_lock_bh(&rpc_queue_lock);
955     	if (!list_empty(&rpc_xprt_pending)) {
956     		xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
957     		list_del(&xprt->rx_pending);
958     		INIT_LIST_HEAD(&xprt->rx_pending);
959     	}
960     	spin_unlock_bh(&rpc_queue_lock);
961     	return xprt;
962     }
963     
964     /*
965      *	This is protected from tcp_data_ready and the stack as its run
966      *	inside of the RPC I/O daemon
967      */
968     void
969     __rpciod_tcp_dispatcher(void)
970     {
971     	struct rpc_xprt *xprt;
972     	int safe_retry = 0, result;
973     
974     	dprintk("rpciod_tcp_dispatcher: Queue Running\n");
975     
976     	/*
977     	 *	Empty each pending socket
978     	 */
979     	while ((xprt = xprt_remove_pending_next()) != NULL) {
980     		dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
981     
982     		do {
983     			result = tcp_input_record(xprt);
984     		} while (result >= 0);
985     
986     		if (safe_retry++ > 200) {
987     			schedule();
988     			safe_retry = 0;
989     		}
990     	}
991     }
992     
993     /*
994      *	data_ready callback for TCP. We can't just jump into the
995      *	tcp recvmsg functions inside of the network receive bh or
996      * 	bad things occur. We queue it to pick up after networking
997      *	is done.
998      */
999      
1000     static void tcp_data_ready(struct sock *sk, int len)
1001     {
1002     	struct rpc_xprt	*xprt;
1003     
1004     	dprintk("RPC:      tcp_data_ready...\n");
1005     	if (!(xprt = xprt_from_sock(sk)))
1006     	{
1007     		printk("Not a socket with xprt %p\n", sk);
1008     		goto out;
1009     	}
1010     
1011     	if (xprt->shutdown)
1012     		goto out;
1013     
1014     	xprt_append_pending(xprt);
1015     
1016     	dprintk("RPC:      tcp_data_ready client %p\n", xprt);
1017     	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
1018     				sk->state, xprt_connected(xprt),
1019     				sk->dead, sk->zapped);
1020      out:
1021     	if (sk->sleep && waitqueue_active(sk->sleep))
1022     		wake_up_interruptible(sk->sleep);
1023     }
1024     
1025     
1026     static void
1027     tcp_state_change(struct sock *sk)
1028     {
1029     	struct rpc_xprt	*xprt;
1030     
1031     	if (!(xprt = xprt_from_sock(sk)))
1032     		goto out;
1033     	dprintk("RPC:      tcp_state_change client %p...\n", xprt);
1034     	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
1035     				sk->state, xprt_connected(xprt),
1036     				sk->dead, sk->zapped);
1037     
1038     	switch (sk->state) {
1039     	case TCP_ESTABLISHED:
1040     		if (xprt_test_and_set_connected(xprt))
1041     			break;
1042     		spin_lock(&xprt->sock_lock);
1043     		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
1044     			rpc_wake_up_task(xprt->snd_task);
1045     		spin_unlock(&xprt->sock_lock);
1046     		break;
1047     	case TCP_SYN_SENT:
1048     	case TCP_SYN_RECV:
1049     		break;
1050     	default:
1051     		xprt_disconnect(xprt);
1052     		break;
1053     	}
1054      out:
1055     	if (sk->sleep && waitqueue_active(sk->sleep))
1056     		wake_up_interruptible_all(sk->sleep);
1057     }
1058     
1059     /*
1060      * The following 2 routines allow a task to sleep while socket memory is
1061      * low.
1062      */
1063     static void
1064     tcp_write_space(struct sock *sk)
1065     {
1066     	struct rpc_xprt	*xprt;
1067     	struct socket	*sock;
1068     
1069     	if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
1070     		return;
1071     	if (xprt->shutdown)
1072     		return;
1073     
1074     	/* Wait until we have enough socket memory */
1075     	if (!sock_writeable(sk))
1076     		return;
1077     
1078     	if (!xprt_test_and_set_wspace(xprt)) {
1079     		spin_lock(&xprt->sock_lock);
1080     		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
1081     			rpc_wake_up_task(xprt->snd_task);
1082     		spin_unlock(&xprt->sock_lock);
1083     	}
1084     
1085     	if (test_bit(SOCK_NOSPACE, &sock->flags)) {
1086     		if (sk->sleep && waitqueue_active(sk->sleep)) {
1087     			clear_bit(SOCK_NOSPACE, &sock->flags);
1088     			wake_up_interruptible(sk->sleep);
1089     		}
1090     	}
1091     }
1092     
1093     static void
1094     udp_write_space(struct sock *sk)
1095     {
1096     	struct rpc_xprt *xprt;
1097     
1098     	if (!(xprt = xprt_from_sock(sk)))
1099     		return;
1100     	if (xprt->shutdown)
1101     		return;
1102     
1103     
1104     	/* Wait until we have enough socket memory */
1105     	if (sock_wspace(sk) < min_t(int, sk->sndbuf,XPRT_MIN_WRITE_SPACE))
1106     		return;
1107     
1108     	if (!xprt_test_and_set_wspace(xprt)) {
1109     		spin_lock(&xprt->sock_lock);
1110     		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
1111     			rpc_wake_up_task(xprt->snd_task);
1112     		spin_unlock(&xprt->sock_lock);
1113     	}
1114     
1115     	if (sk->sleep && waitqueue_active(sk->sleep))
1116     		wake_up_interruptible(sk->sleep);
1117     }
1118     
1119     /*
1120      * RPC receive timeout handler.
1121      */
1122     static void
1123     xprt_timer(struct rpc_task *task)
1124     {
1125     	struct rpc_rqst	*req = task->tk_rqstp;
1126     
1127     	if (req)
1128     		xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);
1129     
1130     	dprintk("RPC: %4d xprt_timer (%s request)\n",
1131     		task->tk_pid, req ? "pending" : "backlogged");
1132     
1133     	task->tk_status  = -ETIMEDOUT;
1134     	task->tk_timeout = 0;
1135     	rpc_wake_up_task(task);
1136     }
1137     
1138     /*
1139      * Place the actual RPC call.
1140      * We have to copy the iovec because sendmsg fiddles with its contents.
1141      */
1142     void
1143     xprt_transmit(struct rpc_task *task)
1144     {
1145     	struct rpc_rqst	*req = task->tk_rqstp;
1146     	struct rpc_xprt	*xprt = req->rq_xprt;
1147     
1148     	dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid, 
1149     				*(u32 *)(req->rq_svec[0].iov_base));
1150     
1151     	if (xprt->shutdown)
1152     		task->tk_status = -EIO;
1153     
1154     	if (!xprt_connected(xprt))
1155     		task->tk_status = -ENOTCONN;
1156     
1157     	if (task->tk_status < 0)
1158     		return;
1159     
1160     	if (task->tk_rpcwait)
1161     		rpc_remove_wait_queue(task);
1162     
1163     	/* set up everything as needed. */
1164     	/* Write the record marker */
1165     	if (xprt->stream) {
1166     		u32	*marker = req->rq_svec[0].iov_base;
1167     
1168     		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
1169     	}
1170     
1171     	if (!xprt_lock_write(xprt, task))
1172     		return;
1173     
1174     #ifdef RPC_PROFILE
1175     	req->rq_xtime = jiffies;
1176     #endif
1177     	do_xprt_transmit(task);
1178     }
1179     
/*
 * Push the request out on the wire.  Entered from xprt_transmit()
 * after xprt_lock_write() has succeeded.
 *
 * Outcomes:
 *  -	Everything was sent: the receive timeout is armed and the write
 *	lock released.
 *  -	-ENOMEM (including a short datagram send): unless write space
 *	has been signalled meanwhile, the task sleeps on xprt->sending.
 *	The write lock is NOT released on this path.
 *  -	-EAGAIN (stream send still incomplete after repeated attempts):
 *	the task is delayed; the write lock is NOT released.
 *  -	-ECONNREFUSED / -ENOTCONN on a datagram transport: plain return;
 *	the write lock is NOT released.
 *  -	Any other error: stream transports are disconnected, the sent
 *	byte count is reset and the write lock released.
 *  -	If, after the send loop, the task is no longer running or a
 *	reply has already been received, only the write lock is released.
 */
static void
do_xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst	*rqst = task->tk_rqstp;
	struct rpc_xprt	*transport = rqst->rq_xprt;
	int		attempts = 0;
	int		err;

	/* For fast networks/servers the request has to go on the
	 * pending list before anything is sent.  The task must not time
	 * out during xprt_sendmsg(), so xprt_receive() queues it with
	 * the timeout disabled; the timeout is set again further down.
	 */
	xprt_receive(task);

	/* Continue transmitting the packet/record.  Write-space
	 * callbacks may arrive _after_ xprt_sendmsg() has been called,
	 * hence the flag is cleared before every attempt.
	 */
	for (;;) {
		xprt_clear_wspace(transport);
		err = xprt_sendmsg(transport, rqst);
		if (err < 0)
			break;

		if (!transport->stream) {
			/* Datagram: all or nothing */
			if (err >= rqst->rq_slen)
				goto out_receive;
			err = -ENOMEM;
			break;
		}

		rqst->rq_bytes_sent += err;
		if (rqst->rq_bytes_sent >= rqst->rq_slen)
			goto out_receive;

		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
				task->tk_pid, rqst->rq_slen - rqst->rq_bytes_sent,
				rqst->rq_slen);

		err = -EAGAIN;
		if (attempts++ > 50)
			break;
	}
	rpc_unlock_task(task);

	/* Note: at this point, task->tk_sleeping has not yet been set,
	 *	 hence there is no danger of the waking up task being put on
	 *	 schedq, and being picked up by a parallel run of rpciod().
	 */
	rpc_wake_up_task(task);
	if (!RPC_IS_RUNNING(task) || rqst->rq_received)
		goto out_release;

	task->tk_status = err;

	if (err == -ENOMEM) {
		/* Protect against (udp|tcp)_write_space */
		spin_lock_bh(&transport->sock_lock);
		if (!xprt_wspace(transport)) {
			task->tk_timeout = rqst->rq_timeout.to_current;
			rpc_sleep_on(&transport->sending, task, NULL, NULL);
		}
		spin_unlock_bh(&transport->sock_lock);
		return;
	}

	if (err == -EAGAIN) {
		/* Keep holding the socket if it is blocked */
		rpc_delay(task, HZ>>4);
		return;
	}

	/* Connection errors on a datagram transport just return; on a
	 * stream transport they are handled like every other error. */
	if ((err == -ECONNREFUSED || err == -ENOTCONN) && !transport->stream)
		return;

	if (transport->stream)
		xprt_disconnect(transport);
	rqst->rq_bytes_sent = 0;
	goto out_release;

 out_receive:
	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
	/* Set the task's receive timeout value */
	task->tk_timeout = rqst->rq_timeout.to_current;
	rpc_add_timer(task, xprt_timer);
	rpc_unlock_task(task);
 out_release:
	xprt_release_write(transport, task);
}
1275     
1276     /*
1277      * Queue the task for a reply to our call.
1278      * When the callback is invoked, the congestion window should have
1279      * been updated already.
1280      */
1281     void
1282     xprt_receive(struct rpc_task *task)
1283     {
1284     	struct rpc_rqst	*req = task->tk_rqstp;
1285     	struct rpc_xprt	*xprt = req->rq_xprt;
1286     
1287     	dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
1288     
1289     	req->rq_received = 0;
1290     	task->tk_timeout = 0;
1291     	rpc_sleep_locked(&xprt->pending, task, NULL, NULL);
1292     }
1293     
1294     /*
1295      * Reserve an RPC call slot.
1296      */
1297     int
1298     xprt_reserve(struct rpc_task *task)
1299     {
1300     	struct rpc_xprt	*xprt = task->tk_xprt;
1301     
1302     	/* We already have an initialized request. */
1303     	if (task->tk_rqstp)
1304     		return 0;
1305     
1306     	dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
1307     				task->tk_pid, xprt->cong, xprt->cwnd);
1308     	spin_lock_bh(&xprt->xprt_lock);
1309     	xprt_reserve_status(task);
1310     	if (task->tk_rqstp) {
1311     		task->tk_timeout = 0;
1312     	} else if (!task->tk_timeout) {
1313     		task->tk_status = -ENOBUFS;
1314     	} else {
1315     		dprintk("RPC:      xprt_reserve waiting on backlog\n");
1316     		task->tk_status = -EAGAIN;
1317     		rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
1318     	}
1319     	spin_unlock_bh(&xprt->xprt_lock);
1320     	dprintk("RPC: %4d xprt_reserve returns %d\n",
1321     				task->tk_pid, task->tk_status);
1322     	return task->tk_status;
1323     }
1324     
1325     /*
1326      * Reservation callback
1327      */
1328     static void
1329     xprt_reserve_status(struct rpc_task *task)
1330     {
1331     	struct rpc_xprt	*xprt = task->tk_xprt;
1332     	struct rpc_rqst	*req;
1333     
1334     	if (xprt->shutdown) {
1335     		task->tk_status = -EIO;
1336     	} else if (task->tk_status < 0) {
1337     		/* NOP */
1338     	} else if (task->tk_rqstp) {
1339     		/* We've already been given a request slot: NOP */
1340     	} else {
1341     		if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free))
1342     			goto out_nofree;
1343     		/* OK: There's room for us. Grab a free slot and bump
1344     		 * congestion value */
1345     		xprt->free     = req->rq_next;
1346     		req->rq_next   = NULL;
1347     		xprt->cong    += RPC_CWNDSCALE;
1348     		task->tk_rqstp = req;
1349     		xprt_request_init(task, xprt);
1350     
1351     		if (xprt->free)
1352     			xprt_clear_backlog(xprt);
1353     	}
1354     
1355     	return;
1356     
1357     out_nofree:
1358     	task->tk_status = -EAGAIN;
1359     }
1360     
1361     /*
1362      * Initialize RPC request
1363      */
1364     static void
1365     xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
1366     {
1367     	struct rpc_rqst	*req = task->tk_rqstp;
1368     	static u32	xid = 0;
1369     
1370     	if (!xid)
1371     		xid = CURRENT_TIME << 12;
1372     
1373     	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
1374     	task->tk_status = 0;
1375     	req->rq_timeout = xprt->timeout;
1376     	req->rq_task	= task;
1377     	req->rq_xprt    = xprt;
1378     	req->rq_xid     = xid++;
1379     	if (!xid)
1380     		xid++;
1381     }
1382     
1383     /*
1384      * Release an RPC call slot
1385      */
1386     void
1387     xprt_release(struct rpc_task *task)
1388     {
1389     	struct rpc_xprt	*xprt = task->tk_xprt;
1390     	struct rpc_rqst	*req;
1391     
1392     	if (xprt->snd_task == task) {
1393     		if (xprt->stream)
1394     			xprt_disconnect(xprt);
1395     		xprt_release_write(xprt, task);
1396     	}
1397     	if (!(req = task->tk_rqstp))
1398     		return;
1399     	task->tk_rqstp = NULL;
1400     	memset(req, 0, sizeof(*req));	/* mark unused */
1401     
1402     	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
1403     
1404     	/* remove slot from queue of pending */
1405     	if (task->tk_rpcwait) {
1406     		printk("RPC: task of released request still queued!\n");
1407     		rpc_remove_wait_queue(task);
1408     	}
1409     
1410     	spin_lock_bh(&xprt->xprt_lock);
1411     	req->rq_next = xprt->free;
1412     	xprt->free   = req;
1413     
1414     	/* Decrease congestion value. */
1415     	xprt->cong -= RPC_CWNDSCALE;
1416     
1417     	xprt_clear_backlog(xprt);
1418     	spin_unlock_bh(&xprt->xprt_lock);
1419     }
1420     
1421     /*
1422      * Set default timeout parameters
1423      */
1424     void
1425     xprt_default_timeout(struct rpc_timeout *to, int proto)
1426     {
1427     	if (proto == IPPROTO_UDP)
1428     		xprt_set_timeout(to, 5,  5 * HZ);
1429     	else
1430     		xprt_set_timeout(to, 5, 60 * HZ);
1431     }
1432     
1433     /*
1434      * Set constant timeout
1435      */
1436     void
1437     xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
1438     {
1439     	to->to_current   = 
1440     	to->to_initval   = 
1441     	to->to_increment = incr;
1442     	to->to_maxval    = incr * retr;
1443     	to->to_resrvval  = incr * retr;
1444     	to->to_retries   = retr;
1445     	to->to_exponential = 0;
1446     }
1447     
1448     /*
1449      * Initialize an RPC client
1450      */
1451     static struct rpc_xprt *
1452     xprt_setup(struct socket *sock, int proto,
1453     			struct sockaddr_in *ap, struct rpc_timeout *to)
1454     {
1455     	struct rpc_xprt	*xprt;
1456     	struct rpc_rqst	*req;
1457     	int		i;
1458     
1459     	dprintk("RPC:      setting up %s transport...\n",
1460     				proto == IPPROTO_UDP? "UDP" : "TCP");
1461     
1462     	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
1463     		return NULL;
1464     	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
1465     
1466     	xprt->addr = *ap;
1467     	xprt->prot = proto;
1468     	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
1469     	if (xprt->stream) {
1470     		xprt->cwnd = RPC_MAXCWND;
1471     		xprt->nocong = 1;
1472     	} else
1473     		xprt->cwnd = RPC_INITCWND;
1474     	xprt->congtime = jiffies;
1475     	spin_lock_init(&xprt->sock_lock);
1476     	spin_lock_init(&xprt->xprt_lock);
1477     	init_waitqueue_head(&xprt->cong_wait);
1478     
1479     	/* Set timeout parameters */
1480     	if (to) {
1481     		xprt->timeout = *to;
1482     		xprt->timeout.to_current = to->to_initval;
1483     		xprt->timeout.to_resrvval = to->to_maxval << 1;
1484     	} else
1485     		xprt_default_timeout(&xprt->timeout, xprt->prot);
1486     
1487     	xprt->pending = RPC_INIT_WAITQ("xprt_pending");
1488     	xprt->sending = RPC_INIT_WAITQ("xprt_sending");
1489     	xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
1490     
1491     	/* initialize free list */
1492     	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
1493     		req->rq_next = req + 1;
1494     	req->rq_next = NULL;
1495     	xprt->free = xprt->slot;
1496     
1497     	INIT_LIST_HEAD(&xprt->rx_pending);
1498     
1499     	dprintk("RPC:      created transport %p\n", xprt);
1500     	
1501     	xprt_bind_socket(xprt, sock);
1502     	return xprt;
1503     }
1504     
1505     /*
1506      * Bind to a reserved port
1507      */
1508     static inline int
1509     xprt_bindresvport(struct socket *sock)
1510     {
1511     	struct sockaddr_in myaddr;
1512     	int		err, port;
1513     
1514     	memset(&myaddr, 0, sizeof(myaddr));
1515     	myaddr.sin_family = AF_INET;
1516     	port = 800;
1517     	do {
1518     		myaddr.sin_port = htons(port);
1519     		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
1520     						sizeof(myaddr));
1521     	} while (err == -EADDRINUSE && --port > 0);
1522     
1523     	if (err < 0)
1524     		printk("RPC: Can't bind to reserved port (%d).\n", -err);
1525     
1526     	return err;
1527     }
1528     
1529     static int 
1530     xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
1531     {
1532     	struct sock	*sk = sock->sk;
1533     
1534     	if (xprt->inet)
1535     		return -EBUSY;
1536     
1537     	sk->user_data = xprt;
1538     	xprt->old_data_ready = sk->data_ready;
1539     	xprt->old_state_change = sk->state_change;
1540     	xprt->old_write_space = sk->write_space;
1541     	if (xprt->prot == IPPROTO_UDP) {
1542     		sk->data_ready = udp_data_ready;
1543     		sk->write_space = udp_write_space;
1544     		sk->no_check = UDP_CSUM_NORCV;
1545     		xprt_set_connected(xprt);
1546     	} else {
1547     		sk->data_ready = tcp_data_ready;
1548     		sk->state_change = tcp_state_change;
1549     		sk->write_space = tcp_write_space;
1550     		xprt_clear_connected(xprt);
1551     	}
1552     
1553     	/* Reset to new socket */
1554     	xprt->sock = sock;
1555     	xprt->inet = sk;
1556     	/*
1557     	 *	TCP requires the rpc I/O daemon is present
1558     	 */
1559     	if(xprt->stream)
1560     		rpciod_up();
1561     
1562     	return 0;
1563     }
1564     
1565     /*
1566      * Create a client socket given the protocol and peer address.
1567      */
1568     static struct socket *
1569     xprt_create_socket(int proto, struct rpc_timeout *to)
1570     {
1571     	struct socket	*sock;
1572     	int		type, err;
1573     
1574     	dprintk("RPC:      xprt_create_socket(%s %d)\n",
1575     			   (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
1576     
1577     	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
1578     
1579     	if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
1580     		printk("RPC: can't create socket (%d).\n", -err);
1581     		goto failed;
1582     	}
1583     
1584     	/* If the caller has the capability, bind to a reserved port */
1585     	if (capable(CAP_NET_BIND_SERVICE) && xprt_bindresvport(sock) < 0)
1586     		goto failed;
1587     
1588     	return sock;
1589     
1590     failed:
1591     	sock_release(sock);
1592     	return NULL;
1593     }
1594     
1595     /*
1596      * Create an RPC client transport given the protocol and peer address.
1597      */
1598     struct rpc_xprt *
1599     xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
1600     {
1601     	struct socket	*sock;
1602     	struct rpc_xprt	*xprt;
1603     
1604     	dprintk("RPC:      xprt_create_proto called\n");
1605     
1606     	if (!(sock = xprt_create_socket(proto, to)))
1607     		return NULL;
1608     
1609     	if (!(xprt = xprt_setup(sock, proto, sap, to)))
1610     		sock_release(sock);
1611     
1612     	return xprt;
1613     }
1614     
1615     /*
1616      * Prepare for transport shutdown.
1617      */
1618     void
1619     xprt_shutdown(struct rpc_xprt *xprt)
1620     {
1621     	xprt->shutdown = 1;
1622     	rpc_wake_up(&xprt->sending);
1623     	rpc_wake_up(&xprt->pending);
1624     	rpc_wake_up(&xprt->backlog);
1625     	if (waitqueue_active(&xprt->cong_wait))
1626     		wake_up(&xprt->cong_wait);
1627     }
1628     
1629     /*
1630      * Clear the xprt backlog queue
1631      */
1632     int
1633     xprt_clear_backlog(struct rpc_xprt *xprt) {
1634     	if (RPCXPRT_CONGESTED(xprt))
1635     		return 0;
1636     	rpc_wake_up_next(&xprt->backlog);
1637     	if (waitqueue_active(&xprt->cong_wait))
1638     		wake_up(&xprt->cong_wait);
1639     	return 1;
1640     }
1641     
/*
 * Destroy an RPC transport, killing off all requests.
 *
 * Shuts the transport down, closes it and frees its memory; the
 * pointer must not be used afterwards.  Always returns 0.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC:      destroying transport %p\n", xprt);

	xprt_shutdown(xprt);	/* flag shutdown and wake all waiters */
	xprt_close(xprt);
	kfree(xprt);

	return 0;
}
1655