[torqueusers] [PATCH 3/3] resmom: create cgroup for jobs to limit cpu and mem usage

levin li levin108 at gmail.com
Tue Nov 20 02:02:04 MST 2012


Signed-off-by: levin li <levin108 at gmail.com>
---
 src/resmom/catch_child.c |    4 +
 src/resmom/mom_cgroup.c  |  445 ++++++++++++++++++++++++++++++++++++++++++++++
 src/resmom/mom_cgroup.h  |   14 ++
 src/resmom/mom_comm.c    |   10 +-
 src/resmom/mom_main.c    |   21 ++-
 src/resmom/start_exec.c  |   19 ++-
 6 files changed, 509 insertions(+), 4 deletions(-)
 create mode 100644 src/resmom/mom_cgroup.c
 create mode 100644 src/resmom/mom_cgroup.h

diff --git a/src/resmom/catch_child.c b/src/resmom/catch_child.c
index 8e3527f..72d79a8 100644
--- a/src/resmom/catch_child.c
+++ b/src/resmom/catch_child.c
@@ -438,6 +438,10 @@ void scan_for_exiting(void)
           pjob->ji_qs.ji_un.ji_momt.ji_exitstat = ptask->ti_qs.ti_exitstat;
           }
 
+        if (use_cgroup(pjob)) {
+          remove_job_cgroup(pjob);
+        }
+
         log_event(
           PBSEVENT_JOB,
           PBS_EVENTCLASS_JOB,
diff --git a/src/resmom/mom_cgroup.c b/src/resmom/mom_cgroup.c
new file mode 100644
index 0000000..a9f388c
--- /dev/null
+++ b/src/resmom/mom_cgroup.c
@@ -0,0 +1,445 @@
+#include <pbs_config.h>   /* the master config generated by configure */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#ifdef USEJOBCREATE
+#ifndef JOBFAKE
+#include <job.h>
+#endif /* JOBFAKE */
+#endif /* USEJOBCREATE */
+#include <errno.h>
+#include <fcntl.h>
+#include <grp.h>
+#include <string.h>
+#include <limits.h>
+#include <assert.h>
+#include <signal.h>
+#include <ctype.h>
+#include <time.h>
+#include <sys/param.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+#include <sys/mman.h>
+#include <sys/ioctl.h>
+#include <pthread.h>
+
+#include "rpp.h"
+#include "libpbs.h"
+#include "portability.h"
+#include "list_link.h"
+#include "server_limits.h"
+#include "attribute.h"
+#include "resource.h"
+#include "resmon.h"
+#include "pbs_job.h"
+#include "log.h"
+#include "../lib/Liblog/pbs_log.h"
+#include "../lib/Liblog/log_event.h"
+#include "mom_mach.h"
+#include "mom_func.h"
+#include "pbs_error.h"
+#include "svrfunc.h"
+#include "dis.h"
+#include "batch_request.h"
+#include "mcom.h"
+#include "resource.h"
+#include "utils.h"
+#include "mom_comm.h"
+#include "mom_cgroup.h"
+#include "server.h"
+#include "svrfunc.h"
+
+char **cpuset_array = NULL;  /* per-CPU owner table: jobid string, or "" when the CPU is free */
+int nr_cpus;                 /* total online CPUs; set once by setup_cpuset_array() */
+int nr_free_cpus;            /* CPUs not currently assigned to any job's cgroup */
+pthread_mutex_t cpuset_lock = PTHREAD_MUTEX_INITIALIZER;  /* guards the three fields above */
+
+/* Return TRUE when the job carries the cgroup-enable attribute, FALSE otherwise. */
+int use_cgroup(job *pjob)
+{
+  return (pjob->ji_wattr[JOB_ATR_cgroup_enable].at_flags & ATR_VFLAG_SET) ? TRUE : FALSE;
+}
+
+/* Convert a size-typed resource value (e.g. pmem) into a plain count.
+ * Returns PBSE_NONE and stores the result in *ret, PBSE_ATTRTYPE when the
+ * resource is not of size type, or PBSE_BADATVAL on arithmetic overflow. */
+static int mm_getsize(resource *pres, unsigned long *ret) {
+  unsigned long value;
+  if (pres->rs_value.at_type != ATR_TYPE_SIZE)
+    return PBSE_ATTRTYPE;
+
+  value = pres->rs_value.at_val.at_size.atsv_num;
+  if (pres->rs_value.at_val.at_size.atsv_units == ATR_SV_WORDSZ) {
+    /* size expressed in words: scale to bytes, guarding the multiply */
+    if (value > ULONG_MAX / sizeof(int))
+      return PBSE_BADATVAL;
+
+    value *= sizeof(int);
+  }
+
+  /* atsv_shift encodes the k/M/G multiplier; refuse values the shift would overflow */
+  if (value > (ULONG_MAX >> pres->rs_value.at_val.at_size.atsv_shift))
+    return PBSE_BADATVAL;
+
+  *ret = (value << pres->rs_value.at_val.at_size.atsv_shift);
+
+  return PBSE_NONE;
+}  /* END mm_getsize() */
+
+/* Scan the job's Resource_List for a "pmem" entry and return its value,
+ * or -1 when no usable pmem resource is present. */
+static int get_job_pmem(job *pjob)
+{
+  resource *pres;
+
+  for (pres = (resource *)GET_NEXT(pjob->ji_wattr[JOB_ATR_resource].at_val.at_list);
+       pres != NULL;
+       pres = (resource *)GET_NEXT(pres->rs_link)) {
+    unsigned long pmem;
+
+    /* skip entries with no definition, and everything that is not pmem */
+    if (pres->rs_defin == NULL)
+      continue;
+
+    if (strcmp(pres->rs_defin->rs_name, "pmem") != 0)
+      continue;
+
+    if (mm_getsize(pres, &pmem) != PBSE_NONE) {
+      DBPRT(("Memory restriction not set\n"));
+      return -1;
+    }
+
+    return pmem;
+  }
+
+  return -1;
+}
+
+/* Apply the job's per-process memory request (pmem * processes-per-node) to
+ * the job cgroup's memory.limit_in_bytes.  Returns 0 on success (including
+ * when no pmem was requested, leaving memory unlimited) or -1 on error. */
+static int set_job_mem_limit(job *pjob)
+{
+  long pmem;  /* signed: get_job_pmem() reports failure as -1 */
+  unsigned long ppn;
+  char cg_path[MAXPATHLEN], mem_limit[32];
+  int fd, ret;
+
+  pmem = get_job_pmem(pjob);
+  if (pmem <= 0) {
+    /* No (or unparsable) pmem request: leave the cgroup unlimited.  The
+     * original stored -1 in an unsigned long, so its "pmem > 0" check
+     * always passed and a garbage limit was written. */
+    return 0;
+  }
+
+  if (pjob->ji_numnodes == 0)
+    return -1;  /* guard the division below */
+  ppn = pjob->ji_numvnod / pjob->ji_numnodes;
+  pmem *= (long)ppn;
+
+  sprintf(log_buffer, "job %s set memory limitation to %ld",
+          pjob->ji_qs.ji_jobid, pmem);
+  log_ext(-1, __func__, log_buffer, LOG_INFO);
+
+  snprintf(cg_path, MAXPATHLEN, "%s/%s/memory.limit_in_bytes",
+           CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
+  fd = open(cg_path, O_RDWR);
+  if (fd < 0) {
+    sprintf(log_buffer, "can not open %s:%m", cg_path);
+    log_err(errno, __func__, log_buffer);
+    return -1;
+  }
+
+  /* NOTE(review): the "M" suffix makes the kernel read megabytes, but
+   * mm_getsize() appears to produce a byte count -- confirm intended units. */
+  snprintf(mem_limit, sizeof(mem_limit), "%ldM", pmem);
+  ret = write(fd, mem_limit, strlen(mem_limit));
+  if (ret != (int)strlen(mem_limit)) {
+    sprintf(log_buffer, "set memory.limit_in_bytes failed: %m");
+    log_err(errno, __func__, log_buffer);
+    close(fd);
+    return -1;
+  }
+  close(fd);
+
+  return 0;
+}
+
+/* Read the root cgroup's cpuset.cpus list into buf.  buf must be at least
+ * BUFSIZ bytes (the only caller passes char buf[BUFSIZ]); the original
+ * used sizeof(buf), which is the size of the POINTER, not the buffer.
+ * Returns 0 on success, -1 on error. */
+static int get_all_cpus(char *buf)
+{
+  char cg_path[MAXPATHLEN];
+  int fd, ret, n;
+
+  /* read cpuset info from root cgroup */
+  snprintf(cg_path, MAXPATHLEN, "%s/cpuset.cpus", CGROUP_ROOT_PATH);
+  fd = open(cg_path, O_RDONLY);
+  if (fd < 0) {
+    sprintf(log_buffer, "can not read cpuset info from root cgroup:%m");
+    log_err(errno, __func__, log_buffer);
+
+    return -1;
+  }
+
+  n = 0;
+  memset(buf, 0, BUFSIZ);
+  /* leave one byte for the NUL terminator; stop on EOF or error */
+  while (n < BUFSIZ - 1) {
+    ret = read(fd, buf + n, BUFSIZ - 1 - n);
+    if (ret < 0) {
+      sprintf(log_buffer, "failed to read cpuset.cpus:%m");
+      log_err(errno, __func__, log_buffer);
+      close(fd);
+      return -1;
+    }
+    if (ret == 0)
+      break;
+    n += ret;
+  }
+
+  close(fd);  /* the original leaked this descriptor */
+
+  return 0;
+}
+
+/* Claim ppn free CPUs for jobid in cpuset_array and format them into buf as
+ * a cpuset list ("N", "A-B", or "A-B,C-D...").  Caller must hold
+ * cpuset_lock.  Returns 0 on success, -1 when no CPUs are free. */
+static int get_aval_cpus(char *buf, const char *jobid, int ppn)
+{
+  int i, start, count = 0;
+  int len = 0;  /* bytes written so far; the original's sprintf(buf, "%s...", buf)
+                 * has overlapping src/dst, which is undefined behavior */
+
+  if (!nr_free_cpus)
+    return -1;
+
+  start = 0;
+  for (i = 0; i < nr_cpus; i++) {
+    /* fast path: a single-cpu job takes the first free cpu */
+    if (strlen(cpuset_array[i]) == 0 && ppn == 1) {
+      memcpy(cpuset_array[i], jobid, strlen(jobid) + 1);
+      len += sprintf(buf + len, "%d", i);
+      break;
+    }
+
+    if (strlen(cpuset_array[i]) == 0) {
+      /* free cpu: claim it for this job */
+      count++;
+      memcpy(cpuset_array[i], jobid, strlen(jobid) + 1);
+      if (count == ppn) {
+        if (len == 0)
+          len += sprintf(buf + len, "%d-%d", start, i);
+        else
+          len += sprintf(buf + len, ",%d-%d", start, i);
+        break;
+      }
+    } else {
+      /* busy cpu ends the current free run */
+      if (count == 0) {
+        start = i + 1;
+        continue;
+      }
+
+      /* flush the range claimed so far before skipping the busy cpu */
+      if (!strcmp(cpuset_array[i - 1], jobid)) {
+        if (len == 0)
+          len += sprintf(buf + len, "%d-%d", start, i - 1);
+        else
+          len += sprintf(buf + len, ",%d-%d", start, i - 1);
+      }
+
+      start = i + 1;
+    }
+  }
+  len += sprintf(buf + len, "\n");
+  /* NOTE(review): if the scan ends without count reaching ppn (fragmented
+   * free list), nr_free_cpus is still decremented by ppn -- verify the
+   * caller's ppn <= nr_free_cpus check makes that impossible. */
+  nr_free_cpus -= ppn;
+
+  sprintf(log_buffer, "available cpuset is %s", buf);
+  log_ext(-1, __func__, log_buffer, LOG_INFO);
+
+  return 0;
+}
+
+/* Pick a cpuset for the job (dedicated CPUs when enough are free, otherwise
+ * the root cgroup's full set) and write it to the job's cpuset.cpus.
+ * Returns 0 on success, -1 on error. */
+static int set_job_cpu_limit(job *pjob)
+{
+  char cg_path[MAXPATHLEN];
+  char buf[BUFSIZ];
+  int fd, ret, n;
+  int ppn;
+
+  if (pjob->ji_numnodes == 0)
+    return -1;  /* guard the division below */
+  ppn = pjob->ji_numvnod / pjob->ji_numnodes;
+
+  memset(buf, 0, sizeof(buf));
+  pthread_mutex_lock(&cpuset_lock);
+  if (ppn > nr_free_cpus)
+    ret = get_all_cpus(buf);  /* oversubscribed: fall back to all cpus */
+  else
+    ret = get_aval_cpus(buf, pjob->ji_qs.ji_jobid, ppn);
+  pthread_mutex_unlock(&cpuset_lock);
+
+  if (ret)
+    return -1;
+
+  /* the original used n uninitialized here, handing write() a garbage length */
+  n = strlen(buf);
+
+  snprintf(cg_path, MAXPATHLEN, "%s/%s/cpuset.cpus",
+           CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
+  fd = open(cg_path, O_RDWR);
+  if (fd < 0) {
+    sprintf(log_buffer, "can not open %s:%m", cg_path);
+    log_err(errno, __func__, log_buffer);
+
+    return -1;
+  }
+
+  ret = write(fd, buf, n);
+  if (ret != n) {
+    sprintf(log_buffer, "set cpuset.cpus failed: %m");
+    log_err(errno, __func__, log_buffer);
+
+    close(fd);
+    return -1;
+  }
+
+  close(fd);
+
+  return 0;
+}
+
+/* Copy the root cgroup's cpuset.mems (all NUMA nodes) into the job cgroup's
+ * cpuset.mems so the job may allocate memory from every node.
+ * Returns 0 on success, -1 on error. */
+static int set_job_numa_node_limit(job *pjob)
+{
+  char cg_path[MAXPATHLEN];
+  char buf[BUFSIZ];
+  int fd, ret, n;
+
+  /* read NUMA node info from root cgroup */
+  snprintf(cg_path, MAXPATHLEN, "%s/cpuset.mems", CGROUP_ROOT_PATH);
+  fd = open(cg_path, O_RDONLY);
+  if (fd < 0) {
+    sprintf(log_buffer, "can not read numa node info from root cgroup:%m");
+    log_err(errno, __func__, log_buffer);
+
+    return -1;
+  }
+
+  n = 0;
+  memset(buf, 0, sizeof(buf));
+  /* keep one byte free so buf stays NUL-terminated for the %s log below;
+   * the original looped forever / corrupted n if read() returned -1 */
+  while (n < (int)sizeof(buf) - 1) {
+    ret = read(fd, buf + n, sizeof(buf) - 1 - n);
+    if (ret < 0) {
+      sprintf(log_buffer, "failed to read cpuset.mems:%m");
+      log_err(errno, __func__, log_buffer);
+      close(fd);
+      return -1;
+    }
+    if (ret == 0)
+      break;
+    n += ret;
+  }
+
+  close(fd);
+
+  sprintf(log_buffer, "NUMA node info of root cgroup is: %s", buf);
+  log_ext(-1, __func__, log_buffer, LOG_INFO);
+
+  /* set this cgroup to use memory from all NUMA nodes */
+  snprintf(cg_path, MAXPATHLEN, "%s/%s/cpuset.mems",
+           CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
+  fd = open(cg_path, O_RDWR);
+  if (fd < 0) {
+    sprintf(log_buffer, "can not open %s:%m", cg_path);
+    log_err(errno, __func__, log_buffer);
+
+    return -1;
+  }
+
+  ret = write(fd, buf, n);
+  if (ret != n) {
+    sprintf(log_buffer, "set cpuset.mems failed:%m");
+    log_err(errno, __func__, log_buffer);
+
+    close(fd);
+    return -1;
+  }
+
+  close(fd);
+
+  return 0;
+}
+
+/* Create <CGROUP_ROOT_PATH>/<jobid> and apply cpu, NUMA, and memory limits.
+ * On any limit-setup failure the directory is removed again so a
+ * half-configured cgroup is not left behind.  Returns 0 or -1. */
+int create_job_cgroup(job *pjob)
+{
+  char cg_path[MAXPATHLEN];
+  int ret;
+
+  sprintf(log_buffer, "creating cgroup for job %s", pjob->ji_qs.ji_jobid);
+  log_ext(-1, __func__, log_buffer, LOG_INFO);
+
+  ret = access(CGROUP_ROOT_PATH, R_OK | W_OK);
+  if (ret) {
+    sprintf(log_buffer, "cgroup mount point '%s' not found", CGROUP_ROOT_PATH);
+    log_err(errno, __func__, log_buffer);
+
+    return -1;
+  }
+
+  snprintf(cg_path, MAXPATHLEN, "%s/%s", CGROUP_ROOT_PATH,
+           pjob->ji_qs.ji_jobid);
+
+  ret = mkdir(cg_path, 0755);
+  if (ret) {
+    sprintf(log_buffer, "create cgroup for %s failed: %m",
+            pjob->ji_qs.ji_jobid);
+    log_err(errno, __func__, log_buffer);
+
+    return -1;
+  }
+
+  if (set_job_cpu_limit(pjob) != 0)
+    goto fail;
+
+  if (set_job_numa_node_limit(pjob) != 0)
+    goto fail;
+
+  if (set_job_mem_limit(pjob) != 0)
+    goto fail;
+
+  return 0;
+
+fail:
+  /* the original returned -1 and leaked the directory on these paths.
+   * NOTE(review): CPUs claimed in cpuset_array by a successful
+   * set_job_cpu_limit() are not released here -- confirm whether the
+   * caller invokes remove_job_cgroup() on failure. */
+  rmdir(cg_path);
+  return -1;
+}
+
+/* Attach the given pid to the job's cgroup by writing it into the cgroup's
+ * tasks file.  Returns 0 on success, -1 on error. */
+int move_task_to_cgroup(pid_t pid, job *pjob)
+{
+  char cg_path[MAXPATHLEN], buf[BUFSIZ];
+  int fd, ret;
+
+  sprintf(log_buffer, "moving job %s to cgroup", pjob->ji_qs.ji_jobid);
+  log_ext(-1, __func__, log_buffer, LOG_INFO);
+
+  snprintf(cg_path, MAXPATHLEN, "%s/%s/tasks", CGROUP_ROOT_PATH,
+           pjob->ji_qs.ji_jobid);
+
+  fd = open(cg_path, O_RDWR);
+  if (fd < 0) {
+    sprintf(log_buffer, "failed to open %s:%m", cg_path);
+    log_err(errno, __func__, log_buffer);
+
+    return -1;
+  }
+
+  /* pid_t is not guaranteed to be int; match the %d specifier explicitly */
+  snprintf(buf, sizeof(buf), "%d\n", (int)pid);
+  ret = write(fd, buf, strlen(buf));
+  if (ret != (int)strlen(buf)) {
+    sprintf(log_buffer, "failed to move task to cgroup: %m");
+    log_err(errno, __func__, log_buffer);
+
+    close(fd);  /* the original leaked fd on this path */
+    return -1;
+  }
+
+  close(fd);
+
+  return 0;
+}
+
+/* Release the job's CPUs back to the free pool and remove its cgroup
+ * directory.  Returns 0 on success, -1 when rmdir fails. */
+int remove_job_cgroup(job *pjob)
+{
+  char cg_path[MAXPATHLEN];
+  int cpu;
+
+  sprintf(log_buffer, "removing cgroup for job %s", pjob->ji_qs.ji_jobid);
+  log_ext(-1, __func__, log_buffer, LOG_INFO);
+
+  /* hand every cpu owned by this job back to the pool */
+  pthread_mutex_lock(&cpuset_lock);
+  for (cpu = 0; cpu < nr_cpus; cpu++) {
+    if (strcmp(cpuset_array[cpu], pjob->ji_qs.ji_jobid) != 0)
+      continue;
+    cpuset_array[cpu][0] = '\0';
+    nr_free_cpus++;
+  }
+  pthread_mutex_unlock(&cpuset_lock);
+
+  snprintf(cg_path, MAXPATHLEN, "%s/%s/", CGROUP_ROOT_PATH,
+           pjob->ji_qs.ji_jobid);
+
+  if (rmdir(cg_path) != 0) {
+    sprintf(log_buffer, "remove cgroup for job %s failed:%m",
+           pjob->ji_qs.ji_jobid);
+    log_err(errno, __func__, log_buffer);
+
+    return -1;
+  }
+
+  return 0;
+}
diff --git a/src/resmom/mom_cgroup.h b/src/resmom/mom_cgroup.h
new file mode 100644
index 0000000..4e7a006
--- /dev/null
+++ b/src/resmom/mom_cgroup.h
@@ -0,0 +1,14 @@
+#ifndef _MOM_CGROUP_H_
+#define _MOM_CGROUP_H_
+
+/* mount point of the cgroup hierarchy used for per-job cpu/memory limits */
+#define CGROUP_ROOT_PATH "/dev/torque"
+
+/* create CGROUP_ROOT_PATH/<jobid> and apply cpu/NUMA/memory limits; 0 or -1 */
+int create_job_cgroup(job *pjob);
+
+/* attach pid to the job's cgroup via its tasks file; 0 or -1 */
+int move_task_to_cgroup(pid_t pid, job *pjob);
+
+/* free the job's cpus and remove its cgroup directory; 0 or -1 */
+int remove_job_cgroup(job *pjob);
+
+/* TRUE when the job has JOB_ATR_cgroup_enable set, FALSE otherwise */
+int use_cgroup(job *pjob);
+
+#endif
diff --git a/src/resmom/mom_comm.c b/src/resmom/mom_comm.c
index 0a34d04..cc54c0e 100644
--- a/src/resmom/mom_comm.c
+++ b/src/resmom/mom_comm.c
@@ -2387,7 +2387,11 @@ int im_join_job_as_sister(
   
 #endif  /* ndef NUMA_SUPPORT */
 #endif  /* (PENABLE_LINUX26_CPUSETS) */
-    
+
+  if (use_cgroup(pjob) == TRUE) {
+    create_job_cgroup(pjob);
+  }
+
   ret = run_prologue_scripts(pjob);
   if (ret != PBSE_NONE)
     {
@@ -2574,6 +2578,10 @@ void im_kill_job_as_sister(
     {
     momport = pbs_rm_port;
     }
+
+  if (use_cgroup(pjob)) {
+    remove_job_cgroup(pjob);
+  }
   
   job_save(pjob, SAVEJOB_QUICK, momport);
   
diff --git a/src/resmom/mom_main.c b/src/resmom/mom_main.c
index c620ae7..4d25284 100644
--- a/src/resmom/mom_main.c
+++ b/src/resmom/mom_main.c
@@ -8708,6 +8708,24 @@ int setup_nodeboards()
 #endif /* ifdef NUMA_SUPPORT */
 
 
+/* Allocate the per-CPU ownership table used by mom_cgroup.c, one jobid-sized
+ * slot per online processor.  Returns 0 on success, -1 when the CPU count
+ * cannot be determined or an allocation fails.  The original checked
+ * neither sysconf() (which may return -1) nor malloc(). */
+int setup_cpuset_array()
+{
+  /* defined in mom_cgroup.c */
+  extern char **cpuset_array;
+  extern int nr_cpus;
+  extern int nr_free_cpus;
+  long ncpu;
+  int i;
+
+  ncpu = sysconf(_SC_NPROCESSORS_ONLN);
+  if (ncpu < 1)
+    return -1;
+
+  nr_free_cpus = nr_cpus = (int)ncpu;
+
+  /* calloc zero-fills, so each slot starts as the empty ("free") string */
+  cpuset_array = calloc(nr_cpus, sizeof(char *));
+  if (cpuset_array == NULL)
+    return -1;
+
+  for (i = 0; i < nr_cpus; i++) {
+    cpuset_array[i] = calloc(1, PBS_MAXSVRJOBID + 1);
+    if (cpuset_array[i] == NULL)
+      return -1;  /* startup-time failure; partial table intentionally kept */
+  }
+
+  return 0;
+}
 
 
 /* 
@@ -8724,7 +8742,6 @@ int main(
   int       rc;
   int       tmpFD;
 
-  tmpFD = sysconf(_SC_OPEN_MAX);
 
   /* close any inherited extra files, leaving stdin, stdout, and stderr open */
 
@@ -8763,6 +8780,8 @@ int main(
 
 #endif  /* NVIDIA_GPUS */
 
+  setup_cpuset_array();
+
   main_loop();
 
   if (mom_run_state == MOM_RUN_STATE_KILLALL)
diff --git a/src/resmom/start_exec.c b/src/resmom/start_exec.c
index 4c16414..85c99b8 100644
--- a/src/resmom/start_exec.c
+++ b/src/resmom/start_exec.c
@@ -2294,8 +2294,20 @@ int TMomFinalizeJob2(
     
     /*NOTREACHED*/
     }
-  
+
   /* parent */
+
+  log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer);
+
+  if (use_cgroup(pjob) == TRUE) {
+    sprintf(log_buffer, "create cgroup for job %s", pjob->ji_qs.ji_jobid);
+    log_ext(-1, __func__, log_buffer, LOG_DEBUG);
+
+    if (create_job_cgroup(pjob) == 0) {
+      move_task_to_cgroup(cpid, pjob);
+    }
+
+  }
   
   close(TJE->upfds);
   
@@ -3971,7 +3983,6 @@ int TMomFinalizeChild(
       }
     move_to_job_cpuset(getpid(), pjob);
     }
-
 #endif  /* (PENABLE_LINUX26_CPUSETS) */
 
   if (site_job_setup(pjob) != 0)
@@ -4736,6 +4747,10 @@ int start_process(
         "task set to running/saving task (start_process)");
       }
 
+    if (use_cgroup(pjob)) {
+      move_task_to_cgroup(ptask->ti_qs.ti_sid, pjob);
+    }
+
     task_save(ptask);
 
     if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING)
-- 
1.7.6.1



More information about the torqueusers mailing list