Initial commit
This commit is contained in:
0
src/__init__.py
Normal file
0
src/__init__.py
Normal file
1
src/websocket_manager/__init__.py
Normal file
1
src/websocket_manager/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .connection_manager import ThreadedConnectionManager
|
||||
71
src/websocket_manager/connection_manager.py
Normal file
71
src/websocket_manager/connection_manager.py
Normal file
@@ -0,0 +1,71 @@
|
||||
import json
|
||||
from typing import List, Union
|
||||
from fastapi import WebSocket
|
||||
import threading
|
||||
import queue
|
||||
import time
|
||||
import asyncio
|
||||
|
||||
class ThreadedConnectionManager(threading.Thread):
|
||||
def __init__(self) -> None:
|
||||
threading.Thread.__init__(self)
|
||||
self.active_connections: List[WebSocket] = []
|
||||
self.message_queue: queue.Queue = queue.Queue(maxsize=1000)
|
||||
self.running = True
|
||||
self.setDaemon = True
|
||||
self.start()
|
||||
|
||||
def run(self):
|
||||
while self.running:
|
||||
time.sleep(0.1)
|
||||
try:
|
||||
message=self.message_queue.get(timeout=1)
|
||||
asyncio.run(self.broadcast(message))
|
||||
except queue.Empty:
|
||||
#Handle empty queue here
|
||||
pass
|
||||
|
||||
def add_message_to_broadcast_queue(self, message):
|
||||
if type(message) == dict:
|
||||
message = json.dumps(message)
|
||||
self.message_queue.put(message)
|
||||
|
||||
async def connect(self, websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
self.active_connections.append(websocket)
|
||||
|
||||
def disconnect(self, websocket: WebSocket):
|
||||
self.active_connections.remove(websocket)
|
||||
|
||||
async def send_personal_message(self, message: Union[str, dict], websocket: WebSocket):
|
||||
if type(message) == dict:
|
||||
message = json.dumps(message)
|
||||
await websocket.send_text(message)
|
||||
|
||||
async def broadcast(self, message: Union[str, dict]):
|
||||
if type(message) == dict:
|
||||
message = json.dumps(message)
|
||||
for connection in self.active_connections:
|
||||
try:
|
||||
await connection.send_text(message)
|
||||
except RuntimeError as e:
|
||||
pass
|
||||
class ConnectionManager:
|
||||
def __init__(self):
|
||||
self.active_connections: List[WebSocket] = []
|
||||
|
||||
async def connect(self, websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
self.active_connections.append(websocket)
|
||||
|
||||
def disconnect(self, websocket: WebSocket):
|
||||
self.active_connections.remove(websocket)
|
||||
|
||||
async def send_personal_message(self, message: str, websocket: WebSocket):
|
||||
await websocket.send_text(message)
|
||||
|
||||
async def broadcast(self, message: Union[str, dict]):
|
||||
if type(message) == dict:
|
||||
message = json.dumps(message)
|
||||
for connection in self.active_connections:
|
||||
await connection.send_text(message)
|
||||
Reference in New Issue
Block a user