Replies: 3 comments 5 replies
-
I've asked the question also on stackoverflow including example code. I will report here if I get help there and find a solution. |
Beta Was this translation helpful? Give feedback.
-
Thanks @david-shu for your suggestion. I've never used Celery before and are trying to wrap my head around and understand it. HWcomms.py import flet as ft
from fastapi import FastAPI
import flet.fastapi as flet_fastapi
from datetime import datetime
import threading
import json
import uvicorn
from celery import Celery
capp = Celery('celery_app', broker='pyamqp://guest@localhost//')
@capp.task
def mycapp():
print('executing celery_app')
class PeriodicThread(object):
def __init__(self, callback=None, period=1, name=None, *args, **kwargs):
self.name = name
self.args = args
self.kwargs = kwargs
self.callback = callback
self.period = period
self.stop = False
self.current_timer = None
self.schedule_lock = threading.Lock()
def set_period(self, period):
self.period = period
def start(self):
self.stop = False
self.schedule_timer()
def run(self):
if self.callback is not None:
self.callback(*self.args, **self.kwargs)
def _run(self):
try:
self.run()
except Exception as e:
print(e)
pass
finally:
with self.schedule_lock:
if not self.stop:
self.schedule_timer()
def schedule_timer(self):
self.current_timer = threading.Timer(self.period, self._run, *self.args, **self.kwargs)
if self.name:
self.current_timer.name = self.name
self.current_timer.start()
def cancel(self):
with self.schedule_lock:
self.stop = True
if self.current_timer is not None:
self.current_timer.cancel()
def join(self):
self.current_timer.join()
class hardware_interface():
def __init__(self):
self.lock = threading.Lock()
self.cnt = 0
self.timer = PeriodicThread(callback=self.cb_timer, period=1, name="timer_periodic_update")
self.timer.start()
def send_command(self, cmd, cmd_data):
print(f'Instrument: executing command "{cmd} {cmd_data}"')
def read_oneoff_data(self):
data = self.cnt
self.cnt += 1
print(f'Instrument: data requested, replied {data}')
return data
def cb_timer(self):
data = datetime.now()
print(f'Instrument: data requested, replied {data}')
#TODO: Send broadcast message with the data to all app sessions
# If this would be a flet page object I could use
# self.pubsub.send_all(data)
mycapp.delay()
class CeleryAppMonitor( threading.Thread ):
def __init__( self, capp, pubsub=None ):
super().__init__()
self.capp = capp
self.pubsub = pubsub
def run( self ):
def on_task_succeeded( event ):
state = capp.events.State()
state.event( event )
eid = event.get( 'uuid' )
task = state.tasks.get( eid )
if task:
info = task.info()
if self.pubsub:
self.pubsub.send_all( json.dumps( info ) )
capp=self.capp
while 1:
with capp.connection() as conn:
recv = capp.events.Receiver( conn, handlers={ 'task-succeeded':on_task_succeeded})
recv.capture( )
class App(ft.UserControl):
monitor = None
def __new__( cls, page ):
if not cls.monitor:
cls.monitor = CeleryAppMonitor( capp, pubsub=page.pubsub )
cls.monitor.start()
return super().__new__(cls)
def build(self):
self.tf_cmd = ft.TextField(label="CMD")
self.tf_cmd_data = ft.TextField(label="CMD Data")
self.tf_req_data = ft.TextField(label="Requested Data")
self.tf_per_data = ft.TextField(label="Periodic Data")
page = ft.Column(
[
ft.Row(
[
self.tf_cmd,
self.tf_cmd_data,
ft.ElevatedButton(text="Send Command", on_click=self.btn_send_click)
]
),
ft.Row(
[
self.tf_req_data,
ft.ElevatedButton(text="Request Data", on_click=self.btn_reqdata_click)
]
),
ft.Row(
[
self.tf_per_data
]
)
]
)
self.page.pubsub.subscribe(self.on_HW_periodic_data_msg)
return page
def btn_send_click(self, e):
g_hw_if.lock.acquire(timeout=3)
g_hw_if.send_command(self.tf_cmd.value, self.tf_cmd_data.value)
g_hw_if.lock.release()
def btn_reqdata_click(self, e):
g_hw_if.lock.acquire(timeout=3)
self.tf_req_data.value = g_hw_if.read_oneoff_data()
g_hw_if.lock.release()
self.update()
def on_HW_periodic_data_msg(self, msg):
self.tf_per_data.value = msg
self.update()
async def session_main(page: ft.Page):
app = App(page)
page.add(app)
def main():
global g_hw_if
g_hw_if = hardware_interface()
#ft.app(target=session_main, view=ft.WEB_BROWSER, port=45678)
app = FastAPI()
fletapp = flet_fastapi.app( session_main )
app.mount('/', fletapp )
uvicorn.run( app )
if __name__ == '__main__':
main() It runs without errors. I installed RabbitMQ as the broker. I have several questions
|
Beta Was this translation helpful? Give feedback.
-
I found a solution for my problem which can be found in my original post at the top. It works as intended. There is only one problem left and maybe somebody here knows how to fix it. ft.app(target=session_main, view=ft.WEB_BROWSER, port=45678)
#ft.app(target=session_main) to #ft.app(target=session_main, view=ft.WEB_BROWSER, port=45678)
ft.app(target=session_main) then I get the following error when I close the GUI window
|
Beta Was this translation helpful? Give feedback.
-
Question
I'm building a flet application which will be used to command a hardware device and display data received from the device.
![image](https://private-user-images.githubusercontent.com/54610151/308457514-a3fd10a5-a6b9-4865-a0d2-1022ceb825fc.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjA0MjQ4ODQsIm5iZiI6MTcyMDQyNDU4NCwicGF0aCI6Ii81NDYxMDE1MS8zMDg0NTc1MTQtYTNmZDEwYTUtYTZiOS00ODY1LWEwZDItMTAyMmNlYjgyNWZjLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDglMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzA4VDA3NDMwNFomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPThkM2JiMDM2ZDY1ZTk5MDA0NjBmNmQxYjAyNzExZDZiZTU0OWFiZDFmZTEzZTMxN2RkNzE4Y2MxZTZiOWE5ZjYmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.d0mccIguctHEwPZrCLBp3_8SB8RoMu0E5GiE8Qecomk)
In addition to a desktop application I would also like to give multiple users access using a web app with multiple sessions.
I have a class instance in a global scope which interfaces to the hardware. For commanding (sending data to the hardware) the sessions call the write function of the class using a lock for thread safety (blue arrows). For the opposite direction (receiving data from the hardware) I currently use a similar mechanism where a click event in the sessions calls a read function (again using a lock) which then requests data from the hardware and returns it to the sessions with the function return value.
Now I want to implement periodic data requests from the hardware. I don't want each session to periodically read data using the mechanism described above because that would send 3 data requests to the hardware, receive 3 replies and return them to the corresponding sessions (for 3 sessions as in the diagram). Instead I would like to have a periodic timer in my interface class (a periodic thread) which would send 1 request to the hardware, receive the reply and then broadcast the data to all sessions using pubsub (red arrows).
Is there a way I can use pubsub outside of the flet application (in the timer thread) to send a broadcast message to the sessions? Or could there be a headless session, a page which is not showing anything and is not starting a web server in which I can embed my interface class and use its page.pubsub.send_all() function to do the broadcast?
If possible I would like the same code to work as a desktop app.
Edit
Here is a minimalist example to help understand what I'm looking for
Code sample
If I run this code a tab with the web app will open in my browser. I open another tab and enter the same URL as in the first one to start a new session of the web app. Now I can send a command to the simulated instrument (class hardware_interface) by entering e.g. "loglevel" in the CMD textbox, "3" in the CMD Data textbox and then clicking the "Send command" button in either of the tabs. Similarly each session can request data from the simulated instrument by clicking the "Request Data" button. The class hardware_interface has a timer callback function which periodically requests data from the instrument. What I'm looking for is a way to send the data I get in the callback to all sessions of the flet app (at the #TODO comment). If the hardware_interface class would be a flet page then I could use self.pubsub.send_all(data) but it isn't.
Edit 2: My Solution
Triggered by the idea from @david-shu I came up with the following solution. For each flet session there is a receiver task running in a separate thread. It integrates a queue which is registered centrally in a sessions object. The timer thread which requests data from the hardware puts the data into the queues of all active sessions using the sessions object.
Beta Was this translation helpful? Give feedback.
All reactions