[torqueusers] [PATCH 3/3] resmom: create cgroup for jobs to limit cpu and mem usage

levin li levin108 at gmail.com
Tue Nov 20 19:18:56 MST 2012


Hi, David

Thanks for your advice, I've posted the patches to your bugzilla.

thanks,

levin

On 2012年11月21日 01:19, David Beer wrote:
> Levin,
>
> Could you create a bugzilla issue and then attach the complete patch to
> the bugzilla ticket? This will ensure that your work is not lost and
> make it easier to track the issue.
>
> Cheers,
>
> David
>
> On Tue, Nov 20, 2012 at 2:02 AM, levin li <levin108 at gmail.com
> <mailto:levin108 at gmail.com>> wrote:
>
>
>     Signed-off-by: levin li <levin108 at gmail.com <mailto:levin108 at gmail.com>>
>     ---
>       src/resmom/catch_child.c |    4 +
>       src/resmom/mom_cgroup.c  |  445
>     ++++++++++++++++++++++++++++++++++++++++++++++
>       src/resmom/mom_cgroup.h  |   14 ++
>       src/resmom/mom_comm.c    |   10 +-
>       src/resmom/mom_main.c    |   21 ++-
>       src/resmom/start_exec.c  |   19 ++-
>       6 files changed, 509 insertions(+), 4 deletions(-)
>       create mode 100644 src/resmom/mom_cgroup.c
>       create mode 100644 src/resmom/mom_cgroup.h
>
>     diff --git a/src/resmom/catch_child.c b/src/resmom/catch_child.c
>     index 8e3527f..72d79a8 100644
>     --- a/src/resmom/catch_child.c
>     +++ b/src/resmom/catch_child.c
>     @@ -438,6 +438,10 @@ void scan_for_exiting(void)
>                 pjob->ji_qs.ji_un.ji_momt.ji_exitstat =
>     ptask->ti_qs.ti_exitstat;
>                 }
>
>     +        if (use_cgroup(pjob)) {
>     +          remove_job_cgroup(pjob);
>     +        }
>     +
>               log_event(
>                 PBSEVENT_JOB,
>                 PBS_EVENTCLASS_JOB,
>     diff --git a/src/resmom/mom_cgroup.c b/src/resmom/mom_cgroup.c
>     new file mode 100644
>     index 0000000..a9f388c
>     --- /dev/null
>     +++ b/src/resmom/mom_cgroup.c
>     @@ -0,0 +1,445 @@
>     +#include <pbs_config.h>   /* the master config generated by
>     configure */
>     +
>     +#include <stdio.h>
>     +#include <stdlib.h>
>     +#include <unistd.h>
>     +#ifdef USEJOBCREATE
>     +#ifndef JOBFAKE
>     +#include <job.h>
>     +#endif /* JOBFAKE */
>     +#endif /* USEJOBCREATE */
>     +#include <errno.h>
>     +#include <fcntl.h>
>     +#include <grp.h>
>     +#include <string.h>
>     +#include <limits.h>
>     +#include <assert.h>
>     +#include <signal.h>
>     +#include <ctype.h>
>     +#include <time.h>
>     +#include <sys/param.h>
>     +#include <sys/types.h>
>     +#include <sys/stat.h>
>     +#include <sys/wait.h>
>     +#include <sys/mman.h>
>     +#include <sys/ioctl.h>
>     +#include <pthread.h>
>     +
>     +#include "rpp.h"
>     +#include "libpbs.h"
>     +#include "portability.h"
>     +#include "list_link.h"
>     +#include "server_limits.h"
>     +#include "attribute.h"
>     +#include "resource.h"
>     +#include "resmon.h"
>     +#include "pbs_job.h"
>     +#include "log.h"
>     +#include "../lib/Liblog/pbs_log.h"
>     +#include "../lib/Liblog/log_event.h"
>     +#include "mom_mach.h"
>     +#include "mom_func.h"
>     +#include "pbs_error.h"
>     +#include "svrfunc.h"
>     +#include "dis.h"
>     +#include "batch_request.h"
>     +#include "mcom.h"
>     +#include "resource.h"
>     +#include "utils.h"
>     +#include "mom_comm.h"
>     +#include "mom_cgroup.h"
>     +#include "server.h"
>     +#include "svrfunc.h"
>     +
>     +char **cpuset_array = NULL;
>     +int nr_cpus;
>     +int nr_free_cpus;
>     +pthread_mutex_t cpuset_lock = PTHREAD_MUTEX_INITIALIZER;
>     +
>     +int use_cgroup(job *pjob)
>     +{
>     +  if (pjob->ji_wattr[JOB_ATR_cgroup_enable].at_flags & ATR_VFLAG_SET)
>     +    return TRUE;
>     +  return FALSE;
>     +}
>     +
>     +static int mm_getsize(resource *pres, unsigned long *ret) {
>     +  unsigned long value;
>     +  if (pres->rs_value.at_type != ATR_TYPE_SIZE)
>     +    return PBSE_ATTRTYPE;
>     +
>     +  value = pres->rs_value.at_val.at_size.atsv_num;
>     +  if (pres->rs_value.at_val.at_size.atsv_units == ATR_SV_WORDSZ) {
>     +    if (value > ULONG_MAX / sizeof(int))
>     +      return PBSE_BADATVAL;
>     +
>     +    value *= sizeof(int);
>     +  }
>     +
>     +  if (value > (ULONG_MAX >> pres->rs_value.at_val.at_size.atsv_shift))
>     +    return PBSE_BADATVAL;
>     +
>     +  *ret = (value << pres->rs_value.at_val.at_size.atsv_shift);
>     +
>     +  return PBSE_NONE;
>     +}  /* END mm_getsize() */
>     +
>     +static int get_job_pmem(job *pjob)
>     +{
>     +  resource *pres;
>     +  char *pname = NULL;
>     +
>     +  pres = (resource
>     *)GET_NEXT(pjob->ji_wattr[JOB_ATR_resource].at_val.at_list);
>     +
>     +  while (pres) {
>     +    int ret;
>     +    unsigned long pmem;
>     +
>     +    if (pres->rs_defin != NULL)
>     +      pname = pres->rs_defin->rs_name;
>     +    else
>     +      goto next;
>     +
>     +    if (strcmp(pname, "pmem"))
>     +      goto next;
>     +
>     +    ret = mm_getsize(pres, &pmem);
>     +    if (ret != PBSE_NONE) {
>     +      DBPRT(("Memory restriction not set\n"));
>     +      return -1;
>     +    }
>     +
>     +    return pmem;
>     +next:
>     +    pres = (resource *)GET_NEXT(pres->rs_link);
>     +  }
>     +
>     +  return -1;
>     +}
>     +
>     +static int set_job_mem_limit(job *pjob)
>     +{
>     +  unsigned long pmem;
>     +  unsigned long ppn;
>     +  char cg_path[MAXPATHLEN], mem_limit[32];
>     +  int fd, ret;
>     +
>     +  pmem = get_job_pmem(pjob);
>     +
>     +  ppn = pjob->ji_numvnod / pjob->ji_numnodes;
>     +  pmem *= ppn;
>     +
>     +  sprintf(log_buffer, "job %s set memory limitation to %ld",
>     +          pjob->ji_qs.ji_jobid, pmem);
>     +  log_ext(-1, __func__, log_buffer, LOG_INFO);
>     +
>     +  if (pmem > 0) {
>     +    snprintf(cg_path, MAXPATHLEN, "%s/%s/memory.limit_in_bytes",
>     +             CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
>     +    fd = open(cg_path, O_RDWR);
>     +    if (fd < 0) {
>     +      sprintf(log_buffer, "can not open %s:%m", cg_path);
>     +      log_err(errno, __func__, log_buffer);
>     +      return -1;
>     +    }
>     +
>     +    snprintf(mem_limit, sizeof(mem_limit), "%ldM", pmem);
>     +    ret = write(fd, mem_limit, strlen(mem_limit));
>     +    if (ret != strlen(mem_limit)) {
>     +      sprintf(log_buffer, "set memory.limit_in_bytes failed: %m");
>     +      log_err(errno, __func__, log_buffer);
>     +      close(fd);
>     +      return -1;
>     +    }
>     +    close(fd);
>     +  }
>     +
>     +  return 0;
>     +}
>     +
>     +static int get_all_cpus(char *buf)
>     +{
>     +  char cg_path[MAXPATHLEN];
>     +  int fd, ret, n;
>     +
>     +  /* read cpuset info from root cgroup */
>     +  snprintf(cg_path, MAXPATHLEN, "%s/cpuset.cpus", CGROUP_ROOT_PATH);
>     +  fd = open(cg_path, O_RDWR);
>     +  if (fd < 0) {
>     +    sprintf(log_buffer, "can not read cpuset info from root
>     cgroup:%m");
>     +    log_err(errno, __func__, log_buffer);
>     +
>     +    return -1;
>     +  }
>     +
>     +  n = 0;
>     +  memset(buf, 0, sizeof(buf));
>     +  while (1) {
>     +    ret = read(fd, buf + n, sizeof(buf) - n);
>     +    n += ret;
>     +    if (!ret)
>     +      break;
>     +  }
>     +
>     +  return 0;
>     +}
>     +
>     +static int get_aval_cpus(char *buf, const char *jobid, int ppn)
>     +{
>     +  int i, start, end, count = 0;
>     +
>     +  if (!nr_free_cpus)
>     +    return -1;
>     +
>     +  start = 0, end = 0;
>     +  for (i = 0; i < nr_cpus; i++) {
>     +    if (strlen(cpuset_array[i]) == 0 && ppn == 1) {
>     +      memcpy(cpuset_array[i], jobid, strlen(jobid) + 1);
>     +      sprintf(buf, "%d", i);
>     +      break;
>     +    }
>     +
>     +    if (strlen(cpuset_array[i]) == 0) {
>     +      count ++;
>     +      memcpy(cpuset_array[i], jobid, strlen(jobid) + 1);
>     +      if (count == ppn) {
>     +        if (strlen(buf) == 0)
>     +          sprintf(buf, "%d-%d", start, i);
>     +        else
>     +          sprintf(buf, "%s,%d-%d", buf, start, i);
>     +        break;
>     +      }
>     +    } else {
>     +
>     +      if (count == 0) {
>     +        start = i + 1;
>     +        continue;
>     +      }
>     +
>     +      if (!strcmp(cpuset_array[i - 1], jobid)) {
>     +        if (strlen(buf) == 0)
>     +          sprintf(buf, "%d-%d", start, i - 1);
>     +        else
>     +          sprintf(buf, "%s,%d-%d", buf, start, i - 1);
>     +      }
>     +
>     +      start = i + 1;
>     +    }
>     +  }
>     +  sprintf(buf, "%s\n", buf);
>     +  nr_free_cpus -= ppn;
>     +
>     +  sprintf(log_buffer, "available cpuset is %s", buf);
>     +  log_ext(-1, __func__, log_buffer, LOG_INFO);
>     +
>     +  return 0;
>     +}
>     +
>     +static int set_job_cpu_limit(job *pjob)
>     +{
>     +  char cg_path[MAXPATHLEN];
>     +  char buf[BUFSIZ];
>     +  int fd, ret, n;
>     +  int ppn;
>     +
>     +  ppn = pjob->ji_numvnod / pjob->ji_numnodes;
>     +
>     +  pthread_mutex_lock(&cpuset_lock);
>     +  if (ppn > nr_free_cpus)
>     +    ret = get_all_cpus(buf);
>     +  else {
>     +    memset(buf, 0, sizeof(buf));
>     +    ret = get_aval_cpus(buf, pjob->ji_qs.ji_jobid, ppn);
>     +  }
>     +  pthread_mutex_unlock(&cpuset_lock);
>     +
>     +  if (ret)
>     +    return -1;
>     +
>     +  /* set cpuset to cpuset of root cgroup */
>     +  snprintf(cg_path, MAXPATHLEN, "%s/%s/cpuset.cpus",
>     +           CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
>     +  fd = open(cg_path, O_RDWR);
>     +  if (fd < 0) {
>     +    sprintf(log_buffer, "can not open %s:%m", cg_path);
>     +    log_err(errno, __func__, log_buffer);
>     +
>     +    return -1;
>     +  }
>     +
>     +  ret = write(fd, buf, n);
>     +  if (ret != n) {
>     +    sprintf(log_buffer, "set cpuset.cpus failed: %m");
>     +    log_err(errno, __func__, log_buffer);
>     +
>     +    close(fd);
>     +    return -1;
>     +  }
>     +
>     +  close(fd);
>     +
>     +  return 0;
>     +}
>     +
>     +static int set_job_numa_node_limit(job *pjob)
>     +{
>     +  char cg_path[MAXPATHLEN];
>     +  char buf[BUFSIZ];
>     +  int fd, ret, n;
>     +
>     +  /* read NUMA node info from root cgroup */
>     +  snprintf(cg_path, MAXPATHLEN, "%s/cpuset.mems", CGROUP_ROOT_PATH);
>     +  fd = open(cg_path, O_RDWR);
>     +  if (fd < 0) {
>     +    sprintf(log_buffer, "can not read numa node info from root
>     cgroup:%m");
>     +    log_err(errno, __func__, log_buffer);
>     +
>     +    return -1;
>     +  }
>     +
>     +  n = 0;
>     +  memset(buf, 0, sizeof(buf));
>     +  while (1) {
>     +    ret = read(fd, buf + n, sizeof(buf) - n);
>     +    n += ret;
>     +    if (!ret)
>     +      break;
>     +  }
>     +
>     +  close(fd);
>     +
>     +  sprintf(log_buffer, "NUMA node info of root cgroup is: %s", buf);
>     +  log_ext(-1, __func__, log_buffer, LOG_INFO);
>     +
>     +  /* set this cgroup to use memory from all NUMA nodes */
>     +  snprintf(cg_path, MAXPATHLEN, "%s/%s/cpuset.mems",
>     +           CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
>     +  fd = open(cg_path, O_RDWR);
>     +  if (fd < 0) {
>     +    sprintf(log_buffer, "can not open %s:%m", cg_path);
>     +    log_err(errno, __func__, log_buffer);
>     +
>     +    return -1;
>     +  }
>     +
>     +  ret = write(fd, buf, n);
>     +  if (ret != n) {
>     +    sprintf(log_buffer, "set cpuset.mems failed:%m");
>     +    log_err(errno, __func__, log_buffer);
>     +
>     +    close(fd);
>     +    return -1;
>     +  }
>     +
>     +  close(fd);
>     +
>     +  return 0;
>     +}
>     +
>     +int create_job_cgroup(job *pjob)
>     +{
>     +  char cg_path[MAXPATHLEN];
>     +  int ret;
>     +
>     +  sprintf(log_buffer, "creating cgroup for job %s",
>     pjob->ji_qs.ji_jobid);
>     +  log_ext(-1, __func__, log_buffer, LOG_INFO);
>     +
>     +  ret = access(CGROUP_ROOT_PATH, R_OK | W_OK);
>     +  if (ret) {
>     +    sprintf(log_buffer, "cgroup mount point '%s' not found",
>     CGROUP_ROOT_PATH);
>     +    log_err(errno, __func__, log_buffer);
>     +
>     +    return -1;
>     +  }
>     +
>     +  snprintf(cg_path, MAXPATHLEN, "%s/%s", CGROUP_ROOT_PATH,
>     +           pjob->ji_qs.ji_jobid);
>     +
>     +  ret = mkdir(cg_path, 0755);
>     +  if (ret) {
>     +    sprintf(log_buffer, "create cgroup for %s failed: %m",
>     +            pjob->ji_qs.ji_jobid);
>     +    log_err(errno, __func__, log_buffer);
>     +
>     +    return -1;
>     +  }
>     +
>     +  ret = set_job_cpu_limit(pjob);
>     +  if (ret)
>     +    return -1;
>     +
>     +  ret = set_job_numa_node_limit(pjob);
>     +  if (ret)
>     +    return -1;
>     +
>     +  ret = set_job_mem_limit(pjob);
>     +  if (ret)
>     +    return -1;
>     +
>     +  return 0;
>     +}
>     +
>     +int move_task_to_cgroup(pid_t pid, job *pjob)
>     +{
>     +  char cg_path[MAXPATHLEN], buf[BUFSIZ];
>     +  int fd, ret;
>     +
>     +  sprintf(log_buffer, "moving job %s to cgroup", pjob->ji_qs.ji_jobid);
>     +  log_ext(-1, __func__, log_buffer, LOG_INFO);
>     +
>     +  snprintf(cg_path, MAXPATHLEN, "%s/%s/tasks", CGROUP_ROOT_PATH,
>     +           pjob->ji_qs.ji_jobid);
>     +
>     +  fd = open(cg_path, O_RDWR);
>     +  if (fd < 0) {
>     +    sprintf(log_buffer, "failed to open %s:%m", cg_path);
>     +    log_err(errno, __func__, log_buffer);
>     +
>     +    return -1;
>     +  }
>     +
>     +  snprintf(buf, sizeof(buf), "%d\n", pid);
>     +  ret = write(fd, buf, strlen(buf));
>     +  if (ret != strlen(buf)) {
>     +    sprintf(log_buffer, "failed to move task to cgroup: %m");
>     +    log_err(errno, __func__, log_buffer);
>     +
>     +    return -1;
>     +  }
>     +
>     +  close(fd);
>     +
>     +  return 0;
>     +}
>     +
>     +int remove_job_cgroup(job *pjob)
>     +{
>     +  char cg_path[MAXPATHLEN];
>     +  int ret, i;
>     +
>     +  sprintf(log_buffer, "removing cgroup for job %s",
>     pjob->ji_qs.ji_jobid);
>     +  log_ext(-1, __func__, log_buffer, LOG_INFO);
>     +
>     +  pthread_mutex_lock(&cpuset_lock);
>     +  for (i = 0; i < nr_cpus; i++) {
>     +    if (!strcmp(cpuset_array[i], pjob->ji_qs.ji_jobid)) {
>     +      cpuset_array[i][0] = '\0';
>     +      nr_free_cpus++;
>     +    }
>     +  }
>     +  pthread_mutex_unlock(&cpuset_lock);
>     +
>     +  snprintf(cg_path, MAXPATHLEN, "%s/%s/", CGROUP_ROOT_PATH,
>     +           pjob->ji_qs.ji_jobid);
>     +
>     +  ret = rmdir(cg_path);
>     +  if (ret) {
>     +    sprintf(log_buffer, "remove cgroup for job %s failed:%m",
>     +           pjob->ji_qs.ji_jobid);
>     +    log_err(errno, __func__, log_buffer);
>     +
>     +    return -1;
>     +  }
>     +
>     +  return 0;
>     +}
>     diff --git a/src/resmom/mom_cgroup.h b/src/resmom/mom_cgroup.h
>     new file mode 100644
>     index 0000000..4e7a006
>     --- /dev/null
>     +++ b/src/resmom/mom_cgroup.h
>     @@ -0,0 +1,14 @@
>     +#ifndef _MOM_CGROUP_H_
>     +#define _MOM_CGROUP_H_
>     +
>     +#define CGROUP_ROOT_PATH "/dev/torque"
>     +
>     +int create_job_cgroup(job *pjob);
>     +
>     +int move_task_to_cgroup(pid_t pid, job *pjob);
>     +
>     +int remove_job_cgroup(job *pjob);
>     +
>     +int use_cgroup(job *pjob);
>     +
>     +#endif
>     diff --git a/src/resmom/mom_comm.c b/src/resmom/mom_comm.c
>     index 0a34d04..cc54c0e 100644
>     --- a/src/resmom/mom_comm.c
>     +++ b/src/resmom/mom_comm.c
>     @@ -2387,7 +2387,11 @@ int im_join_job_as_sister(
>
>       #endif  /* ndef NUMA_SUPPORT */
>       #endif  /* (PENABLE_LINUX26_CPUSETS) */
>     -
>     +
>     +  if (use_cgroup(pjob) == TRUE) {
>     +    create_job_cgroup(pjob);
>     +  }
>     +
>         ret = run_prologue_scripts(pjob);
>         if (ret != PBSE_NONE)
>           {
>     @@ -2574,6 +2578,10 @@ void im_kill_job_as_sister(
>           {
>           momport = pbs_rm_port;
>           }
>     +
>     +  if (use_cgroup(pjob)) {
>     +    remove_job_cgroup(pjob);
>     +  }
>
>         job_save(pjob, SAVEJOB_QUICK, momport);
>
>     diff --git a/src/resmom/mom_main.c b/src/resmom/mom_main.c
>     index c620ae7..4d25284 100644
>     --- a/src/resmom/mom_main.c
>     +++ b/src/resmom/mom_main.c
>     @@ -8708,6 +8708,24 @@ int setup_nodeboards()
>       #endif /* ifdef NUMA_SUPPORT */
>
>
>     +int setup_cpuset_array()
>     +{
>     +  /* defined in mom_cgroup.c */
>     +  extern char **cpuset_array;
>     +  extern int nr_cpus;
>     +  extern int nr_free_cpus;
>     +  int i;
>     +
>     +  nr_free_cpus = nr_cpus = sysconf(_SC_NPROCESSORS_ONLN);
>     +
>     +  cpuset_array = malloc(nr_cpus * sizeof(char *));
>     +  for (i = 0; i < nr_cpus; i++) {
>     +    cpuset_array[i] = malloc(PBS_MAXSVRJOBID + 1);
>     +    memset(cpuset_array[i], 0, PBS_MAXSVRJOBID + 1);
>     +  }
>     +
>     +  return 0;
>     +}
>
>
>       /*
>     @@ -8724,7 +8742,6 @@ int main(
>         int       rc;
>         int       tmpFD;
>
>     -  tmpFD = sysconf(_SC_OPEN_MAX);
>
>         /* close any inherited extra files, leaving stdin, stdout, and
>     stderr open */
>
>     @@ -8763,6 +8780,8 @@ int main(
>
>       #endif  /* NVIDIA_GPUS */
>
>     +  setup_cpuset_array();
>     +
>         main_loop();
>
>         if (mom_run_state == MOM_RUN_STATE_KILLALL)
>     diff --git a/src/resmom/start_exec.c b/src/resmom/start_exec.c
>     index 4c16414..85c99b8 100644
>     --- a/src/resmom/start_exec.c
>     +++ b/src/resmom/start_exec.c
>     @@ -2294,8 +2294,20 @@ int TMomFinalizeJob2(
>
>           /*NOTREACHED*/
>           }
>     -
>     +
>         /* parent */
>     +
>     +  log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer);
>     +
>     +  if (use_cgroup(pjob) == TRUE) {
>     +    sprintf(log_buffer, "create cgroup for job %s",
>     pjob->ji_qs.ji_jobid);
>     +    log_ext(-1, __func__, log_buffer, LOG_DEBUG);
>     +
>     +    if (create_job_cgroup(pjob) == 0) {
>     +      move_task_to_cgroup(cpid, pjob);
>     +    }
>     +
>     +  }
>
>         close(TJE->upfds);
>
>     @@ -3971,7 +3983,6 @@ int TMomFinalizeChild(
>             }
>           move_to_job_cpuset(getpid(), pjob);
>           }
>     -
>       #endif  /* (PENABLE_LINUX26_CPUSETS) */
>
>         if (site_job_setup(pjob) != 0)
>     @@ -4736,6 +4747,10 @@ int start_process(
>               "task set to running/saving task (start_process)");
>             }
>
>     +    if (use_cgroup(pjob)) {
>     +      move_task_to_cgroup(ptask->ti_qs.ti_sid, pjob);
>     +    }
>     +
>           task_save(ptask);
>
>           if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING)
>     --
>     1.7.6.1
>
>     _______________________________________________
>     torqueusers mailing list
>     torqueusers at supercluster.org <mailto:torqueusers at supercluster.org>
>     http://www.supercluster.org/mailman/listinfo/torqueusers
>
>
>
>
> --
> David Beer | Senior Software Engineer
> Adaptive Computing
>
>
>
> _______________________________________________
> torqueusers mailing list
> torqueusers at supercluster.org
> http://www.supercluster.org/mailman/listinfo/torqueusers
>



More information about the torqueusers mailing list