Skip to content

Commit

Permalink
#1 Decoupled connection handling with streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Balaji-Ganesh committed Jun 16, 2023
1 parent 87d32fc commit 99b66fa
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 31 deletions.
50 changes: 36 additions & 14 deletions middleware/communication/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
# Get the helpers..
from . import esp32


class ESP32Manager:
"""Manages all the communication related to the ESP32.
"""

def __init__(self, esp32IP: str, cam_port: int = 81, data_port: int = 82):
# set it to the assigned IP address to ESP32 when connected to WiFi.
esp32_ip: str = esp32IP
Expand All @@ -32,29 +34,38 @@ def __init__(self, esp32IP: str, cam_port: int = 81, data_port: int = 82):
@self.router.get("/")
async def read_root():
return {"Hello": "World"}
@self.router.get("/connection/{function}")
async def camera_feed_handler(function: str):

@self.router.get("/connection/{function:str}")
async def connections_handler(function: str):
if function == 'establish':
if self.cam_ws is None and self.data_ws is None:
# asyncio.create_task(esp32._connection_establisher(self))
await esp32._connection_establisher(self)
return {"message": "Connection to ESP32 established."}
task = asyncio.create_task(
esp32._connection_establisher(self))
success = await task
return {"message": "Connection to ESP32 established."} if success else {"message": "Failure in ESP32 connection establishment. Please check log."}
else:
return {"message": "Connections already established."}
elif function == 'terminate':
if self.cam_ws is not None and self.data_ws is not None:
# asyncio.create_task(esp32._connection_establisher(self))
await esp32._connection_terminater(self)
return {"message": "Connection to ESP32 terminated."}
task = asyncio.create_task(
esp32._connection_terminater(self))
success = await task
return {"message": "ES32 connections terminated successfully.."} if success else {"message": "Failure in termination of ESP32 connections. Please check log."}
else:
return {"message": "Connections already terminated."}

@self.router.get("/camera-feed/{function}")
else:
return {'error': 'Invalid function invoked.'}

@self.router.get("/camera-feed/{function:str}")
async def camera_feed_handler(function: str):
if function == 'start':
# check connection status..
if self.cam_ws is None:
return {'error': 'No connection established with ESP32. Please establish first.'}
# when connection already established..
if self.cam_task is None:
self.cam_task = asyncio.create_task(esp32._camera_client(self))
self.cam_task = asyncio.create_task(
esp32._camera_client(self))
return {"message": "Camera task started."}
else:
return {"message": "Camera task is already running."}
Expand All @@ -66,15 +77,21 @@ async def camera_feed_handler(function: str):
except asyncio.CancelledError:
pass
self.cam_task = None
#FIXME: The below one, is for testing purposes. Remove once done.
# FIXME: The below one, is for testing purposes. Remove once done.
cv2.destroyAllWindows()
return {"message": "Camera task stopped."}
else:
return {"message": "No camera task is currently running."}
else:
return {'error': 'Invalid function invoked.'}

@self.router.get("/collision-distance/{function}")
async def collision_dist_handler(function: str):
if function == 'start':
# check connection status..
if self.data_ws is None:
return {'error': 'No connection established with ESP32. Please establish first.'}
# when connection already established..
if self.data_task is None:
self.data_task = asyncio.create_task(
esp32._collision_dist_fetcher(self))
Expand All @@ -92,7 +109,9 @@ async def collision_dist_handler(function: str):
return {"message": "Collision-data task stopped."}
else:
return {"message": "No collision-data task is currently running."}

else:
return {'error': 'Invalid function invoked.'}


class ConnectionManager:
def __init__(self):
Expand All @@ -112,11 +131,14 @@ async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)


manager = ConnectionManager()


class WebManager:
"""Manages all the connections to the web-app.
"""

def __init__(self):
self.router = APIRouter()

Expand Down
42 changes: 25 additions & 17 deletions middleware/communication/esp32.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,63 +6,71 @@
# The below functions will become data members of ESP32Manager in __init__.py


async def _connection_establisher(self):
async def _connection_establisher(self) -> bool:
try:
self.cam_ws = await websockets.connect(self.camera_ws_url)
self.data_ws = await websockets.connect(self.data_ws_url)
self.data_ws = await websockets.connect(self.data_txrx_url)
return True
except Exception as e:
logging.error(
"[EXCEPTION] Connection establishment failed. Error: ", e)
finally:
logging.debug("Connection establishment done.")
return 'ERROR in connection establishment'
logging.debug("Connection establishment initiated.")
return False


async def _connection_terminater(self):
async def _connection_terminater(self) -> bool:
try:
# Close the connection
await self.cam_ws.close()
await self.data_ws.close()
# Empty the connection holders
self.cam_ws, self.data_ws = None, None
return True
except Exception as e:
logging.error(
"[EXCEPTION] Connection termination failed. Error: ", e)
finally:
logging.debug("Connection termination done.")
return 'ERROR in connection termination'
logging.debug("Connection termination initiated.")
return False


async def _camera_client(self):
async def _camera_client(self) -> bool:
try:
# self.cam_ws = await websockets.connect(self.camera_ws_url)
while True:
msg = await self.cam_ws.recv()
# even try with msg.data
npimg = np.array(bytearray(msg), dtype=np.uint8)
img = cv2.imdecode(npimg, -1)
cv2.imshow("img", img)
if cv2.waitKey(1) == 27:
print('EXITING')
print('camera task stopping..')
cv2.destroyAllWindows()
return 'Camera feed stopped by user'
return True
except Exception as e:
logging.error(
"[EXCEPTION] Camera streaming interrupted. Error: ", e)
finally:
await self.cam_ws.close()
logging.debug("Camera Websockets Connection closed successfully")
return 'ERROR in fetching feed'
return False


async def _collision_dist_fetcher(self):
async with websockets.connect(self.data_txrx_url) as websocket:
try:
while True:
message = await websocket.recv()
message = await self.data_ws.recv()
print(f"Received message: {message}")

# FIXME: Find way to send some status (True, False) -- like using some condition at While loop
# Process the message or perform any other task logic

# response = f"Processed: {message}"
# await websocket.send(response)
# print(f"Sent response: {response}")
# as currently using forever loop, need to break using ctrl+c
except KeyboardInterrupt:
logging.info("Collision data fetching interrupted.")
return True
# if any other exceptions..
except Exception as e:
print("[EXCEPTION] data fetching failed. Error: ", e)
logging.error("Collision data fetching interrupted. Error: ", e)
return False

0 comments on commit 99b66fa

Please sign in to comment.