Utilities#

  • checking if the program runs on a development machine
  • sending an email
  • displaying the content of a markdown file
  • route or function not implemented yet
import socket
import tempfile
import calendar
from zoneinfo import ZoneInfo
from myFasthtml import *
# import resend # moved to "myFasthtml.py"
# import markdown2 # moved to "myFasthtml.py"
import os
from datetime import datetime, date, timedelta

temp_paths = {}

class Globals:
    INITIAL_COUNTDOWN = 4000 # seconds before auto-abandoning an edit session, set in planning_page and used in JS_CLIENT_TIMER
    SUBDIR_TEMP = "temp" # subdir of get_db_path() for temp files
    MONTHS_TO_FETCH = 12 # when fetching dhamma courses from dhamma.org, how many months to fetch starting from current month
    DAYS_TO_FETCH = 0 # when fetching dharma courses from dhamma.org, how many extra days to fetch after the last day of the last month (to catch late announcements)
    SHORT_DELAY = 3 # seconds: waiting time before uploading file to Pi IN DEV MODE
    PI_FOLDER_TEST = "/home/pi/test"  # PI folder used for ssh get/put tests
    PI_FILE_TEST = "test22.json"  # file used for ssh get/put tests with PI
    DEV_USER = "spegoff@authentica.eu" # IN PROD: can force state to free AND TEMPORALY SAVE CHANGES
    TEST_CENTER = "Testx" # used for testing in DEV mode

    @classmethod
    def get(cls, name, default=None):
        return getattr(cls, name, default)

<<dummy>>
<<isdev-computer>>
<<istest-db>>
<<send-email>>
<<display-markdown>>
<<temp-files>>
<<plus-months-days>>
<<feedback-to-user>>

Dummy start#

def dummy():
    return "dummy"

Check if the current computer is a development machine#

This function checks if the program runs on one of a predefined list of development machines. This is useful to determine whether to use a local or remote base URL for building the registation link.

def isa_dev_computer():
    DEV_COMPUTERS = ["serge-asrock","DESKTOP-UIPS8J2","serge-framework", "serge-bosgame", "Solaris" ]
    hostname = socket.gethostname()
    return hostname in DEV_COMPUTERS

def get_db_path():
    if isa_dev_computer():
        root = ""
    elif os.environ.get('Github_CI') == 'true': # Github CI actions
        root = ""
    else:   # Railway production permanent storage
        root = os.environ.get('RAILWAY_VOLUME_MOUNT_PATH',"None") + "/"
    return root + "data/"

def dev_comp_or_user(session):
    return isa_dev_computer() or session["auth"] == Globals.DEV_USER

Check if current db is a teporary db in memory#

def isa_db_test(db):
    return 'Database <apsw.Connection object ""' in str(db)

Send email#

via Google smtp#

We will need to create an App Password in your Google Account settings as we have 2-Step Verification enabled. And we set up environmemt variables 'GOOGLE_SMTP_USER' and 'GOOGLE_SMTP_PASS' .

via resend.com#

As railway does not give smtp access to Hobby plan, we are using instead the resend API: https://resend.com/docs/send-with-python . And we set up the environmemt variable 'RESEND_API_KEY' .

Example#

Using: send_email(subject, body, recipients)

  • subject = "Hello from Python"
  • body = "This is a test email sent from Python."
  • recipients = ["recipient1@gmail.com"] : list of recipients
def send_email(subject, body, recipients):
    # using resend
    sender = "spegoff@authentica.eu" 
    resend.api_key = os.environ['RESEND_API_KEY']
    params: resend.Emails.SendParams = {
        "from": sender,
        "to": recipients,
        "subject": subject,
        "text": body,
    }
    email = resend.Emails.send(params)
    print(f'Message sent: {email} to {recipients}')

Managing temp files#

def create_temp_path(center):
    temp_dir = get_db_path() + Globals.SUBDIR_TEMP 
    with tempfile.NamedTemporaryFile(mode='w', delete=False, dir=temp_dir) as tmp_file:
        temp_paths[center] = tmp_file.name

def delete_temp_path(center):
    if center in temp_paths and os.path.exists(temp_paths[center]):
        os.unlink(temp_paths[center])
    temp_paths[center] = ""

def wipe_all_temps():
    temp_dir =  get_db_path() + Globals.SUBDIR_TEMP
    for filename in os.listdir(temp_dir):
        file_path = os.path.join(temp_dir, filename)
        if os.path.isfile(file_path):
            os.remove(file_path)

Displaying the content of a markdown file#

This function reads a markdown file name, without the extension '.md', then finds the file in the 'md-text' directory and converts it to HTML using the markdown2 library, which is then returned as a NotStr object for rendering in the app.

def display_markdown(file_name:str):
    file_path = os.path.join('md-text', f"{file_name}.md")
    if os.path.exists(file_path):
        with open(file_path, "r") as f:
            html_content = markdown2.markdown(f.read())
        return NotStr(html_content)
    else:
        return f"!!! NO markdown file {file_name}.md IN md-text folder !!!"

Add months and days to an ISO date#

Return an ISO date string num_months and num_days after date_str (YYYY-MM-DD). Uses divmod to compute year/month rollover and preserves end-of-month.

def add_months_days(date_str, num_months, num_days):
    dt = datetime.strptime(date_str, "%Y-%m-%d").date()
    # total months since year 0 (make month zero-based)
    total = dt.year * 12 + (dt.month - 1) + num_months
    new_year, new_month0 = divmod(total, 12)
    new_month = new_month0 + 1
    last_day = calendar.monthrange(new_year, new_month)[1]
    new_day = min(dt.day, last_day)
    result_date = date(new_year, new_month, new_day)
    # Add num_days to the result
    result_date += timedelta(days=num_days)
    return result_date.isoformat()

def short_iso(date_time: datetime, timezon="UTC"):
    return date_time.astimezone(ZoneInfo(timezon)).strftime('%Y-%m-%dT%H:%M:%S%z')

def seconds_to_hours_minutes(total_seconds):
    hours = total_seconds // 3600
    remaining_minutes = (total_seconds % 3600) // 60
    return hours, remaining_minutes

Success/error message#

def feed_text(params):
    # query_params = dict(request.query_params)
    # Handle success and error messages
    success_messages = {
        'login_code_sent': "A code to sign in has been sent to your email. Please check your inbox and enter the code here below. The code will expire in 15 minutes.",
        'user_added': 'User added successfully!',
        'center_added': 'Center added successfully!',
        'planner_added': 'Planner association adSded successfully!',
        'user_deleted': 'User deleted successfully!',
        'center_deleted': 'Center and associated database deleted successfully!',
        'planner_deleted': 'Planner association deleted successfully!',
        'new_course': 'New line added. Please review the plan and submit changes to update the center gong.',
        'line_deleted': 'Line deleted. Please review the plan and submit changes to update the center gong.'
    }
    error_messages = {
        'missing_email':'Email is required.',
        'not_registered':f'Email "{params.get("email", "")}" is not registered, try again or send a message to xxx@xxx.xx to get registered',
        'invalid_or_expired_code': 'The code is invalid ot expired. Check if if it is correct. Or refresh the page to ask for another code.',
        'missing_fields': 'Please fill in all required fields.',
        'plan_not_ok': 'Correct the planning errors before saving this plan',
        'user_exists': 'User with this email already exists.',
        'center_exists': 'Center with this name already exists.',
        'planner_exists': 'This planner association already exists.',
        'user_not_found': 'User not found.',
        'center_not_found': 'Center not found.',
        'invalid_role': 'Invalid role selected.',
        'db_error': f'Database error occurred: {params.get("etext", "")}. Please contact the program support.',
        'db_file_exists': 'Database file with this name already exists.',
        'template_not_found': 'Template database (mahi.db) not found.',
        'user_has_planners': f'Cannot delete user. User is still associated with centers: {params.get("centers", "")}. Please remove all planner associations first.',
        'center_has_planners': f'Cannot delete center. Center is still associated with users: {params.get("users", "")}. Please remove all planner associations first.',
        'last_planner_for_center': f'Cannot delete planner. This is the last planner for center: "{params.get("center", "")}". Each center must have at least one planner.'
    }
    message = ""
    result = ""
    if 'success' in params:
        message = success_messages.get(params['success'], 'Operation completed successfully!')
        result = "success"
    elif 'error' in params:
        message = error_messages.get(params['error'], 'An error occurred.')
        result = "error"
    return {"mess": message, "res": result}

def feedback_to_user(params):
    # query_params = dict(request.query_params)
    # Handle success and error messages
    mess_dict = feed_text(params)
    message_div = Div(P(""))
    if mess_dict["res"] == 'success':
        message_div = Div(
            Div(P(mess_dict['mess']), style="color: #daecdaff; background: #187449ff; padding: 10px; border-radius: 5px; margin: 10px 0; border: 1px solid #198754; font-weight: 500;")
        )
    elif mess_dict["res"] == 'error':
        message_div = Div(
            Div(P(mess_dict['mess']), style="color: #f8d7da; background: #842029; padding: 10px; border-radius: 5px; margin: 10px 0; border: 1px solid #dc3545; font-weight: 500;")
        )
    return message_div