一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - C/C++ - C和Java沒那么香了,Serverless時代Rust即將稱王?

C和Java沒那么香了,Serverless時代Rust即將稱王?

2021-11-17 14:54beyondma C/C++

Serverless Computing,即”無服務器計算”,其實這一概念在剛剛提出的時候并沒有獲得太多的關注,直到2014年AWS Lambda這一里程碑式的產品出現。Serverless算是正式走進了云計算的舞臺

高并發模式初探

在這個高并發時代最重要的設計模式無疑是生產者、消費者模式,比如著名的消息隊列kafka其實就是一個生產者消費者模式的典型實現。其實生產者消費者問題,也就是有限緩沖問題,可以用以下場景進行簡要描述,生產者生成一定量的產品放到庫房,并不斷重復此過程;與此同時,消費者也在緩沖區消耗這些數據,但由于庫房大小有限,所以生產者和消費者之間步調協調,生產者不會在庫房滿的情況放入端口,消費者也不會在庫房空時消耗數據。詳見下圖:

C和Java沒那么香了,Serverless時代Rust即將稱王?

而如果在生產者與消費者之間完美協調并保持高效,這就是高并發要解決的本質問題。

C語言的高并發案例

筆者在前文曾經介紹過TDEngine的相關代碼,其中Sheduler模塊的相關調度算法就使用了生產、消費者模式進行消息傳遞功能的實現,也就是有多個生產者(producer)生成并不斷向隊列中傳遞消息,也有多個消費者(consumer)不斷從隊列中取消息。

后面我們也會說明類型功能在Go、Java等高級語言中類似的功能已經被封裝好了,但是在C語言中你就必須要用好互斥體( mutex)和信號量(semaphore)并協調他們之間的關系。由于C語言的實現是最復雜的,先來看結構體設計和他的注釋:

?
1
2
3
4
5
6
7
8
9
10
11
12
typedef struct {
  char            label[16];//消息內容
  sem_t           emptySem;//此信號量代表隊列的可寫狀態
  sem_t           fullSem;//此信號量代表隊列的可讀狀態
  pthread_mutex_t queueMutex;//此互斥體為保證消息不會被誤修改,保證線程程安全
  int             fullSlot;//隊尾位置
  int             emptySlot;//隊頭位置
  int             queueSize;#隊列長度
  int             numOfThreads;//同時操作的線程數量
  pthread_t *     qthread;//線程指針
  SSchedMsg *     queue;//隊列指針
} SSchedQueue;

再來看Shceduler初始化函數,這里需要特別說明的是,兩個信號量的創建,其中emptySem是隊列的可寫狀態,初始化時其值為queueSize,即初始時隊列可寫,可接受消息長度為隊列長度,fullSem是隊列的可讀狀態,初始化時其值為0,即初始時隊列不可讀。具體代碼及我的注釋如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void *taosInitScheduler(int queueSize, int numOfThreads, char *label) {
  pthread_attr_t attr;
  SSchedQueue *  pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
  memset(pSched, 0, sizeof(SSchedQueue));
  pSched->queueSize = queueSize;
  pSched->numOfThreads = numOfThreads;
  strcpy(pSched->label, label);
  if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
    pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
   //emptySem是隊列的可寫狀態,初始化時其值為queueSize,即初始時隊列可寫,可接受消息長度為隊列長度。
  if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
    pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
 //fullSem是隊列的可讀狀態,初始化時其值為0,即初始時隊列不可讀
  if (sem_init(&pSched->fullSem, 0, 0) != 0) {
    pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
  if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) {
    pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
  memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg));
  pSched->fullSlot = 0;//實始化時隊列為空,故隊頭和隊尾的位置都是0
  pSched->emptySlot = 0;//實始化時隊列為空,故隊頭和隊尾的位置都是0
  pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads);
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  for (int i = 0; i < pSched->numOfThreads; ++i) {
    if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) {
      pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno));
      goto _error;
    }
  }
  pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);
  return (void *)pSched;
_error:
  taosCleanUpScheduler(pSched);
  return NULL;
}

再來看讀消息的taosProcessSchedQueue函數這其實是消費者一方的實現,這個函數的主要邏輯是

1.使用無限循環,只要隊列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續向下處理

2.在操作msg前,加入互斥體防止msg被誤用。

3.讀操作完畢后修改fullSlot的值,注意這為避免fullSlot溢出,需要對于queueSize取余。同時退出互斥體。

4.對emptySem進行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個消息后,emptySem的值為6,即可寫狀態,且能接受的消息數量為6

具體代碼及注釋如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void *taosProcessSchedQueue(void *param) {
  SSchedMsg    msg;
  SSchedQueue *pSched = (SSchedQueue *)param;
 //注意這里是個無限循環,只要隊列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續處理
  while (1) {
    if (sem_wait(&pSched->fullSem) != 0) {
      pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));
      if (errno == EINTR) {
        /* sem_wait is interrupted by interrupt, ignore and continue */
        continue;
      }
    }
     //加入互斥體防止msg被誤用。
    if (pthread_mutex_lock(&pSched->queueMutex) != 0)
      pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
    msg = pSched->queue[pSched->fullSlot];
    memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
    //讀取完畢修改fullSlot的值,注意這為避免fullSlot溢出,需要對于queueSize取余。
    pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
     //讀取完畢修改退出互斥體
    if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
      pError("unlock %s queueMutex failed, reason:%s\n", pSched->label, strerror(errno));
     //讀取完畢對emptySem進行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個消息后,emptySem的值為6,即可寫狀態,且能接受的消息數量為6
    if (sem_post(&pSched->emptySem) != 0)
      pError("post %s emptySem failed, reason:%s\n", pSched->label, strerror(errno));
    if (msg.fp)
      (*(msg.fp))(&msg);
    else if (msg.tfp)
      (*(msg.tfp))(msg.ahandle, msg.thandle);
  }
}

最后寫消息的taosScheduleTask函數也就是生產的實現,其基本邏輯是

1.寫隊列前先對emptySem進行減1操作,如emptySem原值為1,那么減1后為0,也就是隊列已滿,必須在讀取消息后,即emptySem進行post操作后,隊列才能進行可寫狀態。

2.加入互斥體防止msg被誤操作,寫入完成后退出互斥體

3.寫隊列完成后對fullSem進行加1操作,如fullSem原值為0,那么加1后為1,也就是隊列可讀,咱們上面介紹的讀取taosProcessSchedQueue中sem_wait(&pSched->fullSem)不再阻塞就繼續向下。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
  SSchedQueue *pSched = (SSchedQueue *)qhandle;
  if (pSched == NULL) {
    pError("sched is not ready, msg:%p is dropped", pMsg);
    return 0;
  }
  //在寫隊列前先對emptySem進行減1操作,如emptySem原值為1,那么減1后為0,也就是隊列已滿,必須在讀取消息后,即emptySem進行post操作后,隊列才能進行可寫狀態。
  if (sem_wait(&pSched->emptySem) != 0) pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));
//加入互斥體防止msg被誤操作
  if (pthread_mutex_lock(&pSched->queueMutex) != 0)
    pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
  pSched->queue[pSched->emptySlot] = *pMsg;
  pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
  if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
    pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
  //在寫隊列前先對fullSem進行加1操作,如fullSem原值為0,那么加1后為1,也就是隊列可讀,咱們上面介紹的讀取函數可以進行處理。
  if (sem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno));
  return 0;
}

Java的高并發實現

從并發模型來看,Go和Rust都有channel這個概念,也都是通過Channel來實現線(協)程間的同步,由于channel帶有讀寫狀態且保證數據順序,而且channel的封裝程度和效率明顯可以做的更高,因此Go和Rust官方都會建議使用channel(通信)來共享內存,而不是使用共享內存來通信。

為了讓幫助大家找到區別,我們先以Java為例來,看一下沒有channel的高級語言Java,生產者消費者該如何實現,代碼及注釋如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class Storage {
 
    // 倉庫最大存儲量
    private final int MAX_SIZE = 10;
    // 倉庫存儲的載體
    private LinkedList<Object> list = new LinkedList<Object>();
    // 鎖
    private final Lock lock = new ReentrantLock();
    // 倉庫滿的信號量
    private final Condition full = lock.newCondition();
    // 倉庫空的信號量
    private final Condition empty = lock.newCondition();
 
    public void produce()
    {
        // 獲得鎖
        lock.lock();
        while (list.size() + 1 > MAX_SIZE) {
            System.out.println("【生產者" + Thread.currentThread().getName()
                     + "】倉庫已滿");
            try {
                full.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.add(new Object());
        System.out.println("【生產者" + Thread.currentThread().getName()
                 + "】生產一個產品,現庫存" + list.size());
 
        empty.signalAll();
        lock.unlock();
    }
 
    public void consume()
    {
        // 獲得鎖
        lock.lock();
        while (list.size() == 0) {
            System.out.println("【消費者" + Thread.currentThread().getName()
                     + "】倉庫為空");
            try {
                empty.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.remove();
        System.out.println("【消費者" + Thread.currentThread().getName()
                 + "】消費一個產品,現庫存" + list.size());
 
        full.signalAll();
        lock.unlock();
    }
}

在Java、C#這種面向對象,但是沒有channel語言中,生產者、消費者模式至少要借助一個lock和兩個信號量共同完成。其中鎖的作用是保證同是時間,倉庫中只有一個用戶進行數據的修改,而還需要表示倉庫滿的信號量,一旦達到倉庫滿的情況則將此信號量置為阻塞狀態,從而阻止其它生產者再向倉庫運商品了,反之倉庫空的信號量也是一樣,一旦倉庫空了,也要阻其它消費者再前來消費了。

Go的高并發實現

我們剛剛也介紹過了Go語言中官方推薦使用channel來實現協程間通信,所以不需要再添加lock和信號量就能實現模式了,以下代碼中我們通過子goroutine完成了生產者的功能,在在另一個子goroutine中實現了消費者的功能,注意要阻塞主goroutine以確保子goroutine能夠執行,從而輕而易舉的就這完成了生產者消費者模式。下面我們就通過具體實踐中來看一下生產者消費者模型的實現。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main
 
import (
    "fmt"
    "time"
)
 
func Product(ch chan<- int) { //生產者
    for i := 0; i < 3; i++ {
        fmt.Println("Product  produceed", i)
        ch <- i //由于channel是goroutine安全的,所以此處沒有必要必須加鎖或者加lock操作.
    }
}
func Consumer(ch <-chan int) {
    for i := 0; i < 3; i++ {
        j := <-ch //由于channel是goroutine安全的,所以此處沒有必要必須加鎖或者加lock操作.
        fmt.Println("Consmuer consumed ", j)
    }
}
func main() {
    ch := make(chan int)
    go Product(ch)//注意生產者與消費者放在不同goroutine中
    go Consumer(ch)//注意生產者與消費者放在不同goroutine中
    time.Sleep(time.Second * 1)//防止主goroutine退出
    /*運行結果并不確定,可能為
        Product  produceed 0
    Product  produceed 1
    Consmuer consumed  0
    Consmuer consumed  1
    Product  produceed 2
    Consmuer consumed  2
    */
 
}

可以看到和Java比起來使用GO來實現并發式的生產者消費者模式的確是更為清爽了。

Rust的高并發實現

不得不說Rust的難度實在太高了,雖然筆者之前在匯編、C、Java等方面的經驗可以幫助我快速掌握Go語言。但是假期看了兩天Rust真想大呼告辭,這尼瑪也太勸退了。在Rust官方提供的功能中,其實并不包括多生產者、多消費者的channel,std:sync空間下只有一個多生產者單消費者(mpsc)的channel。其樣例實現如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = mpsc::Sender::clone(&tx);
    let tx2 = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let vals = vec![
            String::from("1"),
            String::from("3"),
            String::from("5"),
            String::from("7"),
        ];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    thread::spawn(move || {
        let vals = vec![
            String::from("11"),
            String::from("13"),
            String::from("15"),
            String::from("17"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    thread::spawn(move || {
        let vals = vec![
            String::from("21"),
            String::from("23"),
            String::from("25"),
            String::from("27"),
        ];
        for val in vals {
            tx2.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    for rec in rx {
        println!("Got: {}", rec);
    }
}

可以看到在Rust下實現生產者消費者是不難的,但是生產者可以clone多個,不過消費者卻只能有一個,究其原因是因為Rust下沒有GC也就是垃圾回收功能,而想保證安全Rust就必須要對于變更使用權限進行嚴格管理。在Rust下使用move關鍵字進行變更的所有權轉移,但是按照Rust對于變更生產周期的管理規定,線程間權限轉移的所有權接收者在同一時間只能有一個,這也是Rust官方只提供MPSC的原因,

?
1
2
3
4
5
6
7
8
use std::thread;
fn main() {
    let s = "hello";
    let handle = thread::spawn(move || {
        println!("{}", s);
    });
    handle.join().unwrap();
}

當然Rust下有一個API比較貼心就是join,他可以所有子線程都執行結束再退出主線程,這比Go中要手工阻塞還是要有一定的提高。而如果你想用多生產者、多消費者的功能,就要入手crossbeam模塊了,這個模塊掌握起來難度也真的不低。

總結

通過上面的比較我們可以用一張表格來說明幾種主流語言的情況對比:

語言 安全性 運行速度 進程啟動速度 學習難度
C 極快 極快 困難
Java 一般 一般 一般
Go 較快 較快 一般
Rust 極快(基本比肩C) 極快(基本比肩C) 極困難

可以看到Rust以其高安全性、基本比肩C的運行及啟動速度必將在Serverless的時代獨占鰲頭,Go基本也能緊隨其后,而C語言程序中難以避免的野指針,Java相對較低的運行及啟動速度,可能都不太適用于函數式運算的場景,Java在企業級開發的時代打敗各種C#之類的對手,但是在云時代好像還真沒有之前統治力那么強了,真可謂是打敗你的往往不是你的對手,而是其它空間的降維打擊。

這篇文章的內容就到這了,希望能給你帶來幫助,也希望您可以多多關注服務器之家的更多內容!

原文鏈接:https://blog.csdn.net/BEYONDMA/article/details/117868448

延伸 · 閱讀

精彩推薦
  • C/C++學習C++編程的必備軟件

    學習C++編程的必備軟件

    本文給大家分享的是作者在學習使用C++進行編程的時候所用到的一些常用的軟件,這里推薦給大家...

    謝恩銘10102021-05-08
  • C/C++C語言中炫酷的文件操作實例詳解

    C語言中炫酷的文件操作實例詳解

    內存中的數據都是暫時的,當程序結束時,它們都將丟失,為了永久性的保存大量的數據,C語言提供了對文件的操作,這篇文章主要給大家介紹了關于C語言中文件...

    針眼_6702022-01-24
  • C/C++C/C++經典實例之模擬計算器示例代碼

    C/C++經典實例之模擬計算器示例代碼

    最近在看到的一個需求,本以為比較簡單,但花了不少時間,所以下面這篇文章主要給大家介紹了關于C/C++經典實例之模擬計算器的相關資料,文中通過示...

    jia150610152021-06-07
  • C/C++詳解c語言中的 strcpy和strncpy字符串函數使用

    詳解c語言中的 strcpy和strncpy字符串函數使用

    strcpy 和strcnpy函數是字符串復制函數。接下來通過本文給大家介紹c語言中的strcpy和strncpy字符串函數使用,感興趣的朋友跟隨小編要求看看吧...

    spring-go5642021-07-02
  • C/C++C++之重載 重定義與重寫用法詳解

    C++之重載 重定義與重寫用法詳解

    這篇文章主要介紹了C++之重載 重定義與重寫用法詳解,本篇文章通過簡要的案例,講解了該項技術的了解與使用,以下就是詳細內容,需要的朋友可以參考下...

    青山的青6062022-01-04
  • C/C++c++ 單線程實現同時監聽多個端口

    c++ 單線程實現同時監聽多個端口

    這篇文章主要介紹了c++ 單線程實現同時監聽多個端口的方法,幫助大家更好的理解和學習使用c++,感興趣的朋友可以了解下...

    源之緣11542021-10-27
  • C/C++深入理解goto語句的替代實現方式分析

    深入理解goto語句的替代實現方式分析

    本篇文章是對goto語句的替代實現方式進行了詳細的分析介紹,需要的朋友參考下...

    C語言教程網7342020-12-03
  • C/C++C語言實現電腦關機程序

    C語言實現電腦關機程序

    這篇文章主要為大家詳細介紹了C語言實現電腦關機程序,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下...

    xiaocaidayong8482021-08-20
主站蜘蛛池模板: 国产亚洲精品一区二区在线播放 | 天天色影视综合网 | 欧美兽皇video | 精品久久久久久午夜 | 五月天婷婷亚洲 | 99久久国产综合精品网成人影院 | 欧美亚洲激情在线 | 日本高清在线播放一区二区三区 | 色婷婷影院在线视频免费播放 | 婷婷sese| 四虎永久在线精品国产馆v视影院 | 高人先生免费观看全集 | 亚洲高清无在码在线电影 | 亚洲麻豆精品果冻传媒 | 亚洲a图 | 国产91精品久久久久久久 | 9久热这里只有精品免费 | xvideoscom极品肌肉警察 | 99re在线视频免费观看 | 99久久免费看国产精品 | 国语自产拍在线播放不卡 | aaa大片 | 国产suv精品| 99精品热视频 | 九色PORNY丨视频入口 | 欧美国产在线观看 | 国产一二三区视频 | 9久爱午夜视频 | 国产成人盗拍精品免费视频 | 亚洲欧美日韩天堂 | 欧美成人影院免费观 | 亚洲国产成人超福利久久精品 | 免费精品国产在线观看 | 波多野结衣一区 | 丝袜捆绑调教视频免费区 | 2019nv天堂香蕉在线观看 | 龟甲情感超市全文阅读 小说 | 欧美精品日韩一区二区三区 | 国产区成人精品视频 | 极品在线 | 色琪琪久久se色 |