import sys
import subprocess
from mkShapesRDF.processor.framework.Steps_cfg import Steps
from mkShapesRDF.processor.framework.Productions_cfg import Productions
from mkShapesRDF.lib.search_files import SearchFiles
import os
from pathlib import Path
from math import ceil
from textwrap import dedent
from mkShapesRDF.lib.utils import getFrameworkPath
class Processor:
    """
    The ``Processor`` class is used to create the folder structure for batch submission and the scripts to run the processing.

    For a given production and step it writes, under ``condorDir/prodName/step/``,
    a shared ``run.sh``, one ``<sample>__part<N>/script.py`` per chunk of input
    files and a ``submit.jdl`` for HTCondor, and optionally submits the jobs.
    """

    #: Code of the generated script that copies the job input files (``_files``)
    #: to the local working directory, collecting the local names in ``files``.
    _COPY_INPUTS_CODE = dedent(
        """
        files = []
        for f in _files:
            filename = f.split('/')[-1]
            filename = 'input__' + filename
            files.append(filename)
            proc = 0
            if "root://" in f:
                proc = subprocess.Popen(f"xrdcp {f} {filename}", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            else:
                proc = subprocess.Popen(f"cp {f} {filename}", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = proc.communicate()
            print(out.decode('utf-8'))
            print(err.decode('utf-8'), file=sys.stderr)
            if proc.returncode != 0:
                print(f"Error copying file {f}", file=sys.stderr)
                sys.exit(1)

        """
    )

    #: Code of the generated script executed after the step modules: runs the
    #: snapshots, writes the histograms to ``output.root``, copies the snapshot
    #: outputs to their destination, prints a summary table and removes the
    #: local copies of the input files.
    _FINALIZE_CODE = dedent(
        """
        snapshots = []
        snapshot_destinations = []
        for val in values:
            if "snapshot" == val[0]:
                snapshots.append(val[1])
                snapshot_destinations.append(val[2])

        if len(snapshots) != 0:
            ROOT.RDF.RunGraphs(snapshots)

        histos = []
        for val in values:
            if val[0] == "variables":
                h = val[2]
                for var in h.GetKeys():
                    variation = val[1] + '_' + str(var).replace(":", "")
                    _h = h[var]
                    _h.SetName(variation)
                    histos.append( _h )

        f = ROOT.TFile.Open("output.root", "UPDATE")
        f.cd()
        for h in histos:
            h.Write()
        f.Close()

        for destination in snapshot_destinations:
            copyFromInputFiles = destination[1]
            outputFilename = destination[0]
            if copyFromInputFiles:
                Snapshot.CopyFromInputFiles(outputFilename, files)
            outputFolderPath = destination[2]
            outputFilenameEOS = destination[3]
            # Create output folder
            proc = subprocess.Popen(f"mkdir -p {outputFolderPath}", shell=True)
            proc.wait()
            # Copy output file in output folder
            proc = subprocess.Popen(f"cp {outputFilename} {outputFolderPath}/{outputFilenameEOS}", shell=True)
            proc.wait()
            # Remove the output file from local
            proc = subprocess.Popen(f"rm {outputFilename}", shell=True)
            proc.wait()

        def sciNot(value):
            # scientific notation
            return "{:.3e}".format(value)

        data = []
        reservedValuesNames = ["snapshot", "variables"]
        for val in values:
            if val[0] in reservedValuesNames:
                continue
            if "list" in str(type(val)):
                if str(type(val[0])) == "<class 'function'>":
                    data.append(val[0](*val[1:]))
                else:
                    data.append([val[1], sciNot(val[0].GetValue())])
            else:
                data.append(["", sciNot(val.GetValue())])
        from tabulate import tabulate
        print(tabulate(data, headers=["desc.", "value"]))

        for f in files:
            print('Removing input file', f)
            proc = subprocess.Popen(f"rm {f}", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = proc.communicate()
            print(out.decode('utf-8'))
            print(err.decode('utf-8'), file=sys.stderr)
        """
    )

    def __init__(
        self,
        condorDir,
        eosDir,
        prodName,
        step,
        selTree=None,
        excTree=None,
        isLatino=True,
        inputFolder="",
        redirector="",
        maxFilesPerJob=1,
        limitFiles=-1,
        dryRun=0,
        MT=False,
    ):
        """
        Initialize the processor object

        Parameters
        ----------
        condorDir : str
            Path to the directory to use for condor jobs
        eosDir : str
            Path to the directory to use for eos (output files)
        prodName : str
            Production name (must be in ``Productions_cfg.py``)
        step : str
            Step name (must be in ``Steps_cfg.py``)
        selTree : `list of str`, optional, default: None (no selection)
            List of sample names to process (must be in the sample file associated to the production).
            If empty, all the samples are considered.
        excTree : `list of str`, optional, default: None (no exclusion)
            List of sample names to exclude (must be in the sample file associated to the production)
        isLatino : bool
            If True, use the latino naming convention (e.g. ``nanoLatino_DYJetsToLL_M-50__part*.root``)
        inputFolder : str, optional, default: ""
            Path to the input folder (where input files are searched, if empty, DAS is used)
        redirector : str, optional, default: ""
            Redirector to use (if empty, use direct access)
        maxFilesPerJob : int, optional, default: 1
            Maximum number of files per job (If 20 files and maxFilesPerJob=10, 2 jobs will be created)
        limitFiles : int, optional, default: -1
            Limit the number of input files to consider per sample (if negative, all files are considered)
        dryRun : int, optional, default: 0
            If non-zero, do not submit jobs, just create the folder structure and the scripts
        MT : bool, optional, default: False
            If True, use multi-threading in the runner (``ROOT.EnableImplicitMT()``)
            Be careful since the events order is not preserved!
        """
        self.condorDir = condorDir
        self.eosDir = eosDir
        self.searchFiles = SearchFiles()
        self.prodName = prodName
        # None instead of a shared mutable default list
        self.selTree = [] if selTree is None else selTree
        self.excTree = [] if excTree is None else excTree
        self.isLatino = isLatino
        self.step = step
        self.inputFolder = inputFolder
        self.redirector = redirector
        self.maxFilesPerJob = maxFilesPerJob
        self.limitFiles = limitFiles
        self.dryRun = dryRun
        self.path = getFrameworkPath() + "mkShapesRDF/processor/framework/"
        self.MT = MT

    def getFiles_cfg(self, sampleName):
        """
        Utility function to get a dictionary to be passed to the proper function of ``SearchFiles``

        ``self.Samples`` must already be loaded (it is set by ``run``).

        Parameters
        ----------
        sampleName : str
            Sample name (must be in the sample file associated to the production)

        Returns
        -------
        dict
            Keyword arguments for ``SearchFiles.searchFilesDAS`` (when ``inputFolder`` is empty)
            or for ``SearchFiles.searchFiles`` (otherwise)
        """
        if self.inputFolder == "":
            # if no inputFolder is given -> DAS
            sample = self.Samples[sampleName]
            d = {
                "process": sample["nanoAOD"],
                "instance": sample.get("instance", ""),
            }
            if self.redirector != "":
                d["redirector"] = self.redirector
            return d
        return {
            "folder": self.inputFolder,
            "process": sampleName,
            "isLatino": self.isLatino,
            "redirector": self.redirector,
        }

    def addDeclareLines(self, step):
        """
        Add the declare lines to the python script file (``self.fPy``)

        Parameters
        ----------
        step : str
            the step name to consider. Must be in ``Steps_cfg.py``. If the step is a chain, the declare lines of the substeps are added recursively.
            The optional selection (``df.Filter``) of the step is added first, then the imports, the declaration and the module call.

        Exits with status 1 if ``step`` is not defined in ``Steps``.
        """
        if step not in Steps:
            print(f"Error, step {step} not found in Steps", file=sys.stderr)
            sys.exit(1)
        stepCfg = Steps[step]
        if "selection" in stepCfg:
            self.fPy += "df = df.Filter(" + stepCfg["selection"] + ")\n"
        if stepCfg["isChain"]:
            for subtarget in stepCfg["subTargets"]:
                self.addDeclareLines(subtarget)
        elif "declare" in stepCfg:
            self.fPy += "from " + stepCfg["import"] + " import *\n"
            self.fPy += stepCfg["declare"] + "\n"
            self.fPy += "module = " + stepCfg["module"] + "\n"
            self.fPy += "df = module.run(df, values) \n"

    def run(self):
        """
        Create the folder structure and the scripts to run the processing. Submit if dryRun==0

        The job template is accumulated in ``self.fPy``; for every selected sample the
        input files are split in chunks of ``maxFilesPerJob`` and one ``script.py`` per
        chunk is written in ``condorDir/prodName/step/<sample>__part<N>/``, together
        with a shared ``run.sh`` and a ``submit.jdl``.
        """
        self.fPy = self._scriptPreamble()
        self._loadSamples()

        jobDir = self.condorDir + "/" + self.prodName + "/" + self.step + "/"
        self._writeRunScript(jobDir)

        frameworkPath = getFrameworkPath() + "mkShapesRDF"
        # Placeholders (RPLME_*) are substituted per sample / per job below
        self.fPy += "sampleName = 'RPLME_SAMPLENAME'\n"
        self.fPy += "_files = RPLME_FILES\n"
        self.fPy += self._COPY_INPUTS_CODE
        self.fPy += f"ROOT.gInterpreter.Declare('#include \"{frameworkPath}/include/headers.hh\"')\n"
        self.fPy += "df = mRDF()\n"
        self.fPy += 'df = df.readRDF("Events", files)\n'
        self.fPy += "values = []\n"
        self.addDeclareLines(self.step)
        self.fPy += self._FINALIZE_CODE

        self.fPy = self.fPy.replace("RPLME_FW", frameworkPath)
        #: folderPathEos is the output folder path (not ending with ``/`` so that is possible to add suffix to the folder)
        folderPathEos = self.eosDir + "/" + self.prodName + "/" + self.step
        self.fPy = self.fPy.replace("RPLME_EOSPATH", folderPathEos)

        samplesToProcess = self._selectSamples()
        if len(samplesToProcess) == 0:
            print("No samples to process", file=sys.stderr)
            sys.exit(1)

        allSamples = []
        for sampleName in samplesToProcess:
            allSamples.extend(self._writeSampleJobs(sampleName, jobDir))

        self._writeJdl(jobDir, allSamples)

        if self.dryRun == 0:
            subprocess.run("condor_submit submit.jdl", shell=True, cwd=jobDir)

    def _scriptPreamble(self):
        """Return the first lines of the job script (ROOT setup, imports, lumi file or cross sections)."""
        production = Productions[self.prodName]
        preamble = dedent(
            """
            import ROOT
            ROOT.gROOT.SetBatch(True)
            """
        )
        if self.MT:
            preamble += "ROOT.EnableImplicitMT()\n"
        preamble += "from mkShapesRDF.processor.framework.mRDF import mRDF\n"
        preamble += "import subprocess\n"
        preamble += "import sys\n"
        if production["isData"]:
            lumiFile = os.path.abspath(self.path + production["jsonFile"])
            preamble += 'lumiFile = "' + lumiFile + '"\n'
        else:
            # NOTE(review): the cross-section file is Python code run with exec() in
            # this module's globals (it must define ``xs_db``); it must be trusted input.
            with open(self.path + production["xsFile"]) as file:
                exec(file.read(), globals())
            preamble += "xs_db = " + str(xs_db) + "\n"  # noqa F821
        return preamble

    def _loadSamples(self):
        """Execute the production's samples file and store its ``Samples`` dict in ``self.Samples``."""
        # NOTE(review): the samples file is Python code run with exec() in this
        # module's globals; it must be trusted input.
        with open(self.path + Productions[self.prodName]["samples"]) as file:
            exec(file.read(), globals())
        self.Samples = Samples  # noqa F821

    def _proxyExportLine(self):
        """Copy the current GRID proxy to ``~/.proxy`` and return the ``export`` line for ``run.sh``."""
        import shutil

        proc = subprocess.Popen(
            "voms-proxy-info", stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True
        )
        out, err = proc.communicate()
        if "Proxy not found" in err.decode("utf-8"):
            print("WARNING: No GRID proxy -> Get one first with:")
            print("voms-proxy-init -voms cms -rfc --valid 168:0")
            sys.exit(1)

        proxypath = None
        for line in out.decode("utf-8").split("\n"):
            if "path" in line:
                # split only once so that a ':' inside the value is preserved
                proxypath = line.split(":", 1)[1].strip()
        if proxypath is None:
            print(
                "ERROR: could not find the proxy path in the voms-proxy-info output",
                file=sys.stderr,
            )
            sys.exit(1)
        print(proxypath)
        shutil.copy(proxypath, os.path.join(os.environ["HOME"], ".proxy"))
        return f"export X509_USER_PROXY={os.environ['HOME']}/.proxy\n"

    def _writeRunScript(self, jobDir):
        """Create ``jobDir`` and write the executable ``run.sh`` shared by all the jobs."""
        with open(self.path + "../../../start.sh") as file:
            fSh = file.read()
        if self.inputFolder == "":
            # Files come from DAS: the jobs need the GRID proxy
            fSh += self._proxyExportLine()
        fSh += "time python script.py\n"

        Path(jobDir).mkdir(parents=True, exist_ok=True)
        runSh = jobDir + "run.sh"
        with open(runSh, "w") as file:
            file.write(fSh)
        # equivalent of ``chmod +x``
        os.chmod(runSh, os.stat(runSh).st_mode | 0o111)

    def _selectSamples(self):
        """Return the sample names to process (in ``Samples`` order), honouring ``selTree`` and ``excTree``."""
        return [
            sampleName
            for sampleName in self.Samples
            if (len(self.selTree) == 0 or sampleName in self.selTree)
            and sampleName not in self.excTree
        ]

    def _applyFileLimit(self, files):
        """Return at most ``limitFiles`` files; all of them if ``limitFiles`` is None or negative."""
        # files[:-1] would silently drop the last file for the default limitFiles=-1
        if self.limitFiles is None or self.limitFiles < 0:
            return files
        return files[: self.limitFiles]

    def _writeSampleJobs(self, sampleName, jobDir):
        """Write one ``script.py`` per chunk of input files of ``sampleName``; return the job folder names."""
        files_cfg = self.getFiles_cfg(sampleName)
        if self.inputFolder == "":
            files = self.searchFiles.searchFilesDAS(**files_cfg)
        else:
            files = self.searchFiles.searchFiles(**files_cfg)
        files = self._applyFileLimit(files)
        if len(files) == 0:
            print(
                "No files found for", sampleName, "and configuration", files_cfg,
                file=sys.stderr,
            )
            sys.exit(1)
        print(files[0])

        sample_fPy = self.fPy.replace("RPLME_SAMPLENAME", sampleName)
        if "RPLME_genEventSumw" in self.fPy:
            # Summed once over all the (limited) files of the sample, not per job chunk
            import ROOT

            ROOT.gROOT.SetBatch(True)
            ROOT.EnableImplicitMT()
            df = ROOT.RDataFrame("Runs", files)
            genEventSumw = df.Sum("genEventSumw").GetValue()
            sample_fPy = sample_fPy.replace("RPLME_genEventSumw", str(genEventSumw))

        jobNames = []
        nParts = ceil(len(files) / self.maxFilesPerJob)
        for part in range(nParts):
            _files = files[part * self.maxFilesPerJob : (part + 1) * self.maxFilesPerJob]
            _fPy = sample_fPy.replace("RPLME_FILES", str(_files))
            if self.inputFolder != "":
                # reuse the name of the first input file of the chunk
                outputFilename = _files[0].split("/")[-1]
            else:
                outputFilename = "nanoLatino_" + sampleName + "__part" + str(part) + ".root"
            _fPy = _fPy.replace("RPLME_OUTPUTFILENAME", outputFilename)

            jobName = sampleName + "__part" + str(part)
            jobDirPart = jobDir + jobName + "/"
            Path(jobDirPart).mkdir(parents=True, exist_ok=True)
            with open(jobDirPart + "script.py", "w") as f:
                f.write(_fPy)
            jobNames.append(jobName)
        return jobNames

    def _writeJdl(self, jobDir, allSamples):
        """Write ``submit.jdl`` queueing one job per folder name in ``allSamples``."""
        fJdl = dedent(
            """
            universe = vanilla
            executable = run.sh
            arguments = $(Folder)
            should_transfer_files = YES
            transfer_input_files = $(Folder)/script.py
            output = $(Folder)/out.txt
            error = $(Folder)/err.txt
            log = $(Folder)/log.txt
            request_cpus = 1
            +JobFlavour = "workday"
            queue 1 Folder in RPLME_ALLSAMPLES"""
        )
        fJdl = fJdl.replace("RPLME_ALLSAMPLES", " ".join(allSamples))
        with open(jobDir + "/submit.jdl", "w") as f:
            f.write(fJdl)