Center machines transitions#
The status of each center's data is managed by a state machine. The state is persisted in the center table of the central gongUsers database, using an abstract model and a database-persistent model. Below are the transition processes used by the state machines.
import json
import asyncio
from myFasthtml import *
from datetime import datetime, timedelta, timezone
from pathlib import Path
from zoneinfo import ZoneInfo
import libs.utils as utils
import libs.send2pi as send2pi
# Registry of background tasks, keyed by center name.
# register_task() stores a task here; check_and_advance() reads the entry
# for a center and deletes it once the task's result has been collected.
pending_tasks = {}
<<user-transitions>>
<<workflow-supervisor>>
<<system-transitions>>
The workflow supervisor#
def register_task(center: str, task: asyncio.Task):
    """Remember ``task`` as the pending background task of ``center``.

    Any task already stored for the same center is replaced. The whole
    registry is printed afterwards as a debugging trace.
    """
    pending_tasks.update({center: task})
    print(pending_tasks)
async def check_and_advance(center: str, csms):
    """Advance the state machine of ``center`` once its pending task is done.

    Nothing happens when no task is registered for the center or when the
    registered task is still running. Otherwise the task is removed from
    ``pending_tasks``, its outcome is stored in ``sm.model.last_result`` and
    the machine is moved with ``progress()`` when the outcome contains the
    key ``"success"``, or with ``problem()`` in every other case.

    A task that raised or was cancelled is handled as a failure: the outcome
    becomes an ``{"error": ...}`` dict and ``problem()`` is fired, instead of
    the exception escaping from here.

    Args:
        center: Name of the center; key into both ``csms`` and
            ``pending_tasks``.
        csms: Mapping from center name to its state machine.

    Raises:
        KeyError: If ``center`` has no state machine in ``csms``.
    """
    sm = csms[center]
    task = pending_tasks.get(center)
    if task is None or not task.done():
        return

    # Unregister before inspecting the outcome: if the entry were only removed
    # after a successful ``task.result()``, a failed task would stay
    # registered forever and every later poll would raise again.
    del pending_tasks[center]

    if task.cancelled():
        result = {"error": "task was cancelled"}
    else:
        # ``exception()`` returns None for a task that finished normally.
        failure = task.exception()
        if failure is not None:
            result = {"error": f"task raised {failure!r}"}
        else:
            result = task.result()

    sm.model.last_result = result
    if "success" in result:
        sm.progress()
    else:
        sm.problem()
async def check_center_free(state_mach, center_lock, this_user):
    """Try to take a center for ``this_user`` while holding ``center_lock``.

    The time elapsed since the start time reported by the model is computed
    first. A machine sitting in the ``"edit"`` state for longer than
    ``utils.Globals.INITIAL_COUNTDOWN`` seconds has its changes abandoned.
    If the machine is then in the ``"free"`` state, ``this_user`` is recorded
    on the model and ``progress()`` is fired.

    Returns:
        A ``(center_is_free, time_to_go)`` tuple. ``center_is_free`` is True
        only when the center was taken by this call; ``time_to_go`` is always
        ``utils.Globals.INITIAL_COUNTDOWN``.
    """
    async with center_lock:
        moment = datetime.now(timezone.utc)
        # The stored start time may carry a trailing "Z"; rewrite it as an
        # explicit UTC offset before parsing.
        started_iso = state_mach.model.get_start_time().replace("Z", "+00:00")
        elapsed = (moment - datetime.fromisoformat(started_iso)).total_seconds()
        countdown = utils.Globals.INITIAL_COUNTDOWN

        edit_timed_out = (
            state_mach.configuration[0].id == "edit" and elapsed > countdown
        )
        if edit_timed_out:
            state_mach.abandon_changes()

        # Re-read the state: abandoning changes above may have altered it.
        if state_mach.configuration[0].id != "free":
            return False, countdown

        state_mach.model.user = this_user
        state_mach.progress()
        return True, countdown
def abandon_edit(session, csms):
    """Drop the session's current center and send the user to the dashboard.

    The center name is read from ``session["center"]`` and the entry is reset
    to an empty string. When that center has a state machine:

    * in the ``"edit"`` state, the changes are abandoned and the model's user
      is cleared;
    * in any other state, the machine is forced to free only if
      ``utils.dev_comp_or_user(session)`` is truthy.

    A center without a state machine (for example an empty name) is ignored,
    so the redirect is always returned.

    Args:
        session: Session mapping holding the ``"center"`` key.
        csms: Mapping from center name to its state machine.

    Returns:
        A redirect to ``/dashboard``.
    """
    this_center = session["center"]
    session["center"] = ""

    # Look the machine up only after the membership test, so that both
    # branches below are protected against an unknown center name.
    if this_center in csms:
        sm = csms[this_center]
        if sm.configuration[0].id == "edit":
            sm.abandon_changes()
            sm.model.user = None
        elif utils.dev_comp_or_user(session):
            sm.force_to_free()
    return Redirect('/dashboard')
State machines creation and access#
There is one state machine per center. To create them: `csms = create_center_state_machines()`. To access the state machine for one center: `sm = csms["Mahi"]`.
async def wait_until(model, until_hour, minutes=0):
    """Sleep until the next ``until_hour:minutes`` wall-clock time at the center.

    For the test center (``utils.Globals.TEST_CENTER``) the wait is simply
    ``utils.Globals.SHORT_DELAY``. For any other center the target is the
    next occurrence of ``until_hour:minutes:00`` in the center's time zone:
    today if that moment is still ahead, otherwise tomorrow.

    Args:
        model: Object exposing ``center_name`` and ``centers``; the entry for
            the center must provide a ``timezone`` name.
        until_hour: Target hour (0-23) in the center's local time.
        minutes: Target minute (0-59) in the center's local time.

    Returns:
        ``{"success": ...}`` carrying the center's local date/time at wake-up.
    """
    center_tz = ZoneInfo(model.centers[model.center_name].timezone)
    if model.center_name == utils.Globals.TEST_CENTER:
        delay = utils.Globals.SHORT_DELAY
    else:
        now_center = datetime.now(center_tz)
        # Zero the seconds so the wake-up lands on the exact minute rather
        # than inheriting the seconds of the current time.
        next_event = now_center.replace(
            hour=until_hour, minute=minutes, second=0, microsecond=0
        )
        # Compare complete timestamps: checking hour and minute independently
        # misses cases such as now=15:10 with target=14:30, which would leave
        # the target in the past and produce a negative delay.
        if next_event <= now_center:
            next_event += timedelta(days=1)
        # Subtract in UTC: two aware datetimes sharing the same tzinfo are
        # subtracted as naive wall-clock values, which is off by the DST
        # shift when a transition falls between now and the target.
        delay = (
            next_event.astimezone(timezone.utc)
            - now_center.astimezone(timezone.utc)
        ).total_seconds()
    await asyncio.sleep(delay)
    return {"success": f"Date/time now at center: {datetime.now(center_tz).isoformat()}"}
async def transfer_new_db(model):
    """Upload the center's database file to the remote host via ``send2pi``.

    The local file lives in the folder returned by ``utils.get_db_path()``.
    For the test center the remote folder and file name come from
    ``utils.Globals``; for any other center the remote folder is
    ``/home/pi/prod`` and the file name is the center's ``save_db_file``.
    The connection and the upload run in worker threads so the event loop is
    not blocked.

    Returns:
        ``{"success": ...}`` with the center-local time of the transfer, or
        ``{"error": ...}`` when selecting the paths, connecting or uploading
        raised an exception.
    """
    # FIXME try 3 times at 10 min. intervals
    local_dir = Path(utils.get_db_path())
    port = model.centers[model.center_name].routing_port
    try:
        if model.center_name != utils.Globals.TEST_CENTER:
            # FIXME after discussion with Ivan
            remote_dir = Path("/home/pi/prod")
            db_name = model.centers[model.center_name].save_db_file
        else:
            remote_dir = Path(utils.Globals.PI_FOLDER_TEST)
            db_name = utils.Globals.PI_FILE_TEST
        connection = await asyncio.to_thread(send2pi.session_connect, port)
        await asyncio.to_thread(
            send2pi.file_upload, local_dir / db_name, remote_dir, connection
        )
    except Exception as e:
        return {"error": f"ssh transfer production db failed: {e}"}

    # Reached only when the upload went through.
    center_tz = ZoneInfo(model.centers[model.center_name].timezone)
    return {"success": f"production db sent at {datetime.now(center_tz).isoformat()} center time"}
async def get_version_prod(model):
    """Download the remote production file and record its version on the model.

    The file is fetched through ``send2pi`` into the folder returned by
    ``utils.get_db_path()``, parsed as JSON, and its ``"date"`` entry is
    stored in ``model.version_prod``.

    For the test center the remote folder and file name come from
    ``utils.Globals``. For any other center the same location that
    ``transfer_new_db`` uploads to is used: folder ``/home/pi/prod`` and the
    center's ``save_db_file``.
    NOTE(review): the production location mirrors ``transfer_new_db`` and is
    still subject to the FIXME below - confirm before relying on it.

    Returns:
        ``{"success": ...}`` with the version read, or ``{"error": ...}`` when
        connecting, downloading, reading or parsing raised an exception.
    """
    # FIXME try 3 times at 10 min. intervals
    localDBPath = Path(utils.get_db_path())
    port = model.centers[model.center_name].routing_port
    try:
        if model.center_name == utils.Globals.TEST_CENTER:
            remote_dir = Path(utils.Globals.PI_FOLDER_TEST)
            file_name = utils.Globals.PI_FILE_TEST
        else:
            # FIXME after discussion with Ivan
            remote_dir = Path("/home/pi/prod")
            # The file name must be set in this branch too; it is needed
            # below to locate the downloaded copy.
            file_name = model.centers[model.center_name].save_db_file
        remoteDBFilePath = remote_dir / file_name
        ssh_session = await asyncio.to_thread(send2pi.session_connect, port)
        await asyncio.to_thread(
            send2pi.file_download, remoteDBFilePath, localDBPath, ssh_session
        )
        file_transfered = localDBPath / file_name
        with open(file_transfered, "r", encoding="utf-8") as f:
            data = json.load(f)
        print(data)
        version = data["date"]
        model.version_prod = version
    except Exception as e:
        return {"error": f"ssh get production version failed: {e}"}
    # Format from a local name: reusing double quotes inside a double-quoted
    # f-string is a syntax error on Python versions before 3.12.
    return {"success": f"production version is {version}"}
async def check_version_prod(model):
    """Compare the recorded production version with today's date at the center.

    Today's date is taken in the center's time zone and rendered in ISO
    format (``YYYY-MM-DD``) before being compared with ``model.version_prod``.

    Returns:
        ``{"success": ...}`` when both are equal, ``{"error": ...}`` otherwise;
        either message ends with the center's current date.
    """
    # FIXME after discussion with Ivan
    center = model.centers[model.center_name]
    today_iso = datetime.now(ZoneInfo(center.timezone)).date().isoformat()
    if today_iso != model.version_prod:
        return {"error": f"production version is NOT OK with center date: {today_iso}"}
    return {"success": f"production version OK at center date: {today_iso}"}