How to ensure all commands (and errors) handled in order given











up vote
2
down vote

favorite












TLDR; How do I make a "single-file" asyncio.Queue() and feed it my adb commands, have them executed in the order they're received (one-by-one), handle errors that may occur (disconnect/reconnect) during one of the tasks, and continue processing the rest of the queue after handling the error?





I'm working on a module that leverages the existing python-adb module to ultimately control my android tablet as a media device and incorporate it into my home automation setup.



Problem:

My module is built entirely around async, while the python-adb module is not. The python-adb module also doesn't manage/throttle requests. And I very quickly found out that if multiple adb commands are requested too quickly the adb connection is overloaded, causing an error & requiring a reconnect whenever the disconnect occurred.



A friend of mine managed to implement a workaround/hack-y solution. Note: self._adb_lock & self._adb_error are initially set in the AndroidDevice class's __init__ function.



def adb_wrapper(func):
"""Wait if previous ADB commands haven't finished."""
@functools.wraps(func)
async def _adb_wrapper(self, *args, **kwargs):
attempts = 0
while self._adb_lock and attempts < 5:
attempts += 1
await asyncio.sleep(1)
if (attempts == 4 and self._adb_lock) or self._adb_error:
try:
await self.connect()
self._adb_error = False
except self._exceptions:
logging.error('Failed to re-establish the ADB connection; '
'will re-attempt in the next update.')
self._adb = None
self._adb_lock = False
self._adb_error = True
return

self._adb_lock = True
try:
returns = await func(self, *args, **kwargs)
except self._exceptions:
returns = None
logging.error('Failed to execute an ADB command; will attempt to '
're-establish the ADB connection in the next update')
self._adb = None
self._adb_error = True
finally:
self._adb_lock = False

return returns

return _adb_wrapper


With this workaround I placed the @adb_wrapper decorator above all functions that make adb calls. However, this is terribly inefficient & on higher-end devices doesn't prevent overloading of the adb connection.



Enter asyncio

Let me start my stating I have very little experience working with asyncio at this point; therefore, it's been touch to pick out which questions that were already posted would help me. So, my apologies if the answer is already present elsewhere. Also, in order to give people an idea of how my library is operating the codeblock will be a bit lengthy, but I only included a part of the file (a few functions to show how I'm ultimately interacting) and I tried to only include functions that connect to show the chain of commands.



My idea of a solution:

My goal is to be able to use asyncio to queue all commands and have them sent one at a time and if at any point the command fails (which would cause adb to disconnect) I want to re-establish the adb connection and continue with the queue of commands.



Current Code Structure:



class AndroidTV:
""" Represents an Android TV device. """

def __init__(self, host, adbkey=''):
""" Initialize AndroidTV object.
:param host: Host in format <address>:port.
:param adbkey: The path to the "adbkey" file
"""
self.host = host
self.adbkey = adbkey
self._adb = None
self.state = STATE_UNKNOWN
self.muted = False
self.device = 'hdmi'
self.volume = 0.
self.app_id = None

self.package_launcher = None
self.package_settings = None

self._adb_error = False
self._adb_lock = False
self._exceptions = (TypeError, ValueError, AttributeError,
InvalidCommandError, InvalidResponseError,
InvalidChecksumError, BrokenPipeError)

@adb_wrapper
async def connect(self):
""" Connect to an Android TV device.
Will attempt to establish ADB connection to the given host.
Failure sets state to UNKNOWN and disables sending actions.
"""
try:
if self.adbkey:
signer = Signer(self.adbkey)

# Connect to the device
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
else:
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)

if not self.package_settings:
self._adb.Shell("am start -a android.settings.SETTINGS")
await asyncio.sleep(1)
logging.info("Getting Settings App Package")
self.package_settings = await self.current_app
if not self.package_launcher:
await self.home()
await asyncio.sleep(1)
logging.info("Getting Launcher App Package")
self.package_launcher = await self.current_app

except socket_error as serr:
logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)

@adb_wrapper
async def update(self):
""" Update the device status. """
# Check if device is disconnected.
if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
elif not await self._screen_on:
self.state = STATE_OFF
self.app_id = None
else:
self.app_id = await self.current_app

if await self._wake_lock:
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON

# Get information from the audio status.
audio_output = await self._dump('audio')
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'

@property
async def current_app(self):
filtered_dump = await self._dump("window windows", "mCurrentFocus")
current_focus = filtered_dump.replace("r", "")
matches = WINDOW_REGEX.search(current_focus)
if matches:
(pkg, activity) = matches.group('package', 'activity')
return pkg
else:
logging.warning("Couldn't get current app, reply was %s", current_focus)
return None

@property
async def _screen_on(self):
return await self._dump_has('power', 'Display Power', 'state=ON')

@property
async def _awake(self):
return await self._dump_has('power', 'mWakefulness', 'Awake')

@property
async def _wake_lock(self):
return not await self._dump_has('power', 'Locks', 'size=0')

@adb_wrapper
async def _input(self, cmd):
if not self._adb:
return
self._adb.Shell('input {0}'.format(cmd))

@adb_wrapper
async def _dump(self, service, grep=None):
if not self._adb:
return
if grep:
return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
return self._adb.Shell('dumpsys {0}'.format(service))

async def _dump_has(self, service, grep, search):
dump_result = await self._dump(service, grep=grep)
return dump_result.strip().find(search) > -1


As I've stated before, the above method partially works, but is basically a band-aid.



The only commands that directly make adb.Shell calls are

1. async def connect(self)

2. async def update(self)

3. async def _input(self, cmd)

4. async def _dump(self, service, grep=None)

5. async def _key(self, key)



The connect & update functions result in multiple adb.Shell calls themselves, so this might be where my problem ultimately lies.



My (3-Part) Question:

1. How can I queue up all commands as they're received?

2. Execute them in the order they're received?

3. Handle errors at any point, reconnect, then continue executing the rest of the queue of commmands?



Here's my failed half-attempt at accomplishing this.



import asyncio

async def produce_output(queue, commands):
for command in commands:
#execute the adb command
if 'keypress' in command:
#command contains 'input keypress ENTER'
adb.Shell(command)
#mark the task done because there's nothing to process
queue.task_done()
else:
#command contains 'dumpsys audio'
output = adb.Shell(command)
#put result in queue
await queue.put(output)

async def process_adb(queue):
while True:
output = await queue.get()
#return output (somehow?)
queue.task_done()


async def update():
adb_queue = asyncio.Queue()
asyncio.create_task(produce_output(adb_queue,
[self._screen_on,
self.current_app,
self._wake_lock,
self._dump('audio')]))
#Not sure how to proceed

if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
# Fetching result of first item in the queue - self._screen_on
elif not await adb_queue.get():
self.state = STATE_OFF
self.app_id = None
else:
# Fetching result of second item in the queue - self.current_app
self.app_id = await adb_queue.get()

# Fetching result of third item in the queue - self._wake_lock
if await adb_queue.get():
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON

# Get information from the audio status.
# Fetching result of fourth item in the queue - self._dump('audio')
audio_output = await adb_queue.get()
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'









share|improve this question




























    up vote
    2
    down vote

    favorite












    TLDR; How do I make a "single-file" asyncio.Queue() and feed it my adb commands, have them executed in the order they're received (one-by-one), handle errors that may occur (disconnect/reconnect) during one of the tasks, and continue processing the rest of the queue after handling the error?





    I'm working on a module that leverages the existing python-adb module to ultimately control my android tablet as a media device and incorporate it into my home automation setup.



    Problem:

    My module is built entirely around async, while the python-adb module is not. The python-adb module also doesn't manage/throttle requests. And I very quickly found out that if multiple adb commands are requested too quickly the adb connection is overloaded, causing an error & requiring a reconnect whenever the disconnect occurred.



    A friend of mine managed to implement a workaround/hack-y solution. Note: self._adb_lock & self._adb_error are initially set in the AndroidDevice class's __init__ function.



    def adb_wrapper(func):
    """Wait if previous ADB commands haven't finished."""
    @functools.wraps(func)
    async def _adb_wrapper(self, *args, **kwargs):
    attempts = 0
    while self._adb_lock and attempts < 5:
    attempts += 1
    await asyncio.sleep(1)
    if (attempts == 4 and self._adb_lock) or self._adb_error:
    try:
    await self.connect()
    self._adb_error = False
    except self._exceptions:
    logging.error('Failed to re-establish the ADB connection; '
    'will re-attempt in the next update.')
    self._adb = None
    self._adb_lock = False
    self._adb_error = True
    return

    self._adb_lock = True
    try:
    returns = await func(self, *args, **kwargs)
    except self._exceptions:
    returns = None
    logging.error('Failed to execute an ADB command; will attempt to '
    're-establish the ADB connection in the next update')
    self._adb = None
    self._adb_error = True
    finally:
    self._adb_lock = False

    return returns

    return _adb_wrapper


    With this workaround I placed the @adb_wrapper decorator above all functions that make adb calls. However, this is terribly inefficient & on higher-end devices doesn't prevent overloading of the adb connection.



    Enter asyncio

    Let me start my stating I have very little experience working with asyncio at this point; therefore, it's been touch to pick out which questions that were already posted would help me. So, my apologies if the answer is already present elsewhere. Also, in order to give people an idea of how my library is operating the codeblock will be a bit lengthy, but I only included a part of the file (a few functions to show how I'm ultimately interacting) and I tried to only include functions that connect to show the chain of commands.



    My idea of a solution:

    My goal is to be able to use asyncio to queue all commands and have them sent one at a time and if at any point the command fails (which would cause adb to disconnect) I want to re-establish the adb connection and continue with the queue of commands.



    Current Code Structure:



    class AndroidTV:
    """ Represents an Android TV device. """

    def __init__(self, host, adbkey=''):
    """ Initialize AndroidTV object.
    :param host: Host in format <address>:port.
    :param adbkey: The path to the "adbkey" file
    """
    self.host = host
    self.adbkey = adbkey
    self._adb = None
    self.state = STATE_UNKNOWN
    self.muted = False
    self.device = 'hdmi'
    self.volume = 0.
    self.app_id = None

    self.package_launcher = None
    self.package_settings = None

    self._adb_error = False
    self._adb_lock = False
    self._exceptions = (TypeError, ValueError, AttributeError,
    InvalidCommandError, InvalidResponseError,
    InvalidChecksumError, BrokenPipeError)

    @adb_wrapper
    async def connect(self):
    """ Connect to an Android TV device.
    Will attempt to establish ADB connection to the given host.
    Failure sets state to UNKNOWN and disables sending actions.
    """
    try:
    if self.adbkey:
    signer = Signer(self.adbkey)

    # Connect to the device
    self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
    else:
    self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)

    if not self.package_settings:
    self._adb.Shell("am start -a android.settings.SETTINGS")
    await asyncio.sleep(1)
    logging.info("Getting Settings App Package")
    self.package_settings = await self.current_app
    if not self.package_launcher:
    await self.home()
    await asyncio.sleep(1)
    logging.info("Getting Launcher App Package")
    self.package_launcher = await self.current_app

    except socket_error as serr:
    logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)

    @adb_wrapper
    async def update(self):
    """ Update the device status. """
    # Check if device is disconnected.
    if not self._adb:
    self.state = STATE_UNKNOWN
    self.app_id = None
    # Check if device is off.
    elif not await self._screen_on:
    self.state = STATE_OFF
    self.app_id = None
    else:
    self.app_id = await self.current_app

    if await self._wake_lock:
    self.state = STATE_PLAYING
    elif self.app_id not in (self.package_launcher, self.package_settings):
    # Check if state was playing on last update
    if self.state == STATE_PLAYING:
    self.state = STATE_PAUSED
    elif self.state != STATE_PAUSED:
    self.state = STATE_IDLE
    else:
    # We're on either the launcher or in settings
    self.state = STATE_ON

    # Get information from the audio status.
    audio_output = await self._dump('audio')
    stream_block = re.findall(BLOCK_REGEX, audio_output,
    re.DOTALL | re.MULTILINE)[0]
    self.muted = re.findall(MUTED_REGEX, stream_block,
    re.DOTALL | re.MULTILINE)[0] == 'true'

    @property
    async def current_app(self):
    filtered_dump = await self._dump("window windows", "mCurrentFocus")
    current_focus = filtered_dump.replace("r", "")
    matches = WINDOW_REGEX.search(current_focus)
    if matches:
    (pkg, activity) = matches.group('package', 'activity')
    return pkg
    else:
    logging.warning("Couldn't get current app, reply was %s", current_focus)
    return None

    @property
    async def _screen_on(self):
    return await self._dump_has('power', 'Display Power', 'state=ON')

    @property
    async def _awake(self):
    return await self._dump_has('power', 'mWakefulness', 'Awake')

    @property
    async def _wake_lock(self):
    return not await self._dump_has('power', 'Locks', 'size=0')

    @adb_wrapper
    async def _input(self, cmd):
    if not self._adb:
    return
    self._adb.Shell('input {0}'.format(cmd))

    @adb_wrapper
    async def _dump(self, service, grep=None):
    if not self._adb:
    return
    if grep:
    return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
    return self._adb.Shell('dumpsys {0}'.format(service))

    async def _dump_has(self, service, grep, search):
    dump_result = await self._dump(service, grep=grep)
    return dump_result.strip().find(search) > -1


    As I've stated before, the above method partially works, but is basically a band-aid.



    The only commands that directly make adb.Shell calls are

    1. async def connect(self)

    2. async def update(self)

    3. async def _input(self, cmd)

    4. async def _dump(self, service, grep=None)

    5. async def _key(self, key)



    The connect & update functions result in multiple adb.Shell calls themselves, so this might be where my problem ultimately lies.



    My (3-Part) Question:

    1. How can I queue up all commands as they're received?

    2. Execute them in the order they're received?

    3. Handle errors at any point, reconnect, then continue executing the rest of the queue of commmands?



    Here's my failed half-attempt at accomplishing this.



    import asyncio

    async def produce_output(queue, commands):
    for command in commands:
    #execute the adb command
    if 'keypress' in command:
    #command contains 'input keypress ENTER'
    adb.Shell(command)
    #mark the task done because there's nothing to process
    queue.task_done()
    else:
    #command contains 'dumpsys audio'
    output = adb.Shell(command)
    #put result in queue
    await queue.put(output)

    async def process_adb(queue):
    while True:
    output = await queue.get()
    #return output (somehow?)
    queue.task_done()


    async def update():
    adb_queue = asyncio.Queue()
    asyncio.create_task(produce_output(adb_queue,
    [self._screen_on,
    self.current_app,
    self._wake_lock,
    self._dump('audio')]))
    #Not sure how to proceed

    if not self._adb:
    self.state = STATE_UNKNOWN
    self.app_id = None
    # Check if device is off.
    # Fetching result of first item in the queue - self._screen_on
    elif not await adb_queue.get():
    self.state = STATE_OFF
    self.app_id = None
    else:
    # Fetching result of second item in the queue - self.current_app
    self.app_id = await adb_queue.get()

    # Fetching result of third item in the queue - self._wake_lock
    if await adb_queue.get():
    self.state = STATE_PLAYING
    elif self.app_id not in (self.package_launcher, self.package_settings):
    # Check if state was playing on last update
    if self.state == STATE_PLAYING:
    self.state = STATE_PAUSED
    elif self.state != STATE_PAUSED:
    self.state = STATE_IDLE
    else:
    # We're on either the launcher or in settings
    self.state = STATE_ON

    # Get information from the audio status.
    # Fetching result of fourth item in the queue - self._dump('audio')
    audio_output = await adb_queue.get()
    stream_block = re.findall(BLOCK_REGEX, audio_output,
    re.DOTALL | re.MULTILINE)[0]
    self.muted = re.findall(MUTED_REGEX, stream_block,
    re.DOTALL | re.MULTILINE)[0] == 'true'









    share|improve this question


























      up vote
      2
      down vote

      favorite









      up vote
      2
      down vote

      favorite











      TLDR; How do I make a "single-file" asyncio.Queue() and feed it my adb commands, have them executed in the order they're received (one-by-one), handle errors that may occur (disconnect/reconnect) during one of the tasks, and continue processing the rest of the queue after handling the error?





      I'm working on a module that leverages the existing python-adb module to ultimately control my android tablet as a media device and incorporate it into my home automation setup.



      Problem:

      My module is built entirely around async, while the python-adb module is not. The python-adb module also doesn't manage/throttle requests. And I very quickly found out that if multiple adb commands are requested too quickly the adb connection is overloaded, causing an error & requiring a reconnect whenever the disconnect occurred.



      A friend of mine managed to implement a workaround/hack-y solution. Note: self._adb_lock & self._adb_error are initially set in the AndroidDevice class's __init__ function.



      def adb_wrapper(func):
      """Wait if previous ADB commands haven't finished."""
      @functools.wraps(func)
      async def _adb_wrapper(self, *args, **kwargs):
      attempts = 0
      while self._adb_lock and attempts < 5:
      attempts += 1
      await asyncio.sleep(1)
      if (attempts == 4 and self._adb_lock) or self._adb_error:
      try:
      await self.connect()
      self._adb_error = False
      except self._exceptions:
      logging.error('Failed to re-establish the ADB connection; '
      'will re-attempt in the next update.')
      self._adb = None
      self._adb_lock = False
      self._adb_error = True
      return

      self._adb_lock = True
      try:
      returns = await func(self, *args, **kwargs)
      except self._exceptions:
      returns = None
      logging.error('Failed to execute an ADB command; will attempt to '
      're-establish the ADB connection in the next update')
      self._adb = None
      self._adb_error = True
      finally:
      self._adb_lock = False

      return returns

      return _adb_wrapper


      With this workaround I placed the @adb_wrapper decorator above all functions that make adb calls. However, this is terribly inefficient & on higher-end devices doesn't prevent overloading of the adb connection.



      Enter asyncio

      Let me start my stating I have very little experience working with asyncio at this point; therefore, it's been touch to pick out which questions that were already posted would help me. So, my apologies if the answer is already present elsewhere. Also, in order to give people an idea of how my library is operating the codeblock will be a bit lengthy, but I only included a part of the file (a few functions to show how I'm ultimately interacting) and I tried to only include functions that connect to show the chain of commands.



      My idea of a solution:

      My goal is to be able to use asyncio to queue all commands and have them sent one at a time and if at any point the command fails (which would cause adb to disconnect) I want to re-establish the adb connection and continue with the queue of commands.



      Current Code Structure:



      class AndroidTV:
      """ Represents an Android TV device. """

      def __init__(self, host, adbkey=''):
      """ Initialize AndroidTV object.
      :param host: Host in format <address>:port.
      :param adbkey: The path to the "adbkey" file
      """
      self.host = host
      self.adbkey = adbkey
      self._adb = None
      self.state = STATE_UNKNOWN
      self.muted = False
      self.device = 'hdmi'
      self.volume = 0.
      self.app_id = None

      self.package_launcher = None
      self.package_settings = None

      self._adb_error = False
      self._adb_lock = False
      self._exceptions = (TypeError, ValueError, AttributeError,
      InvalidCommandError, InvalidResponseError,
      InvalidChecksumError, BrokenPipeError)

      @adb_wrapper
      async def connect(self):
      """ Connect to an Android TV device.
      Will attempt to establish ADB connection to the given host.
      Failure sets state to UNKNOWN and disables sending actions.
      """
      try:
      if self.adbkey:
      signer = Signer(self.adbkey)

      # Connect to the device
      self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
      else:
      self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)

      if not self.package_settings:
      self._adb.Shell("am start -a android.settings.SETTINGS")
      await asyncio.sleep(1)
      logging.info("Getting Settings App Package")
      self.package_settings = await self.current_app
      if not self.package_launcher:
      await self.home()
      await asyncio.sleep(1)
      logging.info("Getting Launcher App Package")
      self.package_launcher = await self.current_app

      except socket_error as serr:
      logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)

      @adb_wrapper
      async def update(self):
      """ Update the device status. """
      # Check if device is disconnected.
      if not self._adb:
      self.state = STATE_UNKNOWN
      self.app_id = None
      # Check if device is off.
      elif not await self._screen_on:
      self.state = STATE_OFF
      self.app_id = None
      else:
      self.app_id = await self.current_app

      if await self._wake_lock:
      self.state = STATE_PLAYING
      elif self.app_id not in (self.package_launcher, self.package_settings):
      # Check if state was playing on last update
      if self.state == STATE_PLAYING:
      self.state = STATE_PAUSED
      elif self.state != STATE_PAUSED:
      self.state = STATE_IDLE
      else:
      # We're on either the launcher or in settings
      self.state = STATE_ON

      # Get information from the audio status.
      audio_output = await self._dump('audio')
      stream_block = re.findall(BLOCK_REGEX, audio_output,
      re.DOTALL | re.MULTILINE)[0]
      self.muted = re.findall(MUTED_REGEX, stream_block,
      re.DOTALL | re.MULTILINE)[0] == 'true'

      @property
      async def current_app(self):
      filtered_dump = await self._dump("window windows", "mCurrentFocus")
      current_focus = filtered_dump.replace("r", "")
      matches = WINDOW_REGEX.search(current_focus)
      if matches:
      (pkg, activity) = matches.group('package', 'activity')
      return pkg
      else:
      logging.warning("Couldn't get current app, reply was %s", current_focus)
      return None

      @property
      async def _screen_on(self):
      return await self._dump_has('power', 'Display Power', 'state=ON')

      @property
      async def _awake(self):
      return await self._dump_has('power', 'mWakefulness', 'Awake')

      @property
      async def _wake_lock(self):
      return not await self._dump_has('power', 'Locks', 'size=0')

      @adb_wrapper
      async def _input(self, cmd):
      if not self._adb:
      return
      self._adb.Shell('input {0}'.format(cmd))

      @adb_wrapper
      async def _dump(self, service, grep=None):
      if not self._adb:
      return
      if grep:
      return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
      return self._adb.Shell('dumpsys {0}'.format(service))

      async def _dump_has(self, service, grep, search):
      dump_result = await self._dump(service, grep=grep)
      return dump_result.strip().find(search) > -1


      As I've stated before, the above method partially works, but is basically a band-aid.



      The only commands that directly make adb.Shell calls are

      1. async def connect(self)

      2. async def update(self)

      3. async def _input(self, cmd)

      4. async def _dump(self, service, grep=None)

      5. async def _key(self, key)



      The connect & update functions result in multiple adb.Shell calls themselves, so this might be where my problem ultimately lies.



      My (3-Part) Question:

      1. How can I queue up all commands as they're received?

      2. Execute them in the order they're received?

      3. Handle errors at any point, reconnect, then continue executing the rest of the queue of commmands?



      Here's my failed half-attempt at accomplishing this.



      import asyncio

      async def produce_output(queue, commands):
      for command in commands:
      #execute the adb command
      if 'keypress' in command:
      #command contains 'input keypress ENTER'
      adb.Shell(command)
      #mark the task done because there's nothing to process
      queue.task_done()
      else:
      #command contains 'dumpsys audio'
      output = adb.Shell(command)
      #put result in queue
      await queue.put(output)

      async def process_adb(queue):
      while True:
      output = await queue.get()
      #return output (somehow?)
      queue.task_done()


      async def update():
      adb_queue = asyncio.Queue()
      asyncio.create_task(produce_output(adb_queue,
      [self._screen_on,
      self.current_app,
      self._wake_lock,
      self._dump('audio')]))
      #Not sure how to proceed

      if not self._adb:
      self.state = STATE_UNKNOWN
      self.app_id = None
      # Check if device is off.
      # Fetching result of first item in the queue - self._screen_on
      elif not await adb_queue.get():
      self.state = STATE_OFF
      self.app_id = None
      else:
      # Fetching result of second item in the queue - self.current_app
      self.app_id = await adb_queue.get()

      # Fetching result of third item in the queue - self._wake_lock
      if await adb_queue.get():
      self.state = STATE_PLAYING
      elif self.app_id not in (self.package_launcher, self.package_settings):
      # Check if state was playing on last update
      if self.state == STATE_PLAYING:
      self.state = STATE_PAUSED
      elif self.state != STATE_PAUSED:
      self.state = STATE_IDLE
      else:
      # We're on either the launcher or in settings
      self.state = STATE_ON

      # Get information from the audio status.
      # Fetching result of fourth item in the queue - self._dump('audio')
      audio_output = await adb_queue.get()
      stream_block = re.findall(BLOCK_REGEX, audio_output,
      re.DOTALL | re.MULTILINE)[0]
      self.muted = re.findall(MUTED_REGEX, stream_block,
      re.DOTALL | re.MULTILINE)[0] == 'true'









      share|improve this question















      TLDR; How do I make a "single-file" asyncio.Queue() and feed it my adb commands, have them executed in the order they're received (one-by-one), handle errors that may occur (disconnect/reconnect) during one of the tasks, and continue processing the rest of the queue after handling the error?





      I'm working on a module that leverages the existing python-adb module to ultimately control my android tablet as a media device and incorporate it into my home automation setup.



      Problem:

      My module is built entirely around async, while the python-adb module is not. The python-adb module also doesn't manage/throttle requests. And I very quickly found out that if multiple adb commands are requested too quickly the adb connection is overloaded, causing an error & requiring a reconnect whenever the disconnect occurred.



      A friend of mine managed to implement a workaround/hack-y solution. Note: self._adb_lock & self._adb_error are initially set in the AndroidDevice class's __init__ function.



      def adb_wrapper(func):
      """Wait if previous ADB commands haven't finished."""
      @functools.wraps(func)
      async def _adb_wrapper(self, *args, **kwargs):
      attempts = 0
      while self._adb_lock and attempts < 5:
      attempts += 1
      await asyncio.sleep(1)
      if (attempts == 4 and self._adb_lock) or self._adb_error:
      try:
      await self.connect()
      self._adb_error = False
      except self._exceptions:
      logging.error('Failed to re-establish the ADB connection; '
      'will re-attempt in the next update.')
      self._adb = None
      self._adb_lock = False
      self._adb_error = True
      return

      self._adb_lock = True
      try:
      returns = await func(self, *args, **kwargs)
      except self._exceptions:
      returns = None
      logging.error('Failed to execute an ADB command; will attempt to '
      're-establish the ADB connection in the next update')
      self._adb = None
      self._adb_error = True
      finally:
      self._adb_lock = False

      return returns

      return _adb_wrapper


      With this workaround I placed the @adb_wrapper decorator above all functions that make adb calls. However, this is terribly inefficient & on higher-end devices doesn't prevent overloading of the adb connection.



      Enter asyncio

      Let me start my stating I have very little experience working with asyncio at this point; therefore, it's been touch to pick out which questions that were already posted would help me. So, my apologies if the answer is already present elsewhere. Also, in order to give people an idea of how my library is operating the codeblock will be a bit lengthy, but I only included a part of the file (a few functions to show how I'm ultimately interacting) and I tried to only include functions that connect to show the chain of commands.



      My idea of a solution:

      My goal is to be able to use asyncio to queue all commands and have them sent one at a time and if at any point the command fails (which would cause adb to disconnect) I want to re-establish the adb connection and continue with the queue of commands.



      Current Code Structure:



      class AndroidTV:
      """ Represents an Android TV device. """

      def __init__(self, host, adbkey=''):
      """ Initialize AndroidTV object.
      :param host: Host in format <address>:port.
      :param adbkey: The path to the "adbkey" file
      """
      self.host = host
      self.adbkey = adbkey
      self._adb = None
      self.state = STATE_UNKNOWN
      self.muted = False
      self.device = 'hdmi'
      self.volume = 0.
      self.app_id = None

      self.package_launcher = None
      self.package_settings = None

      self._adb_error = False
      self._adb_lock = False
      self._exceptions = (TypeError, ValueError, AttributeError,
      InvalidCommandError, InvalidResponseError,
      InvalidChecksumError, BrokenPipeError)

      @adb_wrapper
      async def connect(self):
      """ Connect to an Android TV device.
      Will attempt to establish ADB connection to the given host.
      Failure sets state to UNKNOWN and disables sending actions.
      """
      try:
      if self.adbkey:
      signer = Signer(self.adbkey)

      # Connect to the device
      self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
      else:
      self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)

      if not self.package_settings:
      self._adb.Shell("am start -a android.settings.SETTINGS")
      await asyncio.sleep(1)
      logging.info("Getting Settings App Package")
      self.package_settings = await self.current_app
      if not self.package_launcher:
      await self.home()
      await asyncio.sleep(1)
      logging.info("Getting Launcher App Package")
      self.package_launcher = await self.current_app

      except socket_error as serr:
      logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)

      @adb_wrapper
      async def update(self):
      """ Update the device status. """
      # Check if device is disconnected.
      if not self._adb:
      self.state = STATE_UNKNOWN
      self.app_id = None
      # Check if device is off.
      elif not await self._screen_on:
      self.state = STATE_OFF
      self.app_id = None
      else:
      self.app_id = await self.current_app

      if await self._wake_lock:
      self.state = STATE_PLAYING
      elif self.app_id not in (self.package_launcher, self.package_settings):
      # Check if state was playing on last update
      if self.state == STATE_PLAYING:
      self.state = STATE_PAUSED
      elif self.state != STATE_PAUSED:
      self.state = STATE_IDLE
      else:
      # We're on either the launcher or in settings
      self.state = STATE_ON

      # Get information from the audio status.
      audio_output = await self._dump('audio')
      stream_block = re.findall(BLOCK_REGEX, audio_output,
      re.DOTALL | re.MULTILINE)[0]
      self.muted = re.findall(MUTED_REGEX, stream_block,
      re.DOTALL | re.MULTILINE)[0] == 'true'

      @property
      async def current_app(self):
      filtered_dump = await self._dump("window windows", "mCurrentFocus")
      current_focus = filtered_dump.replace("r", "")
      matches = WINDOW_REGEX.search(current_focus)
      if matches:
      (pkg, activity) = matches.group('package', 'activity')
      return pkg
      else:
      logging.warning("Couldn't get current app, reply was %s", current_focus)
      return None

      @property
      async def _screen_on(self):
      return await self._dump_has('power', 'Display Power', 'state=ON')

      @property
      async def _awake(self):
      return await self._dump_has('power', 'mWakefulness', 'Awake')

      @property
      async def _wake_lock(self):
      return not await self._dump_has('power', 'Locks', 'size=0')

      @adb_wrapper
      async def _input(self, cmd):
      if not self._adb:
      return
      self._adb.Shell('input {0}'.format(cmd))

      @adb_wrapper
      async def _dump(self, service, grep=None):
      if not self._adb:
      return
      if grep:
      return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
      return self._adb.Shell('dumpsys {0}'.format(service))

      async def _dump_has(self, service, grep, search):
      dump_result = await self._dump(service, grep=grep)
      return dump_result.strip().find(search) > -1


      As I've stated before, the above method partially works, but is basically a band-aid.



      The only commands that directly make adb.Shell calls are

      1. async def connect(self)

      2. async def update(self)

      3. async def _input(self, cmd)

      4. async def _dump(self, service, grep=None)

      5. async def _key(self, key)



      The connect & update functions result in multiple adb.Shell calls themselves, so this might be where my problem ultimately lies.



      My (3-Part) Question:

      1. How can I queue up all commands as they're received?

      2. Execute them in the order they're received?

      3. Handle errors at any point, reconnect, then continue executing the rest of the queue of commmands?



      Here's my failed half-attempt at accomplishing this.



      import asyncio

      async def produce_output(queue, commands):
      for command in commands:
      #execute the adb command
      if 'keypress' in command:
      #command contains 'input keypress ENTER'
      adb.Shell(command)
      #mark the task done because there's nothing to process
      queue.task_done()
      else:
      #command contains 'dumpsys audio'
      output = adb.Shell(command)
      #put result in queue
      await queue.put(output)

      async def process_adb(queue):
      while True:
      output = await queue.get()
      #return output (somehow?)
      queue.task_done()


      async def update():
      adb_queue = asyncio.Queue()
      asyncio.create_task(produce_output(adb_queue,
      [self._screen_on,
      self.current_app,
      self._wake_lock,
      self._dump('audio')]))
      #Not sure how to proceed

      if not self._adb:
      self.state = STATE_UNKNOWN
      self.app_id = None
      # Check if device is off.
      # Fetching result of first item in the queue - self._screen_on
      elif not await adb_queue.get():
      self.state = STATE_OFF
      self.app_id = None
      else:
      # Fetching result of second item in the queue - self.current_app
      self.app_id = await adb_queue.get()

      # Fetching result of third item in the queue - self._wake_lock
      if await adb_queue.get():
      self.state = STATE_PLAYING
      elif self.app_id not in (self.package_launcher, self.package_settings):
      # Check if state was playing on last update
      if self.state == STATE_PLAYING:
      self.state = STATE_PAUSED
      elif self.state != STATE_PAUSED:
      self.state = STATE_IDLE
      else:
      # We're on either the launcher or in settings
      self.state = STATE_ON

      # Get information from the audio status.
      # Fetching result of fourth item in the queue - self._dump('audio')
      audio_output = await adb_queue.get()
      stream_block = re.findall(BLOCK_REGEX, audio_output,
      re.DOTALL | re.MULTILINE)[0]
      self.muted = re.findall(MUTED_REGEX, stream_block,
      re.DOTALL | re.MULTILINE)[0] == 'true'






      python-3.x async-await python-asyncio






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 9 at 15:10

























      asked Nov 8 at 16:36









      CaffeinatedCoder

      65521227




      65521227
























          1 Answer
          1






          active

          oldest

          votes

















          up vote
          2
          down vote



          accepted










          You need to ensure that only a single task is using the adb connection to execute a command at any given time. This means you need to either use synchronisation primitives to coordinate access, or use a queue to feed a single worker task commands to execute.



          Next, because an adb connection is entirely synchronous and, as with all I/O, relatively slow, I'd use a thread pool executor to run operations on a adb connection off the asyncio loop, so that asyncio is free to run some other tasks that are not currently blocked on I/O. Otherwise, there is no point to putting .Shell() commands in a async def coroutine, you are not actually cooperating and making room for other tasks to be run.



          Last but not least, if even with serialised access to the connection object you find that it can't take too many commands per time period, you would want to use some kind of rate limiting technique. I've created an asyncio leaky bucket algorithm implementation before that can take care of this, if so required.



          Both a queue or a lock would ensure that commands are executed in first-come-first-serve order, but a queue would require some kind of deferred response mechanism to return command results. A queue would let you queue up related commands (you can add multiple entries using queue.put_nowait() without yielding or you can allow grouped commands), without having to wait for a lock first.



          Because you want to retry connections, I'd encapsulate the connection object in a asynchronous context manager, that can then also handle locking and executing commands with an executor:



          import asyncio
          import collections
          from concurrent.futures import ThreadPoolExecutor
          from functools import partial

          try: # Python 3.7
          base = contextlib.AbstractAsyncContextManager
          except AttributeError:
          base = object # type: ignore

          _retry_exceptions = (...,) # define exceptions on which to retry commands?

          class asyncnullcontext(base):
          def __init__(self, enter_result=None):
          self.enter_result = enter_result
          async def __aenter__(self):
          return self.enter_result
          async def __aexit__(self, *excinfo):
          pass

          class AsyncADBConnection(base):
          def __init__(
          self,
          host,
          adbkey=None,
          rate_limit=None,
          max_retry=None,
          loop=None
          ):
          self._lock = asyncio.Lock(loop=loop)
          self._max_retry = max_retry
          self._loop = None
          self._connection = None
          self._executor = ThreadPoolExecutor()

          self._connect_kwargs = {
          "serial": host,
          "rsa_keys": [Signer(adbkey)] if adbkey else
          }

          if rate_limit is not None:
          # max commands per second
          self._limiter = AsyncLeakyBucket(rate_limit, 1, loop=loop)
          else:
          self._limiter = asyncnullcontext()

          async def __aenter__(self):
          await self._lock.acquire()
          await self._ensure_connection()
          return self

          async def __aexit__(self):
          self._lock.release()

          async def _ensure_connection(self):
          if self._connection is not None:
          return
          loop = self._loop or asyncio.get_running_loop()
          connector = partial(
          adb_commands.AdbCommands().ConnectDevice,
          **self._connect_kwargs
          )
          fut = loop.run_in_executor(pool, connector)
          self._connection = await fut

          async def shell(self, command):
          loop = self._loop or asyncio.get_running_loop()
          max_attempts = self._max_retry or 1
          attempts = 0
          while True:
          with self._limiter:
          try:
          fut = loop.run_in_executor(
          self._executor,
          self._connection.Shell,
          command
          )
          return await fut
          except _retry_exceptions as e:
          attempts += 1
          if attempts >= max_attempts:
          raise
          # re-connect on retry
          self._connection = None
          await self._ensure_connection()


          If you then use a queue, use Future() instances to communicate results.



          Pushing a job into the queue then becomes:



          fut = asyncio.Future()
          await queue.put((command, fut))
          result = await fut


          You could wrap that into a utility function or object. The await fut line only returns once the future has received a result. For commands where you don't care about a result, you only need to await if you want to make sure that the command completed.



          The consumer in the worker task that manages the connection would use:



          while True:
          command, fut = await self.queue.get():
          async with self.connection as conn:
          response = await conn.shell(command)
          fut.set_result(response)
          self.queue.task_done() # optional, only needed when joining the queue


          where self.connection is an AsyncADBConnection instance.






          share|improve this answer























          • Sounds like exactly what I need. But, could you help me get an idea of how I would do so by providing some code?
            – CaffeinatedCoder
            Nov 8 at 16:41






          • 1




            @CaffeinatedCoder: sure, but it'll be generic, not specific to your current codebase. It'll be easy enough to adapt.
            – Martijn Pieters
            Nov 8 at 16:42










          • That'll work, thank you so much!
            – CaffeinatedCoder
            Nov 8 at 17:24










          • @CaffeinatedCoder: it may be tomorrow before I have time, sorry.
            – Martijn Pieters
            Nov 8 at 19:33






          • 1




            @CaffeinatedCoder: I have no Android device, so my code is largely untested, but I've added a context manager that handles the connection and some more queue handling example code.
            – Martijn Pieters
            Nov 10 at 22:53











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














           

          draft saved


          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53212210%2fhow-to-ensure-all-commands-and-errors-handled-in-order-given%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes








          up vote
          2
          down vote



          accepted










          You need to ensure that only a single task is using the adb connection to execute a command at any given time. This means you need to either use synchronisation primitives to coordinate access, or use a queue to feed a single worker task commands to execute.



          Next, because an adb connection is entirely synchronous and, as with all I/O, relatively slow, I'd use a thread pool executor to run operations on a adb connection off the asyncio loop, so that asyncio is free to run some other tasks that are not currently blocked on I/O. Otherwise, there is no point to putting .Shell() commands in a async def coroutine, you are not actually cooperating and making room for other tasks to be run.



          Last but not least, if even with serialised access to the connection object you find that it can't take too many commands per time period, you would want to use some kind of rate limiting technique. I've created an asyncio leaky bucket algorithm implementation before that can take care of this, if so required.



          Both a queue or a lock would ensure that commands are executed in first-come-first-serve order, but a queue would require some kind of deferred response mechanism to return command results. A queue would let you queue up related commands (you can add multiple entries using queue.put_nowait() without yielding or you can allow grouped commands), without having to wait for a lock first.



          Because you want to retry connections, I'd encapsulate the connection object in a asynchronous context manager, that can then also handle locking and executing commands with an executor:



          import asyncio
          import collections
          from concurrent.futures import ThreadPoolExecutor
          from functools import partial

          try: # Python 3.7
          base = contextlib.AbstractAsyncContextManager
          except AttributeError:
          base = object # type: ignore

          _retry_exceptions = (...,) # define exceptions on which to retry commands?

          class asyncnullcontext(base):
          def __init__(self, enter_result=None):
          self.enter_result = enter_result
          async def __aenter__(self):
          return self.enter_result
          async def __aexit__(self, *excinfo):
          pass

          class AsyncADBConnection(base):
          def __init__(
          self,
          host,
          adbkey=None,
          rate_limit=None,
          max_retry=None,
          loop=None
          ):
          self._lock = asyncio.Lock(loop=loop)
          self._max_retry = max_retry
          self._loop = None
          self._connection = None
          self._executor = ThreadPoolExecutor()

          self._connect_kwargs = {
          "serial": host,
          "rsa_keys": [Signer(adbkey)] if adbkey else
          }

          if rate_limit is not None:
          # max commands per second
          self._limiter = AsyncLeakyBucket(rate_limit, 1, loop=loop)
          else:
          self._limiter = asyncnullcontext()

          async def __aenter__(self):
          await self._lock.acquire()
          await self._ensure_connection()
          return self

          async def __aexit__(self):
          self._lock.release()

          async def _ensure_connection(self):
          if self._connection is not None:
          return
          loop = self._loop or asyncio.get_running_loop()
          connector = partial(
          adb_commands.AdbCommands().ConnectDevice,
          **self._connect_kwargs
          )
          fut = loop.run_in_executor(pool, connector)
          self._connection = await fut

          async def shell(self, command):
          loop = self._loop or asyncio.get_running_loop()
          max_attempts = self._max_retry or 1
          attempts = 0
          while True:
          with self._limiter:
          try:
          fut = loop.run_in_executor(
          self._executor,
          self._connection.Shell,
          command
          )
          return await fut
          except _retry_exceptions as e:
          attempts += 1
          if attempts >= max_attempts:
          raise
          # re-connect on retry
          self._connection = None
          await self._ensure_connection()


          If you then use a queue, use Future() instances to communicate results.



          Pushing a job into the queue then becomes:



          fut = asyncio.Future()
          await queue.put((command, fut))
          result = await fut


          You could wrap that into a utility function or object. The await fut line only returns once the future has received a result. For commands where you don't care about a result, you only need to await if you want to make sure that the command completed.



          The consumer in the worker task that manages the connection would use:



          while True:
          command, fut = await self.queue.get():
          async with self.connection as conn:
          response = await conn.shell(command)
          fut.set_result(response)
          self.queue.task_done() # optional, only needed when joining the queue


          where self.connection is an AsyncADBConnection instance.






          share|improve this answer























          • Sounds like exactly what I need. But, could you help me get an idea of how I would do so by providing some code?
            – CaffeinatedCoder
            Nov 8 at 16:41






          • 1




            @CaffeinatedCoder: sure, but it'll be generic, not specific to your current codebase. It'll be easy enough to adapt.
            – Martijn Pieters
            Nov 8 at 16:42










          • That'll work, thank you so much!
            – CaffeinatedCoder
            Nov 8 at 17:24










          • @CaffeinatedCoder: it may be tomorrow before I have time, sorry.
            – Martijn Pieters
            Nov 8 at 19:33






          • 1




            @CaffeinatedCoder: I have no Android device, so my code is largely untested, but I've added a context manager that handles the connection and some more queue handling example code.
            – Martijn Pieters
            Nov 10 at 22:53















          up vote
          2
          down vote



          accepted










          You need to ensure that only a single task is using the adb connection to execute a command at any given time. This means you need to either use synchronisation primitives to coordinate access, or use a queue to feed a single worker task commands to execute.



          Next, because an adb connection is entirely synchronous and, as with all I/O, relatively slow, I'd use a thread pool executor to run operations on a adb connection off the asyncio loop, so that asyncio is free to run some other tasks that are not currently blocked on I/O. Otherwise, there is no point to putting .Shell() commands in a async def coroutine, you are not actually cooperating and making room for other tasks to be run.



          Last but not least, if even with serialised access to the connection object you find that it can't take too many commands per time period, you would want to use some kind of rate limiting technique. I've created an asyncio leaky bucket algorithm implementation before that can take care of this, if so required.



          Both a queue or a lock would ensure that commands are executed in first-come-first-serve order, but a queue would require some kind of deferred response mechanism to return command results. A queue would let you queue up related commands (you can add multiple entries using queue.put_nowait() without yielding or you can allow grouped commands), without having to wait for a lock first.



          Because you want to retry connections, I'd encapsulate the connection object in a asynchronous context manager, that can then also handle locking and executing commands with an executor:



          import asyncio
          import collections
          from concurrent.futures import ThreadPoolExecutor
          from functools import partial

          try: # Python 3.7
          base = contextlib.AbstractAsyncContextManager
          except AttributeError:
          base = object # type: ignore

          _retry_exceptions = (...,) # define exceptions on which to retry commands?

          class asyncnullcontext(base):
          def __init__(self, enter_result=None):
          self.enter_result = enter_result
          async def __aenter__(self):
          return self.enter_result
          async def __aexit__(self, *excinfo):
          pass

          class AsyncADBConnection(base):
          def __init__(
          self,
          host,
          adbkey=None,
          rate_limit=None,
          max_retry=None,
          loop=None
          ):
          self._lock = asyncio.Lock(loop=loop)
          self._max_retry = max_retry
          self._loop = None
          self._connection = None
          self._executor = ThreadPoolExecutor()

          self._connect_kwargs = {
          "serial": host,
          "rsa_keys": [Signer(adbkey)] if adbkey else
          }

          if rate_limit is not None:
          # max commands per second
          self._limiter = AsyncLeakyBucket(rate_limit, 1, loop=loop)
          else:
          self._limiter = asyncnullcontext()

          async def __aenter__(self):
          await self._lock.acquire()
          await self._ensure_connection()
          return self

          async def __aexit__(self):
          self._lock.release()

          async def _ensure_connection(self):
          if self._connection is not None:
          return
          loop = self._loop or asyncio.get_running_loop()
          connector = partial(
          adb_commands.AdbCommands().ConnectDevice,
          **self._connect_kwargs
          )
          fut = loop.run_in_executor(pool, connector)
          self._connection = await fut

          async def shell(self, command):
          loop = self._loop or asyncio.get_running_loop()
          max_attempts = self._max_retry or 1
          attempts = 0
          while True:
          with self._limiter:
          try:
          fut = loop.run_in_executor(
          self._executor,
          self._connection.Shell,
          command
          )
          return await fut
          except _retry_exceptions as e:
          attempts += 1
          if attempts >= max_attempts:
          raise
          # re-connect on retry
          self._connection = None
          await self._ensure_connection()


          If you then use a queue, use Future() instances to communicate results.



          Pushing a job into the queue then becomes:



          fut = asyncio.Future()
          await queue.put((command, fut))
          result = await fut


          You could wrap that into a utility function or object. The await fut line only returns once the future has received a result. For commands where you don't care about a result, you only need to await if you want to make sure that the command completed.



          The consumer in the worker task that manages the connection would use:



          while True:
          command, fut = await self.queue.get():
          async with self.connection as conn:
          response = await conn.shell(command)
          fut.set_result(response)
          self.queue.task_done() # optional, only needed when joining the queue


          where self.connection is an AsyncADBConnection instance.






          share|improve this answer























          • Sounds like exactly what I need. But, could you help me get an idea of how I would do so by providing some code?
            – CaffeinatedCoder
            Nov 8 at 16:41






          • 1




            @CaffeinatedCoder: sure, but it'll be generic, not specific to your current codebase. It'll be easy enough to adapt.
            – Martijn Pieters
            Nov 8 at 16:42










          • That'll work, thank you so much!
            – CaffeinatedCoder
            Nov 8 at 17:24










          • @CaffeinatedCoder: it may be tomorrow before I have time, sorry.
            – Martijn Pieters
            Nov 8 at 19:33






          • 1




            @CaffeinatedCoder: I have no Android device, so my code is largely untested, but I've added a context manager that handles the connection and some more queue handling example code.
            – Martijn Pieters
            Nov 10 at 22:53













          up vote
          2
          down vote



          accepted







          up vote
          2
          down vote



          accepted






          You need to ensure that only a single task is using the adb connection to execute a command at any given time. This means you need to either use synchronisation primitives to coordinate access, or use a queue to feed a single worker task commands to execute.



          Next, because an adb connection is entirely synchronous and, as with all I/O, relatively slow, I'd use a thread pool executor to run operations on a adb connection off the asyncio loop, so that asyncio is free to run some other tasks that are not currently blocked on I/O. Otherwise, there is no point to putting .Shell() commands in a async def coroutine, you are not actually cooperating and making room for other tasks to be run.



          Last but not least, if even with serialised access to the connection object you find that it can't take too many commands per time period, you would want to use some kind of rate limiting technique. I've created an asyncio leaky bucket algorithm implementation before that can take care of this, if so required.



          Both a queue or a lock would ensure that commands are executed in first-come-first-serve order, but a queue would require some kind of deferred response mechanism to return command results. A queue would let you queue up related commands (you can add multiple entries using queue.put_nowait() without yielding or you can allow grouped commands), without having to wait for a lock first.



          Because you want to retry connections, I'd encapsulate the connection object in a asynchronous context manager, that can then also handle locking and executing commands with an executor:



          import asyncio
          import collections
          from concurrent.futures import ThreadPoolExecutor
          from functools import partial

          try: # Python 3.7
          base = contextlib.AbstractAsyncContextManager
          except AttributeError:
          base = object # type: ignore

          _retry_exceptions = (...,) # define exceptions on which to retry commands?

          class asyncnullcontext(base):
          def __init__(self, enter_result=None):
          self.enter_result = enter_result
          async def __aenter__(self):
          return self.enter_result
          async def __aexit__(self, *excinfo):
          pass

          class AsyncADBConnection(base):
          def __init__(
          self,
          host,
          adbkey=None,
          rate_limit=None,
          max_retry=None,
          loop=None
          ):
          self._lock = asyncio.Lock(loop=loop)
          self._max_retry = max_retry
          self._loop = None
          self._connection = None
          self._executor = ThreadPoolExecutor()

          self._connect_kwargs = {
          "serial": host,
          "rsa_keys": [Signer(adbkey)] if adbkey else
          }

          if rate_limit is not None:
          # max commands per second
          self._limiter = AsyncLeakyBucket(rate_limit, 1, loop=loop)
          else:
          self._limiter = asyncnullcontext()

          async def __aenter__(self):
          await self._lock.acquire()
          await self._ensure_connection()
          return self

          async def __aexit__(self):
          self._lock.release()

          async def _ensure_connection(self):
          if self._connection is not None:
          return
          loop = self._loop or asyncio.get_running_loop()
          connector = partial(
          adb_commands.AdbCommands().ConnectDevice,
          **self._connect_kwargs
          )
          fut = loop.run_in_executor(pool, connector)
          self._connection = await fut

          async def shell(self, command):
          loop = self._loop or asyncio.get_running_loop()
          max_attempts = self._max_retry or 1
          attempts = 0
          while True:
          with self._limiter:
          try:
          fut = loop.run_in_executor(
          self._executor,
          self._connection.Shell,
          command
          )
          return await fut
          except _retry_exceptions as e:
          attempts += 1
          if attempts >= max_attempts:
          raise
          # re-connect on retry
          self._connection = None
          await self._ensure_connection()


          If you then use a queue, use Future() instances to communicate results.



          Pushing a job into the queue then becomes:



          fut = asyncio.Future()
          await queue.put((command, fut))
          result = await fut


          You could wrap that into a utility function or object. The await fut line only returns once the future has received a result. For commands where you don't care about a result, you only need to await if you want to make sure that the command completed.



          The consumer in the worker task that manages the connection would use:



          while True:
          command, fut = await self.queue.get():
          async with self.connection as conn:
          response = await conn.shell(command)
          fut.set_result(response)
          self.queue.task_done() # optional, only needed when joining the queue


          where self.connection is an AsyncADBConnection instance.






          share|improve this answer














          You need to ensure that only a single task is using the adb connection to execute a command at any given time. This means you need to either use synchronisation primitives to coordinate access, or use a queue to feed a single worker task commands to execute.



          Next, because an adb connection is entirely synchronous and, as with all I/O, relatively slow, I'd use a thread pool executor to run operations on a adb connection off the asyncio loop, so that asyncio is free to run some other tasks that are not currently blocked on I/O. Otherwise, there is no point to putting .Shell() commands in a async def coroutine, you are not actually cooperating and making room for other tasks to be run.



          Last but not least, if even with serialised access to the connection object you find that it can't take too many commands per time period, you would want to use some kind of rate limiting technique. I've created an asyncio leaky bucket algorithm implementation before that can take care of this, if so required.



          Both a queue or a lock would ensure that commands are executed in first-come-first-serve order, but a queue would require some kind of deferred response mechanism to return command results. A queue would let you queue up related commands (you can add multiple entries using queue.put_nowait() without yielding or you can allow grouped commands), without having to wait for a lock first.



          Because you want to retry connections, I'd encapsulate the connection object in a asynchronous context manager, that can then also handle locking and executing commands with an executor:



          import asyncio
          import collections
          from concurrent.futures import ThreadPoolExecutor
          from functools import partial

          try: # Python 3.7
          base = contextlib.AbstractAsyncContextManager
          except AttributeError:
          base = object # type: ignore

          _retry_exceptions = (...,) # define exceptions on which to retry commands?

          class asyncnullcontext(base):
          def __init__(self, enter_result=None):
          self.enter_result = enter_result
          async def __aenter__(self):
          return self.enter_result
          async def __aexit__(self, *excinfo):
          pass

          class AsyncADBConnection(base):
          def __init__(
          self,
          host,
          adbkey=None,
          rate_limit=None,
          max_retry=None,
          loop=None
          ):
          self._lock = asyncio.Lock(loop=loop)
          self._max_retry = max_retry
          self._loop = None
          self._connection = None
          self._executor = ThreadPoolExecutor()

          self._connect_kwargs = {
          "serial": host,
          "rsa_keys": [Signer(adbkey)] if adbkey else
          }

          if rate_limit is not None:
          # max commands per second
          self._limiter = AsyncLeakyBucket(rate_limit, 1, loop=loop)
          else:
          self._limiter = asyncnullcontext()

          async def __aenter__(self):
          await self._lock.acquire()
          await self._ensure_connection()
          return self

          async def __aexit__(self):
          self._lock.release()

          async def _ensure_connection(self):
          if self._connection is not None:
          return
          loop = self._loop or asyncio.get_running_loop()
          connector = partial(
          adb_commands.AdbCommands().ConnectDevice,
          **self._connect_kwargs
          )
          fut = loop.run_in_executor(pool, connector)
          self._connection = await fut

          async def shell(self, command):
          loop = self._loop or asyncio.get_running_loop()
          max_attempts = self._max_retry or 1
          attempts = 0
          while True:
          with self._limiter:
          try:
          fut = loop.run_in_executor(
          self._executor,
          self._connection.Shell,
          command
          )
          return await fut
          except _retry_exceptions as e:
          attempts += 1
          if attempts >= max_attempts:
          raise
          # re-connect on retry
          self._connection = None
          await self._ensure_connection()


          If you then use a queue, use Future() instances to communicate results.



          Pushing a job into the queue then becomes:



          fut = asyncio.Future()
          await queue.put((command, fut))
          result = await fut


          You could wrap that into a utility function or object. The await fut line only returns once the future has received a result. For commands where you don't care about a result, you only need to await if you want to make sure that the command completed.



          The consumer in the worker task that manages the connection would use:



          while True:
          command, fut = await self.queue.get():
          async with self.connection as conn:
          response = await conn.shell(command)
          fut.set_result(response)
          self.queue.task_done() # optional, only needed when joining the queue


          where self.connection is an AsyncADBConnection instance.







          share|improve this answer














          share|improve this answer



          share|improve this answer








          edited Nov 11 at 22:33

























          answered Nov 8 at 16:40









          Martijn Pieters

          690k12723842230




          690k12723842230












          • Sounds like exactly what I need. But, could you help me get an idea of how I would do so by providing some code?
            – CaffeinatedCoder
            Nov 8 at 16:41






          • 1




            @CaffeinatedCoder: sure, but it'll be generic, not specific to your current codebase. It'll be easy enough to adapt.
            – Martijn Pieters
            Nov 8 at 16:42










          • That'll work, thank you so much!
            – CaffeinatedCoder
            Nov 8 at 17:24










          • @CaffeinatedCoder: it may be tomorrow before I have time, sorry.
            – Martijn Pieters
            Nov 8 at 19:33






          • 1




            @CaffeinatedCoder: I have no Android device, so my code is largely untested, but I've added a context manager that handles the connection and some more queue handling example code.
            – Martijn Pieters
            Nov 10 at 22:53


















          • Sounds like exactly what I need. But, could you help me get an idea of how I would do so by providing some code?
            – CaffeinatedCoder
            Nov 8 at 16:41






          • 1




            @CaffeinatedCoder: sure, but it'll be generic, not specific to your current codebase. It'll be easy enough to adapt.
            – Martijn Pieters
            Nov 8 at 16:42










          • That'll work, thank you so much!
            – CaffeinatedCoder
            Nov 8 at 17:24










          • @CaffeinatedCoder: it may be tomorrow before I have time, sorry.
            – Martijn Pieters
            Nov 8 at 19:33






          • 1




            @CaffeinatedCoder: I have no Android device, so my code is largely untested, but I've added a context manager that handles the connection and some more queue handling example code.
            – Martijn Pieters
            Nov 10 at 22:53
















          Sounds like exactly what I need. But, could you help me get an idea of how I would do so by providing some code?
          – CaffeinatedCoder
          Nov 8 at 16:41




          Sounds like exactly what I need. But, could you help me get an idea of how I would do so by providing some code?
          – CaffeinatedCoder
          Nov 8 at 16:41




          1




          1




          @CaffeinatedCoder: sure, but it'll be generic, not specific to your current codebase. It'll be easy enough to adapt.
          – Martijn Pieters
          Nov 8 at 16:42




          @CaffeinatedCoder: sure, but it'll be generic, not specific to your current codebase. It'll be easy enough to adapt.
          – Martijn Pieters
          Nov 8 at 16:42












          That'll work, thank you so much!
          – CaffeinatedCoder
          Nov 8 at 17:24




          That'll work, thank you so much!
          – CaffeinatedCoder
          Nov 8 at 17:24












          @CaffeinatedCoder: it may be tomorrow before I have time, sorry.
          – Martijn Pieters
          Nov 8 at 19:33




          @CaffeinatedCoder: it may be tomorrow before I have time, sorry.
          – Martijn Pieters
          Nov 8 at 19:33




          1




          1




          @CaffeinatedCoder: I have no Android device, so my code is largely untested, but I've added a context manager that handles the connection and some more queue handling example code.
          – Martijn Pieters
          Nov 10 at 22:53




          @CaffeinatedCoder: I have no Android device, so my code is largely untested, but I've added a context manager that handles the connection and some more queue handling example code.
          – Martijn Pieters
          Nov 10 at 22:53


















           

          draft saved


          draft discarded



















































           


          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53212210%2fhow-to-ensure-all-commands-and-errors-handled-in-order-given%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Bressuire

          Vorschmack

          Quarantine