start
webrtc实现点对点通讯

webrtc实现点对点通讯

概述

关于我个人研究使用webrtc的原因,手上有一只性能很低的德小鸡(euserv),受大墙影响+cf减速套餐,资源加载速度是真的感人😅,整点不会的东西试试水。站点目前够低调的,没考虑国内cdn,凑合用!

关于 WEBRTC 能干些啥,搜索引擎上挺多,去看看比我吧啦吧啦好。

本文只包含datacancel通道创建,音视频直播类似,添加通道就行。

代码需要根据实际情况改动才能使用。

原理

个人部分理解,欢迎吐槽。

整体结构

信令:实现方式很多,只要能实现客户端AB初始化时,基础信息交换,最常使用的是websocket。

ICE 服务器:可以分为帮助两个客户端打洞建立 P2P 连接的 STUN 服务器,还有如果连不通就直接转发的 TURN 服务器。

客户端:需要建立通讯的客户端,只能一对一,有新增客户端,需要建立单独的连接。

逻辑

创建前,必须包含数据通道或音视频通道,否则不会连通。


1. A创建Offer

2. A保存Offer(设置本地描述)
3. A发送Offer给B(期间A连接stun服务器获取ICE,并给B)
4. B收到并设置远端sdp为Offer
5. B创建Answer
6. B设置本地sdp为Answer
7. B发送Answer给A
8. A收到并设置远端sdp为Answer
9. A发送Ice Candidates给B
10. B发送Ice Candidates给A
11. A,B连通并实现数据传输

开始使用

信令服务器

这里使用netcore实现websocket。netcore其他websocket细节,去微软官网看看手册吧。

//Program.cs

var webSocketOptions = new WebSocketOptions
{
    KeepAliveInterval = TimeSpan.FromMinutes(5)
};

app.UseWebSockets(webSocketOptions);
app.UseSocketMsg(); //自定义中间件名称

个人自定义中间件实现信令,只是简单实现了客户端间通讯。HostKey部分可删除,这里只是测试单节点连接。

//WebSocketMsg.cs 

using System.Collections;
using System.Net.WebSockets;
using System.Text;
using System.Text.RegularExpressions;
using Newtonsoft.Json;

namespace myblog.Unit
{
    public static class WebSocketMsgExtensions
    {

        public static IApplicationBuilder UseSocketMsg(this IApplicationBuilder builder)
        {
            return builder.UseMiddleware();
        }
    }
    public partial class WebSocketMsg
    {
        public static Hashtable socketList = new();
        public static Regex regHost = new Regex("(?<=HostKey\":\")\\w+(?=\")", RegexOptions.IgnoreCase); //用于匹配hostkey
        public class HostsInfo
        {
            public string Key { get; set; } = Guid.NewGuid().ToString("N"); //主机识别key
            public WebSocket Socket { get; set; }
            public Boolean IsHost { get; set; }
        }
        public static string thisHostKey = string.Empty;
        private readonly RequestDelegate _next;

        public WebSocketMsg(RequestDelegate next)
        {
            _next = next;
        }

        public async Task InvokeAsync(HttpContext context)
        {
            if (context.Request.Path == "/ws")
            {
                if (context.WebSockets.IsWebSocketRequest)
                {

                    using var webSocket = await context.WebSockets.AcceptWebSocketAsync();

          //记录socket客户端
                    var info = new HostsInfo
                    {
                        Socket = webSocket,
                        IsHost = socketList.Count > 0 ? false : true
                    };
                    socketList.Add(info.Key, info);
                    await Echo(info);
                }
                else
                {
                    context.Response.StatusCode = StatusCodes.Status400BadRequest;
                }
            }
            else
            {
                await _next(context);
            }
        }

        private async Task Echo(HostsInfo info)
        {
            try
            {
                var buffer = new byte[1024 * 4];
                if (info.IsHost)
                {
                    thisHostKey = info.Key;//更新主干key
                    SendMsgAsync(new { type = "init", isHost = true, key = info.Key, HostKey = string.Empty }, info.Socket);
                }else {
                    var hs = socketList[thisHostKey];
                    if (hs is null)
                    {
                        //升级为骨干节点
                        info.IsHost = true;
                        thisHostKey = info.Key;//更新主干key
                        SendMsgAsync(new { type = "init", isHost = true, key = info.Key, HostKey = string.Empty }, info.Socket);
                    }
                    else
                    {
                        SendMsgAsync(new { type = "init", isHost = false, key = info.Key, HostKey = thisHostKey }, info.Socket);
                    }
                }

                var receiveResult = await info.Socket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None);
                while (!receiveResult.CloseStatus.HasValue)
                {
          //Host客户端ping
                    var msg = Encoding.UTF8.GetString(new ArraySegment(buffer, 0, receiveResult.Count));
                    if (msg.IndexOf("ping") > -1)
                    {
                        receiveResult = await info.Socket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None);
                        continue;
                    }
                    var HostKey = regHost.Match(msg).ToString();//匹配消息目标客户端
                    //不含key
                    if (string.IsNullOrEmpty(HostKey))
                    {
                        SendMsgAsync(new { type = "error", msg = "" }, info.Socket, receiveResult);
                    }
                    else
                    {
                        var soc = socketList[HostKey];
                        if (soc is null)
                        {
                            SendMsgAsync(new { type = "error", msg = "主机丢失" }, info.Socket, receiveResult);
                        }
                        else
                        {
                            SendMsgAsync(buffer, ((HostsInfo)soc).Socket, receiveResult);
                        }
                    }
                    receiveResult = await info.Socket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None);
                }
                closeHost(info);
                await info.Socket.CloseAsync(
                    receiveResult.CloseStatus.Value,
                    receiveResult.CloseStatusDescription,
                    CancellationToken.None);
            }
            catch (Exception e)
            {
                closeHost(info);
            }
        }
        private async void SendMsgAsync(dynamic msg, WebSocket soc, WebSocketReceiveResult receiveResult = null)
        {
            if (msg.GetType().Name == "Byte[]")
            {
                await soc.SendAsync(new ArraySegment(msg, 0, receiveResult.Count), WebSocketMessageType.Text, true, CancellationToken.None);
            }
            else
            {
                await soc.SendAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg)), WebSocketMessageType.Text, true, CancellationToken.None);
            }
        }
    //关闭当前客户端连接,并判断是否为Host,是则转移Host
        private void closeHost(HostsInfo info)
        {
            socketList.Remove(info.Key);
            var hs = socketList[thisHostKey];
            if (hs is null && socketList.Count > 0)
            {
                foreach (HostsInfo item in socketList.Values)
                {
                    item.IsHost = true;
                    thisHostKey = item.Key;
                    SendMsgAsync(new { type = "init", isHost = true, key = item.Key, HostKey = string.Empty }, item.Socket);
                    break;
                }
            }
        }
    }
}

客户端

客户端A,连接发起方为offer

 //客户端初始化连接时,信令返回 init数据,数据内包含生成的guid作为唯一识别。////////////////////////////////////////////
const defaults = {
    services: {
        iceServers: [
            {
                urls: [
                    "stun:stun1.l.google.com:19302"
                ],
            }
        ],
    }
};

var pc = new RTCPeerConnection(defaults.services);
pc.info = {
    HostKey: null,//远程目标主机key,发起端初始为空,后续客户端B返回后填回
    key: "*************************",//当前主机key,信令生成的guid
    type: e.type,//offer 或 answer
};
//监测ICE变动,更新后发送给客户端B        
pc.onicecandidate = function (e) {
    log("=================" + e.candidate)
    if (e.candidate) {
        //通过socket客户端B交换ICE
        socket.Msg(JSON.stringify({ key: pc.info.key, HostKey: pc.info.HostKey, type: "icecandidate", candidate: e.candidate }));
    } else {
        log('this is the end candidate')
    }
}
pc.oniceconnectionstatechange = function (event) {
    //监听链接状态
    // log('ICE state change event: ', event);
    switch (pc.iceConnectionState) {
        case 'connected':
            break;
        case 'completed':
            break;
        case 'disconnected':
            break;
        case 'closed':
            break;
    }
}
//发起端开启数据通道,通道支持数据类型(DOMString, Blob, ArrayBuffer,ArrayBufferView),自动判断,无需手动设置
var channel = pc.createDataChannel('chat');
channel.onopen = function (e) {
    //开启成功后,发送数据
    channel.send(JSON.stringify({ type: "msg", msg: 'Hi you B!I am A;' }));
}
channel.onclose = function (e) {
    //通道关闭时,执行操作
}
channel.onmessage = function (e) {
    //接收数据,根据type判断执行事件
    let obj = JSON.parse(e.data);
    switch (obj.type) {
        case "icecandidate":
            //添加远端提供的ICE
            if (pc.canTrickleIceCandidates) {
                pc.addIceCandidate(obj.candidate)
                    .then(() => {
                        log('Successed to add ice candidate');
                    })
                    .catch((err) => {

                    });
            }
            break;
        case "msg":
            //打印收到的消息
            consol.log(obj.msg)
            break;
    }
}
//前面都为初始化部分,此部分创建offer发送到客户端B
pc.createOffer().then((offer) => {
    pc.setLocalDescription(offer).then(() => {
        log('createOffer:success');
        socket.Msg(JSON.stringify({ HostKey: pc.info.HostKey, key: pc.info.key, type: offer.type, sdp: offer.sdp }));
    }).catch((err) => {
        log('setLocalDescription:faild')
    });
}).catch((err) => {
    log('createOffer:faild')
})

客户端B,应答方为answer

//客户端初始化连接时,信令返回 init数据,数据内包含生成的guid作为唯一识别。//客户端A有连接的情况下,客户端B连接,init返回数据包含客户端A的Guid标识
////////////////////////////////////////////
var pc = new RTCPeerConnection(defaults.services);
pc.info = {
    HostKey: "*******************",//远程目标主机key,init返回
    key: "*******************",//当前主机key,init返回
    type: e.type,//offer 或 answer
};
//监测ICE变动,更新后发送给客户端A       
pc.onicecandidate = function (e) {
    log("=================" + e.candidate)
    if (e.candidate) {
        //通过socket发送到客户端A交换ICE
        socket.Msg(JSON.stringify({ key: pc.info.key, HostKey: pc.info.HostKey, type: "icecandidate", candidate: e.candidate }));
    } else {
        log('this is the end candidate')
    }
}
pc.oniceconnectionstatechange = function (event) {
    //监听链接状态
    // log('ICE state change event: ', event);
    switch (pc.iceConnectionState) {
        case 'connected':
            break;
        case 'completed':
            break;
        case 'disconnected':
            break;
        case 'closed':
            break;
    }
}
//数据接收方绑定事件
pc.ondatachannel = function (e) {
    channel.onopen = function (e) {
        //开启成功后,发送数据
        channel.send(JSON.stringify({ type: "msg", msg: 'Hi you A!I am B;' }));
    }
    channel.onclose = function (e) {
        //通道关闭时,执行操作
    }
    channel.onmessage = function (e) {
        //接收数据,根据type判断执行事件
        let obj = JSON.parse(e.data);
        switch (obj.type) {
            case "icecandidate":
                //添加远端提供的ICE
                if (pc.canTrickleIceCandidates) {
                    pc.addIceCandidate(obj.candidate)
                        .then(() => {
                            log('Successed to add ice candidate');
                        })
                        .catch((err) => {
    
                        });
                }
                break;
            case "msg":
                //打印收到的消息
                consol.log(obj.msg)
                break;
        }
    }
}
//此处offer是客户端A生成通过socket发送过来的
pc.setRemoteDescription(offer).then(function () {
    //把本地远端sdp设置为客服端A生成的offer后,生成answer发送回A
    pc.createAnswer().then(function (answer) {
        pc.setLocalDescription(answer)//客户端B生成的answer也需要手动设置
        socket.Msg(JSON.stringify({ key: pc.info.key, HostKey: pc.info.HostKey, type: answer.type, sdp: answer.sdp }));
    }).catch(function (e: any) {
        console.log('fromRemoteOffer:createAnswer---Error:', e)
    });
}).catch(function (e: any) {
    console.log('fromRemoteOffer:setRemoteDescription---Error:', e)
});


客户端B返回的answer

//客户端A收到answer后调用
pc.setRemoteDescription(answer).then(function () {
                log("fromRemoteAnswer:success")
            }).catch((err: any) => {
                log(err);
            });


本站已支持webrtc部分文件共享传输,具体还需要修改。

参考资料

chrme浏览器webrtc调试地址:chrome://webrtc-internals/

webrtc实例:https://webrtc.github.io/samples/

webrtc媒体传输:https://www.jianshu.com/p/215ac2022650

^