30.12 TCP Prethreaded Server, Main Thread accept
Our final server design using threads has the
main thread create a pool of threads when it starts, and then only
the main thread calls accept and passes each client
connection to one of the available threads in the pool. This is
similar to the descriptor passing version in Section
30.9.
The design problem is how does the main thread
"pass" the connected socket to one of the available threads in the
pool? There are various ways to implement this. We could use
descriptor passing, as we did earlier, but there's no need to pass
a descriptor from one thread to another since all the threads and
all the descriptors are in the same process. All the receiving
thread needs to know is the descriptor number. Figure 30.30 shows the pthread08.h
header that defines a Thread structure, which is identical
to Figure 30.27.
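The claim that the receiving thread needs only the descriptor number can be checked with a small sketch. It assumes a POSIX system with pipe and Pthreads; the names writer_thread and share_fd_demo are hypothetical and do not appear in the server code. The main flow opens a pipe, hands the write end to a new thread as a plain integer, and then reads what the thread wrote.

```c
#include <assert.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical worker: it receives only the descriptor *number*. */
static void *writer_thread(void *arg)
{
    int fd = *(int *) arg;           /* refers to the caller's open pipe */
    const char msg[] = "hello";      /* 6 bytes, including the null byte */

    assert(fd >= 0);
    if (write(fd, msg, sizeof(msg)) != (ssize_t) sizeof(msg))
        return (void *) 1;           /* nonzero status reports failure */
    return NULL;
}

/* Returns 0 if the thread could use a descriptor opened by the caller. */
int share_fd_demo(void)
{
    int fds[2];
    char buf[16] = "";
    pthread_t tid;
    void *status;

    if (pipe(fds) < 0)
        return -1;
    if (pthread_create(&tid, NULL, writer_thread, &fds[1]) != 0)
        return -1;
    if (pthread_join(tid, &status) != 0 || status != NULL)
        return -1;
    if (read(fds[0], buf, sizeof(buf)) != 6)
        return -1;
    close(fds[0]);
    close(fds[1]);
    return strcmp(buf, "hello") == 0 ? 0 : -1;
}
```

No descriptor passing with sendmsg and recvmsg is involved: the integer alone is enough because both threads use the same per-process descriptor table.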
Figure 30.30 pthread08.h header.
server/pthread08.h
 1 typedef struct {
 2     pthread_t thread_tid;       /* thread ID */
 3     long    thread_count;       /* # connections handled */
 4 } Thread;
 5 Thread *tptr;                   /* array of Thread structures; calloc'ed */
 6 #define MAXNCLI 32
 7 int     clifd[MAXNCLI], iget, iput;
 8 pthread_mutex_t clifd_mutex;
 9 pthread_cond_t clifd_cond;
Define shared array to hold connected sockets
6–9 We also define a clifd array in which the main thread will
store the connected socket descriptors. The available threads in
the pool take one of these connected sockets and service the
corresponding client. iput is the index into this array of
the next entry to be stored into by the main thread and
iget is the index of the next entry to be fetched by one
of the threads in the pool. Naturally, this data structure that is
shared between all the threads must be protected and we use a mutex
along with a condition variable.
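The way iput and iget move through the array can be seen in isolation with a single-threaded sketch. It deliberately omits the mutex and condition variable, uses a small MAXNCLI of 4 so that the wraparound happens quickly, and introduces the hypothetical helpers ring_put, ring_get, and ring_demo, none of which are part of the server.

```c
#include <assert.h>

#define MAXNCLI 4                    /* small value, for illustration only */

static int clifd[MAXNCLI], iget, iput;

/* Store fd at iput and advance, wrapping at the end of the array. */
static void ring_put(int fd)
{
    clifd[iput] = fd;
    if (++iput == MAXNCLI)
        iput = 0;
}

/* Fetch the entry at iget and advance; the caller ensures iget != iput. */
static int ring_get(void)
{
    int fd;

    assert(iget != iput);            /* equal indexes mean "empty" */
    fd = clifd[iget];
    if (++iget == MAXNCLI)
        iget = 0;
    return fd;
}

/* Store and fetch more entries than the array holds; return the last one. */
int ring_demo(void)
{
    int fd, last = -1;

    iget = iput = 0;
    for (fd = 10; fd < 20; fd++) {
        ring_put(fd);
        last = ring_get();
        assert(last == fd);          /* entries come out in FIFO order */
    }
    return last;
}
```

Ten values pass through a four-entry array here, so both indexes wrap twice; the entries still come out in the order they went in.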
Figure
30.31 is the main function.
Create pool of threads
23–25 thread_make creates each of the threads.
Wait for each client connection
27–38 The main thread blocks in the call to accept, waiting for
each client connection to arrive. When one arrives, the connected
socket is stored in the next entry in the clifd array,
after obtaining the mutex lock on the array. We also check that the
iput index has not caught up with the iget index,
which indicates that our array is not big enough. The condition
variable is signaled and the mutex is released, allowing one of the
threads in the pool to service this client.
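One consequence of the "caught up" check is worth making concrete. Because equal indexes also mean that the array is empty, the check fires when the last free entry is filled, so at most MAXNCLI - 1 connections can be pending at once. The following sketch counts this directly; queued_before_overflow is a hypothetical name, and the loop models the main thread storing descriptors while no pool thread fetches any.

```c
#include <assert.h>

#define MAXNCLI 32                   /* same value as in pthread08.h */

/* Count how many connections can be queued, with no thread fetching,
   before the "iput == iget" check in the main loop would fire. */
int queued_before_overflow(void)
{
    int clifd[MAXNCLI], iget = 0, iput = 0, n = 0;

    for ( ; ; ) {
        assert(iput < MAXNCLI);
        clifd[iput] = 100 + n;       /* stand-in for a connected socket */
        if (++iput == MAXNCLI)
            iput = 0;
        if (iput == iget)            /* same test as the main loop */
            break;
        n++;
    }
    (void) clifd;                    /* contents are not examined here */
    return n;
}
```

With MAXNCLI defined as 32, the function returns 31: the 32nd pending connection is the one that would make the server call err_quit.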
The thread_make and
thread_main functions are shown in Figure 30.32. The former is identical to the
version in Figure 30.29.
Wait for client descriptor to service
17–26 Each thread in the pool tries to obtain a lock on the mutex that
protects the clifd array. When the lock is obtained, there
is nothing to do if the iget and iput indexes are
equal. In that case, the thread goes to sleep by calling
pthread_cond_wait. It will be awakened by the call to
pthread_cond_signal in the main thread after a connection
is accepted. When the thread obtains a connection, it calls
web_child.
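The handoff between the main thread and the pool can be exercised without any networking. The sketch below is an assumption-laden model, not the server itself: plain integers stand in for connected sockets, the hypothetical functions put_fd, pool_thread, and pool_demo replace main and thread_main, and a negative value is used as a made-up shutdown sentinel so that the threads can be joined. The locking, the while loop around pthread_cond_wait, and the index handling follow the description above.

```c
#include <assert.h>
#include <pthread.h>

#define MAXNCLI  32
#define NTHREADS 4
#define NITEMS   20                  /* NITEMS + NTHREADS stays below MAXNCLI */

static int clifd[MAXNCLI], iget, iput;
static long handled[NTHREADS];       /* per-thread count of items serviced */
static pthread_mutex_t clifd_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  clifd_cond  = PTHREAD_COND_INITIALIZER;

/* Producer side: what the main thread does after accept returns. */
static void put_fd(int fd)
{
    pthread_mutex_lock(&clifd_mutex);
    clifd[iput] = fd;
    if (++iput == MAXNCLI)
        iput = 0;
    assert(iput != iget);            /* array must never fill up */
    pthread_cond_signal(&clifd_cond);
    pthread_mutex_unlock(&clifd_mutex);
}

/* Consumer side: what each thread in the pool does. */
static void *pool_thread(void *arg)
{
    long *count = arg;
    int fd;

    for ( ; ; ) {
        pthread_mutex_lock(&clifd_mutex);
        while (iget == iput)         /* nothing queued: sleep */
            pthread_cond_wait(&clifd_cond, &clifd_mutex);
        fd = clifd[iget];
        if (++iget == MAXNCLI)
            iget = 0;
        pthread_mutex_unlock(&clifd_mutex);
        if (fd < 0)
            return NULL;             /* hypothetical shutdown sentinel */
        (*count)++;                  /* the server would call web_child here */
    }
}

/* Queue NITEMS fake descriptors and return how many the pool serviced. */
long pool_demo(void)
{
    pthread_t tid[NTHREADS];
    long total = 0;
    int i;

    iget = iput = 0;
    for (i = 0; i < NTHREADS; i++) {
        handled[i] = 0;
        if (pthread_create(&tid[i], NULL, pool_thread, &handled[i]) != 0)
            return -1;
    }
    for (i = 0; i < NITEMS; i++)
        put_fd(100 + i);
    for (i = 0; i < NTHREADS; i++)
        put_fd(-1);                  /* one sentinel per thread */
    for (i = 0; i < NTHREADS; i++) {
        pthread_join(tid[i], NULL);
        total += handled[i];
    }
    return total;
}
```

The predicate is tested in a while loop rather than an if because a thread returning from pthread_cond_wait may find that another thread has already taken the entry; rechecking iget against iput under the mutex handles that case.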
The times in Figure 30.1 show that
this server is slower than the one in the previous section, in
which each thread called accept after obtaining a mutex
lock. The reason is that this section's example requires both a
mutex and a condition variable, compared to just a mutex in
Figure 30.29.
If we examine the histogram of the number of
clients serviced by each thread in the pool, it is similar to the
final column in Figure 30.2. This
means the threads library cycles through all the available threads
when doing the wakeup based on the condition variable when the main
thread calls pthread_cond_signal.
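The per-thread counts behind such a histogram are the thread_count members of the Thread array. As a minimal sketch of how they could be reported, assuming the Thread structure from Figure 30.30, the hypothetical helper print_thread_counts below prints one line per thread and returns the total; it is an illustration and not necessarily how the measurements for Figure 30.2 were produced.

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>

typedef struct {
    pthread_t thread_tid;            /* thread ID */
    long      thread_count;          /* # connections handled */
} Thread;

/* Hypothetical helper: print one line per thread, return the total. */
long print_thread_counts(const Thread *t, int nthreads)
{
    long total = 0;
    int i;

    assert(t != NULL && nthreads >= 0);
    for (i = 0; i < nthreads; i++) {
        printf("thread %d, %ld connections\n", i, t[i].thread_count);
        total += t[i].thread_count;
    }
    return total;
}
```

An even distribution shows up as roughly equal numbers on every line, and the returned total should match the number of client connections the server accepted.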
Figure 30.31 main function for prethreaded server.
server/serv08.c
 1 #include    "unpthread.h"
 2 #include    "pthread08.h"
 3 static int nthreads;
 4 pthread_mutex_t clifd_mutex = PTHREAD_MUTEX_INITIALIZER;
 5 pthread_cond_t clifd_cond = PTHREAD_COND_INITIALIZER;
 6 int
 7 main(int argc, char **argv)
 8 {
 9     int     i, listenfd, connfd;
10     void    sig_int(int), thread_make(int);
11     socklen_t addrlen, clilen;
12     struct sockaddr *cliaddr;
13     if (argc == 3)
14         listenfd = Tcp_listen(NULL, argv[1], &addrlen);
15     else if (argc == 4)
16         listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
17     else
18         err_quit("usage: serv08 [ <host> ] <port#> <#threads>");
19     cliaddr = Malloc(addrlen);
20     nthreads = atoi(argv[argc - 1]);
21     tptr = Calloc(nthreads, sizeof(Thread));
22     iget = iput = 0;
23         /* create all the threads */
24     for (i = 0; i < nthreads; i++)
25         thread_make(i);         /* only main thread returns */
26     Signal(SIGINT, sig_int);
27     for ( ; ; ) {
28         clilen = addrlen;
29         connfd = Accept(listenfd, cliaddr, &clilen);
30         Pthread_mutex_lock(&clifd_mutex);
31         clifd[iput] = connfd;
32         if (++iput == MAXNCLI)
33             iput = 0;
34         if (iput == iget)
35             err_quit("iput = iget = %d", iput);
36         Pthread_cond_signal(&clifd_cond);
37         Pthread_mutex_unlock(&clifd_mutex);
38     }
39 }
Figure 30.32 thread_make and thread_main functions.
server/pthread08.c
 1 #include    "unpthread.h"
 2 #include    "pthread08.h"
 3 void
 4 thread_make(int i)
 5 {
 6     void   *thread_main(void *);
 7     Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
 8     return;                     /* main thread returns */
 9 }
10 void *
11 thread_main(void *arg)
12 {
13     int     connfd;
14     void    web_child(int);
15     printf("thread %d starting\n", (int) arg);
16     for ( ; ; ) {
17         Pthread_mutex_lock(&clifd_mutex);
18         while (iget == iput)
19             Pthread_cond_wait(&clifd_cond, &clifd_mutex);
20         connfd = clifd[iget];   /* connected socket to service */
21         if (++iget == MAXNCLI)
22             iget = 0;
23         Pthread_mutex_unlock(&clifd_mutex);
24         tptr[(int) arg].thread_count++;
25         web_child(connfd);      /* process request */
26         Close(connfd);
27     }
28 }