世界新动态:什么是RPC?聊聊node中怎么实现 RPC 通信

时间:2022-11-03 20:51:42       来源:转载


【资料图】

【相关教程推荐:nodejs视频教程】

什么是RPC?

RPC:Remote Procedure Call(远程过程调用)是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。

RPC vs HTTP

相同点

都是两台计算机之间的网络通信。ajax是浏览器和服务器之间的通行,RPC是服务器与服务器之间的通行需要双方约定一个数据格式

不同点

寻址服务器不同

ajax是使用 DNS作为寻址服务获取域名所对应的ip地址,浏览器拿到ip地址之后发送请求获取数据。

RPC一般是在内网里面相互请求,所以它一般不用DNS做寻址服务。因为在内网,所以可以使用规定的id或者一个虚拟vip,比如v5:8001,然后到寻址服务器获取v5所对应的ip地址。

应用层协议不同

ajax使用http协议,它是一个文本协议,我们交互数据的时候文件格式要么是html,要么是json对象,使用json的时候就是key-value的形式。

RPC采用二进制协议。采用二进制传输,它传输的包是这样子的[0001 0001 0111 0110 0010],里面都是二进制,一般采用那几位表示一个字段,比如前6位是一个字段,依次类推。

这样就不需要http传输json对象里面的key,所以有更小的数据体积。

因为传输的是二进制,更适合于计算机来理解,文本协议更适合人类理解,所以计算机去解读各个字段的耗时是比文本协议少很多的。

TCP通讯方式

单工通信:只能客户端给服务端发消息,或者只能服务端给客户端发消息

半双工通信:在某个时间段内只能客户端给服务端发消息,过了这个时间段服务端可以给客户端发消息。如果把时间分成很多时间片,在一个时间片内就属于单工通信

全双工通信:客户端和服务端能相互通信

选择这三种通信方式的哪一种主要考虑的因素是:实现难度和成本。全双工通信是要比半双工通信的成本要高的,在某些场景下还是可以考虑使用半双工通信。

ajax是一种半双工通信。http是文本协议,但是它底层是tcp协议,http文本在tcp这一层会经历从二进制数据流到文本的转换过程。

理解RPC只是在更深入地理解前端技术。

buffer编解码二进制数据包

创建buffer

buffer.from: 从已有的数据创建二进制

const buffer1 = Buffer.from("geekbang")const buffer2 = Buffer.from([0, 1, 2, 3, 4])
登录后复制

buffer.alloc: 创建一个空的二进制

const buffer3 = Buffer.alloc(20)
登录后复制

往buffer里面写东西

buffer.write(string, offset): 写入字符串buffer.writeInt8(value, offset): int8表示二进制8位(8位表示一个字节)所能表示的整数,offset开始写入之前要跳过的字节数。buffer.writeInt16BE(value, offset): int16(两个字节数),表示16个二进制位所能表示的整数,即32767。超过这个数程序会报错。
const buffer = Buffer.from([1, 2, 3, 4]) // // 往第二个字节里面写入12buffer.writeInt8(12, 1) // 
登录后复制

大端BE与小端LE:主要是对于2个以上字节的数据排列方式不同(writeInt8因为只有一个字节,所以没有大端和小端),大端的话就是低位地址放高位,小端就是低位地址放低位。如下:

const buffer = Buffer.from([1, 2, 3, 4])buffer.writeInt16BE(512, 2) // buffer.writeInt16LE(512, 2) // 
登录后复制

RPC传输的二进制如何表示传递的字段

PC传输的二进制是如何表示字段的呢?现在有个二进制包[00, 00, 00, 00, 00, 00, 00],我们假定前三个字节表示一个字段值,后面两个表示一个字段的值,最后两个也表示一个字段的值。那写法如下:

writeInt16BE(value, 0)writeInt16BE(value, 2)writeInt16BE(value, 4)
登录后复制

发现像这样写,不仅要知道写入的值,还要知道值的数据类型,这样就很麻烦。不如json格式那么方便。针对这种情况业界也有解决方案。npm有个库protocol-buffers,把我们写的参数转化为buffer

// test.proto 定义的协议文件message Column {  required float num  = 1;  required string payload = 2;}// index.jsconst fs = require("fs")var protobuf = require("protocol-buffers")var messages = protobuf(fs.readFileSync("test.proto"))var buf = messages.Column.encode({num: 42,payload: "hello world"})console.log(buf)// var obj = messages.Column.decode(buf)console.log(obj)// { num: 42, payload: "hello world" }
登录后复制

net建立RPC通道

半双工通信

服务端代码:

const net = require("net")const LESSON_DATA = {  136797: "01 | 课程介绍",  136798: "02 | 内容综述",  136799: "03 | Node.js是什么?",  136800: "04 | Node.js可以用来做什么?",  136801: "05 | 课程实战项目介绍",  136803: "06 | 什么是技术预研?",  136804: "07 | Node.js开发环境安装",  136806: "08 | 第一个Node.js程序:石头剪刀布游戏",  136807: "09 | 模块:CommonJS规范",  136808: "10 | 模块:使用模块规范改造石头剪刀布游戏",  136809: "11 | 模块:npm",  141994: "12 | 模块:Node.js内置模块",  143517: "13 | 异步:非阻塞I/O",  143557: "14 | 异步:异步编程之callback",  143564: "15 | 异步:事件循环",  143644: "16 | 异步:异步编程之Promise",  146470: "17 | 异步:异步编程之async/await",  146569: "18 | HTTP:什么是HTTP服务器?",  146582: "19 | HTTP:简单实现一个HTTP服务器"}const server = net.createServer(socket => {  // 监听客户端发送的消息  socket.on("data", buffer => {    const lessonId = buffer.readInt32BE()    setTimeout(() => {      // 往客户端发送消息      socket.write(LESSON_DATA[lessonId])    }, 1000)  })})server.listen(4000)
登录后复制

客户端代码:

const net = require("net")const socket = new net.Socket({})const LESSON_IDS = [  "136797",  "136798",  "136799",  "136800",  "136801",  "136803",  "136804",  "136806",  "136807",  "136808",  "136809",  "141994",  "143517",  "143557",  "143564",  "143644",  "146470",  "146569",  "146582"]socket.connect({  host: "127.0.0.1",  port: 4000})let buffer = Buffer.alloc(4)buffer.writeInt32BE(LESSON_IDS[Math.floor(Math.random() * LESSON_IDS.length)])// 往服务端发送消息socket.write(buffer)// 监听从服务端传回的消息socket.on("data", buffer => {  console.log(buffer.toString())  // 获取到数据之后再次发送消息  buffer = Buffer.alloc(4)  buffer.writeInt32BE(LESSON_IDS[Math.floor(Math.random() * LESSON_IDS.length)])  socket.write(buffer)})
登录后复制

以上半双工通信步骤如下:

客户端发送消息 socket.write(buffer)服务端接受消息后往客户端发送消息 socket.write(buffer)客户端接受消息后再次发送消息

这样在一个时间端之内,只有一个端往另一个端发送消息,这样就实现了半双工通信。那如何实现全双工通信呢,也就是在客户端往服务端发送消息的同时,服务端还没有消息返回给客户端之前,客户端又发送了一个消息给服务端。

全双工通信

先来看一个场景:

客户端发送了一个id1的请求,但是服务端还来不及返回,接着客户端又发送了一个id2的请求。

等了一个之后,服务端先把id2的结果返回了,然后再把id1的结果返回。

那如何结果匹配到对应的请求上呢?

如果按照时间顺序,那么id1的请求对应了id2的结果,因为id2是先返回的;id2的请求对应了id1的结果,这样就导致请求包和返回包错位的情况。

怎么办呢?

我们可以给请求包和返回包都带上序号,这样就能对应上。

错位处理

客户端代码:

socket.on("data", buffer => {  // 包序号  const seqBuffer = buffer.slice(0, 2)  // 服务端返回的内容  const titleBuffer = buffer.slice(2)      console.log(seqBuffer.readInt16BE(), titleBuffer.toString())})// 包序号let seq = 0function encode(index) {  // 请求包的长度现在是6 = 2(包序号) + 4(课程id)  buffer = Buffer.alloc(6)  buffer.writeInt16BE(seq)  buffer.writeInt32BE(LESSON_IDS[index], 2)  seq++  return buffer}// 每50ms发送一次请求setInterval(() => {  id = Math.floor(Math.random() * LESSON_IDS.length)  socket.write(encode(id))}, 50)
登录后复制

服务端代码:

const server = net.createServer(socket => {  socket.on("data", buffer => {    // 把包序号取出    const seqBuffer = buffer.slice(0, 2)    // 从第2个字节开始读取    const lessonId = buffer.readInt32BE(2)    setTimeout(() => {      const buffer = Buffer.concat([        seqBuffer,        Buffer.from(LESSON_DATA[lessonId])      ])      socket.write(buffer)      // 这里返回时间采用随机的,这样就不会按顺序返回,就可以测试错位的情况    }, 10 + Math.random() * 1000)  })})
登录后复制
客户端把包序号和对应的id给服务端服务端取出包序号和对应的id,然后把包序号和id对应的内容返回给客户端,同时设置返回的时间是随机的,这样就不会按照顺序返回。

粘包处理

如果我们这样发送请求:

for (let i = 0; i < 100; i++) {  id = Math.floor(Math.random() * LESSON_IDS.length)  socket.write(encode(id))}
登录后复制

我们发现服务端接收到的信息如下:

登录后复制

这是因为TCP自己做的一个优化,它会把所有的请求包拼接在一起,这样就会产生粘包的现象。

服务端需要把包进行拆分,拆分成100个小包。

那如何拆分呢?

首先客户端发送的数据包包括两部分:定长的包头和不定长的包体

包头又分为两部分:包序号及包体的长度。只有知道包体的长度,才能知道从哪里进行分割。

let seq = 0function encode(data) {    // 正常情况下,这里应该是使用 protocol-buffers 来encode一段代表业务数据的数据包    // 为了不要混淆重点,这个例子比较简单,就直接把课程id转buffer发送    const body = Buffer.alloc(4);    body.writeInt32BE(LESSON_IDS[data.id]);    // 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分    // 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信    const header = Buffer.alloc(6); // 包序号占2个字节,包体长度占4个字节,共6个字节    header.writeInt16BE(seq)    header.writeInt32BE(body.length, 2);    // 包头和包体拼起来发送    const buffer = Buffer.concat([header, body])    console.log(`包${seq}传输的课程id为${LESSON_IDS[data.id]}`);    seq++;    return buffer;}// 并发for (let i = 0; i < 100; i++) {    id = Math.floor(Math.random() * LESSON_IDS.length)    socket.write(encode({ id }))}
登录后复制

服务端进行拆包

const server = net.createServer(socket => {  let oldBuffer = null  socket.on("data", buffer => {    // 把上一次data事件使用残余的buffer接上来    if (oldBuffer) {      buffer = Buffer.concat([oldBuffer, buffer])    }    let packageLength = 0    // 只要还存在可以解成完整包的包长    while ((packageLength = checkComplete(buffer))) {      // 确定包的长度后进行slice分割      const package = buffer.slice(0, packageLength)      // 剩余的包利用循环继续分割      buffer = buffer.slice(packageLength)      // 把这个包解成数据和seq      const result = decode(package)      // 计算得到要返回的结果,并write返回      socket.write(encode(LESSON_DATA[result.data], result.seq))    }    // 把残余的buffer记下来    oldBuffer = buffer  })})
登录后复制

checkComplete函数的作用来确定一个数据包的长度,然后进行分割:

function checkComplete(buffer) {  // 如果包的长度小于6个字节说明只有包头,没有包体,那么直接返回0  if (buffer.length <= 6) {    return 0  }  // 读取包头的第二个字节,取出包体的长度  const bodyLength = buffer.readInt32BE(2)  // 请求包包括包头(6个字节)和包体body  return 6 + bodyLength}
登录后复制

decode对包进行解密:

function decode(buffer) {  // 读取包头  const header = buffer.slice(0, 6)  const seq = header.readInt16BE()      // 读取包体    // 正常情况下,这里应该是使用 protobuf 来decode一段代表业务数据的数据包  // 为了不要混淆重点,这个例子比较简单,就直接读一个Int32即可  const body = buffer.slice(6).readInt32BE()  // 这里把seq和数据返回出去  return {    seq,    data: body  }}
登录后复制

encode把客户端想要的数据转化为二进制返回,这个包同样包括包头和包体,包头又包括包需要包序号和包体的长度。

function encode(data, seq) {  // 正常情况下,这里应该是使用 protobuf 来encode一段代表业务数据的数据包  // 为了不要混淆重点,这个例子比较简单,就直接把课程标题转buffer返回  const body = Buffer.from(data)  // 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分  // 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信  const header = Buffer.alloc(6)  header.writeInt16BE(seq)  header.writeInt32BE(body.length, 2)  const buffer = Buffer.concat([header, body])  return buffer}
登录后复制

当客户端收到服务端发送的包之后,同样也要进行拆包,因为所有的包同样都粘在一起了:

登录后复制

因此,客户端也需要拆包,拆包策略与服务端的拆包策略是一致的:

let oldBuffer = nullsocket.on("data", buffer => {  // 把上一次data事件使用残余的buffer接上来  if (oldBuffer) {    buffer = Buffer.concat([oldBuffer, buffer])  }  let completeLength = 0  // 只要还存在可以解成完整包的包长  while ((completeLength = checkComplete(buffer))) {    const package = buffer.slice(0, completeLength)    buffer = buffer.slice(completeLength)    // 把这个包解成数据和seq    const result = decode(package)    console.log(`包${result.seq},返回值是${result.data}`)  }  // 把残余的buffer记下来  oldBuffer = buffer})
登录后复制

到这里就实现了双全工通行,这样客户端和服务端随时都可以往对方发小消息了。

更多node相关知识,请访问:nodejs 教程!

以上就是什么是RPC?聊聊node中怎么实现 RPC 通信的详细内容,更多请关注php中文网其它相关文章!

关键词: 发送消息 比较简单 这里应该是