[torquedev] poll() vs select() in torque

Josh Butikofer josh at clusterresources.com
Wed Mar 25 11:46:02 MDT 2009


Michael,

The discussed patch is attached.

This patch is actually already checked into 2.3-fixes (it was put there yesterday),
but we are still evaluating whether or not to leave it in. We would like more
people to test it in the wild, beyond the few customers of ours who have already
been using it in production.

If you use this patch, please let us know how it works for you.

Thanks,

Josh Butikofer
Cluster Resources, Inc.
#############################


Michael Barnes wrote:
>> I will dig up the patch and post it to the mailing list.
>>
>> Oh, and by the way, customers have been using this patch heavily in 
>> production for several months now.
> 
> I would really appreciate the patch.  Thanks.
> 
> -michael
> 
-------------- next part --------------
Index: CHANGELOG
===================================================================
--- CHANGELOG	(revision 2835)
+++ CHANGELOG	(revision 2836)
@@ -13,6 +13,9 @@
       from Yahoo R2461)
   e - added code which prefixes the severity tag on all log_ext() and log_err()
       messages (ported from Yahoo R2358)
+  f - added code from 2.3-extreme that allows TORQUE to handle more than 1024 sockets.
+      Also, increased the size of TORQUE's internal socket handle table to avoid
+      running out of handles under busy conditions.
 
 2.3.6
   e - in Linux, a pbs_mom will now "kill" a job's task, even if that task can no longer be
Index: src/include/server_limits.h
===================================================================
--- src/include/server_limits.h	(revision 2835)
+++ src/include/server_limits.h	(revision 2836)
@@ -130,9 +130,19 @@
 
 
 #ifndef PBS_NET_MAX_CONNECTIONS
-#define PBS_NET_MAX_CONNECTIONS 1024  /* increased from 256 */
+/* 
+ * PBS_NET_MAX_CONNECTIONS is used to limit both socket descriptors and socket
+ * handles. Tests show that often socket handles are used up much faster than
+ * socket descriptors (is there a leak of socket handles???). In the future, we
+ * should probably break out handle size and socket descriptor size and the
+ * socket descriptor size should equal getdtablesize(). For now, however,
+ * PBS_NET_MAX_CONNECTIONS should be larger than ulimit -n due to more socket
+ * handles being handed out than sockets are available.
+ */
+#define PBS_NET_MAX_CONNECTIONS 10240  /* increased from 1024--should be larger than ulimit -n */
 #endif /* PBS_NET_MAX_CONNECTIONS */
 
+
 #define PBS_LOCAL_CONNECTION PBS_NET_MAX_CONNECTIONS
 
 /*
Index: src/include/net_connect.h
===================================================================
--- src/include/net_connect.h	(revision 2835)
+++ src/include/net_connect.h	(revision 2836)
@@ -189,6 +189,8 @@
 void net_close A_((int));
 int  wait_request(time_t waittime, long *);
 void net_add_close_func A_((int, void(*)()));
+int get_max_num_descriptors(void);
+int get_fdset_size(void);
 
 
 struct connection
Index: src/server/svr_connect.c
===================================================================
--- src/server/svr_connect.c	(revision 2835)
+++ src/server/svr_connect.c	(revision 2836)
@@ -198,7 +198,7 @@
     return(PBS_NET_RC_RETRY);
     }
 
-  /* establish UDP socket connection to specified host */
+  /* establish socket connection to specified host */
 
   sock = client_to_svr(hostaddr, port, 1, EMsg);
 
Index: src/server/pbsd_main.c
===================================================================
--- src/server/pbsd_main.c	(revision 2835)
+++ src/server/pbsd_main.c	(revision 2836)
@@ -1371,7 +1371,7 @@
 
     /* touch the rpp streams that need to send */
 
-    rpp_request(42);
+    rpp_request(0);
 
     /* wait for a request and process it */
 
Index: src/lib/Libnet/rm.c
===================================================================
--- src/lib/Libnet/rm.c	(revision 2835)
+++ src/lib/Libnet/rm.c	(revision 2836)
@@ -1045,13 +1045,16 @@
 
   struct timeval tv;
 
-  fd_set         fdset;
+  fd_set *FDSet;
 
+  int MaxNumDescriptors = 0;
+
   pbs_errno = 0;
 
   flushreq();
 
-  FD_ZERO(&fdset);
+  MaxNumDescriptors = get_max_num_descriptors();
+  FDSet = (fd_set *)calloc(1,sizeof(char) * get_fdset_size());
 
 #if RPP
   for (try = 0;try < 3;)
@@ -1060,6 +1063,7 @@
       {
       if ((op = findout(i)) != NULL)
         {
+        free(FDSet);
         return(i);
         }
 
@@ -1069,6 +1073,7 @@
         {
         pbs_errno = errno;
 
+        free(FDSet);
         return(-1);
         }
 
@@ -1084,20 +1089,22 @@
       {
       pbs_errno = errno;
 
+      free(FDSet);
       return(-1);
       }
     else
       {
 
-      FD_SET(rpp_fd, &fdset);
+      FD_SET(rpp_fd, FDSet);
       tv.tv_sec = 5;
       tv.tv_usec = 0;
-      num = select(FD_SETSIZE, &fdset, NULL, NULL, &tv);
+      num = select(FD_SETSIZE, FDSet, NULL, NULL, &tv);
 
       if (num == -1)
         {
         pbs_errno = errno;
         DBPRT(("%s: select %d %s\n", id, pbs_errno, pbs_strerror(pbs_errno)))
+        free(FDSet);
         return -1;
         }
 
@@ -1110,6 +1117,7 @@
       }
     }
 
+  free(FDSet);
   return i;
 
 #else
@@ -1124,7 +1132,7 @@
 
     while (op)
       {
-      FD_SET(op->stream, &fdset);
+      FD_SET(op->stream, FDSet);
       op = op->next;
       }
     }
@@ -1132,16 +1140,21 @@
   tv.tv_sec = 15;
 
   tv.tv_usec = 0;
-  num = select(FD_SETSIZE, &fdset, NULL, NULL, &tv);
 
+  num = select(MaxNumDescriptors, FDSet, NULL, NULL, &tv);
+
   if (num == -1)
     {
     pbs_errno = errno;
     DBPRT(("%s: select %d %s\n", id, pbs_errno, pbs_strerror(pbs_errno)))
+    free(FDSet);
     return -1;
     }
   else if (num == 0)
-    return -2;
+    {
+    free(FDSet);
+		return -2;
+    }
 
   for (i = 0; i < HASHOUT; i++)
     {
@@ -1152,13 +1165,17 @@
 
     while (op)
       {
-      if (FD_ISSET(op->stream, &fdset))
-        return op->stream;
+			if (FD_ISSET(op->stream, FDSet))
+        {
+        free(FDSet);
+				return op->stream;
+        }
 
       op = op->next;
       }
     }
 
+  free(FDSet);
   return(-2);
 
 #endif
Index: src/lib/Libnet/net_client.c
===================================================================
--- src/lib/Libnet/net_client.c	(revision 2835)
+++ src/lib/Libnet/net_client.c	(revision 2836)
@@ -86,6 +86,7 @@
 #include <errno.h>
 #include <netdb.h>
 #include <string.h>
+#include <stdlib.h>
 #include "portability.h"
 #include "server_limits.h"
 #include "net_connect.h"
@@ -106,6 +107,64 @@
 int bindresvport(int sd, struct sockaddr_in *sin);
 #endif
 
+/**
+ * Returns the size of the process's file descriptor table, as reported
+ * by getdtablesize(). A positive result is cached in a static, so the
+ * OS is only queried again while the cached value is still <= 0.
+ */
+
+int get_max_num_descriptors(void)
+  {
+  static int max_num_descriptors = 0;
+
+  if (max_num_descriptors <= 0)  /* nothing usable cached yet */
+    max_num_descriptors = getdtablesize();
+
+  return(max_num_descriptors);
+  }  /* END get_max_num_descriptors() */
+
+/**
+ * Returns the number of bytes to allocate for an fd_set array covering
+ * get_max_num_descriptors() descriptors. The result is a whole multiple
+ * of sizeof(fd_set): descriptors / FD_SETSIZE rounded up, minimum one.
+ */
+
+int get_fdset_size(void)
+  {
+  unsigned int MaxNumDescriptors = 0;
+  int NumFDSetsNeeded = 0;
+  int NumBytesInFDSet = 0;
+  int Result = 0;
+
+  MaxNumDescriptors = get_max_num_descriptors();  /* NOTE(review): int stored in unsigned; a negative value would wrap */
+
+  NumBytesInFDSet = sizeof(fd_set);
+  NumFDSetsNeeded = MaxNumDescriptors / FD_SETSIZE;  /* whole fd_sets only; remainder handled below */
+
+  if (MaxNumDescriptors < FD_SETSIZE)
+    {
+    /* a single fd_set already provides sufficient space */
+
+    Result = NumBytesInFDSet;
+    }
+  else if ((MaxNumDescriptors % FD_SETSIZE) > 0)
+    {
+    /* descriptors spill past the last whole fd_set--round up by
+     * adding one extra fd_set worth of memory to the size */
+
+    Result = (NumFDSetsNeeded + 1) * NumBytesInFDSet;
+    }
+  else
+    {
+    /* division was exact--we know exactly how many bytes we need */
+
+    Result = NumFDSetsNeeded * NumBytesInFDSet;
+    }
+
+  return(Result);
+  }  /* END get_fdset_size() */
+
+
 /*
 ** wait for connect to complete.  We use non-blocking sockets,
 ** so have to wait for completion this way.
@@ -113,27 +172,42 @@
 
 static int await_connect(
 
-  int timeout,   /* I */
+  long timeout,   /* I */
   int sockd)     /* I */
 
   {
-  fd_set fs;
   int n, val, rc;
 
+  int MaxNumDescriptors = 0;
+
+  fd_set *BigFDSet = NULL;
+
   struct timeval tv;
 
   torque_socklen_t len;
 
-  tv.tv_sec = timeout;
-  tv.tv_usec = 0;
+  /* 
+   * some operating systems (like FreeBSD) cannot have a value for tv.tv_usec
+   * larger than 1,000,000 so we need to split up the timeout duration between
+   * seconds and microseconds
+   */
 
-  FD_ZERO(&fs);
-  FD_SET(sockd, &fs);
+  tv.tv_sec = timeout / 1000000;
+  tv.tv_usec = timeout % 1000000;
 
-  if ((n = select(sockd + 1, 0, &fs, 0, &tv)) != 1)
+  /* calculate needed size for fd_set in select() */
+
+  MaxNumDescriptors = get_max_num_descriptors();
+
+  BigFDSet = (fd_set *)calloc(1,sizeof(char) * get_fdset_size());
+
+  FD_SET(sockd, BigFDSet);
+
+  if ((n = select(sockd+1,0,BigFDSet,0,&tv)) != 1)
     {
     /* FAILURE:  socket not ready for write */
 
+    free(BigFDSet);
     return(-1);
     }
 
@@ -145,6 +219,7 @@
     {
     /* SUCCESS:  no failures detected */
 
+    free(BigFDSet);
     return(0);
     }
 
@@ -152,13 +227,15 @@
 
   /* FAILURE:  socket error detected */
 
+    free(BigFDSet);
   return(-1);
   }  /* END await_connect() */
 
 
 
 
-#define TORQUE_MAXCONNECTTIMEOUT  5
+/* in microseconds */
+#define TORQUE_MAXCONNECTTIMEOUT  5000000
 
 /*
  * client_to_svr - connect to a server
@@ -182,10 +259,10 @@
 
 int client_to_svr(
 
-  pbs_net_t     hostaddr, /* I - internet addr of host */
-  unsigned int  port,  /* I - port to which to connect */
-  int           local_port, /* I - BOOLEAN:  not 0 to use local reserved port */
-  char         *EMsg)           /* O (optional,minsize=1024) */
+  pbs_net_t     hostaddr,	  /* I - internet addr of host */
+  unsigned int  port,		    /* I - port to which to connect */
+  int           local_port,	/* I - BOOLEAN:  not 0 to use local reserved port */
+  char         *EMsg)       /* O (optional,minsize=1024) */
 
   {
   const char id[] = "client_to_svr";
@@ -420,7 +497,7 @@
     case ECONNREFUSED:
 
       if (EMsg != NULL)
-        sprintf(EMsg, "cannot bind to port %d in %s - connection refused",
+        sprintf(EMsg, "cannot connect to port %d in %s - connection refused",
                 tryport,
                 id);
 
@@ -435,7 +512,7 @@
     default:
 
       if (EMsg != NULL)
-        sprintf(EMsg, "cannot bind to port %d in %s - errno:%d %s",
+        sprintf(EMsg, "cannot connect to port %d in %s - errno:%d %s",
                 tryport,
                 id,
                 errno,
Index: src/lib/Libnet/net_server.c
===================================================================
--- src/lib/Libnet/net_server.c	(revision 2835)
+++ src/lib/Libnet/net_server.c	(revision 2836)
@@ -85,6 +85,7 @@
 #include <signal.h>
 #include <stdio.h>
 #include <unistd.h>
+#include <stdlib.h>
 #include <time.h>
 
 #include <sys/types.h>
@@ -133,7 +134,7 @@
 
 static int max_connection = PBS_NET_MAX_CONNECTIONS;
 static int num_connections = 0;
-static fd_set readset;
+static fd_set *GlobalSocketReadSet = NULL;
 static void (*read_func[2]) A_((int));
 
 pbs_net_t pbs_server_addr;
@@ -246,8 +247,11 @@
   static int  initialized = 0;
   int    sock;
 
+  int MaxNumDescriptors = 0;
+
   struct sockaddr_in socname;
   enum conn_type   type;
+
 #ifdef ENABLE_UNIX_SOCKETS
 
   struct sockaddr_un unsocname;
@@ -255,6 +259,8 @@
   memset(&unsocname, 0, sizeof(unsocname));
 #endif
  
+  MaxNumDescriptors = get_max_num_descriptors();
+
   memset(&socname, 0, sizeof(socname));
 
   if (initialized == 0)
@@ -262,7 +268,8 @@
     for (i = 0;i < PBS_NET_MAX_CONNECTIONS;i++)
       svr_conn[i].cn_active = Idle;
 
-    FD_ZERO(&readset);
+    /* initialize global "read" socket FD bitmap */
+    GlobalSocketReadSet = (fd_set *)calloc(1,sizeof(char) * get_fdset_size());
 
     type = Primary;
     }
@@ -289,8 +296,8 @@
       return(-1);
       }
 
-    if (FD_SETSIZE < PBS_NET_MAX_CONNECTIONS)
-      max_connection = FD_SETSIZE;
+  if (MaxNumDescriptors < PBS_NET_MAX_CONNECTIONS)
+    max_connection = MaxNumDescriptors;
 
     i = 1;
 
@@ -403,19 +410,22 @@
 
   {
   extern char *PAddrToString(pbs_net_t *);
+  void close_conn();
 
   int i;
   int n;
 
   time_t now;
 
-  fd_set selset;
+  fd_set *SelectSet = NULL;
+  int SelectSetSize = 0;
 
+  int MaxNumDescriptors = 0;
+
   char id[] = "wait_request";
   char tmpLine[1024];
 
   struct timeval timeout;
-  void close_conn();
 
   long OrigState = 0;
 
@@ -426,10 +436,16 @@
 
   timeout.tv_sec  = waittime;
 
-  selset = readset;  /* readset is global */
+  SelectSetSize = sizeof(char) * get_fdset_size();
+  SelectSet = (fd_set *)calloc(1,SelectSetSize);
 
-  n = select(FD_SETSIZE, &selset, (fd_set *)0, (fd_set *)0, &timeout);
+  memcpy(SelectSet,GlobalSocketReadSet,SelectSetSize);
+ 
+  /* selset = readset;*/  /* readset is global */
+  MaxNumDescriptors = get_max_num_descriptors();
 
+  n = select(MaxNumDescriptors, SelectSet, (fd_set *)0, (fd_set *)0, &timeout);
+
   if (n == -1)
     {
     if (errno == EINTR)
@@ -447,9 +463,9 @@
 
       /* NOTE:  selset may be modified by failed select() */
 
-      for (i = 0;i < (int)FD_SETSIZE;i++)
+      for (i = 0;i < MaxNumDescriptors;i++)
         {
-        if (FD_ISSET(i, &readset) == 0)
+        if (FD_ISSET(i, GlobalSocketReadSet) == 0)
           continue;
 
         if (fstat(i, &fbuf) == 0)
@@ -457,16 +473,17 @@
 
         /* clean up SdList and bad sd... */
 
-        FD_CLR(i, &readset);
+        FD_CLR(i, GlobalSocketReadSet);
         }    /* END for (i) */
 
+      free(SelectSet);
       return(-1);
       }  /* END else (errno == EINTR) */
     }    /* END if (n == -1) */
 
   for (i = 0;(i < max_connection) && (n != 0);i++)
     {
-    if (FD_ISSET(i, &selset))
+    if (FD_ISSET(i,SelectSet))
       {
       /* this socket has data */
 
@@ -486,11 +503,11 @@
         }
       else
         {
-        FD_CLR(i, &readset);
+        FD_CLR(i, GlobalSocketReadSet);
 
         close(i);
 
-        num_connections--;  /* added by CRI - should this be here? */
+        num_connections--;
 
         sprintf(tmpLine,"closed connection to fd %d - num_connections=%d (select bad socket)",
           i,
@@ -505,6 +522,7 @@
 
   if ((SState != NULL) && (OrigState != *SState))
     {
+    free(SelectSet);
     return(0);
     }
 
@@ -544,6 +562,7 @@
     close_conn(i);
     }  /* END for (i) */
 
+  free(SelectSet);
   return(0);
   }  /* END wait_request() */
 
@@ -638,7 +657,7 @@
   {
   num_connections++;
 
-  FD_SET(sock, &readset);
+  FD_SET(sock, GlobalSocketReadSet);
 
   svr_conn[sock].cn_active   = type;
   svr_conn[sock].cn_addr     = addr;
@@ -709,7 +728,7 @@
   if (svr_conn[sd].cn_oncl != 0)
     svr_conn[sd].cn_oncl(sd);
 
-  FD_CLR(sd, &readset);
+  FD_CLR(sd, GlobalSocketReadSet);
 
   svr_conn[sd].cn_addr = 0;
Index: src/resmom/pbs_demux.c
===================================================================
--- src/resmom/pbs_demux.c  (revision 2840)
+++ src/resmom/pbs_demux.c  (working copy)
@@ -103,7 +103,10 @@
 #  include <sys/select.h>
 #endif
 
+int get_max_num_descriptors(void);
+int get_fdset_size(void);
 
+
 enum rwhere {invalid, new_out, new_err, old_out, old_err};
 
 struct routem
@@ -112,7 +115,7 @@
   short  r_nl;
   };
 
-fd_set readset;
+fd_set *ReadSet;
 
 
 void readit(
@@ -186,7 +189,7 @@
 
     prm->r_where = invalid;
 
-    FD_CLR(sock, &readset);
+    FD_CLR(sock, ReadSet);
     }
 
   return;
@@ -203,17 +206,19 @@
   {
   struct timeval timeout;
   int i;
-  int maxfd;
   int main_sock_out = 3;
   int main_sock_err = 4;
   int n;
   int newsock;
   pid_t parent;
-  fd_set selset;
+  fd_set *SelectSet;
+  int    SelectSetSize = 0;
+  int    MaxNumDescriptors = 0;
 
   struct routem *routem;
 
@@ -234,10 +238,9 @@
   #endif
   */
 
-  maxfd = sysconf(_SC_OPEN_MAX);
+  MaxNumDescriptors = get_max_num_descriptors();
+  routem = (struct routem *)malloc(MaxNumDescriptors * sizeof(struct routem));
 
-  routem = (struct routem *)malloc(maxfd * sizeof(struct routem));
-
   if (routem == NULL)
     {
     fprintf(stderr, "%s: malloc failed\n",
@@ -246,7 +250,7 @@
     exit(1);
     }
 
-  for (i = 0;i < maxfd;++i)
+  for (i = 0;i < MaxNumDescriptors;++i)
     {
     routem[i].r_where = invalid;
     routem[i].r_nl    = 1;
@@ -255,10 +259,14 @@
   routem[main_sock_out].r_where = new_out;
   routem[main_sock_err].r_where = new_err;
 
-  FD_ZERO(&readset);
-  FD_SET(main_sock_out,&readset);
-  FD_SET(main_sock_err,&readset);
+  SelectSetSize = sizeof(char) * get_fdset_size();
+  ReadSet = (fd_set *)calloc(1,SelectSetSize);
 
+  FD_ZERO(ReadSet);
+  FD_SET(main_sock_out,ReadSet);
+  FD_SET(main_sock_err,ReadSet);
+
+
   if (listen(main_sock_out,TORQUE_LISTENQUEUE) < 0)
     {
     perror("listen on out");
@@ -275,11 +283,11 @@
 
   while (1)
     {
-    selset = readset;
+    SelectSet = ReadSet;
     timeout.tv_usec = 0;
     timeout.tv_sec  = 10;
 
-    n = select(FD_SETSIZE,&selset,(fd_set *)0,(fd_set *)0,&timeout);
+    n = select(MaxNumDescriptors,SelectSet,(fd_set *)0,(fd_set *)0,&timeout);
 
     if (n == -1)
       {
@@ -292,6 +300,7 @@
         fprintf(stderr, "%s: select failed\n",
           argv[0]);
 
+        free(ReadSet);
         exit(1);
         }
       }
@@ -310,9 +319,9 @@
         }
       }    /* END else if (n == 0) */
 
-    for (i = 0;(n != 0) && (i < maxfd);++i)
+    for (i = 0;(n != 0) && (i < MaxNumDescriptors);++i)
       {
-      if (FD_ISSET(i, &selset))
+      if (FD_ISSET(i, SelectSet))
         {
         /* this socket has data */
         n--;
@@ -330,7 +339,7 @@
             old_out :
             old_err;
 
-            FD_SET(newsock, &readset);
+            FD_SET(newsock, ReadSet);
 
             break;
 
@@ -347,6 +356,7 @@
             fprintf(stderr, "%s: internal error\n",
                     argv[0]);
 
+            free(ReadSet);
             exit(2);
 
             /*NOTREACHED*/
@@ -357,6 +367,7 @@
       }
     }    /* END while(1) */
 
+  free(ReadSet);
   return(0);
   }  /* END main() */




More information about the torquedev mailing list