In [3]:
import concurrent.futures
import glob
import os
import shutil
import sys
import tempfile
import time

import awkward
import numpy as np
import uproot
import uproot_methods

from tqdm.auto import tqdm

In [4]:
def save_array(fname,x,compress=True):
    """Write an array to ``fname`` in NumPy ``.npz`` format.

    An object whose type is exactly ``awkward.JaggedArray`` is stored as two
    entries, ``content`` and ``counts``; anything else is stored as a single
    ``content`` entry.

    Parameters
    ----------
    fname : destination passed straight to ``np.savez`` / ``np.savez_compressed``.
    x : the array to store.
    compress : when true (the default) use ``np.savez_compressed``,
        otherwise ``np.savez``.
    """
    writer = np.savez_compressed if compress else np.savez
    if type(x) == awkward.JaggedArray:
        payload = {"content": x.content, "counts": x.counts}
    else:
        payload = {"content": x}
    writer(fname, **payload)
    
def load_array(fname):
    """Read back an array written by ``save_array``.

    If the archive has a ``counts`` entry, the result is rebuilt with
    ``awkward.JaggedArray.fromcounts`` from the ``content`` and ``counts``
    entries; otherwise the ``content`` entry is returned as-is.

    Parameters
    ----------
    fname : path (or file object) of the ``.npz`` archive, passed to ``np.load``.

    Returns
    -------
    The jagged array or the plain ``content`` array.
    """
    # The archive object keeps the underlying file open until it is closed, so
    # read everything inside a ``with`` block instead of leaking the handle.
    with np.load(fname) as f:
        if "counts" in f.files:
            return awkward.JaggedArray.fromcounts(content=f["content"],counts=f["counts"])
        return f["content"]

In [5]:
# Shared pool of 8 worker processes; convert_dir_parallel submits its
# per-branch jobs to it.
# Thread-based alternative: concurrent.futures.ThreadPoolExecutor(8)
executor = concurrent.futures.ProcessPoolExecutor(max_workers=8)

def convert_branch(inputfnames,treename,branchname,fulloutpath):
    """Read one branch from all input files and store it as a single ``.npz`` file.

    The branch is read in chunks of 300000 entries with ``uproot.iterate``,
    the chunks are concatenated with ``awkward.concatenate``, and the result
    is written with ``save_array`` to a scratch file in the current working
    directory, which is then copied to ``fulloutpath``.

    Parameters
    ----------
    inputfnames : input file names handed to ``uproot.iterate``.
    treename : name of the tree to read.
    branchname : name of the branch to read; also used in the scratch file name.
    fulloutpath : final destination of the ``.npz`` file.

    Raises
    ------
    OSError
        If the scratch file cannot be copied to ``fulloutpath``. (The shell
        ``cp`` used previously failed silently.)
    """
    chunks = [
        branches[0]
        for branches in uproot.iterate(
            inputfnames,
            treename,
            entrysteps=300000,
            branches=[branchname],
            outputtype=tuple,
        )
    ]
    arr = awkward.concatenate(chunks)
    tempname = ".temp_{}.npz".format(branchname)
    try:
        save_array(tempname,arr)
        try:
            # Copy without going through a shell so that paths containing
            # spaces or shell metacharacters work and failures raise.
            shutil.copyfile(tempname, fulloutpath)
        except BaseException:
            # Do not leave a truncated output behind: a file that exists at
            # the destination would look like a finished conversion.
            if os.path.exists(fulloutpath):
                os.remove(fulloutpath)
            raise
    finally:
        # Always clean up the scratch file, even when saving or copying failed.
        if os.path.exists(tempname):
            os.remove(tempname)
    


In [6]:
def convert_dir_parallel(nanodir):
    """Convert every branch of the ``Events`` tree in ``nanodir`` to ``.npz`` files.

    The ``*.root`` files directly inside ``nanodir`` are used as input. The
    branch list is taken from the first file (in sorted order), and one
    ``convert_branch`` job per branch is submitted to the module-level
    ``executor``. Outputs go to ``<nanodir>/column_data/branch__<name>__0.npz``;
    branches whose output file already exists are skipped.

    Parameters
    ----------
    nanodir : directory containing the input ``.root`` files.

    Returns
    -------
    None. Failed branches are reported on stderr; they do not abort the
    remaining conversions.
    """
    outdir = os.path.join(nanodir, "column_data")
    os.makedirs(outdir, exist_ok=True)

    inputfnames = sorted(glob.glob(nanodir+"/*.root"))
    if not inputfnames:
        # Nothing to read the branch list from.
        print("no .root files found in {}, skipping".format(nanodir), file=sys.stderr)
        return

    treename = "Events"
    pending = {}  # future -> branch name, used for error reporting
    for branchname in uproot.open(inputfnames[0])[treename].keys():
        # Keys may be bytes or already str depending on the uproot version.
        if isinstance(branchname, bytes):
            branchname = branchname.decode("ascii")
        fulloutpath = os.path.join(outdir, "branch__{}__0.npz".format(branchname))
        if os.path.exists(fulloutpath):
            continue
        future = executor.submit(convert_branch,inputfnames,treename,branchname,fulloutpath)
        pending[future] = branchname

    for future in tqdm(concurrent.futures.as_completed(pending),total=len(pending),position=1,desc="branches"):
        # A future's exception stays hidden unless it is asked for; surface it
        # so a failed branch is not mistaken for a converted one.
        error = future.exception()
        if error is not None:
            print(
                "failed to convert branch {} in {}: {!r}".format(pending[future], nanodir, error),
                file=sys.stderr,
            )

In [None]:
# Collect every directory matching the glob pattern below; each one is
# converted in turn by convert_dir_parallel.
nanodirs = glob.glob("/hadoop/cms/store/user/namin/nanoaod/DoubleMuon__Run201*/")

# Outer progress bar (position=0) counts directories; convert_dir_parallel
# draws its own per-branch bar at position=1.
for nanodir in tqdm(nanodirs,position=0,desc="dirs"):
    convert_dir_parallel(nanodir)

HBox(children=(IntProgress(value=0, description='dirs', max=14, style=ProgressStyle(description_width='initial…

HBox(children=(IntProgress(value=0, description='branches', max=1141, style=ProgressStyle(description_width='i…