-
Notifications
You must be signed in to change notification settings - Fork 1
/
bt_gattlib.py
147 lines (128 loc) · 4.79 KB
/
bt_gattlib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import gattlib.adapter
import gattlib.device
from gi.repository import GLib
from uuid import UUID
from threading import Semaphore, Thread
import atexit
def interfaces():
    """Return the list of available Bluetooth interfaces.

    The list always holds exactly one entry: the module-level
    ``_interface`` object.
    """
    available = [_interface]
    return available
class Interface:
    """Wrapper around a gattlib adapter: scans for devices and hands out LEDevice objects."""

    def __init__(self, mac=None):
        """Create and open a gattlib adapter.

        ``mac`` is accepted for interface compatibility but is not used.
        """
        # Plain state is initialised before touching gattlib so that
        # __del__/_stop stay safe even if creating or opening the adapter raises.
        self._glibloop = None
        self._pumpthread = None
        self._devices = []  # list of (address, name) tuples filled while scanning
        self._addrs = []  # returned by near_addresses(); nothing in this class fills it
        self._scanning = False
        self._adapter = gattlib.adapter.Adapter()
        self._adapter.open()

    def _pump(self):
        """Start a GLib main loop on a background thread unless one is already running."""
        if self._glibloop is None:
            self._glibloop = GLib.MainLoop()
            self._pumpthread = Thread(target=self._glibloop.run)
            self._pumpthread.start()

    def _stop(self):
        """Quit the GLib main loop started by ``_pump`` and wait for its thread."""
        # getattr: the attribute may be missing on a half-constructed instance.
        loop = getattr(self, '_glibloop', None)
        if loop is not None:
            loop.quit()
            self._pumpthread.join()
            # Forget the finished loop so a later _pump() can start a fresh one
            # and a repeated _stop() is a no-op.
            self._glibloop = None
            self._pumpthread = None

    def __del__(self):
        self._stop()

    def start_scanning(self, callback=None):
        """Scan for devices, reporting the running device list to ``callback``.

        ``callback``, when given, is called from a helper thread with the list
        of ``(address, name)`` tuples discovered so far; it is invoked after
        each wake-up (a discovery, or a 0.5 s timeout) for as long as scanning
        has not been stopped via ``stop_scanning``.

        Raises whatever ``scan_enable`` raises; a ``gattlib.exception.DBusError``
        gets the hint 'Is Bluetooth turned on?' appended to its ``args``.
        """
        self._devices = []
        self._scanning = True
        # 2020-12
        # this would be an easy patch to gattlib, similar to the characteristic enumeration code
        print('')
        print('Warning: gattlib has no way to enumerate devices that are already connected via bluetooth.')
        print('')
        found = Semaphore(0)

        def on_scan(device, userdata):
            addr = str(device.id)
            name = str(device)
            self._devices.append((addr, name))
            print('device {} {}'.format(addr, name))
            found.release()

        def pump():
            # NOTE(review): the stop check and the callback are assumed to run
            # on every wake-up, not only after a discovery -- confirm.
            while True:
                # The semaphore is also released once after the scan ends, so
                # the list can legitimately be empty here.
                if found.acquire(timeout=0.5) and self._devices:
                    print('sem {}'.format(self._devices[-1][0]))
                if not self._scanning:
                    break
                if callback is not None:
                    callback(self._devices)

        thread = Thread(target=pump)
        thread.start()
        scan_failed = True
        try:
            # it looks like CTRL-C/SIGINT doesn't interrupt the process during gattlib calls
            # probably not hard to work around that and make it work, maybe by installing a signal handler
            # is probably also a bug in gattlib that would be easy to fix
            self._adapter.scan_enable(on_scan, 0)
        except gattlib.exception.DBusError as e:
            # args is a tuple, so it has to be rebuilt rather than appended to.
            e.args = e.args + ('Is Bluetooth turned on?',)
            raise
        else:
            scan_failed = False
        finally:
            if scan_failed:
                # Nobody will call stop_scanning() for a scan that never
                # started; let the helper thread exit instead of leaking it.
                self._scanning = False
            found.release()  # wake the helper so it re-checks _scanning promptly
            thread.join()

    def stop_scanning(self):
        """Mark scanning as finished and disable scanning on the adapter."""
        print('stopping scanning')
        self._scanning = False
        self._adapter.scan_disable()

    def near_addresses(self):
        """Return the stored address list (``self._addrs``)."""
        return self._addrs

    def device(self, address):
        """Return a new ``LEDevice`` for ``address`` on this interface."""
        return LEDevice(self, address)
class LEDevice:
    """A BLE device that is connected and discovered on construction."""

    def __init__(self, interface, address):
        """Connect to ``address`` through ``interface`` and discover the device.

        Also registers the disconnect for interpreter exit and starts the
        interface's GLib pump.
        """
        # Assigned first so __del__ always finds the attribute, even when
        # building the gattlib device below raises.
        self._device = None
        print('device')
        self._device = gattlib.device.Device(interface._adapter, address)
        print('CONNECT')
        self._device.connect()
        # Make sure the device is disconnected when the interpreter exits.
        atexit.register(self.__del__)
        print('discovering')
        self._device.discover()
        print('discovered')
        interface._pump()

    def __del__(self):
        """Disconnect the device once; later calls are no-ops."""
        device = getattr(self, '_device', None)
        if device:
            # Clear the reference before disconnecting so a second call
            # (atexit plus garbage collection) does not disconnect twice.
            self._device = None
            print('DISCONNECT')
            device.disconnect()

    def info(self):
        """Return a dict with the device's address under the key 'address'."""
        return {
            'address': str(self._device.id)
        }

    def characteristic(self, service, uuid):
        """Return a ``Characteristic`` of this device for ``service`` and ``uuid``."""
        return Characteristic(self, service, uuid)
class Characteristic:
    """One GATT characteristic of an ``LEDevice``, looked up by UUID."""

    def __init__(self, device, service, uuid):
        """Find the characteristic with ``uuid`` on ``device`` and register for notifications.

        ``uuid`` may be a string accepted by ``uuid.UUID`` or a ``UUID``
        instance. ``service`` is stored but not used for the lookup.

        Raises ``ValueError`` when no characteristic matches and
        ``AssertionError`` when more than one does.
        """
        self._device = device
        self._service = service
        self._uuid = uuid
        self._characteristic = None
        self._subscribees = set()
        # Parse once instead of on every loop iteration, and accept
        # ready-made UUID objects as well as strings.
        wanted = uuid if isinstance(uuid, UUID) else UUID(uuid)
        characteristics = self._device._device.characteristics
        for key, value in characteristics.items():
            print(str(key), str(value))
        for characteristic in characteristics.values():
            if characteristic.uuid == wanted:
                if self._characteristic is not None:
                    raise AssertionError('todo: characteristics with matching uuids')
                self._characteristic = characteristic
        if self._characteristic is None:
            raise ValueError('no such characteristic on device')
        print('uuid', self._characteristic.uuid)
        self._characteristic.register_notification(self._notification_registrant)

    def _notification_registrant(self, data, user_data):
        """Forward notification ``data`` to every subscribed callback."""
        # current interface is for ONE subscriber at once; this is just to ease changes if they happen
        print('notify!')
        # Iterate over a snapshot: a callback may call unsubscribe(), which
        # clears the set and would otherwise break the iteration.
        for subscribee in tuple(self._subscribees):
            subscribee(data)

    def subscribe(self, callback):
        """Add ``callback`` as a subscriber and start notifications."""
        print('subscribe!')
        self._subscribees.add(callback)
        self._characteristic.notification_start()

    def unsubscribe(self):
        """Drop all subscribers and stop notifications."""
        self._subscribees.clear()
        self._characteristic.notification_stop()

    def write(self, data: bytes):
        """Write ``data`` to the characteristic."""
        print('write!', data)
        self._characteristic.write(data)

    def __del__(self):
        # getattr: the attribute is missing if __init__ never ran.
        if getattr(self, '_subscribees', None):
            self.unsubscribe()
# Module-level singleton returned by interfaces(). Constructing it here means
# the gattlib adapter is created and opened as a side effect of importing
# this module.
_interface = Interface()