corelink.corelink
Python client for Corelink.
corelink.hsrn.nyu.edu
1""" 2Python client for Corelink.\n 3corelink.hsrn.nyu.edu 4""" 5 6import websockets 7import ssl 8import asyncio 9import json 10from .resources import reqs, processing, variables 11from .resources.control import * 12from .resources.variables import log 13 14def run(function): 15 """Runs the user function in an event loop, and catches keyboard interrupt.""" 16 try: 17 variables.loop = asyncio.get_event_loop() 18 variables.loop.run_until_complete(function) 19 print("Closing down Corelink...") 20 except KeyboardInterrupt: 21 print("Closing from keyboard interrupt.") 22 finally: 23 if variables.is_open: 24 variables.loop.run_until_complete(_exit()) 25 26async def keep_open(): 27 """Called by user to ensure that the program stays open while awaiting data.""" 28 asyncio.create_task(processing.maintain()) 29 30async def close(): 31 """Called by user to end a receiver session (used after calling `keep_open()`).""" 32 variables.shut_down = True 33 34async def connect(username: str, password: str, host: str, port: str): 35 """Connects to server and authenticates. 36 :param username: username registered with Corelink server 37 :param password: password associated with username 38 :param host: host address to connect to 39 :param port: host port to connect to 40 :return: token 41 """ 42 variables.user = username 43 variables.password = password 44 variables.host = host 45 variables.port = port 46 # variables.protocol = protocol 47 variables.connection = await websockets.connect(f"wss://{variables.host}:{variables.port}", ssl=ssl.SSLContext(ssl.PROTOCOL_TLS)) 48 variables.receiver_task = asyncio.create_task(processing.ws_control_receiver(variables.connection)) 49 await auth() 50 51async def set_data_callback(callback): 52 """User should pass their callback function into this. 53 The function is expected to take: 54 param1 message: bytes, 55 param2 streamID: int, 56 param3 header: dict (sometimes empty)""" 57 variables.user_cb = callback 58 59async def set_server_callback(callback, key: str): 60 """Sets a callback function for server messages of the given key: 61 options: 'update', 'subscriber', 'stale', 'dropped' 62 callback should expect dict message with the server message (details in docs), 63 and str key listing what the message type is.""" 64 variables.server_callbacks[key] = callback 65 66async def active_streams() -> list: 67 """Returns a list of current streamIDs.""" 68 return list(variables.streams) + list(variables.receiver) 69 70async def create_sender(workspace, protocol: str = "tcp", streamID="", data_type='', metadata='', sender="", ip="", port="") -> int: 71 """Requests a sender from the server and opens the connection 72 :return: streamID used to send 73 """ 74 protocol = protocol.lower() 75 if protocol not in variables.valid_proto: 76 raise ValueError("protocol: protocol must be ws, udp, or tcp.") 77 request = { 78 "function": "sender", 79 "workspace": workspace, 80 "senderID": streamID, 81 "proto": protocol, 82 "IP": ip, 83 "port": port, 84 "alert": False, 85 "type": data_type, 86 "meta": metadata, 87 "from": sender, 88 "token": variables.token 89 } 90 sender = reqs.retrieve(await reqs.request_func(request), ret=True) 91 variables.streams[sender['streamID']] = sender 92 variables.streams[sender['streamID']]['protocol'] = request['proto'] 93 94 await processing.connect_sender(sender['streamID']) 95 log(type(variables.streams[sender['streamID']])) 96 return sender['streamID'] 97 98async def create_receiver(workspace, protocol, data_type="", metadata="", alert: bool = False, echo: bool = False, receiver_id="", stream_ids=[], ip=None, port=0, subscribe=True) -> int: 99 """Requests a receiver from the server and opens the connection. 100 :return: streamID used to receive 101 """ 102 protocol = protocol.lower() 103 if protocol not in variables.valid_proto: 104 raise ValueError("protocol: protocol must be ws, udp, or tcp.") 105 if ip is None: 106 ip = variables.user_ip 107 request = { 108 "function": "receiver", 109 "workspace": workspace, 110 "receiverID": receiver_id, 111 "streamIDs": stream_ids, 112 "proto": protocol, 113 "type": data_type, 114 "alert": alert, 115 "echo": echo, 116 "IP": ip, 117 "port": port, 118 "meta": metadata, 119 "subscribe": subscribe, 120 "token": variables.token 121 } 122 receiver = reqs.retrieve(await reqs.request_func(request), ret=True) 123 log("receiver: " + str(receiver)) 124 variables.receiver[receiver['streamID']] = receiver 125 await processing.connect_receiver(receiver['streamID']) # WHY ASYNC 126 return receiver['streamID'] 127 128async def send(streamID, data, user_header: dict = None): 129 """Sends data to streamID's stream (user should first call connect_sender(streamID)). 130 data should be either str or bytes. 131 """ 132 stream = variables.streams[streamID] # for convenience 133 user_h = json.dumps(user_header) if user_header else "" 134 header = [0, 0, 0, 0, 0, 0, 0, 0] 135 header = bytearray(header) 136 head = memoryview(header) 137 head[0:2] = int.to_bytes(len(user_h), 2, 'little') 138 head[2:4] = int.to_bytes(len(data), 2, 'little') 139 head[4:6] = int.to_bytes(int(streamID), 2, 'little') 140 message = bytes(header) + user_h.encode() + data.encode() 141 log(f"Sending message on protocol {stream['protocol']} for streamID {streamID}") 142 log(message) 143 if stream['protocol'] == 'ws': 144 asyncio.create_task(stream['connection'].send(message)) 145 log("[ws] data sent") 146 elif stream['protocol'] == 'tcp': 147 loop = asyncio.get_running_loop() 148 await loop.sock_sendall(stream['connection'], message) # Use sock_sendall to ensure all data is sent 149 log("[tcp] data sent") 150 else: 151 stream['connection'].send(message) 152 153async def disconnect_senders(streamIDs: list): 154 """Disconnects sender given list of streamIDs from server and removes streams.""" 155 for ID in streamIDs: 156 log("disconnecting " + str(ID)) 157 if variables.streams[ID]['protocol'] == 'ws': 158 await variables.streams[ID]['connection'].close() 159 else: 160 variables.streams[ID]['connection'].close() 161 await disconnect_streams(stream_ids=streamIDs) 162 163async def disconnect_receivers(streamIDs: list): 164 """Disconnects receiver given list of streamIDs from server and removes streams.""" 165 for ID in streamIDs: 166 log("disconnecting receiver " + str(ID)) 167 receiver = variables.receiver[ID] 168 if receiver['proto'] == 'ws': 169 await receiver['connection'].close() 170 variables.receiver_task.cancel() 171 else: 172 receiver['connection'][0].close() 173 await disconnect_streams(stream_ids=streamIDs) 174 175async def _exit(): 176 """Disconnects all open streams and closes connection. 177 Automatically called by run().""" 178 if variables.streams: 179 await disconnect_senders(list(variables.streams)) 180 if variables.receiver: 181 await disconnect_receivers(list(variables.receiver)) 182 # expire() 183 await variables.connection.close() # closes websockets control stream 184 variables.is_open = False 185 print("Closed.")
15def run(function): 16 """Runs the user function in an event loop, and catches keyboard interrupt.""" 17 try: 18 variables.loop = asyncio.get_event_loop() 19 variables.loop.run_until_complete(function) 20 print("Closing down Corelink...") 21 except KeyboardInterrupt: 22 print("Closing from keyboard interrupt.") 23 finally: 24 if variables.is_open: 25 variables.loop.run_until_complete(_exit())
Runs the user function in an event loop, and catches keyboard interrupt.
27async def keep_open(): 28 """Called by user to ensure that the program stays open while awaiting data.""" 29 asyncio.create_task(processing.maintain())
Called by user to ensure that the program stays open while awaiting data.
31async def close(): 32 """Called by user to end a receiver session (used after calling `keep_open()`).""" 33 variables.shut_down = True
Called by user to end a receiver session (used after calling keep_open()).
35async def connect(username: str, password: str, host: str, port: str): 36 """Connects to server and authenticates. 37 :param username: username registered with Corelink server 38 :param password: password associated with username 39 :param host: host address to connect to 40 :param port: host port to connect to 41 :return: token 42 """ 43 variables.user = username 44 variables.password = password 45 variables.host = host 46 variables.port = port 47 # variables.protocol = protocol 48 variables.connection = await websockets.connect(f"wss://{variables.host}:{variables.port}", ssl=ssl.SSLContext(ssl.PROTOCOL_TLS)) 49 variables.receiver_task = asyncio.create_task(processing.ws_control_receiver(variables.connection)) 50 await auth()
Connects to server and authenticates.
Parameters
- username: username registered with Corelink server
- password: password associated with username
- host: host address to connect to
- port: host port to connect to
Returns
token
52async def set_data_callback(callback): 53 """User should pass their callback function into this. 54 The function is expected to take: 55 param1 message: bytes, 56 param2 streamID: int, 57 param3 header: dict (sometimes empty)""" 58 variables.user_cb = callback
User should pass their callback function into this. The function is expected to take: param1 message: bytes, param2 streamID: int, param3 header: dict (sometimes empty)
60async def set_server_callback(callback, key: str): 61 """Sets a callback function for server messages of the given key: 62 options: 'update', 'subscriber', 'stale', 'dropped' 63 callback should expect dict message with the server message (details in docs), 64 and str key listing what the message type is.""" 65 variables.server_callbacks[key] = callback
Sets a callback function for server messages of the given key: options: 'update', 'subscriber', 'stale', 'dropped' callback should expect dict message with the server message (details in docs), and str key listing what the message type is.
67async def active_streams() -> list: 68 """Returns a list of current streamIDs.""" 69 return list(variables.streams) + list(variables.receiver)
Returns a list of current streamIDs.
71async def create_sender(workspace, protocol: str = "tcp", streamID="", data_type='', metadata='', sender="", ip="", port="") -> int: 72 """Requests a sender from the server and opens the connection 73 :return: streamID used to send 74 """ 75 protocol = protocol.lower() 76 if protocol not in variables.valid_proto: 77 raise ValueError("protocol: protocol must be ws, udp, or tcp.") 78 request = { 79 "function": "sender", 80 "workspace": workspace, 81 "senderID": streamID, 82 "proto": protocol, 83 "IP": ip, 84 "port": port, 85 "alert": False, 86 "type": data_type, 87 "meta": metadata, 88 "from": sender, 89 "token": variables.token 90 } 91 sender = reqs.retrieve(await reqs.request_func(request), ret=True) 92 variables.streams[sender['streamID']] = sender 93 variables.streams[sender['streamID']]['protocol'] = request['proto'] 94 95 await processing.connect_sender(sender['streamID']) 96 log(type(variables.streams[sender['streamID']])) 97 return sender['streamID']
Requests a sender from the server and opens the connection
Returns
streamID used to send
99async def create_receiver(workspace, protocol, data_type="", metadata="", alert: bool = False, echo: bool = False, receiver_id="", stream_ids=[], ip=None, port=0, subscribe=True) -> int: 100 """Requests a receiver from the server and opens the connection. 101 :return: streamID used to receive 102 """ 103 protocol = protocol.lower() 104 if protocol not in variables.valid_proto: 105 raise ValueError("protocol: protocol must be ws, udp, or tcp.") 106 if ip is None: 107 ip = variables.user_ip 108 request = { 109 "function": "receiver", 110 "workspace": workspace, 111 "receiverID": receiver_id, 112 "streamIDs": stream_ids, 113 "proto": protocol, 114 "type": data_type, 115 "alert": alert, 116 "echo": echo, 117 "IP": ip, 118 "port": port, 119 "meta": metadata, 120 "subscribe": subscribe, 121 "token": variables.token 122 } 123 receiver = reqs.retrieve(await reqs.request_func(request), ret=True) 124 log("receiver: " + str(receiver)) 125 variables.receiver[receiver['streamID']] = receiver 126 await processing.connect_receiver(receiver['streamID']) # WHY ASYNC 127 return receiver['streamID']
Requests a receiver from the server and opens the connection.
Returns
streamID used to receive
129async def send(streamID, data, user_header: dict = None): 130 """Sends data to streamID's stream (user should first call connect_sender(streamID)). 131 data should be either str or bytes. 132 """ 133 stream = variables.streams[streamID] # for convenience 134 user_h = json.dumps(user_header) if user_header else "" 135 header = [0, 0, 0, 0, 0, 0, 0, 0] 136 header = bytearray(header) 137 head = memoryview(header) 138 head[0:2] = int.to_bytes(len(user_h), 2, 'little') 139 head[2:4] = int.to_bytes(len(data), 2, 'little') 140 head[4:6] = int.to_bytes(int(streamID), 2, 'little') 141 message = bytes(header) + user_h.encode() + data.encode() 142 log(f"Sending message on protocol {stream['protocol']} for streamID {streamID}") 143 log(message) 144 if stream['protocol'] == 'ws': 145 asyncio.create_task(stream['connection'].send(message)) 146 log("[ws] data sent") 147 elif stream['protocol'] == 'tcp': 148 loop = asyncio.get_running_loop() 149 await loop.sock_sendall(stream['connection'], message) # Use sock_sendall to ensure all data is sent 150 log("[tcp] data sent") 151 else: 152 stream['connection'].send(message)
Sends data to streamID's stream (user should first call connect_sender(streamID)). data should be either str or bytes.
154async def disconnect_senders(streamIDs: list): 155 """Disconnects sender given list of streamIDs from server and removes streams.""" 156 for ID in streamIDs: 157 log("disconnecting " + str(ID)) 158 if variables.streams[ID]['protocol'] == 'ws': 159 await variables.streams[ID]['connection'].close() 160 else: 161 variables.streams[ID]['connection'].close() 162 await disconnect_streams(stream_ids=streamIDs)
Disconnects sender given list of streamIDs from server and removes streams.
164async def disconnect_receivers(streamIDs: list): 165 """Disconnects receiver given list of streamIDs from server and removes streams.""" 166 for ID in streamIDs: 167 log("disconnecting receiver " + str(ID)) 168 receiver = variables.receiver[ID] 169 if receiver['proto'] == 'ws': 170 await receiver['connection'].close() 171 variables.receiver_task.cancel() 172 else: 173 receiver['connection'][0].close() 174 await disconnect_streams(stream_ids=streamIDs)
Disconnects receiver given list of streamIDs from server and removes streams.