[torqueusers] [PATCH 3/3] resmom: create cgroup for jobs to limit cpu and mem usage

David Beer dbeer at adaptivecomputing.com
Tue Nov 20 10:19:57 MST 2012


Levin,

Could you create a Bugzilla ticket and attach the complete patch to it? That
way your work won't be lost, and the issue will be easier to track.

Cheers,

David

On Tue, Nov 20, 2012 at 2:02 AM, levin li <levin108 at gmail.com> wrote:

>
> Signed-off-by: levin li <levin108 at gmail.com>
> ---
>  src/resmom/catch_child.c |    4 +
>  src/resmom/mom_cgroup.c  |  445 ++++++++++++++++++++++++++++++++++++++++++++++
>  src/resmom/mom_cgroup.h  |   14 ++
>  src/resmom/mom_comm.c    |   10 +-
>  src/resmom/mom_main.c    |   21 ++-
>  src/resmom/start_exec.c  |   19 ++-
>  6 files changed, 509 insertions(+), 4 deletions(-)
>  create mode 100644 src/resmom/mom_cgroup.c
>  create mode 100644 src/resmom/mom_cgroup.h
>
> diff --git a/src/resmom/catch_child.c b/src/resmom/catch_child.c
> index 8e3527f..72d79a8 100644
> --- a/src/resmom/catch_child.c
> +++ b/src/resmom/catch_child.c
> @@ -438,6 +438,10 @@ void scan_for_exiting(void)
>            pjob->ji_qs.ji_un.ji_momt.ji_exitstat = ptask->ti_qs.ti_exitstat;
>            }
>
> +        if (use_cgroup(pjob)) {
> +          remove_job_cgroup(pjob);
> +        }
> +
>          log_event(
>            PBSEVENT_JOB,
>            PBS_EVENTCLASS_JOB,
> diff --git a/src/resmom/mom_cgroup.c b/src/resmom/mom_cgroup.c
> new file mode 100644
> index 0000000..a9f388c
> --- /dev/null
> +++ b/src/resmom/mom_cgroup.c
> @@ -0,0 +1,445 @@
> +#include <pbs_config.h>   /* the master config generated by configure */
> +
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <unistd.h>
> +#ifdef USEJOBCREATE
> +#ifndef JOBFAKE
> +#include <job.h>
> +#endif /* JOBFAKE */
> +#endif /* USEJOBCREATE */
> +#include <errno.h>
> +#include <fcntl.h>
> +#include <grp.h>
> +#include <string.h>
> +#include <limits.h>
> +#include <assert.h>
> +#include <signal.h>
> +#include <ctype.h>
> +#include <time.h>
> +#include <sys/param.h>
> +#include <sys/types.h>
> +#include <sys/stat.h>
> +#include <sys/wait.h>
> +#include <sys/mman.h>
> +#include <sys/ioctl.h>
> +#include <pthread.h>
> +
> +#include "rpp.h"
> +#include "libpbs.h"
> +#include "portability.h"
> +#include "list_link.h"
> +#include "server_limits.h"
> +#include "attribute.h"
> +#include "resource.h"
> +#include "resmon.h"
> +#include "pbs_job.h"
> +#include "log.h"
> +#include "../lib/Liblog/pbs_log.h"
> +#include "../lib/Liblog/log_event.h"
> +#include "mom_mach.h"
> +#include "mom_func.h"
> +#include "pbs_error.h"
> +#include "svrfunc.h"
> +#include "dis.h"
> +#include "batch_request.h"
> +#include "mcom.h"
> +#include "utils.h"
> +#include "mom_comm.h"
> +#include "mom_cgroup.h"
> +#include "server.h"
> +
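> +/* Per-CPU ownership map: cpuset_array[i] holds the jobid that owns CPU i,
> + * or an empty string when the CPU is free. nr_free_cpus counts unassigned
> + * CPUs. All three are guarded by cpuset_lock. */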
> +char **cpuset_array = NULL;
> +int nr_cpus;
> +int nr_free_cpus;
> +pthread_mutex_t cpuset_lock = PTHREAD_MUTEX_INITIALIZER;
> +
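> +/* A job opts in to cgroup confinement via the cgroup_enable job attribute. */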
> +int use_cgroup(job *pjob)
> +{
> +  if (pjob->ji_wattr[JOB_ATR_cgroup_enable].at_flags & ATR_VFLAG_SET)
> +    return TRUE;
> +  return FALSE;
> +}
> +
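> +/* Convert a size resource value to bytes, honoring word-size units and the
> + * binary shift, with overflow checks. */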
> +static int mm_getsize(resource *pres, unsigned long *ret) {
> +  unsigned long value;
> +  if (pres->rs_value.at_type != ATR_TYPE_SIZE)
> +    return PBSE_ATTRTYPE;
> +
> +  value = pres->rs_value.at_val.at_size.atsv_num;
> +  if (pres->rs_value.at_val.at_size.atsv_units == ATR_SV_WORDSZ) {
> +    if (value > ULONG_MAX / sizeof(int))
> +      return PBSE_BADATVAL;
> +
> +    value *= sizeof(int);
> +  }
> +
> +  if (value > (ULONG_MAX >> pres->rs_value.at_val.at_size.atsv_shift))
> +    return PBSE_BADATVAL;
> +
> +  *ret = (value << pres->rs_value.at_val.at_size.atsv_shift);
> +
> +  return PBSE_NONE;
> +}  /* END mm_getsize() */
> +
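> +/* Scan the job's resource list for "pmem" and return its per-process value
> + * in bytes, or -1 if it is not set. */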
> +static long get_job_pmem(job *pjob)
> +{
> +  resource *pres;
> +  char *pname = NULL;
> +
> +  pres = (resource *)GET_NEXT(pjob->ji_wattr[JOB_ATR_resource].at_val.at_list);
> +
> +  while (pres) {
> +    int ret;
> +    unsigned long pmem;
> +
> +    if (pres->rs_defin != NULL)
> +      pname = pres->rs_defin->rs_name;
> +    else
> +      goto next;
> +
> +    if (strcmp(pname, "pmem"))
> +      goto next;
> +
> +    ret = mm_getsize(pres, &pmem);
> +    if (ret != PBSE_NONE) {
> +      DBPRT(("Memory restriction not set\n"));
> +      return -1;
> +    }
> +
> +    return (long)pmem;
> +next:
> +    pres = (resource *)GET_NEXT(pres->rs_link);
> +  }
> +
> +  return -1;
> +}
> +
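> +/* Apply the job's memory limit (pmem times processes per node) by writing
> + * it to the cgroup's memory.limit_in_bytes; a no-op when pmem is unset. */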
> +static int set_job_mem_limit(job *pjob)
> +{
> +  long pmem;
> +  unsigned long ppn;
> +  char cg_path[MAXPATHLEN], mem_limit[32];
> +  int fd, ret;
> +
> +  pmem = get_job_pmem(pjob);
> +  if (pmem <= 0)
> +    return 0;  /* no pmem requested; leave the memory limit unset */
> +
> +  ppn = pjob->ji_numvnod / pjob->ji_numnodes;
> +  pmem *= ppn;
> +
> +  sprintf(log_buffer, "job %s: setting memory limit to %ld bytes",
> +          pjob->ji_qs.ji_jobid, pmem);
> +  log_ext(-1, __func__, log_buffer, LOG_INFO);
> +
> +  snprintf(cg_path, MAXPATHLEN, "%s/%s/memory.limit_in_bytes",
> +           CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
> +  fd = open(cg_path, O_RDWR);
> +  if (fd < 0) {
> +    sprintf(log_buffer, "cannot open %s: %m", cg_path);
> +    log_err(errno, __func__, log_buffer);
> +    return -1;
> +  }
> +
> +  /* mm_getsize() already yielded bytes, so write the raw byte count */
> +  snprintf(mem_limit, sizeof(mem_limit), "%ld", pmem);
> +  ret = write(fd, mem_limit, strlen(mem_limit));
> +  if (ret < 0 || (size_t)ret != strlen(mem_limit)) {
> +    sprintf(log_buffer, "set memory.limit_in_bytes failed: %m");
> +    log_err(errno, __func__, log_buffer);
> +    close(fd);
> +    return -1;
> +  }
> +  close(fd);
> +
> +  return 0;
> +}
> +
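> +/* Fallback: copy the root cgroup's cpuset.cpus string (every CPU on the
> + * node) into buf. */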
> +static int get_all_cpus(char *buf, size_t len)
> +{
> +  char cg_path[MAXPATHLEN];
> +  int fd, ret, n;
> +
> +  /* read cpuset info from the root cgroup */
> +  snprintf(cg_path, MAXPATHLEN, "%s/cpuset.cpus", CGROUP_ROOT_PATH);
> +  fd = open(cg_path, O_RDONLY);
> +  if (fd < 0) {
> +    sprintf(log_buffer, "cannot read cpuset info from root cgroup: %m");
> +    log_err(errno, __func__, log_buffer);
> +
> +    return -1;
> +  }
> +
> +  n = 0;
> +  memset(buf, 0, len);
> +  while (n < (int)len - 1) {
> +    ret = read(fd, buf + n, len - 1 - n);
> +    if (ret <= 0)
> +      break;
> +    n += ret;
> +  }
> +
> +  close(fd);
> +
> +  return 0;
> +}
> +
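> +/* Claim ppn free CPUs for jobid in cpuset_array and render them into buf
> + * as a cpuset range list (e.g. "0-3,6-7"). Caller must hold cpuset_lock. */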
> +static int get_aval_cpus(char *buf, const char *jobid, int ppn)
> +{
> +  int i, start, count = 0;
> +
> +  if (!nr_free_cpus)
> +    return -1;
> +
> +  start = 0;
> +  for (i = 0; i < nr_cpus; i++) {
> +    if (strlen(cpuset_array[i]) == 0 && ppn == 1) {
> +      memcpy(cpuset_array[i], jobid, strlen(jobid) + 1);
> +      sprintf(buf, "%d", i);
> +      break;
> +    }
> +
> +    if (strlen(cpuset_array[i]) == 0) {
> +      count++;
> +      memcpy(cpuset_array[i], jobid, strlen(jobid) + 1);
> +      if (count == ppn) {
> +        if (strlen(buf) == 0)
> +          sprintf(buf, "%d-%d", start, i);
> +        else
> +          sprintf(buf + strlen(buf), ",%d-%d", start, i);
> +        break;
> +      }
> +    } else {
> +
> +      if (count == 0) {
> +        start = i + 1;
> +        continue;
> +      }
> +
> +      if (!strcmp(cpuset_array[i - 1], jobid)) {
> +        if (strlen(buf) == 0)
> +          sprintf(buf, "%d-%d", start, i - 1);
> +        else
> +          sprintf(buf + strlen(buf), ",%d-%d", start, i - 1);
> +      }
> +
> +      start = i + 1;
> +    }
> +  }
> +  strcat(buf, "\n");
> +  nr_free_cpus -= ppn;
> +
> +  sprintf(log_buffer, "available cpuset is %s", buf);
> +  log_ext(-1, __func__, log_buffer, LOG_INFO);
> +
> +  return 0;
> +}
> +
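> +/* Choose CPUs for the job and write them to the job cgroup's cpuset.cpus;
> + * falls back to the full root cpuset when too few CPUs are free. */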
> +static int set_job_cpu_limit(job *pjob)
> +{
> +  char cg_path[MAXPATHLEN];
> +  char buf[BUFSIZ];
> +  int fd, ret, n;
> +  int ppn;
> +
> +  ppn = pjob->ji_numvnod / pjob->ji_numnodes;
> +
> +  pthread_mutex_lock(&cpuset_lock);
> +  memset(buf, 0, sizeof(buf));
> +  if (ppn > nr_free_cpus)
> +    ret = get_all_cpus(buf, sizeof(buf));
> +  else
> +    ret = get_aval_cpus(buf, pjob->ji_qs.ji_jobid, ppn);
> +  pthread_mutex_unlock(&cpuset_lock);
> +
> +  if (ret)
> +    return -1;
> +
> +  n = strlen(buf);
> +
> +  /* restrict the job cgroup to the cpus selected above */
> +  snprintf(cg_path, MAXPATHLEN, "%s/%s/cpuset.cpus",
> +           CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
> +  fd = open(cg_path, O_RDWR);
> +  if (fd < 0) {
> +    sprintf(log_buffer, "cannot open %s: %m", cg_path);
> +    log_err(errno, __func__, log_buffer);
> +
> +    return -1;
> +  }
> +
> +  ret = write(fd, buf, n);
> +  if (ret != n) {
> +    sprintf(log_buffer, "set cpuset.cpus failed: %m");
> +    log_err(errno, __func__, log_buffer);
> +
> +    close(fd);
> +    return -1;
> +  }
> +
> +  close(fd);
> +
> +  return 0;
> +}
> +
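> +/* Copy the root cgroup's cpuset.mems into the job cgroup so the job may
> + * allocate memory from every NUMA node. */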
> +static int set_job_numa_node_limit(job *pjob)
> +{
> +  char cg_path[MAXPATHLEN];
> +  char buf[BUFSIZ];
> +  int fd, ret, n;
> +
> +  /* read NUMA node info from root cgroup */
> +  snprintf(cg_path, MAXPATHLEN, "%s/cpuset.mems", CGROUP_ROOT_PATH);
> +  fd = open(cg_path, O_RDONLY);
> +  if (fd < 0) {
> +    sprintf(log_buffer, "cannot read numa node info from root cgroup: %m");
> +    log_err(errno, __func__, log_buffer);
> +
> +    return -1;
> +  }
> +
> +  n = 0;
> +  memset(buf, 0, sizeof(buf));
> +  while (n < (int)sizeof(buf) - 1) {
> +    ret = read(fd, buf + n, sizeof(buf) - 1 - n);
> +    if (ret <= 0)
> +      break;
> +    n += ret;
> +  }
> +
> +  close(fd);
> +
> +  sprintf(log_buffer, "NUMA node info of root cgroup is: %s", buf);
> +  log_ext(-1, __func__, log_buffer, LOG_INFO);
> +
> +  /* set this cgroup to use memory from all NUMA nodes */
> +  snprintf(cg_path, MAXPATHLEN, "%s/%s/cpuset.mems",
> +           CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
> +  fd = open(cg_path, O_RDWR);
> +  if (fd < 0) {
> +    sprintf(log_buffer, "cannot open %s: %m", cg_path);
> +    log_err(errno, __func__, log_buffer);
> +
> +    return -1;
> +  }
> +
> +  ret = write(fd, buf, n);
> +  if (ret != n) {
> +    sprintf(log_buffer, "set cpuset.mems failed: %m");
> +    log_err(errno, __func__, log_buffer);
> +
> +    close(fd);
> +    return -1;
> +  }
> +
> +  close(fd);
> +
> +  return 0;
> +}
> +
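> +/* Create the per-job cgroup directory under CGROUP_ROOT_PATH and apply the
> + * cpu, NUMA node, and memory limits. */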
> +int create_job_cgroup(job *pjob)
> +{
> +  char cg_path[MAXPATHLEN];
> +  int ret;
> +
> +  sprintf(log_buffer, "creating cgroup for job %s", pjob->ji_qs.ji_jobid);
> +  log_ext(-1, __func__, log_buffer, LOG_INFO);
> +
> +  ret = access(CGROUP_ROOT_PATH, R_OK | W_OK);
> +  if (ret) {
> +    sprintf(log_buffer, "cgroup mount point '%s' not found", CGROUP_ROOT_PATH);
> +    log_err(errno, __func__, log_buffer);
> +
> +    return -1;
> +  }
> +
> +  snprintf(cg_path, MAXPATHLEN, "%s/%s", CGROUP_ROOT_PATH,
> +           pjob->ji_qs.ji_jobid);
> +
> +  ret = mkdir(cg_path, 0755);
> +  if (ret) {
> +    sprintf(log_buffer, "create cgroup for %s failed: %m",
> +            pjob->ji_qs.ji_jobid);
> +    log_err(errno, __func__, log_buffer);
> +
> +    return -1;
> +  }
> +
> +  /* if any limit cannot be applied, tear the half-built cgroup down */
> +  ret = set_job_cpu_limit(pjob);
> +  if (ret == 0)
> +    ret = set_job_numa_node_limit(pjob);
> +  if (ret == 0)
> +    ret = set_job_mem_limit(pjob);
> +
> +  if (ret) {
> +    rmdir(cg_path);
> +    return -1;
> +  }
> +
> +  return 0;
> +}
> +
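> +/* Attach a process to the job's cgroup by appending its pid to the
> + * cgroup's tasks file. */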
> +int move_task_to_cgroup(pid_t pid, job *pjob)
> +{
> +  char cg_path[MAXPATHLEN], buf[BUFSIZ];
> +  int fd, ret;
> +
> +  sprintf(log_buffer, "moving job %s to cgroup", pjob->ji_qs.ji_jobid);
> +  log_ext(-1, __func__, log_buffer, LOG_INFO);
> +
> +  snprintf(cg_path, MAXPATHLEN, "%s/%s/tasks", CGROUP_ROOT_PATH,
> +           pjob->ji_qs.ji_jobid);
> +
> +  fd = open(cg_path, O_RDWR);
> +  if (fd < 0) {
> +    sprintf(log_buffer, "failed to open %s:%m", cg_path);
> +    log_err(errno, __func__, log_buffer);
> +
> +    return -1;
> +  }
> +
> +  snprintf(buf, sizeof(buf), "%d\n", (int)pid);
> +  ret = write(fd, buf, strlen(buf));
> +  if (ret < 0 || (size_t)ret != strlen(buf)) {
> +    sprintf(log_buffer, "failed to move task to cgroup: %m");
> +    log_err(errno, __func__, log_buffer);
> +
> +    close(fd);
> +    return -1;
> +  }
> +
> +  close(fd);
> +
> +  return 0;
> +}
> +
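> +/* Release the job's CPUs back to the free pool and remove its cgroup
> + * directory. */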
> +int remove_job_cgroup(job *pjob)
> +{
> +  char cg_path[MAXPATHLEN];
> +  int ret, i;
> +
> +  sprintf(log_buffer, "removing cgroup for job %s", pjob->ji_qs.ji_jobid);
> +  log_ext(-1, __func__, log_buffer, LOG_INFO);
> +
> +  pthread_mutex_lock(&cpuset_lock);
> +  for (i = 0; i < nr_cpus; i++) {
> +    if (!strcmp(cpuset_array[i], pjob->ji_qs.ji_jobid)) {
> +      cpuset_array[i][0] = '\0';
> +      nr_free_cpus++;
> +    }
> +  }
> +  pthread_mutex_unlock(&cpuset_lock);
> +
> +  snprintf(cg_path, MAXPATHLEN, "%s/%s/", CGROUP_ROOT_PATH,
> +           pjob->ji_qs.ji_jobid);
> +
> +  ret = rmdir(cg_path);
> +  if (ret) {
> +    sprintf(log_buffer, "remove cgroup for job %s failed: %m",
> +            pjob->ji_qs.ji_jobid);
> +    log_err(errno, __func__, log_buffer);
> +
> +    return -1;
> +  }
> +
> +  return 0;
> +}
> diff --git a/src/resmom/mom_cgroup.h b/src/resmom/mom_cgroup.h
> new file mode 100644
> index 0000000..4e7a006
> --- /dev/null
> +++ b/src/resmom/mom_cgroup.h
> @@ -0,0 +1,14 @@
> +#ifndef _MOM_CGROUP_H_
> +#define _MOM_CGROUP_H_
> +
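> +/* Root of the cgroup hierarchy managed by pbs_mom; each job gets a
> + * subdirectory named after its jobid. */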
> +#define CGROUP_ROOT_PATH "/dev/torque"
> +
> +int create_job_cgroup(job *pjob);
> +
> +int move_task_to_cgroup(pid_t pid, job *pjob);
> +
> +int remove_job_cgroup(job *pjob);
> +
> +int use_cgroup(job *pjob);
> +
> +#endif
> diff --git a/src/resmom/mom_comm.c b/src/resmom/mom_comm.c
> index 0a34d04..cc54c0e 100644
> --- a/src/resmom/mom_comm.c
> +++ b/src/resmom/mom_comm.c
> @@ -2387,7 +2387,11 @@ int im_join_job_as_sister(
>
>  #endif  /* ndef NUMA_SUPPORT */
>  #endif  /* (PENABLE_LINUX26_CPUSETS) */
> -
> +
> +  if (use_cgroup(pjob) == TRUE) {
> +    create_job_cgroup(pjob);
> +  }
> +
>    ret = run_prologue_scripts(pjob);
>    if (ret != PBSE_NONE)
>      {
> @@ -2574,6 +2578,10 @@ void im_kill_job_as_sister(
>      {
>      momport = pbs_rm_port;
>      }
> +
> +  if (use_cgroup(pjob)) {
> +    remove_job_cgroup(pjob);
> +  }
>
>    job_save(pjob, SAVEJOB_QUICK, momport);
>
> diff --git a/src/resmom/mom_main.c b/src/resmom/mom_main.c
> index c620ae7..4d25284 100644
> --- a/src/resmom/mom_main.c
> +++ b/src/resmom/mom_main.c
> @@ -8708,6 +8708,24 @@ int setup_nodeboards()
>  #endif /* ifdef NUMA_SUPPORT */
>
>
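> +/* Allocate the per-CPU ownership map used by mom_cgroup.c, one slot per
> + * online processor. */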
> +int setup_cpuset_array()
> +{
> +  /* defined in mom_cgroup.c */
> +  extern char **cpuset_array;
> +  extern int nr_cpus;
> +  extern int nr_free_cpus;
> +  int i;
> +
> +  nr_free_cpus = nr_cpus = sysconf(_SC_NPROCESSORS_ONLN);
> +
> +  cpuset_array = calloc(nr_cpus, sizeof(char *));
> +  if (cpuset_array == NULL)
> +    return -1;
> +
> +  for (i = 0; i < nr_cpus; i++) {
> +    cpuset_array[i] = calloc(1, PBS_MAXSVRJOBID + 1);
> +    if (cpuset_array[i] == NULL)
> +      return -1;
> +  }
> +
> +  return 0;
> +}
>
>
>  /*
> @@ -8724,7 +8742,6 @@ int main(
>    int       rc;
>    int       tmpFD;
>
> -  tmpFD = sysconf(_SC_OPEN_MAX);
>
>    /* close any inherited extra files, leaving stdin, stdout, and stderr open */
>
> @@ -8763,6 +8780,8 @@ int main(
>
>  #endif  /* NVIDIA_GPUS */
>
> +  setup_cpuset_array();
> +
>    main_loop();
>
>    if (mom_run_state == MOM_RUN_STATE_KILLALL)
> diff --git a/src/resmom/start_exec.c b/src/resmom/start_exec.c
> index 4c16414..85c99b8 100644
> --- a/src/resmom/start_exec.c
> +++ b/src/resmom/start_exec.c
> @@ -2294,8 +2294,20 @@ int TMomFinalizeJob2(
>
>      /*NOTREACHED*/
>      }
> -
> +
>    /* parent */
> +
> +  if (use_cgroup(pjob) == TRUE) {
> +    sprintf(log_buffer, "create cgroup for job %s", pjob->ji_qs.ji_jobid);
> +    log_ext(-1, __func__, log_buffer, LOG_DEBUG);
> +
> +    if (create_job_cgroup(pjob) == 0) {
> +      move_task_to_cgroup(cpid, pjob);
> +    }
> +
> +  }
>
>    close(TJE->upfds);
>
> @@ -3971,7 +3983,6 @@ int TMomFinalizeChild(
>        }
>      move_to_job_cpuset(getpid(), pjob);
>      }
> -
>  #endif  /* (PENABLE_LINUX26_CPUSETS) */
>
>    if (site_job_setup(pjob) != 0)
> @@ -4736,6 +4747,10 @@ int start_process(
>          "task set to running/saving task (start_process)");
>        }
>
> +    if (use_cgroup(pjob)) {
> +      move_task_to_cgroup(ptask->ti_qs.ti_sid, pjob);
> +    }
> +
>      task_save(ptask);
>
>      if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING)
> --
> 1.7.6.1



-- 
David Beer | Senior Software Engineer
Adaptive Computing