-
Notifications
You must be signed in to change notification settings - Fork 587
Tutorial: RxBleCustomOperation
Every additional layer of code one adds to their application comes with a cost. This cost (apart from additional fields/methods/classes that add up to the 64k DEX limit) is potentially reduced performance. This library is no exception. In case of BLE it is rarely a problem due to the nature and typical use-cases of this interface which is to occasionally transmit a limited amount of data.
The intention for RxAndroidBle is to allow developers to model their connection algorithms in possibly stateless way to reduce possibility of bugs. Sometimes however there is a need to transmit bulk data as quickly as possible — BLE was not designed with this use-case in mind — in the end to do so requires a lot of roundtrips from the application code to the BLE stack and back. Needless to say — additional layers of code make these roundtrips longer. This is where one can implement their own RxBleCustomOperation
which has access to the raw Android's BluetoothGatt
/BluetoothGattCallback
objects and cut the "middle-man" out.
There are two potential benefits of using a custom operation:
- better performance in sending data using WRITE_TYPE_NO_RESPONSE (see this performance comparison)
- ability to use
BluetoothGatt
/BluetoothGattCallback
functions that may not have been yet implemented inRxAndroidBle
- Using
RxBleCustomOperation
bypasses all layers ofRxAndroidBle
code which help to keep the Android BLE stack in a stable state — this may potentially degrade usage experience -
RxBleCustomOperation
partially uses library's internal API which may potentially change in minor version updates
Click to see the code
package net.grlewis.cufftest;
import android.bluetooth.BluetoothGatt;
import com.polidea.rxandroidble2.RxBleCustomOperation;
import com.polidea.rxandroidble2.internal.connection.RxBleGattCallback;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull;
public class GetGattOperation implements RxBleCustomOperation<BluetoothGatt> {
private BluetoothGatt gatt;
// How this works:
// You call rxBleConnection.queue( <instance of this class> )
// It returns an Observable<T>--call it Observable A
// The queue manager calls the .asObservable() method below,
// which returns another Observable<T>--call it Observable B
// It is placed in the queue for execution
// When it's time to run this operation, ConnectionOperationQueue will
// subscribe to Observable B (here, Observable.just( bluetoothGatt ))
// Emissions from this Observable B (here, the bluetoothGatt) are forwarded to the Observable A returned by the .queue() method
// Instances can be queued and received via a subscription to Observable A:
// rxBleConnection.queue( new GetGattOperation() ).subscribe( gatt -> {} );
@Override // returns "Observable B" (see above)
@NonNull
public Observable<BluetoothGatt> asObservable(BluetoothGatt bluetoothGatt,
RxBleGattCallback rxBleGattCallback,
Scheduler scheduler) throws Throwable {
gatt = bluetoothGatt;
return Observable.just(bluetoothGatt); // return Observable B (emits Gatt then completes)
}
public BluetoothGatt getGatt() {
return gatt;
}
}
BluetoothGatt.refresh()
refreshes internals (e.g. service/characteristic caches) of a particular BluetoothGatt
.
Click to see the code
RxBleCustomOperation<Void> bluetoothGattRefreshCustomOp = (bluetoothGatt, rxBleGattCallback, scheduler) -> {
try {
Method bluetoothGattRefreshFunction = bluetoothGatt.getClass().getMethod("refresh");
boolean success = (Boolean) bluetoothGattRefreshFunction.invoke(bluetoothGatt);
if (!success) return Observable.error(new RuntimeException("BluetoothGatt.refresh() returned false"));
return Observable.<Void>empty().delay(500, TimeUnit.MILLISECONDS);
} catch (NoSuchMethodException e) {
return Observable.error(e);
} catch (IllegalAccessException e) {
return Observable.error(e);
} catch (InvocationTargetException e) {
return Observable.error(e);
}
};
Click to see the code
RxBleCustomOperation<List<BluetoothGattService>> discoverServicesCustomOp = (bluetoothGatt, rxBleGattCallback, scheduler) -> {
boolean success = bluetoothGatt.discoverServices();
if (!success) return Observable.error(new RuntimeException("BluetoothGatt.discoverServices() returned false"));
return rxBleGattCallback.getOnServicesDiscovered()
.take(1) // so this RxBleCustomOperation will complete after the first result from BluetoothGattCallback.onServicesDiscovered()
.map(RxBleDeviceServices::getBluetoothGattServices);
};
Click to see the code
RxBleCustomOperation<Void> optimisedWriteOp = (bluetoothGatt, rxBleGattCallback, scheduler) -> Observable.create(
emitter -> {
Log.i("START", String.valueOf(testConnection.batchCount));
final byte[] data = new byte[20];
testConnection.characteristic.setValue(data);
final AtomicBoolean writeCompleted = new AtomicBoolean(false);
final AtomicBoolean ackCompleted = new AtomicBoolean(false);
final AtomicInteger batchesSent = new AtomicInteger(0);
final Runnable writeNextBatch = () -> {
data[0]++;
if (!bluetoothGatt.writeCharacteristic(testConnection.characteristic)) {
emitter.onError(new BleGattCannotStartException(bluetoothGatt, BleGattOperationType.CHARACTERISTIC_WRITE));
} else {
Log.i("SEND", String.valueOf(data[0]));
batchesSent.incrementAndGet();
}
};
final BluetoothGattCallback bluetoothGattCallback = new BluetoothGattCallback() {
@Override
public void onCharacteristicWrite(BluetoothGatt gatt, BluetoothGattCharacteristic characteristic, int status) {
if (status != BluetoothGatt.GATT_SUCCESS) {
emitter.onError(new BleGattException(gatt, status, BleGattOperationType.CHARACTERISTIC_WRITE));
} else if (batchesSent.get() == testConnection.batchCount) {
if (ackCompleted.get()) {
batchesSent.set(0);
ackCompleted.set(false);
emitter.onNext(null);
writeNextBatch.run();
} else {
writeCompleted.set(true);
}
} else {
characteristic.setValue(data);
writeNextBatch.run();
}
}
@Override
public void onCharacteristicChanged(BluetoothGatt gatt, BluetoothGattCharacteristic characteristic) {
final byte[] bytes = characteristic.getValue();
Log.i("ACK", Arrays.toString(bytes) + "/" + bytes.length + "/" + System.identityHashCode(bytes));
characteristic.setValue(data);
if (writeCompleted.get()) {
batchesSent.set(0);
writeCompleted.set(false);
emitter.onNext(null);
writeNextBatch.run();
} else {
ackCompleted.set(true);
}
}
};
rxBleGattCallback.setNativeCallback(bluetoothGattCallback);
Log.i("SEND", String.valueOf(data[0]));
if (!bluetoothGatt.writeCharacteristic(testConnection.characteristic)) {
emitter.onError(new BleGattCannotStartException(bluetoothGatt, BleGattOperationType.CHARACTERISTIC_WRITE));
} else {
batchesSent.incrementAndGet();
}
},
Emitter.BackpressureMode.NONE
);