@@ -256,13 +256,13 @@ def setup_tables(spark_session, input_prefix, input_format, use_decimal, executi
256256 spark_app_id = spark_session .sparkContext .applicationId
257257 # Create TempView for tables
258258 for table_name in get_schemas (False ).keys ():
259- start = time .time_ns () // 1_000_000
259+ start = int ( time .time () * 1000 )
260260 table_path = input_prefix + '/' + table_name
261261 reader = spark_session .read .format (input_format )
262262 if input_format in ['csv' , 'json' ]:
263263 reader = reader .schema (get_schemas (use_decimal )[table_name ])
264264 reader .load (table_path ).createOrReplaceTempView (table_name )
265- end = time .time_ns () // 1_000_000
265+ end = int ( time .time () * 1000 )
266266 print ("====== Creating TempView for table {} ======" .format (table_name ))
267267 print ("Time taken: {} millis for table {}" .format (end - start , table_name ))
268268 execution_time_list .append (
@@ -273,12 +273,12 @@ def register_delta_tables(spark_session, input_prefix, execution_time_list):
273273 spark_app_id = spark_session .sparkContext .applicationId
274274 # Register tables for Delta Lake
275275 for table_name in get_schemas (False ).keys ():
276- start = time .time_ns () // 1_000_000
276+ start = int ( time .time () * 1000 )
277277 # input_prefix must be absolute path: https://github.com/delta-io/delta/issues/555
278278 register_sql = f"CREATE TABLE IF NOT EXISTS { table_name } USING DELTA LOCATION '{ input_prefix } /{ table_name } '"
279279 print (register_sql )
280280 spark_session .sql (register_sql )
281- end = time .time_ns () // 1_000_000
281+ end = int ( time .time () * 1000 )
282282 print ("====== Registering for table {} ======" .format (table_name ))
283283 print ("Time taken: {} millis for table {}" .format (end - start , table_name ))
284284 execution_time_list .append (
@@ -415,7 +415,7 @@ def run_query_stream(input_prefix,
415415 """
416416 queries_reports = []
417417 execution_time_list = []
418- total_time_start = time .time_ns ()
418+ total_time_start = time .time ()
419419 # check if it's running specific query or Power Run
420420 if len (query_dict ) == 1 :
421421 app_name = "NDS - " + list (query_dict .keys ())[0 ]
@@ -457,7 +457,7 @@ def run_query_stream(input_prefix,
457457 profiler = Profiler (profiling_hook = profiling_hook , output_root = json_summary_folder )
458458
459459 # Run query
460- power_start = time .time_ns ( )
460+ power_start = int ( time .time () )
461461 setup_time = 0
462462 cleanup_time = 0
463463
@@ -505,16 +505,16 @@ def run_query_stream(input_prefix,
505505 else :
506506 summary_prefix = os .path .join (json_summary_folder , '' )
507507 q_report .write_summary (prefix = summary_prefix )
508- power_end = time .time_ns ( )
509- power_elapse = int ((power_end - power_start )/ 1000 )
510-
508+ power_end = int ( time .time () )
509+ power_elapse = int ((power_end - power_start )* 1000 )
510+
511511 # Calculate Power Test Time (excluding setup and cleanup)
512512 power_test_time = power_elapse - setup_time - cleanup_time
513513
514514 if not keep_sc :
515515 spark_session .sparkContext .stop ()
516- total_time_end = time .time_ns ()
517- total_elapse = int ((total_time_end - total_time_start )/ 1000 )
516+ total_time_end = time .time ()
517+ total_elapse = int ((total_time_end - total_time_start )* 1000 )
518518 print ("====== Power Test Time: {} milliseconds ======" .format (power_test_time ))
519519 if setup_time > 0 :
520520 print ("====== Power Setup Time: {} milliseconds ======" .format (setup_time ))
0 commit comments