This commit is contained in:
Sagi Dayan 2020-04-12 19:33:24 -04:00
parent b02adbdb43
commit 5408a2dba8
9 changed files with 154 additions and 113 deletions

View file

@ -16,41 +16,67 @@ class SignalingController {
}
register(callModel) {
if (!calls[this.callId])
calls[this.callId] = new CallSession(callModel);
calls[this.callId] =
new CallSession(callModel, this.onCallEnded.bind(this));
else
console.log(`Call #${this.callId} Already Found`);
const callSession = calls[this.callId];
callSession.registerUser(this.user, this.socket)
.then(
success => {
.then(success => {
if (!success) throw new Error('Invalid User');
})
.catch(
error => {
});
}
onCallEnded(callId) {
console.log(`Call ${callId} Ended`);
delete calls[callId];
}
onClose() {
calls[this.callId].removeUser(this.user);
console.log(`User #${this.user.id} left call ${this.callId}`);
}
}
class CallSession {
// states: NEW -> STARTED -> IN_PROGRESS -> ENDED
constructor(callModel) {
constructor(callModel, onCallEndedCallback) {
this.onCallEndedCallback = onCallEndedCallback;
this.callId = callModel.id;
this.callModel = callModel;
this.state = callModel.state;
this.user_1 = {id: callModel.user_1, socket: null, userModel: null};
this.user_2 = {id: callModel.user_2, socket: null, userModel: null};
this.startTime = Date.now();
this.heartbeat = setInterval(this.onHeartbeat.bind(this), 5000);
this.heartbeat =
setInterval(this.onHeartbeat.bind(this), 1000); // Every second
}
onHeartbeat() {
console.log(`We have ${Object.keys(calls).length} ongoing calls. Ids=${
Object.keys(calls)}`)
console.log(`Heartbeat for call #${this.callId} State: ${this.state}`);
const now = Date.now();
const elapsed = ((now - this.startTime) / 60000).toPrecision(1);
const newStateTimeout = 5; // 5 min
const startedStateTimeout = 10; // 10 min
console.log(`Heartbeat for call #${this.callId} State: ${
this.state}, time-elapsed: ${(elapsed)}min`);
if (this.state === 'ENDED') return this.endCall();
if (this.state === 'NEW' && elapsed >= newStateTimeout)
return this.endCall();
if (this.state === 'STARTED' && elapsed >= startedStateTimeout)
return this.endCall();
}
removeUser(user) {
let userIndex = -1;
if (this.user_1.id === user.id)
userIndex = 1;
else if (this.user_2.id === user.id)
userIndex = 2;
if (userIndex < 0) return false;
this[`user_${userIndex}`].userModel = null;
this[`user_${userIndex}`].socket = null;
this.updateState();
if (this.state === 'ENDED') this.endCall();
}
async registerUser(user, socket) {
let userIndex = -1;
@ -65,15 +91,30 @@ class CallSession {
socket.on('wrtc:sdp:answer', this.onSdpAnswer.bind(this));
socket.on('wrtc:ice', this.onIceCandidate.bind(this));
await this.updateState();
if (this.state === 'STARTED') await this.sendStandby(socket, userIndex);
if (this.state === 'IN_PROGRESS') await this.sendStart(socket, userIndex);
if (this.state === 'STARTED')
await this.sendStandby(socket, userIndex);
else if (this.state === 'IN_PROGRESS')
await this.sendStart(socket, userIndex);
return true;
}
endCall() {
this.state = 'ENDED';
if (this.callModel.state != 'ENDED') {
this.callModel.state = this.state;
this.callModel.save();
}
if (this.user_1.socket) this.user_1.socket.close();
if (this.user_2.socket) this.user_1.socket.close();
clearInterval(this.heartbeat);
this.onCallEndedCallback(this.callId);
}
async sendStandby(socket, userIndex) {
console.log(`Call #${this.callId} sendStandby -> ${userIndex}`);
const iceServers = (await IceServer.all()).rows.map(i => i.toJSON());
socket.emit('call:standby', {iceServers, id: userIndex});
}
async sendStart(socket, userIndex) {
console.log(`Call #${this.callId} sendStart -> ${userIndex}`);
const iceServers = (await IceServer.all()).rows.map(i => i.toJSON());
socket.emit('call:start', {iceServers, id: userIndex});
}
@ -85,18 +126,25 @@ class CallSession {
this.state = 'IN_PROGRESS';
else
this.state = 'STARTED';
break;
case 'STARTED':
if (this.areAllPartnersConnected()) this.state = 'IN_PROGRESS';
break;
case 'IN_PROGRESS':
if (!this.areAllPartnersConnected()) this.state = 'ENDED';
break;
}
this.callModel.state = this.state;
await this.callModel.save();
console.log(`Call #${this.callId} state=${this.state}`);
}
areAllPartnersConnected() {
return !!this.user_1.socket && !!this.user_2.socket;
}
async onIceCandidate(payload) {
const {from = id, ice} = payload;
const {from = payload.id, ice} = payload;
const to = from === 1 ? 2 : 1;
this[`user_${to}`].socket.emit('wrtc:ice', {sdp});
this[`user_${to}`].socket.emit('wrtc:ice', {ice});
console.log(`[Signal] [onIceCandidate] ${from} -> ${to}`)
return true;
}

View file

@ -19,6 +19,7 @@ class WsCallAuth {
if (!call) {
throw new Error('Call not found');
}
if (call.state === 'ENDED') throw new Error('This call has ended');
if (user.id === call.user_1 || user.id === call.user_2) {
ctx.call = call;
await next()

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -1,62 +1,70 @@
export default class CallService {
private callId: string;
import WebSocketService from "../scripts/websocket.service";
import { EventEmitter } from "events";
export default class CallManager {
private inCall: boolean;
private peerId: number;
private subscription;
private localStream: MediaStream;
private remoteStream: MediaStream;
private signalingChannel;
private needToAddStream: boolean = true;
private emitter = new EventEmitter();
private pc: RTCPeerConnection;
constructor(private ws) {
this.callId = null;
constructor(private ws: WebSocketService, private callId: number) {
this.inCall = false;
this.peerId = -1;
this.subscription = null;
this.pc = null;
this.localStream = null;
this.remoteStream = null;
this.remoteStream = new MediaStream();
}
async connectToCall(callId: string): Promise<boolean> {
async connectToCall(mediaConstraints: MediaStreamConstraints): Promise<boolean> {
if (this.inCall) throw new Error('Already connected to call');
this.subscription = this.ws.subscribe(`call:${callId}`);
const subscription = this.subscription;
console.log('connecting to call');
await this.getUserMedia(mediaConstraints);
this.signalingChannel = this.ws.subscribe(`call:${this.callId}`);
const signalingChannel = this.signalingChannel;
const self = this;
return new Promise((resolve, reject) => {
subscription.on('error', (e) => {
signalingChannel.on('close', self.close.bind(self));
signalingChannel.on('call:start', self.onCallStart.bind(self));
signalingChannel.on('call:standby', self.onCallStandby.bind(self));
signalingChannel.on('wrtc:sdp:offer', self.onRemoteOffer.bind(self));
signalingChannel.on('wrtc:sdp:answer', self.onRemoteAnswer.bind(self));
signalingChannel.on('wrtc:ice', self.onRemoteIce.bind(self));
signalingChannel.on('error', (e) => {
console.error(e);
resolve(false)
});
subscription.on('ready', () => {
this.callId = callId;
this.inCall = true;
signalingChannel.on('ready', () => {
console.log('in Ready');
self.inCall = true;
resolve(true)
});
subscription.on('close', self.close.bind(this));
subscription.on('call:start', self.onCallStart.bind(this));
subscription.on('call:standby', self.onCallStandby.bind(this));
subscription.on('wrtc:sdp:offer', self.onRemoteOffer.bind(this));
subscription.on('wrtc:sdp:answer', self.onRemoteAnswer.bind(this));
subscription.on('wrtc:ice', self.onRemoteIce.bind(this));
});
}
on(event: ECallEvents, callback: (...args) => void) {
this.emitter.on(event, callback);
}
private emit(event: ECallEvents, data: any) {
this.emitter.emit(event, data);
}
private send(event: string, payload: { [key: string]: any }) {
this.subscription.emit(event, {
this.signalingChannel.emit(event, {
id: this.peerId,
...payload
})
}
async onCallStart(payload: { iceServers: RTCIceServer[], id: number }) {
console.log('onCallStart');
console.log(payload);
this.peerId = payload.id;
this.pc = new RTCPeerConnection({ iceServers: payload.iceServers });
console.log('Created PeerConnection');
console.log('adding tracks to pc');
this.localStream.getTracks().forEach(t => this.pc.addTrack(t, this.localStream));
this.setupPeerConnectionListeners();
const sdp = await this.pc.createOffer();
await this.pc.setLocalDescription(sdp);
console.log('Local description Set');
console.log('Local description Set', sdp.sdp);
this.send('wrtc:sdp:offer', {
sdp
});
@ -64,10 +72,13 @@ export default class CallService {
}
async onCallStandby(payload: { iceServers: RTCIceServer[], id: number }) {
console.log('onCallStandby');
console.log(payload);
this.peerId = payload.id;
this.pc = new RTCPeerConnection({ iceServers: payload.iceServers });
console.log('Created PeerConnection');
console.log('adding tracks to pc');
this.localStream.getTracks().forEach(t => this.pc.addTrack(t, this.localStream));
this.setupPeerConnectionListeners();
return true;
}
@ -94,22 +105,13 @@ export default class CallService {
this.pc.addEventListener('icegatheringstatechange', event => {
console.log('icegatheringstatechange', this.pc.iceGatheringState);
});
if (this.needToAddStream && this.localStream) {
this.localStream.getTracks().forEach(t => {
console.log('adding track to pc - in the event list');
console.log(t);
this.pc.addTrack(t, this.localStream);
});
this.needToAddStream = false;
}
}
onLocalIce(event) {
if (event.candidate) {
console.log('Sending candidate');
this.send('wrtc:ice', {
ice: event.candidate
ice: event.candidate,
});
}
@ -117,19 +119,19 @@ export default class CallService {
async onRemoteOffer(payload) {
const offer = new RTCSessionDescription(payload.sdp);
await this.pc.setRemoteDescription(offer);
console.log('Remote offer Set');
console.log('Remote offer Set', offer.sdp);
const sdp = await this.pc.createAnswer();
this.send('wrtc:sdp:answer', {
sdp
});
await this.pc.setLocalDescription(sdp);
console.log('Local answer Set');
console.log('Local answer Set', sdp.sdp);
return true;
}
async onRemoteAnswer(payload) {
const answer = new RTCSessionDescription(payload.sdp);
await this.pc.setRemoteDescription(answer);
console.log('Remote answer Set');
console.log('Remote answer Set', answer.sdp);
return true;
}
async onRemoteIce(payload) {
@ -138,18 +140,9 @@ export default class CallService {
return true;
}
async getUserMedia(constraints: MediaStreamConstraints = { video: false, audio: true }) {
async getUserMedia(constraints: MediaStreamConstraints = { video: true, audio: true }) {
if (this.localStream) return this.localStream;
this.localStream = await navigator.mediaDevices.getUserMedia(constraints);
if (this.pc) {
if (this.needToAddStream && this.localStream) {
this.localStream.getTracks().forEach(t => {
console.log('adding track to pc - in get user media');
this.pc.addTrack(t, this.localStream);
});
this.needToAddStream = false;
}
}
return this.localStream;
}
@ -158,8 +151,20 @@ export default class CallService {
}
close() {
if (this.subscription) this.subscription.close();
this.subscription = null;
console.log('Closing...');
if (!this.inCall) return;
this.emit(ECallEvents.CLOSE, this.callId);
if (this.signalingChannel) this.signalingChannel.close();
this.signalingChannel = null;
if (this.pc) this.pc.close();
if (this.localStream) this.localStream.getTracks().forEach(t => t.stop());
this.localStream = null;
this.remoteStream = null;
this.inCall = false;
}
}
export enum ECallEvents {
CLOSE = 'CLOSE',
REMOTE_STREAM = 'REMOTE_STREAM'
}

View file

@ -1,6 +1,5 @@
import Ws from "@adonisjs/websocket-client";
import CallService from './call.service';
import UserChannelService from './user.channel.service';
import { EventEmitter } from 'events';
let singleton: WebSocketService = null;
@ -9,17 +8,13 @@ enum EEvents {
CONNECTION_ONLINE,
CONNECTION_OFFLINE,
INCOMING_CALL,
SIGNALING_EVENTS,
CALL_ACTIONS
}
export default class WebSocketService {
static Events = EEvents;
private emitter;
private callService: CallService;
private constructor(private ws, private userChannelService: UserChannelService) {
this.emitter = new EventEmitter();
this.callService = new CallService(this.ws);
this.userChannelService.on('new:connection', this.onUserNewConnection.bind(this));
this.userChannelService.on('connection:online', this.onUserConnectionOnline.bind(this));
this.userChannelService.on('connection:offline', this.onUserConnectionOffline.bind(this));
@ -32,9 +27,11 @@ export default class WebSocketService {
removeListener(event: EEvents, callback) {
this.emitter.removeListener(event, callback);
}
// onPublicChannelMessage(msg) {
// this.emitter
// }
subscribe(channel) {
const subscription = this.ws.subscribe(channel);
console.log(subscription);
return subscription;
}
private onUserNewConnection(data) {
this.emitter.emit(EEvents.NEW_CONNECTION, data);
@ -48,15 +45,6 @@ export default class WebSocketService {
this.emitter.emit(EEvents.CONNECTION_OFFLINE, data);
}
async getLocalMedia(constraints: MediaStreamConstraints = null) {
return this.callService.getUserMedia(constraints);
}
getRemoteStream() {
return this.callService.getRemoteStream();
}
static getInstance(): Promise<WebSocketService> {
return new Promise((resolve, reject) => {
// resolve();
@ -82,17 +70,4 @@ export default class WebSocketService {
});
});
}
async connectToCall(callId: string) {
return this.callService.connectToCall(callId);
}
async leaveCall() {
this.callService.close();
}
onSignalingMsg(message) {
console.log(message);
}
}

View file

@ -3,15 +3,16 @@
<div v-if="loading">
<Loading />
</div>
<div v-else>
<div v-else class="is-flex">
<video
:srcObject="localStream"
autoplay="true"
controls="false"
playsinline="true"
muted="true"
style="max-width:40%"
/>
<video :srcObject="remoteStream" autoplay="true" controls="false" />
<video :srcObject="remoteStream" autoplay="true" controls="false" style="max-width:40%" />
</div>
</div>
</template>
@ -19,6 +20,7 @@
<script lang="ts">
import Loading from "../../shared/components/Loading/Loading.vue";
import WebsocketService from "../scripts/websocket.service";
import CallManager, { ECallEvents } from "../classes/call.manager";
import Services from "../../services/index";
import { mapActions, mapGetters } from "vuex";
export default {
@ -29,19 +31,24 @@ export default {
async created() {
this.loading = true;
try {
const callId = Number(this.$route.params.id);
const ws = await WebsocketService.getInstance();
const success = await ws.connectToCall(this.$route.params.id);
this.callManager = new CallManager(ws, callId);
this.callManager.on(ECallEvents.CLOSE, this.callEnded);
const success = await this.callManager.connectToCall({
video: false,
audio: true
});
if (!success) {
this.notify({ message: "Can find this call...", level: "danger" });
this.$router.push({ path: `/` });
return false;
}
this.signalingChannel = this.localStream = await ws.getLocalMedia({
this.localStream = this.callManager.getUserMedia({
video: false,
audio: true
});
this.remoteStream = ws.getRemoteStream();
console.log(this.localStream);
this.remoteStream = this.callManager.getRemoteStream();
this.notify({ message: "Connected!", level: "success" });
} catch (e) {
console.error(e);
@ -52,11 +59,17 @@ export default {
},
async beforeDestroy() {
console.log("destroyed");
const ws = await WebsocketService.getInstance();
ws.leaveCall();
this.callManager.close();
return true;
},
methods: {
async setupCall(): Promise<boolean> {
return true;
},
callEnded(callId) {
this.notify({ message: `Call #${callId} Ended` });
this.$router.push({ path: `/` });
},
...mapActions(["notify"])
},
computed: {
@ -65,10 +78,9 @@ export default {
data() {
return {
loading: true,
call: null,
localStream: null,
remoteStream: null,
signalingChannel: null
localStream: new MediaStream(),
remoteStream: new MediaStream(),
callManager: null
};
},
beforeCreate: () => {}