diff --git a/src/include/batch_request.h b/src/include/batch_request.h
index b36e268..253e986 100644
--- a/src/include/batch_request.h
+++ b/src/include/batch_request.h
@@ -370,6 +370,8 @@ struct batch_request
     struct rq_returnfiles rq_returnfiles;
 
     struct rq_jobobit rq_jobobit;
+
+    int rq_lock;  /* PBS_BATCH_SchedulerLock: lock command */
     } rq_ind;
   };
 
@@ -434,6 +436,7 @@ extern int decode_DIS_Rescl A_((int socket, struct batch_request *));
 extern int decode_DIS_Rescq A_((int socket, struct batch_request *));
 extern int decode_DIS_RunJob A_((int socket, struct batch_request *));
 extern int decode_DIS_ShutDown A_((int socket, struct batch_request *));
+extern int decode_DIS_SchedLock A_((int socket, struct batch_request *));
 extern int decode_DIS_SignalJob A_((int socket, struct batch_request *));
 extern int decode_DIS_Status A_((int socket, struct batch_request *));
 extern int decode_DIS_TrackJob A_((int socket, struct batch_request *));
diff --git a/src/include/libpbs.h b/src/include/libpbs.h
index 0f34a44..8ef1e24 100644
--- a/src/include/libpbs.h
+++ b/src/include/libpbs.h
@@ -323,6 +323,8 @@ extern int encode_DIS_Status A_((int socket, char *objid, struct attrl *));
 extern int encode_DIS_attrl A_((int socket, struct attrl *));
 extern int encode_DIS_attropl A_((int socket, struct attropl *));
 
+extern int encode_DIS_SchedLock A_((int socket, int action));
+
 extern int DIS_reply_read A_((int socket, struct batch_reply *preply));
 
 #endif /* LIBPBS_H */
diff --git a/src/include/net_connect.h b/src/include/net_connect.h
index e694f75..b319bb2 100644
--- a/src/include/net_connect.h
+++ b/src/include/net_connect.h
@@ -205,6 +205,8 @@ struct connection
   time_t cn_lasttime;  /* time last active */
   void (*cn_func) A_((int)); /* read function when data rdy */
   void (*cn_oncl) A_((int)); /* func to call on close */
+  unsigned int cn_schlock :1; /**< Is there an active lock on this connection?
+                                   @see check_schlock() */
   };
 
 struct netcounter
diff --git a/src/include/pbs_batchreqtype_db.h b/src/include/pbs_batchreqtype_db.h
index f4f1c6f..aa3ebec 100644
--- a/src/include/pbs_batchreqtype_db.h
+++ b/src/include/pbs_batchreqtype_db.h
@@ -73,6 +73,7 @@ PbsBatchReqType(PBS_BATCH_DelFiles, "DeleteFiles")
 PbsBatchReqType(PBS_BATCH_JobObit, "JobObituary")
 PbsBatchReqType(PBS_BATCH_MvJobFile, "MoveJobFile")
 PbsBatchReqType(PBS_BATCH_StatusNode, "StatusNode")
 PbsBatchReqType(PBS_BATCH_Disconnect, "Disconnect")
 PbsBatchReqType(PBS_BATCH_AsySignalJob, "AsyncSignalJob") /* = 60 */
+PbsBatchReqType(PBS_BATCH_SchedulerLock, "SchedulerLock") /* = 61; appended so existing request numbers do not change */
 #endif
diff --git a/src/include/pbs_error_db.h b/src/include/pbs_error_db.h
index 81c8c7c..20ea3e1 100644
--- a/src/include/pbs_error_db.h
+++ b/src/include/pbs_error_db.h
@@ -193,6 +193,12 @@ PbsErrClient(PBSE_NOFAULTTOLERANT, "Queue does not allow fault tolerant jobs")
 /* only fault tolerant jobs allowed in queue */
 PbsErrClient(PBSE_NOFAULTINTOLERANT, "Queue does not allow fault intolerant jobs")
 PbsErrClient(PBSE_NOJOBARRAYS, "Queue does not allow job arrays")
+/* scheduler lock is active */
+PbsErrClient(PBSE_SCHLOCKACTIVE, "Scheduler lock is active on this server")
+/* bad command received in scheduler lock request */
+PbsErrClient(PBSE_SCHLOCKBADCOMM, "Bad command for scheduler lock management")
+/* the lock was broken (timeout, or breaking command) */
+PbsErrClient(PBSE_SCHLOCKBROKEN, "Scheduler lock was broken or timed out")
 /* pbs client errors ceiling (max_client_err + 1) */
 PbsErrClient(PBSE_CEILING, (char*)0)
 #endif
diff --git a/src/include/pbs_ifl.h b/src/include/pbs_ifl.h
index 3067fe3..1871dbc 100644
--- a/src/include/pbs_ifl.h
+++ b/src/include/pbs_ifl.h
@@ -528,6 +528,8 @@ extern int pbs_holdjob A_((int connect, char *job_id, char *hold_type, char *ext
 extern int pbs_checkpointjob A_((int connect, char *job_id, char *extend));
 extern char *pbs_locjob A_((int connect, char *job_id, char *extend));
 
+extern int pbs_lock A_((int connect, int action, char *extend));
+
 extern int pbs_manager A_((int connect, int command, int obj_type, char *obj_name,
                            struct attropl *attrib, char *extend));
 
diff --git a/src/lib/Libifl/dec_Lock.c b/src/lib/Libifl/dec_Lock.c
new file mode 100644
index 0000000..f6e41ff
--- /dev/null
+++ b/src/lib/Libifl/dec_Lock.c
@@ -0,0 +1,29 @@
+/*
+ * decode_DIS_SchedLock() - decode a Scheduler lock batch request
+ *
+ * The batch_request structure must already exist (be allocated by the
+ * caller). It is assumed that the header fields (protocol type,
+ * protocol version, request type, and user name) have already been decoded.
+ *
+ * Data items are: u int action (lock command, stored in rq_ind.rq_lock)
+ */
+
+#include <pbs_config.h>   /* the master config generated by configure */
+
+#include <sys/types.h>
+#include "libpbs.h"
+#include "list_link.h"
+#include "server_limits.h"
+#include "attribute.h"
+#include "credential.h"
+#include "batch_request.h"
+#include "dis.h"
+
+int decode_DIS_SchedLock(int sock, struct batch_request *preq)
+  {
+  int rc;
+
+  preq->rq_ind.rq_lock = (int)disrui(sock, &rc);
+
+  return(rc);
+  }
diff --git a/src/lib/Libifl/enc_Lock.c b/src/lib/Libifl/enc_Lock.c
new file mode 100644
index 0000000..c222493
--- /dev/null
+++ b/src/lib/Libifl/enc_Lock.c
@@ -0,0 +1,16 @@
+/*
+ * encode_DIS_SchedLock() - encode a Scheduler lock batch request
+ *
+ * Data items are: unsigned int action (lock command)
+ */
+
+#include <pbs_config.h>   /* the master config generated by configure */
+
+#include "libpbs.h"
+#include "pbs_error.h"
+#include "dis.h"
+
+int encode_DIS_SchedLock(int sock, int action)
+  {
+  return(diswui(sock, (unsigned)action));
+  }
diff --git a/src/lib/Libifl/pbsD_lock.c b/src/lib/Libifl/pbsD_lock.c
new file mode 100644
index 0000000..ca9005e
--- /dev/null
+++ b/src/lib/Libifl/pbsD_lock.c
@@ -0,0 +1,65 @@
+/*
+ * pbs_lock - send a scheduler lock request (PBS_BATCH_SchedulerLock) on
+ * connection c and wait for the server's reply.
+ *
+ * action is the lock command; the server's check_schlock() understands
+ * 0 = acquire, 1 = refresh and 2 = release.
+ *
+ * Returns PBSE_PROTOCOL (also stored in pbs_errno) when the request could
+ * not be sent, otherwise connection[c].ch_errno as left by PBSD_rdrpy()
+ * (presumably 0 when the server acknowledged the request).
+ */
+
+#include <pbs_config.h>   /* the master config generated by configure */
+
+#include <stdio.h>
+#include <string.h>
+#include "libpbs.h"
+#include "dis.h"
+
+int pbs_lock(
+
+  int   c,       /* I (connection handle) */
+  int   action,  /* I (lock command) */
+  char *extend)  /* I */
+
+  {
+  struct batch_reply *reply;
+  int                 rc = 0;
+  int                 sock;
+
+  sock = connection[c].ch_socket;
+
+  /* setup DIS support routines for following DIS calls */
+
+  DIS_tcp_setup(sock);
+
+  /* send request: header, lock command, extension */
+
+  if ((rc = encode_DIS_ReqHdr(sock, PBS_BATCH_SchedulerLock, pbs_current_user)) ||
+      (rc = encode_DIS_SchedLock(sock, action)) ||
+      (rc = encode_DIS_ReqExtend(sock, extend)))
+    {
+    connection[c].ch_errtxt = strdup(dis_emsg[rc]);
+
+    pbs_errno = PBSE_PROTOCOL;
+
+    return(pbs_errno);
+    }
+
+  if (DIS_tcp_wflush(sock))
+    {
+    pbs_errno = PBSE_PROTOCOL;
+
+    return(pbs_errno);
+    }
+
+  /* read in reply; its result is taken from connection[c].ch_errno */
+
+  reply = PBSD_rdrpy(c);
+  rc = connection[c].ch_errno;
+
+  PBSD_FreeReply(reply);
+
+  return(rc);
+  }  /* END pbs_lock() */
diff --git a/src/lib/Libnet/net_server.c b/src/lib/Libnet/net_server.c
index 4c9ebe7..0603b47 100644
--- a/src/lib/Libnet/net_server.c
+++ b/src/lib/Libnet/net_server.c
@@ -668,6 +668,7 @@ void add_conn(
   svr_conn[sock].cn_func     = func;
   svr_conn[sock].cn_oncl     = 0;
   svr_conn[sock].cn_socktype = socktype;
+  svr_conn[sock].cn_schlock  = 0;
 
 #ifndef NOPRIVPORTS
 
@@ -750,6 +751,8 @@ void close_conn(
 
   svr_conn[sd].cn_authen = 0;
 
+  svr_conn[sd].cn_schlock = 0;
+
   num_connections--;
 
   return;
diff --git a/src/lib/Libpbs/Makefile.am b/src/lib/Libpbs/Makefile.am
index 129e099..5377172 100644
--- a/src/lib/Libpbs/Makefile.am
+++ b/src/lib/Libpbs/Makefile.am
@@ -67,6 +67,7 @@ libtorque_la_SOURCES = ../Libcsv/csv.c ../Libdis/dis.c \
         ../Libifl/pbsD_submit.c ../Libifl/PBSD_submit_caps.c \
         ../Libifl/pbsD_termin.c ../Libifl/pbs_geterrmg.c \
         ../Libifl/pbs_statfree.c ../Libifl/rpp.c \
+        ../Libifl/pbsD_lock.c ../Libifl/enc_Lock.c ../Libifl/dec_Lock.c \
         ../Libifl/tcp_dis.c ../Libifl/tm.c ../Libifl/list_link.c \
         ../Libcmds/ck_job_name.c ../Libcmds/cnt2server.c \
         ../Libcmds/cvtdate.c ../Libcmds/get_server.c \
diff --git a/src/server/dis_read.c b/src/server/dis_read.c
index 99fcc62..31e1188 100644
--- a/src/server/dis_read.c
+++ b/src/server/dis_read.c
@@ -260,6 +260,12 @@ int dis_request_read(
 
       break;
 
+    case PBS_BATCH_SchedulerLock:
+
+      rc = decode_DIS_SchedLock(sfds, request);
+
+      break;
+
     case PBS_BATCH_SignalJob:
 
       rc = decode_DIS_SignalJob(sfds, request);
diff --git a/src/server/process_request.c b/src/server/process_request.c
index d4a3c92..24b4f13 100644
--- a/src/server/process_request.c
+++ b/src/server/process_request.c
@@ -696,9 +696,399 @@ void process_request(
   return;
   }  /* END process_request() */
 
+#ifndef PBS_MOM
 
+#include "work_task.h"
 
+/*
+ * Scheduler lock
+ *
+ * A PBS_BATCH_SchedulerLock request lets one connection lock the server.
+ * While the lock is held, requests arriving on other connections are
+ * passed through, rejected or deferred until the lock is released,
+ * depending on their type (see check_schlock()).
+ */
+
+/* lock commands: the data item of a PBS_BATCH_SchedulerLock request */
+enum SchLockActions
+  {
+  SCHLOCK_ACQUIRE,
+  SCHLOCK_REFRESH,
+  SCHLOCK_RELEASE
+  };
+
+/* return values of check_schlock() */
+enum SchLockCheckResult
+  {
+  SchLockContinue,  /* 0: the caller processes the request normally */
+  SchLockBlocked,   /* 1: the caller rejects it with PBSE_SCHLOCKACTIVE */
+  SchLockInternal   /* 2: replied to or deferred by check_schlock() */
+  };
 
+/* seconds of owner inactivity after which a competing ACQUIRE breaks the lock */
+static const int sch_lock_timeout = 3;
+
+/* policy: break the lock on job move/delete/hold instead of deferring them */
+static const int break_on_job_remove = 0;
+
+/* policy: break the lock on manager/modify/order requests instead of deferring them */
+static const int break_on_manager = 0;
+
+/* connection (socket) holding the scheduler lock, -1 when the server is unlocked */
+static int schlock_owner = -1;
+
+/* work tasks (linked through wt_linkobj) of requests deferred while locked */
+static tlist_head schlock_waiting;
+static int        schlock_waiting_init = 0;
+
+/*
+ * deferred_dispatch_request - work task callback that processes a
+ * request deferred by schlock_defer(): wt_event holds the connection,
+ * wt_parm1 the batch request.
+ */
+static void deferred_dispatch_request(
+
+  struct work_task *wt)  /* I */
+
+  {
+  dispatch_request((int)wt->wt_event, (struct batch_request *)wt->wt_parm1);
+  }
+
+/*
+ * schlock_release - drop the scheduler lock (if any) and process every
+ * request that was deferred while it was held.
+ *
+ * The owner is cleared BEFORE the queue is drained: every drained
+ * request passes through check_schlock() again and, with the lock
+ * still set, would be deferred anew so the loop would never end.
+ */
+static void schlock_release(void)
+  {
+  struct work_task *wt;
+
+  if (schlock_owner >= 0)
+    svr_conn[schlock_owner].cn_schlock = 0;
+
+  schlock_owner = -1;
+
+  /* relies on dispatch_task() unlinking each task from schlock_waiting */
+  while ((wt = (struct work_task *)GET_NEXT(schlock_waiting)) != NULL)
+    dispatch_task(wt);
+  }
+
+/*
+ * schlock_grant - make connId the owner of the (free) scheduler lock
+ */
+static void schlock_grant(
+
+  int connId)  /* I */
+
+  {
+  schlock_owner = connId;
+  svr_conn[connId].cn_schlock = 1;
+  }
+
+/*
+ * schlock_same_peer - does connId come from the same address and port
+ * as the lock owner's connection? Only valid while schlock_owner >= 0.
+ */
+static int schlock_same_peer(
+
+  int connId)  /* I */
+
+  {
+  return((svr_conn[schlock_owner].cn_addr == svr_conn[connId].cn_addr) &&
+         (svr_conn[schlock_owner].cn_port == svr_conn[connId].cn_port));
+  }
+
+/*
+ * schlock_defer - queue a request until the scheduler lock is released
+ *
+ * Returns SchLockInternal when queued, SchLockBlocked when no work task
+ * could be created (the caller then rejects the request instead of
+ * leaving the client without any reply).
+ *
+ * NOTE(review): if the client disconnects while its request is queued,
+ * the eventual reply targets a closed (possibly reused) socket - confirm
+ * how the reply path copes with that.
+ */
+static int schlock_defer(
+
+  int                   connId,   /* I */
+  struct batch_request *request)  /* I */
+
+  {
+  struct work_task *wt;
+
+  wt = set_task(WORK_Deferred_Other, connId, deferred_dispatch_request, request);
+
+  if (wt == NULL)
+    return(SchLockBlocked);
+
+  append_link(&schlock_waiting, &wt->wt_linkobj, wt);
+
+  return(SchLockInternal);
+  }
+
+/*
+ * schlock_command - process a PBS_BATCH_SchedulerLock request; the
+ * request is always acked or rejected here.
+ *
+ * NOTE(review): no privilege check is made, so any client may take the
+ * lock and block job starts - a manager/operator check is probably
+ * wanted here; confirm.
+ */
+static int schlock_command(
+
+  int                   connId,   /* I */
+  struct batch_request *request)  /* I */
+
+  {
+  switch (request->rq_ind.rq_lock)
+    {
+    case SCHLOCK_ACQUIRE:
+
+      if (schlock_owner < 0)
+        {
+        /* new lock; the wait queue should be empty, but drain it anyway */
+        schlock_release();
+        schlock_grant(connId);
+        log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, "Scheduler LOCK ACQUIRE", "--- success --- new lock ---");
+        }
+      else if (schlock_owner == connId)
+        {
+        /* new acquire from the current lock owner */
+        log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, "Scheduler LOCK ACQUIRE", "--- success --- lock owner ---");
+        }
+      else if (schlock_same_peer(connId))
+        {
+        /* same peer address and port on another connection: hand over */
+        svr_conn[schlock_owner].cn_schlock = 0;
+        schlock_grant(connId);
+        log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, "Scheduler LOCK ACQUIRE", "--- success --- previous lock owner ---");
+        }
+      else if ((time((time_t *)0) - svr_conn[schlock_owner].cn_lasttime) > sch_lock_timeout)
+        {
+        /* lock timed out: process the waiting requests before re-locking */
+        schlock_release();
+        schlock_grant(connId);
+        log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, "Scheduler LOCK ACQUIRE", "--- success --- timed out ---");
+        }
+      else
+        {
+        log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, "Scheduler LOCK ACQUIRE", "--- fail --- another lock active ---");
+        req_reject(PBSE_SCHLOCKACTIVE, 0, request, NULL, NULL);
+
+        break;
+        }
+
+      reply_ack(request);
+
+      break;
+
+    case SCHLOCK_REFRESH:
+
+      /* can only refresh on the connection that holds the lock */
+      if (schlock_owner == connId)
+        {
+        reply_ack(request);
+        log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, "Scheduler LOCK REFRESH", "--- success -- connection ok ---");
+        }
+      else
+        {
+        req_reject(PBSE_SCHLOCKBROKEN, 0, request, NULL, NULL);
+        log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, "Scheduler LOCK REFRESH", "--- fail -- lock broken ---");
+        }
+
+      break;
+
+    case SCHLOCK_RELEASE:
+
+      if (schlock_owner < 0)
+        {
+        log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, "Scheduler LOCK RELEASE", "--- success --- no lock on server ---");
+        }
+      else if (schlock_owner == connId)
+        {
+        log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, "Scheduler LOCK RELEASE", "--- success --- lock owner ---");
+        }
+      else if (schlock_same_peer(connId))
+        {
+        /* different connection, but same peer address and port */
+        log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, "Scheduler LOCK RELEASE", "--- success --- previous lock owner ---");
+        }
+      else
+        {
+        /* another connection holds the lock: keep it and its queue */
+        log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, "Scheduler LOCK RELEASE", "--- fail --- another lock active ---");
+        req_reject(PBSE_SCHLOCKACTIVE, 0, request, NULL, NULL);
+
+        break;
+        }
+
+      /* lock released, process all waiting requests, then acknowledge */
+      schlock_release();
+
+      reply_ack(request);
+
+      break;
+
+    default:
+
+      req_reject(PBSE_SCHLOCKBADCOMM, 0, request, NULL, NULL);
+      close_client(connId);
+
+      break;
+    }
+
+  return(SchLockInternal);
+  }
+
+/*
+ * schlock_filter - decide about a request that arrived on a connection
+ * other than the lock owner's while the scheduler lock is held
+ */
+static int schlock_filter(
+
+  int                   connId,   /* I */
+  struct batch_request *request)  /* I */
+
+  {
+  switch (request->rq_type)
+    {
+    /* read only requests, we don't care about those, just pass thru */
+    case PBS_BATCH_JobCred:
+    case PBS_BATCH_LocateJob:
+    case PBS_BATCH_SelectJobs:
+    case PBS_BATCH_SelStat:
+    case PBS_BATCH_StatusJob:
+    case PBS_BATCH_StatusQue:
+    case PBS_BATCH_StatusNode:
+    case PBS_BATCH_StatusSvr:
+    case PBS_BATCH_Rescq:
+
+      return(SchLockContinue);
+
+    /* requests working with jobs in states other than queued, ignore as well */
+    case PBS_BATCH_StageIn:
+    case PBS_BATCH_JobObit:
+    case PBS_BATCH_MvJobFile:
+    case PBS_BATCH_SignalJob:
+    case PBS_BATCH_Rerun:
+    case PBS_BATCH_MessJob:
+    case PBS_BATCH_CheckpointJob:
+    case PBS_BATCH_Commit:
+    case PBS_BATCH_RdytoCommit:
+    case PBS_BATCH_jobscript:
+    case PBS_BATCH_ReleaseJob:
+    case PBS_BATCH_QueueJob:
+
+      return(SchLockContinue);
+
+    /* requests that touch the server in ways the lock does not care about */
+    case PBS_BATCH_TrackJob:
+    case PBS_BATCH_AuthenUser:
+    case PBS_BATCH_ReleaseResc:
+
+      return(SchLockContinue);
+
+    /* these requests consume server resources, block them */
+    case PBS_BATCH_RunJob:
+    case PBS_BATCH_AsyrunJob:
+    case PBS_BATCH_ReserveResc:
+
+      return(SchLockBlocked);
+
+    /* these requests take jobs out of the queued state: defer them until
+     * the lock is released, or (by policy) let them break the lock */
+    case PBS_BATCH_MoveJob:
+    case PBS_BATCH_DeleteJob:
+    case PBS_BATCH_HoldJob:
+
+      if (break_on_job_remove)
+        {
+        schlock_release();
+
+        return(SchLockContinue);
+        }
+
+      return(schlock_defer(connId, request));
+
+    /* these requests modify server or job settings: defer them until
+     * the lock is released, or (by policy) let them break the lock */
+    case PBS_BATCH_Manager:
+    case PBS_BATCH_ModifyJob:
+    case PBS_BATCH_AsyModifyJob:
+    case PBS_BATCH_OrderJob:
+
+      if (break_on_manager)
+        {
+        schlock_release();
+
+        return(SchLockContinue);
+        }
+
+      return(schlock_defer(connId, request));
+
+    /* when shutting down, break the lock, no reason to run more jobs */
+    case PBS_BATCH_Shutdown:
+
+      schlock_release();
+
+      return(SchLockContinue);
+
+    /* something else coming, we don't know what it is: let it pass */
+    default:
+
+      return(SchLockContinue);
+    }
+  }
+
+/**
+ * Check a request against the scheduler lock.
+ *
+ * PBS_BATCH_SchedulerLock requests (acquire, refresh, release) are
+ * handled completely here. Any other request passes when the server is
+ * unlocked or it comes from the lock owner; otherwise schlock_filter()
+ * decides.
+ *
+ * @param connId  Connection ID of the received request
+ * @param request The received request
+ * @return SchLockContinue (0) if it is ok to process the request,
+ *         SchLockBlocked (1) if the caller must reject it,
+ *         SchLockInternal (2) if it was replied to or deferred here
+ */
+int check_schlock(
+
+  int                   connId,   /* I */
+  struct batch_request *request)  /* I */
+
+  {
+  if (!schlock_waiting_init)
+    {
+    CLEAR_HEAD(schlock_waiting);
+    schlock_waiting_init = 1;
+    }
+
+  /* add_conn() and close_conn() reset cn_schlock: an owner whose
+   * connection has gone away no longer holds the lock, so release it
+   * and run the deferred requests instead of leaving them queued */
+  if ((schlock_owner >= 0) && !svr_conn[schlock_owner].cn_schlock)
+    {
+    log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, "Scheduler LOCK", "--- lock owner connection closed, lock released ---");
+    schlock_release();
+    }
+
+  if (request->rq_type == PBS_BATCH_SchedulerLock)
+    return(schlock_command(connId, request));
+
+  /* if the server is unlocked or the source is the lock owner, do not check anything */
+  if ((schlock_owner < 0) || (schlock_owner == connId))
+    return(SchLockContinue);
+
+  return(schlock_filter(connId, request));
+  }
+#endif
 
 /*
  * dispatch_request - Determine the request type and invoke the corresponding
@@ -716,6 +1106,9 @@ void dispatch_request(
   {
   char *id = "dispatch_request";
 
+#ifndef PBS_MOM
+  int   ret;
+#endif
 
   if (LOGLEVEL >= 5)
     {
@@ -730,6 +1123,17 @@ void dispatch_request(
       log_buffer);
     }
 
+#ifndef PBS_MOM
+  /* check the scheduler lock, also handles scheduler lock requests */
+  ret = check_schlock(sfds, request);
+
+  if (ret == SchLockBlocked)
+    req_reject(PBSE_SCHLOCKACTIVE, 0, request, NULL, NULL);
+
+  if (ret != SchLockContinue)  /* rejected above, or already handled */
+    return;
+#endif
+
   switch (request->rq_type)
     {
     case PBS_BATCH_QueueJob: