corelink.resources.processing
Some more backend processing functions.
"""Some more backend processing functions."""

from socket import socket, AF_INET, SOCK_STREAM, SOCK_DGRAM
import ssl
import asyncio
import websockets
import json
from .Proto import Proto
from .reqs import receiver_callback
from . import variables

# Strong references to fire-and-forget tasks. The event loop itself only keeps
# weak references to tasks, so a task nobody else references may be garbage
# collected before it finishes.
_background_tasks = set()


def _spawn(coro):
    """Schedule ``coro`` as a task and keep it referenced until it is done."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


async def ws_control_receiver(stream):
    """Dispatch every message arriving on the control channel to ``response``.

    Each message is handled in its own task, so a slow handler does not stall
    the reading of subsequent messages.
    """
    variables.log("[WS] control receiver channel open.")
    async for message in stream:
        _spawn(response(message))


async def ws_receiver(stream):
    """Dispatch every message from ``stream['connection']`` to ``receiver_callback``.

    Each message is handled in its own task.
    """
    variables.log('[WS] Receiver connected.')
    async for message in stream['connection']:
        _spawn(receiver_callback(message))


async def response(response) -> None:
    """Process a server response and relay server control functions away.

    ``response`` is the raw JSON text. After decoding:

    * if the message has a ``'function'`` key, the callback registered under
      that name in ``variables.server_callbacks`` is awaited with the decoded
      message and ``key=<function name>``;
    * otherwise, if it has an ``'ID'`` key, the decoded message is stored in
      ``variables.request_queue`` under that ID and the ``'ID'`` key is then
      removed from the stored message.

    Messages with neither key are ignored. Nothing is returned.
    """
    variables.log("response received")
    payload = json.loads(response)
    if 'function' in payload:
        name = payload['function']
        await variables.server_callbacks[name](payload, key=name)
    elif 'ID' in payload:
        variables.request_queue[payload['ID']] = payload
        del payload['ID']


async def connect_sender(streamID):
    """Connect the sender for ``streamID`` in order to send data.

    The connection is stored in ``variables.streams[streamID]['connection']``:
    a connected socket for ``'tcp'`` / ``'udp'``, or a websocket connection
    for ``'ws'``. Any other protocol value leaves the stream untouched.
    """
    stream = variables.streams[streamID]  # for convenience
    if stream['protocol'] == 'tcp':
        stream['connection'] = socket(AF_INET, SOCK_STREAM)
        # NOTE(review): this connect() is a blocking call inside a coroutine.
        stream['connection'].connect((variables.host, int(stream['port'])))
    elif stream['protocol'] == 'udp':
        stream['connection'] = socket(AF_INET, SOCK_DGRAM)
        stream['connection'].connect((variables.host, int(stream['port'])))
    elif stream['protocol'] == 'ws':
        # NOTE(review): a bare SSLContext(PROTOCOL_TLS) appears to skip
        # certificate verification -- confirm this is intended.
        stream['connection'] = await websockets.connect(f"wss://{variables.host}:{stream['port']}",
                                                        ssl=ssl.SSLContext(ssl.PROTOCOL_TLS))
        variables.log('connected WS sender.')


async def connect_receiver(streamID):
    """Connect the receiver for ``streamID`` in order to receive data.

    An 8-byte header is built that is all zeros except for bytes 4-5, which
    carry ``int(streamID)`` as a 2-byte little-endian integer. The connection
    is stored in ``variables.receiver[streamID]['connection']``.

    Raises:
        OverflowError: if ``int(streamID)`` does not fit in two unsigned bytes.
    """
    receiver = variables.receiver[streamID]  # for convenience
    variables.log(receiver)

    packet = bytearray(8)
    packet[4:6] = int(streamID).to_bytes(2, 'little')
    header = bytes(packet)
    variables.log(header)

    if receiver['proto'] == 'tcp':
        # Proto is given the header; presumably it sends it once connected.
        receiver['connection'] = await variables.loop.create_connection(
            lambda: Proto(header, 'tcp', int(receiver['port'])),
            variables.host, int(receiver['port']))
    elif receiver['proto'] == 'udp':
        receiver['connection'] = await variables.loop.create_datagram_endpoint(
            lambda: Proto(header, 'udp', int(receiver['port'])),
            remote_addr=(variables.host, int(receiver['port'])))
    elif receiver['proto'] == 'ws':
        receiver['connection'] = await websockets.connect(f"wss://{variables.host}:{receiver['port']}",
                                                          ssl=ssl.SSLContext(ssl.PROTOCOL_TLS))
        # Over websockets the header is sent explicitly, then a reader task
        # is started for the incoming messages.
        await receiver['connection'].send(header)
        variables.receiver_task = asyncio.create_task(ws_receiver(receiver))


async def maintain():
    """Keep yielding to the event loop until ``variables.shut_down`` is set."""
    # NOTE(review): sleep(0) yields without any delay, so this loop spins
    # continuously while idle; consider a small positive interval.
    while not variables.shut_down:
        await asyncio.sleep(0)
async def
ws_control_receiver(stream):
async def
ws_receiver(stream):
async def
response(response) -> dict:
async def response(response) -> None:
    """Process a server response and relay server control functions away.

    ``response`` is the raw JSON text. After decoding:

    * if the message has a ``'function'`` key, the callback registered under
      that name in ``variables.server_callbacks`` is awaited with the decoded
      message and ``key=<function name>``;
    * otherwise, if it has an ``'ID'`` key, the decoded message is stored in
      ``variables.request_queue`` under that ID and the ``'ID'`` key is then
      removed from the stored message.

    Messages with neither key are ignored. Nothing is returned.
    """
    variables.log("response received")
    # Keep the decoded message under its own name instead of rebinding the
    # parameter, so raw text and parsed payload stay distinguishable.
    payload = json.loads(response)
    if 'function' in payload:
        name = payload['function']
        await variables.server_callbacks[name](payload, key=name)
    elif 'ID' in payload:
        variables.request_queue[payload['ID']] = payload
        del payload['ID']
Processes server response and relays server control functions away
async def
connect_sender(streamID):
async def connect_sender(streamID):
    """Open the outgoing connection for a sender stream.

    Looks up ``variables.streams[streamID]`` and stores the new connection
    under its ``'connection'`` key: a connected ``AF_INET`` socket for the
    ``'tcp'`` and ``'udp'`` protocols, or a websocket connection for ``'ws'``.
    Any other protocol value leaves the stream entry untouched.
    """
    stream = variables.streams[streamID]
    protocol = stream['protocol']

    if protocol in ('tcp', 'udp'):
        # Same steps for both socket flavours; only the socket type differs.
        sock_type = SOCK_STREAM if protocol == 'tcp' else SOCK_DGRAM
        stream['connection'] = socket(AF_INET, sock_type)
        stream['connection'].connect((variables.host, int(stream['port'])))
        return

    if protocol == 'ws':
        url = f"wss://{variables.host}:{stream['port']}"
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        stream['connection'] = await websockets.connect(url, ssl=tls_context)
        variables.log('connected WS sender.')
Connects sender in order to send data
async def
connect_receiver(streamID):
async def connect_receiver(streamID):
    """Connect the receiver for ``streamID`` in order to receive data.

    An 8-byte header is built that is all zeros except for bytes 4-5, which
    carry ``int(streamID)`` as a 2-byte little-endian integer. The connection
    is stored in ``variables.receiver[streamID]['connection']``:

    * ``'tcp'`` -- result of ``variables.loop.create_connection`` with a
      ``Proto`` instance as the protocol;
    * ``'udp'`` -- result of ``variables.loop.create_datagram_endpoint`` with
      a ``Proto`` instance as the protocol;
    * ``'ws'``  -- a websocket connection; the header is sent over it and a
      ``ws_receiver`` task is stored in ``variables.receiver_task``.

    Any other ``'proto'`` value leaves the receiver entry untouched.

    Raises:
        OverflowError: if ``int(streamID)`` does not fit in two unsigned bytes.
    """
    receiver = variables.receiver[streamID]  # for convenience
    variables.log(receiver)

    # Build the header directly in a bytearray; no list or memoryview needed.
    packet = bytearray(8)
    packet[4:6] = int(streamID).to_bytes(2, 'little')
    header = bytes(packet)
    variables.log(header)

    if receiver['proto'] == 'tcp':
        # Proto is given the header; presumably it sends it once connected.
        receiver['connection'] = await variables.loop.create_connection(
            lambda: Proto(header, 'tcp', int(receiver['port'])),
            variables.host, int(receiver['port']))
    elif receiver['proto'] == 'udp':
        receiver['connection'] = await variables.loop.create_datagram_endpoint(
            lambda: Proto(header, 'udp', int(receiver['port'])),
            remote_addr=(variables.host, int(receiver['port'])))
    elif receiver['proto'] == 'ws':
        receiver['connection'] = await websockets.connect(f"wss://{variables.host}:{receiver['port']}",
                                                          ssl=ssl.SSLContext(ssl.PROTOCOL_TLS))
        # Over websockets the header is sent explicitly, then a reader task
        # is started for the incoming messages.
        await receiver['connection'].send(header)
        variables.receiver_task = asyncio.create_task(ws_receiver(receiver))
Connects receiver in order to receive data.
async def
maintain():