Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from collections import defaultdict
from pathlib import Path

from diraccfg import CFG

from DIRAC import S_ERROR, S_OK, gConfig
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueues
Expand Down Expand Up @@ -243,6 +245,22 @@ def execute(self):
ce = queueDictionary["CE"]
ce.setProxy(pilotProxy)

# Resolve per-queue CPU info to advertise to the payload via
# /LocalSite/CPUTimeLeft and /LocalSite/CPUNormalizationFactor.
# For push CEs both CPUTime (wall-clock seconds) and
# CPUNormalizationFactor (HS06) are CS fields on the queue;
# /LocalSite/CPUTimeLeft is read downstream as CPU work in
# HS06-seconds (see JobAgent._computeCPUWorkLeft docstring), so we
# multiply here.
queueParams = queueDictionary["ParametersDict"]
cpuTime = int(queueParams.get("CPUTime", 0) or 0)
cpuNormalizationFactor = float(queueParams.get("CPUNormalizationFactor", 0) or 0)
cpuInfo = {
"CPUTimeLeft": int(cpuTime * cpuNormalizationFactor),
"CPUNormalizationFactor": cpuNormalizationFactor,
}
self.log.info("Injecting per-queue CPU info", f"queue={queueName} info={cpuInfo}")

if self.submissionPolicy == "JobWrapper":
# Check errors that could have occurred during job submission and/or execution
result = self._checkSubmittedJobWrappers(ce, queueDictionary["ParametersDict"]["Site"])
Expand Down Expand Up @@ -342,6 +360,16 @@ def execute(self):
break
proxyChain = result_setupProxy.get("Value")

# Splice /LocalSite CPU info onto the dirac-jobexec command line
# (PoolCE workers are reused across queues, so a per-job arg is the
# safest way to deliver per-queue values to the payload).
if "dirac-jobexec" in params.get("Executable", "").strip():
cpuOpts = (
f"-o /LocalSite/CPUTimeLeft={cpuInfo['CPUTimeLeft']} "
f"-o /LocalSite/CPUNormalizationFactor={cpuInfo['CPUNormalizationFactor']}"
)
params["Arguments"] = (params.get("Arguments", "") + " " + cpuOpts).strip()

resultSubmission = self._submitJob(
jobID=jobID,
jobParams=params,
Expand All @@ -353,7 +381,7 @@ def execute(self):
maxNumberOfProcessors=submissionParams["maxNumberOfProcessors"],
mpTag=submissionParams["mpTag"],
)
if not result["OK"]:
if not resultSubmission["OK"]:
self._rescheduleFailedJob(jobID, resultSubmission["Message"])
self.failedQueues[queueName] += 1
break
Expand All @@ -362,12 +390,13 @@ def execute(self):
jobID=jobID,
ce=ce,
diracInstallLocation=queueDictionary["ParametersDict"]["DIRACInstallLocation"],
cpuInfo=cpuInfo,
jobParams=params,
resourceParams=ceDict,
optimizerParams=optimizerParams,
processors=submissionParams["processors"],
)
if not result["OK"]:
if not resultSubmission["OK"]:
self.failedQueues[queueName] += 1
break

Expand Down Expand Up @@ -512,11 +541,30 @@ def preProcessJob(self, job: JobWrapper):
job.jobReport.commit()
return S_OK(result["Value"])

def _appendLocalSiteCFG(self, cfgFilename, cpuInfo):
    """Patch a CFG file written by ``gConfig.dumpRemoteCFGToFile`` with the
    /LocalSite/CPUTimeLeft and /LocalSite/CPUNormalizationFactor options.

    The file has to be patched after the dump: ``dumpRemoteCFGToFile`` only
    writes ``remoteCFG`` whereas ``gConfig.setOptionValue`` writes to
    ``localCFG``, so setting the values in memory before dumping would not
    make them reach the JobWrapper.

    :param cfgFilename: path of the CFG file to load and rewrite in place
    :param cpuInfo: dictionary holding the ``CPUTimeLeft`` and
                    ``CPUNormalizationFactor`` values to write
    """
    patchedCFG = CFG()
    patchedCFG.loadFromFile(str(cfgFilename))

    # The /LocalSite section must exist before options are set below it
    if not patchedCFG.isSection("LocalSite"):
        patchedCFG.createNewSection("LocalSite")

    # (option path in the CFG, key in cpuInfo); values are stored as strings
    for optionPath, infoKey in (
        ("/LocalSite/CPUTimeLeft", "CPUTimeLeft"),
        ("/LocalSite/CPUNormalizationFactor", "CPUNormalizationFactor"),
    ):
        patchedCFG.setOption(optionPath, str(cpuInfo[infoKey]))

    # Overwrite the original file with the serialized, patched configuration
    with open(cfgFilename, "w") as cfgFile:
        cfgFile.write(str(patchedCFG))

def _submitJobWrapper(
self,
jobID: str,
ce: ComputingElement,
diracInstallLocation: str,
cpuInfo: dict,
jobParams: dict,
resourceParams: dict,
optimizerParams: dict,
Expand All @@ -526,6 +574,8 @@ def _submitJobWrapper(

:param jobID: job ID
:param ce: ComputingElement instance
:param cpuInfo: per-queue CPU info ``{'CPUTimeLeft', 'CPUNormalizationFactor'}``
to advertise to the payload via the shipped ``dirac.cfg``
:param jobParams: job parameters
:param resourceParams: resource parameters
:param optimizerParams: optimizer parameters
Expand Down Expand Up @@ -571,6 +621,9 @@ def _submitJobWrapper(
# Dump the remote CFG config into the job directory: it is needed for the JobWrapperTemplate
cfgFilename = Path(job.jobIDPath) / "dirac.cfg"
gConfig.dumpRemoteCFGToFile(cfgFilename)
# Inject /LocalSite/CPUTimeLeft and /LocalSite/CPUNormalizationFactor for the payload.
# dumpRemoteCFGToFile only dumps remoteCFG, so we patch the dirac.cfg file directly.
self._appendLocalSiteCFG(cfgFilename, cpuInfo)

# Generate a light JobWrapper executor script
jobDesc = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def test_submitJobWrapper(mocker, jobID):
jobID=jobID,
ce=jobAgent.queueDict["ce1.site2.com_condor"]["CE"],
diracInstallLocation="diracInstallLocation",
cpuInfo={"CPUTimeLeft": 0, "CPUNormalizationFactor": 0.0},
jobParams=jobParams,
resourceParams={},
optimizerParams={},
Expand Down Expand Up @@ -258,6 +259,7 @@ def test_submitJobWrapper(mocker, jobID):
jobID=jobID,
ce=jobAgent.queueDict["ce1.site2.com_condor"]["CE"],
diracInstallLocation="diracInstallLocation",
cpuInfo={"CPUTimeLeft": 0, "CPUNormalizationFactor": 0.0},
jobParams=jobParams,
resourceParams={},
optimizerParams={},
Expand Down Expand Up @@ -293,6 +295,7 @@ def test_submitJobWrapper(mocker, jobID):
jobID=jobID,
ce=jobAgent.queueDict["ce1.site2.com_condor"]["CE"],
diracInstallLocation="diracInstallLocation",
cpuInfo={"CPUTimeLeft": 0, "CPUNormalizationFactor": 0.0},
jobParams=jobParams,
resourceParams={},
optimizerParams={},
Expand Down Expand Up @@ -327,6 +330,7 @@ def test_submitJobWrapper(mocker, jobID):
jobID=jobID,
ce=jobAgent.queueDict["ce1.site2.com_condor"]["CE"],
diracInstallLocation="diracInstallLocation",
cpuInfo={"CPUTimeLeft": 0, "CPUNormalizationFactor": 0.0},
jobParams=jobParams,
resourceParams={},
optimizerParams={},
Expand Down Expand Up @@ -370,6 +374,7 @@ def test_submitJobWrapper(mocker, jobID):
jobID=jobID,
ce=ce,
diracInstallLocation="diracInstallLocation",
cpuInfo={"CPUTimeLeft": 0, "CPUNormalizationFactor": 0.0},
jobParams=jobParams,
resourceParams={},
optimizerParams={},
Expand Down Expand Up @@ -412,6 +417,7 @@ def test_submitJobWrapper(mocker, jobID):
jobID=jobID,
ce=ce,
diracInstallLocation="diracInstallLocation",
cpuInfo={"CPUTimeLeft": 0, "CPUNormalizationFactor": 0.0},
jobParams=jobParams,
resourceParams={},
optimizerParams={},
Expand All @@ -429,3 +435,110 @@ def test_submitJobWrapper(mocker, jobID):

job.sendJobAccounting.assert_not_called()
shutil.rmtree("job")


def _bareAgent(mocker):
    """Return a PushJobAgent built with ``AgentModule.__init__`` patched out.

    Intended for unit tests of helper methods that only need an agent
    instance with a logger attached.

    :param mocker: the pytest-mock fixture used to install the patches
    :return: a ``PushJobAgent("Test", "Test1")`` whose ``log`` is ``gLogger``
    """

    def returnDefault(x, y=None):
        # Replacement behaviour for the patched module-properties accessor:
        # give back the second argument (None when it is omitted)
        return y

    mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__")
    mocker.patch(
        "DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule._AgentModule__moduleProperties",
        side_effect=returnDefault,
        create=True,
    )

    stubAgent = PushJobAgent("Test", "Test1")
    stubAgent.log = gLogger
    return stubAgent


def test__appendLocalSiteCFG_writes_options(mocker, tmp_path):
    """_appendLocalSiteCFG adds the /LocalSite options and keeps what was already in the file."""
    from diraccfg import CFG

    pushAgent = _bareAgent(mocker)

    # Seed the file with an unrelated option that must survive the patching
    targetFile = tmp_path / "dirac.cfg"
    initialCFG = CFG()
    initialCFG.createNewSection("/DIRAC")
    initialCFG.setOption("/DIRAC/VirtualOrganization", "lhcb")
    targetFile.write_text(str(initialCFG))

    pushAgent._appendLocalSiteCFG(targetFile, {"CPUTimeLeft": 7200, "CPUNormalizationFactor": 12.5})

    # Read the file back from disk and compare against the expected content
    patchedCFG = CFG()
    patchedCFG.loadFromFile(str(targetFile))
    expectedOptions = {
        # Injected by the helper (values are serialized as strings)
        "/LocalSite/CPUTimeLeft": "7200",
        "/LocalSite/CPUNormalizationFactor": "12.5",
        # Pre-existing option that must be preserved
        "/DIRAC/VirtualOrganization": "lhcb",
    }
    for optionPath, expectedValue in expectedOptions.items():
        assert patchedCFG.getOption(optionPath) == expectedValue


@pytest.mark.slow
def test_submitJobWrapper_writes_local_site_to_dirac_cfg(mocker, jobID):
    """End-to-end: _submitJobWrapper patches the dumped dirac.cfg with /LocalSite options.

    The remote configuration dump is replaced by a stub that writes a minimal
    CFG file, so the test can check that the file left in the job directory
    contains both the stubbed content and the injected CPU options.
    """
    from diraccfg import CFG

    # Reuse the shared helper for the mocked AgentModule bootstrap and logger
    jobAgent = _bareAgent(mocker)
    jobAgent.submissionPolicy = "JobWrapper"
    jobAgent.queueDict = jobAgent._buildQueueDict(
        siteNames=["LCG.Site1.com", "LCG.Site2.site2"], ces=None, ceTypes=None
    )["Value"]

    jobAgent.jobs[jobID] = {"JobReport": JobReport(jobID)}
    jobParams = {"InputSandbox": True, "InputData": True}

    # Fake JobWrapper whose pre-processing steps all succeed
    job = Mock()
    job.owner = None
    job.userGroup = None
    job.jobArgs = jobParams
    job.jobIDPath = Path(jobID)
    job.initialize = Mock()
    job.transferInputSandbox = Mock(return_value=S_OK())
    job.resolveInputData = Mock(return_value=S_OK())
    job.preProcess = Mock(return_value=S_OK())
    job.sendJobAccounting = Mock()

    mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities.JobWrapper", return_value=job)

    # dumpRemoteCFGToFile writes a minimal stub so _appendLocalSiteCFG has something to patch
    def fakeDump(filename):
        seed = CFG()
        seed.createNewSection("/DIRAC")
        seed.setOption("/DIRAC/VirtualOrganization", "lhcb")
        Path(filename).write_text(str(seed))

    mocker.patch(
        "DIRAC.WorkloadManagementSystem.Agent.PushJobAgent.gConfig.dumpRemoteCFGToFile",
        side_effect=fakeDump,
    )

    ce = Mock()
    ce.submitJob = Mock(return_value={"OK": True, "Value": ["789"], "PilotStampDict": {"789": "abcdef"}})

    try:
        result = jobAgent._submitJobWrapper(
            jobID=jobID,
            ce=ce,
            diracInstallLocation="diracInstallLocation",
            cpuInfo={"CPUTimeLeft": 4242, "CPUNormalizationFactor": 7.5},
            jobParams=jobParams,
            resourceParams={},
            optimizerParams={},
            processors=1,
        )

        assert result["OK"], result

        # Verify the CFG that was shipped has the LocalSite section we injected
        reloaded = CFG()
        reloaded.loadFromFile(str(Path(jobID) / "dirac.cfg"))
        assert reloaded.getOption("/LocalSite/CPUTimeLeft") == "4242"
        assert reloaded.getOption("/LocalSite/CPUNormalizationFactor") == "7.5"
        assert reloaded.getOption("/DIRAC/VirtualOrganization") == "lhcb"
    finally:
        # Always clean up, even when the call or an assertion above fails,
        # so a failing run does not leave the "job" directory behind
        shutil.rmtree("job", ignore_errors=True)
Loading