1. 什麼是管道#
管道是一種用於 進程間通信 (IPC) 的機制,它允許一個進程的輸出直接作為下一個進程的輸入,它是一種通過 內核緩衝區 實現的通信方式。
它的特點:
- 單向通信:標準管道中,數據只能沿一個方向流動(雙向通信可以用命名管道實現)
- 半雙工:數據可以在兩個方向上流動,但不能同時進行
- 臨時性:管道通常用於短期的進程間通信
- 內核緩衝區:數據通過內核緩衝區傳遞,而不是共享內存的方式
2. 使用重定向 IO 的方式實現管道#
我們要實現一個類似 ps -aux | grep top
的功能,在 Golang
中我們可以使用下面的方式。
簡單的來說,就是我們創建一個緩衝區,然後重定向進程的 stdin/stdout
到這個緩衝區,這樣就實現了共享內存的通信方法。
Tip
在這裡,我們緩衝區使用 Golang
bytes
包中的bytes.Buffer
對象,它提供了一個緩衝區,用於高效的構建和操作字節切片,它可以動態增長,適用於構建和處理頻繁拼接和修改的字節數據。
package main
import (
"bytes"
"fmt"
"os/exec"
)
func main() {
cmd1 := exec.Command("ps", "aux")
cmd2 := exec.Command("grep", "xxx")
var outputBuf1 bytes.Buffer
cmd1.Stdout = &outputBuf1
if err := cmd1.Start(); err != nil {
fmt.Printf("Error: The first command can't be start up %s\n", err)
return
}
if err := cmd1.Wait(); err != nil {
fmt.Printf("Error: Couldn't wait for the first command: %s\n", err)
return
}
cmd2.Stdin = &outputBuf1
var outputBuf2 bytes.Buffer
cmd2.Stdout = &outputBuf2
if err := cmd2.Start(); err != nil {
fmt.Printf("Error: The second command can't be start up %s\n", err)
return
}
if err := cmd2.Wait(); err != nil {
fmt.Printf("Error: Couldn't wait for the second command: %s\n", err)
return
}
fmt.Printf("%s\n", outputBuf2.String())
}
3. Golang 中管道的用法#
Golang 中提供了 2 種管道的使用方法,os.Pipe()
和 io.Pipe()
,這是兩種不同的實現方法,前者依賴操作系統的管道機制,後者是使用 Golang
實現的,他們都是匿名管道。
特性 | os.Pipe() | io.Pipe() |
---|---|---|
實現層面 | 操作系統級別管道,使用底層系統調用創建。 | 純 Go 實現的內存管道,不涉及操作系統調用。 |
使用場景 | 適合與外部進程通信或在不同的操作系統線程間通信。 | 主要用於同一 Go 程序的不同 Goroutine 間的數據傳遞。 |
性能 | 對於大量數據傳輸,較高效,利用操作系統的緩衝區。 | 小數據量傳輸較快,但不適合大量數據傳輸。 |
跨平台兼容性 | 行為可能因操作系統不同而異。 | 跨平台行為一致,因為純 Go 實現。 |
文件描述符 | 返回 *os.File 類型,包含底層文件描述符。 | 返回 io.Reader 和 io.Writer 接口,不涉及文件描述符。 |
關閉行為 | 需要手動關閉讀端和寫端的文件描述符。 | 一端關閉時,另一端自動返回 EOF。 |
多路復用 | 支持操作系統級多路復用(如 select 、poll 或 epoll ),適合處理多個 I/O 源。 | 不直接支持操作系統多路復用,但可通過 channel 和 select 實現類似效果。 |
原子操作 | 操作系統保證小於等於 PIPE_BUF (通常為 4096 字節)的寫入操作是原子的。大於此的寫操作可能被分割。 | 所有寫操作原子性由 Go 運行時確保,利用互斥鎖保證並發安全。 |
- os.Pipe() 更適合用於系統級別的任務,比如跨進程通信、標準輸入輸出的重定向等。
- io.Pipe() 更適合 Go 內部的並發編程,用於 Goroutine 間的數據流傳遞,可以與 Go 的並發特性(如
channel
、select
)無縫結合。
3.1 os.Pipe()#
package main
import (
"bytes"
"fmt"
"os"
"sync"
)
func main() {
reader, writer, err := os.Pipe()
var wg sync.WaitGroup
if err != nil {
fmt.Printf("Error creating pipe: %v\n", err)
return
}
wg.Add(1)
// 管道這裡不管是讀還是寫都存在阻塞
go func() {
defer wg.Done()
output := make([]byte, 64)
n, err := reader.Read(output)
if err != nil {
fmt.Printf("Error reading from pipe: %v\n", err)
return
}
fmt.Printf("Read %d bytes\n", n)
}()
var inputs bytes.Buffer
for i := 65; i <= 90; i++ {
inputs.WriteByte(byte(i))
}
n, err := writer.Write(inputs.Bytes())
if err != nil {
fmt.Printf("Error writing to pipe: %v\n", err)
return
}
fmt.Printf("Wrote %d bytes\n", n)
wg.Wait()
}
3.2 io.Pipe()#
io.Pipe()
是一個純內存管道,由 Go 語言在內存中實現。其性能限制主要來源於以下幾個方面:
-
內存緩衝:
io.Pipe()
沒有底層操作系統的支持,而是通過 Go 的緩衝和同步機制在內存中實現。由於沒有直接的內核支持,它需要通過互斥鎖(Mutex)和條件變量(Cond)來實現讀寫同步和阻塞,這在高頻、大量數據傳輸時會成為性能瓶頸。 -
Goroutine 調度開銷:
io.Pipe()
的設計目標是在 Goroutine 間傳遞數據。因此,數據傳遞和阻塞喚醒都在 Go 運行時的 Goroutine 調度中進行。頻繁的數據傳遞或大量 Goroutine 的場景下,這種調度開銷會降低性能。 -
系統級緩衝缺乏:操作系統的內核層面通常會為文件描述符分配緩衝區(例如
os.Pipe()
),而io.Pipe()
缺乏此支持。在傳輸大量數據時,由於沒有內核緩衝,數據需要在內存中反復讀寫,增加了內存分配和垃圾回收的負擔。
io.Pipe()
的使用方法和os.Pipe()
基本一致,例子如下:
package main
import (
"bytes"
"fmt"
"io"
"sync"
)
func main() {
reader, writer := io.Pipe()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
output := make([]byte, 256)
n, err := reader.Read(output)
if err != nil {
fmt.Printf("Error reading from reader: %v\n", err)
return
}
fmt.Printf("Read %d bytes\n", n)
}()
var inputs bytes.Buffer
for i := 65; i <= 90; i++ {
inputs.WriteByte(byte(i))
}
n, err := writer.Write(inputs.Bytes())
if err != nil {
fmt.Printf("Error writing to writer: %v\n", err)
return
}
fmt.Printf("Wrote %d bytes\n", n)
wg.Wait()
}