Get schedule from dhamma.org and merge into center schedule#
Example usage:#
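await fetch_dhamma_courses(centers, "Mahi", 6, 0) or await fetch_dhamma_courses(centers, "Pajjota", 6, 0), where centers is the collection of centers indexed by center name (the function is async, so it must be awaited or run with asyncio.run).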
from pathlib import Path
# import pandas as pd # moved to "myFasthtml.py"
# import aiohttp # moved to "myFasthtml.py"
import json
import asyncio
from tabulate import tabulate
from datetime import date
from myFasthtml import *
from libs.utils import add_months_days
from libs.plancheck import get_dhamm_org_types_list, coming_center_courses
<<fetch-api>>
<<period-type>>
<<deduplicate>>
<<fetch-courses>>
Get the courses from www.dhamma.org for a specific center from date_start until date_end, and keep only the relevant fields of the courses held inside the center (non-center courses are dropped).
To call this async function from sync code, use: courses = asyncio.run(fetch_courses_from_dhamma(location, date_start, date_end))
async def fetch_courses_from_dhamma(location, date_start, date_end):
    """Fetch the courses of one location from the dhamma.org search endpoint.

    Requests page after page of ``/courses/do_search`` until the page count
    reported by the server is reached, then keeps only the date and type
    fields of the courses whose ``location.center_noncenter`` is not
    ``"noncenter"``.

    Args:
        location: Value sent as the ``regions[]`` form field.
        date_start: Start of the search range (interpolated into ``daterange``).
        date_end: End of the search range (interpolated into ``daterange``).

    Returns:
        A list of dicts with the keys ``course_start_date``,
        ``course_end_date``, ``raw_course_type``, ``course_type_anchor`` and
        ``course_type``. An empty list is returned as soon as one page request
        fails, times out or does not contain valid JSON; pages fetched before
        the failure are discarded.
    """
    url = "https://www.dhamma.org/en-US/courses/do_search"
    headers = {"User-Agent": "entan-mkdocs-fetcher/1.0"}
    # aiohttp documents ClientTimeout as the type of the per-request timeout;
    # total=15 keeps the original 15 second overall limit.
    timeout = aiohttp.ClientTimeout(total=15)
    all_courses = []
    page = 1
    async with aiohttp.ClientSession(headers=headers) as session:
        while True:
            data = {
                "current_state": "OldStudents",
                "regions[]": location,
                "daterange": f"{date_start} - {date_end}",
                "page": str(page),
            }
            print(f"Fetching courses Dhamma {location} - Page {page}...")
            try:
                async with session.post(url, data=data, timeout=timeout) as resp:
                    resp.raise_for_status()
                    payload = await resp.json()
            except aiohttp.ClientError as e:
                print("Request error:", e)
                return []
            except asyncio.TimeoutError as e:
                print("Request timeout:", e)
                return []
            except ValueError as e:
                # JSON decode error
                print("Invalid JSON:", e)
                return []
            # "or" also covers keys that are present but null in the JSON.
            all_courses.extend(payload.get("courses") or [])
            total_pages = payload.get("pages") or 0
            if page >= total_pages:
                break
            page += 1
    return [
        {
            "course_start_date": c.get("course_start_date"),
            "course_end_date": c.get("course_end_date"),
            "raw_course_type": c.get("raw_course_type"),
            "course_type_anchor": c.get("course_type_anchor"),
            "course_type": c.get("course_type"),
        }
        for c in all_courses
        # A null "location" is treated like a missing one instead of crashing.
        if (c.get("location") or {}).get("center_noncenter") != "noncenter"
    ]
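Map a dhamma.org course to a period type. If other_dict["replacements"] has an entry for anchor, return its "@ALL@" value when that is set; otherwise return the value of the first key of that entry that is contained in course_type (upper-cased, with whitespace and hyphens removed). If nothing matched, find the dict in list_of_types whose 'raw_course_type' equals anchor and return its 'period_type'. If there is none, return anchor unchanged.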
def get_period_type(anchor, course_type: str, list_of_types, other_dict):
    """Map a dhamma.org course to a period type.

    Lookup order:

    1. ``other_dict["replacements"][anchor]``, when present and non-empty:
       its ``"@ALL@"`` value if set, otherwise the value of the first key that
       occurs in ``course_type`` after upper-casing it and removing all
       whitespace and hyphens.
    2. The ``period_type`` of the first entry of ``list_of_types`` whose
       ``raw_course_type`` equals ``anchor``.
    3. ``anchor`` itself when nothing matched.

    Args:
        anchor: Course type anchor of the course.
        course_type: Free-text course type; ``None`` is treated as empty.
        list_of_types: Iterable of dicts with ``raw_course_type`` and
            ``period_type`` keys.
        other_dict: Per-center configuration; the ``"replacements"`` section
            is optional.

    Returns:
        The matched period type, or ``anchor`` unchanged.
    """
    # A missing or null "replacements" section simply means "no overrides".
    replacements = other_dict.get("replacements") or {}
    course_type_dict = replacements.get(anchor)
    if course_type_dict:
        catch_all = course_type_dict.get("@ALL@")
        if catch_all:
            return catch_all
        # Normalize so that "Work - Period" matches the key "WORKPERIOD".
        cleaned_course_type = (
            "".join((course_type or "").upper().split()).replace("-", "")
        )
        for key, value in course_type_dict.items():
            if key in cleaned_course_type:
                return value
    for item in list_of_types:
        if anchor == item.get("raw_course_type"):
            return item.get("period_type")
    return anchor
Remove duplicates: if two consecutive items have identical start_date, end_date and period_type but a different source, keep only the first one and set its source to 'BOTH'.
def deduplicate(merged):
    """Collapse adjacent duplicate periods coming from different sources.

    Two consecutive items are duplicates when their ``start_date``,
    ``end_date`` and ``period_type`` are equal and their ``source`` differs.
    Only the first item of such a pair is kept, and its ``source`` is set to
    ``'BOTH'`` (the dict is updated in place). All other items are kept as
    they are, including the last one.

    Args:
        merged: List of period dicts, already sorted so that duplicates are
            adjacent.

    Returns:
        A new list holding the surviving dicts in their original order.
    """
    deduplicated = []
    total = len(merged)
    i = 0
    # Iterate up to and including the last index: stopping at total - 1 would
    # silently drop the final item whenever it is not part of a duplicate pair.
    while i < total:
        current = merged[i]
        if i + 1 < total:
            next_item = merged[i + 1]
            if (current['start_date'] == next_item['start_date'] and
                    current['period_type'] == next_item['period_type'] and
                    current['end_date'] == next_item['end_date'] and
                    current['source'] != next_item['source']):
                # Mark as BOTH and skip the next one
                current['source'] = 'BOTH'
                deduplicated.append(current)
                i += 2
                continue
        deduplicated.append(current)
        i += 1
    return deduplicated
Get a merged list of courses from the current courses in the center db and the future courses in dhamma.org: - from the start of the current course/service until num_months months plus num_days days in the future - course types mapped to the course types of the courses database - with a source field "dhamma.org", "center db" or "BOTH"
def get_dhamma_courses_types(extracted, center_obj, list_of_types):
    """Turn extracted dhamma.org courses into period dicts.

    The ``course_type_anchor`` of every item in ``extracted`` is first
    normalized in place: a trailing ``"OSC"`` is cut off and the remainder is
    stripped. The center's ``other_course`` JSON string is then decoded and
    used, together with ``list_of_types``, to resolve each course's
    ``period_type`` through ``get_period_type``.

    Returns:
        A tuple ``(periods_dhamma_org, other_dict)``: the list of period dicts
        (keys ``start_date``, ``end_date``, ``period_type``, ``source`` and
        ``course_type``; ``source`` is always ``"dhamma.org"``) and the decoded
        ``other_course`` configuration.
    """
    # [5] drop the "OSC" suffix from the anchors
    for entry in extracted:
        if entry['course_type_anchor'].endswith("OSC"):
            trimmed = entry['course_type_anchor'][:-3]
            entry['course_type_anchor'] = trimmed.strip()

    # [6.1] per-center configuration stored as a JSON string
    other_dict = json.loads(center_obj.other_course)

    # [7] one period dict per extracted course
    periods_dhamma_org = []
    for entry in extracted:
        period = {
            "start_date": entry.get("course_start_date"),
            "end_date": entry.get("course_end_date"),
            "period_type": get_period_type(
                entry.get("course_type_anchor"),
                entry.get("course_type"),
                list_of_types,
                other_dict,
            ),
            "source": "dhamma.org",
            "course_type": entry.get("course_type"),
        }
        periods_dhamma_org.append(period)
    return periods_dhamma_org, other_dict
def check_within(deletion_check, this_row, other_row):
    """Tell whether ``this_row`` starts inside ``other_row``.

    The answer is ``True`` only when ``other_row`` has the period type
    ``deletion_check`` and the start date of ``this_row`` lies in the
    half-open interval ``[other_row start_date, other_row end_date)``.
    Dates are ISO formatted strings. A row without ``period_type`` counts as
    having the empty string as type.
    """
    if other_row.get("period_type", "") == deletion_check:
        this_start = date.fromisoformat(this_row.get("start_date"))
        other_start = date.fromisoformat(other_row.get("start_date"))
        other_end = date.fromisoformat(other_row.get("end_date"))
        return other_start <= this_start < other_end
    return False
def clean_dhamma_courses(periods_dhamma_org, list_of_types, other_dict):
    """Filter out the dhamma.org periods that must not be merged.

    A row is dropped when:

    * its ``period_type`` is the default type, i.e. the ``period_type`` of the
      first entry of ``list_of_types`` tagged ``"D"`` (``""`` if there is
      none);
    * ``other_dict["delete"]`` maps its ``period_type`` to ``"@ALL@"``;
    * ``other_dict["delete"]`` maps its ``period_type`` to another period type
      and the row starts within a period of that type, looking at the last
      row kept so far and at the next row of the input.

    Returns:
        The list of rows that were kept, in input order.
    """
    default_entry = next((t for t in list_of_types if t.get("tags") == "D"), {})
    default_type = default_entry.get('period_type', "")  # Default type
    delete_list = other_dict.get("delete", {})
    last_index = len(periods_dhamma_org) - 1

    cleaned = []
    for i, row in enumerate(periods_dhamma_org):
        # Neighbours: previous *kept* row and next *input* row ({} at the edges).
        row_bef = cleaned[-1] if cleaned else {}
        row_aft = periods_dhamma_org[i + 1] if i < last_index else {}
        # "@TOKEEP@" is the marker for types without a deletion rule.
        deletion_check = delete_list.get(row["period_type"], "@TOKEEP@")

        if row["period_type"] == default_type or deletion_check == "@ALL@":
            continue
        if (check_within(deletion_check, row, row_bef)
                or check_within(deletion_check, row, row_aft)):
            continue
        cleaned.append(row)
    return cleaned
async def fetch_dhamma_courses(centers, center, num_months, num_days):
    """Build the merged, deduplicated course list of one center.

    Steps:

    1. Look up ``centers[center]`` and read its coming courses and the start
       date of the current course with ``coming_center_courses``.
    2. Fetch the dhamma.org courses of ``location_<center location>`` from
       that date until ``num_months`` months and ``num_days`` days later.
    3. Map them to period types, remove the rows excluded by the center
       configuration and append them to the center's own periods.
    4. Sort by ``start_date`` ascending, with the longest period first among
       equal start dates, and collapse duplicates with ``deduplicate``.

    Returns:
        The list produced by ``deduplicate``.
    """
    center_obj = centers[center]
    list_of_types = get_dhamm_org_types_list()
    periods_db_center, date_current_course = coming_center_courses(center_obj)  ## [1-3]

    extracted = await fetch_courses_from_dhamma(
        f"location_{center_obj.location}",
        date_current_course,
        add_months_days(date_current_course, num_months, num_days),
    )  ## [4]

    periods_dhamma_org, other_dict = get_dhamma_courses_types(
        extracted, center_obj, list_of_types
    )  ## [5]
    merged = periods_db_center + clean_dhamma_courses(
        periods_dhamma_org, list_of_types, other_dict
    )

    # Two stable passes: end_date descending first, then start_date ascending.
    # Items sharing a start_date therefore stay ordered by end_date descending.
    by_end_desc = sorted(merged, key=lambda p: p['end_date'], reverse=True)
    by_start = sorted(by_end_desc, key=lambda p: p['start_date'])
    return deduplicate(by_start)