Source code for pisak.tracker

"""
Websocket server and client implementations for PISAK eyetrackers.
"""
import asyncio
import threading

from ws4py.async_websocket import WebSocket
from ws4py.server.tulipserver import WebSocketProtocol
from ws4py.client.threadedclient import WebSocketClient

from pisak import logger


# Module-level logger, registered under the 'tracker' name.
_LOG = logger.get_logger('tracker')

# Address of the tracker web socket server. The port is kept as a string:
# it is joined with the host to build the client URL and is also passed,
# as is, to the event loop when the server is created.
SERVER_HOST = '127.0.0.1'
SERVER_PORT = '28394'

# Control messages sent by a client to the server in order to switch
# delivery of the tracker data on and off for that client.
CLIENT_START_MSG = 'start'
CLIENT_STOP_MSG = 'stop'


class TrackerServer:
    """
    Server for trackers.

    Reads lines from the standard output of a tracker process and forwards
    every gaze position found there, over web sockets, to all the connected
    clients that are currently active. The asyncio event loop serving the
    connections is run in a separate daemon thread.

    :param tracker: tracker process object. Its ``stdout`` attribute is
        registered with the event loop as a readable file and read line
        by line with ``readline()``.
    """
    # Connections currently opened. Defined on the class, so it is shared
    # by the handler web sockets and by all the server instances.
    clients = set()

    class TrackerServerWebSocket(WebSocket):
        """
        Handler web socket class for new coming connections.

        A handler starts as inactive and receives the tracker data only
        after its client has sent the start message.
        """

        def __init__(self, *args, **kwargs):
            WebSocket.__init__(self, *args, **kwargs)
            self.active = False

        def received_message(self, message):
            """
            Implementation of the `WebSocket` method, called when
            a new message arrives.

            The string form of the message is compared with the control
            messages: the start message activates the connection, the stop
            message deactivates it, anything else is ignored.

            :param message: received message.
            """
            msg = str(message)
            if msg == CLIENT_START_MSG:
                self.active = True
            elif msg == CLIENT_STOP_MSG:
                self.active = False

        def opened(self):
            """
            Implementation of the `WebSocket` method, called when
            the connection is opened. Client is then added to
            the set of all clients.
            """
            _LOG.debug("New connection opened: {}".format(self))
            TrackerServer.clients.add(self)

        def closed(self, _code, _reason=None):
            """
            Implementation of the `WebSocket` method, called when
            the connection is closed, no matter by which side. Client is
            then removed from the set of all clients.

            :param _code: code of the connection. Unused argument
                passed by the `WebSocket` internally.
            :param _reason: reason of closing the connection. Unused
                argument passed by the `WebSocket` internally.
            """
            _LOG.debug("Connection to client closed: {}".format(self))
            try:
                TrackerServer.clients.remove(self)
            except KeyError:
                _LOG.warning("Client {} had not been registered.".format(self))

    def __init__(self, tracker):
        self._tracker = tracker
        self._server = None  # set by the worker thread once listening
        self._loop = asyncio.get_event_loop()
        self._worker = threading.Thread(target=self._start_server, daemon=True)

    def _create_server(self):
        """
        Create server based on the web sockets system.

        :return: server created.
        """
        return self._loop.create_server(
            lambda: WebSocketProtocol(TrackerServer.TrackerServerWebSocket),
            SERVER_HOST,
            SERVER_PORT
        )

    def _start_server(self):
        """
        Prepare and make the server serve within the asyncio event loop.

        This is the target of the worker thread; it returns only after
        the event loop has been stopped.
        """
        asyncio.set_event_loop(self._loop)
        self._server = self._loop.run_until_complete(self._create_server())
        self._loop.add_reader(self._tracker.stdout, self._read_from_tracker)
        self._loop.run_forever()

    def _read_from_tracker(self):
        """
        Read from the tracker standard output and send data
        to all the active clients.

        Called by the event loop whenever the tracker output is readable.
        Only lines containing the 'gaze_pos:' marker are forwarded, with
        the marker itself stripped.
        """
        stdout = self._tracker.stdout
        data = stdout.readline()
        if not data:
            # End of the stream. A pipe at EOF is reported as readable
            # all the time, so the reader has to be unregistered or this
            # callback would be invoked in a busy loop.
            self._loop.remove_reader(stdout)
            return
        data = data.decode('utf-8', 'ignore').strip()
        if 'gaze_pos:' in data:
            data_to_send = data.replace('gaze_pos:', '').strip()
            # Defensive: iterate over a snapshot, so that a change of
            # the clients set while sending cannot break the iteration.
            for client in tuple(self.clients):
                if client.active:
                    client.send(data_to_send)

    def _shutdown(self):
        """
        Close the server and stop the event loop. Must be executed
        within the event loop thread.
        """
        if self._server is not None:
            self._server.close()
        self._loop.stop()

    def run(self):
        """
        Run the server. Server is run in a separate thread.
        """
        self._worker.start()

    def stop(self):
        """
        Stop and close the server, close the server thread.

        Does nothing when the server thread is not running.
        """
        if not self._worker.is_alive():
            return
        # The event loop and the server live in the worker thread and
        # asyncio objects are not thread-safe, so the shutdown is handed
        # over to the loop instead of being performed from this thread.
        # `wait_closed()` is a coroutine and is deliberately not awaited
        # here; `close()` alone stops the server from listening.
        self._loop.call_soon_threadsafe(self._shutdown)
        self._worker.join()
class TrackerClient(WebSocketClient):
    """
    Tracker server client.

    :param target: object that the incoming data is handed over to;
        its ``on_new_data`` method is called with each received item.
    """
    SERVER_ADDRESS = 'ws://{}/ws'.format(SERVER_HOST + ":" + SERVER_PORT)

    def __init__(self, target):
        address = self.SERVER_ADDRESS
        _LOG.debug('TrackerClient.__init__: {}, {}'.format(address, target))
        super().__init__(address)
        # target for data coming from the tracker server
        self.target = target

    def activate(self):
        """
        Activate the client by sending the start message to the server.
        Activated client will be able to receive data from
        the tracker server.
        """
        self.send(CLIENT_START_MSG)

    def deactivate(self):
        """
        Deactivate the client by sending the stop message to the server.
        Deactivated client will not be able to receive data from
        the tracker server but its connection will remain opened.
        Nothing is sent when the client has already been terminated.
        """
        if self.terminated:
            return
        self.send(CLIENT_STOP_MSG)

    def received_message(self, data):
        """
        Implementation of the `WebSocketClient` method, called when
        a new data arrives. The data is converted to a string and
        passed to the target.

        :param data: received data item.
        """
        payload = str(data)
        self.target.on_new_data(payload)
if __name__ == '__main__':
    # Demo: run the tracker mockup, serve its output through
    # a `TrackerServer` and print everything that a `TrackerClient`
    # receives, until interrupted.
    import os
    import time
    import subprocess

    from pisak import dirs

    class ClientMockup:
        """Data target that just prints whatever it is given."""

        def on_new_data(self, data):
            print('on_new_data: {}'.format(data))

    # The command is built as an argument list rather than split from
    # a string, so that a home directory containing whitespace does not
    # break the executable path into several arguments.
    command = [
        os.path.join(
            dirs.HOME, "pisak", "eyetracker", "mockup",
            "pisak-eyetracker-mockup"
        ),
        '--tracking',
    ]

    # The standard error of the mockup is never read here; discarding it
    # instead of piping it keeps the child from blocking on a full pipe.
    process = subprocess.Popen(command, shell=False,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.DEVNULL)
    try:
        device_server = TrackerServer(process)
        device_server.run()  # this method is non-blocking
        # presumably gives the server thread time to start listening
        # before the client connects
        time.sleep(1.0)

        client_mockup = ClientMockup()
        tracker_client = TrackerClient(client_mockup)
        tracker_client.connect()
        tracker_client.activate()

        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        # Ctrl-C is the regular way of ending the demo.
        pass
    finally:
        # Do not leave the mockup process running behind.
        process.terminate()
        process.wait()