Skip to content

Functionality

Bases: TwoDevicesTestBase

Tests A2DP Sink and AVRCP Controller profiles.

Source code in navi/tests/functionality/a2dp_sink_test.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class A2dpSinkTest(navi_test_base.TwoDevicesTestBase):
  """Tests A2DP Sink and AVRCP Controller profiles.

  The tests observe the DUT through its A2DP_SINK and AVRCP_CONTROLLER
  callbacks, while the REF device publishes an A2DP audio source record and an
  AVRCP target record and serves AVDTP/AVRCP connections.
  """

  @override
  async def async_setup_class(self) -> None:
    """Checks that A2DP Sink and AVRCP Controller are enabled on the DUT.

    On an emulator both properties are set to "true" before the check.

    Raises:
      signals.TestAbortClass: If either property does not read back as "true".
    """
    await super().async_setup_class()

    if self.dut.device.is_emulator:
      # Force enable A2DP Sink and AVRCP Controller on emulator.
      self.dut.setprop(_Property.A2DP_SINK_ENABLED, "true")
      self.dut.setprop(_Property.AVRCP_CONTROLLER_ENABLED, "true")

    # Abort the whole class (not just one test) when a profile is missing.
    if self.dut.getprop(_Property.A2DP_SINK_ENABLED) != "true":
      raise signals.TestAbortClass("A2DP Sink is not enabled on DUT.")
    if self.dut.getprop(_Property.AVRCP_CONTROLLER_ENABLED) != "true":
      raise signals.TestAbortClass("AVRCP Controller is not enabled on DUT.")

  @override
  async def async_setup_test(self) -> None:
    """Installs the A2DP source and AVRCP target SDP records on the REF."""
    await super().async_setup_test()
    # Setup SDP service records.
    # The record table is replaced as a whole, keyed by service record handle.
    self.ref.device.sdp_service_records = {
        _A2DP_SERVICE_RECORD_HANDLE: a2dp.make_audio_source_service_sdp_records(
            _A2DP_SERVICE_RECORD_HANDLE
        ),
        _AVRCP_TARGET_RECORD_HANDLE: avrcp.make_target_service_sdp_records(
            _AVRCP_TARGET_RECORD_HANDLE
        ),
    }

  def _setup_a2dp_source_device(
      self,
      bumble_device: device.Device,
      codecs: Sequence[a2dp_ext.A2dpCodec] = (
          a2dp_ext.A2dpCodec.SBC,
          a2dp_ext.A2dpCodec.AAC,
      ),
  ) -> tuple[
      asyncio.Queue[avdtp.Protocol], avrcp.Protocol, asyncio.Queue[None]
  ]:
    """Sets up AVDTP and AVRCP listeners on a Bumble device.

    Args:
      bumble_device: The device that will accept AVDTP and AVRCP connections.
      codecs: Codecs for which a source endpoint is added on every incoming
        AVDTP connection. Defaults to SBC and AAC.

    Returns:
      A tuple of (queue receiving each connected AVDTP protocol, the AVRCP
      protocol object, queue receiving one `None` per AVRCP start event).
    """
    # Setup AVDTP server.
    avdtp_protocol_queue = asyncio.Queue[avdtp.Protocol]()
    avdtp_listener = avdtp.Listener.for_device(device=bumble_device)

    def on_avdtp_connection(protocol: avdtp.Protocol) -> None:
      """Adds one source per codec, then publishes the protocol to the queue."""
      for codec in codecs:
        # The packet pump is created with the peer MTU of the protocol's
        # L2CAP channel.
        protocol.add_source(
            codec.get_default_capabilities(),
            codec.get_media_packet_pump(protocol.l2cap_channel.peer_mtu),
        )
      avdtp_protocol_queue.put_nowait(protocol)

    avdtp_listener.on(avdtp_listener.EVENT_CONNECTION, on_avdtp_connection)
    # Setup AVRCP server.
    avrcp_delegate = avrcp.Delegate()
    avrcp_protocol_starts = asyncio.Queue[None]()
    avrcp_protocol = avrcp.Protocol(delegate=avrcp_delegate)
    avrcp_protocol.listen(bumble_device)
    # Each start event only needs to be counted, so a bare None is queued.
    avrcp_protocol.on(
        avrcp_protocol.EVENT_START,
        lambda: avrcp_protocol_starts.put_nowait(None),
    )
    return avdtp_protocol_queue, avrcp_protocol, avrcp_protocol_starts

  async def test_paired_connect_outgoing(self) -> None:
    """Tests A2DP connection establishment right after a pairing session.

    Test steps:
      1. Connect and pair REF.
      2. Make A2DP and AVRCP connection from DUT.
    """
    ref_avdtp_protocol_queue, ref_avrcp_protocol, ref_avrcp_protocol_queue = (
        self._setup_a2dp_source_device(self.ref.device)
    )
    # Only the two queues are needed by this test.
    del ref_avrcp_protocol

    self.logger.info("[DUT] Connect and pair REF.")
    await self.classic_connect_and_pair()

    # NOTE(review): the DUT callbacks are registered after pairing returns;
    # confirm that no profile connection event can be emitted before this point.
    dut_a2dp_sink_callback = self.dut.bl4a.register_callback(
        bl4a_api.Module.A2DP_SINK
    )
    self.test_case_context.push(dut_a2dp_sink_callback)
    dut_avrcp_controller_callback = self.dut.bl4a.register_callback(
        bl4a_api.Module.AVRCP_CONTROLLER
    )
    self.test_case_context.push(dut_avrcp_controller_callback)

    # REF side: the AVDTP connection must arrive and endpoint discovery must
    # finish within one step timeout.
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      self.logger.info("[REF] Wait for AVDTP connection")
      avdtp_protocol = await ref_avdtp_protocol_queue.get()
      self.logger.info("[REF] Discover remote endpoints")
      await avdtp_protocol.discover_remote_endpoints()

    # REF side: wait for the AVRCP protocol start event.
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      self.logger.info("[REF] Wait for AVRCP connection")
      await ref_avrcp_protocol_queue.get()

    # DUT side: both profiles must report CONNECTED for the REF address.
    self.logger.info("[DUT] Waiting for A2DP connection state changed.")
    await dut_a2dp_sink_callback.wait_for_event(
        bl4a_api.ProfileConnectionStateChanged(
            address=self.ref.address,
            state=android_constants.ConnectionState.CONNECTED,
        )
    )

    self.logger.info("[DUT] Waiting for AVRCP connection state changed.")
    await dut_avrcp_controller_callback.wait_for_event(
        bl4a_api.ProfileConnectionStateChanged(
            address=self.ref.address,
            state=android_constants.ConnectionState.CONNECTED,
        )
    )

  async def test_streaming_sbc(self) -> None:
    """Tests streaming SBC.

    Test steps:
      1. Connect and pair REF.
      2. Make A2DP and AVRCP connection from DUT.
      3. Start streaming SBC.
      4. Stop streaming SBC.
    """
    # Restrict the REF to a single SBC source endpoint.
    ref_avdtp_protocol_queue, ref_avrcp_protocol, ref_avrcp_protocol_queue = (
        self._setup_a2dp_source_device(
            self.ref.device, codecs=[a2dp_ext.A2dpCodec.SBC]
        )
    )
    # The AVRCP objects are not used by this test.
    del ref_avrcp_protocol, ref_avrcp_protocol_queue

    self.logger.info("[DUT] Connect and pair REF.")
    await self.classic_connect_and_pair()

    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      self.logger.info("[REF] Wait for AVDTP connection")
      avdtp_protocol = await ref_avdtp_protocol_queue.get()
      self.logger.info("[REF] Discover remote endpoints")
      await avdtp_protocol.discover_remote_endpoints()

    # Look up the local SBC source endpoint(s) on the REF's AVDTP protocol.
    sources = a2dp_ext.find_local_endpoints_by_codec(
        avdtp_protocol, a2dp.A2DP_SBC_CODEC_TYPE, avdtp.LocalSource
    )
    if not sources:
      self.fail("No A2DP local SBC source found")

    if not (stream := sources[0].stream):
      # If there is only one source, DUT will automatically create a stream.
      self.fail("No A2DP SBC stream found")

    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      self.logger.info("[REF] Start stream")
      await stream.start()

    # Keep the stream running for the configured duration before stopping it.
    await asyncio.sleep(_DEFAULT_STREAM_DURATION_SECONDS)

    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      self.logger.info("[REF] Stop stream")
      await stream.stop()

Tests A2DP connection establishment right after a pairing session.

Test steps
  1. Connect and pair REF.
  2. Make A2DP and AVRCP connection from DUT.
Source code in navi/tests/functionality/a2dp_sink_test.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
async def test_paired_connect_outgoing(self) -> None:
  """Verifies that A2DP and AVRCP connect right after a pairing session.

  Test steps:
    1. Connect and pair REF.
    2. Make A2DP and AVRCP connection from DUT.
  """
  # The AVRCP protocol object (middle element) is not needed here.
  avdtp_connections, _, avrcp_starts = self._setup_a2dp_source_device(
      self.ref.device
  )

  self.logger.info("[DUT] Connect and pair REF.")
  await self.classic_connect_and_pair()

  # Register the DUT-side observers and hand each one to the test case context.
  sink_events = self.dut.bl4a.register_callback(bl4a_api.Module.A2DP_SINK)
  self.test_case_context.push(sink_events)
  controller_events = self.dut.bl4a.register_callback(
      bl4a_api.Module.AVRCP_CONTROLLER
  )
  self.test_case_context.push(controller_events)

  step_timeout = _DEFAULT_STEP_TIMEOUT_SECONDS

  # REF side: AVDTP connection followed by endpoint discovery.
  async with self.assert_not_timeout(step_timeout):
    self.logger.info("[REF] Wait for AVDTP connection")
    ref_avdtp = await avdtp_connections.get()
    self.logger.info("[REF] Discover remote endpoints")
    await ref_avdtp.discover_remote_endpoints()

  # REF side: AVRCP protocol start.
  async with self.assert_not_timeout(step_timeout):
    self.logger.info("[REF] Wait for AVRCP connection")
    await avrcp_starts.get()

  # DUT side: A2DP first, then AVRCP, must each report CONNECTED for REF.
  pending = (
      ("[DUT] Waiting for A2DP connection state changed.", sink_events),
      (
          "[DUT] Waiting for AVRCP connection state changed.",
          controller_events,
      ),
  )
  for message, events in pending:
    self.logger.info(message)
    await events.wait_for_event(
        bl4a_api.ProfileConnectionStateChanged(
            address=self.ref.address,
            state=android_constants.ConnectionState.CONNECTED,
        )
    )

Tests streaming SBC.

Test steps
  1. Connect and pair REF.
  2. Make A2DP and AVRCP connection from DUT.
  3. Start streaming SBC.
  4. Stop streaming SBC.
Source code in navi/tests/functionality/a2dp_sink_test.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
async def test_streaming_sbc(self) -> None:
  """Streams SBC audio from REF to DUT for a fixed duration.

  Test steps:
    1. Connect and pair REF.
    2. Make A2DP and AVRCP connection from DUT.
    3. Start streaming SBC.
    4. Stop streaming SBC.
  """
  # Offer SBC only; the AVRCP protocol and its queue are not used here.
  avdtp_connections, _, _ = self._setup_a2dp_source_device(
      self.ref.device, codecs=[a2dp_ext.A2dpCodec.SBC]
  )

  self.logger.info("[DUT] Connect and pair REF.")
  await self.classic_connect_and_pair()

  step_timeout = _DEFAULT_STEP_TIMEOUT_SECONDS

  async with self.assert_not_timeout(step_timeout):
    self.logger.info("[REF] Wait for AVDTP connection")
    ref_avdtp = await avdtp_connections.get()
    self.logger.info("[REF] Discover remote endpoints")
    await ref_avdtp.discover_remote_endpoints()

  sbc_sources = a2dp_ext.find_local_endpoints_by_codec(
      ref_avdtp, a2dp.A2DP_SBC_CODEC_TYPE, avdtp.LocalSource
  )
  if not sbc_sources:
    self.fail("No A2DP local SBC source found")

  # If there is only one source, DUT will automatically create a stream.
  sbc_stream = sbc_sources[0].stream
  if not sbc_stream:
    self.fail("No A2DP SBC stream found")

  async with self.assert_not_timeout(step_timeout):
    self.logger.info("[REF] Start stream")
    await sbc_stream.start()

  # Let audio flow for the configured streaming duration.
  await asyncio.sleep(_DEFAULT_STREAM_DURATION_SECONDS)

  async with self.assert_not_timeout(step_timeout):
    self.logger.info("[REF] Stop stream")
    await sbc_stream.stop()

Bases: MultiDevicesTestBase

Source code in navi/tests/functionality/asha_dual_devices_test.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
class AshaDualDevicesTest(navi_test_base.MultiDevicesTestBase):
  """Tests ASHA with a pair of REF devices sharing one HiSync ID.

  Every test starts with the DUT LE-paired to all REF devices and ASHA
  reported as connected for each of them (see `async_setup_test`).
  """

  NUM_REF_DEVICES = 2
  # ASHA services registered on the REF devices, in the same order as
  # `self.refs`. Rebound to a fresh list on the instance before every test.
  ref_asha_services: list[asha.AshaService] = []

  @override
  async def async_setup_class(self) -> None:
    """Aborts the class when the ASHA property is not 'true' on the DUT.

    Raises:
      signals.TestAbortClass: If the ASHA property does not read as 'true'.
    """
    await super().async_setup_class()

    if self.dut.getprop(_PROPERTY_ASHA_ENABLED) != 'true':
      raise signals.TestAbortClass('ASHA is not supported on DUT.')

  @override
  async def async_setup_test(self) -> None:
    """Resets the service list, then pairs the DUT with all REF devices."""
    self.ref_asha_services = []
    await super().async_setup_test()
    await self._prepare_paired_devices()

  async def _prepare_paired_devices(self) -> None:
    """Pairs DUT with REF devices.

    An ASHA service is added to every REF device first: the first REF
    advertises IS_DUAL only, every other REF additionally sets IS_RIGHT. The
    DUT is then LE-paired with each REF in turn and must report ASHA CONNECTED
    for that REF's random address.
    """

    for i, dev in enumerate(self.refs):
      device_capabilities = (
          asha.DeviceCapabilities.IS_DUAL
          if i == 0
          else asha.DeviceCapabilities.IS_DUAL
          | asha.DeviceCapabilities.IS_RIGHT
      )
      asha_service = asha.AshaService(
          capability=asha.DeviceCapabilities(device_capabilities),
          hisyncid=_HISYNC_ID,
          device=dev.device,
      )
      self.ref_asha_services.append(asha_service)
      dev.device.add_service(asha_service)

    with self.dut.bl4a.register_callback(_Module.ASHA) as dut_cb:
      for ref in self.refs:
        await self.le_connect_and_pair(
            ref_address_type=hci.OwnAddressType.RANDOM,
            ref=ref,
            connect_profiles=True,
        )
        self.logger.info(
            '[DUT] Wait for ASHA connected to %s', ref.random_address
        )
        await dut_cb.wait_for_event(
            bl4a_api.ProfileConnectionStateChanged(
                address=ref.random_address,
                state=android_constants.ConnectionState.CONNECTED,
            ),
        )

  async def test_active_devices_should_contain_both_sides(self) -> None:
    """Tests that both sides of the dual device are active."""
    # Order is irrelevant; only the set of active addresses must match.
    self.assertCountEqual(
        self.dut.bt.getActiveDevices(android_constants.Profile.HEARING_AID),
        [ref.random_address for ref in self.refs],
    )

  @navi_test_base.retry(max_count=3)
  async def test_reconnect(self) -> None:
    """Tests reconnecting ASHA from DUT to REF devices.

    Test steps:
      1. Disconnect ACL from REF devices.
      2. Restart advertising on REF devices.
      3. Wait for DUT to reconnect to REF devices.
    """

    with self.dut.bl4a.register_callback(_Module.ADAPTER) as dut_cb:
      for ref in self.refs:
        ref_address = ref.random_address
        if not (
            acl := ref.device.find_connection_by_bd_addr(
                hci.Address(self.dut.address),
                transport=core.BT_LE_TRANSPORT,
            )
        ):
          # This REF has no LE link to the DUT, so there is nothing to drop.
          continue
        async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
          await acl.disconnect()
        self.logger.info('[DUT] Wait for ACL disconnected from %s', ref_address)
        await dut_cb.wait_for_event(
            bl4a_api.AclDisconnected(
                address=ref_address, transport=android_constants.Transport.LE
            )
        )

    with self.dut.bl4a.register_callback(_Module.ASHA) as dut_cb:
      for ref, asha_service in zip(self.refs, self.ref_asha_services):
        ref_address = ref.random_address
        async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
          await ref.device.create_advertising_set(
              advertising_parameters=_DEFAULT_ADVERTISING_PARAMETERS,
              advertising_data=asha_service.get_advertising_data(),
          )
        # Skip the wait when the DUT already lists this REF as an active
        # hearing aid device.
        if ref_address in self.dut.bt.getActiveDevices(
            android_constants.Profile.HEARING_AID
        ):
          self.logger.info('[DUT] ASHA already connected to %s', ref_address)
        else:
          self.logger.info('[DUT] Wait for ASHA connected to %s', ref_address)
          await dut_cb.wait_for_event(
              bl4a_api.ProfileConnectionStateChanged(
                  address=ref_address,
                  state=android_constants.ConnectionState.CONNECTED,
              ),
          )

  @navi_test_base.parameterized(_StreamType.MEDIA, _StreamType.VOICE_COMM)
  async def test_streaming(self, stream_type: _StreamType) -> None:
    """Tests ASHA streaming.

    Test Steps:
      1. Establish ASHA connection.
      2. (Optional) Start phone call.
      3. Start streaming.
      4. Verify audio data is received.
      5. Stop streaming.

    Args:
      stream_type: The stream type to test.
    """

    # One audio queue per REF ASHA service, so the number of sinks always
    # follows the number of services instead of being fixed at two.
    audio_sinks = [asyncio.Queue[bytes]() for _ in self.ref_asha_services]

    for asha_service, audio_sink in zip(self.ref_asha_services, audio_sinks):
      asha_service.audio_sink = audio_sink.put_nowait

    watcher = pyee_extensions.EventWatcher()
    start_event_lists = [
        watcher.async_monitor(asha_service, asha.AshaService.Event.STARTED)
        for asha_service in self.ref_asha_services
    ]
    stop_event_lists = [
        watcher.async_monitor(asha_service, asha.AshaService.Event.STOPPED)
        for asha_service in self.ref_asha_services
    ]

    with contextlib.ExitStack() as exit_stack:
      # A phone call context is only entered for the VOICE_COMM variant; the
      # exit stack leaves it again when this block ends.
      if stream_type == _StreamType.VOICE_COMM:
        self.logger.info('[DUT] Start phone call')
        exit_stack.enter_context(
            self.dut.bl4a.make_phone_call(
                caller_name='Pixel Bluetooth',
                caller_number='123456789',
                direction=constants.Direction.OUTGOING,
            )
        )

      self.logger.info('[DUT] Start streaming')
      await asyncio.to_thread(self.dut.bt.audioPlaySine)
      for i, (started, audio_sink) in enumerate(
          zip(start_event_lists, audio_sinks)
      ):
        async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
          self.logger.info('[REF] Wait for audio started on %dth device', i)
          await started.get()
          self.logger.info('[REF] Wait for audio data on %dth device', i)
          await audio_sink.get()

      await asyncio.sleep(_STREAMING_TIME_SECONDS)

      self.logger.info('[DUT] Stop streaming')
      await asyncio.to_thread(self.dut.bt.audioStop)
      for i, stopped in enumerate(stop_event_lists):
        async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
          self.logger.info('[REF] Wait for audio stopped on %dth device', i)
          await stopped.get()

  async def test_set_volume(self) -> None:
    """Tests ASHA set volume.

    Test Steps:
      1. Establish ASHA connection.
      2. Set volume to min.
      3. Verify volume changed to -128.
      4. Set volume to max.
      5. Verify volume changed to 0.
    """
    stream_type = android_constants.StreamType.MUSIC

    # One observer per REF service. functools.partial binds the service at
    # construction time so each observer reads its own service's volume.
    volume_lists = [
        pyee_extensions.EventTriggeredValueObserver(
            ref_asha_service,
            asha.AshaService.Event.VOLUME_CHANGED,
            functools.partial(
                lambda service: cast(asha.AshaService, service).volume,
                ref_asha_service,
            ),
        )
        for ref_asha_service in self.ref_asha_services
    ]

    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      self.logger.info('[DUT] Set volume to min')
      self.dut.bt.setVolume(stream_type, self.dut.bt.getMinVolume(stream_type))
      for i, volume_observer in enumerate(volume_lists):
        self.logger.info('[REF] Wait for volume changed on %dth device', i)
        await volume_observer.wait_for_target_value(-128)

    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      self.logger.info('[DUT] Set volume to max')
      self.dut.bt.setVolume(stream_type, self.dut.bt.getMaxVolume(stream_type))
      for i, volume_observer in enumerate(volume_lists):
        self.logger.info('[REF] Wait for volume changed on %dth device', i)
        await volume_observer.wait_for_target_value(0)

Tests that both sides of the dual device are active.

Source code in navi/tests/functionality/asha_dual_devices_test.py
110
111
112
113
114
115
async def test_active_devices_should_contain_both_sides(self) -> None:
  """Checks that DUT's active hearing aid devices are exactly the REFs."""
  active_devices = self.dut.bt.getActiveDevices(
      android_constants.Profile.HEARING_AID
  )
  expected_addresses = [ref.random_address for ref in self.refs]
  # Compared as unordered collections: same elements, same counts.
  self.assertCountEqual(active_devices, expected_addresses)

Tests reconnecting ASHA from DUT to REF devices.

Test steps
  1. Disconnect ACL from REF devices.
  2. Restart advertising on REF devices.
  3. Wait for DUT to reconnect to REF devices.
Source code in navi/tests/functionality/asha_dual_devices_test.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@navi_test_base.retry(max_count=3)
async def test_reconnect(self) -> None:
  """Checks that DUT re-establishes ASHA after the LE links are dropped.

  Test steps:
    1. Disconnect ACL from REF devices.
    2. Restart advertising on REF devices.
    3. Wait for DUT to reconnect to REF devices.
  """
  step_timeout = _DEFAULT_STEP_TIMEOUT_SECONDS

  # Phase 1: drop every existing LE link from the REF side and wait for the
  # DUT to report the matching ACL disconnection.
  with self.dut.bl4a.register_callback(_Module.ADAPTER) as adapter_events:
    for ref in self.refs:
      peer_address = ref.random_address
      le_link = ref.device.find_connection_by_bd_addr(
          hci.Address(self.dut.address),
          transport=core.BT_LE_TRANSPORT,
      )
      if not le_link:
        # No LE link to the DUT on this REF, so there is nothing to drop.
        continue
      async with self.assert_not_timeout(step_timeout):
        await le_link.disconnect()
      self.logger.info('[DUT] Wait for ACL disconnected from %s', peer_address)
      await adapter_events.wait_for_event(
          bl4a_api.AclDisconnected(
              address=peer_address, transport=android_constants.Transport.LE
          )
      )

  # Phase 2: advertise again on each REF and wait for ASHA to come back.
  with self.dut.bl4a.register_callback(_Module.ASHA) as asha_events:
    for ref, service in zip(self.refs, self.ref_asha_services):
      peer_address = ref.random_address
      async with self.assert_not_timeout(step_timeout):
        await ref.device.create_advertising_set(
            advertising_parameters=_DEFAULT_ADVERTISING_PARAMETERS,
            advertising_data=service.get_advertising_data(),
        )
      active_devices = self.dut.bt.getActiveDevices(
          android_constants.Profile.HEARING_AID
      )
      if peer_address in active_devices:
        # The DUT already lists this REF as active; no event to wait for.
        self.logger.info('[DUT] ASHA already connected to %s', peer_address)
        continue
      self.logger.info('[DUT] Wait for ASHA connected to %s', peer_address)
      await asha_events.wait_for_event(
          bl4a_api.ProfileConnectionStateChanged(
              address=peer_address,
              state=android_constants.ConnectionState.CONNECTED,
          ),
      )

Tests ASHA set volume.

Test Steps
  1. Establish ASHA connection.
  2. Set volume to min.
  3. Verify volume changed to -128.
  4. Set volume to max.
  5. Verify volume changed to 0.
Source code in navi/tests/functionality/asha_dual_devices_test.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
async def test_set_volume(self) -> None:
  """Checks that DUT volume changes are propagated to every REF device.

  Test Steps:
    1. Establish ASHA connection.
    2. Set volume to min.
    3. Verify volume changed to -128.
    4. Set volume to max.
    5. Verify volume changed to 0.
  """
  music = android_constants.StreamType.MUSIC
  step_timeout = _DEFAULT_STEP_TIMEOUT_SECONDS

  def _read_volume(service):
    """Returns the current volume of the given ASHA service."""
    return cast(asha.AshaService, service).volume

  # One observer per REF service, each bound to its own service object.
  observers = []
  for service in self.ref_asha_services:
    observers.append(
        pyee_extensions.EventTriggeredValueObserver(
            service,
            asha.AshaService.Event.VOLUME_CHANGED,
            functools.partial(_read_volume, service),
        )
    )

  async with self.assert_not_timeout(step_timeout):
    self.logger.info('[DUT] Set volume to min')
    lowest = self.dut.bt.getMinVolume(music)
    self.dut.bt.setVolume(music, lowest)
    for index in range(len(self.refs)):
      self.logger.info('[REF] Wait for volume changed on %dth device', index)
      await observers[index].wait_for_target_value(-128)

  async with self.assert_not_timeout(step_timeout):
    self.logger.info('[DUT] Set volume to max')
    highest = self.dut.bt.getMaxVolume(music)
    self.dut.bt.setVolume(music, highest)
    for index in range(len(self.refs)):
      self.logger.info('[REF] Wait for volume changed on %dth device', index)
      await observers[index].wait_for_target_value(0)

Tests ASHA streaming.

Test Steps
  1. Establish ASHA connection.
  2. (Optional) Start phone call.
  3. Start streaming.
  4. Verify audio data is received.
  5. Stop streaming.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| stream_type | _StreamType | The stream type to test. | required |
Source code in navi/tests/functionality/asha_dual_devices_test.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
@navi_test_base.parameterized(_StreamType.MEDIA, _StreamType.VOICE_COMM)
async def test_streaming(self, stream_type: _StreamType) -> None:
  """Tests ASHA streaming.

  Test Steps:
    1. Establish ASHA connection.
    2. (Optional) Start phone call.
    3. Start streaming.
    4. Verify audio data is received.
    5. Stop streaming.

  Args:
    stream_type: The stream type to test.
  """

  # One audio queue per REF ASHA service, so the number of sinks always
  # follows the number of services instead of being fixed at two.
  audio_sinks = [asyncio.Queue[bytes]() for _ in self.ref_asha_services]

  for asha_service, audio_sink in zip(self.ref_asha_services, audio_sinks):
    asha_service.audio_sink = audio_sink.put_nowait

  watcher = pyee_extensions.EventWatcher()
  start_event_lists = [
      watcher.async_monitor(asha_service, asha.AshaService.Event.STARTED)
      for asha_service in self.ref_asha_services
  ]
  stop_event_lists = [
      watcher.async_monitor(asha_service, asha.AshaService.Event.STOPPED)
      for asha_service in self.ref_asha_services
  ]

  with contextlib.ExitStack() as exit_stack:
    # A phone call context is only entered for the VOICE_COMM variant; the
    # exit stack leaves it again when this block ends.
    if stream_type == _StreamType.VOICE_COMM:
      self.logger.info('[DUT] Start phone call')
      exit_stack.enter_context(
          self.dut.bl4a.make_phone_call(
              caller_name='Pixel Bluetooth',
              caller_number='123456789',
              direction=constants.Direction.OUTGOING,
          )
      )

    self.logger.info('[DUT] Start streaming')
    await asyncio.to_thread(self.dut.bt.audioPlaySine)
    for i, (started, audio_sink) in enumerate(
        zip(start_event_lists, audio_sinks)
    ):
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        self.logger.info('[REF] Wait for audio started on %dth device', i)
        await started.get()
        self.logger.info('[REF] Wait for audio data on %dth device', i)
        await audio_sink.get()

    await asyncio.sleep(_STREAMING_TIME_SECONDS)

    self.logger.info('[DUT] Stop streaming')
    await asyncio.to_thread(self.dut.bt.audioStop)
    for i, stopped in enumerate(stop_event_lists):
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        self.logger.info('[REF] Wait for audio stopped on %dth device', i)
        await stopped.get()

Bases: TwoDevicesTestBase

Tests Bluetooth Quality Report.

Source code in navi/tests/functionality/bqr_test.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class BluetoothQualityReportTest(navi_test_base.TwoDevicesTestBase):
  """Tests Bluetooth Quality Report."""
  # Integer value of the DUT's persist.bluetooth.bqr.event_mask property,
  # read once in async_setup_class.
  bqr_event_mask: int

  @override
  async def async_setup_class(self) -> None:
    """Reads the BQR event mask from the DUT.

    Raises:
      signals.TestAbortClass: If the event mask property is empty.
    """
    await super().async_setup_class()
    bqr_event_mask_str = self.dut.getprop("persist.bluetooth.bqr.event_mask")
    if not bqr_event_mask_str:
      raise signals.TestAbortClass("BQR is not enabled on DUT.")
    self.bqr_event_mask = int(bqr_event_mask_str)

  async def test_approach_lsto_classic_connection(self) -> None:
    """Tests classic connection approach LSTO.

    The REF connects to the DUT over BR/EDR and then disconnects with the
    "power off" reason. For each of APPROACH_LSTO and RF_STATS whose bit is
    set in the event mask, the matching BQR event must arrive on the DUT.
    """

    bqr_cb = self.dut.bl4a.register_callback(bl4a_api.Module.BQR)
    adapter_cb = self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER)
    self.test_case_context.push(bqr_cb)
    self.test_case_context.push(adapter_cb)

    self.logger.info("[REF] Connect to DUT.")
    ref_dut_acl = await self.ref.device.connect(
        self.dut.address,
        transport=core.PhysicalTransport.BR_EDR,
        timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
    )
    await adapter_cb.wait_for_event(
        bl4a_api.AclConnected(
            address=self.ref.address,
            transport=android_constants.Transport.CLASSIC,
        )
    )

    self.logger.info("[REF] Disconnect to trigger LSTO.")
    # Reason must be power off, or else LSTO will not be triggered.
    await ref_dut_acl.disconnect(
        reason=hci.HCI_REMOTE_DEVICE_TERMINATED_CONNECTION_DUE_TO_POWER_OFF_ERROR
    )

    # NOTE(review): BqrEventMaskBitIndex members are AND-ed with the mask
    # directly; confirm that they are mask values rather than bit positions.
    if self.bqr_event_mask & BqrEventMaskBitIndex.APPROACH_LSTO:
      self.logger.info("[DUT] Wait for BQR event: APPROACH_LSTO.")
      await bqr_cb.wait_for_event(
          bl4a_api.BluetoothQualityReportReady(
              device=self.ref.address,
              quality_report_id=android_constants.BluetoothQualityReportId.APPROACH_LSTO,
              status=0,
              common=mock.ANY,
          ),
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )
    if self.bqr_event_mask & BqrEventMaskBitIndex.RF_STATS_MODE_EVENT_TRIGGER:
      self.logger.info("[DUT] Wait for BQR event: RF_STATS.")
      # Unlike APPROACH_LSTO, this report is matched against _MASKED_ADDRESS
      # instead of the REF address.
      await bqr_cb.wait_for_event(
          bl4a_api.BluetoothQualityReportReady(
              device=_MASKED_ADDRESS,
              quality_report_id=android_constants.BluetoothQualityReportId.RF_STATS,
              status=0,
              common=mock.ANY,
          ),
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )

  async def test_energy_monitoring_when_power_unplug(self) -> None:
    """Tests power unplug will trigger energy monitoring.

    Skipped when the DUT's SDK level is below 36 or when the energy
    monitoring bit is not set in the event mask.
    """

    if int(self.dut.getprop("ro.build.version.sdk")) < 36:
      self.skipTest(
          "Energy monitor event is not supported before SDK API level: 36."
      )
    if not self.bqr_event_mask & BqrEventMaskBitIndex.ENERGY_MONITORING_MODE:
      self.skipTest("Energy monitor event is not enabled on DUT.")

    bqr_cb = self.dut.bl4a.register_callback(bl4a_api.Module.BQR)
    self.test_case_context.push(bqr_cb)

    self.logger.info("[DUT] Set battery unplug and battery level to low.")
    # Register the reset before touching the battery state, so that the DUT is
    # restored even when one of the commands below fails part-way.
    self.test_case_context.callback(lambda: self.dut.shell("cmd battery reset"))
    self.dut.shell("cmd battery unplug")
    self.dut.shell("cmd battery set level 9")

    self.logger.info("[DUT] Wait for BQR event.")
    await bqr_cb.wait_for_event(
        bl4a_api.BluetoothQualityReportReady(
            device=_MASKED_ADDRESS,
            quality_report_id=android_constants.BluetoothQualityReportId.ENERGY_MONITOR,
            status=0,
            common=mock.ANY,
        ),
        timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
    )

Tests classic connection approach LSTO.

Source code in navi/tests/functionality/bqr_test.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
async def test_approach_lsto_classic_connection(self) -> None:
  """Checks the BQR reports raised when a classic link approaches LSTO.

  Test steps:
    1. Connect REF to DUT over BR/EDR and wait for the ACL on DUT.
    2. Disconnect from REF with the power-off reason to trigger LSTO.
    3. For each of APPROACH_LSTO and RF_STATS whose bit is set in the BQR
    event mask, wait for the matching quality report on DUT.
  """

  report_listener = self.dut.bl4a.register_callback(bl4a_api.Module.BQR)
  acl_listener = self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER)
  for listener in (report_listener, acl_listener):
    self.test_case_context.push(listener)

  self.logger.info("[REF] Connect to DUT.")
  acl_link = await self.ref.device.connect(
      self.dut.address,
      transport=core.PhysicalTransport.BR_EDR,
      timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
  )
  acl_connected = bl4a_api.AclConnected(
      address=self.ref.address,
      transport=android_constants.Transport.CLASSIC,
  )
  await acl_listener.wait_for_event(acl_connected)

  self.logger.info("[REF] Disconnect to trigger LSTO.")
  # LSTO is only triggered when the link is dropped with the power-off reason.
  power_off = hci.HCI_REMOTE_DEVICE_TERMINATED_CONNECTION_DUE_TO_POWER_OFF_ERROR
  await acl_link.disconnect(reason=power_off)

  if self.bqr_event_mask & BqrEventMaskBitIndex.APPROACH_LSTO:
    self.logger.info("[DUT] Wait for BQR event: APPROACH_LSTO.")
    # This report is matched against the REF address.
    lsto_report = bl4a_api.BluetoothQualityReportReady(
        device=self.ref.address,
        quality_report_id=android_constants.BluetoothQualityReportId.APPROACH_LSTO,
        status=0,
        common=mock.ANY,
    )
    await report_listener.wait_for_event(
        lsto_report, timeout=_DEFAULT_STEP_TIMEOUT_SECONDS
    )

  if self.bqr_event_mask & BqrEventMaskBitIndex.RF_STATS_MODE_EVENT_TRIGGER:
    self.logger.info("[DUT] Wait for BQR event: RF_STATS.")
    # This report is matched against the masked address instead.
    rf_stats_report = bl4a_api.BluetoothQualityReportReady(
        device=_MASKED_ADDRESS,
        quality_report_id=android_constants.BluetoothQualityReportId.RF_STATS,
        status=0,
        common=mock.ANY,
    )
    await report_listener.wait_for_event(
        rf_stats_report, timeout=_DEFAULT_STEP_TIMEOUT_SECONDS
    )

Tests that unplugging power triggers energy monitoring.

Source code in navi/tests/functionality/bqr_test.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
async def test_energy_monitoring_when_power_unplug(self) -> None:
  """Tests power unplug will trigger energy monitoring.

  Test steps:
    1. Skip unless DUT reports SDK API level 36 or above and the energy
    monitoring bit is set in the BQR event mask.
    2. Set the DUT battery to unplugged with level 9.
    3. Wait for an ENERGY_MONITOR quality report on DUT.
  """

  if int(self.dut.getprop("ro.build.version.sdk")) < 36:
    self.skipTest(
        "Energy monitor event is not supported before SDK API level: 36."
    )
  if not self.bqr_event_mask & BqrEventMaskBitIndex.ENERGY_MONITORING_MODE:
    self.skipTest("Energy monitor event is not enabled on DUT.")

  bqr_cb = self.dut.bl4a.register_callback(bl4a_api.Module.BQR)
  self.test_case_context.push(bqr_cb)

  self.logger.info("[DUT] Set battery unplug and battery level to low.")
  # Register the restore step before touching the battery state, so the
  # overridden state is still rolled back if one of the commands below fails.
  self.test_case_context.callback(lambda: self.dut.shell("cmd battery reset"))
  self.dut.shell("cmd battery unplug")
  self.dut.shell("cmd battery set level 9")

  self.logger.info("[DUT] Wait for BQR event.")
  await bqr_cb.wait_for_event(
      bl4a_api.BluetoothQualityReportReady(
          device=_MASKED_ADDRESS,
          quality_report_id=android_constants.BluetoothQualityReportId.ENERGY_MONITOR,
          status=0,
          common=mock.ANY,
      ),
      timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
  )

Bases: TwoDevicesTestBase

Tests related to Bluetooth Classic pairing.

Source code in navi/tests/functionality/classic_pairing_test.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class ClassicPairingTest(navi_test_base.TwoDevicesTestBase):
  """Tests related to Bluetooth Classic pairing."""

  async def test_legacy_pairing_incoming_mode3(self) -> None:
    """Tests incoming Legacy Pairing in mode 3.

    Test steps:
      1. Install a non-auto-accepting pairing delegate on REF, then disable
      SSP and enable always Authentication on REF.
      2. Connect DUT from REF.
      3. Wait for pairing requests on REF.
      4. Set pairing PIN on REF.
      5. Wait for pairing requests on DUT.
      6. Set pairing PIN on DUT.
      7. Verify final states.
    """

    # auto_accept=False so that the test answers the PIN request itself.
    pairing_delegate = pairing_utils.PairingDelegate(
        io_capability=pairing.PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY,
        auto_accept=False,
    )

    def pairing_config_factory(
        connection: device.Connection,
    ) -> pairing.PairingConfig:
      del connection  # Unused callback parameter.
      return pairing.PairingConfig(delegate=pairing_delegate)

    self.ref.device.pairing_config_factory = pairing_config_factory

    dut_cb = self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER)
    self.test_case_context.push(dut_cb)
    self.logger.info('[REF] Disable SSP on REF.')
    await self.ref.device.send_command(
        hci.HCI_Write_Simple_Pairing_Mode_Command(simple_pairing_mode=0),
        check_result=True,
    )
    self.logger.info('[REF] Enable always authenticate on REF.')
    await self.ref.device.send_command(
        hci.HCI_Write_Authentication_Enable_Command(authentication_enable=1),
        check_result=True,
    )

    self.logger.info('[REF] Connect to DUT.')
    # The connection runs in the background; it is awaited only after both
    # sides have answered their PIN requests.
    ref_connection_task = asyncio.create_task(
        self.ref.device.connect(
            self.dut.address,
            transport=core.BT_BR_EDR_TRANSPORT,
        )
    )

    try:
      self.logger.info('[REF] Wait for pairing request.')
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        ref_pairing_request = await pairing_delegate.pairing_events.get()
        self.assertEqual(
            ref_pairing_request.variant,
            pairing_utils.PairingVariant.PIN_CODE_REQUEST,
        )

      self.logger.info('[REF] Handle pairing confirmation.')
      pairing_delegate.pairing_answers.put_nowait(_PIN_CODE_DEFAULT)

      self.logger.info('[DUT] Wait for pairing request.')
      dut_pairing_request = await dut_cb.wait_for_event(
          bl4a_api.PairingRequest,
          predicate=lambda e: (e.address == self.ref.address),
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )
      self.assertEqual(
          dut_pairing_request.variant, android_constants.PairingVariant.PIN
      )

      self.logger.info('[DUT] Handle pairing confirmation.')
      self.dut.bt.setPin(self.ref.address, _PIN_CODE_DEFAULT)

      self.logger.info('[DUT] Check final state.')
      dut_pairing_complete = await dut_cb.wait_for_event(
          bl4a_api.BondStateChanged,
          predicate=lambda e: (e.state in _TERMINATED_BOND_STATES),
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )
      self.assertEqual(
          dut_pairing_complete.state, android_constants.BondState.BONDED
      )

      self.logger.info('[REF] Wait for connection complete.')
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        await ref_connection_task
    finally:
      # If a step above failed, the connection attempt may still be pending;
      # cancel it so it does not outlive this test. Cancelling a task that has
      # already finished is a no-op.
      ref_connection_task.cancel()

Tests incoming Legacy Pairing in mode 3.

Test steps
  1. Enable always Authentication and disable built-in pairing delegation on REF.
  2. Connect DUT from REF.
  3. Wait for pairing requests on REF.
  4. Set pairing PIN on REF.
  5. Wait for pairing requests on DUT.
  6. Set pairing PIN on DUT.
  7. Verify final states.
Source code in navi/tests/functionality/classic_pairing_test.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
async def test_legacy_pairing_incoming_mode3(self) -> None:
  """Tests incoming Legacy Pairing in mode 3.

  Test steps:
    1. Install a non-auto-accepting pairing delegate on REF, then disable SSP
    and enable always Authentication on REF.
    2. Connect DUT from REF.
    3. Wait for pairing requests on REF.
    4. Set pairing PIN on REF.
    5. Wait for pairing requests on DUT.
    6. Set pairing PIN on DUT.
    7. Verify final states.
  """

  # auto_accept=False so that the test answers the PIN request itself.
  pairing_delegate = pairing_utils.PairingDelegate(
      io_capability=pairing.PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY,
      auto_accept=False,
  )

  def pairing_config_factory(
      connection: device.Connection,
  ) -> pairing.PairingConfig:
    del connection  # Unused callback parameter.
    return pairing.PairingConfig(delegate=pairing_delegate)

  self.ref.device.pairing_config_factory = pairing_config_factory

  dut_cb = self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER)
  self.test_case_context.push(dut_cb)
  self.logger.info('[REF] Disable SSP on REF.')
  await self.ref.device.send_command(
      hci.HCI_Write_Simple_Pairing_Mode_Command(simple_pairing_mode=0),
      check_result=True,
  )
  self.logger.info('[REF] Enable always authenticate on REF.')
  await self.ref.device.send_command(
      hci.HCI_Write_Authentication_Enable_Command(authentication_enable=1),
      check_result=True,
  )

  self.logger.info('[REF] Connect to DUT.')
  # The connection runs in the background; it is awaited only after both sides
  # have answered their PIN requests.
  ref_connection_task = asyncio.create_task(
      self.ref.device.connect(
          self.dut.address,
          transport=core.BT_BR_EDR_TRANSPORT,
      )
  )

  try:
    self.logger.info('[REF] Wait for pairing request.')
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      ref_pairing_request = await pairing_delegate.pairing_events.get()
      self.assertEqual(
          ref_pairing_request.variant,
          pairing_utils.PairingVariant.PIN_CODE_REQUEST,
      )

    self.logger.info('[REF] Handle pairing confirmation.')
    pairing_delegate.pairing_answers.put_nowait(_PIN_CODE_DEFAULT)

    self.logger.info('[DUT] Wait for pairing request.')
    dut_pairing_request = await dut_cb.wait_for_event(
        bl4a_api.PairingRequest,
        predicate=lambda e: (e.address == self.ref.address),
        timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
    )
    self.assertEqual(
        dut_pairing_request.variant, android_constants.PairingVariant.PIN
    )

    self.logger.info('[DUT] Handle pairing confirmation.')
    self.dut.bt.setPin(self.ref.address, _PIN_CODE_DEFAULT)

    self.logger.info('[DUT] Check final state.')
    dut_pairing_complete = await dut_cb.wait_for_event(
        bl4a_api.BondStateChanged,
        predicate=lambda e: (e.state in _TERMINATED_BOND_STATES),
        timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
    )
    self.assertEqual(
        dut_pairing_complete.state, android_constants.BondState.BONDED
    )

    self.logger.info('[REF] Wait for connection complete.')
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      await ref_connection_task
  finally:
    # If a step above failed, the connection attempt may still be pending;
    # cancel it so it does not outlive this test. Cancelling a task that has
    # already finished is a no-op.
    ref_connection_task.cancel()

Bases: MultiDevicesTestBase

Source code in navi/tests/functionality/coex_test.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
class CoexTest(navi_test_base.MultiDevicesTestBase):
  """Tests HFP and A2DP used together between DUT and the REF devices."""

  # True only if every REF reports LC3 among its standard codecs.
  # Populated by async_setup_class().
  ref_supports_lc3: bool

  @override
  async def async_setup_class(self) -> None:
    """Reads each REF's supported codecs and records LC3 availability."""
    await super().async_setup_class()
    # Accumulate over all REFs rather than letting the last REF overwrite the
    # flag: tests pick their REF by index, so the flag must hold for each one.
    supports_lc3 = True
    for ref in self.refs:
      response = await ref.device.send_command(
          hci.HCI_Read_Local_Supported_Codecs_Command(),
          check_result=True,
      )
      supported_codecs = [
          hci.CodecID(codec)
          for codec in response.return_parameters.standard_codec_ids
      ]
      self.logger.info("[REF] Supported codecs: %s", supported_codecs)
      if hci.CodecID.LC3 not in supported_codecs:
        supports_lc3 = False
    self.ref_supports_lc3 = supports_lc3

  @override
  async def async_teardown_test(self) -> None:
    """Stops DUT audio playback and resets its audio attributes."""
    await super().async_teardown_test()
    self.dut.bt.audioStop()
    # Reset audio attributes.
    self.dut.bt.setAudioAttributes(None, False)

  def _setup_headset_device(
      self,
      hfp_configuration: hfp.HfConfiguration,
      a2dp_codecs: list[a2dp_ext.A2dpCodec],
  ) -> None:
    """Sets up HFP, A2DP sink and AVRCP servers on every REF device.

    Args:
      hfp_configuration: HFP HF configuration passed to each REF's HFP server.
      a2dp_codecs: A2DP codecs whose default capabilities each REF's sink
        server is set up with.
    """
    for ref in self.refs:
      hfp_ext.HfProtocol.setup_server(
          ref.device,
          sdp_handle=_HFP_SDP_HANDLE,
          configuration=hfp_configuration,
      )
      a2dp_ext.setup_sink_server(
          ref.device,
          [codec.get_default_capabilities() for codec in a2dp_codecs],
          _A2DP_SERVICE_RECORD_HANDLE,
      )
      a2dp_ext.setup_avrcp_server(
          ref.device,
          avrcp_controller_handle=_AVRCP_CONTROLLER_RECORD_HANDLE,
          avrcp_target_handle=_AVRCP_TARGET_RECORD_HANDLE,
      )

  async def test_pair_and_connect(self) -> None:
    """Tests HFP connection establishment right after a pairing session.

    Test steps:
      1. Setup HFP and A2DP on REF.
      2. Create bond from DUT.
      3. Wait HFP and A2DP connected on DUT.
      (Android should autoconnect HFP as AG)
    """
    # Only the first REF device takes part in this test.
    self.ref = self.refs[0]
    with (
        self.dut.bl4a.register_callback(_Module.A2DP) as dut_cb_a2dp,
        self.dut.bl4a.register_callback(_Module.HFP_AG) as dut_cb_hfp,
    ):
      self._setup_headset_device(
          hfp_configuration=_DEFAULT_HFP_CONFIGURATION,
          a2dp_codecs=[a2dp_ext.A2dpCodec.SBC],
      )
      self.logger.info("[DUT] Connect and pair REF.")
      await self.classic_connect_and_pair()
      self.logger.info("[DUT] Wait for A2DP connected.")
      await dut_cb_a2dp.wait_for_event(
          bl4a_api.ProfileConnectionStateChanged(
              address=self.ref.address,
              state=android_constants.ConnectionState.CONNECTED,
          ),
      )
      self.logger.info("[DUT] Wait for HFP connected.")
      await dut_cb_hfp.wait_for_event(
          bl4a_api.ProfileActiveDeviceChanged(self.ref.address)
      )

  @navi_test_base.named_parameterized(
      cvsd=dict(
          supported_audio_codecs=[_AudioCodec.CVSD],
      ),
      cvsd_and_msbc=dict(
          supported_audio_codecs=[_AudioCodec.CVSD, _AudioCodec.MSBC],
      ),
      cvsd_msbc_and_lc3_swb=dict(
          supported_audio_codecs=[
              _AudioCodec.LC3_SWB,
              _AudioCodec.CVSD,
              _AudioCodec.MSBC,
          ],
      ),
      handle_audio_focus=dict(
          supported_audio_codecs=[
              _AudioCodec.LC3_SWB,
              _AudioCodec.CVSD,
              _AudioCodec.MSBC,
          ],
          handle_audio_focus=True,
      ),
  )
  async def test_call_during_a2dp_playback(
      self,
      supported_audio_codecs: list[hfp.AudioCodec],
      handle_audio_focus: bool = False,
  ) -> None:
    """Tests making an outgoing phone call, observing SCO connection status.

    Test steps:
      1. Setup HFP and A2DP connection.
      2. Play sine and check A2DP is playing.
      3. Place an outgoing call.
      4. Check A2DP is stopped.
      5. Verify SCO connected.
      6. Terminate the call.
      7. Verify SCO disconnected.
      8. Verify A2DP resumed.

    Args:
      supported_audio_codecs: Audio codecs supported by REF device.
      handle_audio_focus: Whether to enable audio focus handling.
    """
    # Only the first REF device takes part in this test.
    self.ref = self.refs[0]
    if (
        _AudioCodec.LC3_SWB in supported_audio_codecs
        and not self.ref_supports_lc3
    ):
      self.skipTest("LC3 not supported on REF.")

    # Enable audio focus handling.
    self.dut.bt.setAudioAttributes(None, handle_audio_focus)

    # [REF] Setup HFP.
    hfp_configuration = hfp.HfConfiguration(
        supported_hf_features=[hfp.HfFeature.CODEC_NEGOTIATION],
        supported_hf_indicators=[],
        supported_audio_codecs=supported_audio_codecs,
    )
    self._setup_headset_device(
        hfp_configuration=hfp_configuration,
        a2dp_codecs=[a2dp_ext.A2dpCodec.SBC],
    )

    dut_hfp_cb = self.dut.bl4a.register_callback(_Module.HFP_AG)
    dut_a2dp_cb = self.dut.bl4a.register_callback(_Module.A2DP)
    self.test_case_context.push(dut_hfp_cb)
    self.test_case_context.push(dut_a2dp_cb)

    self.logger.info("[DUT] Connect and pair REF.")
    await self.classic_connect_and_pair()

    self.logger.info("[DUT] Wait for A2DP connected.")
    await dut_a2dp_cb.wait_for_event(
        bl4a_api.ProfileConnectionStateChanged(
            address=self.ref.address,
            state=android_constants.ConnectionState.CONNECTED,
        ),
    )
    self.logger.info("[DUT] Wait for HFP connected.")
    await dut_hfp_cb.wait_for_event(
        bl4a_api.ProfileActiveDeviceChanged(self.ref.address)
    )

    self.logger.info("[DUT] Start stream.")
    self.dut.bt.audioSetRepeat(android_constants.RepeatMode.ALL)
    self.dut.bt.audioPlaySine()

    self.logger.info("[DUT] Check A2DP is playing.")
    await dut_a2dp_cb.wait_for_event(
        bl4a_api.A2dpPlayingStateChanged(
            address=self.ref.address, state=android_constants.A2dpState.PLAYING
        ),
    )

    # Queue the SCO links reported by REF; the listener is installed before
    # the call is placed.
    sco_links = asyncio.Queue[device.ScoLink]()
    self.ref.device.on(
        self.ref.device.EVENT_SCO_CONNECTION, sco_links.put_nowait
    )

    dut_player_cb = self.dut.bl4a.register_callback(_Module.PLAYER)
    self.test_case_context.push(dut_player_cb)

    self.logger.info("[DUT] Add call.")
    call = self.dut.bl4a.make_phone_call(
        _CALLER_NAME,
        _CALLER_NUMBER,
        constants.Direction.OUTGOING,
    )
    with call:
      self.logger.info("[DUT] Check A2DP is not playing.")
      await dut_a2dp_cb.wait_for_event(
          bl4a_api.A2dpPlayingStateChanged(
              address=self.ref.address,
              state=android_constants.A2dpState.NOT_PLAYING,
          ),
      )
      self.logger.info("[DUT] Wait for SCO connected.")
      await dut_hfp_cb.wait_for_event(
          _HfpAgAudioStateChange(
              address=self.ref.address, state=_ScoState.CONNECTED
          ),
      )
      # Player state is only checked when audio focus handling is enabled.
      if handle_audio_focus:
        self.logger.info("[DUT] Wait for player paused.")
        await dut_player_cb.wait_for_event(
            bl4a_api.PlayerIsPlayingChanged(is_playing=False),
        )

      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        self.logger.info("[REF] Wait for SCO connected.")
        sco_link = await sco_links.get()

      # Install the disconnection listener before the call is terminated.
      sco_disconnected = asyncio.Event()
      sco_link.once(
          sco_link.EVENT_DISCONNECTION, lambda *_: sco_disconnected.set()
      )

      self.logger.info("[DUT] Terminate call.")
      call.close()

    self.logger.info("[DUT] Wait for SCO disconnected.")
    await dut_hfp_cb.wait_for_event(
        _HfpAgAudioStateChange(
            address=self.ref.address, state=_ScoState.DISCONNECTED
        ),
    )
    self.logger.info("[REF] Wait for SCO disconnected.")
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      await sco_disconnected.wait()

    self.logger.info("[DUT] Wait for A2DP resume.")
    await dut_a2dp_cb.wait_for_event(
        bl4a_api.A2dpPlayingStateChanged(
            address=self.ref.address, state=android_constants.A2dpState.PLAYING
        ),
    )
    if handle_audio_focus:
      self.logger.info("[DUT] Wait for player resumed.")
      await dut_player_cb.wait_for_event(
          bl4a_api.PlayerIsPlayingChanged(is_playing=True),
      )

  async def test_multidevice_hf_switch(self) -> None:
    """Tests DUT switch active hfp devices.

    Test steps:
      1. Setup two HFP HF devices.
      2. DUT pair with REF0.
      3. DUT pair with REF1.
      4. DUT make outgoing call.
      5. DUT answer the call.
      6. DUT switch active device to REF0.
      7. DUT switch active device to REF1.
    """
    if self.dut.bt.maxConnectedAudioDevices() < 2:
      self.skipTest("[DUT] Multi-device HF is not supported.")

    # Pair the REF devices one after another; each becomes the active HFP
    # device once connected.
    with self.dut.bl4a.register_callback(bl4a_api.Module.HFP_AG) as dut_hfp_cb:
      for i, ref in enumerate(self.refs):
        self.logger.info("[REF%s] Setup HFP HF", i)
        hfp_ext.HfProtocol.setup_server(
            ref.device,
            sdp_handle=_HFP_SDP_HANDLE,
            configuration=_DEFAULT_HFP_CONFIGURATION,
        )

        await self.classic_connect_and_pair(ref)

        self.logger.info("[DUT] Wait for HFP connected to REF%s", i)
        await dut_hfp_cb.wait_for_event(
            bl4a_api.ProfileActiveDeviceChanged(address=ref.address),
        )

    with (
        self.dut.bl4a.register_callback(
            bl4a_api.Module.TELECOM
        ) as dut_telecom_cb,
        self.dut.bl4a.make_phone_call(
            _CALLER_NAME,
            _CALLER_NUMBER,
            constants.Direction.OUTGOING,
        ) as call,
    ):
      self.logger.info("[DUT] Wait for call dialing.")
      await dut_telecom_cb.wait_for_event(
          bl4a_api.CallStateChanged(
              handle=mock.ANY,
              name=mock.ANY,
              state=android_constants.CallState.DIALING,
          ),
      )

      self.logger.info("[DUT] Answer call.")
      call.answer()

      self.logger.info("[DUT] Wait for call active.")
      await dut_telecom_cb.wait_for_event(
          bl4a_api.CallStateChanged(
              handle=mock.ANY,
              name=mock.ANY,
              state=android_constants.CallState.ACTIVE,
          ),
      )

      self.logger.info("[DUT] Start streaming.")
      self.dut.bt.audioSetRepeat(android_constants.RepeatMode.ONE)
      await asyncio.to_thread(self.dut.bt.audioPlaySine)

      # The default route should be REF1.
      for i, ref in enumerate(self.refs):
        # A fresh HFP listener is registered for every switch.
        with self.dut.bl4a.register_callback(_Module.HFP_AG) as dut_hfp_cb:
          self.assertNotEqual(
              self.dut.bt.hfpAgGetAudioState(ref.address), _ScoState.CONNECTED,
              f"SCO is already connected to REF{i}.",
          )

          self.logger.info("[DUT] Switch to REF%s", i)
          self.dut.bt.setActiveDevice(
              ref.address, android_constants.ActiveDeviceUse.PHONE_CALL,
          )

          self.logger.info("[DUT] Wait for HFP connected to REF%s", i)
          await dut_hfp_cb.wait_for_event(
              bl4a_api.ProfileActiveDeviceChanged(ref.address)
          )

          self.logger.info("[DUT] Wait for SCO connected to REF%s", i)
          await dut_hfp_cb.wait_for_event(
              event=_HfpAgAudioStateChange(
                  address=ref.address, state=_ScoState.CONNECTED
              ),
          )

      self.logger.info("[DUT] Terminate call.")
      call.close()

Tests making an outgoing phone call, observing SCO connection status.

Test steps
  1. Setup HFP and A2DP connection.
  2. Play sine and check A2DP is playing.
  3. Place an outgoing call.
  4. Check A2DP is stopped.
  5. Verify SCO connected.
  6. Terminate the call.
  7. Verify SCO disconnected.
  8. Verify A2DP resumed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| supported_audio_codecs | list[AudioCodec] | Audio codecs supported by REF device. | required |
| handle_audio_focus | bool | Whether to enable audio focus handling. | False |
Source code in navi/tests/functionality/coex_test.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# Each named entry below is one parameterization: the keyword arguments that
# are passed to the test function.
@navi_test_base.named_parameterized(
    cvsd=dict(
        supported_audio_codecs=[_AudioCodec.CVSD],
    ),
    cvsd_and_msbc=dict(
        supported_audio_codecs=[_AudioCodec.CVSD, _AudioCodec.MSBC],
    ),
    cvsd_msbc_and_lc3_swb=dict(
        supported_audio_codecs=[
            _AudioCodec.LC3_SWB,
            _AudioCodec.CVSD,
            _AudioCodec.MSBC,
        ],
    ),
    handle_audio_focus=dict(
        supported_audio_codecs=[
            _AudioCodec.LC3_SWB,
            _AudioCodec.CVSD,
            _AudioCodec.MSBC,
        ],
        handle_audio_focus=True,
    ),
)
async def test_call_during_a2dp_playback(
    self,
    supported_audio_codecs: list[hfp.AudioCodec],
    handle_audio_focus: bool = False,
) -> None:
  """Tests making an outgoing phone call, observing SCO connection status.

  Test steps:
    1. Setup HFP and A2DP connection.
    2. Play sine and check A2DP is playing.
    3. Place an outgoing call.
    4. Check A2DP is stopped.
    5. Verify SCO connected.
    6. Terminate the call.
    7. Verify SCO disconnected.
    8. Verify A2DP resumed.

  Args:
    supported_audio_codecs: Audio codecs supported by REF device.
    handle_audio_focus: Whether to enable audio focus handling.
  """
  # Only the first REF device takes part in this test.
  self.ref = self.refs[0]
  # ref_supports_lc3 is set outside this function.
  if (
      _AudioCodec.LC3_SWB in supported_audio_codecs
      and not self.ref_supports_lc3
  ):
    self.skipTest("LC3 not supported on REF.")

  # Enable audio focus handling.
  self.dut.bt.setAudioAttributes(None, handle_audio_focus)

  # [REF] Setup HFP.
  hfp_configuration = hfp.HfConfiguration(
      supported_hf_features=[hfp.HfFeature.CODEC_NEGOTIATION],
      supported_hf_indicators=[],
      supported_audio_codecs=supported_audio_codecs,
  )
  self._setup_headset_device(
      hfp_configuration=hfp_configuration,
      a2dp_codecs=[a2dp_ext.A2dpCodec.SBC],
  )

  # Register the DUT listeners before connecting.
  dut_hfp_cb = self.dut.bl4a.register_callback(_Module.HFP_AG)
  dut_a2dp_cb = self.dut.bl4a.register_callback(_Module.A2DP)
  self.test_case_context.push(dut_hfp_cb)
  self.test_case_context.push(dut_a2dp_cb)

  self.logger.info("[DUT] Connect and pair REF.")
  # NOTE(review): called without arguments; presumably it targets self.ref -
  # confirm against the base class.
  await self.classic_connect_and_pair()

  self.logger.info("[DUT] Wait for A2DP connected.")
  await dut_a2dp_cb.wait_for_event(
      bl4a_api.ProfileConnectionStateChanged(
          address=self.ref.address,
          state=android_constants.ConnectionState.CONNECTED,
      ),
  )
  self.logger.info("[DUT] Wait for HFP connected.")
  await dut_hfp_cb.wait_for_event(
      bl4a_api.ProfileActiveDeviceChanged(self.ref.address)
  )

  self.logger.info("[DUT] Start stream.")
  self.dut.bt.audioSetRepeat(android_constants.RepeatMode.ALL)
  self.dut.bt.audioPlaySine()

  self.logger.info("[DUT] Check A2DP is playing.")
  await dut_a2dp_cb.wait_for_event(
      bl4a_api.A2dpPlayingStateChanged(
          address=self.ref.address, state=android_constants.A2dpState.PLAYING
      ),
  )

  # Queue the SCO links reported by REF; the listener is installed before the
  # call is placed.
  sco_links = asyncio.Queue[device.ScoLink]()
  self.ref.device.on(
      self.ref.device.EVENT_SCO_CONNECTION, sco_links.put_nowait
  )

  # The player listener is likewise registered before the call is placed.
  dut_player_cb = self.dut.bl4a.register_callback(_Module.PLAYER)
  self.test_case_context.push(dut_player_cb)

  self.logger.info("[DUT] Add call.")
  call = self.dut.bl4a.make_phone_call(
      _CALLER_NAME,
      _CALLER_NUMBER,
      constants.Direction.OUTGOING,
  )
  with call:
    self.logger.info("[DUT] Check A2DP is not playing.")
    await dut_a2dp_cb.wait_for_event(
        bl4a_api.A2dpPlayingStateChanged(
            address=self.ref.address,
            state=android_constants.A2dpState.NOT_PLAYING,
        ),
    )
    self.logger.info("[DUT] Wait for SCO connected.")
    await dut_hfp_cb.wait_for_event(
        _HfpAgAudioStateChange(
            address=self.ref.address, state=_ScoState.CONNECTED
        ),
    )
    # Player state is only checked when audio focus handling is enabled.
    if handle_audio_focus:
      self.logger.info("[DUT] Wait for player paused.")
      await dut_player_cb.wait_for_event(
          bl4a_api.PlayerIsPlayingChanged(is_playing=False),
      )

    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      self.logger.info("[REF] Wait for SCO connected.")
      sco_link = await sco_links.get()

    # Install the disconnection listener before the call is terminated.
    sco_disconnected = asyncio.Event()
    sco_link.once(
        sco_link.EVENT_DISCONNECTION, lambda *_: sco_disconnected.set()
    )

    self.logger.info("[DUT] Terminate call.")
    # NOTE(review): the enclosing `with call:` presumably closes the call again
    # on exit - confirm that close() is safe to call twice.
    call.close()

  self.logger.info("[DUT] Wait for SCO disconnected.")
  await dut_hfp_cb.wait_for_event(
      _HfpAgAudioStateChange(
          address=self.ref.address, state=_ScoState.DISCONNECTED
      ),
  )
  self.logger.info("[REF] Wait for SCO disconnected.")
  async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
    await sco_disconnected.wait()

  self.logger.info("[DUT] Wait for A2DP resume.")
  await dut_a2dp_cb.wait_for_event(
      bl4a_api.A2dpPlayingStateChanged(
          address=self.ref.address, state=android_constants.A2dpState.PLAYING
      ),
  )
  if handle_audio_focus:
    self.logger.info("[DUT] Wait for player resumed.")
    await dut_player_cb.wait_for_event(
        bl4a_api.PlayerIsPlayingChanged(is_playing=True),
    )

Tests that DUT can switch the active HFP device.

Test steps
  1. Setup two HFP HF devices.
  2. DUT pair with REF0.
  3. DUT pair with REF1.
  4. DUT make outgoing call.
  5. DUT answer the call.
  6. DUT switch active device to REF0.
  7. DUT switch active device to REF1.
Source code in navi/tests/functionality/coex_test.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
async def test_multidevice_hf_switch(self) -> None:
  """Verifies that DUT can move the active HFP device between the REFs.

  Test steps:
    1. Set up an HFP HF server on each REF and pair DUT with it.
    2. DUT makes an outgoing call.
    3. DUT answers the call and starts audio playback.
    4. DUT switches the active phone-call device to each REF in turn, waiting
       for the active-device change and the SCO connection each time.
    5. DUT terminates the call.
  """
  if self.dut.bt.maxConnectedAudioDevices() < 2:
    self.skipTest("[DUT] Multi-device HF is not supported.")

  def call_state_event(state):
    # Only the call state is matched; any handle and name are accepted.
    return bl4a_api.CallStateChanged(
        handle=mock.ANY,
        name=mock.ANY,
        state=state,
    )

  with self.dut.bl4a.register_callback(
      bl4a_api.Module.HFP_AG
  ) as pairing_hfp_cb:
    for i, ref in enumerate(self.refs):
      self.logger.info("[REF%s] Setup HFP HF", i)
      hfp_ext.HfProtocol.setup_server(
          ref.device,
          sdp_handle=_HFP_SDP_HANDLE,
          configuration=_DEFAULT_HFP_CONFIGURATION,
      )

      await self.classic_connect_and_pair(ref)

      self.logger.info("[DUT] Wait for HFP connected to REF%s", i)
      became_active = bl4a_api.ProfileActiveDeviceChanged(address=ref.address)
      await pairing_hfp_cb.wait_for_event(became_active)

  with (
      self.dut.bl4a.register_callback(bl4a_api.Module.TELECOM) as telecom_cb,
      self.dut.bl4a.make_phone_call(
          _CALLER_NAME,
          _CALLER_NUMBER,
          constants.Direction.OUTGOING,
      ) as call,
  ):
    self.logger.info("[DUT] Wait for call dialing.")
    await telecom_cb.wait_for_event(
        call_state_event(android_constants.CallState.DIALING),
    )

    self.logger.info("[DUT] Answer call.")
    call.answer()

    self.logger.info("[DUT] Wait for call active.")
    await telecom_cb.wait_for_event(
        call_state_event(android_constants.CallState.ACTIVE),
    )

    self.logger.info("[DUT] Start streaming.")
    self.dut.bt.audioSetRepeat(android_constants.RepeatMode.ONE)
    await asyncio.to_thread(self.dut.bt.audioPlaySine)

    # The default route should be REF1.
    for i, ref in enumerate(self.refs):
      # A fresh callback per REF so that events from the previous switch are
      # not consumed by this iteration.
      with self.dut.bl4a.register_callback(_Module.HFP_AG) as switch_hfp_cb:
        current_sco_state = self.dut.bt.hfpAgGetAudioState(ref.address)
        self.assertNotEqual(
            current_sco_state,
            _ScoState.CONNECTED,
            f"SCO is already connected to REF{i}.",
        )

        self.logger.info("[DUT] Switch to REF%s", i)
        self.dut.bt.setActiveDevice(
            ref.address,
            android_constants.ActiveDeviceUse.PHONE_CALL,
        )

        self.logger.info("[DUT] Wait for HFP connected to REF%s", i)
        became_active = bl4a_api.ProfileActiveDeviceChanged(ref.address)
        await switch_hfp_cb.wait_for_event(became_active)

        self.logger.info("[DUT] Wait for SCO connected to REF%s", i)
        sco_connected = _HfpAgAudioStateChange(
            address=ref.address, state=_ScoState.CONNECTED
        )
        await switch_hfp_cb.wait_for_event(event=sco_connected)

    self.logger.info("[DUT] Terminate call.")
    call.close()

Tests HFP connection establishment right after a pairing session.

Test steps
  1. Setup HFP and A2DP on REF.
  2. Create bond from DUT.
  3. Wait for HFP and A2DP to be connected on DUT. (Android should auto-connect HFP as AG.)
Source code in navi/tests/functionality/coex_test.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
async def test_pair_and_connect(self) -> None:
  """Checks that A2DP and HFP come up on DUT right after pairing with REF.

  Test steps:
    1. Set up HFP and A2DP (SBC) on REF.
    2. Connect and pair from DUT.
    3. Wait for the A2DP connected event and the HFP active-device event on
       DUT (Android should autoconnect HFP as AG).
  """
  self.ref = self.refs[0]
  with self.dut.bl4a.register_callback(_Module.A2DP) as a2dp_events:
    with self.dut.bl4a.register_callback(_Module.HFP_AG) as hfp_events:
      self._setup_headset_device(
          hfp_configuration=_DEFAULT_HFP_CONFIGURATION,
          a2dp_codecs=[a2dp_ext.A2dpCodec.SBC],
      )

      self.logger.info("[DUT] Connect and pair REF.")
      await self.classic_connect_and_pair()

      self.logger.info("[DUT] Wait for A2DP connected.")
      a2dp_connected = bl4a_api.ProfileConnectionStateChanged(
          address=self.ref.address,
          state=android_constants.ConnectionState.CONNECTED,
      )
      await a2dp_events.wait_for_event(a2dp_connected)

      self.logger.info("[DUT] Wait for HFP connected.")
      hfp_active = bl4a_api.ProfileActiveDeviceChanged(self.ref.address)
      await hfp_events.wait_for_event(hfp_active)

Bases: MultiDevicesTestBase

Tests for LE Audio Unicast client functionality, where the remote device set contains two individual devices.

When running this test, please make sure the ref device supports CIS Peripheral.

Supported devices are:

- Pixel 8 and later
- Pixel 8a and later
- Pixel Watch 3 and later

Unsupported devices are:

- Pixel 7 and earlier
- Pixel 7a and earlier
- Pixel Watch 1, 2, Fitbit Ace LTE (P11)

Source code in navi/tests/functionality/le_audio_unicast_client_dual_device_test.py
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
class LeAudioUnicastClientDualDeviceTest(navi_test_base.MultiDevicesTestBase):
  """Tests for LE Audio Unicast client functionality, where the remote device set contains two individual devices.

  When running this test, please make sure the ref device supports CIS
  Peripheral.

  Supported devices are:
  - Pixel 8 and later
  - Pixel 8a and later
  - Pixel Watch 3 and later

  Unsupported devices are:
  - Pixel 7 and earlier
  - Pixel 7a and earlier
  - Pixel Watch 1, 2, Fitbit Ace LTE (P11)
  """

  # Time at which the major device (refs[0]) finished bonding. It is reset to
  # None in async_setup_test, set by _pair_major_device and read by
  # _pair_minor_device.
  first_bond_timestamp: datetime.datetime | None = None
  # Whether the VCP controller / MCP server / CCP server property reads "true"
  # on DUT. These are populated in async_setup_class.
  dut_vcp_enabled: bool = False
  dut_mcp_enabled: bool = False
  dut_ccp_enabled: bool = False

  @classmethod
  def _default_pacs(
      cls, audio_location: bap.AudioLocation
  ) -> pacs.PublishedAudioCapabilitiesService:
    """Builds the Published Audio Capabilities Service used by the REFs.

    Every context type bit (0xFFFF) is set as supported and available for both
    sink and source. Each direction publishes a single LC3 PAC record; the sink
    record covers 16/32/48 kHz while the source record covers 16/32 kHz only.

    Args:
      audio_location: Location used for both the sink and the source.

    Returns:
      The configured PACS instance.
    """

    def lc3_record(
        sampling_frequencies: bap.SupportedSamplingFrequency,
    ) -> pacs.PacRecord:
      # Sink and source records only differ in their sampling frequencies.
      capabilities = bap.CodecSpecificCapabilities(
          supported_sampling_frequencies=sampling_frequencies,
          supported_frame_durations=(
              bap.SupportedFrameDuration.DURATION_10000_US_SUPPORTED
          ),
          supported_audio_channel_count=[1],
          min_octets_per_codec_frame=26,
          max_octets_per_codec_frame=120,
          supported_max_codec_frames_per_sdu=1,
      )
      return pacs.PacRecord(
          coding_format=hci.CodingFormat(hci.CodecID.LC3),
          codec_specific_capabilities=capabilities,
      )

    sink_frequencies = (
        bap.SupportedSamplingFrequency.FREQ_16000
        | bap.SupportedSamplingFrequency.FREQ_32000
        | bap.SupportedSamplingFrequency.FREQ_48000
    )
    source_frequencies = (
        bap.SupportedSamplingFrequency.FREQ_16000
        | bap.SupportedSamplingFrequency.FREQ_32000
    )
    return pacs.PublishedAudioCapabilitiesService(
        supported_source_context=bap.ContextType(0xFFFF),
        available_source_context=bap.ContextType(0xFFFF),
        supported_sink_context=bap.ContextType(0xFFFF),
        available_sink_context=bap.ContextType(0xFFFF),
        sink_audio_locations=audio_location,
        source_audio_locations=audio_location,
        sink_pac=[lc3_record(sink_frequencies)],
        source_pac=[lc3_record(source_frequencies)],
    )

  def _setup_unicast_server(
      self,
      ref: device.Device,
      audio_location: bap.AudioLocation,
      sirk: bytes,
      sirk_type: csip.SirkType,
  ) -> None:
    """Registers the LE Audio unicast server services on a REF device.

    The services added are PACS, ASCS (sink ASE 1, source ASE 2), CAS wrapping
    a CSIS that describes a set of two members, and VCS.

    Args:
      ref: Device the services are added to.
      audio_location: Audio location published through PACS.
      sirk: Set Identity Resolving Key given to CSIS.
      sirk_type: SIRK type given to CSIS.
    """
    published_capabilities = self._default_pacs(audio_location)
    stream_control = ascs.AudioStreamControlService(
        ref,
        sink_ase_id=[1],
        source_ase_id=[2],
    )
    coordinated_set = csip.CoordinatedSetIdentificationService(
        set_identity_resolving_key=sirk,
        set_identity_resolving_key_type=sirk_type,
        coordinated_set_size=2,
    )
    common_audio = cap.CommonAudioServiceService(coordinated_set)
    volume_control = vcs.VolumeControlService()
    ref.add_services([
        published_capabilities,
        stream_control,
        common_audio,
        volume_control,
    ])

  @retry.retry_on_exception()
  async def _pair_major_device(self) -> None:
    """Bonds DUT with the first REF and waits for LE Audio to connect.

    The REF advertises a general BAP announcement together with its CSIS
    advertising data. DUT then creates the bond, confirms the pairing request,
    and connects once the UUID change is reported. The time of bonding is
    stored in `self.first_bond_timestamp`.
    """
    major_ref = self.refs[0]
    ref_address = major_ref.random_address
    with (
        self.dut.bl4a.register_callback(
            bl4a_api.Module.ADAPTER
        ) as adapter_events,
        self.dut.bl4a.register_callback(
            bl4a_api.Module.LE_AUDIO
        ) as lea_events,
    ):
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        self.logger.info("[REF|%s] Start advertising", ref_address)
        csis = _get_service_from_device(
            major_ref.device, csip.CoordinatedSetIdentificationService
        )
        bap_announcement = bytes(
            bap.UnicastServerAdvertisingData(
                announcement_type=bap.AnnouncementType.GENERAL
            )
        )
        await major_ref.device.create_advertising_set(
            advertising_parameters=_DEFAUILT_ADVERTISING_PARAMETERS,
            advertising_data=bap_announcement + csis.get_advertising_data(),
        )

      self.logger.info("[DUT] Create bond with %s", ref_address)
      self.dut.bt.createBond(
          ref_address,
          android_constants.Transport.LE,
          android_constants.AddressTypeStatus.RANDOM,
      )

      self.logger.info("[DUT] Wait for pairing request")
      await adapter_events.wait_for_event(
          bl4a_api.PairingRequest,
          lambda e: e.address == ref_address,
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )

      self.logger.info("[DUT] Accept pairing request")
      confirmed = self.dut.bt.setPairingConfirmation(ref_address, True)
      self.assertTrue(confirmed)

      self.logger.info("[DUT] Wait for bond state change")
      # Wait for any terminal bond state, then require that it is BONDED.
      bond_event = await adapter_events.wait_for_event(
          bl4a_api.BondStateChanged,
          lambda e: (
              e.address == ref_address and e.state in _TERMINATION_BOND_STATES
          ),
      )
      self.assertEqual(bond_event.state, android_constants.BondState.BONDED)
      self.first_bond_timestamp = datetime.datetime.now()

      self.logger.info("[DUT] Wait for UUID Change")
      uuid_changed = bl4a_api.UuidChanged(address=ref_address, uuids=mock.ANY)
      await adapter_events.wait_for_event(uuid_changed)
      self.dut.bt.connect(ref_address)

      self.logger.info("[DUT] Wait for LE Audio connected")
      lea_connected = bl4a_api.ProfileConnectionStateChanged(
          address=ref_address, state=_ConnectionState.CONNECTED
      )
      await lea_events.wait_for_event(
          lea_connected,
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )

  @retry.retry_on_exception()
  async def _pair_minor_device(self) -> None:
    """Bonds DUT with the second REF and waits for LE Audio to connect.

    Requires `_pair_major_device` to have completed first; the test fails
    otherwise. The second REF advertises its name, flags, the CSIS UUID, a
    general BAP announcement and its CSIS advertising data, and DUT starts
    discovery to find it.
    """
    if not self.first_bond_timestamp:
      self.fail("Major device has not been paired")

    minor_ref = self.refs[1]
    ref_address = minor_ref.random_address
    with (
        self.dut.bl4a.register_callback(
            bl4a_api.Module.ADAPTER
        ) as adapter_events,
        self.dut.bl4a.register_callback(
            bl4a_api.Module.LE_AUDIO
        ) as lea_events,
    ):
      csis = _get_service_from_device(
          minor_ref.device, csip.CoordinatedSetIdentificationService
      )
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        self.logger.info("[REF|%s] Start advertising", ref_address)
        basic_data = bytes(
            core.AdvertisingData([
                (
                    core.AdvertisingData.COMPLETE_LOCAL_NAME,
                    bytes("Bumble Right", "utf-8"),
                ),
                (
                    core.AdvertisingData.FLAGS,
                    bytes([
                        core.AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG
                    ]),
                ),
                (
                    core.AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
                    bytes(csip.CoordinatedSetIdentificationService.UUID),
                ),
            ])
        )
        bap_announcement = bytes(
            bap.UnicastServerAdvertisingData(
                announcement_type=bap.AnnouncementType.GENERAL,
                available_audio_contexts=bap.ContextType(0xFFFF),
            )
        )
        await minor_ref.device.create_advertising_set(
            advertising_parameters=_DEFAUILT_ADVERTISING_PARAMETERS,
            advertising_data=(
                basic_data + bap_announcement + csis.get_advertising_data()
            ),
        )

      # Once a CSIS set member is discovered, Settings should pair with it and
      # accept the pairing request on its own. Settings may not start
      # discovery automatically though, so discovery is triggered here.
      self.logger.info("[DUT] Start discovery")
      self.dut.bt.startInquiry()

      self.logger.info("[DUT] Wait for 2nd pairing request")
      await adapter_events.wait_for_event(
          bl4a_api.PairingRequest,
          lambda e: e.address == ref_address,
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )

      # If more than 10 seconds have passed since the first bond, the pairing
      # request is confirmed explicitly by the test.
      elapsed_time = datetime.datetime.now() - self.first_bond_timestamp
      if elapsed_time > datetime.timedelta(seconds=10):
        self.logger.info(
            "Pairing request takes %.2fs > 10s, need to manually accept",
            elapsed_time.total_seconds(),
        )
        self.dut.bt.setPairingConfirmation(ref_address, True)

      self.logger.info("[DUT] Wait for 2nd REF to be bonded")
      bond_event = await adapter_events.wait_for_event(
          bl4a_api.BondStateChanged,
          lambda e: (
              e.address == ref_address and e.state in _TERMINATION_BOND_STATES
          ),
      )
      self.assertEqual(bond_event.state, android_constants.BondState.BONDED)

      self.logger.info("[DUT] Wait for UUID Change")
      uuid_changed = bl4a_api.UuidChanged(address=ref_address, uuids=mock.ANY)
      await adapter_events.wait_for_event(uuid_changed)
      self.dut.bt.connect(ref_address)

      self.logger.info("[DUT] Wait for 2nd REF to be connected")
      lea_connected = bl4a_api.ProfileConnectionStateChanged(
          address=ref_address, state=_ConnectionState.CONNECTED
      )
      await lea_events.wait_for_event(
          lea_connected,
          timeout=_DEFAULT_STEP_TIMEOUT_SECONDS,
      )

  # Type variable bound to GATT profile service proxies; it ties the return
  # type of _make_service_clients to the proxy class passed in.
  _PROXY = TypeVar("_PROXY", bound=gatt_client.ProfileServiceProxy)

  async def _make_service_clients(
      self, proxy_class: type[_PROXY]
  ) -> list[_PROXY]:
    """Creates one service proxy of `proxy_class` per REF.

    For each REF, the LE ACL connection to DUT is looked up and a service proxy
    is created on a peer wrapping that connection.

    Args:
      proxy_class: Service proxy class to create on every REF.

    Returns:
      The created proxies, in the same order as `self.refs`.

    Raises:
      A test failure (through `self.fail`) when a REF has no LE ACL connection
      to DUT, or when the proxy cannot be created.
    """
    self.logger.info("[REF] Connect %s", proxy_class.__name__)
    clients = []
    for ref in self.refs:
      ref_dut_acl = ref.device.find_connection_by_bd_addr(
          hci.Address(self.dut.address), transport=core.BT_LE_TRANSPORT
      )
      if not ref_dut_acl:
        self.fail("No ACL connection found")
      async with device.Peer(ref_dut_acl) as peer:
        client = peer.create_service_proxy(proxy_class)
        if not client:
          # fail() takes a single, already formatted message; logging-style
          # "%s" arguments are not interpolated.
          self.fail(f"Failed to connect {proxy_class.__name__}")
        clients.append(client)
    return clients

  @override
  async def async_setup_class(self) -> None:
    """Checks LE Audio prerequisites on DUT and REFs, then prepares them.

    The class is aborted when the unicast client is disabled on DUT, when DUT
    is a physical device with neither the allow-list bypass nor a non-empty
    allow list, or when a REF does not support CIS peripheral.
    """
    await super().async_setup_class()

    def dut_property_is_true(name) -> bool:
      return self.dut.getprop(name) == "true"

    if not dut_property_is_true(_AndroidProperty.BAP_UNICAST_CLIENT_ENABLED):
      raise signals.TestAbortClass("Unicast client is not enabled")

    # The allow list itself is not used by the tests, but an empty allow list
    # on a physical device suggests that DUT is not ready to use LE Audio.
    if (
        not dut_property_is_true(_AndroidProperty.LEAUDIO_BYPASS_ALLOW_LIST)
        and not self.dut.device.is_emulator
        and not self.dut.getprop(_AndroidProperty.LEAUDIO_ALLOW_LIST)
    ):
      raise signals.TestAbortClass(
          "Allow list is empty, DUT is probably not ready to use LE Audio."
      )

    self.dut_vcp_enabled = dut_property_is_true(
        _AndroidProperty.VCP_CONTROLLER_ENABLED
    )
    self.dut_mcp_enabled = dut_property_is_true(
        _AndroidProperty.MCP_SERVER_ENABLED
    )
    self.dut_ccp_enabled = dut_property_is_true(
        _AndroidProperty.CCP_SERVER_ENABLED
    )

    cis_peripheral = hci.LeFeatureMask.CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL
    for ref in self.refs:
      ref.config.cis_enabled = True
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        await ref.open()
        if not ref.device.supports_le_features(cis_peripheral):
          raise signals.TestAbortClass("REF does not support CIS peripheral")

    # Bypass the allow list so that DUT may connect LE Audio to Bumble.
    self.dut.setprop(_AndroidProperty.LEAUDIO_BYPASS_ALLOW_LIST, "true")
    # Always repeat audio so that playback does not stop by itself.
    self.dut.bt.audioSetRepeat(android_constants.RepeatMode.ONE)

  @override
  async def async_setup_test(self) -> None:
    """Clears bonds on DUT and sets up the unicast server on both REFs.

    Both REFs share a freshly generated SIRK; the first REF is the front-left
    member and the second one the front-right member of the set.
    """
    # Make sure BT is enabled before removing bonding devices.
    self.assertTrue(self.dut.bt.enable())
    self.dut.bt.waitForAdapterState(android_constants.AdapterState.ON)
    # Due to b/396352434, Settings might crash and freeze after restarting BT
    # if some bonding devices are still present, so every bonding/bonded device
    # is removed first to avoid the ANR.
    bonded_addresses = self.dut.bt.getBondedDevices()
    for bonded_address in bonded_addresses:
      self.dut.bt.removeBond(bonded_address)

    await super().async_setup_test()

    def make_pairing_config(_) -> pairing.PairingConfig:
      # Sets the identity address type and the pairing delegate (which carries
      # the IO capability) used when pairing.
      return pairing.PairingConfig(
          identity_address_type=pairing.PairingConfig.AddressType.RANDOM,
          delegate=pairing.PairingDelegate(),
      )

    sirk = secrets.token_bytes(csip.SET_IDENTITY_RESOLVING_KEY_LENGTH)
    member_locations = (
        bap.AudioLocation.FRONT_LEFT,
        bap.AudioLocation.FRONT_RIGHT,
    )
    for ref, audio_location in zip(self.refs, member_locations):
      self._setup_unicast_server(
          ref=ref.device,
          audio_location=audio_location,
          sirk=sirk,
          sirk_type=csip.SirkType.ENCRYPTED,
      )
      ref.device.pairing_config_factory = make_pairing_config
    self.first_bond_timestamp = None

  @override
  async def async_teardown_test(self) -> None:
    """Runs the base teardown, then stops audio playback on DUT."""
    await super().async_teardown_test()
    # Stop any remaining playback so that it does not leak into the next test.
    stop_audio = self.dut.bt.audioStop
    await asyncio.to_thread(stop_audio)

  @navi_test_base.named_parameterized(
      plaintext_sirk={"sirk_type": csip.SirkType.PLAINTEXT},
      encrypted_sirk={"sirk_type": csip.SirkType.ENCRYPTED},
  )
  async def test_pair_and_connect(self, sirk_type: csip.SirkType) -> None:
    """Pairs and connects both Unicast servers of a CSIP set.

    Test steps:
      1. Override the SIRK type on every REF.
      2. Pair and connect the major device.
      3. Pair and connect the minor device.
      4. Verify that both devices are active LE Audio devices on DUT.

    Args:
      sirk_type: The SIRK type to use.
    """
    # Apply the requested SIRK type to the CSIS instance of each REF.
    for ref in self.refs:
      _get_service_from_device(
          ref.device, csip.CoordinatedSetIdentificationService
      ).set_identity_resolving_key_type = sirk_type

    # Pair and connect devices.
    await self._pair_major_device()
    await self._pair_minor_device()

    # Both set members must be reported as active, in any order.
    expected_addresses = [
        self.refs[0].random_address,
        self.refs[1].random_address,
    ]
    active_addresses = self.dut.bt.getActiveDevices(
        android_constants.Profile.LE_AUDIO
    )
    self.assertCountEqual(active_addresses, expected_addresses)

  @navi_test_base.named_parameterized(
      ("active", True),
      ("passive", False),
  )
  async def test_reconnect(self, is_active: bool) -> None:
    """Tests reconnecting to the LE Audio Unicast servers.

    Both REFs are paired, disconnected one by one, and then reconnected one by
    one. Finally both REFs must be active LE Audio devices on DUT.

    Args:
      is_active: True if DUT initiates the disconnection and reconnection
        itself. Otherwise the REFs disconnect and advertise targeted
        announcements (TA) so that DUT reconnects passively.
    """
    if self.dut.device.is_emulator and not is_active:
      self.skipTest("Rootcanal doesn't support APCF.")

    # Pair and connect devices.
    await self._pair_major_device()
    await self._pair_minor_device()

    with self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER) as acl_events:
      for ref in self.refs:
        if is_active:
          self.logger.info("[DUT] Disconnect REF")
          self.dut.bt.disconnect(ref.random_address)
        else:
          ref_dut_acl = ref.device.find_connection_by_bd_addr(
              hci.Address(self.dut.address), transport=core.BT_LE_TRANSPORT
          )
          if not ref_dut_acl:
            self.fail("Unable to find connection between REF and DUT")
          async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
            self.logger.info("[REF] Disconnect DUT")
            await ref_dut_acl.disconnect()

        self.logger.info("[DUT] Wait for disconnected")
        acl_disconnected = bl4a_api.AclDisconnected(
            address=ref.random_address,
            transport=android_constants.Transport.LE,
        )
        await acl_events.wait_for_event(acl_disconnected)

    # General announcements for the active case, targeted ones otherwise.
    if is_active:
      announcement_type = bap.AnnouncementType.GENERAL
    else:
      announcement_type = bap.AnnouncementType.TARGETED

    with self.dut.bl4a.register_callback(bl4a_api.Module.LE_AUDIO) as lea_events:
      for ref in self.refs:
        self.logger.info("[REF] Start advertising")
        bap_announcement = bap.UnicastServerAdvertisingData(
            announcement_type=announcement_type,
            available_audio_contexts=bap.ContextType(0xFFFF),
        )
        cap_announcement = _CapAnnouncement(announcement_type=announcement_type)
        async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
          await ref.device.create_advertising_set(
              advertising_parameters=_DEFAUILT_ADVERTISING_PARAMETERS,
              advertising_data=(
                  bytes(bap_announcement) + bytes(cap_announcement)
              ),
          )
        if is_active:
          self.logger.info("[DUT] Reconnect REF")
          self.dut.bt.connect(ref.random_address)

        self.logger.info("[DUT] Wait for LE Audio connected")
        lea_connected = bl4a_api.ProfileConnectionStateChanged(
            address=ref.random_address, state=_ConnectionState.CONNECTED
        )
        await lea_events.wait_for_event(lea_connected)

    # Both set members must be reported as active, in any order.
    expected_addresses = [
        self.refs[0].random_address,
        self.refs[1].random_address,
    ]
    active_addresses = self.dut.bt.getActiveDevices(
        android_constants.Profile.LE_AUDIO
    )
    self.assertCountEqual(active_addresses, expected_addresses)

  async def test_unidirectional_audio_stream(self) -> None:
    """Tests a unidirectional (DUT to REF) audio stream.

    Test steps:
      1. Pair and connect both REFs.
      2. Wait until every sink ASE on the REFs is idle.
      3. Start audio playback on DUT.
      4. Wait until every sink ASE is streaming.
      5. Stop audio playback on DUT.
      6. Wait until every sink ASE is idle again.
    """
    # Pair and connect devices.
    await self._pair_major_device()
    await self._pair_minor_device()

    def observe_state(
        ase: ascs.AseStateMachine,
    ) -> pyee_extensions.EventTriggeredValueObserver:
      # The observer reads the ASE state on every state-change event.
      return pyee_extensions.EventTriggeredValueObserver(
          ase,
          ase.EVENT_STATE_CHANGE,
          functools.partial(
              lambda ase: cast(ascs.AseStateMachine, ase).state, ase
          ),
      )

    sink_ase_states: list[pyee_extensions.EventTriggeredValueObserver] = []
    for ref in self.refs:
      stream_control = _get_service_from_device(
          ref.device, ascs.AudioStreamControlService
      )
      for ase in stream_control.ase_state_machines.values():
        if ase.role == ascs.AudioRole.SINK:
          sink_ase_states.append(observe_state(ase))

    idle = ascs.AseStateMachine.State.IDLE
    streaming = ascs.AseStateMachine.State.STREAMING

    # Make sure audio is not streaming.
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      for observer in sink_ase_states:
        await observer.wait_for_target_value(idle)

    self.logger.info("[DUT] Start audio streaming")
    await asyncio.to_thread(self.dut.bt.audioPlaySine)
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      for observer in sink_ase_states:
        await observer.wait_for_target_value(streaming)

    # Keep streaming for _STREAMING_TIME_SECONDS.
    await asyncio.sleep(_STREAMING_TIME_SECONDS)

    self.logger.info("[DUT] Stop audio streaming")
    await asyncio.to_thread(self.dut.bt.audioStop)
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      for observer in sink_ase_states:
        await observer.wait_for_target_value(idle)

  async def test_bidirectional_audio_stream(self) -> None:
    """Tests a bidirectional audio stream between DUT and REF.

    Test steps:
      1. Pair and connect both REFs.
      2. Put an outgoing call on DUT to get a conversational audio context.
      3. Wait until every ASE on the REFs is idle.
      4. Start audio playback on DUT.
      5. Wait until every ASE (sink and source) is streaming.
      6. Stop audio playback on DUT and leave the call.
      7. Wait until every ASE is idle again.
    """
    # Pair and connect devices.
    await self._pair_major_device()
    await self._pair_minor_device()

    def observe_state(
        ase: ascs.AseStateMachine,
    ) -> pyee_extensions.EventTriggeredValueObserver:
      # The observer reads the ASE state on every state-change event.
      return pyee_extensions.EventTriggeredValueObserver(
          ase,
          ase.EVENT_STATE_CHANGE,
          functools.partial(
              lambda ase: cast(ascs.AseStateMachine, ase).state, ase
          ),
      )

    ase_states: list[pyee_extensions.EventTriggeredValueObserver] = []
    for ref in self.refs:
      stream_control = _get_service_from_device(
          ref.device, ascs.AudioStreamControlService
      )
      for ase in stream_control.ase_state_machines.values():
        ase_states.append(observe_state(ase))

    idle = ascs.AseStateMachine.State.IDLE
    streaming = ascs.AseStateMachine.State.STREAMING

    dut_telecom_cb = self.dut.bl4a.register_callback(bl4a_api.Module.TELECOM)
    self.test_case_context.push(dut_telecom_cb)

    with self.dut.bl4a.make_phone_call(
        _CALLER_NAME,
        _CALLER_NUMBER,
        constants.Direction.OUTGOING,
    ):
      await dut_telecom_cb.wait_for_event(
          bl4a_api.CallStateChanged,
          lambda e: (e.state in (_CallState.CONNECTING, _CallState.DIALING)),
      )

      # Make sure audio is not streaming.
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        for observer in ase_states:
          await observer.wait_for_target_value(idle)

      self.logger.info("[DUT] Start audio streaming")
      await asyncio.to_thread(self.dut.bt.audioPlaySine)

      # With the current configuration, all ASEs become active in
      # bidirectional streaming.
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        for observer in ase_states:
          await observer.wait_for_target_value(streaming)

      # Keep streaming for _STREAMING_TIME_SECONDS.
      await asyncio.sleep(_STREAMING_TIME_SECONDS)

      self.logger.info("[DUT] Stop audio streaming")
      await asyncio.to_thread(self.dut.bt.audioStop)

    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      for observer in ase_states:
        await observer.wait_for_target_value(idle)

  async def test_reconfiguration(self) -> None:
    """Tests that sink ASEs are reconfigured from media to conversational.

    Test steps:
      1. Wait for every sink ASE on the REF devices to be idle.
      2. Start audio streaming from DUT and wait for the sink ASEs to stream.
      3. Check that the audio context of each sink ASE is neither prohibited
         nor conversational.
      4. Put an outgoing call on DUT to trigger reconfiguration.
      5. Wait for the sink ASEs to go idle and then stream again, and check
         that their audio context now includes conversational.
    """

    def read_state(machine: ascs.AseStateMachine):
      return machine.state

    def read_context_type(machine: ascs.AseStateMachine) -> int:
      # The entry payload is unpacked as a little-endian 16-bit value.
      entry = _get_audio_context_entry(machine)
      if entry is None:
        self.fail("Audio context is not found")
      return struct.unpack_from("<H", entry.data)[0]

    # Pair and connect both REF devices.
    await self._pair_major_device()
    await self._pair_minor_device()

    idle = ascs.AseStateMachine.State.IDLE
    streaming = ascs.AseStateMachine.State.STREAMING

    # Gather the sink-role ASEs of every REF, keeping REF order.
    sink_machines: list[ascs.AseStateMachine] = [
        machine
        for ref in self.refs
        for machine in _get_service_from_device(
            ref.device, ascs.AudioStreamControlService
        ).ase_state_machines.values()
        if machine.role == ascs.AudioRole.SINK
    ]
    # Wrap each sink ASE in an observer bound to its EVENT_STATE_CHANGE event
    # and a getter for its current state.
    state_observers = []
    for machine in sink_machines:
      state_observers.append(
          pyee_extensions.EventTriggeredValueObserver(
              machine,
              machine.EVENT_STATE_CHANGE,
              functools.partial(read_state, machine),
          )
      )

    # Make sure audio is not streaming.
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      for observer in state_observers:
        await observer.wait_for_target_value(idle)

    self.logger.info("[DUT] Start audio streaming")
    await asyncio.to_thread(self.dut.bt.audioPlaySine)
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      for observer in state_observers:
        await observer.wait_for_target_value(streaming)

    # Before the call, the context must not be conversational.
    for machine in sink_machines:
      self.assertIsInstance(machine.metadata, le_audio.Metadata)
      media_context = read_context_type(machine)
      self.assertNotEqual(media_context, bap.ContextType.PROHIBITED)
      self.assertFalse(media_context & bap.ContextType.CONVERSATIONAL)

    # Keep streaming for _STREAMING_TIME_SECONDS before placing the call.
    await asyncio.sleep(_STREAMING_TIME_SECONDS)

    outgoing_call = self.dut.bl4a.make_phone_call(
        _CALLER_NAME,
        _CALLER_NUMBER,
        constants.Direction.OUTGOING,
    )
    with outgoing_call:
      # Release, restart and the context check share one timeout budget.
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        self.logger.info("[DUT] Wait for ASE to be released")
        for observer in state_observers:
          await observer.wait_for_target_value(idle)
        self.logger.info("[DUT] Wait for ASE to be reconfigured")
        for observer in state_observers:
          await observer.wait_for_target_value(streaming)
        for machine in sink_machines:
          call_context = read_context_type(machine)
          self.assertTrue(call_context & bap.ContextType.CONVERSATIONAL)

  async def test_streaming_later_join(self) -> None:
    """Tests connecting to devices later during streaming.

    Test steps:
      1. Pair both REF devices, then disconnect each of them from DUT.
      2. Start audio streaming from DUT.
      3. Start advertising from REF(Left), wait for its sink ASEs to stream.
      4. Start advertising from REF(Right), wait for its sink ASEs to stream.
    """
    if self.dut.device.is_emulator:
      self.skipTest("Rootcanal doesn't support APCF.")

    # Pair and connect both devices of the set.
    await self._pair_major_device()
    await self._pair_minor_device()

    with self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER) as dut_cb:
      for ref in self.refs:
        self.logger.info("[DUT] Disconnect REF")
        self.dut.bt.disconnect(ref.random_address)
        self.logger.info("[DUT] Wait for disconnected")
        await dut_cb.wait_for_event(
            bl4a_api.AclDisconnected(
                address=ref.random_address,
                transport=android_constants.Transport.LE,
            )
        )

    # Keep the sink ASE observers grouped per REF. Zipping one flat list with
    # self.refs would pair a REF with another REF's ASE as soon as a REF
    # exposes more than one sink ASE.
    sink_ase_states_by_ref: list[
        list[pyee_extensions.EventTriggeredValueObserver]
    ] = [
        [
            pyee_extensions.EventTriggeredValueObserver(
                ase,
                ase.EVENT_STATE_CHANGE,
                functools.partial(
                    lambda ase: cast(ascs.AseStateMachine, ase).state, ase
                ),
            )
            for ase in _get_service_from_device(
                ref.device, ascs.AudioStreamControlService
            ).ase_state_machines.values()
            if ase.role == ascs.AudioRole.SINK
        ]
        for ref in self.refs
    ]

    self.logger.info("[DUT] Start audio streaming")
    await asyncio.to_thread(self.dut.bt.audioPlaySine)

    # Bring the REF devices back one at a time; advertising and the wait for
    # streaming share one timeout budget per REF.
    for ref, ref_sink_ase_states in zip(self.refs, sink_ase_states_by_ref):
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        await ref.device.create_advertising_set(
            advertising_parameters=_DEFAUILT_ADVERTISING_PARAMETERS,
            advertising_data=bytes(
                _CapAnnouncement(
                    announcement_type=bap.AnnouncementType.TARGETED
                )
            ),
        )
        self.logger.info("[REF] Wait for ASE to be streaming")
        for sink_ase in ref_sink_ase_states:
          await sink_ase.wait_for_target_value(
              ascs.AseStateMachine.State.STREAMING
          )

  async def test_volume_initialization(self) -> None:
    """Checks every REF reports the scaled DUT music volume after connecting.

    The DUT music volume is scaled to the VCS range (rounding halves up) and
    compared against the volume setting of each REF's volume control service.
    """
    if not self.dut_vcp_enabled:
      self.skipTest("VCP is not enabled on DUT")

    # Pair and connect the devices.
    await self._pair_major_device()
    await self._pair_minor_device()

    # Give the volume some time to settle before sampling it.
    await asyncio.sleep(_PREPARE_TIME_SECONDS)

    scale = vcs.MAX_VOLUME / self.dut.bt.getMaxVolume(_StreamType.MUSIC)
    current_level = self.dut.bt.getVolume(_StreamType.MUSIC)
    scaled_level = decimal.Decimal(current_level * scale)
    expected_setting = scaled_level.to_integral_exact(
        rounding=decimal.ROUND_HALF_UP
    )

    # How DUT picks the volume is not asserted here; only that DUT and all
    # REF devices agree on it.
    volume_services = [
        _get_service_from_device(ref.device, vcs.VolumeControlService)
        for ref in self.refs
    ]
    for volume_service in volume_services:
      self.assertEqual(expected_setting, volume_service.volume_setting)

  @navi_test_base.named_parameterized(
      dict(testcase_name="from_dut", issuer=constants.TestRole.DUT),
      dict(testcase_name="from_ref", issuer=constants.TestRole.REF),
  )
  async def test_set_volume(self, issuer: constants.TestRole) -> None:
    """Tests volume synchronization over LEA VCP for every DUT volume level.

    For each DUT music volume level that is not already the current one, the
    volume is changed from the issuing side, and every REF volume control
    service is expected to reach the matching scaled value. When REF is the
    issuer, DUT is additionally expected to report the volume change.

    Args:
      issuer: The role that issues the volume setting request.
    """
    if not self.dut_vcp_enabled:
      self.skipTest("VCP is not enabled on DUT")

    await self._pair_major_device()
    await self._pair_minor_device()

    max_level = self.dut.bt.getMaxVolume(_StreamType.MUSIC)

    def scale_to_vcs(level: int) -> int:
      # Map a DUT level onto the VCS range, rounding halves up.
      scaled = decimal.Decimal(level / max_level * vcs.MAX_VOLUME)
      return int(scaled.to_integral_exact(rounding=decimal.ROUND_HALF_UP))

    def read_setting(service: vcs.VolumeControlService) -> int:
      return service.volume_setting

    # DUT's VCS client might not be stable at the beginning. If we set volume
    # immediately, the volume might not be set correctly.
    await asyncio.sleep(_PREPARE_TIME_SECONDS)

    vcs_services = []
    for ref in self.refs:
      vcs_services.append(
          _get_service_from_device(ref.device, vcs.VolumeControlService)
      )

    issued_by_dut = issuer == constants.TestRole.DUT
    issued_by_ref = issuer == constants.TestRole.REF

    for level in range(max_level + 1):
      expected_setting = scale_to_vcs(level)
      if self.dut.bt.getVolume(_StreamType.MUSIC) == level:
        # Nothing to change: DUT already reports this level.
        continue
      with self.dut.bl4a.register_callback(
          bl4a_api.Module.AUDIO
      ) as audio_events:
        if issued_by_dut:
          self.logger.info("[DUT] Set volume to %d", level)
          self.dut.bt.setVolume(_StreamType.MUSIC, level)
        else:
          self.logger.info("[REF] Set volume to %d", level)
          # Change the setting on the first REF and notify its subscribers.
          issuing_service = vcs_services[0]
          issuing_service.volume_setting = expected_setting
          await self.refs[0].device.notify_subscribers(
              issuing_service.volume_state
          )

        async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
          for i, service in enumerate(vcs_services):
            self.logger.info(
                "[REF-%d] Wait for volume to be set to %d", i, expected_setting
            )
            setting_observer = pyee_extensions.EventTriggeredValueObserver[int](
                service,
                service.EVENT_VOLUME_STATE_CHANGE,
                functools.partial(read_setting, service),
            )
            await setting_observer.wait_for_target_value(expected_setting)
        # Only when remote device sets volume, DUT can receive the intent.
        if issued_by_ref:
          self.logger.info("[DUT] Wait for volume to be set")
          volume_changed = bl4a_api.VolumeChanged(
              stream_type=_StreamType.MUSIC, volume_value=level
          )
          await audio_events.wait_for_event(volume_changed)

  async def test_mcp_play_pause(self) -> None:
    """Tests pausing and resuming DUT media playback over MCP.

    Test steps:
      1. Connect GMCS clients and subscribe to the MCP characteristics.
      2. Start playback on DUT and wait for DUT to report it is playing.
      3. Write PAUSE through the first client, wait for every media state to
         become PAUSED and for DUT to report it stopped playing.
      4. Write PLAY through the second client, wait for every media state to
         become PLAYING and for DUT to report it is playing again.
    """
    if not self.dut_mcp_enabled:
      self.skipTest("MCP is not enabled on DUT")

    await self._pair_major_device()
    await self._pair_minor_device()

    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      self.logger.info("[REF] Connect GMCS")
      mcp_clients = await self._make_service_clients(
          _GenericMediaControlServiceProxy
      )
      self.logger.info("[REF] Subscribe MCP characteristics")
      for mcp_client in mcp_clients:
        await mcp_client.subscribe_characteristics()

    # Track the media state of every client that exposes the characteristic.
    media_states = []
    for mcp_client in mcp_clients:
      if not mcp_client.media_state:
        continue
      media_states.append(
          await gatt_helper.MutableCharacteristicState.create(
              mcp_client.media_state
          )
      )
    self.assertLen(media_states, self.NUM_REF_DEVICES)

    player_events = self.dut.bl4a.register_callback(bl4a_api.Module.PLAYER)
    self.test_case_context.push(player_events)

    paused_value = bytes([mcp.MediaState.PAUSED])
    playing_value = bytes([mcp.MediaState.PLAYING])

    self.logger.info("[DUT] Play")
    await asyncio.to_thread(self.dut.bt.audioPlaySine)
    self.logger.info("[DUT] Wait for playback started")
    await player_events.wait_for_event(
        bl4a_api.PlayerIsPlayingChanged(is_playing=True)
    )

    async with self.assert_not_timeout(
        _DEFAULT_STEP_TIMEOUT_SECONDS, msg="[REF] Pause"
    ):
      # Pause through the first client.
      pause_result = await mcp_clients[0].write_control_point(_McpOpcode.PAUSE)
      self.assertEqual(pause_result, mcp.MediaControlPointResultCode.SUCCESS)
      for i, media_state in enumerate(media_states):
        self.logger.info("[REF-%d] Wait for media state to be PAUSED", i)
        await media_state.wait_for_target_value(paused_value)
    self.logger.info("[DUT] Wait for playback paused")
    await player_events.wait_for_event(
        bl4a_api.PlayerIsPlayingChanged(is_playing=False),
    )

    async with self.assert_not_timeout(
        _DEFAULT_STEP_TIMEOUT_SECONDS, msg="[REF] Play"
    ):
      # Resume through the second client.
      play_result = await mcp_clients[1].write_control_point(_McpOpcode.PLAY)
      self.assertEqual(play_result, mcp.MediaControlPointResultCode.SUCCESS)
      for i, media_state in enumerate(media_states):
        self.logger.info("[REF-%d] Wait for media state to be PLAYING", i)
        await media_state.wait_for_target_value(playing_value)
    self.logger.info("[DUT] Wait for playback started")
    await player_events.wait_for_event(
        bl4a_api.PlayerIsPlayingChanged(is_playing=True)
    )

  async def test_ccp_accept_and_terminate_call(self) -> None:
    """Tests answering and hanging up a DUT call from the REF side over CCP.

    Test steps:
      1. Connect TBS clients and read/subscribe the TBS characteristics.
      2. Put an incoming call on DUT and wait for every client to observe it.
      3. Accept the call through the first client and wait for DUT to report
         the call as active.
      4. Terminate the call through the second client and wait for DUT to
         report the call as disconnected.
      5. Wait for every client to observe an empty call state.
    """
    if not self.dut_ccp_enabled:
      self.skipTest("CCP is not enabled on DUT")

    await self._pair_major_device()
    await self._pair_minor_device()

    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      self.logger.info("[REF] Connect TBS")
      tbs_clients = await self._make_service_clients(
          ccp.GenericTelephoneBearerServiceProxy
      )
      self.logger.info("[REF] Read and subscribe TBS characteristics")
      for tbs_client in tbs_clients:
        await tbs_client.read_and_subscribe_characteristics()

    # The test expects the call placed below to get call index 1.
    call_index = 1
    incoming_call = self.dut.bl4a.make_phone_call(
        _CALLER_NAME, _CALLER_NUMBER, _Direction.INCOMING
    )
    self.test_case_context.push(incoming_call)
    telecom_events = self.dut.bl4a.register_callback(bl4a_api.Module.TELECOM)
    self.test_case_context.push(telecom_events)

    # Expected call state value: call index, state, flags.
    incoming_state = bytes(
        [call_index, ccp.CallState.INCOMING, ccp.CallFlag(0)]
    )
    for i, tbs_client in enumerate(tbs_clients):
      async with self.assert_not_timeout(
          _DEFAULT_STEP_TIMEOUT_SECONDS,
          msg=f"[REF-{i}] Wait for call state change",
      ):
        await tbs_client.call_state.wait_for_target_value(incoming_state)

    # Accept through the first client.
    async with self.assert_not_timeout(
        _DEFAULT_STEP_TIMEOUT_SECONDS, msg="[REF-0] Accept call"
    ):
      await tbs_clients[0].accept(call_index)

    self.logger.info("[DUT] Wait for call to be active")
    call_active = bl4a_api.CallStateChanged(
        state=_CallState.ACTIVE, handle=mock.ANY, name=mock.ANY
    )
    await telecom_events.wait_for_event(call_active)

    # Terminate through the second client.
    async with self.assert_not_timeout(
        _DEFAULT_STEP_TIMEOUT_SECONDS, msg="[REF-1] Terminate call"
    ):
      await tbs_clients[1].terminate(call_index)

    self.logger.info("[DUT] Wait for call to be disconnected")
    call_disconnected = bl4a_api.CallStateChanged(
        state=_CallState.DISCONNECTED, handle=mock.ANY, name=mock.ANY
    )
    await telecom_events.wait_for_event(call_disconnected)

    # Every client should end up with an empty call state value.
    for i, tbs_client in enumerate(tbs_clients):
      async with self.assert_not_timeout(
          _DEFAULT_STEP_TIMEOUT_SECONDS,
          msg=f"[REF-{i}] Wait for call state change",
      ):
        await tbs_client.call_state.wait_for_target_value(b"")

Tests bidirectional audio stream between DUT and REF.

Test steps
  1. [Optional] Wait for audio streaming to stop if it is already streaming.
  2. Put a call on DUT to make conversational audio context.
  3. Start audio streaming from DUT.
  4. Wait for audio streaming to start from REF.
  5. Stop audio streaming from DUT.
  6. Wait for audio streaming to stop from REF.
Source code in navi/tests/functionality/le_audio_unicast_client_dual_device_test.py (lines 665–733)
async def test_bidirectional_audio_stream(self) -> None:
  """Tests bidirectional audio streaming between DUT and the REF devices.

  Test steps:
    1. Put an outgoing call on DUT and wait for it to be connecting/dialing.
    2. Wait for every ASE on the REF devices to be idle.
    3. Start audio streaming from DUT and wait for every ASE to stream.
    4. Keep streaming for a while, then stop audio from DUT.
    5. After leaving the call context, wait for every ASE to be idle again.
  """

  def read_state(machine: ascs.AseStateMachine):
    return machine.state

  def is_call_starting(event) -> bool:
    return event.state in (_CallState.CONNECTING, _CallState.DIALING)

  # Pair and connect devices.
  await self._pair_major_device()
  await self._pair_minor_device()

  idle = ascs.AseStateMachine.State.IDLE
  streaming = ascs.AseStateMachine.State.STREAMING

  # One observer per ASE (any role) of every REF, bound to the ASE's
  # EVENT_STATE_CHANGE event and a getter for its current state.
  state_observers: list[pyee_extensions.EventTriggeredValueObserver] = [
      pyee_extensions.EventTriggeredValueObserver(
          machine,
          machine.EVENT_STATE_CHANGE,
          functools.partial(read_state, machine),
      )
      for ref in self.refs
      for machine in _get_service_from_device(
          ref.device, ascs.AudioStreamControlService
      ).ase_state_machines.values()
  ]

  telecom_events = self.dut.bl4a.register_callback(bl4a_api.Module.TELECOM)
  self.test_case_context.push(telecom_events)
  outgoing_call = self.dut.bl4a.make_phone_call(
      _CALLER_NAME,
      _CALLER_NUMBER,
      constants.Direction.OUTGOING,
  )

  with outgoing_call:
    await telecom_events.wait_for_event(
        bl4a_api.CallStateChanged, is_call_starting
    )

    # Make sure audio is not streaming.
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      for observer in state_observers:
        await observer.wait_for_target_value(idle)

    self.logger.info("[DUT] Start audio streaming")
    await asyncio.to_thread(self.dut.bt.audioPlaySine)

    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      # With current configuration, all ASEs will be active in bidirectional
      # streaming.
      for observer in state_observers:
        await observer.wait_for_target_value(streaming)

    # Keep streaming for _STREAMING_TIME_SECONDS.
    await asyncio.sleep(_STREAMING_TIME_SECONDS)

    self.logger.info("[DUT] Stop audio streaming")
    await asyncio.to_thread(self.dut.bt.audioStop)

  # Outside the call context, every ASE should return to idle.
  async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
    for observer in state_observers:
      await observer.wait_for_target_value(idle)

Tests answering and terminating a call over CCP.

Test steps
  1. Connect CCP.
  2. Read and subscribe CCP characteristics.
  3. Put an incoming call from DUT.
  4. Accept the call on REF.
  5. Terminate the call on REF.
Source code in navi/tests/functionality/le_audio_unicast_client_dual_device_test.py (lines 1057–1128)
async def test_ccp_accept_and_terminate_call(self) -> None:
  """Tests answering and hanging up a DUT call from the REF side over CCP.

  Test steps:
    1. Connect TBS clients and read/subscribe the TBS characteristics.
    2. Put an incoming call on DUT and wait for every client to observe it.
    3. Accept the call through the first client and wait for DUT to report
       the call as active.
    4. Terminate the call through the second client and wait for DUT to
       report the call as disconnected.
    5. Wait for every client to observe an empty call state.
  """
  if not self.dut_ccp_enabled:
    self.skipTest("CCP is not enabled on DUT")

  await self._pair_major_device()
  await self._pair_minor_device()

  async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
    self.logger.info("[REF] Connect TBS")
    tbs_clients = await self._make_service_clients(
        ccp.GenericTelephoneBearerServiceProxy
    )
    self.logger.info("[REF] Read and subscribe TBS characteristics")
    for tbs_client in tbs_clients:
      await tbs_client.read_and_subscribe_characteristics()

  # The test expects the call placed below to get call index 1.
  call_index = 1
  incoming_call = self.dut.bl4a.make_phone_call(
      _CALLER_NAME, _CALLER_NUMBER, _Direction.INCOMING
  )
  self.test_case_context.push(incoming_call)
  telecom_events = self.dut.bl4a.register_callback(bl4a_api.Module.TELECOM)
  self.test_case_context.push(telecom_events)

  # Expected call state value: call index, state, flags.
  incoming_state = bytes(
      [call_index, ccp.CallState.INCOMING, ccp.CallFlag(0)]
  )
  for i, tbs_client in enumerate(tbs_clients):
    async with self.assert_not_timeout(
        _DEFAULT_STEP_TIMEOUT_SECONDS,
        msg=f"[REF-{i}] Wait for call state change",
    ):
      await tbs_client.call_state.wait_for_target_value(incoming_state)

  # Accept through the first client.
  async with self.assert_not_timeout(
      _DEFAULT_STEP_TIMEOUT_SECONDS, msg="[REF-0] Accept call"
  ):
    await tbs_clients[0].accept(call_index)

  self.logger.info("[DUT] Wait for call to be active")
  call_active = bl4a_api.CallStateChanged(
      state=_CallState.ACTIVE, handle=mock.ANY, name=mock.ANY
  )
  await telecom_events.wait_for_event(call_active)

  # Terminate through the second client.
  async with self.assert_not_timeout(
      _DEFAULT_STEP_TIMEOUT_SECONDS, msg="[REF-1] Terminate call"
  ):
    await tbs_clients[1].terminate(call_index)

  self.logger.info("[DUT] Wait for call to be disconnected")
  call_disconnected = bl4a_api.CallStateChanged(
      state=_CallState.DISCONNECTED, handle=mock.ANY, name=mock.ANY
  )
  await telecom_events.wait_for_event(call_disconnected)

  # Every client should end up with an empty call state value.
  for i, tbs_client in enumerate(tbs_clients):
    async with self.assert_not_timeout(
        _DEFAULT_STEP_TIMEOUT_SECONDS,
        msg=f"[REF-{i}] Wait for call state change",
    ):
      await tbs_client.call_state.wait_for_target_value(b"")

Tests playing and pausing media playback over MCP.

Test steps
  1. Connect MCP.
  2. Subscribe MCP characteristics.
  3. Play media playback on DUT.
  4. Pause media playback over MCP.
  5. Wait for playback to pause.
  6. Resume media playback over MCP.
  7. Wait for playback to start.
Source code in navi/tests/functionality/le_audio_unicast_client_dual_device_test.py (lines 983–1055)
async def test_mcp_play_pause(self) -> None:
  """Tests pausing and resuming DUT media playback over MCP.

  Test steps:
    1. Connect GMCS clients and subscribe to the MCP characteristics.
    2. Start playback on DUT and wait for DUT to report it is playing.
    3. Write PAUSE through the first client, wait for every media state to
       become PAUSED and for DUT to report it stopped playing.
    4. Write PLAY through the second client, wait for every media state to
       become PLAYING and for DUT to report it is playing again.
  """
  if not self.dut_mcp_enabled:
    self.skipTest("MCP is not enabled on DUT")

  await self._pair_major_device()
  await self._pair_minor_device()

  async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
    self.logger.info("[REF] Connect GMCS")
    mcp_clients = await self._make_service_clients(
        _GenericMediaControlServiceProxy
    )
    self.logger.info("[REF] Subscribe MCP characteristics")
    for mcp_client in mcp_clients:
      await mcp_client.subscribe_characteristics()

  # Track the media state of every client that exposes the characteristic.
  media_states = []
  for mcp_client in mcp_clients:
    if not mcp_client.media_state:
      continue
    media_states.append(
        await gatt_helper.MutableCharacteristicState.create(
            mcp_client.media_state
        )
    )
  self.assertLen(media_states, self.NUM_REF_DEVICES)

  player_events = self.dut.bl4a.register_callback(bl4a_api.Module.PLAYER)
  self.test_case_context.push(player_events)

  paused_value = bytes([mcp.MediaState.PAUSED])
  playing_value = bytes([mcp.MediaState.PLAYING])

  self.logger.info("[DUT] Play")
  await asyncio.to_thread(self.dut.bt.audioPlaySine)
  self.logger.info("[DUT] Wait for playback started")
  await player_events.wait_for_event(
      bl4a_api.PlayerIsPlayingChanged(is_playing=True)
  )

  async with self.assert_not_timeout(
      _DEFAULT_STEP_TIMEOUT_SECONDS, msg="[REF] Pause"
  ):
    # Pause through the first client.
    pause_result = await mcp_clients[0].write_control_point(_McpOpcode.PAUSE)
    self.assertEqual(pause_result, mcp.MediaControlPointResultCode.SUCCESS)
    for i, media_state in enumerate(media_states):
      self.logger.info("[REF-%d] Wait for media state to be PAUSED", i)
      await media_state.wait_for_target_value(paused_value)
  self.logger.info("[DUT] Wait for playback paused")
  await player_events.wait_for_event(
      bl4a_api.PlayerIsPlayingChanged(is_playing=False),
  )

  async with self.assert_not_timeout(
      _DEFAULT_STEP_TIMEOUT_SECONDS, msg="[REF] Play"
  ):
    # Resume through the second client.
    play_result = await mcp_clients[1].write_control_point(_McpOpcode.PLAY)
    self.assertEqual(play_result, mcp.MediaControlPointResultCode.SUCCESS)
    for i, media_state in enumerate(media_states):
      self.logger.info("[REF-%d] Wait for media state to be PLAYING", i)
      await media_state.wait_for_target_value(playing_value)
  self.logger.info("[DUT] Wait for playback started")
  await player_events.wait_for_event(
      bl4a_api.PlayerIsPlayingChanged(is_playing=True)
  )

Tests pairing and connecting to Unicast servers in a CSIP set.

Test steps
  1. Override the SIRK type for all refs.
  2. Pair and connect the major device.
  3. Pair and connect the minor device.
  4. Check if both devices are connected and active.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sirk_type | SirkType | The SIRK type to use. | required |
Source code in navi/tests/functionality/le_audio_unicast_client_dual_device_test.py (lines 502–533)
@navi_test_base.named_parameterized(
    plaintext_sirk=dict(sirk_type=csip.SirkType.PLAINTEXT),
    encrypted_sirk=dict(sirk_type=csip.SirkType.ENCRYPTED),
)
async def test_pair_and_connect(self, sirk_type: csip.SirkType) -> None:
  """Tests pairing and connecting to both Unicast servers of a CSIP set.

  Test steps:
    1. Set the requested SIRK type on the CSIS instance of every REF.
    2. Pair and connect the major device, then the minor device.
    3. Check that DUT reports exactly the first two REF addresses as active
       LE Audio devices.

  Args:
    sirk_type: The SIRK type to use.
  """
  # Apply the SIRK type before pairing.
  csis_services = (
      _get_service_from_device(
          ref.device, csip.CoordinatedSetIdentificationService
      )
      for ref in self.refs
  )
  for csis_service in csis_services:
    csis_service.set_identity_resolving_key_type = sirk_type

  # Pair and connect devices.
  await self._pair_major_device()
  await self._pair_minor_device()

  # Both devices should be active; assertCountEqual ignores ordering.
  active_devices = self.dut.bt.getActiveDevices(
      android_constants.Profile.LE_AUDIO
  )
  expected_devices = [
      self.refs[0].random_address,
      self.refs[1].random_address,
  ]
  self.assertCountEqual(active_devices, expected_devices)

Tests reconfiguration from media to conversational.

Test steps
  1. [Optional] Wait for audio streaming to stop if it is already streaming.
  2. Start audio streaming from DUT.
  3. Wait for audio streaming to start from REF.
  4. Put a call on DUT to trigger reconfiguration.
  5. Wait for ASE to be reconfigured.
Source code in navi/tests/functionality/le_audio_unicast_client_dual_device_test.py (lines 735–811)
async def test_reconfiguration(self) -> None:
  """Tests that sink ASEs are reconfigured from media to conversational.

  Test steps:
    1. Wait for every sink ASE on the REF devices to be idle.
    2. Start audio streaming from DUT and wait for the sink ASEs to stream.
    3. Check that the audio context of each sink ASE is neither prohibited
       nor conversational.
    4. Put an outgoing call on DUT to trigger reconfiguration.
    5. Wait for the sink ASEs to go idle and then stream again, and check
       that their audio context now includes conversational.
  """

  def read_state(machine: ascs.AseStateMachine):
    return machine.state

  def read_context_type(machine: ascs.AseStateMachine) -> int:
    # The entry payload is unpacked as a little-endian 16-bit value.
    entry = _get_audio_context_entry(machine)
    if entry is None:
      self.fail("Audio context is not found")
    return struct.unpack_from("<H", entry.data)[0]

  # Pair and connect both REF devices.
  await self._pair_major_device()
  await self._pair_minor_device()

  idle = ascs.AseStateMachine.State.IDLE
  streaming = ascs.AseStateMachine.State.STREAMING

  # Gather the sink-role ASEs of every REF, keeping REF order.
  sink_machines: list[ascs.AseStateMachine] = [
      machine
      for ref in self.refs
      for machine in _get_service_from_device(
          ref.device, ascs.AudioStreamControlService
      ).ase_state_machines.values()
      if machine.role == ascs.AudioRole.SINK
  ]
  # Wrap each sink ASE in an observer bound to its EVENT_STATE_CHANGE event
  # and a getter for its current state.
  state_observers = []
  for machine in sink_machines:
    state_observers.append(
        pyee_extensions.EventTriggeredValueObserver(
            machine,
            machine.EVENT_STATE_CHANGE,
            functools.partial(read_state, machine),
        )
    )

  # Make sure audio is not streaming.
  async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
    for observer in state_observers:
      await observer.wait_for_target_value(idle)

  self.logger.info("[DUT] Start audio streaming")
  await asyncio.to_thread(self.dut.bt.audioPlaySine)
  async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
    for observer in state_observers:
      await observer.wait_for_target_value(streaming)

  # Before the call, the context must not be conversational.
  for machine in sink_machines:
    self.assertIsInstance(machine.metadata, le_audio.Metadata)
    media_context = read_context_type(machine)
    self.assertNotEqual(media_context, bap.ContextType.PROHIBITED)
    self.assertFalse(media_context & bap.ContextType.CONVERSATIONAL)

  # Keep streaming for _STREAMING_TIME_SECONDS before placing the call.
  await asyncio.sleep(_STREAMING_TIME_SECONDS)

  outgoing_call = self.dut.bl4a.make_phone_call(
      _CALLER_NAME,
      _CALLER_NUMBER,
      constants.Direction.OUTGOING,
  )
  with outgoing_call:
    # Release, restart and the context check share one timeout budget.
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      self.logger.info("[DUT] Wait for ASE to be released")
      for observer in state_observers:
        await observer.wait_for_target_value(idle)
      self.logger.info("[DUT] Wait for ASE to be reconfigured")
      for observer in state_observers:
        await observer.wait_for_target_value(streaming)
      for machine in sink_machines:
        call_context = read_context_type(machine)
        self.assertTrue(call_context & bap.ContextType.CONVERSATIONAL)

Tests to reconnect the LE Audio Unicast server.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| is_active | bool | True if reconnect is actively initialized by DUT, otherwise TA will be used to perform the reconnection passively. | required |
Source code in navi/tests/functionality/le_audio_unicast_client_dual_device_test.py (lines 535–611)
@navi_test_base.named_parameterized(
    ("active", True),
    ("passive", False),
)
async def test_reconnect(self, is_active: bool) -> None:
  """Verifies that LE Audio is restored on both REFs after a disconnection.

  Test steps:
    1. Pair and connect both REF devices.
    2. Drop the LE ACL to each REF, from DUT (active) or from REF (passive),
       and wait for DUT to report the disconnection.
    3. Start advertising on each REF; in the active case DUT also issues an
       explicit connect request.
    4. Wait for the LE Audio profile to be connected again for each REF, then
       check that both REFs are reported as active LE Audio devices.

  Args:
    is_active: True when DUT drives both the disconnection and the
      reconnection. False when REF drops the link and then advertises a
      targeted announcement, without any connect request issued on DUT.
  """
  passive_on_emulator = self.dut.device.is_emulator and not is_active
  if passive_on_emulator:
    self.skipTest("Rootcanal doesn't support APCF.")

  await self._pair_major_device()
  await self._pair_minor_device()

  # Phase 1: drop the LE link to every REF, one device at a time.
  with self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER) as adapter_cb:
    for ref in self.refs:
      if not is_active:
        # Passive case: the disconnection is issued from the REF side.
        ref_dut_acl = ref.device.find_connection_by_bd_addr(
            hci.Address(self.dut.address), transport=core.BT_LE_TRANSPORT
        )
        if not ref_dut_acl:
          self.fail("Unable to find connection between REF and DUT")
        async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
          self.logger.info("[REF] Disconnect DUT")
          await ref_dut_acl.disconnect()
      else:
        self.logger.info("[DUT] Disconnect REF")
        self.dut.bt.disconnect(ref.random_address)

      self.logger.info("[DUT] Wait for disconnected")
      acl_disconnected = bl4a_api.AclDisconnected(
          address=ref.random_address,
          transport=android_constants.Transport.LE,
      )
      await adapter_cb.wait_for_event(acl_disconnected)

  # The announcement type does not depend on the REF, so choose it once.
  if is_active:
    announcement_type = bap.AnnouncementType.GENERAL
  else:
    announcement_type = bap.AnnouncementType.TARGETED

  # Phase 2: bring every REF back and wait for the LE Audio profile.
  with self.dut.bl4a.register_callback(bl4a_api.Module.LE_AUDIO) as lea_cb:
    for ref in self.refs:
      self.logger.info("[REF] Start advertising")
      bap_announcement = bap.UnicastServerAdvertisingData(
          announcement_type=announcement_type,
          available_audio_contexts=bap.ContextType(0xFFFF),
      )
      cap_announcement = _CapAnnouncement(announcement_type=announcement_type)
      advertising_payload = bytes(bap_announcement) + bytes(cap_announcement)
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        await ref.device.create_advertising_set(
            advertising_parameters=_DEFAUILT_ADVERTISING_PARAMETERS,
            advertising_data=advertising_payload,
        )
      if is_active:
        self.logger.info("[DUT] Reconnect REF")
        self.dut.bt.connect(ref.random_address)

      self.logger.info("[DUT] Wait for LE Audio connected")
      lea_connected = bl4a_api.ProfileConnectionStateChanged(
          address=ref.random_address, state=_ConnectionState.CONNECTED
      )
      await lea_cb.wait_for_event(lea_connected)

  # Both REFs must end up as active LE Audio devices, in any order.
  active_devices = self.dut.bt.getActiveDevices(
      android_constants.Profile.LE_AUDIO
  )
  self.assertCountEqual(
      active_devices,
      [self.refs[0].random_address, self.refs[1].random_address],
  )

Tests setting volume over LEA VCP from DUT or REF.

Test steps
  1. Set volume from DUT or REF.
  2. Wait for the volume to be set correctly on the other devices.

Parameters:

Name Type Description Default
issuer TestRole

The role that issues the volume setting request.

required
Source code in navi/tests/functionality/le_audio_unicast_client_dual_device_test.py
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
@navi_test_base.named_parameterized(
    dict(
        testcase_name="from_dut",
        issuer=constants.TestRole.DUT,
    ),
    dict(
        testcase_name="from_ref",
        issuer=constants.TestRole.REF,
    ),
)
async def test_set_volume(self, issuer: constants.TestRole) -> None:
  """Tests setting volume over LEA VCP from DUT or REF.

  Every DUT volume step from 0 to the maximum MUSIC stream volume is
  exercised, except a step that equals the volume DUT currently reports.

  Test steps:
    1. Set volume from DUT or REF.
    2. Wait for the volume to be set correctly on the other devices.

  Args:
    issuer: The role that issues the volume setting request.
  """
  if not self.dut_vcp_enabled:
    self.skipTest("VCP is not enabled on DUT")

  await self._pair_major_device()
  await self._pair_minor_device()

  dut_max_volume = self.dut.bt.getMaxVolume(_StreamType.MUSIC)

  def dut_to_ref_volume(dut_volume: int) -> int:
    # Scale a DUT volume step to the VCS volume range, rounding halves up.
    return int(
        decimal.Decimal(
            dut_volume / dut_max_volume * vcs.MAX_VOLUME
        ).to_integral_exact(rounding=decimal.ROUND_HALF_UP)
    )

  def get_volume_setting(service: vcs.VolumeControlService) -> int:
    return service.volume_setting

  # DUT's VCS client might not be stable at the beginning. If we set volume
  # immediately, the volume might not be set correctly.
  await asyncio.sleep(_PREPARE_TIME_SECONDS)

  ref_vcs_services = [
      _get_service_from_device(ref.device, vcs.VolumeControlService)
      for ref in self.refs
  ]

  for dut_volume in range(dut_max_volume + 1):
    ref_volume = dut_to_ref_volume(dut_volume)
    if self.dut.bt.getVolume(_StreamType.MUSIC) == dut_volume:
      # Skip if DUT volume is already set to the target.
      continue
    with self.dut.bl4a.register_callback(
        bl4a_api.Module.AUDIO
    ) as dut_audio_cb:
      if issuer == constants.TestRole.DUT:
        self.logger.info("[DUT] Set volume to %d", dut_volume)
        self.dut.bt.setVolume(_StreamType.MUSIC, dut_volume)
      else:
        # Log the value actually written to the REF's VCS (REF scale), not
        # the DUT volume step it was derived from.
        self.logger.info("[REF] Set volume to %d", ref_volume)
        ref_vcs_services[0].volume_setting = ref_volume
        await self.refs[0].device.notify_subscribers(
            ref_vcs_services[0].volume_state
        )

      # Whoever issued the change, every REF must end up at ref_volume.
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        for i, ref_vcs_service in enumerate(ref_vcs_services):
          self.logger.info(
              "[REF-%d] Wait for volume to be set to %d", i, ref_volume
          )
          await pyee_extensions.EventTriggeredValueObserver[int](
              ref_vcs_service,
              ref_vcs_service.EVENT_VOLUME_STATE_CHANGE,
              functools.partial(get_volume_setting, ref_vcs_service),
          ).wait_for_target_value(ref_volume)
      # Only when remote device sets volume, DUT can receive the intent.
      if issuer == constants.TestRole.REF:
        self.logger.info("[DUT] Wait for volume to be set")
        await dut_audio_cb.wait_for_event(
            bl4a_api.VolumeChanged(
                stream_type=_StreamType.MUSIC, volume_value=dut_volume
            ),
        )

Tests connecting to devices later during streaming.

Test steps
  1. Start audio streaming from DUT.
  2. Start advertising from REF(Left), wait for DUT to connect.
  3. Wait for audio streaming to start from REF(Left).
  4. Start advertising from REF(Right), wait for DUT to connect.
  5. Wait for audio streaming to start from REF(Right).
Source code in navi/tests/functionality/le_audio_unicast_client_dual_device_test.py
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
async def test_streaming_later_join(self) -> None:
  """Checks that REFs joining while DUT is already playing start streaming.

  Test steps:
    1. Pair both REFs, then disconnect each of them from DUT.
    2. Start audio streaming from DUT.
    3. Start advertising from REF(Left), wait for its sink ASE to stream.
    4. Start advertising from REF(Right), wait for its sink ASE to stream.
  """
  if self.dut.device.is_emulator:
    self.skipTest("Rootcanal doesn't support APCF.")

  # Pair and connect both devices.
  await self._pair_major_device()
  await self._pair_minor_device()

  with self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER) as adapter_cb:
    for ref in self.refs:
      self.logger.info("[DUT] Disconnect REF")
      self.dut.bt.disconnect(ref.random_address)
      self.logger.info("[DUT] Wait for disconnected")
      acl_disconnected = bl4a_api.AclDisconnected(
          address=ref.random_address,
          transport=android_constants.Transport.LE,
      )
      await adapter_cb.wait_for_event(acl_disconnected)

  def read_ase_state(ase_machine):
    return cast(ascs.AseStateMachine, ase_machine).state

  # Collect one state observer per sink ASE, REF by REF.
  sink_ase_states: list[pyee_extensions.EventTriggeredValueObserver] = []
  for ref in self.refs:
    ascs_service = _get_service_from_device(
        ref.device, ascs.AudioStreamControlService
    )
    for ase in ascs_service.ase_state_machines.values():
      if ase.role != ascs.AudioRole.SINK:
        continue
      sink_ase_states.append(
          pyee_extensions.EventTriggeredValueObserver(
              ase,
              ase.EVENT_STATE_CHANGE,
              functools.partial(read_ase_state, ase),
          )
      )

  self.logger.info("[DUT] Start audio streaming")
  await asyncio.to_thread(self.dut.bt.audioPlaySine)

  # NOTE(review): zip() matches observers to REFs by position, which only
  # lines up if every REF exposes exactly one sink ASE -- confirm against the
  # ASCS setup of the REF devices.
  for ase_state, ref in zip(sink_ase_states, self.refs):
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      targeted_announcement = _CapAnnouncement(
          announcement_type=bap.AnnouncementType.TARGETED
      )
      await ref.device.create_advertising_set(
          advertising_parameters=_DEFAUILT_ADVERTISING_PARAMETERS,
          advertising_data=bytes(targeted_announcement),
      )
      self.logger.info("[REF] Wait for ASE to be streaming")
      await ase_state.wait_for_target_value(
          ascs.AseStateMachine.State.STREAMING
      )

Tests unidirectional audio stream between DUT and REF.

Test steps
  1. [Optional] Wait for audio streaming to stop if it is already streaming.
  2. Start audio streaming from DUT.
  3. Wait for audio streaming to start from REF.
  4. Stop audio streaming from DUT.
  5. Wait for audio streaming to stop from REF.
Source code in navi/tests/functionality/le_audio_unicast_client_dual_device_test.py
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
async def test_unidirectional_audio_stream(self) -> None:
  """Streams audio from DUT and checks every REF sink ASE follows along.

  Test steps:
    1. Wait until all sink ASEs on the REFs are idle.
    2. Start audio streaming from DUT.
    3. Wait for all sink ASEs to reach the streaming state.
    4. Keep streaming for a short while, then stop audio on DUT.
    5. Wait for all sink ASEs to return to idle.
  """
  await self._pair_major_device()
  await self._pair_minor_device()

  # One state observer per sink ASE, across all REF devices.
  sink_ase_states: list[pyee_extensions.EventTriggeredValueObserver] = [
      pyee_extensions.EventTriggeredValueObserver(
          ase,
          ase.EVENT_STATE_CHANGE,
          functools.partial(
              lambda ase: cast(ascs.AseStateMachine, ase).state, ase
          ),
      )
      for ref in self.refs
      for ase in _get_service_from_device(
          ref.device, ascs.AudioStreamControlService
      ).ase_state_machines.values()
      if ase.role == ascs.AudioRole.SINK
  ]

  async def wait_for_all_sinks(target_state) -> None:
    # A single step timeout covers the wait on all observers together.
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      for observer in sink_ase_states:
        await observer.wait_for_target_value(target_state)

  # Make sure audio is not streaming before the test starts playback.
  await wait_for_all_sinks(ascs.AseStateMachine.State.IDLE)

  self.logger.info("[DUT] Start audio streaming")
  await asyncio.to_thread(self.dut.bt.audioPlaySine)
  await wait_for_all_sinks(ascs.AseStateMachine.State.STREAMING)

  # Let the stream run for _STREAMING_TIME_SECONDS before stopping it.
  await asyncio.sleep(_STREAMING_TIME_SECONDS)

  self.logger.info("[DUT] Stop audio streaming")
  await asyncio.to_thread(self.dut.bt.audioStop)
  await wait_for_all_sinks(ascs.AseStateMachine.State.IDLE)

Makes sure DUT sets the volume correctly after connecting to REF.

Source code in navi/tests/functionality/le_audio_unicast_client_dual_device_test.py
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
async def test_volume_initialization(self) -> None:
  """Checks that REF volumes match DUT volume after connecting."""
  if not self.dut_vcp_enabled:
    self.skipTest("VCP is not enabled on DUT")

  await self._pair_major_device()
  await self._pair_minor_device()

  # Give the volume some time to settle before sampling it.
  await asyncio.sleep(_PREPARE_TIME_SECONDS)

  # Convert the DUT MUSIC volume to the VCS scale, rounding halves up.
  dut_max_volume = self.dut.bt.getMaxVolume(_StreamType.MUSIC)
  dut_to_ref_scale = vcs.MAX_VOLUME / dut_max_volume
  dut_volume = self.dut.bt.getVolume(_StreamType.MUSIC)
  scaled_volume = decimal.Decimal(dut_volume * dut_to_ref_scale)
  ref_expected_volume = scaled_volume.to_integral_exact(
      rounding=decimal.ROUND_HALF_UP
  )

  # How the volume gets set is not clear, but whatever the mechanism, DUT and
  # every REF device must agree on the resulting value.
  for ref in self.refs:
    ref_vcs = _get_service_from_device(ref.device, vcs.VolumeControlService)
    self.assertEqual(ref_expected_volume, ref_vcs.volume_setting)

Bases: TwoDevicesTestBase

Source code in navi/tests/functionality/le_pairing_test.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
class LePairingTest(navi_test_base.TwoDevicesTestBase):

  @retry.retry_on_exception()
  async def _make_outgoing_connection(
      self, ref_connection_address_type: _AddressType, create_bond: bool
  ) -> device.Connection:
    """Makes DUT open an LE connection to the advertising REF.

    REF starts advertising with the requested own-address type, then DUT
    connects either by creating a bond or by opening a GATT client.

    Args:
      ref_connection_address_type: Own-address type REF advertises with. It
        also selects which REF address (random or public) DUT connects to.
      create_bond: If True, DUT connects by calling createBond(). If False,
        DUT opens a GATT client connection, which is registered with the test
        case context.

    Returns:
      The REF-side connection object of the LE link with DUT.
    """
    ref_addr = str(
        self.ref.random_address
        if ref_connection_address_type == _AddressType.RANDOM
        else self.ref.address
    )
    self.logger.info('[REF] Start advertising.')
    await self.ref.device.start_advertising(
        own_address_type=ref_connection_address_type
    )

    with pyee_extensions.EventWatcher() as watcher:
      ref_dut_connection_future = asyncio.get_running_loop().create_future()

      @watcher.on(self.ref.device, 'connection')
      def _(connection: device.Connection) -> None:
        # Only the first LE connection resolves the future; resolving it again
        # would raise asyncio.InvalidStateError inside the event handler.
        if (
            connection.transport == core.BT_LE_TRANSPORT
            and not ref_dut_connection_future.done()
        ):
          ref_dut_connection_future.set_result(connection)

      self.logger.info('[DUT] Connect to REF.')
      if create_bond:
        self.assertTrue(
            self.dut.bt.createBond(
                ref_addr,
                android_constants.Transport.LE,
                ref_connection_address_type,
            )
        )
      else:
        gatt_client = await self.dut.bl4a.connect_gatt_client(
            address=ref_addr,
            address_type=ref_connection_address_type,
            transport=android_constants.Transport.LE,
        )
        self.test_case_context.push(gatt_client)

      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        ref_dut_connection = await ref_dut_connection_future

      await self.ref.device.stop_advertising()
      return ref_dut_connection

  @retry.retry_on_exception()
  async def _make_incoming_connection(
      self, ref_connection_address_type: _AddressType
  ) -> device.Connection:
    """Makes REF open an LE connection to the advertising DUT.

    DUT advertises a freshly generated service UUID from a random address. REF
    scans until it sees that UUID, then connects to the advertiser's address.

    Args:
      ref_connection_address_type: Own-address type REF uses when connecting
        to DUT.

    Returns:
      The REF-side connection object of the LE link with DUT.
    """
    # Generate a random UUID for testing.
    service_uuid = str(uuid.uuid4())

    self.logger.info(
        '[DUT] Start advertising with service UUID %s.', service_uuid
    )
    advertise = await self.dut.bl4a.start_legacy_advertiser(
        settings=bl4a_api.LegacyAdvertiseSettings(
            own_address_type=_AddressType.RANDOM
        ),
        advertising_data=bl4a_api.AdvertisingData(service_uuids=[service_uuid]),
    )

    self.logger.info('[REF] Scan for DUT.')
    scan_result = asyncio.get_running_loop().create_future()
    with advertise, pyee_extensions.EventWatcher() as watcher:

      def on_advertising_report(adv: device.Advertisement) -> None:
        if service_uuids := adv.data.get(
            core.AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS
        ):
          # Reports keep arriving while scanning; only the first match counts.
          if service_uuid in service_uuids and not scan_result.done():
            scan_result.set_result(adv.address)

      watcher.on(self.ref.device, 'advertisement', on_advertising_report)
      await self.ref.device.start_scanning()
      try:
        self.logger.info(
            '[REF] Wait for advertising report(scan result) from DUT.'
        )
        async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
          dut_addr = await scan_result
      finally:
        # Stop scanning on failure as well, so that a retry of this method
        # does not start from a REF that is still scanning.
        await self.ref.device.stop_scanning()

      self.logger.info('[REF] Connect to DUT.')
      ref_dut_connection = await self.ref.device.connect(
          dut_addr,
          transport=core.BT_LE_TRANSPORT,
          own_address_type=ref_connection_address_type,
      )
      # Remote may not receive CONNECT_IND, so we need to send something to make
      # sure connection is established correctly.
      await ref_dut_connection.get_remote_le_features()

    return ref_dut_connection

  @navi_test_base.parameterized(*(
      (
          variant,
          connection_direction,
          pairing_direction,
          ref_io_capability,
          ref_connection_address_type,
          smp_key_distribution,
      )
      for (
          variant,
          connection_direction,
          pairing_direction,
          ref_io_capability,
          ref_connection_address_type,
          smp_key_distribution,
      ) in itertools.product(
          list(TestVariant),
          list(_Direction),
          list(_Direction),
          (
              pairing.PairingDelegate.NO_OUTPUT_NO_INPUT,
              pairing.PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
          ),
          (_AddressType.RANDOM, _AddressType.PUBLIC),
          (
              # IRK + LTK
              _KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY
              | _KeyDistribution.DISTRIBUTE_IDENTITY_KEY,
              # IRK + LTK + LK (CTKD)
              _KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY
              | _KeyDistribution.DISTRIBUTE_IDENTITY_KEY
              | _KeyDistribution.DISTRIBUTE_LINK_KEY,
          ),
      )
      # Android cannot send SMP_Security_Request.
      if not (
          connection_direction == _Direction.INCOMING
          and pairing_direction == _Direction.OUTGOING
      )
  ))
  @navi_test_base.retry(max_count=2)
  async def test_secure_pairing(
      self,
      variant: TestVariant,
      connection_direction: _Direction,
      pairing_direction: _Direction,
      ref_io_capability: pairing.PairingDelegate.IoCapability,
      ref_connection_address_type: _AddressType,
      smp_key_distribution: _KeyDistribution,
  ) -> None:
    """Tests LE Secure pairing.

    The parameter matrix is the product of all variants, directions, two REF
    IO capabilities, two REF address types and two key distributions, minus
    the combination of an incoming connection with outgoing pairing.

    Test steps:
      1. Setup configurations.
      2. Make ACL connections.
      3. Start pairing.
      4. Wait for pairing requests and verify pins.
      5. Make actions corresponding to variants.
      6. Verify final states.

    Args:
      variant: Action to perform in the pairing procedure.
      connection_direction: Direction of connection. DUT->REF is outgoing, and
        vice versa.
      pairing_direction: Direction of pairing. DUT->REF is outgoing, and vice
        versa.
      ref_io_capability: IO Capability on the REF device.
      ref_connection_address_type: OwnAddressType of REF used in LE-ACL.
      smp_key_distribution: Key distribution to be specified by the REF device.
    """

    # #######################
    # Setup stage
    # #######################

    # The same key distribution is used whether REF ends up as the pairing
    # initiator or as the responder.
    pairing_delegate = pairing_utils.PairingDelegate(
        auto_accept=True,
        io_capability=ref_io_capability,
        local_initiator_key_distribution=smp_key_distribution,
        local_responder_key_distribution=smp_key_distribution,
    )

    def pairing_config_factory(
        _: device.Connection,
    ) -> pairing.PairingConfig:
      # sc=True requests Secure Connections pairing on the REF side.
      return pairing.PairingConfig(
          sc=True,
          mitm=True,
          bonding=True,
          identity_address_type=pairing.PairingConfig.AddressType.PUBLIC,
          delegate=pairing_delegate,
      )

    self.ref.device.pairing_config_factory = pairing_config_factory

    dut_cb = self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER)
    self.test_case_context.push(dut_cb)
    # Upper-cased because it is compared against addresses in DUT events.
    ref_addr = str(
        self.ref.random_address
        if ref_connection_address_type == _AddressType.RANDOM
        else self.ref.address
    ).upper()

    # When DUT made the connection but REF requests the pairing, DUT reports a
    # first pairing request that has to be confirmed before the second one
    # arrives (see the pairing stage below).
    need_double_confirmation = (
        connection_direction == _Direction.OUTGOING
        and pairing_direction == _Direction.INCOMING
    )

    # ##############################################
    # Connecting & pairing initiating stage
    # ##############################################

    ref_dut: device.Connection
    # Only set when REF initiates pairing over a connection it created itself.
    pair_task: asyncio.Task | None = None
    if connection_direction == _Direction.OUTGOING:
      if pairing_direction == _Direction.INCOMING:
        ref_dut = await self._make_outgoing_connection(
            ref_connection_address_type, create_bond=False
        )
        self.logger.info('[REF] Request pairing.')
        ref_dut.request_pairing()
      else:
        self.logger.info('[DUT] Start pairing.')
        ref_dut = await self._make_outgoing_connection(
            ref_connection_address_type, create_bond=True
        )
        # Clean all bond state events since there might be some events produced
        # by retries.
        dut_cb.get_all_events(bl4a_api.BondStateChanged)
    else:
      ref_dut = await self._make_incoming_connection(
          ref_connection_address_type
      )
      if pairing_direction == _Direction.INCOMING:
        self.logger.info('[REF] Start pairing.')
        # Run pairing in the background; its outcome is checked at the end.
        pair_task = asyncio.create_task(ref_dut.pair())
      else:
        self.logger.info('[DUT] Start pairing.')
        self.dut.bt.createBond(
            ref_addr,
            android_constants.Transport.LE,
            ref_connection_address_type,
        )

    # #######################
    # Pairing stage
    # #######################

    self.logger.info('[DUT] Wait for pairing request.')
    dut_pairing_event = await dut_cb.wait_for_event(
        bl4a_api.PairingRequest,
        lambda e: (e.address == ref_addr),
        timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
    )

    if need_double_confirmation:
      self.logger.info('[DUT] Provide initial pairing confirmation.')
      self.dut.bt.setPairingConfirmation(ref_addr, True)
      self.logger.info('[DUT] Wait for 2nd pairing request.')
      # The second request replaces the first one for the checks below.
      dut_pairing_event = await dut_cb.wait_for_event(
          bl4a_api.PairingRequest,
          lambda e: (e.address == ref_addr),
          timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
      )

    self.logger.info('[REF] Wait for pairing request.')
    ref_pairing_event = await asyncio.wait_for(
        pairing_delegate.pairing_events.get(),
        timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
    )
    # REF accepts in every variant except REJECTED.
    ref_answer = variant != TestVariant.REJECTED

    self.logger.info('[DUT] Check reported pairing method.')
    match ref_io_capability:
      case pairing.PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT:
        expected_dut_pairing_variant = _AndroidPairingVariant.CONSENT
        expected_ref_pairing_variant = _BumblePairingVariant.JUST_WORK
      case pairing.PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT:
        expected_dut_pairing_variant = (
            _AndroidPairingVariant.PASSKEY_CONFIRMATION
        )
        expected_ref_pairing_variant = _BumblePairingVariant.NUMERIC_COMPARISON
        # Both sides must report the same numeric comparison value.
        self.assertEqual(ref_pairing_event.arg, dut_pairing_event.pin)
      case _:
        raise ValueError(f'Unsupported IO capability: {ref_io_capability}')

    self.assertEqual(dut_pairing_event.variant, expected_dut_pairing_variant)

    self.logger.info('[REF] Check reported pairing method.')
    self.assertEqual(ref_pairing_event.variant, expected_ref_pairing_variant)

    self.logger.info('[DUT] Handle pairing confirmation.')
    match variant:
      # DUT confirms in the REJECTED variant too: there the rejection comes
      # from REF through ref_answer.
      case TestVariant.ACCEPT | TestVariant.REJECTED:
        self.dut.bt.setPairingConfirmation(ref_addr, True)
      case TestVariant.REJECT:
        self.dut.bt.cancelBond(ref_addr)
      case _:
        # [DUT] Do nothing.
        ...

    self.logger.info('[REF] Handle pairing confirmation.')
    if variant == TestVariant.DISCONNECTED:
      await ref_dut.disconnect()

    pairing_delegate.pairing_answers.put_nowait(ref_answer)

    self.logger.info('[DUT] Check final state.')
    # Only the ACCEPT variant is expected to end up bonded.
    expect_state = (
        android_constants.BondState.BONDED
        if variant == TestVariant.ACCEPT
        else android_constants.BondState.NONE
    )
    # Wait for the first bond state event whose state is a terminal one.
    actual_state = (
        await dut_cb.wait_for_event(
            bl4a_api.BondStateChanged,
            lambda e: (e.state in _TERMINATED_BOND_STATES),
            timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
        )
    ).state
    self.assertEqual(actual_state, expect_state)

    if pair_task:
      self.logger.info('[REF] Wait pairing complete.')
      if variant == TestVariant.ACCEPT:
        await pair_task
      else:
        # Any non-ACCEPT variant must make the REF-side pairing task fail.
        with self.assertRaises((core.ProtocolError, asyncio.CancelledError)):
          await pair_task

  @navi_test_base.parameterized(*(
      (
          variant,
          connection_direction,
          pairing_direction,
          ref_io_capability,
      )
      for (
          variant,
          connection_direction,
          pairing_direction,
          ref_io_capability,
      ) in itertools.product(
          list(TestVariant),
          list(_Direction),
          list(_Direction),
          (
              pairing.PairingDelegate.NO_OUTPUT_NO_INPUT,
              pairing.PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
              pairing.PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
              pairing.PairingDelegate.DISPLAY_OUTPUT_ONLY,
              pairing.PairingDelegate.KEYBOARD_INPUT_ONLY,
          ),
      )
      # Android cannot send SMP_Security_Request.
      if not (
          connection_direction == _Direction.INCOMING
          and pairing_direction == _Direction.OUTGOING
      )
  ))
  @navi_test_base.retry(max_count=2)
  async def test_legacy_pairing(
      self,
      variant: TestVariant,
      connection_direction: _Direction,
      pairing_direction: _Direction,
      ref_io_capability: pairing.PairingDelegate.IoCapability,
  ) -> None:
    """Tests LE Secure pairing.

    Test steps:
      1. Setup configurations.
      2. Make ACL connections.
      3. Start pairing.
      4. Wait for pairing requests and verify pins.
      5. Make actions corresponding to variants.
      6. Verify final states.

    Args:
      variant: Action to perform in the pairing procedure.
      connection_direction: Direction of connection. DUT->REF is outgoing, and
        vice versa.
      pairing_direction: Direction of pairing. DUT->REF is outgoing, and vice
        versa.
      ref_io_capability: IO Capability on the REF device.
    """

    # ####################### Setup ##########################
    pairing_delegate = pairing_utils.PairingDelegate(
        auto_accept=True,
        io_capability=ref_io_capability,
        local_initiator_key_distribution=pairing.PairingDelegate.DEFAULT_KEY_DISTRIBUTION,
        local_responder_key_distribution=pairing.PairingDelegate.DEFAULT_KEY_DISTRIBUTION,
    )

    def pairing_config_factory(_: device.Connection) -> pairing.PairingConfig:
      return pairing.PairingConfig(
          sc=False,
          mitm=True,
          bonding=True,
          identity_address_type=pairing.PairingConfig.AddressType.PUBLIC,
          delegate=pairing_delegate,
      )

    self.ref.device.pairing_config_factory = pairing_config_factory

    dut_cb = self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER)
    self.test_case_context.push(dut_cb)
    ref_addr = self.ref.random_address

    need_double_confirmation = (
        connection_direction == _Direction.OUTGOING
        and pairing_direction == _Direction.INCOMING
    )

    # ####################### Connecting ##########################
    ref_dut: device.Connection
    pair_task: asyncio.Task | None = None
    if connection_direction == _Direction.OUTGOING:
      if pairing_direction == _Direction.INCOMING:
        ref_dut = await self._make_outgoing_connection(
            _AddressType.RANDOM, create_bond=False
        )
        self.logger.info('[REF] Request pairing.')
        ref_dut.request_pairing()
      else:
        self.logger.info('[DUT] Start pairing.')
        ref_dut = await self._make_outgoing_connection(
            _AddressType.RANDOM, create_bond=True
        )
        # Clean all bond state events since there might be some events produced
        # by retries.
        dut_cb.get_all_events(bl4a_api.BondStateChanged)
    else:
      ref_dut = await self._make_incoming_connection(_AddressType.RANDOM)
      if pairing_direction == _Direction.INCOMING:
        self.logger.info('[REF] Start pairing.')
        pair_task = asyncio.create_task(ref_dut.pair())
      else:
        self.logger.info('[DUT] Start pairing.')
        self.dut.bt.createBond(
            ref_addr,
            android_constants.Transport.LE,
            _AddressType.RANDOM,
        )

    # ####################### Pairing ##########################
    self.logger.info('[DUT] Wait for pairing request.')
    dut_pairing_event = await dut_cb.wait_for_event(
        bl4a_api.PairingRequest,
        lambda e: (e.address == ref_addr),
        timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
    )

    if need_double_confirmation:
      self.logger.info('[DUT] Provide initial pairing confirmation.')
      self.dut.bt.setPairingConfirmation(ref_addr, True)
      self.logger.info('[DUT] Wait for 2nd pairing request.')
      dut_pairing_event = await dut_cb.wait_for_event(
          bl4a_api.PairingRequest,
          lambda e: (e.address == ref_addr),
          timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
      )

    if (
        ref_io_capability
        != pairing.PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT
    ):
      self.logger.info('[REF] Wait for pairing request.')
      async with self.assert_not_timeout(_DEFAULT_SETUP_TIMEOUT_SECONDS):
        ref_pairing_event = await pairing_delegate.pairing_events.get()
    else:
      ref_pairing_event = pairing_utils.PairingEvent(
          _BumblePairingVariant.JUST_WORK, None
      )

    dut_accept = variant != TestVariant.REJECT
    ref_accept = variant != TestVariant.REJECTED
    ref_answer: pairing_utils.PairingAnswer
    dut_answer: Callable[[], Any]

    self.logger.info('[DUT] Check reported pairing method.')
    match ref_io_capability, connection_direction:
      case (pairing.PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT, _):
        expected_dut_pairing_variant = _AndroidPairingVariant.CONSENT
        expected_ref_pairing_variant = _BumblePairingVariant.JUST_WORK
        ref_answer = ref_accept
        dut_answer = lambda: self.dut.bt.setPairingConfirmation(ref_addr, True)
      case (
          pairing.PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY,
          _,
      ) | (
          pairing.PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
          _Direction.OUTGOING,
      ):
        expected_dut_pairing_variant = _AndroidPairingVariant.DISPLAY_PASSKEY
        expected_ref_pairing_variant = (
            _BumblePairingVariant.PASSKEY_ENTRY_REQUEST
        )
        ref_answer = dut_pairing_event.pin if ref_accept else None
        dut_answer = lambda: None
      case (
          pairing.PairingDelegate.IoCapability.DISPLAY_OUTPUT_ONLY
          | pairing.PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
          _,
      ) | (
          pairing.PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
          _Direction.INCOMING,
      ):
        expected_dut_pairing_variant = _AndroidPairingVariant.PIN
        expected_ref_pairing_variant = (
            _BumblePairingVariant.PASSKEY_ENTRY_NOTIFICATION
        )
        ref_answer = dut_pairing_event.pin if ref_accept else None
        dut_answer = lambda: self.dut.bt.setPin(
            ref_addr, f'{ref_pairing_event.arg:06}'
        )
      case _:
        raise ValueError(f'Unsupported IO capability: {ref_io_capability}')

    self.assertEqual(dut_pairing_event.variant, expected_dut_pairing_variant)
    self.assertEqual(ref_pairing_event.variant, expected_ref_pairing_variant)

    self.logger.info('[DUT] Handle pairing confirmation.')
    if dut_accept:
      dut_answer()
    else:
      self.dut.bt.cancelBond(ref_addr)

    self.logger.info('[REF] Handle pairing confirmation.')
    match variant:
      case TestVariant.ACCEPT | TestVariant.REJECT:
        pairing_delegate.pairing_answers.put_nowait(ref_answer)
      case TestVariant.DISCONNECTED:
        await ref_dut.disconnect()
      case TestVariant.REJECTED:
        smp_session = self.ref.device.smp_manager.sessions[ref_dut.handle]
        smp_session.send_pairing_failed(smp.SMP_UNSPECIFIED_REASON_ERROR)

    self.logger.info('[DUT] Check final state.')
    expect_state = (
        android_constants.BondState.BONDED
        if variant == TestVariant.ACCEPT
        else android_constants.BondState.NONE
    )
    bond_state_changed_event = await dut_cb.wait_for_event(
        bl4a_api.BondStateChanged,
        lambda e: (e.state in _TERMINATED_BOND_STATES),
        timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
    )
    self.assertEqual(bond_state_changed_event.state, expect_state)

    if pair_task:
      self.logger.info('[REF] Wait pairing complete.')
      if variant == TestVariant.ACCEPT:
        await pair_task
      else:
        with self.assertRaises((core.ProtocolError, asyncio.CancelledError)):
          await pair_task

  @navi_test_base.named_parameterized(*[
      dict(
          testcase_name=(
              f'{"secure" if sc else "legacy"}_{direction.name}_{address_type.name}'
              .lower()
          ),
          sc=sc,
          pairing_direction=direction,
          ref_connection_address_type=address_type,
      )
      # Tuples (not sets) so the generated test cases are always enumerated in
      # the same, reproducible order; set iteration order is an implementation
      # detail and is not guaranteed to be stable across interpreter runs.
      for sc, direction, address_type in itertools.product(
          (True, False),
          (_Direction.OUTGOING, _Direction.INCOMING),
          (_AddressType.PUBLIC, _AddressType.RANDOM),
      )
      # Legacy incoming pairing is not supported on Android.
      if sc or direction == _Direction.OUTGOING
  ])
  async def test_oob_pairing(
      self,
      sc: bool,
      pairing_direction: _Direction,
      ref_connection_address_type: _AddressType,
  ) -> None:
    """Tests LE OOB pairing.

    Test steps:
      1. Setup configurations.
      2. Exchange OOB data.
      3. Start pairing.
      4. Verify final states.

    Note: Legacy variants fail from 24Q4 to 25Q3 due to a stack issue.

    Args:
      sc: Whether to use secure connection.
      pairing_direction: Direction of pairing. DUT->REF is outgoing, and vice
        versa.
      ref_connection_address_type: Address type of the REF device.
    """

    # TODO: Remove this when the patch is merged.
    class Session(smp.Session):
      # SMP session that recomputes `oob_data_flag` after the base initializer:
      # the flag is set when an OOB config is present and either legacy pairing
      # is used or OOB data from the peer is available.

      def __init__(
          self,
          manager: smp.Manager,
          connection: smp.Connection,
          pairing_config: pairing.PairingConfig,
          is_initiator: bool,
      ) -> None:
        super().__init__(manager, connection, pairing_config, is_initiator)
        if pairing_config.oob and (not self.sc or pairing_config.oob.peer_data):
          self.oob_data_flag = 1
        else:
          self.oob_data_flag = 0

    self.ref.device.smp_manager.session_proxy = Session

    # ####################### Setup ##########################
    pairing_delegate = pairing_utils.PairingDelegate(
        auto_accept=True,
        local_initiator_key_distribution=pairing.PairingDelegate.DEFAULT_KEY_DISTRIBUTION,
        local_responder_key_distribution=pairing.PairingDelegate.DEFAULT_KEY_DISTRIBUTION,
    )
    ref_oob_context = pairing.OobContext()
    ref_oob_legacy_context = pairing.OobLegacyContext()
    # `peer_data` stays None unless the DUT generates OOB data (incoming case).
    ref_oob_config = pairing.PairingConfig.OobConfig(
        our_context=ref_oob_context,
        peer_data=None,
        legacy_context=ref_oob_legacy_context,
    )
    if ref_connection_address_type == _AddressType.RANDOM:
      ref_address_bytes = bytes(self.ref.device.random_address)
      ref_address_type = hci.AddressType.RANDOM_DEVICE
      ref_address = self.ref.random_address
    else:
      ref_address_bytes = bytes(self.ref.device.public_address)
      ref_address_type = hci.AddressType.PUBLIC_DEVICE
      ref_address = self.ref.address
    # Address bytes followed by one address-type byte, as handed to the DUT in
    # its OOB data.
    ref_address_with_type_bytes = ref_address_bytes + bytes([ref_address_type])

    def pairing_config_factory(_: device.Connection) -> pairing.PairingConfig:
      return pairing.PairingConfig(
          sc=sc,
          mitm=True,
          bonding=True,
          identity_address_type=pairing.PairingConfig.AddressType.PUBLIC,
          delegate=pairing_delegate,
          oob=ref_oob_config,
      )

    self.ref.device.pairing_config_factory = pairing_config_factory

    dut_cb = self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER)
    self.test_case_context.push(dut_cb)
    ref_pairing_events = asyncio.Queue[None]()

    # Register a callback to get pairing events from the REF device.
    @self.ref.device.on(device.Device.EVENT_CONNECTION)
    def _(connection: device.Connection) -> None:
      # Enqueue synchronously: `Queue.put` is a coroutine function and would
      # only take effect if the emitter schedules returned coroutines. Only the
      # occurrence of the event matters here, so its payload is dropped.
      connection.on(
          connection.EVENT_PAIRING,
          lambda *_: ref_pairing_events.put_nowait(None),
      )

    # ####################### Pairing ##########################
    if pairing_direction == _Direction.INCOMING:
      # The DUT generates the OOB data and the REF consumes it, then the REF
      # connects and initiates pairing.
      shared_data_from_dut = self.dut.bl4a.generate_oob_data(
          android_constants.Transport.LE
      )
      ref_oob_config.peer_data = pairing.OobSharedData(
          c=shared_data_from_dut.confirmation_hash,
          r=shared_data_from_dut.randomizer_hash or b'',
      )
      ref_oob_legacy_context.tk = shared_data_from_dut.le_temporary_key or b''
      ref_dut = await self._make_incoming_connection(
          ref_connection_address_type
      )
      self.logger.info('[REF] Start pairing.')
      async with self.assert_not_timeout(_DEFAULT_SETUP_TIMEOUT_SECONDS):
        await ref_dut.pair()
    else:
      # The REF advertises and shares its OOB data; the DUT creates the bond
      # using that data.
      self.logger.info('[REF] Start advertising.')
      async with self.assert_not_timeout(_DEFAULT_SETUP_TIMEOUT_SECONDS):
        await self.ref.device.start_advertising(
            own_address_type=ref_connection_address_type, auto_restart=False
        )

      shared_data_from_ref = ref_oob_context.share()
      dut_oob_data = bl4a_api.OobData(
          confirmation_hash=shared_data_from_ref.c,
          randomizer_hash=shared_data_from_ref.r,
          device_address_with_type=ref_address_with_type_bytes,
          le_device_role=core.LeRole.PERIPHERAL_ONLY,
          le_temporary_key=ref_oob_legacy_context.tk,
      )
      self.logger.info('[DUT] Start OOB pairing.')
      # The OOB data is passed as P-256 data for secure connections and as
      # P-192 data for legacy pairing.
      result = self.dut.bl4a.create_bond_oob(
          address=ref_address,
          address_type=(
              android_constants.AddressTypeStatus.RANDOM
              if ref_connection_address_type == _AddressType.RANDOM
              else android_constants.AddressTypeStatus.PUBLIC
          ),
          transport=android_constants.Transport.LE,
          p_192_data=dut_oob_data if not sc else None,
          p_256_data=dut_oob_data if sc else None,
      )
      self.assertTrue(result, '[DUT] Failed to create bond')

    # ####################### Verification ##########################
    self.logger.info('[DUT] Wait for pairing complete.')
    # Wait for any terminal bond state, then require that it is BONDED.
    bonded_event = await dut_cb.wait_for_event(
        bl4a_api.BondStateChanged(
            address=ref_address, state=matcher.any_of(*_TERMINATED_BOND_STATES)
        )
    )
    self.assertEqual(bonded_event.state, android_constants.BondState.BONDED)

    self.logger.info('[REF] Wait for pairing complete.')
    async with self.assert_not_timeout(_DEFAULT_SETUP_TIMEOUT_SECONDS):
      await ref_pairing_events.get()

Tests LE Legacy pairing.

Test steps
  1. Setup configurations.
  2. Make ACL connections.
  3. Start pairing.
  4. Wait for pairing requests and verify pins.
  5. Make actions corresponding to variants.
  6. Verify final states.

Parameters:

Name Type Description Default
variant TestVariant

Action to perform in the pairing procedure.

required
connection_direction _Direction

Direction of connection. DUT->REF is outgoing, and vice versa.

required
pairing_direction _Direction

Direction of pairing. DUT->REF is outgoing, and vice versa.

required
ref_io_capability IoCapability

IO Capability on the REF device.

required
Source code in navi/tests/functionality/le_pairing_test.py
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
# One test case per combination of variant, connection direction, pairing
# direction and REF IO capability, minus the unsupported combination below.
@navi_test_base.parameterized(*(
    (
        variant,
        connection_direction,
        pairing_direction,
        ref_io_capability,
    )
    for (
        variant,
        connection_direction,
        pairing_direction,
        ref_io_capability,
    ) in itertools.product(
        list(TestVariant),
        list(_Direction),
        list(_Direction),
        (
            pairing.PairingDelegate.NO_OUTPUT_NO_INPUT,
            pairing.PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
            pairing.PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
            pairing.PairingDelegate.DISPLAY_OUTPUT_ONLY,
            pairing.PairingDelegate.KEYBOARD_INPUT_ONLY,
        ),
    )
    # Android cannot send SMP_Security_Request.
    if not (
        connection_direction == _Direction.INCOMING
        and pairing_direction == _Direction.OUTGOING
    )
))
@navi_test_base.retry(max_count=2)
async def test_legacy_pairing(
    self,
    variant: TestVariant,
    connection_direction: _Direction,
    pairing_direction: _Direction,
    ref_io_capability: pairing.PairingDelegate.IoCapability,
) -> None:
  """Tests LE Legacy pairing.

  The REF device is configured with `sc=False`, i.e. without LE Secure
  Connections.

  Test steps:
    1. Setup configurations.
    2. Make ACL connections.
    3. Start pairing.
    4. Wait for pairing requests and verify pins.
    5. Make actions corresponding to variants.
    6. Verify final states.

  Args:
    variant: Action to perform in the pairing procedure.
    connection_direction: Direction of connection. DUT->REF is outgoing, and
      vice versa.
    pairing_direction: Direction of pairing. DUT->REF is outgoing, and vice
      versa.
    ref_io_capability: IO Capability on the REF device.

  Raises:
    ValueError: If the IO capability / connection direction combination is
      not handled by this test.
  """

  # ####################### Setup ##########################
  pairing_delegate = pairing_utils.PairingDelegate(
      auto_accept=True,
      io_capability=ref_io_capability,
      local_initiator_key_distribution=pairing.PairingDelegate.DEFAULT_KEY_DISTRIBUTION,
      local_responder_key_distribution=pairing.PairingDelegate.DEFAULT_KEY_DISTRIBUTION,
  )

  # Every REF connection uses legacy pairing (sc=False) with MITM and bonding.
  def pairing_config_factory(_: device.Connection) -> pairing.PairingConfig:
    return pairing.PairingConfig(
        sc=False,
        mitm=True,
        bonding=True,
        identity_address_type=pairing.PairingConfig.AddressType.PUBLIC,
        delegate=pairing_delegate,
    )

  self.ref.device.pairing_config_factory = pairing_config_factory

  dut_cb = self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER)
  # NOTE(review): presumably the test case context closes the callback when
  # the test finishes - confirm against navi_test_base.
  self.test_case_context.push(dut_cb)
  ref_addr = self.ref.random_address

  # When the DUT creates the connection but the REF requests pairing, the DUT
  # is expected to report two pairing requests; the first one is confirmed
  # below before the second one is inspected.
  need_double_confirmation = (
      connection_direction == _Direction.OUTGOING
      and pairing_direction == _Direction.INCOMING
  )

  # ####################### Connecting ##########################
  ref_dut: device.Connection
  # Only set when the REF initiates pairing over an incoming connection.
  pair_task: asyncio.Task | None = None
  if connection_direction == _Direction.OUTGOING:
    if pairing_direction == _Direction.INCOMING:
      ref_dut = await self._make_outgoing_connection(
          _AddressType.RANDOM, create_bond=False
      )
      self.logger.info('[REF] Request pairing.')
      ref_dut.request_pairing()
    else:
      self.logger.info('[DUT] Start pairing.')
      ref_dut = await self._make_outgoing_connection(
          _AddressType.RANDOM, create_bond=True
      )
      # Clean all bond state events since there might be some events produced
      # by retries.
      dut_cb.get_all_events(bl4a_api.BondStateChanged)
  else:
    ref_dut = await self._make_incoming_connection(_AddressType.RANDOM)
    if pairing_direction == _Direction.INCOMING:
      self.logger.info('[REF] Start pairing.')
      # Run pairing in the background; its outcome is checked at the end.
      pair_task = asyncio.create_task(ref_dut.pair())
    else:
      self.logger.info('[DUT] Start pairing.')
      self.dut.bt.createBond(
          ref_addr,
          android_constants.Transport.LE,
          _AddressType.RANDOM,
      )

  # ####################### Pairing ##########################
  self.logger.info('[DUT] Wait for pairing request.')
  dut_pairing_event = await dut_cb.wait_for_event(
      bl4a_api.PairingRequest,
      lambda e: (e.address == ref_addr),
      timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
  )

  if need_double_confirmation:
    self.logger.info('[DUT] Provide initial pairing confirmation.')
    self.dut.bt.setPairingConfirmation(ref_addr, True)
    self.logger.info('[DUT] Wait for 2nd pairing request.')
    # The second request replaces the first as the event under test.
    dut_pairing_event = await dut_cb.wait_for_event(
        bl4a_api.PairingRequest,
        lambda e: (e.address == ref_addr),
        timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
    )

  if (
      ref_io_capability
      != pairing.PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT
  ):
    self.logger.info('[REF] Wait for pairing request.')
    async with self.assert_not_timeout(_DEFAULT_SETUP_TIMEOUT_SECONDS):
      ref_pairing_event = await pairing_delegate.pairing_events.get()
  else:
    # No delegate event is awaited for NO_OUTPUT_NO_INPUT; a Just Works event
    # is synthesized so the variant assertion below applies uniformly.
    ref_pairing_event = pairing_utils.PairingEvent(
        _BumblePairingVariant.JUST_WORK, None
    )

  # REJECT: the DUT cancels the bond. REJECTED: the REF sends pairing failed.
  dut_accept = variant != TestVariant.REJECT
  ref_accept = variant != TestVariant.REJECTED
  ref_answer: pairing_utils.PairingAnswer
  dut_answer: Callable[[], Any]

  self.logger.info('[DUT] Check reported pairing method.')
  # Derive the expected pairing variant on each side, plus the answer each
  # side gives, from the REF IO capability and the connection direction.
  match ref_io_capability, connection_direction:
    case (pairing.PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT, _):
      expected_dut_pairing_variant = _AndroidPairingVariant.CONSENT
      expected_ref_pairing_variant = _BumblePairingVariant.JUST_WORK
      ref_answer = ref_accept
      dut_answer = lambda: self.dut.bt.setPairingConfirmation(ref_addr, True)
    case (
        pairing.PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY,
        _,
    ) | (
        pairing.PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
        _Direction.OUTGOING,
    ):
      # The DUT displays the passkey and the REF answers with it, so the DUT
      # itself has nothing to answer.
      expected_dut_pairing_variant = _AndroidPairingVariant.DISPLAY_PASSKEY
      expected_ref_pairing_variant = (
          _BumblePairingVariant.PASSKEY_ENTRY_REQUEST
      )
      ref_answer = dut_pairing_event.pin if ref_accept else None
      dut_answer = lambda: None
    case (
        pairing.PairingDelegate.IoCapability.DISPLAY_OUTPUT_ONLY
        | pairing.PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
        _,
    ) | (
        pairing.PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
        _Direction.INCOMING,
    ):
      # The REF reports the passkey in its pairing event; the DUT answers by
      # setting it as a zero-padded 6-digit PIN.
      expected_dut_pairing_variant = _AndroidPairingVariant.PIN
      expected_ref_pairing_variant = (
          _BumblePairingVariant.PASSKEY_ENTRY_NOTIFICATION
      )
      ref_answer = dut_pairing_event.pin if ref_accept else None
      dut_answer = lambda: self.dut.bt.setPin(
          ref_addr, f'{ref_pairing_event.arg:06}'
      )
    case _:
      raise ValueError(f'Unsupported IO capability: {ref_io_capability}')

  self.assertEqual(dut_pairing_event.variant, expected_dut_pairing_variant)
  self.assertEqual(ref_pairing_event.variant, expected_ref_pairing_variant)

  self.logger.info('[DUT] Handle pairing confirmation.')
  if dut_accept:
    dut_answer()
  else:
    self.dut.bt.cancelBond(ref_addr)

  self.logger.info('[REF] Handle pairing confirmation.')
  match variant:
    case TestVariant.ACCEPT | TestVariant.REJECT:
      # The REF answers normally; in the REJECT variant the DUT has already
      # cancelled the bond above.
      pairing_delegate.pairing_answers.put_nowait(ref_answer)
    case TestVariant.DISCONNECTED:
      await ref_dut.disconnect()
    case TestVariant.REJECTED:
      # Send a pairing failure directly on the REF SMP session.
      smp_session = self.ref.device.smp_manager.sessions[ref_dut.handle]
      smp_session.send_pairing_failed(smp.SMP_UNSPECIFIED_REASON_ERROR)

  self.logger.info('[DUT] Check final state.')
  # Only the ACCEPT variant is expected to end up bonded.
  expect_state = (
      android_constants.BondState.BONDED
      if variant == TestVariant.ACCEPT
      else android_constants.BondState.NONE
  )
  bond_state_changed_event = await dut_cb.wait_for_event(
      bl4a_api.BondStateChanged,
      lambda e: (e.state in _TERMINATED_BOND_STATES),
      timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
  )
  self.assertEqual(bond_state_changed_event.state, expect_state)

  if pair_task:
    self.logger.info('[REF] Wait pairing complete.')
    if variant == TestVariant.ACCEPT:
      await pair_task
    else:
      # For every non-ACCEPT variant the REF-side pairing task must end with
      # a protocol error or a cancellation.
      with self.assertRaises((core.ProtocolError, asyncio.CancelledError)):
        await pair_task

Tests LE OOB pairing.

Test steps
  1. Setup configurations.
  2. Exchange OOB data.
  3. Start pairing.
  4. Verify final states.

Note: Legacy variants fail from 24Q4 to 25Q3 due to a stack issue.

Parameters:

Name Type Description Default
sc bool

Whether to use secure connection.

required
pairing_direction _Direction

Direction of pairing. DUT->REF is outgoing, and vice versa.

required
ref_connection_address_type _AddressType

Address type of the REF device.

required
Source code in navi/tests/functionality/le_pairing_test.py
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
@navi_test_base.named_parameterized(*[
    dict(
        testcase_name=(
            f'{"secure" if sc else "legacy"}_{direction.name}_{address_type.name}'
            .lower()
        ),
        sc=sc,
        pairing_direction=direction,
        ref_connection_address_type=address_type,
    )
    # Tuples (not sets) so the generated test cases are always enumerated in
    # the same, reproducible order; set iteration order is an implementation
    # detail and is not guaranteed to be stable across interpreter runs.
    for sc, direction, address_type in itertools.product(
        (True, False),
        (_Direction.OUTGOING, _Direction.INCOMING),
        (_AddressType.PUBLIC, _AddressType.RANDOM),
    )
    # Legacy incoming pairing is not supported on Android.
    if sc or direction == _Direction.OUTGOING
])
async def test_oob_pairing(
    self,
    sc: bool,
    pairing_direction: _Direction,
    ref_connection_address_type: _AddressType,
) -> None:
  """Tests LE OOB pairing.

  Test steps:
    1. Setup configurations.
    2. Exchange OOB data.
    3. Start pairing.
    4. Verify final states.

  Note: Legacy variants fail from 24Q4 to 25Q3 due to a stack issue.

  Args:
    sc: Whether to use secure connection.
    pairing_direction: Direction of pairing. DUT->REF is outgoing, and vice
      versa.
    ref_connection_address_type: Address type of the REF device.
  """

  # TODO: Remove this when the patch is merged.
  class Session(smp.Session):
    # SMP session that recomputes `oob_data_flag` after the base initializer:
    # the flag is set when an OOB config is present and either legacy pairing
    # is used or OOB data from the peer is available.

    def __init__(
        self,
        manager: smp.Manager,
        connection: smp.Connection,
        pairing_config: pairing.PairingConfig,
        is_initiator: bool,
    ) -> None:
      super().__init__(manager, connection, pairing_config, is_initiator)
      if pairing_config.oob and (not self.sc or pairing_config.oob.peer_data):
        self.oob_data_flag = 1
      else:
        self.oob_data_flag = 0

  self.ref.device.smp_manager.session_proxy = Session

  # ####################### Setup ##########################
  pairing_delegate = pairing_utils.PairingDelegate(
      auto_accept=True,
      local_initiator_key_distribution=pairing.PairingDelegate.DEFAULT_KEY_DISTRIBUTION,
      local_responder_key_distribution=pairing.PairingDelegate.DEFAULT_KEY_DISTRIBUTION,
  )
  ref_oob_context = pairing.OobContext()
  ref_oob_legacy_context = pairing.OobLegacyContext()
  # `peer_data` stays None unless the DUT generates OOB data (incoming case).
  ref_oob_config = pairing.PairingConfig.OobConfig(
      our_context=ref_oob_context,
      peer_data=None,
      legacy_context=ref_oob_legacy_context,
  )
  if ref_connection_address_type == _AddressType.RANDOM:
    ref_address_bytes = bytes(self.ref.device.random_address)
    ref_address_type = hci.AddressType.RANDOM_DEVICE
    ref_address = self.ref.random_address
  else:
    ref_address_bytes = bytes(self.ref.device.public_address)
    ref_address_type = hci.AddressType.PUBLIC_DEVICE
    ref_address = self.ref.address
  # Address bytes followed by one address-type byte, as handed to the DUT in
  # its OOB data.
  ref_address_with_type_bytes = ref_address_bytes + bytes([ref_address_type])

  def pairing_config_factory(_: device.Connection) -> pairing.PairingConfig:
    return pairing.PairingConfig(
        sc=sc,
        mitm=True,
        bonding=True,
        identity_address_type=pairing.PairingConfig.AddressType.PUBLIC,
        delegate=pairing_delegate,
        oob=ref_oob_config,
    )

  self.ref.device.pairing_config_factory = pairing_config_factory

  dut_cb = self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER)
  self.test_case_context.push(dut_cb)
  ref_pairing_events = asyncio.Queue[None]()

  # Register a callback to get pairing events from the REF device.
  @self.ref.device.on(device.Device.EVENT_CONNECTION)
  def _(connection: device.Connection) -> None:
    # Enqueue synchronously: `Queue.put` is a coroutine function and would
    # only take effect if the emitter schedules returned coroutines. Only the
    # occurrence of the event matters here, so its payload is dropped.
    connection.on(
        connection.EVENT_PAIRING,
        lambda *_: ref_pairing_events.put_nowait(None),
    )

  # ####################### Pairing ##########################
  if pairing_direction == _Direction.INCOMING:
    # The DUT generates the OOB data and the REF consumes it, then the REF
    # connects and initiates pairing.
    shared_data_from_dut = self.dut.bl4a.generate_oob_data(
        android_constants.Transport.LE
    )
    ref_oob_config.peer_data = pairing.OobSharedData(
        c=shared_data_from_dut.confirmation_hash,
        r=shared_data_from_dut.randomizer_hash or b'',
    )
    ref_oob_legacy_context.tk = shared_data_from_dut.le_temporary_key or b''
    ref_dut = await self._make_incoming_connection(
        ref_connection_address_type
    )
    self.logger.info('[REF] Start pairing.')
    async with self.assert_not_timeout(_DEFAULT_SETUP_TIMEOUT_SECONDS):
      await ref_dut.pair()
  else:
    # The REF advertises and shares its OOB data; the DUT creates the bond
    # using that data.
    self.logger.info('[REF] Start advertising.')
    async with self.assert_not_timeout(_DEFAULT_SETUP_TIMEOUT_SECONDS):
      await self.ref.device.start_advertising(
          own_address_type=ref_connection_address_type, auto_restart=False
      )

    shared_data_from_ref = ref_oob_context.share()
    dut_oob_data = bl4a_api.OobData(
        confirmation_hash=shared_data_from_ref.c,
        randomizer_hash=shared_data_from_ref.r,
        device_address_with_type=ref_address_with_type_bytes,
        le_device_role=core.LeRole.PERIPHERAL_ONLY,
        le_temporary_key=ref_oob_legacy_context.tk,
    )
    self.logger.info('[DUT] Start OOB pairing.')
    # The OOB data is passed as P-256 data for secure connections and as
    # P-192 data for legacy pairing.
    result = self.dut.bl4a.create_bond_oob(
        address=ref_address,
        address_type=(
            android_constants.AddressTypeStatus.RANDOM
            if ref_connection_address_type == _AddressType.RANDOM
            else android_constants.AddressTypeStatus.PUBLIC
        ),
        transport=android_constants.Transport.LE,
        p_192_data=dut_oob_data if not sc else None,
        p_256_data=dut_oob_data if sc else None,
    )
    self.assertTrue(result, '[DUT] Failed to create bond')

  # ####################### Verification ##########################
  self.logger.info('[DUT] Wait for pairing complete.')
  # Wait for any terminal bond state, then require that it is BONDED.
  bonded_event = await dut_cb.wait_for_event(
      bl4a_api.BondStateChanged(
          address=ref_address, state=matcher.any_of(*_TERMINATED_BOND_STATES)
      )
  )
  self.assertEqual(bonded_event.state, android_constants.BondState.BONDED)

  self.logger.info('[REF] Wait for pairing complete.')
  async with self.assert_not_timeout(_DEFAULT_SETUP_TIMEOUT_SECONDS):
    await ref_pairing_events.get()

Tests LE Secure pairing.

Test steps
  1. Setup configurations.
  2. Make ACL connections.
  3. Start pairing.
  4. Wait for pairing requests and verify pins.
  5. Make actions corresponding to variants.
  6. Verify final states.

Parameters:

Name Type Description Default
variant TestVariant

Action to perform in the pairing procedure.

required
connection_direction _Direction

Direction of connection. DUT->REF is outgoing, and vice versa.

required
pairing_direction _Direction

Direction of pairing. DUT->REF is outgoing, and vice versa.

required
ref_io_capability IoCapability

IO Capability on the REF device.

required
ref_connection_address_type _AddressType

OwnAddressType of REF used in LE-ACL.

required
smp_key_distribution _KeyDistribution

Key distribution to be specified by the REF device.

required
Source code in navi/tests/functionality/le_pairing_test.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
@navi_test_base.parameterized(*(
    # One test case per combination produced by itertools.product below,
    # minus the combinations removed by the trailing `if not (...)` filter.
    (
        variant,
        connection_direction,
        pairing_direction,
        ref_io_capability,
        ref_connection_address_type,
        smp_key_distribution,
    )
    for (
        variant,
        connection_direction,
        pairing_direction,
        ref_io_capability,
        ref_connection_address_type,
        smp_key_distribution,
    ) in itertools.product(
        list(TestVariant),
        list(_Direction),
        list(_Direction),
        # IO capabilities advertised by REF; they select the pairing method
        # expected in the "Check reported pairing method" step.
        (
            pairing.PairingDelegate.NO_OUTPUT_NO_INPUT,
            pairing.PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
        ),
        # Own address type REF uses on the LE-ACL link.
        (_AddressType.RANDOM, _AddressType.PUBLIC),
        (
            # IRK + LTK
            _KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY
            | _KeyDistribution.DISTRIBUTE_IDENTITY_KEY,
            # IRK + LTK + LK (CTKD)
            _KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY
            | _KeyDistribution.DISTRIBUTE_IDENTITY_KEY
            | _KeyDistribution.DISTRIBUTE_LINK_KEY,
        ),
    )
    # Android cannot send SMP_Security_Request.
    if not (
        connection_direction == _Direction.INCOMING
        and pairing_direction == _Direction.OUTGOING
    )
))
# NOTE(review): presumably re-runs a failing case up to max_count times;
# confirm against navi_test_base.retry.
@navi_test_base.retry(max_count=2)
async def test_secure_pairing(
    self,
    variant: TestVariant,
    connection_direction: _Direction,
    pairing_direction: _Direction,
    ref_io_capability: pairing.PairingDelegate.IoCapability,
    ref_connection_address_type: _AddressType,
    smp_key_distribution: _KeyDistribution,
) -> None:
  """Tests LE Secure pairing.

  Only `TestVariant.ACCEPT` is expected to end with DUT in the BONDED state;
  every other variant is expected to end in `BondState.NONE`.

  Test steps:
    1. Setup configurations.
    2. Make ACL connections.
    3. Start pairing.
    4. Wait for pairing requests and verify pins.
    5. Make actions corresponding to variants.
    6. Verify final states.

  Args:
    variant: Action to perform in the pairing procedure.
    connection_direction: Direction of connection. DUT->REF is outgoing, and
      vice versa.
    pairing_direction: Direction of pairing. DUT->REF is outgoing, and vice
      versa.
    ref_io_capability: IO Capability on the REF device.
    ref_connection_address_type: OwnAddressType of REF used in LE-ACL.
    smp_key_distribution: Key distribution to be specified by the REF device.

  Raises:
    ValueError: If `ref_io_capability` is not one of the two IO capabilities
      handled by this test.
  """

  # #######################
  # Setup stage
  # #######################

  # REF-side delegate. The same key distribution is used whether REF ends up
  # as the pairing initiator or the responder.
  pairing_delegate = pairing_utils.PairingDelegate(
      auto_accept=True,
      io_capability=ref_io_capability,
      local_initiator_key_distribution=smp_key_distribution,
      local_responder_key_distribution=smp_key_distribution,
  )

  # The connection argument is ignored: every connection gets the same
  # configuration (sc, mitm and bonding all enabled).
  def pairing_config_factory(
      _: device.Connection,
  ) -> pairing.PairingConfig:
    return pairing.PairingConfig(
        sc=True,
        mitm=True,
        bonding=True,
        identity_address_type=pairing.PairingConfig.AddressType.PUBLIC,
        delegate=pairing_delegate,
    )

  self.ref.device.pairing_config_factory = pairing_config_factory

  dut_cb = self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER)
  # NOTE(review): presumably test_case_context releases the callback when the
  # test case ends; confirm in the test base class.
  self.test_case_context.push(dut_cb)
  # Upper-cased REF address (random or public, per the parameter); it is
  # compared against the address carried by DUT events below.
  ref_addr = str(
      self.ref.random_address
      if ref_connection_address_type == _AddressType.RANDOM
      else self.ref.address
  ).upper()

  # When DUT connects but REF requests the pairing, the pairing stage expects
  # two PairingRequest events on DUT and confirms the first one.
  need_double_confirmation = (
      connection_direction == _Direction.OUTGOING
      and pairing_direction == _Direction.INCOMING
  )

  # ##############################################
  # Connecting & pairing initiating stage
  # ##############################################

  ref_dut: device.Connection
  # Only set when REF starts the pairing with pair(); awaited at the very end.
  pair_task: asyncio.Task | None = None
  if connection_direction == _Direction.OUTGOING:
    if pairing_direction == _Direction.INCOMING:
      ref_dut = await self._make_outgoing_connection(
          ref_connection_address_type, create_bond=False
      )
      self.logger.info('[REF] Request pairing.')
      ref_dut.request_pairing()
    else:
      self.logger.info('[DUT] Start pairing.')
      # NOTE(review): create_bond=True presumably makes the helper start the
      # bonding from DUT; confirm in _make_outgoing_connection.
      ref_dut = await self._make_outgoing_connection(
          ref_connection_address_type, create_bond=True
      )
      # Clean all bond state events since there might be some events produced
      # by retries.
      dut_cb.get_all_events(bl4a_api.BondStateChanged)
  else:
    ref_dut = await self._make_incoming_connection(
        ref_connection_address_type
    )
    if pairing_direction == _Direction.INCOMING:
      self.logger.info('[REF] Start pairing.')
      # Run REF's pairing in the background so the test can keep handling
      # the DUT side.
      pair_task = asyncio.create_task(ref_dut.pair())
    else:
      # Not reachable with the current parameterization (this combination is
      # filtered out in the decorator).
      self.logger.info('[DUT] Start pairing.')
      self.dut.bt.createBond(
          ref_addr,
          android_constants.Transport.LE,
          ref_connection_address_type,
      )

  # #######################
  # Pairing stage
  # #######################

  self.logger.info('[DUT] Wait for pairing request.')
  dut_pairing_event = await dut_cb.wait_for_event(
      bl4a_api.PairingRequest,
      lambda e: (e.address == ref_addr),
      timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
  )

  if need_double_confirmation:
    self.logger.info('[DUT] Provide initial pairing confirmation.')
    self.dut.bt.setPairingConfirmation(ref_addr, True)
    self.logger.info('[DUT] Wait for 2nd pairing request.')
    # The second request replaces the first one; only it is checked below.
    dut_pairing_event = await dut_cb.wait_for_event(
        bl4a_api.PairingRequest,
        lambda e: (e.address == ref_addr),
        timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
    )

  self.logger.info('[REF] Wait for pairing request.')
  ref_pairing_event = await asyncio.wait_for(
      pairing_delegate.pairing_events.get(),
      timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
  )
  # REF accepts the pairing unless the variant is REJECTED.
  ref_answer = variant != TestVariant.REJECTED

  self.logger.info('[DUT] Check reported pairing method.')
  match ref_io_capability:
    case pairing.PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT:
      expected_dut_pairing_variant = _AndroidPairingVariant.CONSENT
      expected_ref_pairing_variant = _BumblePairingVariant.JUST_WORK
    case pairing.PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT:
      expected_dut_pairing_variant = (
          _AndroidPairingVariant.PASSKEY_CONFIRMATION
      )
      expected_ref_pairing_variant = _BumblePairingVariant.NUMERIC_COMPARISON
      # The value reported to REF must equal the pin reported on DUT.
      self.assertEqual(ref_pairing_event.arg, dut_pairing_event.pin)
    case _:
      raise ValueError(f'Unsupported IO capability: {ref_io_capability}')

  self.assertEqual(dut_pairing_event.variant, expected_dut_pairing_variant)

  self.logger.info('[REF] Check reported pairing method.')
  self.assertEqual(ref_pairing_event.variant, expected_ref_pairing_variant)

  self.logger.info('[DUT] Handle pairing confirmation.')
  match variant:
    # For REJECTED, DUT still confirms; the refusal comes from REF through
    # ref_answer.
    case TestVariant.ACCEPT | TestVariant.REJECTED:
      self.dut.bt.setPairingConfirmation(ref_addr, True)
    # For REJECT, DUT itself cancels the bond.
    case TestVariant.REJECT:
      self.dut.bt.cancelBond(ref_addr)
    case _:
      # [DUT] Do nothing.
      ...

  self.logger.info('[REF] Handle pairing confirmation.')
  if variant == TestVariant.DISCONNECTED:
    # REF drops the link before its answer is queued.
    await ref_dut.disconnect()

  # Queue REF's answer (False only for REJECTED) for the pairing delegate.
  pairing_delegate.pairing_answers.put_nowait(ref_answer)

  self.logger.info('[DUT] Check final state.')
  expect_state = (
      android_constants.BondState.BONDED
      if variant == TestVariant.ACCEPT
      else android_constants.BondState.NONE
  )
  # Wait for the first bond state event whose state is in
  # _TERMINATED_BOND_STATES and compare it with the expectation.
  actual_state = (
      await dut_cb.wait_for_event(
          bl4a_api.BondStateChanged,
          lambda e: (e.state in _TERMINATED_BOND_STATES),
          timeout=_DEFAULT_SETUP_TIMEOUT_SECONDS,
      )
  ).state
  self.assertEqual(actual_state, expect_state)

  if pair_task:
    self.logger.info('[REF] Wait pairing complete.')
    if variant == TestVariant.ACCEPT:
      # REF's pair() must finish without raising.
      await pair_task
    else:
      # For every non-ACCEPT variant REF's pair() is expected to fail.
      with self.assertRaises((core.ProtocolError, asyncio.CancelledError)):
        await pair_task

Bases: TwoDevicesTestBase

Source code in navi/tests/functionality/rfcomm_socket_test.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
class RfcommSocketTest(navi_test_base.TwoDevicesTestBase):
  """Tests RFCOMM socket connections initiated by DUT towards REF."""

  @override
  async def async_setup_test(self) -> None:
    """Prepares REF for each test.

    Installs a pairing config factory on REF, deregisters the SMP BR fixed
    channel and clears REF's SDP service records.
    """
    await super().async_setup_test()

    # Using highest authentication level to allow secure sockets.
    def pairing_config_factory(
        connection: device.Connection,
    ) -> pairing.PairingConfig:
      del connection  # Unused parameter.
      return pairing.PairingConfig(
          delegate=_PairingDelegate(
              io_capability=(
                  _PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT
              )
          )
      )

    self.ref.device.pairing_config_factory = pairing_config_factory
    # Disable CTKD.
    self.ref.device.l2cap_channel_manager.deregister_fixed_channel(
        smp.SMP_BR_CID
    )
    # Clear SDP records.
    self.ref.device.sdp_service_records.clear()

  async def _setup_pairing(self) -> None:
    """Pairs DUT and REF, then tears down the ACL link used for pairing.

    After requesting the disconnection it waits for DUT to report
    `AclDisconnected` and then sleeps for 2 seconds.
    """
    ref_dut_acl = await self.classic_connect_and_pair()

    # Terminate ACL connection after pairing.
    with self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER) as dut_cb:
      # Disconnection may "fail" if the ACL is already disconnecting or
      # disconnected.
      with contextlib.suppress(core.BaseBumbleError):
        await ref_dut_acl.disconnect()
      self.logger.info("[DUT] Wait for disconnected.")
      await dut_cb.wait_for_event(bl4a_api.AclDisconnected)

    # Wait for 2 seconds to let controllers become idle.
    await asyncio.sleep(datetime.timedelta(seconds=2).total_seconds())

  def _setup_rfcomm_server_on_ref(
      self,
      service_record_handle: int,
      rfcomm_uuid: str,
      rfcomm_server: rfcomm.Server,
  ) -> asyncio.Queue[rfcomm.DLC]:
    """Sets up RFCOMM server on REF and returns a queue for its result.

    The listener's channel is published in REF's SDP service records under
    `service_record_handle` together with `rfcomm_uuid`.

    Args:
      service_record_handle: The service record handle for the RFCOMM server.
      rfcomm_uuid: The UUID of the RFCOMM server.
      rfcomm_server: The RFCOMM server to listen on.

    Returns:
      An asyncio queue for the RFCOMM server's incoming DLC.
    """

    # The queue holds at most one item; the listener's acceptor callback is
    # the queue's put_nowait.
    accept_queue = asyncio.Queue[rfcomm.DLC](maxsize=1)
    rfcomm_channel = rfcomm_server.listen(
        acceptor=accept_queue.put_nowait,
    )
    self.logger.info(
        "[REF] Create RFCOMM socket server with rfcomm_uuid %s.",
        rfcomm_uuid,
    )

    self.ref.device.sdp_service_records[service_record_handle] = (
        rfcomm.make_service_sdp_records(
            service_record_handle=service_record_handle,
            channel=rfcomm_channel,
            uuid=core.UUID(rfcomm_uuid),
        )
    )

    return accept_queue

  @navi_test_base.parameterized(1, 2)
  async def test_rfcomm_socket_connections_simultaneously(
      self, num_connections: int
  ) -> None:
    """Tests one or two RFCOMM socket connections simultaneously.

    Typical duration: 30-50s.

    Test steps:
      1. Create RFCOMM socket servers on REF.
      2. Pair DUT and REF.
      3. Create RFCOMM socket connections from DUT to REF.
      4. Verify that the RFCOMM socket connections are successful.

    Args:
      num_connections: The number of RFCOMM socket connections to create.
    """

    # Initialize RFCOMM sockets server on REF.
    rfcomm_server = rfcomm.Server(self.ref.device)

    # Create RFCOMM sockets server on REF.
    # One random UUID per connection; the list index doubles as the SDP
    # service record handle.
    rfcomm_uuid_list = [str(uuid.uuid4()) for _ in range(num_connections)]
    ref_accept_queues = [
        self._setup_rfcomm_server_on_ref(i, rfcomm_uuid, rfcomm_server)
        for i, rfcomm_uuid in enumerate(rfcomm_uuid_list)
    ]

    # Pair DUT and REF.
    await self._setup_pairing()

    # Create RFCOMM sockets connection from DUT to REF.
    rfcomm_connection_coroutines_list = [
        self.dut.bl4a.create_rfcomm_channel_async(
            address=self.ref.address,
            secure=True,
            uuid=rfcomm_uuid,
        )
        for rfcomm_uuid in rfcomm_uuid_list
    ]

    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      # Wait for all RFCOMM sockets connection to complete, and get the results.
      self.logger.info("[REF] Wait for all RFCOMM connections to be accepted.")
      server_accept_results = await asyncio.gather(
          *[q.get() for q in ref_accept_queues]
      )
      self.logger.info("[DUT] Wait for all RFCOMM connections to complete.")
      await asyncio.gather(*rfcomm_connection_coroutines_list)

      # Verify both RFCOMM sockets connection are successful.
      for dlc_result in server_accept_results:
        self.logger.info("dlc_result: %s", dlc_result)
        self.assertEqual(
            dlc_result.state,
            rfcomm.DLC.State.CONNECTED,
            "DLC connection failed. Expected state: CONNECTED, but got:"
            f" {dlc_result.state.name}",
        )

  async def test_concurrent_rfcomm_connect_fail_raises_exception(self) -> None:
    """Tests that concurrent RFCOMM connection failures raise exceptions.

    Typical duration: 30-60s.

    Test steps:
      1. Create TWO RFCOMM socket servers on REF.
      2. Connect TWO RFCOMM sockets from DUT to REF at the same time.
      3. Reject the RFCOMM connection request on REF by answering the L2CAP
      connection request with "No resources available".
      4. Verify the DUT can catch the exceptions raised for both RFCOMM
      connections.
    """
    # TODO: Remove this skip once the flag is removed.
    if not self.dut.bluetooth_flags.get(
        "fix_socket_connection_failed_no_callback", False
    ):
      self.skipTest("Skip until the fix flag(ag/33721603) is enabled.")

    # Keep the stock handler so non-RFCOMM requests can still be delegated to
    # it from the custom handler below.
    original_on_l2cap_connection_request = (
        self.ref.device.l2cap_channel_manager.on_l2cap_connection_request
    )

    def custom_on_l2cap_connection_request(
        connection: device.Connection,
        cid: int,
        request: l2cap.L2CAP_Connection_Request,
    ) -> None:
      self.logger.info(
          " _custom_on_l2cap_connection_request:: psm: %s", request.psm
      )

      if request.psm == rfcomm.RFCOMM_PSM:
        # Answer RFCOMM requests with a refusal instead of opening a channel.
        self.logger.info(" RFCOMM L2CAP connection request rejected")
        self.ref.device.l2cap_channel_manager.send_control_frame(
            connection,
            cid,
            l2cap.L2CAP_Connection_Response(
                identifier=request.identifier,
                destination_cid=0,
                source_cid=request.source_cid,
                result=l2cap.L2CAP_Connection_Response.Result.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
                status=0x0000,
            ),
        )
      else:
        original_on_l2cap_connection_request(connection, cid, request)

    # Replace the original on_l2cap_connection_request with the custom one.
    # The original handler is not restored within this test.
    self.ref.device.l2cap_channel_manager.on_l2cap_connection_request = (
        custom_on_l2cap_connection_request
    )

    # Both listeners below share this future as their acceptor; the test
    # never awaits it.
    ref_accept_future = asyncio.get_running_loop().create_future()
    rfcomm_connection_coroutines_list: list[
        Coroutine[None, None, bl4a_api.RfcommChannel]
    ] = []

    rfcomm_server = rfcomm.Server(self.ref.device)
    for i in range(2):
      # Create RFCOMM sockets server on REF.
      rfcomm_channel = rfcomm_server.listen(
          acceptor=ref_accept_future.set_result,
      )
      rfcomm_uuid = str(uuid.uuid4())
      self.logger.info(
          "[REF] Create %d RFCOMM socket server with rfcomm_uuid %s.",
          i,
          rfcomm_uuid,
      )
      # The loop index doubles as the SDP service record handle.
      self.ref.device.sdp_service_records[i] = rfcomm.make_service_sdp_records(
          service_record_handle=i,
          channel=rfcomm_channel,
          uuid=core.UUID(rfcomm_uuid),
      )

      # Create RFCOMM socket connection from DUT to REF.
      rfcomm_connection_coroutines_list.append(
          self.dut.bl4a.create_rfcomm_channel_async(
              address=self.ref.address,
              secure=True,
              uuid=rfcomm_uuid,
          )
      )

    # Await the pairing request from DUT and accept the request.
    with self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER) as dut_cb:
      self.logger.info("[DUT] Wait for pairing request.")
      # mock.ANY lets any variant and pin match; only the address is fixed.
      await dut_cb.wait_for_event(
          bl4a_api.PairingRequest(
              address=self.ref.address, variant=mock.ANY, pin=mock.ANY
          ),
          timeout=10,
      )

      # Wait for 5 seconds for:
      #   1. Simulate user interaction delay with a pop-up.
      #   2. Ensures both RFCOMM sockets complete SDP, creating an L2CAP
      #     connection collision.
      #   3. RFCOMM L2CAP connection request will be pending due to incomplete
      #     encryption.
      # NOTE(review): the sleep length is _DEFAULT_STEP_TIMEOUT_SECONDS; the
      # "5 seconds" wording assumes that constant equals 5 -- confirm.
      self.logger.info("[DUT] setPairingConfirmation Wait for 5 seconds.")
      await asyncio.sleep(_DEFAULT_STEP_TIMEOUT_SECONDS)
      self.assertTrue(
          self.dut.bt.setPairingConfirmation(self.ref.address, True)
      )

      # wait for both RFCOMM sockets connection to fail.
      async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
        for coro in rfcomm_connection_coroutines_list:
          with self.assertRaises(errors.SnippetError):
            await coro

Tests that concurrent RFCOMM connection failures raise exceptions.

Typical duration: 30-60s.

Test steps
  1. Create TWO RFCOMM sockets server on REF.
  2. Connect TWO RFCOMM sockets from DUT to REF at the same time.
  3. Reject the RFCOMM connection request on REF by answering the L2CAP connection request with "No resources available".
  4. Verify the DUT can catch the exceptions raised for both RFCOMM connections.
Source code in navi/tests/functionality/rfcomm_socket_test.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
async def test_concurrent_rfcomm_connect_fail_raises_exception(self) -> None:
  """Checks that two concurrently failing RFCOMM connects both raise on DUT.

  Typical duration: 30-60s.

  Test steps:
    1. Register two RFCOMM servers on REF.
    2. Start two secure RFCOMM connections from DUT to REF.
    3. Have REF refuse every L2CAP connection request that targets the
       RFCOMM PSM with "no resources available".
    4. Verify that awaiting each DUT connection raises SnippetError.
  """
  # TODO: Remove this skip once the flag is removed.
  fix_flag_enabled = self.dut.bluetooth_flags.get(
      "fix_socket_connection_failed_no_callback", False
  )
  if not fix_flag_enabled:
    self.skipTest("Skip until the fix flag(ag/33721603) is enabled.")

  channel_manager = self.ref.device.l2cap_channel_manager
  # Remember the stock handler: non-RFCOMM requests are forwarded to it.
  stock_handler = channel_manager.on_l2cap_connection_request

  def refuse_rfcomm_requests(
      connection: device.Connection,
      cid: int,
      request: l2cap.L2CAP_Connection_Request,
  ) -> None:
    self.logger.info(
        " _custom_on_l2cap_connection_request:: psm: %s", request.psm
    )

    if request.psm != rfcomm.RFCOMM_PSM:
      # Anything that is not RFCOMM keeps the stock behavior.
      stock_handler(connection, cid, request)
      return

    self.logger.info(" RFCOMM L2CAP connection request rejected")
    refusal = l2cap.L2CAP_Connection_Response(
        identifier=request.identifier,
        destination_cid=0,
        source_cid=request.source_cid,
        result=l2cap.L2CAP_Connection_Response.Result.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
        status=0x0000,
    )
    self.ref.device.l2cap_channel_manager.send_control_frame(
        connection, cid, refusal
    )

  # Install the refusing handler in place of the stock one.
  channel_manager.on_l2cap_connection_request = refuse_rfcomm_requests

  running_loop = asyncio.get_running_loop()
  accept_future = running_loop.create_future()
  pending_connects: list[Coroutine[None, None, bl4a_api.RfcommChannel]] = []

  server = rfcomm.Server(self.ref.device)
  for index in range(2):
    # Register one more RFCOMM listener on REF and publish it through SDP,
    # using the loop index as the service record handle.
    channel = server.listen(acceptor=accept_future.set_result)
    service_uuid = str(uuid.uuid4())
    self.logger.info(
        "[REF] Create %d RFCOMM socket server with rfcomm_uuid %s.",
        index,
        service_uuid,
    )
    sdp_record = rfcomm.make_service_sdp_records(
        service_record_handle=index,
        channel=channel,
        uuid=core.UUID(service_uuid),
    )
    self.ref.device.sdp_service_records[index] = sdp_record

    # Start the matching connection from DUT towards REF.
    connect = self.dut.bl4a.create_rfcomm_channel_async(
        address=self.ref.address,
        secure=True,
        uuid=service_uuid,
    )
    pending_connects.append(connect)

  # Handle the pairing request that shows up on DUT and accept it.
  with self.dut.bl4a.register_callback(bl4a_api.Module.ADAPTER) as dut_cb:
    self.logger.info("[DUT] Wait for pairing request.")
    expected_request = bl4a_api.PairingRequest(
        address=self.ref.address, variant=mock.ANY, pin=mock.ANY
    )
    await dut_cb.wait_for_event(expected_request, timeout=10)

    # Hold off before confirming so that:
    #   1. A user reacting to a pop-up is simulated.
    #   2. Both RFCOMM sockets finish SDP, which creates an L2CAP connection
    #      collision.
    #   3. The RFCOMM L2CAP connection request stays pending because
    #      encryption is not complete yet.
    self.logger.info("[DUT] setPairingConfirmation Wait for 5 seconds.")
    await asyncio.sleep(_DEFAULT_STEP_TIMEOUT_SECONDS)
    confirmed = self.dut.bt.setPairingConfirmation(self.ref.address, True)
    self.assertTrue(confirmed)

    # Each connection attempt must fail, within the step timeout.
    async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
      for connect in pending_connects:
        with self.assertRaises(errors.SnippetError):
          await connect

Tests one or two RFCOMM socket connections simultaneously.

Typical duration: 30-50s.

Test steps
  1. Create RFCOMM sockets server on REF.
  2. Pair DUT and REF.
  3. Create RFCOMM sockets connection from DUT to REF.
  4. Verify that the RFCOMM socket connections are successful.

Parameters:

Name Type Description Default
num_connections int

The number of RFCOMM socket connections to create.

required
Source code in navi/tests/functionality/rfcomm_socket_test.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@navi_test_base.parameterized(1, 2)
async def test_rfcomm_socket_connections_simultaneously(
    self, num_connections: int
) -> None:
  """Opens one or two RFCOMM socket connections from DUT to REF at once.

  Typical duration: 30-50s.

  Test steps:
    1. Register one RFCOMM listener per connection on REF.
    2. Pair DUT and REF.
    3. Start every RFCOMM socket connection from DUT to REF.
    4. Verify each DLC accepted on REF is in the CONNECTED state.

  Args:
    num_connections: The number of RFCOMM socket connections to create.
  """

  # A single RFCOMM server on REF hosts every listener.
  server = rfcomm.Server(self.ref.device)

  # One random service UUID per connection; the position in the list is used
  # as the SDP service record handle.
  service_uuids = [str(uuid.uuid4()) for _ in range(num_connections)]
  accept_queues = []
  for handle, service_uuid in enumerate(service_uuids):
    accept_queues.append(
        self._setup_rfcomm_server_on_ref(handle, service_uuid, server)
    )

  # Pair DUT and REF before any socket is opened.
  await self._setup_pairing()

  # Start every DUT -> REF connection before waiting on any of them.
  pending_connects = []
  for service_uuid in service_uuids:
    pending_connects.append(
        self.dut.bl4a.create_rfcomm_channel_async(
            address=self.ref.address,
            secure=True,
            uuid=service_uuid,
        )
    )

  async with self.assert_not_timeout(_DEFAULT_STEP_TIMEOUT_SECONDS):
    # First collect the DLCs accepted on REF, then let DUT's side finish.
    self.logger.info("[REF] Wait for all RFCOMM connections to be accepted.")
    accepted_dlcs = await asyncio.gather(
        *(queue.get() for queue in accept_queues)
    )
    self.logger.info("[DUT] Wait for all RFCOMM connections to complete.")
    await asyncio.gather(*pending_connects)

    # Every accepted DLC has to be connected.
    for dlc in accepted_dlcs:
      self.logger.info("dlc_result: %s", dlc)
      failure_message = (
          "DLC connection failed. Expected state: CONNECTED, but got:"
          f" {dlc.state.name}"
      )
      self.assertEqual(
          dlc.state,
          rfcomm.DLC.State.CONNECTED,
          failure_message,
      )