Skip to content

Benchmark

Bases: PerformanceTestBase

Source code in navi/tests/benchmark/a2dp_test.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
class A2dpTest(test_base.PerformanceTestBase):
  def _setup_a2dp_device(
      self, codecs: list[_A2dpCodec]
  ) -> tuple[avdtp.Listener, avrcp.Protocol]:
    """Sets up A2DP profile on REF.

    Args:
      codecs: A2DP codecs supported by REF.

    Returns:
      A tuple of (avdtp.Listener, avrcp.Protocol).
    """
    listener = a2dp_ext.setup_sink_server(
        self.ref.device,
        [codec.get_default_capabilities() for codec in codecs],
        _A2DP_SERVICE_RECORD_HANDLE,
    )
    avrcp_delegator = AvrcpDelegate(
        supported_events=(avrcp.EventId.VOLUME_CHANGED,)
    )
    avrcp_protocol = a2dp_ext.setup_avrcp_server(
        self.ref.device,
        avrcp_controller_handle=_AVRCP_CONTROLLER_RECORD_HANDLE,
        avrcp_target_handle=_AVRCP_TARGET_RECORD_HANDLE,
        delegate=avrcp_delegator,
    )

    return listener, avrcp_protocol

  async def _setup_a2dp_connection(self, ref_codecs: list[_A2dpCodec]) -> tuple[
      avrcp.Protocol,
      avdtp.Protocol,
  ]:
    """Sets up A2DP connection between DUT and REF.

    Args:
      ref_codecs: A2DP codecs supported by REF.

    Returns:
      A tuple of (avrcp.Protocol, avdtp.Protocol).
    """
    with self.dut.bl4a.register_callback(bl4a_api.Module.A2DP) as dut_cb:
      ref_avdtp_listener, ref_avrcp_protocol = self._setup_a2dp_device(
          ref_codecs
      )
      ref_avdtp_connections = asyncio.Queue[avdtp.Protocol]()
      ref_avdtp_listener.on(
          ref_avdtp_listener.EVENT_CONNECTION, ref_avdtp_connections.put
      )

      self.logger.info("[DUT] Connect and pair REF.")
      ref_acl = await self.classic_connect_and_pair()

      self.logger.info("[DUT] Wait for A2DP connected.")
      await dut_cb.wait_for_event(
          bl4a_api.ProfileConnectionStateChanged(
              address=self.ref.address,
              state=android_constants.ConnectionState.CONNECTED,
          ),
      )
      async with self.assert_not_timeout(
          _DEFAULT_TIMEOUT_SECONDS, msg="[REF] Wait for A2DP connected."
      ):
        ref_avdtp_connection = await ref_avdtp_connections.get()
      self.logger.info("[DUT] Wait for A2DP becomes active.")
      await dut_cb.wait_for_event(
          bl4a_api.ProfileActiveDeviceChanged(address=self.ref.address),
          timeout=_DEFAULT_TIMEOUT_SECONDS,
      )

      if ref_avrcp_protocol.avctp_protocol is not None:
        self.logger.info("[REF] AVRCP already connected.")
      else:
        self.logger.info("[REF] Connect AVRCP.")
        async with self.assert_not_timeout(_DEFAULT_TIMEOUT_SECONDS):
          await ref_avrcp_protocol.connect(ref_acl)
        self.logger.info("[REF] AVRCP connected.")
    return ref_avrcp_protocol, ref_avdtp_connection

  async def pair_and_connect(self) -> None:
    with self.dut.bl4a.register_callback(bl4a_api.Module.A2DP) as dut_cb:
      a2dp_ext.setup_sink_server(
          self.ref.device,
          [_A2dpCodec.SBC.get_default_capabilities()],
          _A2DP_SERVICE_RECORD_HANDLE,
      )
      await self.classic_connect_and_pair()
      await dut_cb.wait_for_event(
          bl4a_api.ProfileConnectionStateChanged(
              self.ref.address,
              state=android_constants.ConnectionState.CONNECTED,
          )
      )

  async def test_a2dp_connection_outgoing(self) -> None:
    """Test make outgoing A2DP connections.

    Test steps:
      1. Pair and connect.
      2. Terminate the connection.
      3. Make A2DP connection from DUT to REF.
      4. Terminate the connection.
      5. Repeat step 3-4.
    """
    latency_list = list[float]()
    await self.pair_and_connect()
    await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        with self.dut.bl4a.register_callback(bl4a_api.Module.A2DP) as dut_cb:
          self.logger.info("[DUT] Reconnect.")
          with performance_tool.Stopwatch() as stop_watch:
            self.dut.bt.connect(self.ref.address)
            self.logger.info("[DUT] Wait for A2DP connected.")
            await dut_cb.wait_for_event(
                bl4a_api.ProfileActiveDeviceChanged(self.ref.address),
                timeout=_DEFAULT_TIMEOUT_SECONDS,
            )
          latency_seconds = stop_watch.elapsed_time.total_seconds()
        self.success_attempt_record(
            test_round=i + 1, latency=latency_seconds, latency_list=latency_list
        )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make A2DP connection")
      finally:
        await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

  @navi_test_base.named_parameterized(
      without_avrcp=(False,), with_avrcp=(True,)
  )
  async def test_connection_incoming(self, avrcp_enabled: bool) -> None:
    """Test make incoming A2DP connections.

    Test steps:
      1. Pair and connect.
      2. Terminate the connection.
      3. Make A2DP connection from REF to DUT.
      4. Terminate the connection.
      5. Repeat step 3-4.

    Args:
      avrcp_enabled: Enable AVRCP during the test.
    """
    latency_list = list[float]()
    ref_avdtp_listener = a2dp_ext.setup_sink_server(
        self.ref.device,
        [_A2dpCodec.SBC.get_default_capabilities()],
        _A2DP_SERVICE_RECORD_HANDLE,
    )
    ref_avrcp_protocol: avrcp.Protocol | None = None
    if avrcp_enabled:
      ref_avrcp_protocol = a2dp_ext.setup_avrcp_server(
          self.ref.device,
          avrcp_controller_handle=_AVRCP_CONTROLLER_RECORD_HANDLE,
          avrcp_target_handle=_AVRCP_TARGET_RECORD_HANDLE,
      )
    ref_acl = await self.classic_connect_and_pair()
    await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        if avrcp_enabled:
          ref_avdtp_connections = asyncio.Queue[avdtp.Protocol]()
          ref_avdtp_listener.on(
              ref_avdtp_listener.EVENT_CONNECTION, ref_avdtp_connections.put
          )
        with self.dut.bl4a.register_callback(bl4a_api.Module.A2DP) as dut_cb:
          self.logger.info("[REF] Connect to DUT.")
          with performance_tool.Stopwatch() as stop_watch:
            dut_ref_acl = await self.ref.device.connect(
                self.dut.address,
                transport=core.BT_BR_EDR_TRANSPORT,
                timeout=_DEFAULT_TIMEOUT_SECONDS,
            )

            async with self.assert_not_timeout(
                _DEFAULT_TIMEOUT_SECONDS,
                msg="[REF] Authenticate and encrypt connection.",
            ):
              await dut_ref_acl.authenticate()
              await dut_ref_acl.encrypt()

            async with self.assert_not_timeout(
                _DEFAULT_TIMEOUT_SECONDS,
                msg="[REF] Connect A2DP.",
            ):
              server = await avdtp.Protocol.connect(dut_ref_acl)
            server.add_sink(_A2dpCodec.AAC.get_default_capabilities())

            self.logger.info("[DUT] Wait for A2DP connected.")
            await dut_cb.wait_for_event(
                bl4a_api.ProfileConnectionStateChanged(
                    address=self.ref.address,
                    state=android_constants.ConnectionState.CONNECTED,
                ),
            )
            self.logger.info("[DUT] Wait for A2DP becomes active.")
            await dut_cb.wait_for_event(
                bl4a_api.ProfileActiveDeviceChanged(address=self.ref.address),
                timeout=_DEFAULT_TIMEOUT_SECONDS,
            )
            if avrcp_enabled:
              if ref_avrcp_protocol is None:
                raise ValueError("[REF] AVRCP is not initialized.")
              elif ref_avrcp_protocol.avctp_protocol is not None:
                raise ValueError("[REF] AVRCP already connected.")
              else:
                self.logger.info("[REF] Connect AVRCP.")
                async with self.assert_not_timeout(_DEFAULT_TIMEOUT_SECONDS):
                  await ref_avrcp_protocol.connect(ref_acl)
                self.logger.info("[REF] AVRCP connected.")
          latency_seconds = stop_watch.elapsed_time.total_seconds()
          self.success_attempt_record(
              test_round=i + 1,
              latency=latency_seconds,
              latency_list=latency_list,
          )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make A2DP connection")
      finally:
        await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

  async def wait_for_a2dp_status(
      self, ref_sink: LocalSinkWrapper, status: _A2dpStreamState
  ) -> None:
    with self.dut.bl4a.register_callback(bl4a_api.Module.A2DP) as dut_cb:
      if status == _A2dpStreamState.STOP:
        if self.dut.bt.isA2dpPlaying(self.ref.address):
          self.logger.info("[DUT] A2DP is streaming, wait for A2DP stopped.")
          await dut_cb.wait_for_event(
              bl4a_api.A2dpPlayingStateChanged(
                  self.ref.address, _A2dpState.NOT_PLAYING
              ),
          )
      elif status == _A2dpStreamState.START:
        if not self.dut.bt.isA2dpPlaying(self.ref.address):
          self.logger.info("[DUT] wait for A2DP started.")
          await dut_cb.wait_for_event(
              bl4a_api.A2dpPlayingStateChanged(
                  self.ref.address, _A2dpState.PLAYING
              ),
          )
    async with (
        self.assert_not_timeout(
            _DEFAULT_TIMEOUT_SECONDS,
        ),
        ref_sink.condition,
    ):
      if status == _A2dpStreamState.STOP:
        self.logger.info("[REF] Wait for A2DP stopped.")
        await ref_sink.condition.wait_for(
            lambda: ref_sink.stream_state != avdtp.AVDTP_STREAMING_STATE
        )
      elif status == _A2dpStreamState.START:
        self.logger.info("[REF] Wait for A2DP started.")
        await ref_sink.condition.wait_for(
            lambda: ref_sink.stream_state == avdtp.AVDTP_STREAMING_STATE
        )

  async def test_stream_start(self) -> None:
    """Tests A2DP streaming controlled by DUT.

    Test steps:
      1. Setup pairing between DUT and REF.
      2. Start stream from DUT.
      3. Stop stream from DUT.
    """
    codec = _A2dpCodec.SBC
    self.dut.bt.audioSetRepeat(android_constants.RepeatMode.ONE)
    _, ref_avdtp_connection = await self._setup_a2dp_connection([codec])

    ref_sinks = a2dp_ext.find_local_endpoints_by_codec(
        ref_avdtp_connection,
        codec.codec_type,
        avdtp.LocalSink,
        vendor_id=codec.vendor_id,
        codec_id=codec.codec_id,
    )
    if not ref_sinks:
      self.fail("No sink found for codec %s." % codec.name)
    ref_sink = LocalSinkWrapper(ref_sinks[0])

    # If there is a playback, wait until it ends.
    await self.wait_for_a2dp_status(ref_sink, _A2dpStreamState.STOP)
    latency_list = list[float]()
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        # Register the sink buffer to receive the packets.
        buffer = a2dp_ext.register_sink_buffer(ref_sink.impl, codec)
        with (
            performance_tool.Stopwatch() as stop_watch_for_start_stream,
        ):
          self.logger.info("[DUT] Start stream.")
          self.dut.bt.audioPlaySine()
          await self.wait_for_a2dp_status(ref_sink, _A2dpStreamState.START)
        latency_seconds = (
            stop_watch_for_start_stream.elapsed_time.total_seconds()
        )
        # Streaming for 1 second.
        await asyncio.sleep(1.0)
        self.logger.info("[DUT] Stop stream.")
        self.dut.bt.audioPause()
        await self.wait_for_a2dp_status(ref_sink, _A2dpStreamState.STOP)

        if (
            buffer is not None
            and audio.SUPPORT_AUDIO_PROCESSING
        ):
          dominant_frequency = audio.get_dominant_frequency(
              buffer, format=codec.format
          )
          self.logger.info("Dominant frequency: %.2f", dominant_frequency)
          # Dominant frequency is not accurate on emulator.
          if not self.dut.device.is_emulator:
            self.assertAlmostEqual(dominant_frequency, 1000, delta=10)
        self.success_attempt_record(
            test_round=i + 1,
            latency=latency_seconds,
            latency_list=latency_list,
        )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to stream")
      finally:
        self.dut.bt.audioPause()
        await self.wait_for_a2dp_status(ref_sink, _A2dpStreamState.STOP)
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

  async def test_set_absolute_volume(self) -> None:
    """Tests setting absolute volume.

    Test steps:
      1. Setup pairing between DUT and REF.
      2. Set absolute volume.
    """
    ref_avrcp_protocol, _ = await self._setup_a2dp_connection([_A2dpCodec.SBC])
    ref_avrcp_delegator = ref_avrcp_protocol.delegate
    assert isinstance(ref_avrcp_delegator, AvrcpDelegate)

    dut_max_volume = self.dut.bt.getMaxVolume(_StreamType.MUSIC)
    dut_min_volume = self.dut.bt.getMinVolume(_StreamType.MUSIC)
    if dut_max_volume == dut_min_volume:
      raise self.skipTest("DUT's max volume is the same as min volume.")

    def android_to_avrcp_volume(volume: int) -> int:
      # Android JVM uses ROUND_HALF_UP policy, while Python uses ROUND_HALF_EVEN
      # by default, so we need to specify policy here.
      return int(
          decimal.Decimal(
              volume / dut_max_volume * _AVRCP_MAX_VOLUME
          ).to_integral_exact(rounding=decimal.ROUND_HALF_UP)
      )

    async with (
        self.assert_not_timeout(
            _DEFAULT_TIMEOUT_SECONDS,
            msg="[REF] Wait for initial volume indicator.",
        ),
        ref_avrcp_delegator.condition,
    ):
      await ref_avrcp_delegator.condition.wait_for(
          lambda: (
              android_to_avrcp_volume(self.dut.bt.getVolume(_StreamType.MUSIC))
              == ref_avrcp_delegator.volume
          )
      )

    # DUT's VCS client might not be stable at the beginning. If we set volume
    # immediately, the volume might not be set correctly.
    await asyncio.sleep(_PREPARE_TIME_SECONDS)
    latency_list = list[float]()
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        dut_expected_volume = dut_max_volume
        if self.dut.bt.getVolume(_StreamType.MUSIC) == dut_expected_volume:
          dut_expected_volume -= 1

        ref_expected_volume = android_to_avrcp_volume(dut_expected_volume)
        with performance_tool.Stopwatch() as stop_watch:
          with self.dut.bl4a.register_callback(
              bl4a_api.Module.AUDIO
          ) as dut_audio_cb:
            self.logger.info("[DUT] Set volume to %d.", dut_expected_volume)
            self.dut.bt.setVolume(_StreamType.MUSIC, dut_expected_volume)

            self.logger.info("[DUT] Wait for volume changed.")
            volume_changed_event = await dut_audio_cb.wait_for_event(
                bl4a_api.VolumeChanged(
                    stream_type=_StreamType.MUSIC, volume_value=matcher.ANY
                ),
            )
            self.assertEqual(
                volume_changed_event.volume_value, dut_expected_volume
            )
          async with (
              self.assert_not_timeout(
                  _DEFAULT_TIMEOUT_SECONDS,
                  msg="[REF] Wait for volume changed.",
              ),
              ref_avrcp_delegator.condition,
          ):
            await ref_avrcp_delegator.condition.wait_for(
                lambda: ref_avrcp_delegator.volume == ref_expected_volume  # pylint: disable=cell-var-from-loop
            )
        latency_seconds = stop_watch.elapsed_time.total_seconds()
        self.success_attempt_record(
            test_round=i + 1, latency=latency_seconds, latency_list=latency_list
        )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to set volume")
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

Test make outgoing A2DP connections.

Test steps
  1. Pair and connect.
  2. Terminate the connection.
  3. Make A2DP connection from DUT to REF.
  4. Terminate the connection.
  5. Repeat step 3-4.
Source code in navi/tests/benchmark/a2dp_test.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
async def test_a2dp_connection_outgoing(self) -> None:
  """Test make outgoing A2DP connections.

  Test steps:
    1. Pair and connect.
    2. Terminate the connection.
    3. Make A2DP connection from DUT to REF.
    4. Terminate the connection.
    5. Repeat step 3-4.
  """
  latency_list = list[float]()
  await self.pair_and_connect()
  await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      with self.dut.bl4a.register_callback(bl4a_api.Module.A2DP) as dut_cb:
        self.logger.info("[DUT] Reconnect.")
        with performance_tool.Stopwatch() as stop_watch:
          self.dut.bt.connect(self.ref.address)
          self.logger.info("[DUT] Wait for A2DP connected.")
          await dut_cb.wait_for_event(
              bl4a_api.ProfileActiveDeviceChanged(self.ref.address),
              timeout=_DEFAULT_TIMEOUT_SECONDS,
          )
        latency_seconds = stop_watch.elapsed_time.total_seconds()
      self.success_attempt_record(
          test_round=i + 1, latency=latency_seconds, latency_list=latency_list
      )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make A2DP connection")
    finally:
      await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Test make incoming A2DP connections.

Test steps
  1. Pair and connect.
  2. Terminate the connection.
  3. Make A2DP connection from REF to DUT.
  4. Terminate the connection.
  5. Repeat step 3-4.

Parameters:

Name Type Description Default
avrcp_enabled bool

Enable AVRCP during the test.

required
Source code in navi/tests/benchmark/a2dp_test.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
@navi_test_base.named_parameterized(
    without_avrcp=(False,), with_avrcp=(True,)
)
async def test_connection_incoming(self, avrcp_enabled: bool) -> None:
  """Test make incoming A2DP connections.

  Test steps:
    1. Pair and connect.
    2. Terminate the connection.
    3. Make A2DP connection from REF to DUT.
    4. Terminate the connection.
    5. Repeat step 3-4.

  Args:
    avrcp_enabled: Enable AVRCP during the test.
  """
  latency_list = list[float]()
  ref_avdtp_listener = a2dp_ext.setup_sink_server(
      self.ref.device,
      [_A2dpCodec.SBC.get_default_capabilities()],
      _A2DP_SERVICE_RECORD_HANDLE,
  )
  ref_avrcp_protocol: avrcp.Protocol | None = None
  if avrcp_enabled:
    ref_avrcp_protocol = a2dp_ext.setup_avrcp_server(
        self.ref.device,
        avrcp_controller_handle=_AVRCP_CONTROLLER_RECORD_HANDLE,
        avrcp_target_handle=_AVRCP_TARGET_RECORD_HANDLE,
    )
  ref_acl = await self.classic_connect_and_pair()
  await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      if avrcp_enabled:
        ref_avdtp_connections = asyncio.Queue[avdtp.Protocol]()
        ref_avdtp_listener.on(
            ref_avdtp_listener.EVENT_CONNECTION, ref_avdtp_connections.put
        )
      with self.dut.bl4a.register_callback(bl4a_api.Module.A2DP) as dut_cb:
        self.logger.info("[REF] Connect to DUT.")
        with performance_tool.Stopwatch() as stop_watch:
          dut_ref_acl = await self.ref.device.connect(
              self.dut.address,
              transport=core.BT_BR_EDR_TRANSPORT,
              timeout=_DEFAULT_TIMEOUT_SECONDS,
          )

          async with self.assert_not_timeout(
              _DEFAULT_TIMEOUT_SECONDS,
              msg="[REF] Authenticate and encrypt connection.",
          ):
            await dut_ref_acl.authenticate()
            await dut_ref_acl.encrypt()

          async with self.assert_not_timeout(
              _DEFAULT_TIMEOUT_SECONDS,
              msg="[REF] Connect A2DP.",
          ):
            server = await avdtp.Protocol.connect(dut_ref_acl)
          server.add_sink(_A2dpCodec.AAC.get_default_capabilities())

          self.logger.info("[DUT] Wait for A2DP connected.")
          await dut_cb.wait_for_event(
              bl4a_api.ProfileConnectionStateChanged(
                  address=self.ref.address,
                  state=android_constants.ConnectionState.CONNECTED,
              ),
          )
          self.logger.info("[DUT] Wait for A2DP becomes active.")
          await dut_cb.wait_for_event(
              bl4a_api.ProfileActiveDeviceChanged(address=self.ref.address),
              timeout=_DEFAULT_TIMEOUT_SECONDS,
          )
          if avrcp_enabled:
            if ref_avrcp_protocol is None:
              raise ValueError("[REF] AVRCP is not initialized.")
            elif ref_avrcp_protocol.avctp_protocol is not None:
              raise ValueError("[REF] AVRCP already connected.")
            else:
              self.logger.info("[REF] Connect AVRCP.")
              async with self.assert_not_timeout(_DEFAULT_TIMEOUT_SECONDS):
                await ref_avrcp_protocol.connect(ref_acl)
              self.logger.info("[REF] AVRCP connected.")
        latency_seconds = stop_watch.elapsed_time.total_seconds()
        self.success_attempt_record(
            test_round=i + 1,
            latency=latency_seconds,
            latency_list=latency_list,
        )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make A2DP connection")
    finally:
      await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Tests setting absolute volume.

Test steps
  1. Setup pairing between DUT and REF.
  2. Set absolute volume.
Source code in navi/tests/benchmark/a2dp_test.py
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
async def test_set_absolute_volume(self) -> None:
  """Tests setting absolute volume.

  Test steps:
    1. Setup pairing between DUT and REF.
    2. Set absolute volume.
  """
  ref_avrcp_protocol, _ = await self._setup_a2dp_connection([_A2dpCodec.SBC])
  ref_avrcp_delegator = ref_avrcp_protocol.delegate
  assert isinstance(ref_avrcp_delegator, AvrcpDelegate)

  dut_max_volume = self.dut.bt.getMaxVolume(_StreamType.MUSIC)
  dut_min_volume = self.dut.bt.getMinVolume(_StreamType.MUSIC)
  if dut_max_volume == dut_min_volume:
    raise self.skipTest("DUT's max volume is the same as min volume.")

  def android_to_avrcp_volume(volume: int) -> int:
    # Android JVM uses ROUND_HALF_UP policy, while Python uses ROUND_HALF_EVEN
    # by default, so we need to specify policy here.
    return int(
        decimal.Decimal(
            volume / dut_max_volume * _AVRCP_MAX_VOLUME
        ).to_integral_exact(rounding=decimal.ROUND_HALF_UP)
    )

  async with (
      self.assert_not_timeout(
          _DEFAULT_TIMEOUT_SECONDS,
          msg="[REF] Wait for initial volume indicator.",
      ),
      ref_avrcp_delegator.condition,
  ):
    await ref_avrcp_delegator.condition.wait_for(
        lambda: (
            android_to_avrcp_volume(self.dut.bt.getVolume(_StreamType.MUSIC))
            == ref_avrcp_delegator.volume
        )
    )

  # DUT's VCS client might not be stable at the beginning. If we set volume
  # immediately, the volume might not be set correctly.
  await asyncio.sleep(_PREPARE_TIME_SECONDS)
  latency_list = list[float]()
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      dut_expected_volume = dut_max_volume
      if self.dut.bt.getVolume(_StreamType.MUSIC) == dut_expected_volume:
        dut_expected_volume -= 1

      ref_expected_volume = android_to_avrcp_volume(dut_expected_volume)
      with performance_tool.Stopwatch() as stop_watch:
        with self.dut.bl4a.register_callback(
            bl4a_api.Module.AUDIO
        ) as dut_audio_cb:
          self.logger.info("[DUT] Set volume to %d.", dut_expected_volume)
          self.dut.bt.setVolume(_StreamType.MUSIC, dut_expected_volume)

          self.logger.info("[DUT] Wait for volume changed.")
          volume_changed_event = await dut_audio_cb.wait_for_event(
              bl4a_api.VolumeChanged(
                  stream_type=_StreamType.MUSIC, volume_value=matcher.ANY
              ),
          )
          self.assertEqual(
              volume_changed_event.volume_value, dut_expected_volume
          )
        async with (
            self.assert_not_timeout(
                _DEFAULT_TIMEOUT_SECONDS,
                msg="[REF] Wait for volume changed.",
            ),
            ref_avrcp_delegator.condition,
        ):
          await ref_avrcp_delegator.condition.wait_for(
              lambda: ref_avrcp_delegator.volume == ref_expected_volume  # pylint: disable=cell-var-from-loop
          )
      latency_seconds = stop_watch.elapsed_time.total_seconds()
      self.success_attempt_record(
          test_round=i + 1, latency=latency_seconds, latency_list=latency_list
      )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to set volume")
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Tests A2DP streaming controlled by DUT.

Test steps
  1. Setup pairing between DUT and REF.
  2. Start stream from DUT.
  3. Stop stream from DUT.
Source code in navi/tests/benchmark/a2dp_test.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
async def test_stream_start(self) -> None:
  """Tests A2DP streaming controlled by DUT.

  Test steps:
    1. Setup pairing between DUT and REF.
    2. Start stream from DUT.
    3. Stop stream from DUT.
  """
  codec = _A2dpCodec.SBC
  self.dut.bt.audioSetRepeat(android_constants.RepeatMode.ONE)
  _, ref_avdtp_connection = await self._setup_a2dp_connection([codec])

  ref_sinks = a2dp_ext.find_local_endpoints_by_codec(
      ref_avdtp_connection,
      codec.codec_type,
      avdtp.LocalSink,
      vendor_id=codec.vendor_id,
      codec_id=codec.codec_id,
  )
  if not ref_sinks:
    self.fail("No sink found for codec %s." % codec.name)
  ref_sink = LocalSinkWrapper(ref_sinks[0])

  # If there is a playback, wait until it ends.
  await self.wait_for_a2dp_status(ref_sink, _A2dpStreamState.STOP)
  latency_list = list[float]()
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      # Register the sink buffer to receive the packets.
      buffer = a2dp_ext.register_sink_buffer(ref_sink.impl, codec)
      with (
          performance_tool.Stopwatch() as stop_watch_for_start_stream,
      ):
        self.logger.info("[DUT] Start stream.")
        self.dut.bt.audioPlaySine()
        await self.wait_for_a2dp_status(ref_sink, _A2dpStreamState.START)
      latency_seconds = (
          stop_watch_for_start_stream.elapsed_time.total_seconds()
      )
      # Streaming for 1 second.
      await asyncio.sleep(1.0)
      self.logger.info("[DUT] Stop stream.")
      self.dut.bt.audioPause()
      await self.wait_for_a2dp_status(ref_sink, _A2dpStreamState.STOP)

      if (
          buffer is not None
          and audio.SUPPORT_AUDIO_PROCESSING
      ):
        dominant_frequency = audio.get_dominant_frequency(
            buffer, format=codec.format
        )
        self.logger.info("Dominant frequency: %.2f", dominant_frequency)
        # Dominant frequency is not accurate on emulator.
        if not self.dut.device.is_emulator:
          self.assertAlmostEqual(dominant_frequency, 1000, delta=10)
      self.success_attempt_record(
          test_round=i + 1,
          latency=latency_seconds,
          latency_list=latency_list,
      )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to stream")
    finally:
      self.dut.bt.audioPause()
      await self.wait_for_a2dp_status(ref_sink, _A2dpStreamState.STOP)
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Bases: PerformanceTestBase

Source code in navi/tests/benchmark/classic_gap_test.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
class ClassicGapTest(test_base.PerformanceTestBase):
  pairing_delegate: pairing_utils.PairingDelegate

  async def test_inquiry(self) -> None:
    """Test inquiry.

    Test steps:
      1. Set REF in discoverable mode.
      2. Start discovery on DUT.
      3. Wait for DUT discovered or timeout(15 seconds).
      4. Check result(should be discovered).
    """
    latency_list = list[float]()
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        with self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER) as dut_cb:
          await self.ref.device.set_discoverable(True)
          with performance_tool.Stopwatch() as stop_watch:
            self.dut.bt.startInquiry()

            await dut_cb.wait_for_event(
                bl4a_api.DeviceFound(address=self.ref.address, name=mock.ANY)
            )
          latency_seconds = stop_watch.elapsed_time.total_seconds()
          self.success_attempt_record(
              test_round=i + 1,
              latency=latency_seconds,
              latency_list=latency_list,
          )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make inquiry")
      finally:
        self.dut.bt.stopInquiry()
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

  async def test_incoming_connection(self) -> None:
    """Tests ACL incoming connection.

    Test steps:
      1. REF sends connection request to DUT.
      2. DUT accepts the connection.
      3. REF terminates the connection.
      4. Repeat step 1-3.
    """
    latency_list = list[float]()
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        with performance_tool.Stopwatch() as stop_watch:
          self.logger.info("[REF] Connect to DUT.")
          await self.ref.device.connect(
              self.dut.address,
              transport=core.BT_BR_EDR_TRANSPORT,
          )
          self.logger.info("[REF] Connected to DUT.")

        latency_seconds = stop_watch.elapsed_time.total_seconds()
        self.success_attempt_record(
            test_round=i + 1,
            latency=latency_seconds,
            latency_list=latency_list,
        )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make ACL connection")
      finally:
        await performance_tool.cleanup_connections(self.dut, self.ref)
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

  async def test_outgoing_connection(self) -> None:
    """Tests ACL outgoing connection.

    Test steps:
      1. DUT creates RFCOMM channel.
      2. ACL connection is established.
      3. REF terminates the connection.
      4. Repeat step 1-3.
    """
    latency_list = list[float]()
    await self.classic_connect_and_pair()
    await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
    channel = rfcomm.Server(self.ref.device).listen(acceptor=mock.MagicMock())
    self.ref.device.sdp_service_records[_RFCOMM_SERVICE_RECORD_HANDLE] = (
        rfcomm.make_service_sdp_records(
            service_record_handle=_RFCOMM_SERVICE_RECORD_HANDLE,
            channel=channel,
            uuid=core.UUID(_RFCOMM_UUID),
        )
    )
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        with performance_tool.Stopwatch() as stop_watch:
          with self.dut.bl4a.register_callback(
              bl4a_api.Module.ADAPTER
          ) as dut_cb:
            self.logger.info("[DUT] Create RFCOMM channel.")
            future = self.dut.bl4a.create_rfcomm_channel_async(
                address=self.ref.address,
                secure=True,
                uuid=_RFCOMM_UUID,
            )
            self.logger.info("[DUT] Wait for ACL connection.")
            await dut_cb.wait_for_event(
                bl4a_api.AclConnected(
                    address=self.ref.address,
                    transport=android_constants.Transport.CLASSIC,
                ),
                timeout=datetime.timedelta(seconds=30),
            )
            latency_seconds = stop_watch.elapsed_time.total_seconds()
            await future
        self.success_attempt_record(
            test_round=i + 1, latency=latency_seconds, latency_list=latency_list
        )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make ACL connection")
      finally:
        await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

  async def _test_ssp_pairing_async(
      self,
      pairing_direction: _Direction,
      ref_io_capability: _IoCapability,
      ref_role: _Role,
  ) -> float:
    """Tests Classic SSP pairing.

    Test steps:
      1. Setup configurations.
      2. Make ACL connections.
      3. Start pairing.
      4. Wait for pairing requests and verify pins.
      5. Make actions corresponding to variants.
      6. Verify final states.

    Args:
      pairing_direction: Direction of pairing.
      ref_io_capability: IO Capability on the REF device.
      ref_role: Role of the REF device.

    Returns:
      The latency of the pairing process.
    """

    pairing_delegate = self.pairing_delegate

    def pairing_config_factory(
        _: device.Connection,
    ) -> pairing.PairingConfig:
      return pairing.PairingConfig(
          sc=True,
          mitm=True,
          bonding=True,
          identity_address_type=pairing.PairingConfig.AddressType.PUBLIC,
          delegate=pairing_delegate,
      )

    self.ref.device.pairing_config_factory = pairing_config_factory

    self.logger.info("[REF] Allow role switch")
    await self.ref.device.send_command(
        hci.HCI_Write_Default_Link_Policy_Settings_Command(
            default_link_policy_settings=0x01
        ),
        check_result=True,
    )
    dut_cb = self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER)
    self.test_case_context.push(dut_cb)
    ref_addr = str(self.ref.address)
    begin = datetime.datetime.now()
    auth_task: asyncio.tasks.Task | None = None
    if pairing_direction == _Direction.OUTGOING:
      self.logger.info("[REF] Prepare to accept connection.")
      ref_accept_task = asyncio.tasks.create_task(
          self.ref.device.accept(
              f"{self.dut.address}/P",
              role=ref_role,
              timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
          )
      )
      self.logger.info("[DUT] Create bond and connect implicitly.")
      self.assertTrue(
          self.dut.bt.createBond(ref_addr, android_constants.Transport.CLASSIC)
      )
      self.logger.info("[REF] Accept connection")
      await ref_accept_task
    else:
      self.logger.info("[REF] Connect to DUT.")
      ref_dut = await self.ref.device.connect(
          f"{self.dut.address}/P",
          transport=core.BT_BR_EDR_TRANSPORT,
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )
      self.logger.info("[REF] Create bond.")
      auth_task = asyncio.tasks.create_task(ref_dut.authenticate())
      self.logger.info("[DUT] Wait for incoming connection.")
      await dut_cb.wait_for_event(
          event=bl4a_api.AclConnected(
              ref_addr, transport=android_constants.Transport.CLASSIC
          ),
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )

    self.logger.info("[DUT] Wait for pairing request.")
    dut_pairing_event = await dut_cb.wait_for_event(
        event=bl4a_api.PairingRequest,
        predicate=lambda e: (e.address == ref_addr),
        timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
    )

    self.logger.info("[DUT] Check reported pairing method.")
    match ref_io_capability:
      case _IoCapability.NO_OUTPUT_NO_INPUT:
        expected_dut_pairing_variant = android_constants.PairingVariant.CONSENT
        expected_ref_pairing_variant = pairing_utils.PairingVariant.JUST_WORK
      case _IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT:
        expected_dut_pairing_variant = (
            android_constants.PairingVariant.PASSKEY_CONFIRMATION
        )
        expected_ref_pairing_variant = (
            pairing_utils.PairingVariant.NUMERIC_COMPARISON
        )
      case _:
        raise ValueError(f"Unsupported IO capability: {ref_io_capability}")

    self.assertEqual(dut_pairing_event.variant, expected_dut_pairing_variant)

    self.logger.info("[REF] Wait for pairing request.")
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      ref_pairing_event = await pairing_delegate.pairing_events.get()

    self.logger.info("[REF] Check reported pairing method.")
    self.assertEqual(ref_pairing_event.variant, expected_ref_pairing_variant)

    self.logger.info("[DUT] Handle pairing confirmation.")
    self.dut.bt.setPairingConfirmation(ref_addr, True)

    self.logger.info("[REF] Handle pairing confirmation.")
    pairing_delegate.pairing_answers.put_nowait(True)
    if ref_io_capability == _IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT:
      self.logger.info("[REF] Check pairing pin.")
      self.assertEqual(ref_pairing_event.arg, dut_pairing_event.pin)

    self.logger.info("[DUT] Check final state.")
    actual_state = (
        await dut_cb.wait_for_event(
            event=bl4a_api.BondStateChanged,
            predicate=lambda e: (e.state in _TERMINATED_BOND_STATES),
            timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
        )
    ).state
    self.assertEqual(actual_state, android_constants.BondState.BONDED)
    if auth_task:
      self.logger.info("[REF] Wait authentication complete.")
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        await auth_task
    latency_seconds = (datetime.datetime.now() - begin).total_seconds()
    self.logger.info("Success connection in %.2f seconds", latency_seconds)
    return latency_seconds

  @navi_test_base.named_parameterized(
      outgoing_ref_is_NOIO_PERI=(
          _Direction.OUTGOING,
          _IoCapability.NO_OUTPUT_NO_INPUT,
          _Role.PERIPHERAL,
          _MAJOR_CASE_DEFAULT_REPEAT_TIMES,
      ),
      outgoing_ref_is_YESNO_PERI=(
          _Direction.OUTGOING,
          _IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
          _Role.PERIPHERAL,
          _MINOR_CASE_DEFAULT_REPEAT_TIMES,
      ),
      incoming_ref_is_NOIO_PERI=(
          _Direction.INCOMING,
          _IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
          _Role.PERIPHERAL,
          _MINOR_CASE_DEFAULT_REPEAT_TIMES,
      ),
      incoming_ref_is_YESNO_PERI=(
          _Direction.INCOMING,
          _IoCapability.NO_OUTPUT_NO_INPUT,
          _Role.PERIPHERAL,
          _MINOR_CASE_DEFAULT_REPEAT_TIMES,
      ),
      incoming_ref_is_YESNO_CENTRAL=(
          _Direction.INCOMING,
          _IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
          _Role.CENTRAL,
          _MINOR_CASE_DEFAULT_REPEAT_TIMES,
      ),
      outgoing_ref_is_YESNO_CENTRAL=(
          _Direction.OUTGOING,
          _IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
          _Role.CENTRAL,
          _MINOR_CASE_DEFAULT_REPEAT_TIMES,
      ),
      outgoing_ref_is_NOIO_CENTRAL=(
          _Direction.OUTGOING,
          _IoCapability.NO_OUTPUT_NO_INPUT,
          _Role.CENTRAL,
          _MINOR_CASE_DEFAULT_REPEAT_TIMES,
      ),
      incoming_ref_is_NOIO_CENTRAL=(
          _Direction.INCOMING,
          _IoCapability.NO_OUTPUT_NO_INPUT,
          _Role.CENTRAL,
          _MINOR_CASE_DEFAULT_REPEAT_TIMES,
      ),
  )
  @navi_test_base.retry(max_count=2)
  async def test_pairing_ssp_only(
      self,
      pairing_direction: _Direction,
      ref_io_capability: _IoCapability,
      ref_role: _Role,
      default_repeat_times: int,
  ) -> None:
    """Tests Simple Secure Pairing.

    Test steps:
      1. Perform SSP.

    Args:
      pairing_direction: Direction of pairing.
      ref_io_capability: IO capabilities of the REF device.
      ref_role: Role of the REF device.
      default_repeat_times: Testcases repeat times.
    """
    # [REF] Disable SMP over Classic L2CAP channel.

    latency_list = list[float]()
    for i in range(default_repeat_times):
      try:
        self.ref.device.l2cap_channel_manager.deregister_fixed_channel(
            smp.SMP_BR_CID
        )
        self.pairing_delegate = pairing_utils.PairingDelegate(
            io_capability=ref_io_capability,
            auto_accept=True,
        )
        latency_seconds = await self._test_ssp_pairing_async(
            pairing_direction=pairing_direction,
            ref_io_capability=ref_io_capability,
            ref_role=ref_role,
        )
        self.success_attempt_record(
            test_round=i + 1, latency=latency_seconds, latency_list=latency_list
        )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make outgoing pairing")
        self.dut.bt.removeBond(self.ref.address)
        await performance_tool.cleanup_connections(self.dut, self.ref)
    self.record_sponge_data(
        repeat_times=default_repeat_times, latency_list=latency_list
    )

Tests ACL incoming connection.

Test steps
  1. REF sends connection request to DUT.
  2. DUT accepts the connection.
  3. REF terminates the connection.
  4. Repeat step 1-3.
Source code in navi/tests/benchmark/classic_gap_test.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
async def test_incoming_connection(self) -> None:
  """Tests ACL incoming connection.

  Test steps:
    1. REF sends connection request to DUT.
    2. DUT accepts the connection.
    3. REF terminates the connection.
    4. Repeat step 1-3.
  """
  latency_list = list[float]()
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      with performance_tool.Stopwatch() as stop_watch:
        self.logger.info("[REF] Connect to DUT.")
        await self.ref.device.connect(
            self.dut.address,
            transport=core.BT_BR_EDR_TRANSPORT,
        )
        self.logger.info("[REF] Connected to DUT.")

      latency_seconds = stop_watch.elapsed_time.total_seconds()
      self.success_attempt_record(
          test_round=i + 1,
          latency=latency_seconds,
          latency_list=latency_list,
      )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make ACL connection")
    finally:
      await performance_tool.cleanup_connections(self.dut, self.ref)
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Test inquiry.

Test steps
  1. Set REF in discoverable mode.
  2. Start discovery on DUT.
  3. Wait for DUT discovered or timeout(15 seconds).
  4. Check result(should be discovered).
Source code in navi/tests/benchmark/classic_gap_test.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
async def test_inquiry(self) -> None:
  """Test inquiry.

  Test steps:
    1. Set REF in discoverable mode.
    2. Start discovery on DUT.
    3. Wait for DUT discovered or timeout(15 seconds).
    4. Check result(should be discovered).
  """
  latency_list = list[float]()
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      with self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER) as dut_cb:
        await self.ref.device.set_discoverable(True)
        with performance_tool.Stopwatch() as stop_watch:
          self.dut.bt.startInquiry()

          await dut_cb.wait_for_event(
              bl4a_api.DeviceFound(address=self.ref.address, name=mock.ANY)
          )
        latency_seconds = stop_watch.elapsed_time.total_seconds()
        self.success_attempt_record(
            test_round=i + 1,
            latency=latency_seconds,
            latency_list=latency_list,
        )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make inquiry")
    finally:
      self.dut.bt.stopInquiry()
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Tests ACL outgoing connection.

Test steps
  1. DUT creates RFCOMM channel.
  2. ACL connection is established.
  3. REF terminates the connection.
  4. Repeat step 1-3.
Source code in navi/tests/benchmark/classic_gap_test.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
async def test_outgoing_connection(self) -> None:
  """Tests ACL outgoing connection.

  Test steps:
    1. DUT creates RFCOMM channel.
    2. ACL connection is established.
    3. REF terminates the connection.
    4. Repeat step 1-3.
  """
  latency_list = list[float]()
  await self.classic_connect_and_pair()
  await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
  channel = rfcomm.Server(self.ref.device).listen(acceptor=mock.MagicMock())
  self.ref.device.sdp_service_records[_RFCOMM_SERVICE_RECORD_HANDLE] = (
      rfcomm.make_service_sdp_records(
          service_record_handle=_RFCOMM_SERVICE_RECORD_HANDLE,
          channel=channel,
          uuid=core.UUID(_RFCOMM_UUID),
      )
  )
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      with performance_tool.Stopwatch() as stop_watch:
        with self.dut.bl4a.register_callback(
            bl4a_api.Module.ADAPTER
        ) as dut_cb:
          self.logger.info("[DUT] Create RFCOMM channel.")
          future = self.dut.bl4a.create_rfcomm_channel_async(
              address=self.ref.address,
              secure=True,
              uuid=_RFCOMM_UUID,
          )
          self.logger.info("[DUT] Wait for ACL connection.")
          await dut_cb.wait_for_event(
              bl4a_api.AclConnected(
                  address=self.ref.address,
                  transport=android_constants.Transport.CLASSIC,
              ),
              timeout=datetime.timedelta(seconds=30),
          )
          latency_seconds = stop_watch.elapsed_time.total_seconds()
          await future
      self.success_attempt_record(
          test_round=i + 1, latency=latency_seconds, latency_list=latency_list
      )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make ACL connection")
    finally:
      await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Tests Simple Secure Pairing.

Test steps
  1. Perform SSP.

Parameters:

Name Type Description Default
pairing_direction _Direction

Direction of pairing.

required
ref_io_capability _IoCapability

IO capabilities of the REF device.

required
ref_role _Role

Role of the REF device.

required
default_repeat_times int

Testcases repeat times.

required
Source code in navi/tests/benchmark/classic_gap_test.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
@navi_test_base.named_parameterized(
    outgoing_ref_is_NOIO_PERI=(
        _Direction.OUTGOING,
        _IoCapability.NO_OUTPUT_NO_INPUT,
        _Role.PERIPHERAL,
        _MAJOR_CASE_DEFAULT_REPEAT_TIMES,
    ),
    outgoing_ref_is_YESNO_PERI=(
        _Direction.OUTGOING,
        _IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
        _Role.PERIPHERAL,
        _MINOR_CASE_DEFAULT_REPEAT_TIMES,
    ),
    incoming_ref_is_NOIO_PERI=(
        _Direction.INCOMING,
        _IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
        _Role.PERIPHERAL,
        _MINOR_CASE_DEFAULT_REPEAT_TIMES,
    ),
    incoming_ref_is_YESNO_PERI=(
        _Direction.INCOMING,
        _IoCapability.NO_OUTPUT_NO_INPUT,
        _Role.PERIPHERAL,
        _MINOR_CASE_DEFAULT_REPEAT_TIMES,
    ),
    incoming_ref_is_YESNO_CENTRAL=(
        _Direction.INCOMING,
        _IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
        _Role.CENTRAL,
        _MINOR_CASE_DEFAULT_REPEAT_TIMES,
    ),
    outgoing_ref_is_YESNO_CENTRAL=(
        _Direction.OUTGOING,
        _IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
        _Role.CENTRAL,
        _MINOR_CASE_DEFAULT_REPEAT_TIMES,
    ),
    outgoing_ref_is_NOIO_CENTRAL=(
        _Direction.OUTGOING,
        _IoCapability.NO_OUTPUT_NO_INPUT,
        _Role.CENTRAL,
        _MINOR_CASE_DEFAULT_REPEAT_TIMES,
    ),
    incoming_ref_is_NOIO_CENTRAL=(
        _Direction.INCOMING,
        _IoCapability.NO_OUTPUT_NO_INPUT,
        _Role.CENTRAL,
        _MINOR_CASE_DEFAULT_REPEAT_TIMES,
    ),
)
@navi_test_base.retry(max_count=2)
async def test_pairing_ssp_only(
    self,
    pairing_direction: _Direction,
    ref_io_capability: _IoCapability,
    ref_role: _Role,
    default_repeat_times: int,
) -> None:
  """Tests Simple Secure Pairing.

  Test steps:
    1. Perform SSP.

  Args:
    pairing_direction: Direction of pairing.
    ref_io_capability: IO capabilities of the REF device.
    ref_role: Role of the REF device.
    default_repeat_times: Testcases repeat times.
  """
  # [REF] Disable SMP over Classic L2CAP channel.

  latency_list = list[float]()
  for i in range(default_repeat_times):
    try:
      self.ref.device.l2cap_channel_manager.deregister_fixed_channel(
          smp.SMP_BR_CID
      )
      self.pairing_delegate = pairing_utils.PairingDelegate(
          io_capability=ref_io_capability,
          auto_accept=True,
      )
      latency_seconds = await self._test_ssp_pairing_async(
          pairing_direction=pairing_direction,
          ref_io_capability=ref_io_capability,
          ref_role=ref_role,
      )
      self.success_attempt_record(
          test_round=i + 1, latency=latency_seconds, latency_list=latency_list
      )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make outgoing pairing")
      self.dut.bt.removeBond(self.ref.address)
      await performance_tool.cleanup_connections(self.dut, self.ref)
  self.record_sponge_data(
      repeat_times=default_repeat_times, latency_list=latency_list
  )

Bases: PerformanceTestBase

Source code in navi/tests/benchmark/hfp_ag_test.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
class HfpAgTest(test_base.PerformanceTestBase):

  @override
  async def async_setup_class(self) -> None:
    await super().async_setup_class()
    if self.dut.getprop(android_constants.Property.HFP_AG_ENABLED) != "true":
      raise signals.TestAbortClass("HFP(AG) is not enabled on DUT.")

  @override
  async def async_teardown_test(self) -> None:
    await super().async_teardown_test()
    # Make sure Bumble is off to cancel any running tasks.
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      await self.ref.close()

  @classmethod
  def _default_hfp_configuration(cls) -> hfp.HfConfiguration:
    return hfp.HfConfiguration(
        supported_hf_features=[],
        supported_hf_indicators=[],
        supported_audio_codecs=[
            _AudioCodec.CVSD,
            _AudioCodec.MSBC,
        ],
    )

  async def pair_and_connect(self) -> None:
    with (self.dut.bl4a.register_callback(_Module.HFP_AG) as dut_cb,):
      hfp_ext.HfProtocol.setup_server(
          self.ref.device,
          sdp_handle=_HFP_SDP_HANDLE,
          configuration=self._default_hfp_configuration(),
      )

      self.logger.info("[DUT] Connect and pair REF.")
      await self.classic_connect_and_pair()

      self.logger.info("[DUT] Wait for HFP connected.")
      await dut_cb.wait_for_event(
          bl4a_api.ProfileActiveDeviceChanged(address=self.ref.address),
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )

  async def test_paired_connect_outgoing(self) -> None:
    """Tests HFP connection establishment where pairing is not involved.

    Test steps:
      1. Setup pairing between DUT and REF.
      2. Terminate ACL connection.
      3. Trigger connection from DUT.
      4. Wait HFP connected on DUT.
      5. Repeat step 3-4.
    """
    latency_list = list[float]()
    await self.pair_and_connect()
    await performance_tool.terminate_connection_from_dut(self.dut, self.ref)
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        with (self.dut.bl4a.register_callback(_Module.HFP_AG) as dut_cb,):
          self.logger.info("[DUT] Reconnect.")
          with performance_tool.Stopwatch() as stop_watch:
            self.dut.bt.connect(self.ref.address)
            self.logger.info("[DUT] Wait for HFP connected.")
            await dut_cb.wait_for_event(
                bl4a_api.ProfileActiveDeviceChanged(address=self.ref.address),
                timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
            )
          latency_seconds = stop_watch.elapsed_time.total_seconds()
          self.success_attempt_record(
              test_round=i + 1,
              latency=latency_seconds,
              latency_list=latency_list,
          )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make HFP connection")
      finally:
        await performance_tool.terminate_connection_from_dut(self.dut, self.ref)
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

  async def test_paired_connect_incoming(self) -> None:
    """Tests HFP connection establishment where pairing is not involved.

    Test steps:
      1. Setup pairing between DUT and REF.
      2. Terminate ACL connection.
      3. Trigger connection from REF.
      4. Wait HFP connected on DUT.
      5. Repeat step 3-4.
    """
    latency_list = list[float]()
    await self.pair_and_connect()
    await performance_tool.terminate_connection_from_dut(self.dut, self.ref)
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        with (self.dut.bl4a.register_callback(_Module.HFP_AG) as dut_cb,):

          self.logger.info("[DUT] Reconnect.")
          with performance_tool.Stopwatch() as stop_watch:
            dut_ref_acl = await self.ref.device.connect(
                self.dut.address,
                core.BT_BR_EDR_TRANSPORT,
                timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
            )

            self.logger.info("[REF] Authenticate and encrypt connection.")
            await dut_ref_acl.authenticate()
            await dut_ref_acl.encrypt()

            rfcomm_channel = await rfcomm.find_rfcomm_channel_with_uuid(
                dut_ref_acl, core.BT_HANDSFREE_AUDIO_GATEWAY_SERVICE
            )
            if rfcomm_channel is None:
              self.fail("No HFP RFCOMM channel found on REF.")
            self.logger.info(
                "[REF] Found HFP RFCOMM channel %s.", rfcomm_channel
            )

            self.logger.info("[REF] Open RFCOMM Multiplexer.")
            async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
              multiplexer = await rfcomm.Client(dut_ref_acl).start()

            self.logger.info("[REF] Open RFCOMM DLC.")
            async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
              dlc = await multiplexer.open_dlc(rfcomm_channel)

            self.logger.info("[REF] Establish SLC.")
            ref_hfp_protocol = hfp_ext.HfProtocol(
                dlc, self._default_hfp_configuration()
            )
            async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
              await ref_hfp_protocol.initiate_slc()

            self.logger.info("[DUT] Wait for HFP connected.")
            await dut_cb.wait_for_event(
                bl4a_api.ProfileActiveDeviceChanged(address=self.ref.address),
                timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
            )
          latency_seconds = stop_watch.elapsed_time.total_seconds()
          self.success_attempt_record(
              test_round=i + 1,
              latency=latency_seconds,
              latency_list=latency_list,
          )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make HFP connection")
      finally:
        await performance_tool.terminate_connection_from_dut(self.dut, self.ref)
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

  async def test_audio_call_sco_connection(self) -> None:
    """Tests making an outgoing phone call, observing SCO connection status.

    Test steps:
      1. Setup HFP connection.
      2. Place an outgoing call.
      3. Verify SCO connected.
      4. Repeat step 2-3.
    """
    latency_list = list[float]()
    # [REF] Setup HFP.
    hfp_configuration = hfp.HfConfiguration(
        supported_hf_features=[hfp.HfFeature.CODEC_NEGOTIATION],
        supported_hf_indicators=[],
        supported_audio_codecs=[_AudioCodec.CVSD],
    )
    ref_hfp_protocol_queue = hfp_ext.HfProtocol.setup_server(
        self.ref.device,
        sdp_handle=_HFP_SDP_HANDLE,
        configuration=hfp_configuration,
    )
    preferred_codec = _AudioCodec.CVSD
    with self.dut.bl4a.register_callback(_Module.HFP_AG) as dut_hfp_cb:
      self.logger.info("[DUT] Connect and pair REF.")
      await self.classic_connect_and_pair()

      self.logger.info("[DUT] Wait for HFP connected.")
      await dut_hfp_cb.wait_for_event(
          bl4a_api.ProfileActiveDeviceChanged(address=self.ref.address),
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )

    async with self.assert_not_timeout(
        _DEFAULT_STEP_TIMEOUT_SECONDS,
        msg="[REF] Wait for HFP connected.",
    ):
      ref_hfp_protocol = await ref_hfp_protocol_queue.get()

    self.dut.bt.audioSetRepeat(android_constants.RepeatMode.ONE)
    for i in range(_DEFAULT_REPEAT_TIMES):
      test_case_callbacks = contextlib.AsyncExitStack()
      sco_links = asyncio.Queue[device.ScoLink]()
      try:
        dut_hfp_cb = self.dut.bl4a.register_callback(_Module.HFP_AG)
        dut_telecom_cb = self.dut.bl4a.register_callback(_Module.TELECOM)
        test_case_callbacks.push(dut_hfp_cb)
        test_case_callbacks.push(dut_telecom_cb)
        self.ref.device.on(
            self.ref.device.EVENT_SCO_CONNECTION, sco_links.put_nowait
        )
        self.logger.info("[DUT] Add call.")
        with self.dut.bl4a.make_phone_call(
            _CALLER_NAME,
            _CALLER_NUMBER,
            constants.Direction.OUTGOING,
        ) as call:
          await dut_telecom_cb.wait_for_event(
              event=bl4a_api.CallStateChanged(
                  state=matcher.any_of(
                      _CallState.CONNECTING, _CallState.DIALING
                  ),
                  handle=mock.ANY,
                  name=mock.ANY,
              ),
              timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
          )
          with performance_tool.Stopwatch() as stop_watch:
            self.logger.info("[DUT] Start streaming.")
            await asyncio.to_thread(self.dut.bt.audioPlaySine)

            self.logger.info("[DUT] Wait for SCO connected.")
            await dut_hfp_cb.wait_for_event(
                _HfpAgAudioStateChange(
                    address=self.ref.address, state=_ScoState.CONNECTED
                ),
                timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
            )

            async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
              self.logger.info("[REF] Wait for SCO connected.")
              await sco_links.get()

              self.assertEqual(ref_hfp_protocol.active_codec, preferred_codec)
          latency_seconds = stop_watch.elapsed_time.total_seconds()
          self.logger.info("[DUT] Terminate call.")
          call.close()
          await dut_telecom_cb.wait_for_event(
              event=bl4a_api.CallStateChanged(
                  state=_CallState.DISCONNECTED, handle=mock.ANY, name=mock.ANY
              ),
              predicate=lambda e: e.state in [_CallState.DISCONNECTED],
              timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
          )

        self.logger.info("[DUT] Wait for SCO disconnected.")
        await dut_hfp_cb.wait_for_event(
            _HfpAgAudioStateChange(
                address=self.ref.address, state=_ScoState.DISCONNECTED
            ),
            timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
        )
        self.success_attempt_record(
            test_round=i + 1,
            latency=latency_seconds,
            latency_list=latency_list,
        )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make HFP connection")
      finally:
        await test_case_callbacks.aclose()
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

Tests making an outgoing phone call, observing SCO connection status.

Test steps
  1. Setup HFP connection.
  2. Place an outgoing call.
  3. Verify SCO connected.
  4. Repeat step 2-3.
Source code in navi/tests/benchmark/hfp_ag_test.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
async def test_audio_call_sco_connection(self) -> None:
  """Tests making an outgoing phone call, observing SCO connection status.

  Test steps:
    1. Setup HFP connection.
    2. Place an outgoing call.
    3. Verify SCO connected.
    4. Repeat step 2-3.
  """
  latency_list = list[float]()
  # [REF] Setup HFP.
  hfp_configuration = hfp.HfConfiguration(
      supported_hf_features=[hfp.HfFeature.CODEC_NEGOTIATION],
      supported_hf_indicators=[],
      supported_audio_codecs=[_AudioCodec.CVSD],
  )
  ref_hfp_protocol_queue = hfp_ext.HfProtocol.setup_server(
      self.ref.device,
      sdp_handle=_HFP_SDP_HANDLE,
      configuration=hfp_configuration,
  )
  preferred_codec = _AudioCodec.CVSD
  with self.dut.bl4a.register_callback(_Module.HFP_AG) as dut_hfp_cb:
    self.logger.info("[DUT] Connect and pair REF.")
    await self.classic_connect_and_pair()

    self.logger.info("[DUT] Wait for HFP connected.")
    await dut_hfp_cb.wait_for_event(
        bl4a_api.ProfileActiveDeviceChanged(address=self.ref.address),
        timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
    )

  async with self.assert_not_timeout(
      _DEFAULT_STEP_TIMEOUT_SECONDS,
      msg="[REF] Wait for HFP connected.",
  ):
    ref_hfp_protocol = await ref_hfp_protocol_queue.get()

  self.dut.bt.audioSetRepeat(android_constants.RepeatMode.ONE)
  for i in range(_DEFAULT_REPEAT_TIMES):
    test_case_callbacks = contextlib.AsyncExitStack()
    sco_links = asyncio.Queue[device.ScoLink]()
    try:
      dut_hfp_cb = self.dut.bl4a.register_callback(_Module.HFP_AG)
      dut_telecom_cb = self.dut.bl4a.register_callback(_Module.TELECOM)
      test_case_callbacks.push(dut_hfp_cb)
      test_case_callbacks.push(dut_telecom_cb)
      self.ref.device.on(
          self.ref.device.EVENT_SCO_CONNECTION, sco_links.put_nowait
      )
      self.logger.info("[DUT] Add call.")
      with self.dut.bl4a.make_phone_call(
          _CALLER_NAME,
          _CALLER_NUMBER,
          constants.Direction.OUTGOING,
      ) as call:
        await dut_telecom_cb.wait_for_event(
            event=bl4a_api.CallStateChanged(
                state=matcher.any_of(
                    _CallState.CONNECTING, _CallState.DIALING
                ),
                handle=mock.ANY,
                name=mock.ANY,
            ),
            timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
        )
        with performance_tool.Stopwatch() as stop_watch:
          self.logger.info("[DUT] Start streaming.")
          await asyncio.to_thread(self.dut.bt.audioPlaySine)

          self.logger.info("[DUT] Wait for SCO connected.")
          await dut_hfp_cb.wait_for_event(
              _HfpAgAudioStateChange(
                  address=self.ref.address, state=_ScoState.CONNECTED
              ),
              timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
          )

          async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
            self.logger.info("[REF] Wait for SCO connected.")
            await sco_links.get()

            self.assertEqual(ref_hfp_protocol.active_codec, preferred_codec)
        latency_seconds = stop_watch.elapsed_time.total_seconds()
        self.logger.info("[DUT] Terminate call.")
        call.close()
        await dut_telecom_cb.wait_for_event(
            event=bl4a_api.CallStateChanged(
                state=_CallState.DISCONNECTED, handle=mock.ANY, name=mock.ANY
            ),
            predicate=lambda e: e.state in [_CallState.DISCONNECTED],
            timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
        )

      self.logger.info("[DUT] Wait for SCO disconnected.")
      await dut_hfp_cb.wait_for_event(
          _HfpAgAudioStateChange(
              address=self.ref.address, state=_ScoState.DISCONNECTED
          ),
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )
      self.success_attempt_record(
          test_round=i + 1,
          latency=latency_seconds,
          latency_list=latency_list,
      )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make HFP connection")
    finally:
      await test_case_callbacks.aclose()
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Tests HFP connection establishment where pairing is not involved.

Test steps
  1. Setup pairing between DUT and REF.
  2. Terminate ACL connection.
  3. Trigger connection from REF.
  4. Wait HFP connected on DUT.
  5. Repeat step 3-4.
Source code in navi/tests/benchmark/hfp_ag_test.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
async def test_paired_connect_incoming(self) -> None:
  """Tests HFP connection establishment where pairing is not involved.

  Test steps:
    1. Setup pairing between DUT and REF.
    2. Terminate ACL connection.
    3. Trigger connection from REF.
    4. Wait HFP connected on DUT.
    5. Repeat step 3-4.
  """
  latency_list = list[float]()
  await self.pair_and_connect()
  await performance_tool.terminate_connection_from_dut(self.dut, self.ref)
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      with (self.dut.bl4a.register_callback(_Module.HFP_AG) as dut_cb,):

        self.logger.info("[DUT] Reconnect.")
        with performance_tool.Stopwatch() as stop_watch:
          dut_ref_acl = await self.ref.device.connect(
              self.dut.address,
              core.BT_BR_EDR_TRANSPORT,
              timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
          )

          self.logger.info("[REF] Authenticate and encrypt connection.")
          await dut_ref_acl.authenticate()
          await dut_ref_acl.encrypt()

          rfcomm_channel = await rfcomm.find_rfcomm_channel_with_uuid(
              dut_ref_acl, core.BT_HANDSFREE_AUDIO_GATEWAY_SERVICE
          )
          if rfcomm_channel is None:
            self.fail("No HFP RFCOMM channel found on REF.")
          self.logger.info(
              "[REF] Found HFP RFCOMM channel %s.", rfcomm_channel
          )

          self.logger.info("[REF] Open RFCOMM Multiplexer.")
          async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
            multiplexer = await rfcomm.Client(dut_ref_acl).start()

          self.logger.info("[REF] Open RFCOMM DLC.")
          async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
            dlc = await multiplexer.open_dlc(rfcomm_channel)

          self.logger.info("[REF] Establish SLC.")
          ref_hfp_protocol = hfp_ext.HfProtocol(
              dlc, self._default_hfp_configuration()
          )
          async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
            await ref_hfp_protocol.initiate_slc()

          self.logger.info("[DUT] Wait for HFP connected.")
          await dut_cb.wait_for_event(
              bl4a_api.ProfileActiveDeviceChanged(address=self.ref.address),
              timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
          )
        latency_seconds = stop_watch.elapsed_time.total_seconds()
        self.success_attempt_record(
            test_round=i + 1,
            latency=latency_seconds,
            latency_list=latency_list,
        )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make HFP connection")
    finally:
      await performance_tool.terminate_connection_from_dut(self.dut, self.ref)
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Tests HFP connection establishment where pairing is not involved.

Test steps
  1. Setup pairing between DUT and REF.
  2. Terminate ACL connection.
  3. Trigger connection from DUT.
  4. Wait HFP connected on DUT.
  5. Repeat step 3-4.
Source code in navi/tests/benchmark/hfp_ag_test.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
async def test_paired_connect_outgoing(self) -> None:
  """Tests HFP connection establishment where pairing is not involved.

  Test steps:
    1. Setup pairing between DUT and REF.
    2. Terminate ACL connection.
    3. Trigger connection from DUT.
    4. Wait HFP connected on DUT.
    5. Repeat step 3-4.
  """
  latency_list = list[float]()
  await self.pair_and_connect()
  await performance_tool.terminate_connection_from_dut(self.dut, self.ref)
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      with (self.dut.bl4a.register_callback(_Module.HFP_AG) as dut_cb,):
        self.logger.info("[DUT] Reconnect.")
        with performance_tool.Stopwatch() as stop_watch:
          self.dut.bt.connect(self.ref.address)
          self.logger.info("[DUT] Wait for HFP connected.")
          await dut_cb.wait_for_event(
              bl4a_api.ProfileActiveDeviceChanged(address=self.ref.address),
              timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
          )
        latency_seconds = stop_watch.elapsed_time.total_seconds()
        self.success_attempt_record(
            test_round=i + 1,
            latency=latency_seconds,
            latency_list=latency_list,
        )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make HFP connection")
    finally:
      await performance_tool.terminate_connection_from_dut(self.dut, self.ref)
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Bases: PerformanceTestBase

Source code in navi/tests/benchmark/hfp_hf_test.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class HfpHfTest(test_base.PerformanceTestBase):

  @override
  async def async_setup_class(self) -> None:
    await super().async_setup_class()
    if self.dut.getprop(_HFP_HF_ENABLED_PROPERTY) != "true":
      raise signals.TestAbortClass("DUT does not have HFP HF enabled.")

  def _setup_ag_device(self, configuration: hfp.AgConfiguration) -> None:
    def on_dlc(dlc: rfcomm.DLC):
      hfp.AgProtocol(dlc, configuration)

    self.ref.device.sdp_service_records = {
        _HFP_AG_SDP_HANDLE: hfp.make_ag_sdp_records(
            service_record_handle=_HFP_AG_SDP_HANDLE,
            rfcomm_channel=rfcomm.Server(self.ref.device).listen(on_dlc),
            configuration=configuration,
        )
    }

  async def _connect_hfp_from_ref(
      self, config: hfp.AgConfiguration
  ) -> hfp.AgProtocol:
    if not (
        dut_ref_acl := self.ref.device.find_connection_by_bd_addr(
            hci.Address(self.dut.address)
        )
    ):
      self.logger.info("[REF] Connect.")
      dut_ref_acl = await self.ref.device.connect(
          self.dut.address,
          core.BT_BR_EDR_TRANSPORT,
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )

      self.logger.info("[REF] Authenticate and encrypt connection.")
      await dut_ref_acl.authenticate()
      await dut_ref_acl.encrypt()

    sdp_record = await hfp.find_hf_sdp_record(dut_ref_acl)
    if not sdp_record:
      self.fail("DUT does not have HFP SDP record.")
    rfcomm_channel = sdp_record[0]

    self.logger.info("[REF] Found HFP RFCOMM channel %s.", rfcomm_channel)

    self.logger.info("[REF] Open RFCOMM Channel.")
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      multiplexer = await rfcomm.Client(dut_ref_acl).start()
      dlc = await multiplexer.open_dlc(rfcomm_channel)
    return hfp.AgProtocol(dlc, config)

  async def _wait_for_hfp_state(
      self, dut_cb: _Callback, state: _HfpState
  ) -> None:
    self.logger.info("[DUT] Wait for HFP state %s.", state)
    await dut_cb.wait_for_event(
        bl4a_api.ProfileConnectionStateChanged(
            address=self.ref.address,
            state=state,
        ),
    )

  async def pair_and_connect(self) -> None:
    """Tests HFP connection establishment right after a pairing session.

    Test steps:
      1. Setup HFP on REF.
      2. Create bond from DUT.
      3. Wait HFP connected on DUT.(Android should autoconnect HFP as HF)
    """
    self._setup_ag_device(_DEFAULT_AG_CONFIGURATION)

    self.logger.info("[DUT] Connect and pair REF.")
    with self.dut.bl4a.register_callback(bl4a_api.Module.HFP_HF) as dut_cb:
      await self.classic_connect_and_pair()

      self.logger.info("[DUT] Wait for HFP connected.")
      await self._wait_for_hfp_state(dut_cb, _HfpState.CONNECTED)

  async def test_paired_connect_outgoing(self) -> None:
    """Tests HFP connection establishment where pairing is not involved.

    Test steps:
      1. Setup pairing between DUT and REF.
      2. Terminate ACL connection.
      3. Trigger connection from DUT.
      4. Wait HFP connected on DUT.
      5. Disconnect from DUT.
      6. Wait HFP disconnected on DUT.
    """
    latency_list = list[float]()
    await self.pair_and_connect()
    await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        with self.dut.bl4a.register_callback(bl4a_api.Module.HFP_HF) as dut_cb:
          self.logger.info("[DUT] Reconnect.")
          with performance_tool.Stopwatch() as stop_watch:
            self.dut.bt.connect(self.ref.address)
            self.logger.info("[DUT] Wait for HFP connected.")
            await self._wait_for_hfp_state(dut_cb, _HfpState.CONNECTED)

          latency_seconds = stop_watch.elapsed_time.total_seconds()
          self.success_attempt_record(
              test_round=i + 1,
              latency=latency_seconds,
              latency_list=latency_list,
          )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make HFP connection")
      finally:
        await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

  async def test_paired_connect_incoming(self) -> None:
    """Tests HFP connection establishment where pairing is not involved.

    Test steps:
      1. Setup pairing between DUT and REF.
      2. Terminate ACL connection.
      3. Trigger connection from REF.
      4. Wait HFP connected on DUT.
      5. Disconnect from REF.
      6. Wait HFP disconnected on DUT.
    """
    latency_list = list[float]()
    await self.pair_and_connect()
    await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        with self.dut.bl4a.register_callback(bl4a_api.Module.HFP_HF) as dut_cb:
          self.logger.info("[DUT] Reconnect.")
          with performance_tool.Stopwatch() as stop_watch:
            await self._connect_hfp_from_ref(_DEFAULT_AG_CONFIGURATION)
            self.logger.info("[DUT] Wait for HFP connected.")
            await self._wait_for_hfp_state(dut_cb, _HfpState.CONNECTED)
          latency_seconds = stop_watch.elapsed_time.total_seconds()
          self.success_attempt_record(
              test_round=i + 1,
              latency=latency_seconds,
              latency_list=latency_list,
          )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make HFP connection")
      finally:
        await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

Tests HFP connection establishment where pairing is not involved.

Test steps
  1. Setup pairing between DUT and REF.
  2. Terminate ACL connection.
  3. Trigger connection from REF.
  4. Wait HFP connected on DUT.
  5. Disconnect from REF.
  6. Wait HFP disconnected on DUT.
Source code in navi/tests/benchmark/hfp_hf_test.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
async def test_paired_connect_incoming(self) -> None:
  """Tests HFP connection establishment where pairing is not involved.

  Test steps:
    1. Setup pairing between DUT and REF.
    2. Terminate ACL connection.
    3. Trigger connection from REF.
    4. Wait HFP connected on DUT.
    5. Disconnect from REF.
    6. Wait HFP disconnected on DUT.
  """
  latency_list = list[float]()
  await self.pair_and_connect()
  await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      with self.dut.bl4a.register_callback(bl4a_api.Module.HFP_HF) as dut_cb:
        self.logger.info("[DUT] Reconnect.")
        with performance_tool.Stopwatch() as stop_watch:
          await self._connect_hfp_from_ref(_DEFAULT_AG_CONFIGURATION)
          self.logger.info("[DUT] Wait for HFP connected.")
          await self._wait_for_hfp_state(dut_cb, _HfpState.CONNECTED)
        latency_seconds = stop_watch.elapsed_time.total_seconds()
        self.success_attempt_record(
            test_round=i + 1,
            latency=latency_seconds,
            latency_list=latency_list,
        )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make HFP connection")
    finally:
      await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Tests HFP connection establishment where pairing is not involved.

Test steps
  1. Setup pairing between DUT and REF.
  2. Terminate ACL connection.
  3. Trigger connection from DUT.
  4. Wait HFP connected on DUT.
  5. Disconnect from DUT.
  6. Wait HFP disconnected on DUT.
Source code in navi/tests/benchmark/hfp_hf_test.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
async def test_paired_connect_outgoing(self) -> None:
  """Tests HFP connection establishment where pairing is not involved.

  Test steps:
    1. Setup pairing between DUT and REF.
    2. Terminate ACL connection.
    3. Trigger connection from DUT.
    4. Wait HFP connected on DUT.
    5. Disconnect from DUT.
    6. Wait HFP disconnected on DUT.
  """
  latency_list = list[float]()
  await self.pair_and_connect()
  await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      with self.dut.bl4a.register_callback(bl4a_api.Module.HFP_HF) as dut_cb:
        self.logger.info("[DUT] Reconnect.")
        with performance_tool.Stopwatch() as stop_watch:
          self.dut.bt.connect(self.ref.address)
          self.logger.info("[DUT] Wait for HFP connected.")
          await self._wait_for_hfp_state(dut_cb, _HfpState.CONNECTED)

        latency_seconds = stop_watch.elapsed_time.total_seconds()
        self.success_attempt_record(
            test_round=i + 1,
            latency=latency_seconds,
            latency_list=latency_list,
        )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make HFP connection")
    finally:
      await performance_tool.terminate_connection_from_ref(self.dut, self.ref)
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Bases: PerformanceTestBase

Source code in navi/tests/benchmark/le_gap_test.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class LeGapTest(test_base.PerformanceTestBase):

  async def test_le_connection_outgoing(self) -> None:
    """Test make outgoing LE connections."""
    latency_list = list[float]()
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        await self.ref.device.start_advertising(
            own_address_type=hci.OwnAddressType.RANDOM,
            auto_restart=False,
            advertising_interval_min=_ADVERTISING_INTERVAL_MIN,
            advertising_interval_max=_ADVERTISING_INTERVAL_MIN,
        )
        with performance_tool.Stopwatch() as stop_watch:
          client = await self.dut.bl4a.connect_gatt_client(
              self.ref.random_address,
              transport=android_constants.Transport.LE,
              address_type=android_constants.AddressTypeStatus.RANDOM,
              retry_count=0,
          )
        latency_seconds = stop_watch.elapsed_time.total_seconds()
        await client.disconnect()
        client.close()
        self.success_attempt_record(
            test_round=i + 1,
            latency=latency_seconds,
            latency_list=latency_list,
        )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make LE connection")
      finally:
        await performance_tool.cleanup_connections(self.dut, self.ref)
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

  async def test_le_connection_incoming(self) -> None:
    """Test make incoming LE connections."""
    latency_list = list[float]()
    for i in range(_DEFAULT_REPEAT_TIMES):
      try:
        self.logger.info("[DUT] Start advertising")
        await self.dut.bl4a.start_legacy_advertiser(
            bl4a_api.LegacyAdvertiseSettings(
                own_address_type=_OwnAddressType.PUBLIC
            ),
        )
        with self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER) as dut_cb:
          with performance_tool.Stopwatch() as stop_watch:
            self.logger.info("[REF] Connect GATT")
            ref_dut_acl = await self.ref.device.connect(
                f"{self.dut.address}/P",
                core.BT_LE_TRANSPORT,
                own_address_type=_OwnAddressType.PUBLIC,
            )
            await ref_dut_acl.get_remote_le_features()
            self.logger.info("[DUT] Wait for LE-ACL connected")
            await dut_cb.wait_for_event(
                event=bl4a_api.AclConnected(
                    address=self.ref.address,
                    transport=android_constants.Transport.LE,
                ),
            )
          latency_seconds = stop_watch.elapsed_time.total_seconds()
          self.success_attempt_record(
              test_round=i + 1,
              latency=latency_seconds,
              latency_list=latency_list,
          )
          self.logger.info("[REF] Disconnect")
          await ref_dut_acl.disconnect()
          self.logger.info("[DUT] Wait for LE-ACL disconnected")
          await dut_cb.wait_for_event(
              bl4a_api.AclDisconnected(
                  address=self.ref.address,
                  transport=android_constants.Transport.LE,
              ),
          )
      except (core.BaseBumbleError, AssertionError):
        self.logger.exception("Failed to make LE connection")
      finally:
        await performance_tool.cleanup_connections(self.dut, self.ref)
    self.record_sponge_data(
        repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
    )

Test make incoming LE connections.

Source code in navi/tests/benchmark/le_gap_test.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
async def test_le_connection_incoming(self) -> None:
  """Test make incoming LE connections."""
  latency_list = list[float]()
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      self.logger.info("[DUT] Start advertising")
      await self.dut.bl4a.start_legacy_advertiser(
          bl4a_api.LegacyAdvertiseSettings(
              own_address_type=_OwnAddressType.PUBLIC
          ),
      )
      with self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER) as dut_cb:
        with performance_tool.Stopwatch() as stop_watch:
          self.logger.info("[REF] Connect GATT")
          ref_dut_acl = await self.ref.device.connect(
              f"{self.dut.address}/P",
              core.BT_LE_TRANSPORT,
              own_address_type=_OwnAddressType.PUBLIC,
          )
          await ref_dut_acl.get_remote_le_features()
          self.logger.info("[DUT] Wait for LE-ACL connected")
          await dut_cb.wait_for_event(
              event=bl4a_api.AclConnected(
                  address=self.ref.address,
                  transport=android_constants.Transport.LE,
              ),
          )
        latency_seconds = stop_watch.elapsed_time.total_seconds()
        self.success_attempt_record(
            test_round=i + 1,
            latency=latency_seconds,
            latency_list=latency_list,
        )
        self.logger.info("[REF] Disconnect")
        await ref_dut_acl.disconnect()
        self.logger.info("[DUT] Wait for LE-ACL disconnected")
        await dut_cb.wait_for_event(
            bl4a_api.AclDisconnected(
                address=self.ref.address,
                transport=android_constants.Transport.LE,
            ),
        )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make LE connection")
    finally:
      await performance_tool.cleanup_connections(self.dut, self.ref)
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Test make outgoing LE connections.

Source code in navi/tests/benchmark/le_gap_test.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
async def test_le_connection_outgoing(self) -> None:
  """Test make outgoing LE connections."""
  latency_list = list[float]()
  for i in range(_DEFAULT_REPEAT_TIMES):
    try:
      await self.ref.device.start_advertising(
          own_address_type=hci.OwnAddressType.RANDOM,
          auto_restart=False,
          advertising_interval_min=_ADVERTISING_INTERVAL_MIN,
          advertising_interval_max=_ADVERTISING_INTERVAL_MIN,
      )
      with performance_tool.Stopwatch() as stop_watch:
        client = await self.dut.bl4a.connect_gatt_client(
            self.ref.random_address,
            transport=android_constants.Transport.LE,
            address_type=android_constants.AddressTypeStatus.RANDOM,
            retry_count=0,
        )
      latency_seconds = stop_watch.elapsed_time.total_seconds()
      await client.disconnect()
      client.close()
      self.success_attempt_record(
          test_round=i + 1,
          latency=latency_seconds,
          latency_list=latency_list,
      )
    except (core.BaseBumbleError, AssertionError):
      self.logger.exception("Failed to make LE connection")
    finally:
      await performance_tool.cleanup_connections(self.dut, self.ref)
  self.record_sponge_data(
      repeat_times=_DEFAULT_REPEAT_TIMES, latency_list=latency_list
  )

Bases: TwoDevicesTestBase

Base class for performance tests.

Source code in navi/tests/benchmark/test_base.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class PerformanceTestBase(navi_test_base.TwoDevicesTestBase):
  """Base class for performance tests."""

  def success_attempt_record(self, test_round: int, latency: float,
                             latency_list: list[float]) -> None:
    """Records a single test attempt.

    Args:
      test_round: The round of the test.
      latency: The latency of the test.
      latency_list: The list of latencies of all test attempts.
    """
    self.logger.info("Success in %.2f seconds", latency)
    self.logger.info("Test %d Success", test_round)
    latency_list.append(latency)

  def record_sponge_data(self,
                         repeat_times: int, latency_list: list[float]) -> None:
    """Records the sponge data."""
    success_count = len(latency_list)
    self.logger.info(
        "[success rate] Passes: %d / Attempts: %d",
        success_count,
        repeat_times,
    )
    self.logger.info(
        "[connection time] avg: %.2f, min: %.2f, max: %.2f, stdev: %.2f",
        statistics.mean(latency_list),
        min(latency_list),
        max(latency_list),
        statistics.stdev(latency_list),
    )
    self.record_data(
        navi_test_base.RecordData(
            test_name=self.current_test_info.name,
            properties={
                "passes": success_count,
                "attempts": repeat_times,
                "avg_latency": statistics.mean(latency_list),
                "min_latency": min(latency_list),
                "max_latency": max(latency_list),
                "stdev_latency": statistics.stdev(latency_list),
            },
        )
    )

Bases: TwoDevicesTestBase

Tests throughput of different transport.

Note that the performance could be affected a lot by the HCI throughput and latency on Bumble. For example, running this test on a Cloudtop with Pontis might lead to lower bandwidth in comparison to running on a local machine.

Source code in navi/tests/benchmark/throughput_test.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
class ThroughputTest(navi_test_base.TwoDevicesTestBase):
  """Tests throughput of different transport.

  Note that the performance could be affected a lot by the HCI throughput and
  latency on Bumble. For example, running this test on a Cloudtop with Pontis
  might lead to lower bandwidth in comparison to running on a local machine.
  """

  @override
  async def async_setup_class(self) -> None:
    await super().async_setup_class()
    # Disable logging of bumble modules to avoid log spam.
    for module in _BUMBLE_SPAM_MODULES:
      module.logger.setLevel(logging.INFO)

  @override
  async def async_teardown_class(self) -> None:
    await super().async_teardown_class()
    # Re-enable logging of bumble modules.
    for module in _BUMBLE_SPAM_MODULES:
      module.logger.setLevel(logging.DEBUG)

  @override
  async def async_setup_test(self) -> None:
    await super().async_setup_test()

    # Using highest authentication level to allow secure sockets.
    self.ref.device.pairing_config_factory = lambda _: pairing.PairingConfig(
        delegate=_PairingDelegate(
            io_capability=(
                _PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT
            )
        )
    )

  @override
  def on_fail(self, record: records.TestResultRecord) -> None:
    # Snippet might be stuck during tests. Reload it to recover.
    self.dut.reload_snippet()

  @navi_test_base.parameterized(
      (_Phy.LE_1M, _CodedPhyOption.NO_PREFERRED),
      (_Phy.LE_2M, _CodedPhyOption.NO_PREFERRED),
      (_Phy.LE_CODED, _CodedPhyOption.S2),
      (_Phy.LE_CODED, _CodedPhyOption.S8),
  )
  @navi_test_base.retry(2)
  async def test_le_l2cap(
      self,
      phy: android_constants.Phy,
      coded_options: android_constants.CodedPhyOption,
  ) -> None:
    """Tests LE L2CAP throughput.

    Args:
      phy: The PHY to be used for the connection.
      coded_options: The coded PHY options to be used for the connection.
    """
    await self.ref.device.start_advertising(
        own_address_type=hci.OwnAddressType.PUBLIC
    )

    self.logger.info("[DUT] Connect GATT to REF.")
    dut_gatt_client = await self.dut.bl4a.connect_gatt_client(
        address=self.ref.address,
        transport=android_constants.Transport.LE,
        address_type=android_constants.AddressTypeStatus.PUBLIC,
    )
    self.logger.info("[DUT] Set MTU and PHY.")
    # Trigger Data Length Extension.
    await dut_gatt_client.request_mtu(517)
    # Set preferred PHY.
    new_tx_phy, new_rx_phy = await dut_gatt_client.set_preferred_phy(
        tx_phy=phy.to_mask(),
        rx_phy=phy.to_mask(),
        phy_options=coded_options,
    )
    self.assertEqual(new_tx_phy, phy)
    self.assertEqual(new_rx_phy, phy)

    ref_accept_future: asyncio.Future[l2cap.LeCreditBasedChannel] = (
        asyncio.get_running_loop().create_future()
    )
    server = self.ref.device.create_l2cap_server(
        # Same configuration as Android.
        spec=l2cap.LeCreditBasedChannelSpec(
            mtu=65535, mps=251, max_credits=65535
        ),
        handler=ref_accept_future.set_result,
    )
    self.logger.info("[REF] Listen L2CAP on PSM %d", server.psm)

    self.logger.info("[DUT] Connect L2CAP channel to REF.")
    ref_dut_l2cap_channel, dut_ref_l2cap_channel = await asyncio.gather(
        ref_accept_future,
        self.dut.bl4a.create_l2cap_channel(
            address=self.ref.address,
            secure=False,
            psm=server.psm,
            address_type=android_constants.AddressTypeStatus.PUBLIC,
        ),
    )
    # Set the MPS to MAX_LEN - HEADER_SIZE(4) to avoid SDU fragmentation.
    assert self.ref.device.host.le_acl_packet_queue
    ref_dut_l2cap_channel.peer_mps = min(
        ref_dut_l2cap_channel.peer_mps,
        self.ref.device.host.le_acl_packet_queue.max_packet_size
        - _L2CAP_HEADER_SIZE,
    )

    # Store received SDUs in queue.
    ref_sdu_rx_queue = asyncio.Queue[bytes]()
    ref_dut_l2cap_channel.sink = ref_sdu_rx_queue.put_nowait
    expected_throughput_bytes_per_second = (
        _EXPECTED_THROUGHPUT_BYTES_PER_SECOND[(phy, coded_options)]
    )
    total_bytes = int(expected_throughput_bytes_per_second * 20.0)

    async def ref_rx_task():
      bytes_received = 0
      while bytes_received < total_bytes:
        bytes_received += len(await ref_sdu_rx_queue.get())

    self.logger.info("Start sending data from DUT to REF")
    with performance_tool.Stopwatch() as tx_stopwatch:
      async with self.assert_not_timeout(_TRANSMISSION_TIMEOUT_SECONDS):
        await asyncio.gather(
            dut_ref_l2cap_channel.write(bytes(total_bytes)),
            ref_rx_task(),
        )

    self.logger.info("Start sending data from REF to DUT")
    with performance_tool.Stopwatch() as rx_stopwatch:
      async with self.assert_not_timeout(_TRANSMISSION_TIMEOUT_SECONDS):
        ref_dut_l2cap_channel.write(bytes(total_bytes))
        await asyncio.gather(dut_ref_l2cap_channel.read(total_bytes))

    tx_throughput = total_bytes / (tx_stopwatch.elapsed_time).total_seconds()
    rx_throughput = total_bytes / (rx_stopwatch.elapsed_time).total_seconds()
    self.logger.info("Tx Throughput: %.2f KB/s", tx_throughput / 1024)
    self.logger.info("Rx Throughput: %.2f KB/s", rx_throughput / 1024)
    self.record_data(
        navi_test_base.RecordData(
            test_name=self.current_test_info.name,
            properties={
                "tx_throughput_bytes_per_second": tx_throughput,
                "rx_throughput_bytes_per_second": rx_throughput,
            },
        )
    )

  @navi_test_base.parameterized(
      (_Phy.LE_1M, _CodedPhyOption.NO_PREFERRED),
      (_Phy.LE_2M, _CodedPhyOption.NO_PREFERRED),
      (_Phy.LE_CODED, _CodedPhyOption.S2),
      (_Phy.LE_CODED, _CodedPhyOption.S8),
  )
  @navi_test_base.retry(2)
  async def test_le_gatt(
      self,
      phy: android_constants.Phy,
      coded_options: android_constants.CodedPhyOption,
  ) -> None:
    """Tests LE GATT throughput.

    Args:
      phy: The PHY to be used for the connection.
      coded_options: The coded PHY options to be used for the connection.
    """
    ref_written_queue = asyncio.Queue[bytes]()
    expected_throughput_bytes_per_second = (
        _EXPECTED_THROUGHPUT_BYTES_PER_SECOND[(phy, coded_options)]
    )
    total_bytes = int(expected_throughput_bytes_per_second * 20.0)

    ref_characteristic = gatt.Characteristic(
        uuid=_CHARACTERISTIC_UUID,
        properties=(
            gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE
            | gatt.Characteristic.Properties.NOTIFY
        ),
        permissions=gatt.Characteristic.Permissions.WRITEABLE,
        value=gatt.CharacteristicValue(
            write=lambda _, value: ref_written_queue.put_nowait(value)
        ),
    )
    self.ref.device.add_service(
        gatt.Service(
            uuid=_SERVICE_UUID,
            characteristics=[ref_characteristic],
        )
    )
    await self.ref.device.start_advertising(
        own_address_type=hci.OwnAddressType.PUBLIC
    )

    self.logger.info("[DUT] Connect to REF.")
    gatt_client = await self.dut.bl4a.connect_gatt_client(
        address=self.ref.address,
        transport=android_constants.Transport.LE,
        address_type=android_constants.AddressTypeStatus.PUBLIC,
    )
    self.logger.info("[DUT] Set MTU and PHY.")
    # Trigger Data Length Extension.
    await gatt_client.request_mtu(517)
    # Set preferred PHY.
    new_tx_phy, new_rx_phy = await gatt_client.set_preferred_phy(
        tx_phy=phy.to_mask(),
        rx_phy=phy.to_mask(),
        phy_options=coded_options,
    )
    self.assertEqual(new_tx_phy, phy)
    self.assertEqual(new_rx_phy, phy)
    characteristic_handle = bl4a_api.find_characteristic_by_uuid(
        _CHARACTERISTIC_UUID, await gatt_client.discover_services()
    ).handle
    if characteristic_handle is None:
      self.fail("Characteristic not found.")
    # Subscribe to notifications.
    await gatt_client.subscribe_characteristic_notifications(
        characteristic_handle
    )

    async def ref_rx_task():
      bytes_received = 0
      while bytes_received < total_bytes:
        bytes_received += len(await ref_written_queue.get())

    async def ref_tx_task():
      bytes_sent = 0
      while bytes_sent < total_bytes:
        payload_size = min(total_bytes - bytes_sent, _GATT_PAYLOAD_SIZE)
        await self.ref.device.gatt_server.notify_subscribers(
            ref_characteristic, bytes(payload_size)
        )
        bytes_sent += payload_size

    async def dut_rx_task():
      bytes_received = 0
      while True:
        # Snippet RPC latency is very high, so we can only poll events until
        # all of them have been received.
        bytes_received += sum(
            len(event.value)
            for event in await asyncio.to_thread(
                lambda: gatt_client.get_all_events(
                    bl4a_api.GattCharacteristicChanged
                )
            )
        )
        if bytes_received >= total_bytes:
          break

    self.logger.info("Start sending data from DUT to REF")
    with performance_tool.Stopwatch() as tx_stopwatch:
      async with self.assert_not_timeout(_TRANSMISSION_TIMEOUT_SECONDS):
        await asyncio.gather(
            gatt_client.write_characteristic_long(
                characteristic_handle,
                bytes(total_bytes),
                mtu=_GATT_PAYLOAD_SIZE,
                write_type=android_constants.GattWriteType.NO_RESPONSE,
            ),
            ref_rx_task(),
        )

    self.logger.info("Start sending data from REF to DUT")
    with performance_tool.Stopwatch() as rx_stopwatch:
      async with self.assert_not_timeout(_TRANSMISSION_TIMEOUT_SECONDS):
        await asyncio.gather(
            dut_rx_task(),
            ref_tx_task(),
        )

    tx_throughput = total_bytes / (tx_stopwatch.elapsed_time).total_seconds()
    rx_throughput = total_bytes / (rx_stopwatch.elapsed_time).total_seconds()
    self.logger.info("Tx Throughput: %.2f KB/s", tx_throughput / 1024)
    self.logger.info("Rx Throughput: %.2f KB/s", rx_throughput / 1024)
    self.record_data(
        navi_test_base.RecordData(
            test_name=self.current_test_info.name,
            properties={
                "tx_throughput_bytes_per_second": tx_throughput,
                "rx_throughput_bytes_per_second": rx_throughput,
            },
        )
    )

  @navi_test_base.retry(2)
  async def test_rfcomm(self) -> None:
    """Tests RFCOMM throughput."""

    await self.classic_connect_and_pair()

    ref_accept_future: asyncio.Future[rfcomm.DLC] = (
        asyncio.get_running_loop().create_future()
    )
    channel = rfcomm.Server(self.ref.device).listen(
        acceptor=ref_accept_future.set_result
    )
    self.ref.device.sdp_service_records[_RFCOMM_SERVICE_RECORD_HANDLE] = (
        rfcomm.make_service_sdp_records(
            service_record_handle=_RFCOMM_SERVICE_RECORD_HANDLE,
            channel=channel,
            uuid=core.UUID(_RFCOMM_UUID),
        )
    )
    self.logger.info("[REF] Listen RFCOMM on channel %d.", channel)

    self.logger.info("[DUT] Connect RFCOMM channel to REF.")
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      ref_dut_dlc, dut_ref_dlc = await asyncio.gather(
          ref_accept_future,
          self.dut.bl4a.create_rfcomm_channel(
              address=self.ref.address,
              secure=True,
              uuid=_RFCOMM_UUID,
          ),
      )

    # Store received SDUs in queue.
    ref_sdu_rx_queue = asyncio.Queue[bytes]()
    ref_dut_dlc.sink = ref_sdu_rx_queue.put_nowait
    # Set the threshold to 6 to avoid running out of buffer.
    ref_dut_dlc.rx_credits_threshold = _RX_THRESHOLD
    total_bytes = 4 * 1024 * 1024  # 4 MB

    async def ref_rx_task():
      bytes_received = 0
      while bytes_received < total_bytes:
        bytes_received += len(await ref_sdu_rx_queue.get())

    self.logger.info("Start sending data from DUT to REF")
    with performance_tool.Stopwatch() as tx_stopwatch:
      async with self.assert_not_timeout(_TRANSMISSION_TIMEOUT_SECONDS):
        await asyncio.gather(
            dut_ref_dlc.write(bytes(total_bytes)),
            ref_rx_task(),
        )

    self.logger.info("Start sending data from REF to DUT")
    with performance_tool.Stopwatch() as rx_stopwatch:
      async with self.assert_not_timeout(_TRANSMISSION_TIMEOUT_SECONDS):
        ref_dut_dlc.write(bytes(total_bytes))
        await dut_ref_dlc.read(total_bytes)

    tx_throughput = total_bytes / (tx_stopwatch.elapsed_time).total_seconds()
    rx_throughput = total_bytes / (rx_stopwatch.elapsed_time).total_seconds()
    self.logger.info("Tx Throughput: %.2f KB/s", tx_throughput / 1024)
    self.logger.info("Rx Throughput: %.2f KB/s", rx_throughput / 1024)
    self.record_data(
        navi_test_base.RecordData(
            test_name=self.current_test_info.name,
            properties={
                "tx_throughput_bytes_per_second": tx_throughput,
                "rx_throughput_bytes_per_second": rx_throughput,
            },
        )
    )

Tests LE GATT throughput.

Parameters:

Name Type Description Default
phy Phy

The PHY to be used for the connection.

required
coded_options CodedPhyOption

The coded PHY options to be used for the connection.

required
Source code in navi/tests/benchmark/throughput_test.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
@navi_test_base.parameterized(
    (_Phy.LE_1M, _CodedPhyOption.NO_PREFERRED),
    (_Phy.LE_2M, _CodedPhyOption.NO_PREFERRED),
    (_Phy.LE_CODED, _CodedPhyOption.S2),
    (_Phy.LE_CODED, _CodedPhyOption.S8),
)
@navi_test_base.retry(2)
async def test_le_gatt(
    self,
    phy: android_constants.Phy,
    coded_options: android_constants.CodedPhyOption,
) -> None:
  """Tests LE GATT throughput.

  Args:
    phy: The PHY to be used for the connection.
    coded_options: The coded PHY options to be used for the connection.
  """
  ref_written_queue = asyncio.Queue[bytes]()
  expected_throughput_bytes_per_second = (
      _EXPECTED_THROUGHPUT_BYTES_PER_SECOND[(phy, coded_options)]
  )
  total_bytes = int(expected_throughput_bytes_per_second * 20.0)

  ref_characteristic = gatt.Characteristic(
      uuid=_CHARACTERISTIC_UUID,
      properties=(
          gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE
          | gatt.Characteristic.Properties.NOTIFY
      ),
      permissions=gatt.Characteristic.Permissions.WRITEABLE,
      value=gatt.CharacteristicValue(
          write=lambda _, value: ref_written_queue.put_nowait(value)
      ),
  )
  self.ref.device.add_service(
      gatt.Service(
          uuid=_SERVICE_UUID,
          characteristics=[ref_characteristic],
      )
  )
  await self.ref.device.start_advertising(
      own_address_type=hci.OwnAddressType.PUBLIC
  )

  self.logger.info("[DUT] Connect to REF.")
  gatt_client = await self.dut.bl4a.connect_gatt_client(
      address=self.ref.address,
      transport=android_constants.Transport.LE,
      address_type=android_constants.AddressTypeStatus.PUBLIC,
  )
  self.logger.info("[DUT] Set MTU and PHY.")
  # Trigger Data Length Extension.
  await gatt_client.request_mtu(517)
  # Set preferred PHY.
  new_tx_phy, new_rx_phy = await gatt_client.set_preferred_phy(
      tx_phy=phy.to_mask(),
      rx_phy=phy.to_mask(),
      phy_options=coded_options,
  )
  self.assertEqual(new_tx_phy, phy)
  self.assertEqual(new_rx_phy, phy)
  characteristic_handle = bl4a_api.find_characteristic_by_uuid(
      _CHARACTERISTIC_UUID, await gatt_client.discover_services()
  ).handle
  if characteristic_handle is None:
    self.fail("Characteristic not found.")
  # Subscribe to notifications.
  await gatt_client.subscribe_characteristic_notifications(
      characteristic_handle
  )

  async def ref_rx_task():
    bytes_received = 0
    while bytes_received < total_bytes:
      bytes_received += len(await ref_written_queue.get())

  async def ref_tx_task():
    bytes_sent = 0
    while bytes_sent < total_bytes:
      payload_size = min(total_bytes - bytes_sent, _GATT_PAYLOAD_SIZE)
      await self.ref.device.gatt_server.notify_subscribers(
          ref_characteristic, bytes(payload_size)
      )
      bytes_sent += payload_size

  async def dut_rx_task():
    bytes_received = 0
    while True:
      # Snippet RPC latency is very high, so we can only poll events until
      # all of them have been received.
      bytes_received += sum(
          len(event.value)
          for event in await asyncio.to_thread(
              lambda: gatt_client.get_all_events(
                  bl4a_api.GattCharacteristicChanged
              )
          )
      )
      if bytes_received >= total_bytes:
        break

  self.logger.info("Start sending data from DUT to REF")
  with performance_tool.Stopwatch() as tx_stopwatch:
    async with self.assert_not_timeout(_TRANSMISSION_TIMEOUT_SECONDS):
      await asyncio.gather(
          gatt_client.write_characteristic_long(
              characteristic_handle,
              bytes(total_bytes),
              mtu=_GATT_PAYLOAD_SIZE,
              write_type=android_constants.GattWriteType.NO_RESPONSE,
          ),
          ref_rx_task(),
      )

  self.logger.info("Start sending data from REF to DUT")
  with performance_tool.Stopwatch() as rx_stopwatch:
    async with self.assert_not_timeout(_TRANSMISSION_TIMEOUT_SECONDS):
      await asyncio.gather(
          dut_rx_task(),
          ref_tx_task(),
      )

  tx_throughput = total_bytes / (tx_stopwatch.elapsed_time).total_seconds()
  rx_throughput = total_bytes / (rx_stopwatch.elapsed_time).total_seconds()
  self.logger.info("Tx Throughput: %.2f KB/s", tx_throughput / 1024)
  self.logger.info("Rx Throughput: %.2f KB/s", rx_throughput / 1024)
  self.record_data(
      navi_test_base.RecordData(
          test_name=self.current_test_info.name,
          properties={
              "tx_throughput_bytes_per_second": tx_throughput,
              "rx_throughput_bytes_per_second": rx_throughput,
          },
      )
  )

Tests LE L2CAP throughput.

Parameters:

Name Type Description Default
phy Phy

The PHY to be used for the connection.

required
coded_options CodedPhyOption

The coded PHY options to be used for the connection.

required
Source code in navi/tests/benchmark/throughput_test.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
@navi_test_base.parameterized(
    (_Phy.LE_1M, _CodedPhyOption.NO_PREFERRED),
    (_Phy.LE_2M, _CodedPhyOption.NO_PREFERRED),
    (_Phy.LE_CODED, _CodedPhyOption.S2),
    (_Phy.LE_CODED, _CodedPhyOption.S8),
)
@navi_test_base.retry(2)
async def test_le_l2cap(
    self,
    phy: android_constants.Phy,
    coded_options: android_constants.CodedPhyOption,
) -> None:
  """Tests LE L2CAP throughput.

  Args:
    phy: The PHY to be used for the connection.
    coded_options: The coded PHY options to be used for the connection.
  """
  await self.ref.device.start_advertising(
      own_address_type=hci.OwnAddressType.PUBLIC
  )

  self.logger.info("[DUT] Connect GATT to REF.")
  dut_gatt_client = await self.dut.bl4a.connect_gatt_client(
      address=self.ref.address,
      transport=android_constants.Transport.LE,
      address_type=android_constants.AddressTypeStatus.PUBLIC,
  )
  self.logger.info("[DUT] Set MTU and PHY.")
  # Trigger Data Length Extension.
  await dut_gatt_client.request_mtu(517)
  # Set preferred PHY.
  new_tx_phy, new_rx_phy = await dut_gatt_client.set_preferred_phy(
      tx_phy=phy.to_mask(),
      rx_phy=phy.to_mask(),
      phy_options=coded_options,
  )
  self.assertEqual(new_tx_phy, phy)
  self.assertEqual(new_rx_phy, phy)

  ref_accept_future: asyncio.Future[l2cap.LeCreditBasedChannel] = (
      asyncio.get_running_loop().create_future()
  )
  server = self.ref.device.create_l2cap_server(
      # Same configuration as Android.
      spec=l2cap.LeCreditBasedChannelSpec(
          mtu=65535, mps=251, max_credits=65535
      ),
      handler=ref_accept_future.set_result,
  )
  self.logger.info("[REF] Listen L2CAP on PSM %d", server.psm)

  self.logger.info("[DUT] Connect L2CAP channel to REF.")
  ref_dut_l2cap_channel, dut_ref_l2cap_channel = await asyncio.gather(
      ref_accept_future,
      self.dut.bl4a.create_l2cap_channel(
          address=self.ref.address,
          secure=False,
          psm=server.psm,
          address_type=android_constants.AddressTypeStatus.PUBLIC,
      ),
  )
  # Set the MPS to MAX_LEN - HEADER_SIZE(4) to avoid SDU fragmentation.
  assert self.ref.device.host.le_acl_packet_queue
  ref_dut_l2cap_channel.peer_mps = min(
      ref_dut_l2cap_channel.peer_mps,
      self.ref.device.host.le_acl_packet_queue.max_packet_size
      - _L2CAP_HEADER_SIZE,
  )

  # Store received SDUs in queue.
  ref_sdu_rx_queue = asyncio.Queue[bytes]()
  ref_dut_l2cap_channel.sink = ref_sdu_rx_queue.put_nowait
  expected_throughput_bytes_per_second = (
      _EXPECTED_THROUGHPUT_BYTES_PER_SECOND[(phy, coded_options)]
  )
  total_bytes = int(expected_throughput_bytes_per_second * 20.0)

  async def ref_rx_task():
    bytes_received = 0
    while bytes_received < total_bytes:
      bytes_received += len(await ref_sdu_rx_queue.get())

  self.logger.info("Start sending data from DUT to REF")
  with performance_tool.Stopwatch() as tx_stopwatch:
    async with self.assert_not_timeout(_TRANSMISSION_TIMEOUT_SECONDS):
      await asyncio.gather(
          dut_ref_l2cap_channel.write(bytes(total_bytes)),
          ref_rx_task(),
      )

  self.logger.info("Start sending data from REF to DUT")
  with performance_tool.Stopwatch() as rx_stopwatch:
    async with self.assert_not_timeout(_TRANSMISSION_TIMEOUT_SECONDS):
      ref_dut_l2cap_channel.write(bytes(total_bytes))
      await asyncio.gather(dut_ref_l2cap_channel.read(total_bytes))

  tx_throughput = total_bytes / (tx_stopwatch.elapsed_time).total_seconds()
  rx_throughput = total_bytes / (rx_stopwatch.elapsed_time).total_seconds()
  self.logger.info("Tx Throughput: %.2f KB/s", tx_throughput / 1024)
  self.logger.info("Rx Throughput: %.2f KB/s", rx_throughput / 1024)
  self.record_data(
      navi_test_base.RecordData(
          test_name=self.current_test_info.name,
          properties={
              "tx_throughput_bytes_per_second": tx_throughput,
              "rx_throughput_bytes_per_second": rx_throughput,
          },
      )
  )

Tests RFCOMM throughput.

Source code in navi/tests/benchmark/throughput_test.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
@navi_test_base.retry(2)
async def test_rfcomm(self) -> None:
  """Tests RFCOMM throughput."""

  await self.classic_connect_and_pair()

  ref_accept_future: asyncio.Future[rfcomm.DLC] = (
      asyncio.get_running_loop().create_future()
  )
  channel = rfcomm.Server(self.ref.device).listen(
      acceptor=ref_accept_future.set_result
  )
  self.ref.device.sdp_service_records[_RFCOMM_SERVICE_RECORD_HANDLE] = (
      rfcomm.make_service_sdp_records(
          service_record_handle=_RFCOMM_SERVICE_RECORD_HANDLE,
          channel=channel,
          uuid=core.UUID(_RFCOMM_UUID),
      )
  )
  self.logger.info("[REF] Listen RFCOMM on channel %d.", channel)

  self.logger.info("[DUT] Connect RFCOMM channel to REF.")
  async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
    ref_dut_dlc, dut_ref_dlc = await asyncio.gather(
        ref_accept_future,
        self.dut.bl4a.create_rfcomm_channel(
            address=self.ref.address,
            secure=True,
            uuid=_RFCOMM_UUID,
        ),
    )

  # Store received SDUs in queue.
  ref_sdu_rx_queue = asyncio.Queue[bytes]()
  ref_dut_dlc.sink = ref_sdu_rx_queue.put_nowait
  # Set the threshold to 6 to avoid running out of buffer.
  ref_dut_dlc.rx_credits_threshold = _RX_THRESHOLD
  total_bytes = 4 * 1024 * 1024  # 4 MB

  async def ref_rx_task():
    bytes_received = 0
    while bytes_received < total_bytes:
      bytes_received += len(await ref_sdu_rx_queue.get())

  self.logger.info("Start sending data from DUT to REF")
  with performance_tool.Stopwatch() as tx_stopwatch:
    async with self.assert_not_timeout(_TRANSMISSION_TIMEOUT_SECONDS):
      await asyncio.gather(
          dut_ref_dlc.write(bytes(total_bytes)),
          ref_rx_task(),
      )

  self.logger.info("Start sending data from REF to DUT")
  with performance_tool.Stopwatch() as rx_stopwatch:
    async with self.assert_not_timeout(_TRANSMISSION_TIMEOUT_SECONDS):
      ref_dut_dlc.write(bytes(total_bytes))
      await dut_ref_dlc.read(total_bytes)

  tx_throughput = total_bytes / (tx_stopwatch.elapsed_time).total_seconds()
  rx_throughput = total_bytes / (rx_stopwatch.elapsed_time).total_seconds()
  self.logger.info("Tx Throughput: %.2f KB/s", tx_throughput / 1024)
  self.logger.info("Rx Throughput: %.2f KB/s", rx_throughput / 1024)
  self.record_data(
      navi_test_base.RecordData(
          test_name=self.current_test_info.name,
          properties={
              "tx_throughput_bytes_per_second": tx_throughput,
              "rx_throughput_bytes_per_second": rx_throughput,
          },
      )
  )