How to ensure all commands (and errors) are handled in the order given
TLDR; How do I make a "single-file" asyncio.Queue(), feed it my adb commands, have them executed in the order they're received (one by one), handle errors that may occur (disconnect/reconnect) during one of the tasks, and continue processing the rest of the queue after handling the error?
I'm working on a module that leverages the existing python-adb module to ultimately control my android tablet as a media device and incorporate it into my home automation setup.
Problem:
My module is built entirely around async, while the python-adb module is not. The python-adb module also doesn't manage/throttle requests. I very quickly found out that if multiple adb commands are requested too quickly, the adb connection is overloaded, causing an error and requiring a reconnect whenever the disconnect occurs.
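To make the async/sync mismatch concrete, here is a minimal sketch of one common way to call a blocking function from a coroutine: handing it to the event loop's default thread pool with run_in_executor. The function blocking_shell is a hypothetical stand-in for a synchronous call such as adb.Shell (it is not part of python-adb), and the sleep only simulates I/O.

```python
import asyncio
import time


def blocking_shell(command):
    """Hypothetical stand-in for a synchronous call such as adb.Shell()."""
    time.sleep(0.05)  # simulate blocking I/O
    return "ran: " + command


async def shell(command):
    """Run the blocking call in the default thread pool so the loop stays free."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_shell, command)


async def demo():
    return await shell("dumpsys power")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This only keeps the event loop responsive; it does not by itself stop several commands from reaching the connection at once, so some form of serialization is still needed.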
A friend of mine managed to implement a workaround/hack-y solution. Note: self._adb_lock & self._adb_error are initially set in the AndroidDevice class's __init__ function.
def adb_wrapper(func):
    """Wait if previous ADB commands haven't finished."""
    @functools.wraps(func)
    async def _adb_wrapper(self, *args, **kwargs):
        attempts = 0
        while self._adb_lock and attempts < 5:
            attempts += 1
            await asyncio.sleep(1)

        if (attempts == 4 and self._adb_lock) or self._adb_error:
            try:
                await self.connect()
                self._adb_error = False
            except self._exceptions:
                logging.error('Failed to re-establish the ADB connection; '
                              'will re-attempt in the next update.')
                self._adb = None
                self._adb_lock = False
                self._adb_error = True
                return

        self._adb_lock = True
        try:
            returns = await func(self, *args, **kwargs)
        except self._exceptions:
            returns = None
            logging.error('Failed to execute an ADB command; will attempt to '
                          're-establish the ADB connection in the next update')
            self._adb = None
            self._adb_error = True
        finally:
            self._adb_lock = False

        return returns
    return _adb_wrapper
With this workaround I placed the @adb_wrapper decorator above all functions that make adb calls. However, this is terribly inefficient & on higher-end devices doesn't prevent overloading of the adb connection.
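For comparison with the boolean flag and one-second polling loop above, here is a minimal sketch of the same "one command at a time" idea using asyncio.Lock. The Device class and its shell method are hypothetical, and the short sleep stands in for the real ADB call.

```python
import asyncio


class Device:
    """Hypothetical device wrapper; only the locking pattern matters here."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self.log = []

    async def shell(self, command):
        # Waiters are woken in the order they started waiting,
        # so no sleep/poll loop is needed.
        async with self._lock:
            self.log.append("start " + command)
            await asyncio.sleep(0.01)  # stand-in for the real ADB call
            self.log.append("end " + command)


async def demo():
    device = Device()
    await asyncio.gather(device.shell("a"), device.shell("b"), device.shell("c"))
    return device.log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

One caveat with this approach: asyncio.Lock is not reentrant, so a method that holds the lock must not call another method that tries to take it. In a design like this, only the lowest-level call that actually talks to the device would acquire the lock.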
Enter asyncio
Let me start by stating I have very little experience working with asyncio at this point; therefore, it's been tough to pick out which of the already-posted questions would help me. So, my apologies if the answer is already present elsewhere. Also, in order to give people an idea of how my library operates, the code block will be a bit lengthy, but I only included part of the file (a few functions to show how I'm ultimately interacting with the device), and I tried to include only functions that are connected, to show the chain of commands.
My idea of a solution:
My goal is to be able to use asyncio to queue all commands and have them sent one at a time. If at any point a command fails (which would cause adb to disconnect), I want to re-establish the adb connection and continue with the queue of commands.
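The idea described above can be sketched roughly as a single worker task that pulls jobs off an asyncio.Queue and reports each result through a future. This is only an illustration under assumptions: worker, submit, and make_job are hypothetical names, and the jobs are dummy coroutines rather than real adb commands.

```python
import asyncio


async def worker(queue):
    """Run queued jobs one at a time, in the order they were submitted."""
    while True:
        job, future = await queue.get()
        try:
            result = await job()
        except Exception as exc:
            # Hand the failure to the caller and keep processing the queue.
            if not future.cancelled():
                future.set_exception(exc)
        else:
            if not future.cancelled():
                future.set_result(result)
        finally:
            queue.task_done()


async def submit(queue, job):
    """Enqueue a zero-argument coroutine function and wait for its result."""
    future = asyncio.get_running_loop().create_future()
    await queue.put((job, future))
    return await future


async def demo():
    queue = asyncio.Queue()
    task = asyncio.create_task(worker(queue))
    order = []

    def make_job(name):
        async def job():
            order.append(name)
            await asyncio.sleep(0.01)
            return name.upper()
        return job

    results = await asyncio.gather(
        *(submit(queue, make_job(name)) for name in ("a", "b", "c")))
    task.cancel()
    return order, results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because there is only one worker, a job never starts before the previous one has finished, and each caller still gets its own result back by awaiting submit.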
Current Code Structure:
class AndroidTV:
    """ Represents an Android TV device. """

    def __init__(self, host, adbkey=''):
        """ Initialize AndroidTV object.
        :param host: Host in format <address>:port.
        :param adbkey: The path to the "adbkey" file
        """
        self.host = host
        self.adbkey = adbkey
        self._adb = None
        self.state = STATE_UNKNOWN
        self.muted = False
        self.device = 'hdmi'
        self.volume = 0.
        self.app_id = None
        self.package_launcher = None
        self.package_settings = None
        self._adb_error = False
        self._adb_lock = False
        self._exceptions = (TypeError, ValueError, AttributeError,
                            InvalidCommandError, InvalidResponseError,
                            InvalidChecksumError, BrokenPipeError)

    @adb_wrapper
    async def connect(self):
        """ Connect to an Android TV device.
        Will attempt to establish ADB connection to the given host.
        Failure sets state to UNKNOWN and disables sending actions.
        """
        try:
            if self.adbkey:
                signer = Signer(self.adbkey)
                # Connect to the device
                self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
            else:
                self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)
            if not self.package_settings:
                self._adb.Shell("am start -a android.settings.SETTINGS")
                await asyncio.sleep(1)
                logging.info("Getting Settings App Package")
                self.package_settings = await self.current_app
            if not self.package_launcher:
                await self.home()
                await asyncio.sleep(1)
                logging.info("Getting Launcher App Package")
                self.package_launcher = await self.current_app
        except socket_error as serr:
            logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)

    @adb_wrapper
    async def update(self):
        """ Update the device status. """
        # Check if device is disconnected.
        if not self._adb:
            self.state = STATE_UNKNOWN
            self.app_id = None
        # Check if device is off.
        elif not await self._screen_on:
            self.state = STATE_OFF
            self.app_id = None
        else:
            self.app_id = await self.current_app
            if await self._wake_lock:
                self.state = STATE_PLAYING
            elif self.app_id not in (self.package_launcher, self.package_settings):
                # Check if state was playing on last update
                if self.state == STATE_PLAYING:
                    self.state = STATE_PAUSED
                elif self.state != STATE_PAUSED:
                    self.state = STATE_IDLE
            else:
                # We're on either the launcher or in settings
                self.state = STATE_ON
            # Get information from the audio status.
            audio_output = await self._dump('audio')
            stream_block = re.findall(BLOCK_REGEX, audio_output,
                                      re.DOTALL | re.MULTILINE)[0]
            self.muted = re.findall(MUTED_REGEX, stream_block,
                                    re.DOTALL | re.MULTILINE)[0] == 'true'

    @property
    async def current_app(self):
        filtered_dump = await self._dump("window windows", "mCurrentFocus")
        current_focus = filtered_dump.replace("\r", "")
        matches = WINDOW_REGEX.search(current_focus)
        if matches:
            (pkg, activity) = matches.group('package', 'activity')
            return pkg
        else:
            logging.warning("Couldn't get current app, reply was %s", current_focus)
            return None

    @property
    async def _screen_on(self):
        return await self._dump_has('power', 'Display Power', 'state=ON')

    @property
    async def _awake(self):
        return await self._dump_has('power', 'mWakefulness', 'Awake')

    @property
    async def _wake_lock(self):
        return not await self._dump_has('power', 'Locks', 'size=0')

    @adb_wrapper
    async def _input(self, cmd):
        if not self._adb:
            return
        self._adb.Shell('input {0}'.format(cmd))

    @adb_wrapper
    async def _dump(self, service, grep=None):
        if not self._adb:
            return
        if grep:
            return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
        return self._adb.Shell('dumpsys {0}'.format(service))

    async def _dump_has(self, service, grep, search):
        dump_result = await self._dump(service, grep=grep)
        return dump_result.strip().find(search) > -1
As I've stated before, the above method partially works, but is basically a band-aid.
The only commands that directly make adb.Shell calls are
1. async def connect(self)
2. async def update(self)
3. async def _input(self, cmd)
4. async def _dump(self, service, grep=None)
5. async def _key(self, key)
The connect & update functions result in multiple adb.Shell calls themselves, so this might be where my problem ultimately lies.
My (3-Part) Question:
1. How can I queue up all commands as they're received?
2. Execute them in the order they're received?
3. Handle errors at any point, reconnect, then continue executing the rest of the queue of commands?
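To make the third point more concrete, here is a rough sketch of a queue worker that reports a failed command to its caller, reconnects, and then carries on with the remaining commands. FlakyConnection is a hypothetical stand-in for the ADB connection (it fails on the command "bad"), not the python-adb API. The shell call here is a plain function for brevity; a real blocking call would probably need to run in an executor.

```python
import asyncio


class FlakyConnection:
    """Hypothetical stand-in for the ADB connection; fails on "bad" commands."""

    def __init__(self):
        self.connects = 0
        self.alive = False

    def connect(self):
        self.connects += 1
        self.alive = True

    def shell(self, command):
        if not self.alive:
            raise ConnectionError("not connected")
        if command == "bad":
            self.alive = False  # simulate the connection dropping
            raise BrokenPipeError("connection lost")
        return "ok: " + command


async def worker(queue, conn):
    """Process commands in order; reconnect after a failure and keep going."""
    conn.connect()
    while True:
        command, future = await queue.get()
        try:
            future.set_result(conn.shell(command))
        except ConnectionError as exc:  # BrokenPipeError is a subclass
            future.set_exception(exc)   # report this command as failed
            conn.connect()              # then restore the connection
        finally:
            queue.task_done()


async def demo():
    queue, conn = asyncio.Queue(), FlakyConnection()
    task = asyncio.create_task(worker(queue, conn))
    loop = asyncio.get_running_loop()
    futures = []
    for command in ("first", "bad", "third"):
        future = loop.create_future()
        futures.append(future)
        await queue.put((command, future))
    results = await asyncio.gather(*futures, return_exceptions=True)
    task.cancel()
    return results, conn.connects


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the failed command is not retried; whether to retry it after reconnecting or simply surface the error to the caller is a design choice that depends on the command.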
Here's my failed half-attempt at accomplishing this.
import asyncio

async def produce_output(queue, commands):
    for command in commands:
        #execute the adb command
        if 'keypress' in command:
            #command contains 'input keypress ENTER'
            adb.Shell(command)
            #mark the task done because there's nothing to process
            queue.task_done()
        else:
            #command contains 'dumpsys audio'
            output = adb.Shell(command)
            #put result in queue
            await queue.put(output)

async def process_adb(queue):
    while True:
        output = await queue.get()
        #return output (somehow?)
        queue.task_done()

async def update():
    adb_queue = asyncio.Queue()
    asyncio.create_task(produce_output(adb_queue,
                                       [self._screen_on,
                                        self.current_app,
                                        self._wake_lock,
                                        self._dump('audio')]))
    #Not sure how to proceed
    if not self._adb:
        self.state = STATE_UNKNOWN
        self.app_id = None
    # Check if device is off.
    # Fetching result of first item in the queue - self._screen_on
    elif not await adb_queue.get():
        self.state = STATE_OFF
        self.app_id = None
    else:
        # Fetching result of second item in the queue - self.current_app
        self.app_id = await adb_queue.get()
        # Fetching result of third item in the queue - self._wake_lock
        if await adb_queue.get():
            self.state = STATE_PLAYING
        elif self.app_id not in (self.package_launcher, self.package_settings):
            # Check if state was playing on last update
            if self.state == STATE_PLAYING:
                self.state = STATE_PAUSED
            elif self.state != STATE_PAUSED:
                self.state = STATE_IDLE
        else:
            # We're on either the launcher or in settings
            self.state = STATE_ON
        # Get information from the audio status.
        # Fetching result of fourth item in the queue - self._dump('audio')
        audio_output = await adb_queue.get()
        stream_block = re.findall(BLOCK_REGEX, audio_output,
                                  re.DOTALL | re.MULTILINE)[0]
        self.muted = re.findall(MUTED_REGEX, stream_block,
                                re.DOTALL | re.MULTILINE)[0] == 'true'
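One detail in the attempt above is worth isolating: asyncio.Queue counts unfinished tasks per put(), so calling task_done() for a command that was never put on the queue raises ValueError. The following small sketch (with a dummy string in place of a real command) shows that behaviour on its own.

```python
import asyncio


async def demo():
    queue = asyncio.Queue()
    await queue.put("dumpsys audio")
    await queue.get()
    queue.task_done()  # fine: matches the one item that was put and fetched
    try:
        queue.task_done()  # no matching put(): raises ValueError
    except ValueError as exc:
        return exc
    return None


if __name__ == "__main__":
    print(repr(asyncio.run(demo())))
```

In other words, task_done() belongs on the consumer side, once per item retrieved with get(), rather than on the side that executes a command without queueing anything.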
python-3.x async-await python-asyncio
add a comment |
up vote
2
down vote
favorite
TLDR; How do I make a "single-file" asyncio.Queue()
and feed it my adb commands, have them executed in the order they're received (one-by-one), handle errors that may occur (disconnect/reconnect) during one of the tasks, and continue processing the rest of the queue after handling the error?
I'm working on a module that leverages the existing python-adb module to ultimately control my android tablet as a media device and incorporate it into my home automation setup.
Problem:
My module is built entirely around async
, while the python-adb
module is not. The python-adb
module also doesn't manage/throttle requests. And I very quickly found out that if multiple adb commands are requested too quickly the adb connection is overloaded, causing an error & requiring a reconnect whenever the disconnect occurred.
A friend of mine managed to implement a workaround/hack-y solution. Note: self._adb_lock
& self._adb_error
are initially set in the AndroidDevice
class's __init__
function.
def adb_wrapper(func):
"""Wait if previous ADB commands haven't finished."""
@functools.wraps(func)
async def _adb_wrapper(self, *args, **kwargs):
attempts = 0
while self._adb_lock and attempts < 5:
attempts += 1
await asyncio.sleep(1)
if (attempts == 4 and self._adb_lock) or self._adb_error:
try:
await self.connect()
self._adb_error = False
except self._exceptions:
logging.error('Failed to re-establish the ADB connection; '
'will re-attempt in the next update.')
self._adb = None
self._adb_lock = False
self._adb_error = True
return
self._adb_lock = True
try:
returns = await func(self, *args, **kwargs)
except self._exceptions:
returns = None
logging.error('Failed to execute an ADB command; will attempt to '
're-establish the ADB connection in the next update')
self._adb = None
self._adb_error = True
finally:
self._adb_lock = False
return returns
return _adb_wrapper
With this workaround I placed the @adb_wrapper
decorator above all functions that make adb calls. However, this is terribly inefficient & on higher-end devices doesn't prevent overloading of the adb connection.
Enter asyncio
Let me start my stating I have very little experience working with asyncio
at this point; therefore, it's been touch to pick out which questions that were already posted would help me. So, my apologies if the answer is already present elsewhere. Also, in order to give people an idea of how my library is operating the codeblock will be a bit lengthy, but I only included a part of the file (a few functions to show how I'm ultimately interacting) and I tried to only include functions that connect to show the chain of commands.
My idea of a solution:
My goal is to be able to use asyncio
to queue all commands and have them sent one at a time and if at any point the command fails (which would cause adb to disconnect) I want to re-establish the adb connection and continue with the queue of commands.
Current Code Structure:
class AndroidTV:
""" Represents an Android TV device. """
def __init__(self, host, adbkey=''):
""" Initialize AndroidTV object.
:param host: Host in format <address>:port.
:param adbkey: The path to the "adbkey" file
"""
self.host = host
self.adbkey = adbkey
self._adb = None
self.state = STATE_UNKNOWN
self.muted = False
self.device = 'hdmi'
self.volume = 0.
self.app_id = None
self.package_launcher = None
self.package_settings = None
self._adb_error = False
self._adb_lock = False
self._exceptions = (TypeError, ValueError, AttributeError,
InvalidCommandError, InvalidResponseError,
InvalidChecksumError, BrokenPipeError)
@adb_wrapper
async def connect(self):
""" Connect to an Android TV device.
Will attempt to establish ADB connection to the given host.
Failure sets state to UNKNOWN and disables sending actions.
"""
try:
if self.adbkey:
signer = Signer(self.adbkey)
# Connect to the device
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
else:
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)
if not self.package_settings:
self._adb.Shell("am start -a android.settings.SETTINGS")
await asyncio.sleep(1)
logging.info("Getting Settings App Package")
self.package_settings = await self.current_app
if not self.package_launcher:
await self.home()
await asyncio.sleep(1)
logging.info("Getting Launcher App Package")
self.package_launcher = await self.current_app
except socket_error as serr:
logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)
@adb_wrapper
async def update(self):
""" Update the device status. """
# Check if device is disconnected.
if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
elif not await self._screen_on:
self.state = STATE_OFF
self.app_id = None
else:
self.app_id = await self.current_app
if await self._wake_lock:
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON
# Get information from the audio status.
audio_output = await self._dump('audio')
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'
@property
async def current_app(self):
filtered_dump = await self._dump("window windows", "mCurrentFocus")
current_focus = filtered_dump.replace("r", "")
matches = WINDOW_REGEX.search(current_focus)
if matches:
(pkg, activity) = matches.group('package', 'activity')
return pkg
else:
logging.warning("Couldn't get current app, reply was %s", current_focus)
return None
@property
async def _screen_on(self):
return await self._dump_has('power', 'Display Power', 'state=ON')
@property
async def _awake(self):
return await self._dump_has('power', 'mWakefulness', 'Awake')
@property
async def _wake_lock(self):
return not await self._dump_has('power', 'Locks', 'size=0')
@adb_wrapper
async def _input(self, cmd):
if not self._adb:
return
self._adb.Shell('input {0}'.format(cmd))
@adb_wrapper
async def _dump(self, service, grep=None):
if not self._adb:
return
if grep:
return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
return self._adb.Shell('dumpsys {0}'.format(service))
async def _dump_has(self, service, grep, search):
dump_result = await self._dump(service, grep=grep)
return dump_result.strip().find(search) > -1
As I've stated before, the above method partially works, but is basically a band-aid.
The only commands that directly make adb.Shell
calls are
1. async def connect(self)
2. async def update(self)
3. async def _input(self, cmd)
4. async def _dump(self, service, grep=None)
5. async def _key(self, key)
The connect
& update
functions result in multiple adb.Shell
calls themselves, so this might be where my problem ultimately lies.
My (3-Part) Question:
1. How can I queue up all commands as they're received?
2. Execute them in the order they're received?
3. Handle errors at any point, reconnect, then continue executing the rest of the queue of commmands?
Here's my failed half-attempt at accomplishing this.
import asyncio
async def produce_output(queue, commands):
for command in commands:
#execute the adb command
if 'keypress' in command:
#command contains 'input keypress ENTER'
adb.Shell(command)
#mark the task done because there's nothing to process
queue.task_done()
else:
#command contains 'dumpsys audio'
output = adb.Shell(command)
#put result in queue
await queue.put(output)
async def process_adb(queue):
while True:
output = await queue.get()
#return output (somehow?)
queue.task_done()
async def update():
adb_queue = asyncio.Queue()
asyncio.create_task(produce_output(adb_queue,
[self._screen_on,
self.current_app,
self._wake_lock,
self._dump('audio')]))
#Not sure how to proceed
if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
# Fetching result of first item in the queue - self._screen_on
elif not await adb_queue.get():
self.state = STATE_OFF
self.app_id = None
else:
# Fetching result of second item in the queue - self.current_app
self.app_id = await adb_queue.get()
# Fetching result of third item in the queue - self._wake_lock
if await adb_queue.get():
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON
# Get information from the audio status.
# Fetching result of fourth item in the queue - self._dump('audio')
audio_output = await adb_queue.get()
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'
python-3.x async-await python-asyncio
add a comment |
up vote
2
down vote
favorite
up vote
2
down vote
favorite
TLDR; How do I make a "single-file" asyncio.Queue()
and feed it my adb commands, have them executed in the order they're received (one-by-one), handle errors that may occur (disconnect/reconnect) during one of the tasks, and continue processing the rest of the queue after handling the error?
I'm working on a module that leverages the existing python-adb module to ultimately control my android tablet as a media device and incorporate it into my home automation setup.
Problem:
My module is built entirely around async
, while the python-adb
module is not. The python-adb
module also doesn't manage/throttle requests. And I very quickly found out that if multiple adb commands are requested too quickly the adb connection is overloaded, causing an error & requiring a reconnect whenever the disconnect occurred.
A friend of mine managed to implement a workaround/hack-y solution. Note: self._adb_lock
& self._adb_error
are initially set in the AndroidDevice
class's __init__
function.
def adb_wrapper(func):
"""Wait if previous ADB commands haven't finished."""
@functools.wraps(func)
async def _adb_wrapper(self, *args, **kwargs):
attempts = 0
while self._adb_lock and attempts < 5:
attempts += 1
await asyncio.sleep(1)
if (attempts == 4 and self._adb_lock) or self._adb_error:
try:
await self.connect()
self._adb_error = False
except self._exceptions:
logging.error('Failed to re-establish the ADB connection; '
'will re-attempt in the next update.')
self._adb = None
self._adb_lock = False
self._adb_error = True
return
self._adb_lock = True
try:
returns = await func(self, *args, **kwargs)
except self._exceptions:
returns = None
logging.error('Failed to execute an ADB command; will attempt to '
're-establish the ADB connection in the next update')
self._adb = None
self._adb_error = True
finally:
self._adb_lock = False
return returns
return _adb_wrapper
With this workaround I placed the @adb_wrapper
decorator above all functions that make adb calls. However, this is terribly inefficient & on higher-end devices doesn't prevent overloading of the adb connection.
Enter asyncio
Let me start my stating I have very little experience working with asyncio
at this point; therefore, it's been touch to pick out which questions that were already posted would help me. So, my apologies if the answer is already present elsewhere. Also, in order to give people an idea of how my library is operating the codeblock will be a bit lengthy, but I only included a part of the file (a few functions to show how I'm ultimately interacting) and I tried to only include functions that connect to show the chain of commands.
My idea of a solution:
My goal is to be able to use asyncio
to queue all commands and have them sent one at a time and if at any point the command fails (which would cause adb to disconnect) I want to re-establish the adb connection and continue with the queue of commands.
Current Code Structure:
class AndroidTV:
""" Represents an Android TV device. """
def __init__(self, host, adbkey=''):
""" Initialize AndroidTV object.
:param host: Host in format <address>:port.
:param adbkey: The path to the "adbkey" file
"""
self.host = host
self.adbkey = adbkey
self._adb = None
self.state = STATE_UNKNOWN
self.muted = False
self.device = 'hdmi'
self.volume = 0.
self.app_id = None
self.package_launcher = None
self.package_settings = None
self._adb_error = False
self._adb_lock = False
self._exceptions = (TypeError, ValueError, AttributeError,
InvalidCommandError, InvalidResponseError,
InvalidChecksumError, BrokenPipeError)
@adb_wrapper
async def connect(self):
""" Connect to an Android TV device.
Will attempt to establish ADB connection to the given host.
Failure sets state to UNKNOWN and disables sending actions.
"""
try:
if self.adbkey:
signer = Signer(self.adbkey)
# Connect to the device
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
else:
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)
if not self.package_settings:
self._adb.Shell("am start -a android.settings.SETTINGS")
await asyncio.sleep(1)
logging.info("Getting Settings App Package")
self.package_settings = await self.current_app
if not self.package_launcher:
await self.home()
await asyncio.sleep(1)
logging.info("Getting Launcher App Package")
self.package_launcher = await self.current_app
except socket_error as serr:
logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)
@adb_wrapper
async def update(self):
""" Update the device status. """
# Check if device is disconnected.
if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
elif not await self._screen_on:
self.state = STATE_OFF
self.app_id = None
else:
self.app_id = await self.current_app
if await self._wake_lock:
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON
# Get information from the audio status.
audio_output = await self._dump('audio')
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'
@property
async def current_app(self):
filtered_dump = await self._dump("window windows", "mCurrentFocus")
current_focus = filtered_dump.replace("r", "")
matches = WINDOW_REGEX.search(current_focus)
if matches:
(pkg, activity) = matches.group('package', 'activity')
return pkg
else:
logging.warning("Couldn't get current app, reply was %s", current_focus)
return None
@property
async def _screen_on(self):
return await self._dump_has('power', 'Display Power', 'state=ON')
@property
async def _awake(self):
return await self._dump_has('power', 'mWakefulness', 'Awake')
@property
async def _wake_lock(self):
return not await self._dump_has('power', 'Locks', 'size=0')
@adb_wrapper
async def _input(self, cmd):
if not self._adb:
return
self._adb.Shell('input {0}'.format(cmd))
@adb_wrapper
async def _dump(self, service, grep=None):
if not self._adb:
return
if grep:
return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
return self._adb.Shell('dumpsys {0}'.format(service))
async def _dump_has(self, service, grep, search):
dump_result = await self._dump(service, grep=grep)
return dump_result.strip().find(search) > -1
As I've stated before, the above method partially works, but is basically a band-aid.
The only commands that directly make adb.Shell
calls are
1. async def connect(self)
2. async def update(self)
3. async def _input(self, cmd)
4. async def _dump(self, service, grep=None)
5. async def _key(self, key)
The connect
& update
functions result in multiple adb.Shell
calls themselves, so this might be where my problem ultimately lies.
My (3-Part) Question:
1. How can I queue up all commands as they're received?
2. Execute them in the order they're received?
3. Handle errors at any point, reconnect, then continue executing the rest of the queue of commmands?
Here's my failed half-attempt at accomplishing this.
import asyncio
async def produce_output(queue, commands):
for command in commands:
#execute the adb command
if 'keypress' in command:
#command contains 'input keypress ENTER'
adb.Shell(command)
#mark the task done because there's nothing to process
queue.task_done()
else:
#command contains 'dumpsys audio'
output = adb.Shell(command)
#put result in queue
await queue.put(output)
async def process_adb(queue):
while True:
output = await queue.get()
#return output (somehow?)
queue.task_done()
async def update():
adb_queue = asyncio.Queue()
asyncio.create_task(produce_output(adb_queue,
[self._screen_on,
self.current_app,
self._wake_lock,
self._dump('audio')]))
#Not sure how to proceed
if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
# Fetching result of first item in the queue - self._screen_on
elif not await adb_queue.get():
self.state = STATE_OFF
self.app_id = None
else:
# Fetching result of second item in the queue - self.current_app
self.app_id = await adb_queue.get()
# Fetching result of third item in the queue - self._wake_lock
if await adb_queue.get():
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON
# Get information from the audio status.
# Fetching result of fourth item in the queue - self._dump('audio')
audio_output = await adb_queue.get()
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'
python-3.x async-await python-asyncio
TLDR; How do I make a "single-file" asyncio.Queue()
and feed it my adb commands, have them executed in the order they're received (one-by-one), handle errors that may occur (disconnect/reconnect) during one of the tasks, and continue processing the rest of the queue after handling the error?
I'm working on a module that leverages the existing python-adb module to ultimately control my android tablet as a media device and incorporate it into my home automation setup.
Problem:
My module is built entirely around async
, while the python-adb
module is not. The python-adb
module also doesn't manage/throttle requests. And I very quickly found out that if multiple adb commands are requested too quickly the adb connection is overloaded, causing an error & requiring a reconnect whenever the disconnect occurred.
A friend of mine managed to implement a workaround/hack-y solution. Note: self._adb_lock
& self._adb_error
are initially set in the AndroidDevice
class's __init__
function.
def adb_wrapper(func):
"""Wait if previous ADB commands haven't finished."""
@functools.wraps(func)
async def _adb_wrapper(self, *args, **kwargs):
attempts = 0
while self._adb_lock and attempts < 5:
attempts += 1
await asyncio.sleep(1)
if (attempts == 4 and self._adb_lock) or self._adb_error:
try:
await self.connect()
self._adb_error = False
except self._exceptions:
logging.error('Failed to re-establish the ADB connection; '
'will re-attempt in the next update.')
self._adb = None
self._adb_lock = False
self._adb_error = True
return
self._adb_lock = True
try:
returns = await func(self, *args, **kwargs)
except self._exceptions:
returns = None
logging.error('Failed to execute an ADB command; will attempt to '
're-establish the ADB connection in the next update')
self._adb = None
self._adb_error = True
finally:
self._adb_lock = False
return returns
return _adb_wrapper
With this workaround I placed the @adb_wrapper
decorator above all functions that make adb calls. However, this is terribly inefficient & on higher-end devices doesn't prevent overloading of the adb connection.
Enter asyncio
Let me start my stating I have very little experience working with asyncio
at this point; therefore, it's been touch to pick out which questions that were already posted would help me. So, my apologies if the answer is already present elsewhere. Also, in order to give people an idea of how my library is operating the codeblock will be a bit lengthy, but I only included a part of the file (a few functions to show how I'm ultimately interacting) and I tried to only include functions that connect to show the chain of commands.
My idea of a solution:
My goal is to be able to use asyncio
to queue all commands and have them sent one at a time and if at any point the command fails (which would cause adb to disconnect) I want to re-establish the adb connection and continue with the queue of commands.
Current Code Structure:
class AndroidTV:
    """ Represents an Android TV device. """

    def __init__(self, host, adbkey=''):
        """ Initialize AndroidTV object.

        :param host: Host in format <address>:port.
        :param adbkey: The path to the "adbkey" file
        """
        self.host = host
        self.adbkey = adbkey
        self._adb = None
        self.state = STATE_UNKNOWN
        self.muted = False
        self.device = 'hdmi'
        self.volume = 0.
        self.app_id = None
        self.package_launcher = None
        self.package_settings = None
        self._adb_error = False
        self._adb_lock = False
        self._exceptions = (TypeError, ValueError, AttributeError,
                            InvalidCommandError, InvalidResponseError,
                            InvalidChecksumError, BrokenPipeError)

    @adb_wrapper
    async def connect(self):
        """ Connect to an Android TV device.

        Will attempt to establish ADB connection to the given host.
        Failure sets state to UNKNOWN and disables sending actions.
        """
        try:
            if self.adbkey:
                signer = Signer(self.adbkey)
                # Connect to the device
                self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
            else:
                self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)
            if not self.package_settings:
                self._adb.Shell("am start -a android.settings.SETTINGS")
                await asyncio.sleep(1)
                logging.info("Getting Settings App Package")
                self.package_settings = await self.current_app
            if not self.package_launcher:
                await self.home()
                await asyncio.sleep(1)
                logging.info("Getting Launcher App Package")
                self.package_launcher = await self.current_app
        except socket_error as serr:
            logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)

    @adb_wrapper
    async def update(self):
        """ Update the device status. """
        # Check if device is disconnected.
        if not self._adb:
            self.state = STATE_UNKNOWN
            self.app_id = None
        # Check if device is off.
        elif not await self._screen_on:
            self.state = STATE_OFF
            self.app_id = None
        else:
            self.app_id = await self.current_app
            if await self._wake_lock:
                self.state = STATE_PLAYING
            elif self.app_id not in (self.package_launcher, self.package_settings):
                # Check if state was playing on last update
                if self.state == STATE_PLAYING:
                    self.state = STATE_PAUSED
                elif self.state != STATE_PAUSED:
                    self.state = STATE_IDLE
            else:
                # We're on either the launcher or in settings
                self.state = STATE_ON
            # Get information from the audio status.
            audio_output = await self._dump('audio')
            stream_block = re.findall(BLOCK_REGEX, audio_output,
                                      re.DOTALL | re.MULTILINE)[0]
            self.muted = re.findall(MUTED_REGEX, stream_block,
                                    re.DOTALL | re.MULTILINE)[0] == 'true'

    @property
    async def current_app(self):
        filtered_dump = await self._dump("window windows", "mCurrentFocus")
        # Strip carriage returns from the adb output
        current_focus = filtered_dump.replace("\r", "")
        matches = WINDOW_REGEX.search(current_focus)
        if matches:
            (pkg, activity) = matches.group('package', 'activity')
            return pkg
        else:
            logging.warning("Couldn't get current app, reply was %s", current_focus)
            return None

    @property
    async def _screen_on(self):
        return await self._dump_has('power', 'Display Power', 'state=ON')

    @property
    async def _awake(self):
        return await self._dump_has('power', 'mWakefulness', 'Awake')

    @property
    async def _wake_lock(self):
        return not await self._dump_has('power', 'Locks', 'size=0')

    @adb_wrapper
    async def _input(self, cmd):
        if not self._adb:
            return
        self._adb.Shell('input {0}'.format(cmd))

    @adb_wrapper
    async def _dump(self, service, grep=None):
        if not self._adb:
            return
        if grep:
            return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
        return self._adb.Shell('dumpsys {0}'.format(service))

    async def _dump_has(self, service, grep, search):
        dump_result = await self._dump(service, grep=grep)
        return dump_result.strip().find(search) > -1
As I've stated before, the above method partially works, but is basically a band-aid.
The only commands that directly make adb.Shell calls are:
1. async def connect(self)
2. async def update(self)
3. async def _input(self, cmd)
4. async def _dump(self, service, grep=None)
5. async def _key(self, key)
The connect & update functions result in multiple adb.Shell calls themselves, so this might be where my problem ultimately lies.
My (3-Part) Question:
1. How can I queue up all commands as they're received?
2. Execute them in the order they're received?
3. Handle errors at any point, reconnect, then continue executing the rest of the queue of commands?
Here's my failed half-attempt at accomplishing this.
import asyncio

async def produce_output(queue, commands):
    for command in commands:
        # execute the adb command
        if 'keypress' in command:
            # command contains 'input keypress ENTER'
            adb.Shell(command)
            # mark the task done because there's nothing to process
            queue.task_done()
        else:
            # command contains 'dumpsys audio'
            output = adb.Shell(command)
            # put result in queue
            await queue.put(output)

async def process_adb(queue):
    while True:
        output = await queue.get()
        # return output (somehow?)
        queue.task_done()

async def update():
    adb_queue = asyncio.Queue()
    asyncio.create_task(produce_output(adb_queue,
                                       [self._screen_on,
                                        self.current_app,
                                        self._wake_lock,
                                        self._dump('audio')]))
    # Not sure how to proceed
    if not self._adb:
        self.state = STATE_UNKNOWN
        self.app_id = None
    # Check if device is off.
    # Fetching result of first item in the queue - self._screen_on
    elif not await adb_queue.get():
        self.state = STATE_OFF
        self.app_id = None
    else:
        # Fetching result of second item in the queue - self.current_app
        self.app_id = await adb_queue.get()
        # Fetching result of third item in the queue - self._wake_lock
        if await adb_queue.get():
            self.state = STATE_PLAYING
        elif self.app_id not in (self.package_launcher, self.package_settings):
            # Check if state was playing on last update
            if self.state == STATE_PLAYING:
                self.state = STATE_PAUSED
            elif self.state != STATE_PAUSED:
                self.state = STATE_IDLE
        else:
            # We're on either the launcher or in settings
            self.state = STATE_ON
        # Get information from the audio status.
        # Fetching result of fourth item in the queue - self._dump('audio')
        audio_output = await adb_queue.get()
        stream_block = re.findall(BLOCK_REGEX, audio_output,
                                  re.DOTALL | re.MULTILINE)[0]
        self.muted = re.findall(MUTED_REGEX, stream_block,
                                re.DOTALL | re.MULTILINE)[0] == 'true'
python-3.x async-await python-asyncio
edited Nov 9 at 15:10
asked Nov 8 at 16:36
CaffeinatedCoder
65521227
1 Answer
accepted
You need to ensure that only a single task is using the adb connection to execute a command at any given time. This means you need to either use synchronisation primitives to coordinate access, or use a queue to feed a single worker task commands to execute.
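As a rough illustration of the synchronisation-primitive option, the sketch below serialises access with an asyncio.Lock. FakeConnection and run_serialised are hypothetical names made up for this example; the fake connection only simulates a slow command and is not part of python-adb.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for an adb connection; records call order."""

    def __init__(self):
        self.log = []
        self.busy = False

    async def shell(self, command):
        # Fail loudly if two commands ever overlap.
        assert not self.busy, "overlapping commands"
        self.busy = True
        await asyncio.sleep(0.01)  # simulate slow I/O
        self.busy = False
        self.log.append(command)
        return "ok: " + command


async def run_serialised(commands):
    conn = FakeConnection()
    lock = asyncio.Lock()

    async def send(command):
        # Only one task at a time gets past this point.
        async with lock:
            return await conn.shell(command)

    # All tasks start concurrently, but the lock admits them one by one.
    results = await asyncio.gather(*(send(c) for c in commands))
    return conn.log, results
```

Without the lock, the concurrent calls would trip the overlap assertion in the fake connection.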
Next, because an adb connection is entirely synchronous and, as with all I/O, relatively slow, I'd use a thread pool executor to run operations on an adb connection off the asyncio loop, so that asyncio is free to run other tasks that are not currently blocked on I/O. Otherwise, there is no point in putting .Shell() commands in an async def coroutine: you are not actually cooperating and making room for other tasks to run.
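To make the executor idea concrete, here is a minimal sketch. blocking_shell is a hypothetical stand-in for a synchronous .Shell() call (it just sleeps), and the ticker task exists only to show that the event loop keeps running while the blocking call is in a worker thread.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def blocking_shell(command):
    """Hypothetical stand-in for a synchronous .Shell() call."""
    time.sleep(0.05)
    return "output of " + command


async def shell_async(executor, command):
    loop = asyncio.get_running_loop()
    # The blocking call runs in a worker thread; this coroutine just waits.
    return await loop.run_in_executor(executor, partial(blocking_shell, command))


async def demo():
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    # A single worker thread also keeps the blocking calls strictly serial.
    with ThreadPoolExecutor(max_workers=1) as executor:
        task = asyncio.create_task(ticker())
        result = await shell_async(executor, "dumpsys power")
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)
    return result, ticks
```

If blocking_shell were called directly inside the coroutine instead, the ticker would not advance until the call returned.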
Last but not least, if even with serialised access to the connection object you find that it can't take too many commands per time period, you would want to use some kind of rate limiting technique. I've created an asyncio leaky bucket algorithm implementation before that can take care of this, if so required.
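For illustration only, a much simpler limiter than a leaky bucket is sketched below: it enforces a minimum interval between commands. MinIntervalLimiter and timed_run are hypothetical names for this example and are not the implementation referred to above.

```python
import asyncio
import time


class MinIntervalLimiter:
    """Hypothetical limiter: at most one entry per `interval` seconds."""

    def __init__(self, interval):
        self._interval = interval
        self._next_slot = 0.0

    async def __aenter__(self):
        now = time.monotonic()
        # Reserve the next free slot, then sleep until it arrives.
        slot = max(now, self._next_slot)
        self._next_slot = slot + self._interval
        if slot > now:
            await asyncio.sleep(slot - now)
        return self

    async def __aexit__(self, *excinfo):
        return False


async def timed_run(n, interval):
    limiter = MinIntervalLimiter(interval)
    stamps = []
    for _ in range(n):
        async with limiter:
            # A real caller would send one adb command here.
            stamps.append(time.monotonic())
    return stamps
```

Because the slot is reserved before sleeping, several tasks entering at once are spaced out in the order they arrived.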
Either a queue or a lock would ensure that commands are executed in first-come-first-served order, but a queue would require some kind of deferred response mechanism to return command results. A queue would let you queue up related commands (you can add multiple entries using queue.put_nowait() without yielding, or you can allow grouped commands), without having to wait for a lock first.
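The grouping point can be sketched as follows. The producer names and command strings are placeholders chosen for this example; the only thing being shown is that put_nowait() never yields to the event loop, so one producer's group stays contiguous in the queue.

```python
import asyncio


async def demo_grouping():
    queue = asyncio.Queue()

    async def producer(name, commands):
        # put_nowait() never yields to the event loop, so this group
        # cannot be interleaved with another producer's commands.
        for command in commands:
            queue.put_nowait((name, command))

    await asyncio.gather(
        producer("update", ["dumpsys power", "dumpsys window windows", "dumpsys audio"]),
        producer("remote", ["input keyevent 66"]),
    )

    # Drain the queue to inspect the order in which entries were added.
    order = []
    while not queue.empty():
        order.append(queue.get_nowait())
    return order
```

With `await queue.put(...)` on a bounded queue, a producer could be suspended mid-group, and entries from other producers could slip in between.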
Because you want to retry connections, I'd encapsulate the connection object in an asynchronous context manager that can then also handle locking and executing commands with an executor:
import asyncio
import contextlib
from concurrent.futures import ThreadPoolExecutor
from functools import partial

# Signer, adb_commands and AsyncLeakyBucket are assumed to be
# imported or defined elsewhere.

try:  # Python 3.7
    base = contextlib.AbstractAsyncContextManager
except AttributeError:
    base = object  # type: ignore

_retry_exceptions = (...,)  # define exceptions on which to retry commands?

class asyncnullcontext(base):
    def __init__(self, enter_result=None):
        self.enter_result = enter_result

    async def __aenter__(self):
        return self.enter_result

    async def __aexit__(self, *excinfo):
        pass

class AsyncADBConnection(base):
    def __init__(
        self,
        host,
        adbkey=None,
        rate_limit=None,
        max_retry=None,
        loop=None
    ):
        # The loop argument to asyncio.Lock was removed in Python 3.10;
        # drop it on newer versions.
        self._lock = asyncio.Lock(loop=loop)
        self._max_retry = max_retry
        self._loop = loop
        self._connection = None
        self._executor = ThreadPoolExecutor()
        self._connect_kwargs = {
            "serial": host,
            "rsa_keys": [Signer(adbkey)] if adbkey else []
        }
        if rate_limit is not None:
            # max commands per second
            self._limiter = AsyncLeakyBucket(rate_limit, 1, loop=loop)
        else:
            self._limiter = asyncnullcontext()

    async def __aenter__(self):
        await self._lock.acquire()
        await self._ensure_connection()
        return self

    async def __aexit__(self, *excinfo):
        self._lock.release()

    async def _ensure_connection(self):
        if self._connection is not None:
            return
        loop = self._loop or asyncio.get_running_loop()
        connector = partial(
            adb_commands.AdbCommands().ConnectDevice,
            **self._connect_kwargs
        )
        # Connect in the executor so the event loop is not blocked
        fut = loop.run_in_executor(self._executor, connector)
        self._connection = await fut

    async def shell(self, command):
        loop = self._loop or asyncio.get_running_loop()
        max_attempts = self._max_retry or 1
        attempts = 0
        while True:
            async with self._limiter:
                try:
                    fut = loop.run_in_executor(
                        self._executor,
                        self._connection.Shell,
                        command
                    )
                    return await fut
                except _retry_exceptions:
                    attempts += 1
                    if attempts >= max_attempts:
                        raise
                    # re-connect on retry
                    self._connection = None
                    await self._ensure_connection()
If you then use a queue, use Future() instances to communicate results.
Pushing a job into the queue then becomes:
fut = asyncio.Future()
await queue.put((command, fut))
result = await fut
You could wrap that into a utility function or object. The await fut line only returns once the future has received a result. For commands where you don't care about a result, you only need to await if you want to make sure that the command completed.
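One possible shape for such a utility object is sketched below. CommandQueue, submit and fake_execute are hypothetical names invented for this example; the execute callable stands in for whatever actually sends the command.

```python
import asyncio


class CommandQueue:
    """Hypothetical helper that pairs each command with a Future."""

    def __init__(self, execute):
        self._execute = execute  # coroutine function: command -> result
        self._queue = asyncio.Queue()

    async def submit(self, command):
        # Callers get the result back by awaiting the paired future.
        fut = asyncio.get_running_loop().create_future()
        await self._queue.put((command, fut))
        return await fut

    async def worker(self):
        # Single consumer: commands run strictly in queue order.
        while True:
            command, fut = await self._queue.get()
            fut.set_result(await self._execute(command))
            self._queue.task_done()


async def demo():
    async def fake_execute(command):
        await asyncio.sleep(0)  # pretend to do some I/O
        return command.upper()

    cq = CommandQueue(fake_execute)
    worker = asyncio.create_task(cq.worker())
    results = await asyncio.gather(cq.submit("a"), cq.submit("b"))
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return results
```

Callers then only ever write `result = await cq.submit(command)` and never touch the queue or the futures directly.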
The consumer in the worker task that manages the connection would use:
while True:
    command, fut = await self.queue.get()
    async with self.connection as conn:
        response = await conn.shell(command)
        fut.set_result(response)
    self.queue.task_done()  # optional, only needed when joining the queue
where self.connection is an AsyncADBConnection instance.
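To show how a worker could survive a failed command, reconnect, and carry on with the rest of the queue, here is a self-contained sketch. FlakyConnection is a hypothetical test double that drops the connection once; it does not model python-adb, and ConnectionError stands in for whatever exceptions you decide to retry on.

```python
import asyncio


class FlakyConnection:
    """Hypothetical connection that fails once on a chosen command."""

    def __init__(self, fail_on):
        self.fail_on = fail_on
        self.connects = 0
        self.connected = False

    async def connect(self):
        self.connects += 1
        self.connected = True

    async def shell(self, command):
        if not self.connected:
            raise ConnectionError("not connected")
        if command == self.fail_on:
            self.fail_on = None      # fail only the first time
            self.connected = False   # a failure drops the connection
            raise ConnectionError("dropped while running " + command)
        return "ok: " + command


async def worker(queue, conn, max_attempts=2):
    while True:
        command, fut = await queue.get()
        try:
            for attempt in range(1, max_attempts + 1):
                try:
                    if not conn.connected:
                        await conn.connect()
                    fut.set_result(await conn.shell(command))
                    break
                except ConnectionError as exc:
                    if attempt == max_attempts:
                        # Give up on this command only; the caller sees the error.
                        fut.set_exception(exc)
        finally:
            queue.task_done()


async def demo():
    queue = asyncio.Queue()
    conn = FlakyConnection(fail_on="b")
    task = asyncio.create_task(worker(queue, conn))
    futures = []
    for command in ["a", "b", "c"]:
        fut = asyncio.get_running_loop().create_future()
        queue.put_nowait((command, fut))
        futures.append(fut)
    results = await asyncio.gather(*futures, return_exceptions=True)
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return results, conn.connects
```

The key design choice is that a failure is reported through the command's own future with set_exception(), so the waiting caller never hangs and the worker loop keeps consuming the remaining entries.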
Sounds like exactly what I need. But, could you help me get an idea of how I would do so by providing some code?
– CaffeinatedCoder
Nov 8 at 16:41
1
@CaffeinatedCoder: sure, but it'll be generic, not specific to your current codebase. It'll be easy enough to adapt.
– Martijn Pieters♦
Nov 8 at 16:42
That'll work, thank you so much!
– CaffeinatedCoder
Nov 8 at 17:24
@CaffeinatedCoder: it may be tomorrow before I have time, sorry.
– Martijn Pieters♦
Nov 8 at 19:33
1
@CaffeinatedCoder: I have no Android device, so my code is largely untested, but I've added a context manager that handles the connection and some more queue handling example code.
– Martijn Pieters♦
Nov 10 at 22:53
edited Nov 11 at 22:33
answered Nov 8 at 16:40
Martijn Pieters♦
690k12723842230