Source code for qmt.utils.webapp_process

# SPDX-FileCopyrightText: 2021 Daniel Laidig <laidig@control.tu-berlin.de>
#
# SPDX-License-Identifier: MIT

import asyncio
import logging
import multiprocessing

from qmt.utils.webapp import Webapp
from qmt.utils.datasource import ProcessDataSourceConnection


logger = logging.getLogger(__name__)


async def _readPipe(pipe, webapp):
    frameAvailable = None
    try:
        event = asyncio.Event()  # https://stackoverflow.com/a/62098165
        asyncio.get_event_loop().add_reader(pipe.fileno(), event.set)
        frameAvailable = event
    except NotImplementedError:
        # add_reader is not supported on Windows with the ProactorEventLoop:
        # https://docs.python.org/3/library/asyncio-platforms.html#asyncio-platform-support
        logger.info('Webapp.runInProcess: falling back to polling because add_reader is not supported')

    while True:
        while pipe.poll():
            msg = pipe.recv()
            if isinstance(msg, dict):
                webapp.sendSample(msg)
            elif isinstance(msg, list):
                webapp.sendCommand(msg)
            elif msg == 'shutdown':
                webapp.shutdown()
            else:
                raise RuntimeError(f'unknown message received: {msg}')

        if frameAvailable is None:
            await asyncio.sleep(0.01)
        else:
            frameAvailable.clear()
            if not pipe.poll():
                await frameAvailable.wait()


def _run(pipe, attrs):
    webapp = Webapp()
    for k, v in attrs.items():
        setattr(webapp, k, v)

    def onParams(_, params):
        pipe.send(['params', params])

    def onCommand(_, command):
        pipe.send(['command', command])

    async def onRunning(_):
        asyncio.get_event_loop().create_task(_readPipe(pipe, webapp))

    webapp.on('params', onParams)
    webapp.on('command', onCommand)
    webapp.on('running', onRunning)
    webapp.run()


[docs]class WebappProcessConnection(ProcessDataSourceConnection): """ Helper for :meth:`Webapp.runInProcess`. An instance of this class is returned and can be used to communicate with the webapp. See the base class :class:`qmt.ProcessDataSourceConnection` for documentation on most methods. """ def __init__(self, webapp): pipe, targetPipe = multiprocessing.Pipe() super().__init__(pipe) self.webapp = webapp keys = ('path', 'config', 'data', 'show', 'noLib', 'jsLogLevel', 'devServerUrl', 'host', 'port', 'autoIncrementPort', 'stopOnDisconnect', 'stopOnWindowClose', 'chromiumExecutable', 'source', 'block') attrs = {k: getattr(webapp, k) for k in keys} self.process = multiprocessing.Process(target=_run, args=(targetPipe, attrs)) self.process.start()
[docs] def shutdown(self): """Shut down the webapp.""" try: self._pipe.send('shutdown') except ConnectionResetError: self._closed()