@@ -3431,17 +3431,47 @@ def _build_location_data_from_sessions(project_id: str, db, selected_session_ids
period_type = entry [ " period_type " ]
period_type = entry [ " period_type " ]
raw_rows = sorted ( entry [ " rows " ] , key = lambda r : r . get ( ' Start Time ' , ' ' ) )
raw_rows = sorted ( entry [ " rows " ] , key = lambda r : r . get ( ' Start Time ' , ' ' ) )
spreadsheet_data = [ ]
# Parse all rows to datetimes first so we can apply period-aware filtering
for idx , row in enumerate ( raw_rows , 1 ) :
parsed = [ ]
for row in raw_rows :
start_time_str = row . get ( ' Start Time ' , ' ' )
start_time_str = row . get ( ' Start Time ' , ' ' )
date_str = time_str = ' '
dt = None
if start_time_str :
if start_time_str :
try :
try :
dt = datetime . strptime ( start_time_str , ' % Y/ % m/ %d % H: % M: % S ' )
dt = datetime . strptime ( start_time_str , ' % Y/ % m/ %d % H: % M: % S ' )
date_str = dt . strftime ( ' % Y- % m- %d ' )
time_str = dt . strftime ( ' % H: % M ' )
except ValueError :
except ValueError :
date_str = start_time_str
pass
parsed . append ( ( dt , row ) )
# Determine which rows to keep based on period_type
is_day_session = period_type in ( ' weekday_day ' , ' weekend_day ' )
target_date = None
if is_day_session :
# Day: 07:00– 18:59 only, restricted to the LAST calendar date that has daytime rows
daytime_dates = sorted ( {
dt . date ( ) for dt , row in parsed
if dt and 7 < = dt . hour < 19
} )
target_date = daytime_dates [ - 1 ] if daytime_dates else None
filtered = [
( dt , row ) for dt , row in parsed
if dt and dt . date ( ) == target_date and 7 < = dt . hour < 19
]
else :
# Night: 19:00– 06:59, spanning both calendar days — no date restriction
filtered = [
( dt , row ) for dt , row in parsed
if dt and ( dt . hour > = 19 or dt . hour < 7 )
]
# Fall back to all rows if filtering removed everything
if not filtered :
filtered = parsed
spreadsheet_data = [ ]
for idx , ( dt , row ) in enumerate ( filtered , 1 ) :
date_str = dt . strftime ( ' % Y- % m- %d ' ) if dt else ' '
time_str = dt . strftime ( ' % H: % M ' ) if dt else ' '
lmax = row . get ( ' Lmax(Main) ' , ' ' )
lmax = row . get ( ' Lmax(Main) ' , ' ' )
ln1 = row . get ( ' LN1(Main) ' , ' ' )
ln1 = row . get ( ' LN1(Main) ' , ' ' )
@@ -3458,14 +3488,33 @@ def _build_location_data_from_sessions(project_id: str, db, selected_session_ids
period_type , # col index 7 — hidden, used by report gen for day/night bucketing
period_type , # col index 7 — hidden, used by report gen for day/night bucketing
] )
] )
# For the label/filename, use target_date (day sessions) or started_at (night sessions)
from datetime import timedelta as _td
started_at_dt = entry [ " started_at " ]
if is_day_session and target_date :
# Use the actual target date from data filtering (last date with daytime rows)
label_dt = datetime . combine ( target_date , datetime . min . time ( ) )
else :
label_dt = started_at_dt
# Rebuild session label using the correct label date
if label_dt and entry [ " loc_name " ] :
period_str = { " weekday_day " : " Day " , " weekday_night " : " Night " ,
" weekend_day " : " Day " , " weekend_night " : " Night " } . get ( period_type , " " )
day_abbr = label_dt . strftime ( " %a " )
date_label = f " { label_dt . month } / { label_dt . day } "
session_label = " — " . join ( p for p in [ loc_name , f " { day_abbr } { date_label } " , period_str ] if p )
else :
session_label = entry [ " session_label " ]
location_data . append ( {
location_data . append ( {
" session_id " : session_id ,
" session_id " : session_id ,
" location_name " : loc_name ,
" location_name " : loc_name ,
" session_label " : entry [ " session_label" ] ,
" session_label " : session_label ,
" period_type " : period_type ,
" period_type " : period_type ,
" started_at " : entry [ " started_at " ] . isoformat ( ) if entry [ " started_at " ] else " " ,
" started_at " : label_dt . isoformat ( ) if label_dt else " " ,
" raw_count " : len ( raw_rows ) ,
" raw_count " : len ( raw_rows ) ,
" filtered_count " : len ( raw_rows ) ,
" filtered_count " : len ( filtered ) ,
" spreadsheet_data " : spreadsheet_data ,
" spreadsheet_data " : spreadsheet_data ,
} )
} )
@@ -3569,7 +3618,7 @@ async def generate_combined_from_preview(
tbl_bot_mid = Border ( left = thin , right = thin , top = thin , bottom = med )
tbl_bot_mid = Border ( left = thin , right = thin , top = thin , bottom = med )
tbl_bot_right = Border ( left = thin , right = med , top = thin , bottom = med )
tbl_bot_right = Border ( left = thin , right = med , top = thin , bottom = med )
col_widths = [ 9.43 , 10.14 , 8.14 , 12.86 , 10.86 , 10.86 , 25.0 , 6.43 , 12.43 , 12.43 , 10 .0 , 14.71 , 8 .0, 6.43 , 6.43 , 6.43 ]
col_widths = [ 9.43 , 10.14 , 8.14 , 12.86 , 10.86 , 10.86 , 25.0 , 6.43 , 18.0 , 18.0 , 14 .0 , 14.0 , 10 .0, 8.0 , 6.43 , 6.43 ]
def _build_location_sheet ( ws , loc_name , day_rows , final_title ) :
def _build_location_sheet ( ws , loc_name , day_rows , final_title ) :
""" Write one location ' s data onto ws. day_rows is a list of spreadsheet row arrays. """
""" Write one location ' s data onto ws. day_rows is a list of spreadsheet row arrays. """
@@ -3586,6 +3635,28 @@ async def generate_combined_from_preview(
ws [ ' A3 ' ] = loc_name
ws [ ' A3 ' ] = loc_name
ws [ ' A3 ' ] . font = f_title ; ws [ ' A3 ' ] . alignment = center_a
ws [ ' A3 ' ] . font = f_title ; ws [ ' A3 ' ] . alignment = center_a
ws . row_dimensions [ 3 ] . height = 15.75
ws . row_dimensions [ 3 ] . height = 15.75
# Row 4: date range derived from the data rows
def _fmt_date ( d ) :
try :
from datetime import datetime as _dt
return _dt . strptime ( d , ' % Y- % m- %d ' ) . strftime ( ' % -m/ %-d / % y ' )
except Exception :
return d
dates_in_data = sorted ( {
row [ 1 ] for row in day_rows
if len ( row ) > 1 and row [ 1 ]
} )
if len ( dates_in_data ) > = 2 :
date_label = f " { _fmt_date ( dates_in_data [ 0 ] ) } to { _fmt_date ( dates_in_data [ - 1 ] ) } "
elif len ( dates_in_data ) == 1 :
date_label = _fmt_date ( dates_in_data [ 0 ] )
else :
date_label = " "
ws . merge_cells ( ' A4:G4 ' )
ws [ ' A4 ' ] = date_label
ws [ ' A4 ' ] . font = f_data ; ws [ ' A4 ' ] . alignment = center_a
ws . row_dimensions [ 4 ] . height = 15
ws . row_dimensions [ 4 ] . height = 15
ws . row_dimensions [ 5 ] . height = 15.75
ws . row_dimensions [ 5 ] . height = 15.75
@@ -3608,7 +3679,7 @@ async def generate_combined_from_preview(
b_right = last_right if is_last else data_right
b_right = last_right if is_last else data_right
test_num = row [ 0 ] if len ( row ) > 0 else row_idx + 1
test_num = row [ 0 ] if len ( row ) > 0 else row_idx + 1
date_val = row [ 1 ] if len ( row ) > 1 else ' '
date_val = _fmt_date ( row[ 1 ] ) if len ( row ) > 1 and row [ 1 ] else ' '
time_val = row [ 2 ] if len ( row ) > 2 else ' '
time_val = row [ 2 ] if len ( row ) > 2 else ' '
lmax = row [ 3 ] if len ( row ) > 3 else ' '
lmax = row [ 3 ] if len ( row ) > 3 else ' '
ln1 = row [ 4 ] if len ( row ) > 4 else ' '
ln1 = row [ 4 ] if len ( row ) > 4 else ' '
@@ -3640,7 +3711,7 @@ async def generate_combined_from_preview(
ln2_vals . append ( ln2 )
ln2_vals . append ( ln2 )
if isinstance ( lmax , ( int , float ) ) and isinstance ( ln1 , ( int , float ) ) and isinstance ( ln2 , ( int , float ) ) :
if isinstance ( lmax , ( int , float ) ) and isinstance ( ln1 , ( int , float ) ) and isinstance ( ln2 , ( int , float ) ) :
parsed_rows . append ( ( row_period , float ( lmax ) , float ( ln1 ) , float ( ln2 ) ) )
parsed_rows . append ( ( row_period , time_val , float ( lmax ) , float ( ln1 ) , float ( ln2 ) ) )
data_end_row = data_start_row + len ( day_rows ) - 1
data_end_row = data_start_row + len ( day_rows ) - 1
@@ -3667,116 +3738,112 @@ async def generate_combined_from_preview(
ws . add_chart ( chart , " H4 " )
ws . add_chart ( chart , " H4 " )
note1 = ws . cell ( row = 28 , column = 9 , value = " Note: Averages are calculated by determining the arithmetic average " )
note1 . font = f_data ; note1 . alignment = left_a
ws . merge_cells ( start_row = 28 , start_column = 9 , end_row = 28 , end_column = 14 )
note2 = ws . cell ( row = 29 , column = 9 , value = " for each specified range of time intervals. " )
note2 . font = f_data ; note2 . alignment = left_a
ws . merge_cells ( start_row = 29 , start_column = 9 , end_row = 29 , end_column = 14 )
hdr_fill_tbl = PatternFill ( start_color = " F2F2F2 " , end_color = " F2F2F2 " , fill_type = " solid " )
hdr_fill_tbl = PatternFill ( start_color = " F2F2F2 " , end_color = " F2F2F2 " , fill_type = " solid " )
def _avg ( vals ) : return round ( sum ( vals ) / len ( vals ) , 1 ) if vals else None
def _avg ( vals ) : return round ( sum ( vals ) / len ( vals ) , 1 ) if vals else None
def _max ( vals ) : return round ( max ( vals ) , 1 ) if vals else None
def _max ( vals ) : return round ( max ( vals ) , 1 ) if vals else None
# --- Dynamic period detection ----------------------------------------
# --- Period bucketing -------- ----------------------------------------
# Use the period_type stored on each row (from the session record ).
# For night sessions: split into Evening (7PM– 10PM) and Nighttime (10PM– 7AM ).
# Rows without a period_type fall back to time-of-day detection .
# For day sessions: single Daytime bucket .
# The four canonical types map to two display columns:
# Day -> "Daytime (7AM to 10PM)"
# Night -> "Nighttime (10PM to 7AM)"
PERIOD_TYPE_IS_DAY = { " weekday_day " , " weekend_day " }
PERIOD_TYPE_IS_DAY = { " weekday_day " , " weekend_day " }
PERIOD_TYPE_IS_NIGHT = { " weekday_night " , " weekend_night " }
PERIOD_TYPE_IS_NIGHT = { " weekday_night " , " weekend_night " }
day_rows_data = [ ]
day_rows_data = [ ]
evening_rows_data = [ ]
night_rows_data = [ ]
night_rows_data = [ ]
for pt , lmx , l1 , l2 in parsed_rows :
for pt , time_v , lmx , l1 , l2 in parsed_rows :
if pt in PERIOD_TYPE_IS_DAY :
if pt in PERIOD_TYPE_IS_DAY :
day_rows_data . append ( ( lmx , l1 , l2 ) )
day_rows_data . append ( ( lmx , l1 , l2 ) )
elif pt in PERIOD_TYPE_IS_NIGHT :
elif pt in PERIOD_TYPE_IS_NIGHT :
# Split by time: Evening = 19:00– 21:59, Nighttime = 22:00– 06:59
hour = 0
if time_v and ' : ' in str ( time_v ) :
try :
hour = int ( str ( time_v ) . split ( ' : ' ) [ 0 ] )
except ValueError :
pass
if 19 < = hour < = 21 :
evening_rows_data . append ( ( lmx , l1 , l2 ) )
else :
night_rows_data . append ( ( lmx , l1 , l2 ) )
night_rows_data . append ( ( lmx , l1 , l2 ) )
else :
else :
# No period_type — fall back to time-of-day (shouldn't happen for
# new uploads, but handles legacy data gracefully)
# We can't derive from time here since parsed_rows no longer stores dt.
# Put in day as a safe default.
day_rows_data . append ( ( lmx , l1 , l2 ) )
day_rows_data . append ( ( lmx , l1 , l2 ) )
all_candidate_periods = [
all_candidate_periods = [
( " Daytime (7AM to 10PM) " , day_rows_data ) ,
( " Daytime (7AM to 10PM) " , day_rows_data ) ,
( " Evening (7PM to 10PM) " , evening_rows_data ) ,
( " Nighttime (10PM to 7AM) " , night_rows_data ) ,
( " Nighttime (10PM to 7AM) " , night_rows_data ) ,
]
]
active_periods = [ ( label , rows ) for label , rows in all_candidate_periods if rows ]
active_periods = [ ( label , rows ) for label , rows in all_candidate_periods if rows ]
# If nothing at all, show both columns empty
if not active_periods :
if not active_periods :
active_periods = [ ( " Daytime (7AM to 10PM) " , [ ] ) , ( " Nighttime (10PM to 7AM) " , [ ] ) ]
active_periods = [ ( " Daytime (7AM to 10PM) " , [ ] ) ]
# --- Stats table — fixed position alongside the chart ---
note1 = ws . cell ( row = 28 , column = 9 ,
value = " Note: Averages are calculated by determining the arithmetic average " )
note1 . font = f_data ; note1 . alignment = left_a
ws . merge_cells ( start_row = 28 , start_column = 9 , end_row = 28 , end_column = 14 )
note2 = ws . cell ( row = 29 , column = 9 ,
value = " for each specified range of time intervals. " )
note2 . font = f_data ; note2 . alignment = left_a
ws . merge_cells ( start_row = 29 , start_column = 9 , end_row = 29 , end_column = 14 )
for r in [ 28 , 29 , 30 , 31 , 32 , 33 , 34 ] :
ws . row_dimensions [ r ] . height = 15
tbl_hdr_row = 31
tbl_data_row = 32
# Build header row (row 31) with one merged pair of columns per active period
# Layout: col 9 = row label, then pairs: (10,11), (12,13), (14,15)
# Layout: col 9 = row label, then pairs: (10,11), (12,13), (14,15)
num_periods = len ( active_periods )
num_periods = len ( active_periods )
period_start_cols = [ 10 + i * 2 for i in range ( num_periods ) ]
period_start_cols = [ 10 + i * 2 for i in range ( num_periods ) ]
# Left/right border helpers for the header row
def _hdr_border ( i , n ) :
def _hdr_border ( i , n ) :
is_first = ( i == 0 )
is_last = ( i == n - 1 )
return Border (
return Border (
left = med if is_first else thin ,
left = med if i == 0 else thin ,
right = med if is_last else thin ,
right = med if i == n - 1 else thin ,
top = med ,
top = med , bottom = thin ,
bottom = thin ,
)
def _mid_border ( i , n , is_data_last = False ) :
is_first = ( i == 0 )
is_last = ( i == n - 1 )
b = tbl_bot_mid if is_data_last else tbl_mid_mid
return Border (
left = med if is_first else thin ,
right = med if is_last else thin ,
top = b . top ,
bottom = b . bottom ,
)
)
c = ws . cell ( row = 31 , column = 9 , value = " " ) ; c . border = tbl_top_left ; c . font = f_bold
c = ws . cell ( row = tbl_hdr_row , column = 9 , value = " " ) ; c . border = tbl_top_left ; c . font = f_bold
ws . row_dimensions [ 31 ] . height = 30
for i , ( period_label , _ ) in enumerate ( active_periods ) :
for i , ( period_label , _ ) in enumerate ( active_periods ) :
sc = period_start_cols [ i ]
sc = period_start_cols [ i ]
is_last_col = ( i == num_periods - 1 )
c = ws . cell ( row = tbl_hdr_row , column = sc , value = period_label )
c = ws . cell ( row = 31 , column = sc , value = period_label . replace ( ' \n ' , ' ' ) )
c . font = f_bold
c . font = f_bold
c . alignment = Alignment ( horizontal = ' center ' , vertical = ' center ' , wrap_text = Tru e)
c . alignment = Alignment ( horizontal = ' center ' , vertical = ' center ' , wrap_text = Fals e)
c . border = _hdr_border ( i , num_periods )
c . border = _hdr_border ( i , num_periods )
c . fill = hdr_fill_tbl
c . fill = hdr_fill_tbl
ws . merge_cells ( start_row = 31 , start_column = sc , end_row = 31 , end _column= sc + 1 )
ws . merge_cells ( start_row = tbl_hdr_row , start _column= sc ,
end_row = tbl_hdr_row , end_column = sc + 1 )
def write_stat_dynamic ( row_num , row_label , period_vals_list , is_last = False ) :
def write_stat_dynamic ( row_num , row_label , period_vals_list , is_last = False ) :
bl = tbl_bot_left if is_last else tbl_mid_left
lbl = ws . cell ( row = row_num , column = 9 , value = row_label )
lbl = ws . cell ( row = row_num , column = 9 , value = row_label )
lbl . font = f_data ; lbl . border = bl
lbl . font = f_data ; lbl . border = t bl_bot_left if is_last else tbl_mid_left
lbl . alignment = Alignment ( horizontal = ' left ' , vertical = ' center ' )
lbl . alignment = Alignment ( horizontal = ' left ' , vertical = ' center ' )
n = len ( period_vals_list )
n = len ( period_vals_list )
for i , val in enumerate ( period_vals_list ) :
for i , val in enumerate ( period_vals_list ) :
sc = period_start_cols [ i ]
sc = period_start_cols [ i ]
is_last_col = ( i == n - 1 )
val_str = f " { val } dBA " if val is not None else " "
val_str = f " { val } dBA " if val is not None else " "
c = ws . cell ( row = row_num , column = sc , value = val_str )
c = ws . cell ( row = row_num , column = sc , value = val_str )
c . font = f_bold
c . font = f_bold
c . alignment = Alignment ( horizontal = ' center ' , vertical = ' center ' )
c . alignment = Alignment ( horizontal = ' center ' , vertical = ' center ' )
c . border = Border (
c . border = Border (
left = med if i == 0 else thin ,
left = med if i == 0 else thin ,
right = med if is_last_col else thin ,
right = med if i == n - 1 else thin ,
top = tbl_bot_mid . top if is_last else tbl_mid_mid . top ,
top = tbl_bot_mid . top if is_last else tbl_mid_mid . top ,
bottom = tbl_bot_mid . bottom if is_last else tbl_mid_mid . bottom ,
bottom = tbl_bot_mid . bottom if is_last else tbl_mid_mid . bottom ,
)
)
ws . merge_cells ( start_row = row_num , start_column = sc , end_row = row_num , end_column = sc + 1 )
ws . merge_cells ( start_row = row_num , start_column = sc ,
end_row = row_num , end_column = sc + 1 )
write_stat_dynamic ( 32 , " LAmax " ,
write_stat_dynamic ( tbl_data_row , " LAmax " ,
[ _max ( [ v [ 0 ] for v in rows ] ) for _ , rows in active_periods ] )
[ _max ( [ v [ 0 ] for v in rows ] ) for _ , rows in active_periods ] )
write_stat_dynamic ( 33 , " LA01 Average " ,
write_stat_dynamic ( tbl_data_row + 1 , " LA01 Average " ,
[ _avg ( [ v [ 1 ] for v in rows ] ) for _ , rows in active_periods ] )
[ _avg ( [ v [ 1 ] for v in rows ] ) for _ , rows in active_periods ] )
write_stat_dynamic ( 34 , " LA10 Average " ,
write_stat_dynamic ( tbl_data_row + 2 , " LA10 Average " ,
[ _avg ( [ v [ 2 ] for v in rows ] ) for _ , rows in active_periods ] , is_last = True )
[ _avg ( [ v [ 2 ] for v in rows ] ) for _ , rows in active_periods ] , is_last = True )
ws . sheet_properties . pageSetUpPr = PageSetupProperties ( fitToPage = False )
ws . sheet_properties . pageSetUpPr = PageSetupProperties ( fitToPage = False )
@@ -3798,7 +3865,7 @@ async def generate_combined_from_preview(
}
}
def _build_summary_sheet ( wb , day_label , project_name , loc_summaries ) :
def _build_summary_sheet ( wb , day_label , project_name , loc_summaries ) :
summary_ws = wb . create_sheet ( title = " Summary " , index = 0 )
summary_ws = wb . create_sheet ( title = " Summary " )
summary_ws [ ' A1 ' ] = f " { report_title } - { project_name } - { day_label } "
summary_ws [ ' A1 ' ] = f " { report_title } - { project_name } - { day_label } "
summary_ws [ ' A1 ' ] . font = f_title
summary_ws [ ' A1 ' ] . font = f_title
summary_ws . merge_cells ( ' A1:E1 ' )
summary_ws . merge_cells ( ' A1:E1 ' )
@@ -3867,7 +3934,7 @@ async def generate_combined_from_preview(
# Build a clean filename from label or location+date
# Build a clean filename from label or location+date
label_clean = session_label or loc_name
label_clean = session_label or loc_name
label_clean = " " . join ( c for c in label_clean if c . isalnum ( ) or c in ( ' ' , ' - ' , ' _ ' , ' / ' ) ) . strip ( ) . replace ( ' ' , ' _ ' ) . replace ( ' / ' , ' - ' )
label_clean = " " . join ( c for c in label_clean if c . isalnum ( ) or c in ( ' ' , ' - ' , ' _ ' , ' / ' ) ) . strip ( ) . replace ( ' ' , ' _ ' ) . replace ( ' / ' , ' - ' )
xlsx_name = f " { project_name _clean} _ { label _clean} _report.xlsx "
xlsx_name = f " { label _clean} _ { project_name _clean} _report.xlsx "
zf . writestr ( xlsx_name , xlsx_buf . read ( ) )
zf . writestr ( xlsx_name , xlsx_buf . read ( ) )
zip_buffer . seek ( 0 )
zip_buffer . seek ( 0 )