Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"name": "Python 3",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/python:1-3.11-bookworm",
"customizations": {
"codespaces": {
"openFiles": [
"README.md",
"streamlit_app.py"
]
},
"vscode": {
"settings": {},
"extensions": [
"ms-python.python",
"ms-python.vscode-pylance"
]
}
},
"updateContentCommand": "[ -f packages.txt ] && sudo apt update && sudo apt upgrade -y && sudo xargs apt install -y <packages.txt; [ -f requirements.txt ] && pip3 install --user -r requirements.txt; pip3 install --user streamlit; echo '✅ Packages installed and Requirements met'",
"postAttachCommand": {
"server": "streamlit run streamlit_app.py --server.enableCORS false --server.enableXsrfProtection false"
},
"portsAttributes": {
"8501": {
"label": "Application",
"onAutoForward": "openPreview"
}
},
"forwardPorts": [
8501
]
}
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ __pycache__/
.pytest_cache/

# Streamlit secrets
.streamlit/secrets.toml
.streamlit/secrets.toml

# Local Google Cloud service account keys
sipa-adv-c-bouncing-penguin-*.json
65 changes: 64 additions & 1 deletion load_data_to_bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

PROJECT_ID = "sipa-adv-c-bouncing-penguin"
DATASET_ID = "mta_data"
API_ROW_LIMIT = 50000

SCOPES = [
"https://www.googleapis.com/auth/bigquery",
Expand All @@ -28,6 +29,9 @@ class DataSource:
order_column: str
date_columns: tuple[str, ...]
numeric_columns: tuple[str, ...]
required_columns: tuple[str, ...]
minimum_rows: int
minimum_date: str


DATA_SOURCES = {
Expand All @@ -53,6 +57,15 @@ class DataSource:
"staten_island_railway_total_estimated_ridership",
"staten_island_railway_pct_of_comparable_pre_pandemic_day",
),
required_columns=(
"date",
"subways_total_estimated_ridership",
"subways_pct_of_comparable_pre_pandemic_day",
"buses_total_estimated_ridership",
"buses_pct_of_comparable_pre_pandemic_day",
),
minimum_rows=1000,
minimum_date="2020-03-01",
),
"covid": DataSource(
name="NYC COVID cases",
Expand All @@ -72,6 +85,9 @@ class DataSource:
"qn_case_count",
"si_case_count",
),
required_columns=("date_of_interest", "case_count"),
minimum_rows=1000,
minimum_date="2020-03-01",
),
}

Expand Down Expand Up @@ -129,13 +145,58 @@ def ensure_dataset_exists(credentials) -> None:
client.create_dataset(dataset, exists_ok=True)


def validate_source_frame(df: pd.DataFrame, source: DataSource) -> None:
    """Fail before replacing BigQuery when a source pull looks incomplete."""
    # Schema guard: every required column must be present before anything else.
    absent = [name for name in source.required_columns if name not in df.columns]
    if absent:
        raise RuntimeError(f"{source.name} is missing required columns: {absent}")

    row_count = len(df)
    if row_count < source.minimum_rows:
        raise RuntimeError(
            f"{source.name} returned only {row_count} rows; "
            f"expected at least {source.minimum_rows}."
        )

    # Reaching the limit exactly almost certainly means the pull was truncated.
    if row_count >= API_ROW_LIMIT:
        raise RuntimeError(
            f"{source.name} hit the {API_ROW_LIMIT:,}-row API limit. "
            "Add pagination before replacing the BigQuery table."
        )

    primary_date = source.date_columns[0]
    dates = df[primary_date]
    if dates.isna().any():
        raise RuntimeError(f"{source.name} has null values in {primary_date}.")

    earliest_allowed = pd.Timestamp(source.minimum_date)
    earliest_seen = dates.min()
    if earliest_seen > earliest_allowed:
        raise RuntimeError(
            f"{source.name} starts at {earliest_seen.date()}, "
            f"but should include data from {earliest_allowed.date()}."
        )

    latest_seen = dates.max()
    if latest_seen <= earliest_seen:
        raise RuntimeError(f"{source.name} does not cover a usable date range.")

    # Required numeric columns that parsed to nothing but NaN signal a bad pull.
    all_null_numeric = [
        name
        for name in source.required_columns
        if name in source.numeric_columns and df[name].isna().all()
    ]
    if all_null_numeric:
        raise RuntimeError(
            f"{source.name} has all-null required numeric columns: {all_null_numeric}"
        )


def fetch_source(source: DataSource) -> pd.DataFrame:
"""Pull a dataset from an NYC Open Data endpoint."""
print(f"Fetching {source.name} from {source.api_url} ...")
sys.stdout.flush()
response = requests.get(
source.api_url,
params={"$limit": 50000, "$order": source.order_column},
params={"$limit": API_ROW_LIMIT, "$order": source.order_column},
timeout=60,
)
response.raise_for_status()
Expand All @@ -155,6 +216,8 @@ def fetch_source(source: DataSource) -> pd.DataFrame:
if column in df.columns:
df[column] = pd.to_numeric(df[column], errors="coerce")

validate_source_frame(df, source)

date_column = source.date_columns[0]
print(
"Fetched "
Expand Down
61 changes: 32 additions & 29 deletions pages/1_MTA_Ridership.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def get_mta_page_columns(selected_services: list[str]) -> tuple[str, ...]:

def main() -> None:
st.title("MTA Daily Ridership Analysis")
st.caption("Default view loads only the latest 180 days for a faster deployed app.")

selected_services = st.multiselect(
"Select services",
Expand All @@ -35,35 +36,37 @@ def main() -> None:
key="mta_page_time_window_v1",
)

try:
if time_window == "Recent 180 days":
df = load_mta_data(columns=get_mta_page_columns(selected_services), lookback_days=180)
elif time_window == "Recent 365 days":
df = load_mta_data(columns=get_mta_page_columns(selected_services), lookback_days=365)
elif time_window == "Full history":
df = load_mta_data(columns=get_mta_page_columns(selected_services))
else:
today = date.today()
default_start = today - timedelta(days=180)
selected_dates = st.date_input(
"Date range",
value=(default_start, today),
min_value=MTA_MIN_DATE,
max_value=today,
key="mta_page_date_range_v3",
)
start_date = default_start
end_date = today
if len(selected_dates) == 2:
start_date, end_date = selected_dates
df = load_mta_data(
columns=get_mta_page_columns(selected_services),
start_date=str(start_date),
end_date=str(end_date),
)
except Exception as exc:
st.error(f"Failed to load MTA data from BigQuery: {exc}")
return
requested_columns = get_mta_page_columns(selected_services)
with st.spinner("Loading MTA data from BigQuery..."):
try:
if time_window == "Recent 180 days":
df = load_mta_data(columns=requested_columns, lookback_days=180)
elif time_window == "Recent 365 days":
df = load_mta_data(columns=requested_columns, lookback_days=365)
elif time_window == "Full history":
df = load_mta_data(columns=requested_columns)
else:
today = date.today()
default_start = today - timedelta(days=180)
selected_dates = st.date_input(
"Date range",
value=(default_start, today),
min_value=MTA_MIN_DATE,
max_value=today,
key="mta_page_date_range_v3",
)
start_date = default_start
end_date = today
if isinstance(selected_dates, tuple) and len(selected_dates) == 2:
start_date, end_date = selected_dates
df = load_mta_data(
columns=requested_columns,
start_date=str(start_date),
end_date=str(end_date),
)
except Exception as exc:
st.error(f"Failed to load MTA data from BigQuery: {exc}")
return

st.caption(
"Source: BigQuery table `mta_data.daily_ridership` refreshed with `load_data_to_bq.py`."
Expand Down
52 changes: 27 additions & 25 deletions pages/2_Second_Dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def main() -> None:
st.markdown(
"This page uses BigQuery-hosted COVID case data to contextualize changes in MTA ridership."
)
st.caption("Default view loads only the latest 180 days for a faster deployed app.")

time_window = st.radio(
"Time window",
Expand All @@ -20,31 +21,32 @@ def main() -> None:
key="covid_page_time_window_v1",
)

try:
if time_window == "Recent 180 days":
df = load_covid_data(lookback_days=180)
elif time_window == "Recent 365 days":
df = load_covid_data(lookback_days=365)
elif time_window == "Full history":
df = load_covid_data()
else:
today = date.today()
default_start = today - timedelta(days=180)
selected_dates = st.date_input(
"Date range",
value=(default_start, today),
min_value=COVID_MIN_DATE,
max_value=today,
key="covid_page_date_range_v3",
)
start_date = default_start
end_date = today
if len(selected_dates) == 2:
start_date, end_date = selected_dates
df = load_covid_data(start_date=str(start_date), end_date=str(end_date))
except Exception as exc:
st.error(f"Failed to load COVID data from BigQuery: {exc}")
return
with st.spinner("Loading COVID data from BigQuery..."):
try:
if time_window == "Recent 180 days":
df = load_covid_data(lookback_days=180)
elif time_window == "Recent 365 days":
df = load_covid_data(lookback_days=365)
elif time_window == "Full history":
df = load_covid_data()
else:
today = date.today()
default_start = today - timedelta(days=180)
selected_dates = st.date_input(
"Date range",
value=(default_start, today),
min_value=COVID_MIN_DATE,
max_value=today,
key="covid_page_date_range_v3",
)
start_date = default_start
end_date = today
if isinstance(selected_dates, tuple) and len(selected_dates) == 2:
start_date, end_date = selected_dates
df = load_covid_data(start_date=str(start_date), end_date=str(end_date))
except Exception as exc:
st.error(f"Failed to load COVID data from BigQuery: {exc}")
return

st.caption("Source: BigQuery table `mta_data.nyc_covid_cases`.")
st.write(
Expand Down
Loading
Loading