
import awkward as ak
import cabinetry
- from func_adl import ObjectStream
- from func_adl_servicex import ServiceXSourceUpROOT
import hist
import mplhep
import numpy as np
import pyhf
import uproot
- from servicex import ServiceXDataset

from coffea import processor
from coffea.nanoevents.schemas.base import BaseSchema
import utils
from utils import infofile  # contains cross-section information

+ import servicex
+
import vector
vector.register_awkward()

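# After vector.register_awkward(), Awkward records named "Momentum4D" gain vector
# behavior, which the analysis relies on later to build the four-lepton invariant
# mass. A minimal sketch with made-up values:
p4 = ak.Array([{"pt": 50.0, "eta": 1.2, "phi": 0.3, "E": 95.0}], with_name="Momentum4D")
print(p4.mass)  # invariant mass computed from (pt, eta, phi, E)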
# ServiceX behavior: ignore cache with repeated queries
IGNORE_CACHE = False

+ # ServiceX behavior: choose the query language (True: uproot-raw, False: func_adl)
+ USE_SERVICEX_UPROOT_RAW = True
+
# %% [markdown]
# ## Introduction
#
# <span style="color:darkgreen">**Systematic uncertainty added:**</span> scale factor variation, applied already at the event selection stage. Imagine that this could be a calculation requiring many different variables that are no longer needed downstream, so it makes sense to do it here.

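# A minimal numeric sketch (made-up values) of the variation defined in the
# queries below: four per-source scale factors are combined into one product,
# which is then varied by +/-10%; only the derived columns travel downstream.
sf_ele, sf_muon, sf_trigger, sf_pileup = 0.98, 1.02, 0.99, 1.01
scale_factor = sf_ele * sf_muon * sf_trigger * sf_pileup
scale_factor_up, scale_factor_down = scale_factor * 1.1, scale_factor * 0.9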
# %%
- def get_lepton_query(source: ObjectStream) -> ObjectStream:
-     """Performs event selection: require events with exactly four leptons.
+ def get_lepton_query():
+     """Performs event selection with the func_adl transformer: require events with exactly four leptons.
    Also select all columns needed further downstream for processing &
    histogram filling.
    """
-     return source.Where(lambda event: event.lep_n == 4).Select(
+     from servicex import query as q
+     return q.FuncADL_Uproot().FromTree('mini')\
+         .Where(lambda event: event.lep_n == 4).Select(
        lambda e: {
            "lep_pt": e.lep_pt,
            "lep_eta": e.lep_eta,
@@ -179,23 +183,33 @@ def get_lepton_query(source: ObjectStream) -> ObjectStream:
        }
    )

+ def get_lepton_query_uproot_raw():
+     """Performs event selection with the uproot-raw transformer: require events with exactly four leptons.
+     Also select all columns needed further downstream for processing &
+     histogram filling.
+     """
+     from servicex import query as q
+     return q.UprootRaw([{'treename': 'mini',
+                          'expressions': ['lep_pt', 'lep_eta', 'lep_phi', 'lep_energy', 'lep_charge',
+                                          'lep_typeid', 'mcWeight', 'scaleFactor', 'scaleFactorUP', 'scaleFactorDOWN'],
+                          'aliases': {'lep_typeid': 'lep_type', 'lep_energy': 'lep_E',
+                                      'scaleFactor': 'scaleFactor_ELE*scaleFactor_MUON*scaleFactor_LepTRIGGER*scaleFactor_PILEUP',
+                                      'scaleFactorUP': 'scaleFactor*1.1',
+                                      'scaleFactorDOWN': 'scaleFactor*0.9'}}])
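# For intuition: the dictionary above mirrors the interface of uproot's
# TTree.arrays(expressions=..., aliases=...). A rough local equivalent, assuming
# a hypothetical downloaded copy of one input file, would be:
with uproot.open("local_copy_of_input.root") as f:  # hypothetical file name
    arrays = f["mini"].arrays(expressions=["lep_pt", "lep_typeid"],
                              aliases={"lep_typeid": "lep_type"})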

# %% [markdown]
# # Caching the queried datasets with `ServiceX`
#
# Using the query created above (in either the `func_adl` or uproot-raw language), `ServiceX` reads the ATLAS Open Data files and builds cached files containing only the specific event information dictated by the query.

# %%
- # dummy dataset on which to generate the query
- dummy_ds = ServiceXSourceUpROOT("cernopendata://dummy", "mini", backend_name="uproot")
-
- # tell low-level infrastructure not to contact ServiceX yet, only to
- # return the qastle string it would have sent
- dummy_ds.return_qastle = True
-
# create the query
- lepton_query = get_lepton_query(dummy_ds)
- query = lepton_query.value()
+ if USE_SERVICEX_UPROOT_RAW:
+     query = get_lepton_query_uproot_raw()
+ else:
+     query = get_lepton_query()
+

# now we query the files and create a fileset dictionary containing the
# URLs pointing to the queried files
@@ -204,13 +218,15 @@ def get_lepton_query(source: ObjectStream) -> ObjectStream:

fileset = {}

- for ds_name in input_files.keys():
-     ds = ServiceXDataset(input_files[ds_name], backend_name="uproot", ignore_cache=IGNORE_CACHE)
-     files = ds.get_data_rootfiles_uri(query, as_signed_url=True, title=ds_name)
+ bundle = {'General': {'Delivery': 'URLs'},
+           'Sample': [{'Name': ds_name,
+                       'Query': query,
+                       'Dataset': servicex.dataset.FileList(input_files[ds_name]),
+                       'IgnoreLocalCache': IGNORE_CACHE} for ds_name in input_files.keys()]
+           }
-
-     fileset[ds_name] = {"files": [f.url for f in files],
-                         "metadata": {"dataset_name": ds_name}
-                         }
+ results = servicex.deliver(bundle)
+ fileset = {ds_name: {"files": results[ds_name], "metadata": {"dataset_name": ds_name}} for ds_name in results}
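# The resulting fileset maps each dataset name to the delivered file URLs in the
# layout coffea expects; the dataset name and URL here are illustrative only:
# {"ggH125_ZZ4lep": {"files": ["https://xcache.example/..."],
#                    "metadata": {"dataset_name": "ggH125_ZZ4lep"}}, ...}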

print(f"execution took {time.time() - t0:.2f} seconds")

@@ -383,7 +399,8 @@ def postprocess(self, accumulator):
executor = processor.FuturesExecutor(workers=NUM_CORES)
run = processor.Runner(executor=executor, savemetrics=True, metadata_cache={},
                       chunksize=CHUNKSIZE, schema=BaseSchema)
- all_histograms, metrics = run(fileset, "servicex", processor_instance=HZZAnalysis())
+ # the trees returned by ServiceX will have different names depending on the query language used
+ all_histograms, metrics = run(fileset, "mini" if USE_SERVICEX_UPROOT_RAW else "servicex",
+                               processor_instance=HZZAnalysis())

print(f"execution took {time.time() - t0:.2f} seconds")

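# A quick way to confirm the delivered tree names ("servicex" for func_adl
# queries, "mini" for uproot-raw) is to inspect one returned file, sketched as:
first_file = fileset[next(iter(fileset))]["files"][0]
print(uproot.open(first_file).keys())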