[torqueusers] [PATCH 3/3] resmom: create cgroup for jobs to limit cpu and mem usage
levin li
levin108 at gmail.com
Tue Nov 20 02:02:04 MST 2012
Signed-off-by: levin li <levin108 at gmail.com>
---
src/resmom/catch_child.c | 4 +
src/resmom/mom_cgroup.c | 445 ++++++++++++++++++++++++++++++++++++++++++++++
src/resmom/mom_cgroup.h | 14 ++
src/resmom/mom_comm.c | 10 +-
src/resmom/mom_main.c | 21 ++-
src/resmom/start_exec.c | 19 ++-
6 files changed, 509 insertions(+), 4 deletions(-)
create mode 100644 src/resmom/mom_cgroup.c
create mode 100644 src/resmom/mom_cgroup.h
diff --git a/src/resmom/catch_child.c b/src/resmom/catch_child.c
index 8e3527f..72d79a8 100644
--- a/src/resmom/catch_child.c
+++ b/src/resmom/catch_child.c
@@ -438,6 +438,10 @@ void scan_for_exiting(void)
pjob->ji_qs.ji_un.ji_momt.ji_exitstat = ptask->ti_qs.ti_exitstat;
}
+ if (use_cgroup(pjob)) {
+ remove_job_cgroup(pjob);
+ }
+
log_event(
PBSEVENT_JOB,
PBS_EVENTCLASS_JOB,
diff --git a/src/resmom/mom_cgroup.c b/src/resmom/mom_cgroup.c
new file mode 100644
index 0000000..a9f388c
--- /dev/null
+++ b/src/resmom/mom_cgroup.c
@@ -0,0 +1,445 @@
+#include <pbs_config.h> /* the master config generated by configure */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#ifdef USEJOBCREATE
+#ifndef JOBFAKE
+#include <job.h>
+#endif /* JOBFAKE */
+#endif /* USEJOBCREATE */
+#include <errno.h>
+#include <fcntl.h>
+#include <grp.h>
+#include <string.h>
+#include <limits.h>
+#include <assert.h>
+#include <signal.h>
+#include <ctype.h>
+#include <time.h>
+#include <sys/param.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+#include <sys/mman.h>
+#include <sys/ioctl.h>
+#include <pthread.h>
+
+#include "rpp.h"
+#include "libpbs.h"
+#include "portability.h"
+#include "list_link.h"
+#include "server_limits.h"
+#include "attribute.h"
+#include "resource.h"
+#include "resmon.h"
+#include "pbs_job.h"
+#include "log.h"
+#include "../lib/Liblog/pbs_log.h"
+#include "../lib/Liblog/log_event.h"
+#include "mom_mach.h"
+#include "mom_func.h"
+#include "pbs_error.h"
+#include "svrfunc.h"
+#include "dis.h"
+#include "batch_request.h"
+#include "mcom.h"
+#include "resource.h"
+#include "utils.h"
+#include "mom_comm.h"
+#include "mom_cgroup.h"
+#include "server.h"
+#include "svrfunc.h"
+
+char **cpuset_array = NULL;
+int nr_cpus;
+int nr_free_cpus;
+pthread_mutex_t cpuset_lock = PTHREAD_MUTEX_INITIALIZER;
+
+int use_cgroup(job *pjob)
+{
+ if (pjob->ji_wattr[JOB_ATR_cgroup_enable].at_flags & ATR_VFLAG_SET)
+ return TRUE;
+ return FALSE;
+}
+
+static int mm_getsize(resource *pres, unsigned long *ret) {
+ unsigned long value;
+ if (pres->rs_value.at_type != ATR_TYPE_SIZE)
+ return PBSE_ATTRTYPE;
+
+ value = pres->rs_value.at_val.at_size.atsv_num;
+ if (pres->rs_value.at_val.at_size.atsv_units == ATR_SV_WORDSZ) {
+ if (value > ULONG_MAX / sizeof(int))
+ return PBSE_BADATVAL;
+
+ value *= sizeof(int);
+ }
+
+ if (value > (ULONG_MAX >> pres->rs_value.at_val.at_size.atsv_shift))
+ return PBSE_BADATVAL;
+
+ *ret = (value << pres->rs_value.at_val.at_size.atsv_shift);
+
+ return PBSE_NONE;
+} /* END mm_getsize() */
+
+static int get_job_pmem(job *pjob)
+{
+ resource *pres;
+ char *pname = NULL;
+
+ pres = (resource *)GET_NEXT(pjob->ji_wattr[JOB_ATR_resource].at_val.at_list);
+
+ while (pres) {
+ int ret;
+ unsigned long pmem;
+
+ if (pres->rs_defin != NULL)
+ pname = pres->rs_defin->rs_name;
+ else
+ goto next;
+
+ if (strcmp(pname, "pmem"))
+ goto next;
+
+ ret = mm_getsize(pres, &pmem);
+ if (ret != PBSE_NONE) {
+ DBPRT(("Memory restriction not set\n"));
+ return -1;
+ }
+
+ return pmem;
+next:
+ pres = (resource *)GET_NEXT(pres->rs_link);
+ }
+
+ return -1;
+}
+
+static int set_job_mem_limit(job *pjob)
+{
+ unsigned long pmem;
+ unsigned long ppn;
+ char cg_path[MAXPATHLEN], mem_limit[32];
+ int fd, ret;
+
+ pmem = get_job_pmem(pjob);
+
+ ppn = pjob->ji_numvnod / pjob->ji_numnodes;
+ pmem *= ppn;
+
+ sprintf(log_buffer, "job %s set memory limitation to %ld",
+ pjob->ji_qs.ji_jobid, pmem);
+ log_ext(-1, __func__, log_buffer, LOG_INFO);
+
+ if (pmem > 0) {
+ snprintf(cg_path, MAXPATHLEN, "%s/%s/memory.limit_in_bytes",
+ CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
+ fd = open(cg_path, O_RDWR);
+ if (fd < 0) {
+ sprintf(log_buffer, "can not open %s:%m", cg_path);
+ log_err(errno, __func__, log_buffer);
+ return -1;
+ }
+
+ snprintf(mem_limit, sizeof(mem_limit), "%ldM", pmem);
+ ret = write(fd, mem_limit, strlen(mem_limit));
+ if (ret != strlen(mem_limit)) {
+ sprintf(log_buffer, "set memory.limit_in_bytes failed: %m");
+ log_err(errno, __func__, log_buffer);
+ close(fd);
+ return -1;
+ }
+ close(fd);
+ }
+
+ return 0;
+}
+
+static int get_all_cpus(char *buf)
+{
+ char cg_path[MAXPATHLEN];
+ int fd, ret, n;
+
+ /* read cpuset info from root cgroup */
+ snprintf(cg_path, MAXPATHLEN, "%s/cpuset.cpus", CGROUP_ROOT_PATH);
+ fd = open(cg_path, O_RDWR);
+ if (fd < 0) {
+ sprintf(log_buffer, "can not read cpuset info from root cgroup:%m");
+ log_err(errno, __func__, log_buffer);
+
+ return -1;
+ }
+
+ n = 0;
+ memset(buf, 0, sizeof(buf));
+ while (1) {
+ ret = read(fd, buf + n, sizeof(buf) - n);
+ n += ret;
+ if (!ret)
+ break;
+ }
+
+ return 0;
+}
+
+static int get_aval_cpus(char *buf, const char *jobid, int ppn)
+{
+ int i, start, end, count = 0;
+
+ if (!nr_free_cpus)
+ return -1;
+
+ start = 0, end = 0;
+ for (i = 0; i < nr_cpus; i++) {
+ if (strlen(cpuset_array[i]) == 0 && ppn == 1) {
+ memcpy(cpuset_array[i], jobid, strlen(jobid) + 1);
+ sprintf(buf, "%d", i);
+ break;
+ }
+
+ if (strlen(cpuset_array[i]) == 0) {
+ count ++;
+ memcpy(cpuset_array[i], jobid, strlen(jobid) + 1);
+ if (count == ppn) {
+ if (strlen(buf) == 0)
+ sprintf(buf, "%d-%d", start, i);
+ else
+ sprintf(buf, "%s,%d-%d", buf, start, i);
+ break;
+ }
+ } else {
+
+ if (count == 0) {
+ start = i + 1;
+ continue;
+ }
+
+ if (!strcmp(cpuset_array[i - 1], jobid)) {
+ if (strlen(buf) == 0)
+ sprintf(buf, "%d-%d", start, i - 1);
+ else
+ sprintf(buf, "%s,%d-%d", buf, start, i - 1);
+ }
+
+ start = i + 1;
+ }
+ }
+ sprintf(buf, "%s\n", buf);
+ nr_free_cpus -= ppn;
+
+ sprintf(log_buffer, "available cpuset is %s", buf);
+ log_ext(-1, __func__, log_buffer, LOG_INFO);
+
+ return 0;
+}
+
+static int set_job_cpu_limit(job *pjob)
+{
+ char cg_path[MAXPATHLEN];
+ char buf[BUFSIZ];
+ int fd, ret, n;
+ int ppn;
+
+ ppn = pjob->ji_numvnod / pjob->ji_numnodes;
+
+ pthread_mutex_lock(&cpuset_lock);
+ if (ppn > nr_free_cpus)
+ ret = get_all_cpus(buf);
+ else {
+ memset(buf, 0, sizeof(buf));
+ ret = get_aval_cpus(buf, pjob->ji_qs.ji_jobid, ppn);
+ }
+ pthread_mutex_unlock(&cpuset_lock);
+
+ if (ret)
+ return -1;
+
+ /* set cpuset to cpuset of root cgroup */
+ snprintf(cg_path, MAXPATHLEN, "%s/%s/cpuset.cpus",
+ CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
+ fd = open(cg_path, O_RDWR);
+ if (fd < 0) {
+ sprintf(log_buffer, "can not open %s:%m", cg_path);
+ log_err(errno, __func__, log_buffer);
+
+ return -1;
+ }
+
+ ret = write(fd, buf, n);
+ if (ret != n) {
+ sprintf(log_buffer, "set cpuset.cpus failed: %m");
+ log_err(errno, __func__, log_buffer);
+
+ close(fd);
+ return -1;
+ }
+
+ close(fd);
+
+ return 0;
+}
+
+static int set_job_numa_node_limit(job *pjob)
+{
+ char cg_path[MAXPATHLEN];
+ char buf[BUFSIZ];
+ int fd, ret, n;
+
+ /* read NUMA node info from root cgroup */
+ snprintf(cg_path, MAXPATHLEN, "%s/cpuset.mems", CGROUP_ROOT_PATH);
+ fd = open(cg_path, O_RDWR);
+ if (fd < 0) {
+ sprintf(log_buffer, "can not read numa node info from root cgroup:%m");
+ log_err(errno, __func__, log_buffer);
+
+ return -1;
+ }
+
+ n = 0;
+ memset(buf, 0, sizeof(buf));
+ while (1) {
+ ret = read(fd, buf + n, sizeof(buf) - n);
+ n += ret;
+ if (!ret)
+ break;
+ }
+
+ close(fd);
+
+ sprintf(log_buffer, "NUMA node info of root cgroup is: %s", buf);
+ log_ext(-1, __func__, log_buffer, LOG_INFO);
+
+ /* set this cgroup to use memory from all NUMA nodes */
+ snprintf(cg_path, MAXPATHLEN, "%s/%s/cpuset.mems",
+ CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
+ fd = open(cg_path, O_RDWR);
+ if (fd < 0) {
+ sprintf(log_buffer, "can not open %s:%m", cg_path);
+ log_err(errno, __func__, log_buffer);
+
+ return -1;
+ }
+
+ ret = write(fd, buf, n);
+ if (ret != n) {
+ sprintf(log_buffer, "set cpuset.mems failed:%m");
+ log_err(errno, __func__, log_buffer);
+
+ close(fd);
+ return -1;
+ }
+
+ close(fd);
+
+ return 0;
+}
+
+int create_job_cgroup(job *pjob)
+{
+ char cg_path[MAXPATHLEN];
+ int ret;
+
+ sprintf(log_buffer, "creating cgroup for job %s", pjob->ji_qs.ji_jobid);
+ log_ext(-1, __func__, log_buffer, LOG_INFO);
+
+ ret = access(CGROUP_ROOT_PATH, R_OK | W_OK);
+ if (ret) {
+ sprintf(log_buffer, "cgroup mount point '%s' not found", CGROUP_ROOT_PATH);
+ log_err(errno, __func__, log_buffer);
+
+ return -1;
+ }
+
+ snprintf(cg_path, MAXPATHLEN, "%s/%s", CGROUP_ROOT_PATH,
+ pjob->ji_qs.ji_jobid);
+
+ ret = mkdir(cg_path, 0755);
+ if (ret) {
+ sprintf(log_buffer, "create cgroup for %s failed: %m",
+ pjob->ji_qs.ji_jobid);
+ log_err(errno, __func__, log_buffer);
+
+ return -1;
+ }
+
+ ret = set_job_cpu_limit(pjob);
+ if (ret)
+ return -1;
+
+ ret = set_job_numa_node_limit(pjob);
+ if (ret)
+ return -1;
+
+ ret = set_job_mem_limit(pjob);
+ if (ret)
+ return -1;
+
+ return 0;
+}
+
+int move_task_to_cgroup(pid_t pid, job *pjob)
+{
+ char cg_path[MAXPATHLEN], buf[BUFSIZ];
+ int fd, ret;
+
+ sprintf(log_buffer, "moving job %s to cgroup", pjob->ji_qs.ji_jobid);
+ log_ext(-1, __func__, log_buffer, LOG_INFO);
+
+ snprintf(cg_path, MAXPATHLEN, "%s/%s/tasks", CGROUP_ROOT_PATH,
+ pjob->ji_qs.ji_jobid);
+
+ fd = open(cg_path, O_RDWR);
+ if (fd < 0) {
+ sprintf(log_buffer, "failed to open %s:%m", cg_path);
+ log_err(errno, __func__, log_buffer);
+
+ return -1;
+ }
+
+ snprintf(buf, sizeof(buf), "%d\n", pid);
+ ret = write(fd, buf, strlen(buf));
+ if (ret != strlen(buf)) {
+ sprintf(log_buffer, "failed to move task to cgroup: %m");
+ log_err(errno, __func__, log_buffer);
+
+ return -1;
+ }
+
+ close(fd);
+
+ return 0;
+}
+
+int remove_job_cgroup(job *pjob)
+{
+ char cg_path[MAXPATHLEN];
+ int ret, i;
+
+ sprintf(log_buffer, "removing cgroup for job %s", pjob->ji_qs.ji_jobid);
+ log_ext(-1, __func__, log_buffer, LOG_INFO);
+
+ pthread_mutex_lock(&cpuset_lock);
+ for (i = 0; i < nr_cpus; i++) {
+ if (!strcmp(cpuset_array[i], pjob->ji_qs.ji_jobid)) {
+ cpuset_array[i][0] = '\0';
+ nr_free_cpus++;
+ }
+ }
+ pthread_mutex_unlock(&cpuset_lock);
+
+ snprintf(cg_path, MAXPATHLEN, "%s/%s/", CGROUP_ROOT_PATH,
+ pjob->ji_qs.ji_jobid);
+
+ ret = rmdir(cg_path);
+ if (ret) {
+ sprintf(log_buffer, "remove cgroup for job %s failed:%m",
+ pjob->ji_qs.ji_jobid);
+ log_err(errno, __func__, log_buffer);
+
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/src/resmom/mom_cgroup.h b/src/resmom/mom_cgroup.h
new file mode 100644
index 0000000..4e7a006
--- /dev/null
+++ b/src/resmom/mom_cgroup.h
@@ -0,0 +1,14 @@
+#ifndef _MOM_CGROUP_H_
+#define _MOM_CGROUP_H_
+
+#define CGROUP_ROOT_PATH "/dev/torque"
+
+int create_job_cgroup(job *pjob);
+
+int move_task_to_cgroup(pid_t pid, job *pjob);
+
+int remove_job_cgroup(job *pjob);
+
+int use_cgroup(job *pjob);
+
+#endif
diff --git a/src/resmom/mom_comm.c b/src/resmom/mom_comm.c
index 0a34d04..cc54c0e 100644
--- a/src/resmom/mom_comm.c
+++ b/src/resmom/mom_comm.c
@@ -2387,7 +2387,11 @@ int im_join_job_as_sister(
#endif /* ndef NUMA_SUPPORT */
#endif /* (PENABLE_LINUX26_CPUSETS) */
-
+
+ if (use_cgroup(pjob) == TRUE) {
+ create_job_cgroup(pjob);
+ }
+
ret = run_prologue_scripts(pjob);
if (ret != PBSE_NONE)
{
@@ -2574,6 +2578,10 @@ void im_kill_job_as_sister(
{
momport = pbs_rm_port;
}
+
+ if (use_cgroup(pjob)) {
+ remove_job_cgroup(pjob);
+ }
job_save(pjob, SAVEJOB_QUICK, momport);
diff --git a/src/resmom/mom_main.c b/src/resmom/mom_main.c
index c620ae7..4d25284 100644
--- a/src/resmom/mom_main.c
+++ b/src/resmom/mom_main.c
@@ -8708,6 +8708,24 @@ int setup_nodeboards()
#endif /* ifdef NUMA_SUPPORT */
+int setup_cpuset_array()
+{
+ /* defined in mom_cgroup.c */
+ extern char **cpuset_array;
+ extern int nr_cpus;
+ extern int nr_free_cpus;
+ int i;
+
+ nr_free_cpus = nr_cpus = sysconf(_SC_NPROCESSORS_ONLN);
+
+ cpuset_array = malloc(nr_cpus * sizeof(char *));
+ for (i = 0; i < nr_cpus; i++) {
+ cpuset_array[i] = malloc(PBS_MAXSVRJOBID + 1);
+ memset(cpuset_array[i], 0, PBS_MAXSVRJOBID + 1);
+ }
+
+ return 0;
+}
/*
@@ -8724,7 +8742,6 @@ int main(
int rc;
int tmpFD;
- tmpFD = sysconf(_SC_OPEN_MAX);
/* close any inherited extra files, leaving stdin, stdout, and stderr open */
@@ -8763,6 +8780,8 @@ int main(
#endif /* NVIDIA_GPUS */
+ setup_cpuset_array();
+
main_loop();
if (mom_run_state == MOM_RUN_STATE_KILLALL)
diff --git a/src/resmom/start_exec.c b/src/resmom/start_exec.c
index 4c16414..85c99b8 100644
--- a/src/resmom/start_exec.c
+++ b/src/resmom/start_exec.c
@@ -2294,8 +2294,20 @@ int TMomFinalizeJob2(
/*NOTREACHED*/
}
-
+
/* parent */
+
+ log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer);
+
+ if (use_cgroup(pjob) == TRUE) {
+ sprintf(log_buffer, "create cgroup for job %s", pjob->ji_qs.ji_jobid);
+ log_ext(-1, __func__, log_buffer, LOG_DEBUG);
+
+ if (create_job_cgroup(pjob) == 0) {
+ move_task_to_cgroup(cpid, pjob);
+ }
+
+ }
close(TJE->upfds);
@@ -3971,7 +3983,6 @@ int TMomFinalizeChild(
}
move_to_job_cpuset(getpid(), pjob);
}
-
#endif /* (PENABLE_LINUX26_CPUSETS) */
if (site_job_setup(pjob) != 0)
@@ -4736,6 +4747,10 @@ int start_process(
"task set to running/saving task (start_process)");
}
+ if (use_cgroup(pjob)) {
+ move_task_to_cgroup(ptask->ti_qs.ti_sid, pjob);
+ }
+
task_save(ptask);
if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING)
--
1.7.6.1
More information about the torqueusers
mailing list