[torqueusers] [PATCH 3/3] resmom: create cgroup for jobs to limit cpu and mem usage
levin li
levin108 at gmail.com
Tue Nov 20 19:18:56 MST 2012
Hi, David
Thanks for your advice, I've posted the patches to your bugzilla.
thanks,
levin
On 2012年11月21日 01:19, David Beer wrote:
> Levin,
>
> Could you create a bugzilla issue and then attach the complete patch to
> the bugzilla ticket? This will ensure that your work is not lost and
> make it easier to track the issue.
>
> Cheers,
>
> David
>
> On Tue, Nov 20, 2012 at 2:02 AM, levin li <levin108 at gmail.com
> <mailto:levin108 at gmail.com>> wrote:
>
>
> Signed-off-by: levin li <levin108 at gmail.com <mailto:levin108 at gmail.com>>
> ---
> src/resmom/catch_child.c | 4 +
> src/resmom/mom_cgroup.c | 445
> ++++++++++++++++++++++++++++++++++++++++++++++
> src/resmom/mom_cgroup.h | 14 ++
> src/resmom/mom_comm.c | 10 +-
> src/resmom/mom_main.c | 21 ++-
> src/resmom/start_exec.c | 19 ++-
> 6 files changed, 509 insertions(+), 4 deletions(-)
> create mode 100644 src/resmom/mom_cgroup.c
> create mode 100644 src/resmom/mom_cgroup.h
>
> diff --git a/src/resmom/catch_child.c b/src/resmom/catch_child.c
> index 8e3527f..72d79a8 100644
> --- a/src/resmom/catch_child.c
> +++ b/src/resmom/catch_child.c
> @@ -438,6 +438,10 @@ void scan_for_exiting(void)
> pjob->ji_qs.ji_un.ji_momt.ji_exitstat =
> ptask->ti_qs.ti_exitstat;
> }
>
> + if (use_cgroup(pjob)) {
> + remove_job_cgroup(pjob);
> + }
> +
> log_event(
> PBSEVENT_JOB,
> PBS_EVENTCLASS_JOB,
> diff --git a/src/resmom/mom_cgroup.c b/src/resmom/mom_cgroup.c
> new file mode 100644
> index 0000000..a9f388c
> --- /dev/null
> +++ b/src/resmom/mom_cgroup.c
> @@ -0,0 +1,445 @@
> +#include <pbs_config.h> /* the master config generated by
> configure */
> +
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <unistd.h>
> +#ifdef USEJOBCREATE
> +#ifndef JOBFAKE
> +#include <job.h>
> +#endif /* JOBFAKE */
> +#endif /* USEJOBCREATE */
> +#include <errno.h>
> +#include <fcntl.h>
> +#include <grp.h>
> +#include <string.h>
> +#include <limits.h>
> +#include <assert.h>
> +#include <signal.h>
> +#include <ctype.h>
> +#include <time.h>
> +#include <sys/param.h>
> +#include <sys/types.h>
> +#include <sys/stat.h>
> +#include <sys/wait.h>
> +#include <sys/mman.h>
> +#include <sys/ioctl.h>
> +#include <pthread.h>
> +
> +#include "rpp.h"
> +#include "libpbs.h"
> +#include "portability.h"
> +#include "list_link.h"
> +#include "server_limits.h"
> +#include "attribute.h"
> +#include "resource.h"
> +#include "resmon.h"
> +#include "pbs_job.h"
> +#include "log.h"
> +#include "../lib/Liblog/pbs_log.h"
> +#include "../lib/Liblog/log_event.h"
> +#include "mom_mach.h"
> +#include "mom_func.h"
> +#include "pbs_error.h"
> +#include "svrfunc.h"
> +#include "dis.h"
> +#include "batch_request.h"
> +#include "mcom.h"
> +#include "resource.h"
> +#include "utils.h"
> +#include "mom_comm.h"
> +#include "mom_cgroup.h"
> +#include "server.h"
> +#include "svrfunc.h"
> +
> +char **cpuset_array = NULL;
> +int nr_cpus;
> +int nr_free_cpus;
> +pthread_mutex_t cpuset_lock = PTHREAD_MUTEX_INITIALIZER;
> +
> +int use_cgroup(job *pjob)
> +{
> + if (pjob->ji_wattr[JOB_ATR_cgroup_enable].at_flags & ATR_VFLAG_SET)
> + return TRUE;
> + return FALSE;
> +}
> +
> +static int mm_getsize(resource *pres, unsigned long *ret) {
> + unsigned long value;
> + if (pres->rs_value.at_type != ATR_TYPE_SIZE)
> + return PBSE_ATTRTYPE;
> +
> + value = pres->rs_value.at_val.at_size.atsv_num;
> + if (pres->rs_value.at_val.at_size.atsv_units == ATR_SV_WORDSZ) {
> + if (value > ULONG_MAX / sizeof(int))
> + return PBSE_BADATVAL;
> +
> + value *= sizeof(int);
> + }
> +
> + if (value > (ULONG_MAX >> pres->rs_value.at_val.at_size.atsv_shift))
> + return PBSE_BADATVAL;
> +
> + *ret = (value << pres->rs_value.at_val.at_size.atsv_shift);
> +
> + return PBSE_NONE;
> +} /* END mm_getsize() */
> +
> +static int get_job_pmem(job *pjob)
> +{
> + resource *pres;
> + char *pname = NULL;
> +
> + pres = (resource
> *)GET_NEXT(pjob->ji_wattr[JOB_ATR_resource].at_val.at_list);
> +
> + while (pres) {
> + int ret;
> + unsigned long pmem;
> +
> + if (pres->rs_defin != NULL)
> + pname = pres->rs_defin->rs_name;
> + else
> + goto next;
> +
> + if (strcmp(pname, "pmem"))
> + goto next;
> +
> + ret = mm_getsize(pres, &pmem);
> + if (ret != PBSE_NONE) {
> + DBPRT(("Memory restriction not set\n"));
> + return -1;
> + }
> +
> + return pmem;
> +next:
> + pres = (resource *)GET_NEXT(pres->rs_link);
> + }
> +
> + return -1;
> +}
> +
> +static int set_job_mem_limit(job *pjob)
> +{
> + unsigned long pmem;
> + unsigned long ppn;
> + char cg_path[MAXPATHLEN], mem_limit[32];
> + int fd, ret;
> +
> + pmem = get_job_pmem(pjob);
> +
> + ppn = pjob->ji_numvnod / pjob->ji_numnodes;
> + pmem *= ppn;
> +
> + sprintf(log_buffer, "job %s set memory limitation to %ld",
> + pjob->ji_qs.ji_jobid, pmem);
> + log_ext(-1, __func__, log_buffer, LOG_INFO);
> +
> + if (pmem > 0) {
> + snprintf(cg_path, MAXPATHLEN, "%s/%s/memory.limit_in_bytes",
> + CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
> + fd = open(cg_path, O_RDWR);
> + if (fd < 0) {
> + sprintf(log_buffer, "can not open %s:%m", cg_path);
> + log_err(errno, __func__, log_buffer);
> + return -1;
> + }
> +
> + snprintf(mem_limit, sizeof(mem_limit), "%ldM", pmem);
> + ret = write(fd, mem_limit, strlen(mem_limit));
> + if (ret != strlen(mem_limit)) {
> + sprintf(log_buffer, "set memory.limit_in_bytes failed: %m");
> + log_err(errno, __func__, log_buffer);
> + close(fd);
> + return -1;
> + }
> + close(fd);
> + }
> +
> + return 0;
> +}
> +
> +static int get_all_cpus(char *buf)
> +{
> + char cg_path[MAXPATHLEN];
> + int fd, ret, n;
> +
> + /* read cpuset info from root cgroup */
> + snprintf(cg_path, MAXPATHLEN, "%s/cpuset.cpus", CGROUP_ROOT_PATH);
> + fd = open(cg_path, O_RDWR);
> + if (fd < 0) {
> + sprintf(log_buffer, "can not read cpuset info from root
> cgroup:%m");
> + log_err(errno, __func__, log_buffer);
> +
> + return -1;
> + }
> +
> + n = 0;
> + memset(buf, 0, sizeof(buf));
> + while (1) {
> + ret = read(fd, buf + n, sizeof(buf) - n);
> + n += ret;
> + if (!ret)
> + break;
> + }
> +
> + return 0;
> +}
> +
> +static int get_aval_cpus(char *buf, const char *jobid, int ppn)
> +{
> + int i, start, end, count = 0;
> +
> + if (!nr_free_cpus)
> + return -1;
> +
> + start = 0, end = 0;
> + for (i = 0; i < nr_cpus; i++) {
> + if (strlen(cpuset_array[i]) == 0 && ppn == 1) {
> + memcpy(cpuset_array[i], jobid, strlen(jobid) + 1);
> + sprintf(buf, "%d", i);
> + break;
> + }
> +
> + if (strlen(cpuset_array[i]) == 0) {
> + count ++;
> + memcpy(cpuset_array[i], jobid, strlen(jobid) + 1);
> + if (count == ppn) {
> + if (strlen(buf) == 0)
> + sprintf(buf, "%d-%d", start, i);
> + else
> + sprintf(buf, "%s,%d-%d", buf, start, i);
> + break;
> + }
> + } else {
> +
> + if (count == 0) {
> + start = i + 1;
> + continue;
> + }
> +
> + if (!strcmp(cpuset_array[i - 1], jobid)) {
> + if (strlen(buf) == 0)
> + sprintf(buf, "%d-%d", start, i - 1);
> + else
> + sprintf(buf, "%s,%d-%d", buf, start, i - 1);
> + }
> +
> + start = i + 1;
> + }
> + }
> + sprintf(buf, "%s\n", buf);
> + nr_free_cpus -= ppn;
> +
> + sprintf(log_buffer, "available cpuset is %s", buf);
> + log_ext(-1, __func__, log_buffer, LOG_INFO);
> +
> + return 0;
> +}
> +
> +static int set_job_cpu_limit(job *pjob)
> +{
> + char cg_path[MAXPATHLEN];
> + char buf[BUFSIZ];
> + int fd, ret, n;
> + int ppn;
> +
> + ppn = pjob->ji_numvnod / pjob->ji_numnodes;
> +
> + pthread_mutex_lock(&cpuset_lock);
> + if (ppn > nr_free_cpus)
> + ret = get_all_cpus(buf);
> + else {
> + memset(buf, 0, sizeof(buf));
> + ret = get_aval_cpus(buf, pjob->ji_qs.ji_jobid, ppn);
> + }
> + pthread_mutex_unlock(&cpuset_lock);
> +
> + if (ret)
> + return -1;
> +
> + /* set cpuset to cpuset of root cgroup */
> + snprintf(cg_path, MAXPATHLEN, "%s/%s/cpuset.cpus",
> + CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
> + fd = open(cg_path, O_RDWR);
> + if (fd < 0) {
> + sprintf(log_buffer, "can not open %s:%m", cg_path);
> + log_err(errno, __func__, log_buffer);
> +
> + return -1;
> + }
> +
> + ret = write(fd, buf, n);
> + if (ret != n) {
> + sprintf(log_buffer, "set cpuset.cpus failed: %m");
> + log_err(errno, __func__, log_buffer);
> +
> + close(fd);
> + return -1;
> + }
> +
> + close(fd);
> +
> + return 0;
> +}
> +
> +static int set_job_numa_node_limit(job *pjob)
> +{
> + char cg_path[MAXPATHLEN];
> + char buf[BUFSIZ];
> + int fd, ret, n;
> +
> + /* read NUMA node info from root cgroup */
> + snprintf(cg_path, MAXPATHLEN, "%s/cpuset.mems", CGROUP_ROOT_PATH);
> + fd = open(cg_path, O_RDWR);
> + if (fd < 0) {
> + sprintf(log_buffer, "can not read numa node info from root
> cgroup:%m");
> + log_err(errno, __func__, log_buffer);
> +
> + return -1;
> + }
> +
> + n = 0;
> + memset(buf, 0, sizeof(buf));
> + while (1) {
> + ret = read(fd, buf + n, sizeof(buf) - n);
> + n += ret;
> + if (!ret)
> + break;
> + }
> +
> + close(fd);
> +
> + sprintf(log_buffer, "NUMA node info of root cgroup is: %s", buf);
> + log_ext(-1, __func__, log_buffer, LOG_INFO);
> +
> + /* set this cgroup to use memory from all NUMA nodes */
> + snprintf(cg_path, MAXPATHLEN, "%s/%s/cpuset.mems",
> + CGROUP_ROOT_PATH, pjob->ji_qs.ji_jobid);
> + fd = open(cg_path, O_RDWR);
> + if (fd < 0) {
> + sprintf(log_buffer, "can not open %s:%m", cg_path);
> + log_err(errno, __func__, log_buffer);
> +
> + return -1;
> + }
> +
> + ret = write(fd, buf, n);
> + if (ret != n) {
> + sprintf(log_buffer, "set cpuset.mems failed:%m");
> + log_err(errno, __func__, log_buffer);
> +
> + close(fd);
> + return -1;
> + }
> +
> + close(fd);
> +
> + return 0;
> +}
> +
> +int create_job_cgroup(job *pjob)
> +{
> + char cg_path[MAXPATHLEN];
> + int ret;
> +
> + sprintf(log_buffer, "creating cgroup for job %s",
> pjob->ji_qs.ji_jobid);
> + log_ext(-1, __func__, log_buffer, LOG_INFO);
> +
> + ret = access(CGROUP_ROOT_PATH, R_OK | W_OK);
> + if (ret) {
> + sprintf(log_buffer, "cgroup mount point '%s' not found",
> CGROUP_ROOT_PATH);
> + log_err(errno, __func__, log_buffer);
> +
> + return -1;
> + }
> +
> + snprintf(cg_path, MAXPATHLEN, "%s/%s", CGROUP_ROOT_PATH,
> + pjob->ji_qs.ji_jobid);
> +
> + ret = mkdir(cg_path, 0755);
> + if (ret) {
> + sprintf(log_buffer, "create cgroup for %s failed: %m",
> + pjob->ji_qs.ji_jobid);
> + log_err(errno, __func__, log_buffer);
> +
> + return -1;
> + }
> +
> + ret = set_job_cpu_limit(pjob);
> + if (ret)
> + return -1;
> +
> + ret = set_job_numa_node_limit(pjob);
> + if (ret)
> + return -1;
> +
> + ret = set_job_mem_limit(pjob);
> + if (ret)
> + return -1;
> +
> + return 0;
> +}
> +
> +int move_task_to_cgroup(pid_t pid, job *pjob)
> +{
> + char cg_path[MAXPATHLEN], buf[BUFSIZ];
> + int fd, ret;
> +
> + sprintf(log_buffer, "moving job %s to cgroup", pjob->ji_qs.ji_jobid);
> + log_ext(-1, __func__, log_buffer, LOG_INFO);
> +
> + snprintf(cg_path, MAXPATHLEN, "%s/%s/tasks", CGROUP_ROOT_PATH,
> + pjob->ji_qs.ji_jobid);
> +
> + fd = open(cg_path, O_RDWR);
> + if (fd < 0) {
> + sprintf(log_buffer, "failed to open %s:%m", cg_path);
> + log_err(errno, __func__, log_buffer);
> +
> + return -1;
> + }
> +
> + snprintf(buf, sizeof(buf), "%d\n", pid);
> + ret = write(fd, buf, strlen(buf));
> + if (ret != strlen(buf)) {
> + sprintf(log_buffer, "failed to move task to cgroup: %m");
> + log_err(errno, __func__, log_buffer);
> +
> + return -1;
> + }
> +
> + close(fd);
> +
> + return 0;
> +}
> +
> +int remove_job_cgroup(job *pjob)
> +{
> + char cg_path[MAXPATHLEN];
> + int ret, i;
> +
> + sprintf(log_buffer, "removing cgroup for job %s",
> pjob->ji_qs.ji_jobid);
> + log_ext(-1, __func__, log_buffer, LOG_INFO);
> +
> + pthread_mutex_lock(&cpuset_lock);
> + for (i = 0; i < nr_cpus; i++) {
> + if (!strcmp(cpuset_array[i], pjob->ji_qs.ji_jobid)) {
> + cpuset_array[i][0] = '\0';
> + nr_free_cpus++;
> + }
> + }
> + pthread_mutex_unlock(&cpuset_lock);
> +
> + snprintf(cg_path, MAXPATHLEN, "%s/%s/", CGROUP_ROOT_PATH,
> + pjob->ji_qs.ji_jobid);
> +
> + ret = rmdir(cg_path);
> + if (ret) {
> + sprintf(log_buffer, "remove cgroup for job %s failed:%m",
> + pjob->ji_qs.ji_jobid);
> + log_err(errno, __func__, log_buffer);
> +
> + return -1;
> + }
> +
> + return 0;
> +}
> diff --git a/src/resmom/mom_cgroup.h b/src/resmom/mom_cgroup.h
> new file mode 100644
> index 0000000..4e7a006
> --- /dev/null
> +++ b/src/resmom/mom_cgroup.h
> @@ -0,0 +1,14 @@
> +#ifndef _MOM_CGROUP_H_
> +#define _MOM_CGROUP_H_
> +
> +#define CGROUP_ROOT_PATH "/dev/torque"
> +
> +int create_job_cgroup(job *pjob);
> +
> +int move_task_to_cgroup(pid_t pid, job *pjob);
> +
> +int remove_job_cgroup(job *pjob);
> +
> +int use_cgroup(job *pjob);
> +
> +#endif
> diff --git a/src/resmom/mom_comm.c b/src/resmom/mom_comm.c
> index 0a34d04..cc54c0e 100644
> --- a/src/resmom/mom_comm.c
> +++ b/src/resmom/mom_comm.c
> @@ -2387,7 +2387,11 @@ int im_join_job_as_sister(
>
> #endif /* ndef NUMA_SUPPORT */
> #endif /* (PENABLE_LINUX26_CPUSETS) */
> -
> +
> + if (use_cgroup(pjob) == TRUE) {
> + create_job_cgroup(pjob);
> + }
> +
> ret = run_prologue_scripts(pjob);
> if (ret != PBSE_NONE)
> {
> @@ -2574,6 +2578,10 @@ void im_kill_job_as_sister(
> {
> momport = pbs_rm_port;
> }
> +
> + if (use_cgroup(pjob)) {
> + remove_job_cgroup(pjob);
> + }
>
> job_save(pjob, SAVEJOB_QUICK, momport);
>
> diff --git a/src/resmom/mom_main.c b/src/resmom/mom_main.c
> index c620ae7..4d25284 100644
> --- a/src/resmom/mom_main.c
> +++ b/src/resmom/mom_main.c
> @@ -8708,6 +8708,24 @@ int setup_nodeboards()
> #endif /* ifdef NUMA_SUPPORT */
>
>
> +int setup_cpuset_array()
> +{
> + /* defined in mom_cgroup.c */
> + extern char **cpuset_array;
> + extern int nr_cpus;
> + extern int nr_free_cpus;
> + int i;
> +
> + nr_free_cpus = nr_cpus = sysconf(_SC_NPROCESSORS_ONLN);
> +
> + cpuset_array = malloc(nr_cpus * sizeof(char *));
> + for (i = 0; i < nr_cpus; i++) {
> + cpuset_array[i] = malloc(PBS_MAXSVRJOBID + 1);
> + memset(cpuset_array[i], 0, PBS_MAXSVRJOBID + 1);
> + }
> +
> + return 0;
> +}
>
>
> /*
> @@ -8724,7 +8742,6 @@ int main(
> int rc;
> int tmpFD;
>
> - tmpFD = sysconf(_SC_OPEN_MAX);
>
> /* close any inherited extra files, leaving stdin, stdout, and
> stderr open */
>
> @@ -8763,6 +8780,8 @@ int main(
>
> #endif /* NVIDIA_GPUS */
>
> + setup_cpuset_array();
> +
> main_loop();
>
> if (mom_run_state == MOM_RUN_STATE_KILLALL)
> diff --git a/src/resmom/start_exec.c b/src/resmom/start_exec.c
> index 4c16414..85c99b8 100644
> --- a/src/resmom/start_exec.c
> +++ b/src/resmom/start_exec.c
> @@ -2294,8 +2294,20 @@ int TMomFinalizeJob2(
>
> /*NOTREACHED*/
> }
> -
> +
> /* parent */
> +
> + log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer);
> +
> + if (use_cgroup(pjob) == TRUE) {
> + sprintf(log_buffer, "create cgroup for job %s",
> pjob->ji_qs.ji_jobid);
> + log_ext(-1, __func__, log_buffer, LOG_DEBUG);
> +
> + if (create_job_cgroup(pjob) == 0) {
> + move_task_to_cgroup(cpid, pjob);
> + }
> +
> + }
>
> close(TJE->upfds);
>
> @@ -3971,7 +3983,6 @@ int TMomFinalizeChild(
> }
> move_to_job_cpuset(getpid(), pjob);
> }
> -
> #endif /* (PENABLE_LINUX26_CPUSETS) */
>
> if (site_job_setup(pjob) != 0)
> @@ -4736,6 +4747,10 @@ int start_process(
> "task set to running/saving task (start_process)");
> }
>
> + if (use_cgroup(pjob)) {
> + move_task_to_cgroup(ptask->ti_qs.ti_sid, pjob);
> + }
> +
> task_save(ptask);
>
> if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING)
> --
> 1.7.6.1
>
> _______________________________________________
> torqueusers mailing list
> torqueusers at supercluster.org <mailto:torqueusers at supercluster.org>
> http://www.supercluster.org/mailman/listinfo/torqueusers
>
>
>
>
> --
> David Beer | Senior Software Engineer
> Adaptive Computing
>
>
>
> _______________________________________________
> torqueusers mailing list
> torqueusers at supercluster.org
> http://www.supercluster.org/mailman/listinfo/torqueusers
>
More information about the torqueusers
mailing list