Skip to content

Commit eb774ad

Browse files
chrismccordmickel8
authored andcommitted
Rewrite to LV
1 parent a4d4738 commit eb774ad

File tree

4 files changed

+408
-113
lines changed

4 files changed

+408
-113
lines changed

assets/publisher.js

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ export function createPublisherHook(iceServers = []) {
33
async mounted() {
44
const view = this;
55

6+
view.handleEvent("start-streaming", () => view.startStreaming(view));
7+
view.handleEvent("stop-streaming", () => view.stopStreaming(view));
8+
69
view.audioDevices = document.getElementById("lex-audio-devices");
710
view.videoDevices = document.getElementById("lex-video-devices");
811

@@ -43,10 +46,6 @@ export function createPublisherHook(iceServers = []) {
4346
view.setupStream(view);
4447
};
4548

46-
view.button.onclick = function () {
47-
view.startStreaming(view);
48-
};
49-
5049
// handle remote events
5150
view.handleEvent(`answer-${view.el.id}`, async (answer) => {
5251
if (view.pc) {
@@ -177,11 +176,6 @@ export function createPublisherHook(iceServers = []) {
177176
},
178177

179178
async startStreaming(view) {
180-
view.button.innerText = "Stop streaming";
181-
view.button.onclick = function () {
182-
view.stopStreaming(view);
183-
};
184-
185179
view.disableControls(view);
186180

187181
view.pc = new RTCPeerConnection({ iceServers: iceServers });
@@ -275,6 +269,7 @@ export function createPublisherHook(iceServers = []) {
275269
}
276270
}, 1000);
277271
} else if (view.pc.connectionState === "failed") {
272+
view.pushEvent("stop-streaming", {reason: "failed"})
278273
view.stopStreaming(view);
279274
}
280275
};
@@ -311,11 +306,6 @@ export function createPublisherHook(iceServers = []) {
311306
view.resetStats(view);
312307

313308
view.enableControls(view);
314-
315-
view.button.innerText = "Start Streaming";
316-
view.button.onclick = function () {
317-
view.startStreaming(view);
318-
};
319309
},
320310

321311
resetStats(view) {

lib/live_ex_webrtc/publisher.ex

Lines changed: 210 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
defmodule LiveExWebRTC.Publisher do
22
@moduledoc """
3-
`Phoenix.LiveComponent` for sending audio and video via WebRTC from a browser to a Phoenix app (browser publishes).
3+
Component for sending audio and video via WebRTC from a browser to a Phoenix app (browser publishes).
44
55
It will render a view with:
66
* audio and video device selects
@@ -42,36 +42,47 @@ defmodule LiveExWebRTC.Publisher do
4242
## Examples
4343
4444
```elixir
45-
<.live_component
46-
module={LiveExWebRTC.Publisher}
47-
id="publisher"
48-
ice_servers={[%{urls: "stun:stun.l.google.com:19302"}]}
49-
/>
45+
TODO
5046
```
5147
"""
52-
use Phoenix.LiveComponent
48+
use Phoenix.LiveView
5349

50+
alias LiveExWebRTC.Publisher
5451
alias ExWebRTC.{ICECandidate, PeerConnection, SessionDescription}
52+
alias Phoenix.PubSub
5553

56-
@typedoc """
57-
Message sent to the `Phoenix.LiveView` after component's initialization.
54+
defstruct id: nil,
55+
pc: nil,
56+
streaming?: false,
57+
audio_track_id: nil,
58+
video_track_id: nil,
59+
on_packet: nil,
60+
on_connected: nil,
61+
pubsub: nil,
62+
ice_servers: nil,
63+
ice_ip_filter: nil,
64+
ice_port_range: nil,
65+
audio_codecs: nil,
66+
video_codecs: nil,
67+
name: nil
68+
69+
attr(:socket, Phoenix.LiveView.Socket, required: true)
70+
attr(:publisher, __MODULE__, required: true)
71+
72+
def studio(assigns) do
73+
~H"""
74+
<%= live_render(@socket, __MODULE__, id: @publisher.id, session: %{"publisher_id" => @publisher.id}) %>
75+
"""
76+
end
77+
78+
def render(%{publisher: nil} = assigns) do
79+
~H"""
80+
"""
81+
end
5882

59-
* `pc` - `ExWebRTC.PeerConnection`'s pid spawned by this live component
60-
* `audio_track_id` - id of audio track
61-
* `video_track_id` - id of video track
62-
"""
63-
@type init_msg() ::
64-
{:live_ex_webrtc,
65-
%{
66-
pc: pid(),
67-
audio_track_id: String.t(),
68-
video_track_id: String.t()
69-
}}
70-
71-
@impl true
7283
def render(assigns) do
7384
~H"""
74-
<div id={@id} phx-hook="Publisher" class="h-full w-full flex justify-between gap-6">
85+
<div id={@publisher.id} phx-hook="Publisher" class="h-full w-full flex justify-between gap-6">
7586
<div class="w-full flex flex-col">
7687
<details>
7788
<summary class="font-bold text-[#0d0d0d] py-2.5">Devices</summary>
@@ -180,11 +191,20 @@ defmodule LiveExWebRTC.Publisher do
180191
</div>
181192
</div>
182193
</div>
183-
<div class="py-2.5">
194+
<div :if={@publisher.streaming?} class="py-2.">
195+
<button
196+
id="lex-button"
197+
class="rounded-lg w-full px-2.5 py-2.5 bg-brand/100 disabled:bg-brand/50 hover:bg-brand/90 text-white font-bold"
198+
phx-click="stop-streaming"
199+
>
200+
Stop streaming
201+
</button>
202+
</div>
203+
<div :if={!@publisher.streaming?} class="py-2.5">
184204
<button
185205
id="lex-button"
186206
class="rounded-lg w-full px-2.5 py-2.5 bg-brand/100 disabled:bg-brand/50 hover:bg-brand/90 text-white font-bold"
187-
disabled
207+
phx-click="start-streaming"
188208
>
189209
Start streaming
190210
</button>
@@ -194,13 +214,130 @@ defmodule LiveExWebRTC.Publisher do
194214
"""
195215
end
196216

197-
@impl true
198-
def handle_event(_event, _unsigned_params, %{assigns: %{pc: nil}} = socket) do
217+
def mount(_params, %{"publisher_id" => pub_id}, socket) do
218+
socket = assign(socket, publisher: nil)
219+
220+
if connected?(socket) do
221+
ref = make_ref()
222+
send(socket.parent_pid, {__MODULE__, {:attached, ref, self(), %{publisher_id: pub_id}}})
223+
224+
socket =
225+
receive do
226+
{^ref, %Publisher{id: ^pub_id} = publisher} -> assign(socket, publisher: publisher)
227+
after
228+
5000 -> exit(:timeout)
229+
end
230+
231+
{:ok, socket}
232+
else
233+
{:ok, socket}
234+
end
235+
end
236+
237+
def attach(socket, opts) do
238+
opts =
239+
Keyword.validate!(opts, [
240+
:id,
241+
:name,
242+
:pubsub,
243+
:on_packet,
244+
:on_connected,
245+
:ice_servers,
246+
:ice_ip_filter,
247+
:ice_port_range,
248+
:audio_codecs,
249+
:video_codecs
250+
])
251+
252+
publisher = %Publisher{
253+
id: Keyword.fetch!(opts, :id),
254+
pubsub: Keyword.fetch!(opts, :pubsub),
255+
on_packet: Keyword.get(opts, :on_packet),
256+
on_connected: Keyword.get(opts, :on_connected),
257+
ice_servers: Keyword.get(opts, :ice_servers, [%{urls: "stun:stun.l.google.com:19302"}]),
258+
ice_ip_filter: Keyword.get(opts, :ice_ip_filter),
259+
ice_port_range: Keyword.get(opts, :ice_port_range),
260+
audio_codecs: Keyword.get(opts, :audio_codecs),
261+
video_codecs: Keyword.get(opts, :video_codecs),
262+
name: Keyword.get(opts, :name)
263+
}
264+
265+
socket
266+
|> assign(publisher: publisher)
267+
|> attach_hook(:publisher_infos, :handle_info, &attached_handle_info/2)
268+
end
269+
270+
def handle_info({:live_ex_webrtc, :keyframe_req}, socket) do
271+
%{publisher: publisher} = socket.assigns
272+
273+
if pc = publisher.pc do
274+
:ok = PeerConnection.send_pli(pc, publisher.video_track_id)
275+
end
276+
277+
{:noreply, socket}
278+
end
279+
280+
def handle_info({:ex_webrtc, _pc, {:rtp, track_id, nil, packet}}, socket) do
281+
%{publisher: publisher} = socket.assigns
282+
283+
case publisher do
284+
%Publisher{video_track_id: ^track_id} ->
285+
PubSub.broadcast(
286+
publisher.pubsub,
287+
"streams:video:#{publisher.id}",
288+
{:live_ex_webrtc, :video, packet}
289+
)
290+
291+
if publisher.on_packet, do: publisher.on_packet.(publisher.id, :video, packet, socket)
292+
{:noreply, socket}
293+
294+
%Publisher{audio_track_id: ^track_id} ->
295+
PubSub.broadcast(
296+
publisher.pubsub,
297+
"streams:audio:#{publisher.id}",
298+
{:live_ex_webrtc, :audio, packet}
299+
)
300+
301+
if publisher.on_packet, do: publisher.on_packet.(publisher.id, :audio, packet, socket)
302+
{:noreply, socket}
303+
end
304+
end
305+
306+
def handle_info({:ex_webrtc, _pid, {:connection_state_change, :connected}}, socket) do
307+
%{publisher: pub} = socket.assigns
308+
if pub.on_connected, do: pub.on_connected.(pub.id)
199309
{:noreply, socket}
200310
end
201311

202-
@impl true
312+
def handle_info({:ex_webrtc, _, _}, socket) do
313+
{:noreply, socket}
314+
end
315+
316+
defp attached_handle_info({__MODULE__, {:attached, ref, pid, _meta}}, socket) do
317+
send(pid, {ref, socket.assigns.publisher})
318+
{:halt, socket}
319+
end
320+
321+
defp attached_handle_info(_msg, socket) do
322+
{:cont, socket}
323+
end
324+
325+
def handle_event("start-streaming", _, socket) do
326+
{:noreply,
327+
socket
328+
|> assign(publisher: %Publisher{socket.assigns.publisher | streaming?: true})
329+
|> push_event("start-streaming", %{})}
330+
end
331+
332+
def handle_event("stop-streaming", _, socket) do
333+
{:noreply,
334+
socket
335+
|> assign(publisher: %Publisher{socket.assigns.publisher | streaming?: false})
336+
|> push_event("stop-streaming", %{})}
337+
end
338+
203339
def handle_event("offer", unsigned_params, socket) do
340+
%{publisher: publisher} = socket.assigns
204341
offer = SessionDescription.from_json(unsigned_params)
205342
{:ok, pc} = spawn_peer_connection(socket)
206343

@@ -211,51 +348,74 @@ defmodule LiveExWebRTC.Publisher do
211348
%{kind: :video, receiver: %{track: video_track}}
212349
] = PeerConnection.get_transceivers(pc)
213350

214-
info = %{pc: pc, audio_track_id: audio_track.id, video_track_id: video_track.id}
215-
send(self(), {:live_ex_webrtc, info})
216-
217351
{:ok, answer} = PeerConnection.create_answer(pc)
218352
:ok = PeerConnection.set_local_description(pc, answer)
219353
:ok = gather_candidates(pc)
220354
answer = PeerConnection.get_local_description(pc)
221355

222-
socket = assign(socket, :pc, pc)
223-
socket = push_event(socket, "answer-#{socket.assigns.id}", SessionDescription.to_json(answer))
356+
# subscribe now that we are initialized
357+
PubSub.subscribe(publisher.pubsub, "publishers:#{publisher.id}")
224358

225-
{:noreply, socket}
359+
new_publisher = %Publisher{
360+
publisher
361+
| pc: pc,
362+
audio_track_id: audio_track.id,
363+
video_track_id: video_track.id
364+
}
365+
366+
{:noreply,
367+
socket
368+
|> assign(publisher: new_publisher)
369+
|> push_event("answer-#{publisher.id}", SessionDescription.to_json(answer))}
226370
end
227371

228-
@impl true
229372
def handle_event("ice", "null", socket) do
230-
:ok = PeerConnection.add_ice_candidate(socket.assigns.pc, %ICECandidate{candidate: ""})
231-
{:noreply, socket}
373+
%{publisher: publisher} = socket.assigns
374+
375+
case publisher do
376+
%Publisher{pc: nil} ->
377+
{:noreply, socket}
378+
379+
%Publisher{pc: pc} ->
380+
:ok = PeerConnection.add_ice_candidate(pc, %ICECandidate{candidate: ""})
381+
{:noreply, socket}
382+
end
232383
end
233384

234-
@impl true
235385
def handle_event("ice", unsigned_params, socket) do
236-
cand =
237-
unsigned_params
238-
|> Jason.decode!()
239-
|> ExWebRTC.ICECandidate.from_json()
386+
%{publisher: publisher} = socket.assigns
240387

241-
:ok = PeerConnection.add_ice_candidate(socket.assigns.pc, cand)
388+
case publisher do
389+
%Publisher{pc: nil} ->
390+
{:noreply, socket}
242391

243-
{:noreply, socket}
392+
%Publisher{pc: pc} ->
393+
cand =
394+
unsigned_params
395+
|> Jason.decode!()
396+
|> ExWebRTC.ICECandidate.from_json()
397+
398+
:ok = PeerConnection.add_ice_candidate(pc, cand)
399+
400+
{:noreply, socket}
401+
end
244402
end
245403

246404
defp spawn_peer_connection(socket) do
405+
%{publisher: publisher} = socket.assigns
406+
247407
pc_opts =
248408
[
249-
ice_servers: socket.assigns[:ice_servers],
250-
ice_ip_filter: socket.assigns[:ice_ip_filter],
251-
ice_port_range: socket.assigns[:ice_port_range],
252-
audio_codecs: socket.assigns[:audio_codecs],
253-
video_codecs: socket.assigns[:video_codecs]
409+
ice_servers: publisher.ice_servers,
410+
ice_ip_filter: publisher.ice_ip_filter,
411+
ice_port_range: publisher.ice_port_range,
412+
audio_codecs: publisher.audio_codecs,
413+
video_codecs: publisher.video_codecs
254414
]
255415
|> Enum.reject(fn {_k, v} -> v == nil end)
256416

257417
gen_server_opts =
258-
[name: socket.assigns[:gen_server_name]]
418+
[name: publisher.name]
259419
|> Enum.reject(fn {_k, v} -> v == nil end)
260420

261421
PeerConnection.start(pc_opts, gen_server_opts)

0 commit comments

Comments
 (0)