mirror of
https://github.com/erjiang/huion-keys.git
synced 2026-03-23 21:54:53 +03:00
Merge pull request #12 from NeoTheFox/threaded-polling
Threaded input implementation
This commit is contained in:
commit
b0f4cdc575
249
huion_keys.py
249
huion_keys.py
@ -1,8 +1,8 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import signal
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import threading
|
||||||
import configparser
|
import configparser
|
||||||
|
|
||||||
from _xdo_cffi import ffi, lib
|
from _xdo_cffi import ffi, lib
|
||||||
@ -17,12 +17,23 @@ TABLET_MODELS = {
|
|||||||
BUTTON_BINDINGS = {}
|
BUTTON_BINDINGS = {}
|
||||||
BUTTON_BINDINGS_HOLD = {}
|
BUTTON_BINDINGS_HOLD = {}
|
||||||
CYCLE_BUTTON = None
|
CYCLE_BUTTON = None
|
||||||
CYCLE_MODE = 1
|
|
||||||
CYCLE_MODES = 1
|
CYCLE_MODES = 1
|
||||||
DIAL_MODES = {}
|
DIAL_MODES = {}
|
||||||
|
|
||||||
|
BUTTON_BITS = {
|
||||||
|
0x01: 1,
|
||||||
|
0x02: 2,
|
||||||
|
0x04: 3,
|
||||||
|
0x08: 4,
|
||||||
|
0x10: 5,
|
||||||
|
0x20: 6,
|
||||||
|
0x40: 7,
|
||||||
|
0x80: 8,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
#Commandline arguments processing
|
# Commandline arguments processing
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
description='Linux utility to create custom key bindings for the Huion Kamvas Pro (2019), Inspiroy Q620M, and potentially other tablets.')
|
description='Linux utility to create custom key bindings for the Huion Kamvas Pro (2019), Inspiroy Q620M, and potentially other tablets.')
|
||||||
parser.add_argument('--rules', action='store_true', default=False,
|
parser.add_argument('--rules', action='store_true', default=False,
|
||||||
@ -41,8 +52,6 @@ def main():
|
|||||||
else:
|
else:
|
||||||
CONFIG_FILE_PATH = os.path.expanduser(args.config)
|
CONFIG_FILE_PATH = os.path.expanduser(args.config)
|
||||||
|
|
||||||
global CYCLE_MODES, CYCLE_MODE, CYCLE_BUTTON
|
|
||||||
xdo = lib.xdo_new(ffi.NULL)
|
|
||||||
if os.path.isfile(CONFIG_FILE_PATH):
|
if os.path.isfile(CONFIG_FILE_PATH):
|
||||||
read_config(CONFIG_FILE_PATH)
|
read_config(CONFIG_FILE_PATH)
|
||||||
else:
|
else:
|
||||||
@ -50,79 +59,162 @@ def main():
|
|||||||
create_default_config(CONFIG_FILE_PATH)
|
create_default_config(CONFIG_FILE_PATH)
|
||||||
print("Created an example config file at " + CONFIG_FILE_PATH)
|
print("Created an example config file at " + CONFIG_FILE_PATH)
|
||||||
return 1
|
return 1
|
||||||
signal.signal(signal.SIGUSR1, handle_reload_signal) # Reload the config if recieved SIGUSR1
|
|
||||||
prev_button = None
|
hidraw_paths = []
|
||||||
while True:
|
while True:
|
||||||
hidraw_path = None
|
# search for a known tablet devices
|
||||||
# search for a known tablet device
|
|
||||||
for device_name, device_id in TABLET_MODELS.items():
|
for device_name, device_id in TABLET_MODELS.items():
|
||||||
hidraw_path = get_tablet_hidraw(device_id)
|
hidraw_path = get_tablet_hidraw(device_id)
|
||||||
if hidraw_path is not None:
|
if hidraw_path is not None:
|
||||||
print("Found %s at %s" % (device_name, hidraw_path))
|
print("Found %s at %s" % (device_name, hidraw_path))
|
||||||
break
|
hidraw_paths = hidraw_paths + hidraw_path
|
||||||
if hidraw_path is None:
|
if not hidraw_paths:
|
||||||
print("Could not find tablet hidraw device")
|
print("Could not find any tablet hidraw devices")
|
||||||
time.sleep(2)
|
time.sleep(3)
|
||||||
continue
|
continue
|
||||||
try:
|
elif hidraw_paths:
|
||||||
hidraw = open(hidraw_path, 'rb')
|
threads = []
|
||||||
except PermissionError as e:
|
for hidraw_path in hidraw_paths:
|
||||||
print(e)
|
thread = PollThread(hidraw_path)
|
||||||
print("Trying again in 5 seconds...")
|
# Do not let the threads to continue if main script is terminated
|
||||||
time.sleep(5)
|
thread.daemon = True
|
||||||
|
threads.append(thread)
|
||||||
|
thread.start()
|
||||||
|
# TODO: Maybe should be reworked for the edge case of having more than one tablet connected at the same time.
|
||||||
|
for thread in threads:
|
||||||
|
thread.join()
|
||||||
|
hidraw_paths.clear()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
||||||
|
class PollThread(threading.Thread):
|
||||||
|
|
||||||
|
cycle_mode = None
|
||||||
|
scroll_state = None
|
||||||
|
hidraw_path = None
|
||||||
|
xdo = None
|
||||||
|
|
||||||
|
def __init__(self, hidraw_path):
|
||||||
|
super(PollThread, self).__init__()
|
||||||
|
self.xdo = lib.xdo_new(ffi.NULL)
|
||||||
|
self.hidraw_path = hidraw_path
|
||||||
|
self.cycle_mode = 1
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
global BUTTON_BINDINGS, BUTTON_BINDINGS_HOLD, CYCLE_MODES, CYCLE_BUTTON, DIAL_MODES
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
btn = get_button_press(hidraw)
|
hidraw = open(self.hidraw_path, 'rb')
|
||||||
|
break
|
||||||
|
except PermissionError as e:
|
||||||
|
print(e)
|
||||||
|
print("Trying again in 5 seconds...")
|
||||||
|
time.sleep(5)
|
||||||
|
continue
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
btn = self.get_button_press(hidraw)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
print("Lost connection with the tablet - searching for tablet...")
|
print("%s lost connection with the tablet..." % (self.name,))
|
||||||
time.sleep(3)
|
|
||||||
break
|
break
|
||||||
print("Got button %s" % (btn,))
|
print("Got button %s" % (btn,))
|
||||||
if btn == CYCLE_BUTTON and CYCLE_BUTTON is not None:
|
if btn == CYCLE_BUTTON and CYCLE_BUTTON is not None:
|
||||||
CYCLE_MODE = CYCLE_MODE + 1
|
self.cycle_mode = self.cycle_mode + 1
|
||||||
if CYCLE_MODE > CYCLE_MODES:
|
if self.cycle_mode > CYCLE_MODES:
|
||||||
CYCLE_MODE = 1
|
self.cycle_mode = 1
|
||||||
print("Cycling to mode %s" % (CYCLE_MODE,))
|
print("Cycling to mode %s" % (self.cycle_mode,))
|
||||||
elif CYCLE_MODE in DIAL_MODES and btn in DIAL_MODES[CYCLE_MODE]:
|
elif self.cycle_mode in DIAL_MODES and btn in DIAL_MODES[self.cycle_mode]:
|
||||||
print("Sending %s from Mode %d" % (DIAL_MODES[CYCLE_MODE][btn], CYCLE_MODE),)
|
print("Sending %s from Mode %d" % (DIAL_MODES[self.cycle_mode][btn], self.cycle_mode),)
|
||||||
lib.xdo_send_keysequence_window(
|
lib.xdo_send_keysequence_window(
|
||||||
xdo, lib.CURRENTWINDOW, DIAL_MODES[CYCLE_MODE][btn], 1000)
|
self.xdo, lib.CURRENTWINDOW, DIAL_MODES[self.cycle_mode][btn], 1000)
|
||||||
elif btn in BUTTON_BINDINGS_HOLD:
|
elif btn in BUTTON_BINDINGS_HOLD:
|
||||||
print("Pressing %s" % (BUTTON_BINDINGS_HOLD[btn],))
|
print("Pressing %s" % (BUTTON_BINDINGS_HOLD[btn],))
|
||||||
lib.xdo_send_keysequence_window_down(xdo, lib.CURRENTWINDOW, BUTTON_BINDINGS_HOLD[btn], 12000)
|
lib.xdo_send_keysequence_window_down(self.xdo, lib.CURRENTWINDOW, BUTTON_BINDINGS_HOLD[btn], 12000)
|
||||||
get_button_release(hidraw)
|
self.get_button_release(hidraw)
|
||||||
print("Releasing %s" % (BUTTON_BINDINGS_HOLD[btn],))
|
print("Releasing %s" % (BUTTON_BINDINGS_HOLD[btn],))
|
||||||
lib.xdo_send_keysequence_window_up(xdo, lib.CURRENTWINDOW, BUTTON_BINDINGS_HOLD[btn], 12000)
|
lib.xdo_send_keysequence_window_up(self.xdo, lib.CURRENTWINDOW, BUTTON_BINDINGS_HOLD[btn], 12000)
|
||||||
elif btn in BUTTON_BINDINGS:
|
elif btn in BUTTON_BINDINGS:
|
||||||
print("Sending %s" % (BUTTON_BINDINGS[btn],))
|
print("Sending %s" % (BUTTON_BINDINGS[btn],))
|
||||||
lib.xdo_send_keysequence_window(
|
lib.xdo_send_keysequence_window(
|
||||||
xdo, lib.CURRENTWINDOW, BUTTON_BINDINGS[btn], 1000)
|
self.xdo, lib.CURRENTWINDOW, BUTTON_BINDINGS[btn], 1000)
|
||||||
|
|
||||||
|
def get_button_press(self, hidraw):
|
||||||
|
while True:
|
||||||
|
sequence = hidraw.read(12)
|
||||||
|
# 0xf7 is what my Kamvas Pro 22 reads
|
||||||
|
# another model seems to send 0x08
|
||||||
|
# Q620M reads as 0xf9
|
||||||
|
if sequence[0] != 0xf7 and sequence[0] != 0x08 and sequence[0] != 0xf9:
|
||||||
|
pass
|
||||||
|
if sequence[1] == 0xe0: # buttons
|
||||||
|
# doesn't seem like the tablet will let you push two buttons at once
|
||||||
|
if sequence[4] > 0:
|
||||||
|
return BUTTON_BITS[sequence[4]]
|
||||||
|
elif sequence[5] > 0:
|
||||||
|
# right-side buttons are 8-15, so add 8
|
||||||
|
return BUTTON_BITS[sequence[5]] + 8
|
||||||
|
else:
|
||||||
|
# must be button release (all zeros)
|
||||||
|
continue
|
||||||
|
elif sequence[1] == 0xf0: # scroll strip
|
||||||
|
scroll_pos = sequence[5]
|
||||||
|
if scroll_pos == 0:
|
||||||
|
# reset scroll state after lifting finger off scroll strip
|
||||||
|
self.scroll_state = None
|
||||||
|
elif self.scroll_state is not None:
|
||||||
|
# scroll strip is numbered from top to bottom so a greater new
|
||||||
|
# value means they scrolled down
|
||||||
|
if scroll_pos > self.scroll_state:
|
||||||
|
self.scroll_state = scroll_pos
|
||||||
|
return 'scroll_down'
|
||||||
|
elif scroll_pos < self.scroll_state:
|
||||||
|
self.scroll_state = scroll_pos
|
||||||
|
return 'scroll_up'
|
||||||
|
else:
|
||||||
|
self.scroll_state = scroll_pos
|
||||||
|
continue
|
||||||
|
elif sequence[1] == 0xf1: # dial on Q620M, practically 2 buttons
|
||||||
|
if sequence[5] == 0x1:
|
||||||
|
return 'dial_cw'
|
||||||
|
elif sequence[5] == 0xff:
|
||||||
|
return 'dial_ccw'
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
|
||||||
|
def get_button_release(self, hidraw):
|
||||||
|
while True:
|
||||||
|
sequence = hidraw.read(12)
|
||||||
|
if sequence[1] == 0xe0 and sequence[4] == 0 and sequence[5] == 0:
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def get_tablet_hidraw(device_id):
|
def get_tablet_hidraw(device_id):
|
||||||
"""Finds the /dev/hidrawX file that belongs to the given device ID (in xxxx:xxxx format)."""
|
"""Finds the /dev/hidrawX file or files that belong to the given device ID (in xxxx:xxxx format)."""
|
||||||
# TODO: is this too fragile?
|
# TODO: is this too fragile?
|
||||||
hidraws = os.listdir('/sys/class/hidraw')
|
hidraws = os.listdir('/sys/class/hidraw')
|
||||||
|
inputs = []
|
||||||
for h in hidraws:
|
for h in hidraws:
|
||||||
device_path = os.readlink(os.path.join('/sys/class/hidraw', h, 'device'))
|
device_path = os.readlink(os.path.join('/sys/class/hidraw', h, 'device'))
|
||||||
if device_id.upper() in device_path:
|
if device_id.upper() in device_path:
|
||||||
# need to confirm that there's "input" because there are two hidraw
|
# need to confirm that there's "input" because there are two or more hidraw
|
||||||
# files listed for the tablet, but only one of them carries the
|
# files listed for the tablet, but only few of them carry the
|
||||||
# mouse/keyboard input
|
# mouse/keyboard input
|
||||||
if os.path.exists(os.path.join('/sys/class/hidraw', h, 'device/input')):
|
if os.path.exists(os.path.join('/sys/class/hidraw', h, 'device/input')):
|
||||||
return os.path.join('/dev', os.path.basename(h))
|
inputs.append(os.path.join('/dev', os.path.basename(h)))
|
||||||
|
if inputs:
|
||||||
|
return inputs
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def read_config(config_file):
|
def read_config(config_file):
|
||||||
global CYCLE_MODES, CYCLE_MODE, CYCLE_BUTTON
|
global CYCLE_MODES, CYCLE_BUTTON, BUTTON_BINDINGS, BUTTON_BINDINGS_HOLD
|
||||||
CONFIG = configparser.ConfigParser()
|
CONFIG = configparser.ConfigParser()
|
||||||
CONFIG.read(config_file)
|
CONFIG.read(config_file)
|
||||||
# It is still better for performance to pre-encode these values
|
# It is still better for performance to pre-encode these values
|
||||||
for binding in CONFIG['Bindings']:
|
for binding in CONFIG['Bindings']:
|
||||||
if binding.isdigit():
|
if binding.isdigit():
|
||||||
# store button configs with their 1-indexed ID
|
# store button configs with their 1-indexed ID
|
||||||
BUTTON_BINDINGS[int(binding)] = CONFIG['Bindings'][binding].encode('utf-8')
|
BUTTON_BINDINGS[int(binding)] = CONFIG['Bindings'][binding].encode('utf-8')
|
||||||
elif binding == 'scroll_up':
|
elif binding == 'scroll_up':
|
||||||
BUTTON_BINDINGS['scroll_up'] = CONFIG['Bindings'][binding].encode('utf-8')
|
BUTTON_BINDINGS['scroll_up'] = CONFIG['Bindings'][binding].encode('utf-8')
|
||||||
@ -133,10 +225,10 @@ def read_config(config_file):
|
|||||||
elif binding == 'dial_ccw':
|
elif binding == 'dial_ccw':
|
||||||
BUTTON_BINDINGS['dial_ccw'] = CONFIG['Bindings'][binding].encode('utf-8')
|
BUTTON_BINDINGS['dial_ccw'] = CONFIG['Bindings'][binding].encode('utf-8')
|
||||||
elif binding == '':
|
elif binding == '':
|
||||||
continue # ignore empty line
|
continue # ignore empty line
|
||||||
else:
|
else:
|
||||||
print("[WARN] unrecognized regular binding '%s'" % (binding,))
|
print("[WARN] unrecognized regular binding '%s'" % (binding,))
|
||||||
#Same, but for buttons that should be held down
|
# Same, but for buttons that should be held down
|
||||||
if 'Hold' in CONFIG:
|
if 'Hold' in CONFIG:
|
||||||
for binding in CONFIG['Hold']:
|
for binding in CONFIG['Hold']:
|
||||||
if binding.isdigit():
|
if binding.isdigit():
|
||||||
@ -144,14 +236,14 @@ def read_config(config_file):
|
|||||||
elif binding == '':
|
elif binding == '':
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
print ("[WARN] unrecognized hold binding '%s'" % (binding,))
|
print("[WARN] unrecognized hold binding '%s'" % (binding,))
|
||||||
# Assume that if cycle is assigned we have modes for now
|
# Assume that if cycle is assigned we have modes for now
|
||||||
if 'Dial' in CONFIG:
|
if 'Dial' in CONFIG:
|
||||||
CYCLE_BUTTON = int(CONFIG['Dial']['cycle'])
|
CYCLE_BUTTON = int(CONFIG['Dial']['cycle'])
|
||||||
for key in CONFIG:
|
for key in CONFIG:
|
||||||
if key.startswith("Mode"):
|
if key.startswith("Mode"):
|
||||||
# Count the modes
|
# Count the modes
|
||||||
mode = int(key.split(' ')[1])
|
mode = int(key.split(' ')[1])
|
||||||
if mode > CYCLE_MODES:
|
if mode > CYCLE_MODES:
|
||||||
CYCLE_MODES = mode
|
CYCLE_MODES = mode
|
||||||
DIAL_MODES[mode] = {}
|
DIAL_MODES[mode] = {}
|
||||||
@ -159,16 +251,13 @@ def read_config(config_file):
|
|||||||
DIAL_MODES[mode][binding] = CONFIG[key][binding].encode('utf-8')
|
DIAL_MODES[mode][binding] = CONFIG[key][binding].encode('utf-8')
|
||||||
|
|
||||||
|
|
||||||
def handle_reload_signal(signum, frame):
|
|
||||||
print("SIGUSR1 recieved - reloading config..")
|
|
||||||
read_config(CONFIG_FILE_PATH)
|
|
||||||
|
|
||||||
def make_rules():
|
def make_rules():
|
||||||
for device_name, device_id in TABLET_MODELS.items():
|
for device_name, device_id in TABLET_MODELS.items():
|
||||||
print("# %s" % (device_name, ))
|
print("# %s" % (device_name, ))
|
||||||
VID, PID = device_id.split(':')
|
VID, PID = device_id.split(':')
|
||||||
print('KERNEL=="hidraw*", ATTRS{idVendor}=="%s", ATTRS{idProduct}=="%s", MODE="0660", TAG+="uaccess"' % (VID, PID, ))
|
print('KERNEL=="hidraw*", ATTRS{idVendor}=="%s", ATTRS{idProduct}=="%s", MODE="0660", TAG+="uaccess"' % (VID, PID, ))
|
||||||
|
|
||||||
|
|
||||||
def create_default_config(config_file):
|
def create_default_config(config_file):
|
||||||
with open(config_file, 'w') as config:
|
with open(config_file, 'w') as config:
|
||||||
config.write("""
|
config.write("""
|
||||||
@ -199,69 +288,5 @@ dial_ccw=equal
|
|||||||
""")
|
""")
|
||||||
|
|
||||||
|
|
||||||
BUTTON_BITS = {
|
|
||||||
0x01: 1,
|
|
||||||
0x02: 2,
|
|
||||||
0x04: 3,
|
|
||||||
0x08: 4,
|
|
||||||
0x10: 5,
|
|
||||||
0x20: 6,
|
|
||||||
0x40: 7,
|
|
||||||
0x80: 8,
|
|
||||||
}
|
|
||||||
|
|
||||||
SCROLL_STATE=None
|
|
||||||
|
|
||||||
def get_button_press(hidraw):
|
|
||||||
global SCROLL_STATE
|
|
||||||
while True:
|
|
||||||
sequence = hidraw.read(12)
|
|
||||||
# 0xf7 is what my Kamvas Pro 22 reads
|
|
||||||
# another model seems to send 0x08
|
|
||||||
# Q620M reads as 0xf9
|
|
||||||
if sequence[0] != 0xf7 and sequence[0] != 0x08 and sequence[0] != 0xf9:
|
|
||||||
pass
|
|
||||||
if sequence[1] == 0xe0: # buttons
|
|
||||||
# doesn't seem like the tablet will let you push two buttons at once
|
|
||||||
if sequence[4] > 0:
|
|
||||||
return BUTTON_BITS[sequence[4]]
|
|
||||||
elif sequence[5] > 0:
|
|
||||||
# right-side buttons are 8-15, so add 8
|
|
||||||
return BUTTON_BITS[sequence[5]] + 8
|
|
||||||
else:
|
|
||||||
# must be button release (all zeros)
|
|
||||||
continue
|
|
||||||
elif sequence[1] == 0xf0: # scroll strip
|
|
||||||
scroll_pos = sequence[5]
|
|
||||||
if scroll_pos == 0:
|
|
||||||
# reset scroll state after lifting finger off scroll strip
|
|
||||||
SCROLL_STATE = None
|
|
||||||
elif SCROLL_STATE is not None:
|
|
||||||
# scroll strip is numbered from top to bottom so a greater new
|
|
||||||
# value means they scrolled down
|
|
||||||
if scroll_pos > SCROLL_STATE:
|
|
||||||
SCROLL_STATE = scroll_pos
|
|
||||||
return 'scroll_down'
|
|
||||||
elif scroll_pos < SCROLL_STATE:
|
|
||||||
SCROLL_STATE = scroll_pos
|
|
||||||
return 'scroll_up'
|
|
||||||
else:
|
|
||||||
SCROLL_STATE = scroll_pos
|
|
||||||
continue
|
|
||||||
elif sequence[1] == 0xf1: # dial on Q620M, practically 2 buttons
|
|
||||||
if sequence[5] == 0x1:
|
|
||||||
return 'dial_cw'
|
|
||||||
elif sequence[5] == 0xff:
|
|
||||||
return 'dial_ccw'
|
|
||||||
else:
|
|
||||||
continue
|
|
||||||
|
|
||||||
def get_button_release(hidraw):
|
|
||||||
while True:
|
|
||||||
sequence = hidraw.read(12)
|
|
||||||
if sequence[1] == 0xe0 and sequence[4] == 0 and sequence[5] == 0:
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user