Serverless的概念火了,業界已經不再討論要不要用Serverless的問題了,而是高喊Serverless First的口號力求快速擁抱Serverless,無伺服器並不是Serverless的本質,不需要關心伺服器的情況就能高效工作,才是Serverless勝出的核心要義。網際網路時代流量的大起大落,很多科技巨頭在面對流量的衝擊時也都敗下陣來,針對前幾個月B站的崩潰事件,筆者還曾寫過《B站的前端崩了,後端的你別慌》來分析來龍去脈,而Serverless憑藉快速伸縮的自動彈性特點,可以從容應對類似的衝擊,這也讓這種新技術出盡的風頭。

在Serverless的喧囂背後,Rust看似牢牢佔據了C位,但其實在高併發這個話題下要總結的模式與套路其實很多,尤其是像Tokio專業的程式設計框架,對於程式設計師編寫高效能程式的幫助很大。因此本文把之前介紹過的Tokio相關知識點進行一下補充和總結。

Future到底是個什麼概念

簡單來講Future不是一個值,而是一種值型別,一種在未來才能得到的值型別。Future物件必須實現Rust標準庫中的std::future:: future介面。Future的輸出Output是Future完成後才能生成的值。在Rust中Future透過管理器呼叫Future::poll來推動Future的運算。Future本質上是一個狀態機,而且可以巢狀使用,我們來看一下面這個例子,在main函式中,我們例項化MainFuture並呼叫。await,而MainFuture除了在幾個狀態之間遷移以外,還會呼叫一個Delay的Future,從而實現Future的巢狀。

MainFuture以State0狀態做為初始狀態。當排程器呼叫poll方法時,MainFuture會嘗試儘可能地提升其狀態。如果future完成,則返回Poll::Ready,如果MainFuture沒有完成,則是由於它等待的DelayFuture沒有達到Ready狀態,那麼此時返回Pending。排程器收到Pending結果,會將這個MainFuture重新放回待排程的隊列當中,稍後會再度呼叫Poll方法來推進Future的執行。具體如下

use std::future::Future;

use std::pin::Pin;

use std::task::{Context, Poll};

use std::time::{Duration, Instant};

struct Delay {

when: Instant,

}

impl Future for Delay {

type Output = &‘static str;

fn poll(self: Pin<&mut Self>, cx: &mut Context<’_>)

-> Poll<&‘static str>

{

if Instant::now() >= self。when {

println!(“Hello world”);

Poll::Ready(“done”)

} else {

cx。waker()。wake_by_ref();

Poll::Pending

}

}

}

enum MainFuture {

State0,

State1(Delay),

Terminated,

}

impl Future for MainFuture {

type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<’_>)

-> Poll<()>

{

use MainFuture::*;

loop {

match *self {

State0 => {

let when = Instant::now() +

Duration::from_millis(1);

let future = Delay { when };

println!(“init status”);

*self = State1(future);

}

State1(ref mut my_future) => {

match Pin::new(my_future)。poll(cx) {

Poll::Ready(out) => {

assert_eq!(out, “done”);

println!(“delay finished this future is ready”);

*self = Terminated;

return Poll::Ready(());

}

Poll::Pending => {

println!(“not ready”);

return Poll::Pending;

}

}

}

Terminated => {

panic!(“future polled after completion”)

}

}

}

}

}

#[tokio::main]

async fn main() {

let when = Instant::now() + Duration::from_millis(10);

let mainFuture=MainFuture::State0;

mainFuture。await;

}

當然這個Future的實現存在一個明顯的問題,透過執行結果也可以知道偵錯程式明顯在需要等待的情況下還執行了很多次的Poll操作,理想狀態下需要當Future有進展時再執行Poll操作。不斷輪徇的Poll其實就退化成了低效的輪詢。

解決之道在於poll函式中的Context引數,這個Context就是Future的waker(),透過呼叫waker可以向執行器發出訊號,表明這個任務應該進行Poll操作了。當Future的狀態推進時,呼叫wake來通知執行器,才是正解這就需要把Delay部分的程式碼改一下:

let waker = cx。waker()。clone();

let when = self。when;

// Spawn a timer thread。

thread::spawn(move || {

let now = Instant::now();

if now < when {

thread::sleep(when - now);

}

waker。wake();

});

無論是哪種高併發框架,本質上講都是基於這種Task/Poll機制的排程器, poll做的本質工作就是監測鏈條上前續Task的執行狀態。

Rust高併發程式設計總結

用好Poll的機制,就能避免上面出現事件迴圈定期遍歷整個事件佇列的排程演算法,Poll的精髓就是把狀態為ready的事件通知給對應的處理程式,而基於poll設計的如tokio框架進行應用開發時,程式設計師根本不必關心整個訊息傳遞,只需要用and_then、spawn等方法建立任務鏈條並讓系統工作起來就可以了。

資料禎的實現

幀是資料傳輸中的最小單位,幀粒度以下的位元組資料對於應用來說沒有任何意義,同時不完整的幀也應該在幀的處理層進行過濾,read_frame方法在返回之前等待接收到整個幀。對TcpStream::read()的單個呼叫可能返回任意數量的資料。它可以包含整個框架,部分框架,或多個框架。如果接收到部分幀,資料將被緩衝,並從套接字讀取更多資料。如果接收到多個幀,則返回第一個幀,其餘的資料將被緩衝,直到下一次呼叫read_frame。要實現這一點,Connection需要一個讀緩衝區欄位。資料從套接字讀入讀緩衝區。當一個幀被解析時,相應的資料將從緩衝區中刪除。我們將使用BytesMut作為緩衝區型別。

use bytes::BytesMut;use tokio::net::TcpStream;

pub struct Connection {

stream: TcpStream,

buffer: BytesMut,

}

impl Connection {

pub fn new(stream: TcpStream) -> Connection {

Connection {

stream,

// Allocate the buffer with 4kb of capacity。

buffer: BytesMut::with_capacity(4096),

}

}

}

read_frame函式嘗試解析幀。如果有足夠的資料來解析幀,則將幀返回給read_frame()的呼叫者。否則,將嘗試從套接字中讀取更多資料到緩衝區中。讀取更多資料後,再次呼叫parse_frame()。這一次,如果接收到足夠的資料,解析可能會成功。當從流中讀取資料時,返回值為0表示不再從對等端接收資料。如果讀緩衝區中仍然有資料,這表明已經接收到部分幀,連線正在突然終止。這是一個錯誤條件,並返回Err。

use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)

-> Result>

{

loop {

if let Some(frame) = self。parse_frame()? {

return Ok(Some(frame));

}

// Ensure the buffer has capacity

if self。buffer。len() == self。cursor {

// Grow the buffer

self。buffer。resize(self。cursor * 2, 0);

}

// Read into the buffer, tracking the number

// of bytes read

let n = self。stream。read(

&mut self。buffer[self。cursor。。])。await?;

if 0 == n {

if self。cursor == 0 {

return Ok(None);

} else {

return Err(“connection reset by peer”。into());

}

} else {

// Update our cursor

self。cursor += n;

}

}

}

一定要小心的Select

另外還有一個值得注意的點是select,在使用一個以上的通道時,任何一個通道都可以先完成。選擇select!關鍵字將在所有的通道上等待,並將提到最先返回通道上的值。注意select!當等到第一個返回之後,其它未完成的任務將被取消。具體如下:

use tokio::sync::oneshot;

async fn some_operation() -> String {

“hello beyondma”。to_string()

}

#[tokio::main]

async fn main() {

let (mut tx1, rx1) = oneshot::channel();

let (tx2, rx2) = oneshot::channel();

tokio::spawn(async {

let _ = tx1。send(“hello beyondma”);

});

tokio::spawn(async {

let _ = tx2。send(“hi beyondma”);

});

tokio::select! {

val = rx1 => {

println!(“rx1 completed first with {:?}”, val);

}

val = rx2 => {

println!(“rx2 completed first with {:?}”, val);

}

}

}

以上這段程式碼的執行結果要不是

hello beyondma

要麼是

hi beyondma

不可能出現兩個都被輸出的情況。

為了解釋select的機制,我們自行設計一個MySelect的future,在對MySelect進行poll操作時,將輪詢第一個分支。如果已經準備好,則使用該值並完成MySelect。在MySelect。await接收到一個Ready後,整個future被丟棄。具體如下:

use tokio::sync::oneshot;

use std::future::Future;

use std::pin::Pin;

use std::task::{Context, Poll};

struct MySelect {

rx1: oneshot::Receiver<&‘static str>,

rx2: oneshot::Receiver<&’static str>,

}

impl Future for MySelect {

type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<‘_>) -> Poll<()> {

if let Poll::Ready(val) = Pin::new(&mut self。rx1)。poll(cx) {

println!(“rx1 completed first with {:?}”, val);

return Poll::Ready(());

}

if let Poll::Ready(val) = Pin::new(&mut self。rx2)。poll(cx) {

println!(“rx2 completed first with {:?}”, val);

return Poll::Ready(());

}

Poll::Pending

}

}

#[tokio::main]

async fn main() {

let (tx1, rx1) = oneshot::channel();

let (tx2, rx2) = oneshot::channel();

// use tx1 and tx2

tokio::spawn(async {

let _ = tx1。send(“hello beyondma”);

});

tokio::spawn(async {

let _ = tx2。send(“hi beyondma”);

});

MySelect {

rx1,

rx2,

}。await;

}

Rust是近些年來隨著Serverless一起新興起的語言,表面上看他像是C,既沒有JVM虛擬機器也沒有GC垃圾回收器,但仔細一瞧他還不是C,Rust特別不信任程式設計師,力圖讓Rust編譯器把程式中的錯誤殺死在在生成可執行檔案之前的Build階段。由於沒有GC所以Rust當中獨創了一套變數的生命週期及借調用機制。開發者必須時刻小心變數的生命週期是否存在問題。

而且Rust難的像火星語言,多路通道在使用之前要clone,帶鎖的雜湊表用之前要先unwrap,種種用法和Java、Go完全不同,但是也正在由於這樣嚴格的使用限制,我們剛剛所提到的Go語言中Gorotine會出現的問題,在Rust中都不會出現,因為Go的那些用法,通通不符合Rust變數生命週期的檢查,想編譯透過都是不可能完成的任務。

所以Rust很像逍遙派,想入門非常難,但只要能出師,寫的程式能透過編譯,那你百分百是一位高手,所以這是一門下限很高,上限同樣也很高的極致語言。

目前Rust的高併發程式設計框架最具代表性的就是Tokio,本文開頭Future的例子就是基於Tokio框架編寫的,這裡也不加贅述了。

根據官方的說法每個Rust的Tokio任務只有64位元組大小,這比直接透過folk執行緒去網路請求,效率會提升幾個數量級,在高併發框架的幫助下,開發者完全可以做到極限壓榨硬體的效能。