ポン酢ブログ(β)

The harder you work, the luckier you get. - Gary Player

EnvoyのTraffic tapping (TCP)機能をxDSサーバを実装しながらまとめてみる

最近Envoyを少しだけ触ってみている。Envoyは基本的にIstioのProxyで使われることも多いとは思うし、基本的には何かコントロールプレーンがあって、そのデータプレーンとして使われるイメージを想像している。(多分)

個人的にこのEnvoyをデータプレーンとして使いつつ、そのコントロールプレーンを作れる(?)xDSサーバーに興味があったのでこれに入門しつつ、このxDSサーバーから対象のEnvoyをトラフィックをダンプできる、TCPプロキシとして利用する設定を流し込んでみたのでまとめる。

この記事で取り扱うこと

  • EnvoyのxDSサーバを作成する
  • EnvoyのTraffic tapping機能を利用する
    • EnvoyのTraffic tappingの結果をデコードする(Protocol Buffers -> PCAP編)
    • EnvoyのTraffic tappingの結果をデコードする(Admin編)

Static Config等で利用する場合も、だいたいxDSサーバーから流し込んでいる設定はProtocol Buffersであるので、読み替えればxDSサーバーを用いなくとも流し込むことができる。

EnvoyのxDSサーバを作成する

github.com

上が今回実装したやつ、まだ色々雑な作りではあるので修正します。方針としては、普通にgRPCのサーバを作る形になる。Envoyが公式でGolangの実装とProtocol BuffersのGolangバインドを提供してくれているのでそれを使う。

github.com

基本的にはサンプルをそのまま実装しただけで、まだNodeのIDのハッシュとかも適当のままになっている。 xDSの設定自体はスナップショットとして取っておくが、そのスナップショットをどこに置くかという話になったので、TiKVに置けるようにした。

TiKVはTiDBで使う為のKey-Valueストアだが、TiDBの中にライブラリがあるのでそれを使うとTiKVをKVSとして使うことができる。普段ならこういうとき、etcdなどを使うが、今回はなんとなくで使ってみた。TiDBのソースコードを若干斜め読みしたが、テストもしやすいようになっていたし、 github.com/pingcap/tidb/store/mockstore でテスト用のモックも提供されていたので、テストを書く気にもなったので、普通に便利だと思う。

雰囲気はこんな感じで書ける。

type TiKVRepository struct {
    storage kv.Storage
}

func NewTiKVRepository(address string) (*TiKVRepository, error) {
    driver := tikv.Driver{}
    storage, err := driver.Open(address)
    if err != nil {
        return nil, err
    }
    repository := &TiKVRepository{storage: storage}
    return repository, nil
}

func (t *TiKVRepository) GetEndpoint(ctx context.Context, name string) (*model.Endpoint, error) {
    b, err := t.get(ctx, fmt.Sprintf("endpoint/%s", name))
    if err != nil {
        return nil, err
    }
    endpoint, err := model.UnmarshalEndpoint(b)
    if err != nil {
        return nil, err
    }
    return endpoint, nil
}

雰囲気はこんな感じなので詳しくはTiKVのクライアント部分に書いてあります。

TCPプロキシ部分の設定は、Envoyの公式ドキュメントであるTCP proxyAPI リファレンスを参考に設定を入れていく。

ClusterやListenerについては割と普通に設定するだけなので、以下に設定を作成する関数をまるっと載せておきます。

// Cluster を作成
func makeCluster(clusterName, upstream string, port uint32) *cluster.Cluster {
    return &cluster.Cluster{
        Name:                 clusterName,
        ConnectTimeout:       ptypes.DurationProto(5 * time.Second),
        ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_LOGICAL_DNS},
        LbPolicy:             cluster.Cluster_ROUND_ROBIN,
        LoadAssignment:       makeEndpoint(clusterName, upstream, port),
        DnsLookupFamily:      cluster.Cluster_V4_ONLY,
    }
}

// upstreamを指定する
func makeEndpoint(clusterName string, upstream string, port uint32) *endpoint.ClusterLoadAssignment {
    return &endpoint.ClusterLoadAssignment{
        ClusterName: clusterName,
        Endpoints: []*endpoint.LocalityLbEndpoints{{
            LbEndpoints: []*endpoint.LbEndpoint{{
                HostIdentifier: &endpoint.LbEndpoint_Endpoint{
                    Endpoint: &endpoint.Endpoint{
                        Address: &core.Address{
                            Address: &core.Address_SocketAddress{
                                SocketAddress: &core.SocketAddress{
                                    Protocol: core.SocketAddress_TCP,
                                    Address:  upstream,
                                    PortSpecifier: &core.SocketAddress_PortValue{
                                        PortValue: port,
                                    },
                                },
                            },
                        },
                    },
                },
            }},
        }},
    }
}

func makeTCPListener(listenerName, clusterName string, port uint32) *listener.Listener {
    // TCPProxyのFilterを作成
    p := &tcpproxy.TcpProxy{
        StatPrefix:       fmt.Sprintf("tcp-%d", port),
        ClusterSpecifier: &tcpproxy.TcpProxy_Cluster{Cluster: clusterName},
    }
    pbst, err := ptypes.MarshalAny(p)
    if err != nil {
        panic(err)
    }

    l := &listener.Listener{
        Name: listenerName,
        Address: &core.Address{
            Address: &core.Address_SocketAddress{
                SocketAddress: &core.SocketAddress{
                    Protocol: core.SocketAddress_TCP,
                    Address:  "0.0.0.0",
                    PortSpecifier: &core.SocketAddress_PortValue{
                        PortValue: port,
                    },
                },
            },
        },
        FilterChains: []*listener.FilterChain{
            {
                Filters: []*listener.Filter{
                    {
                        Name: wellknown.TCPProxy,
                        ConfigType: &listener.Filter_TypedConfig{
                            TypedConfig: pbst,
                        },
                    },
                },
            },
        },
    }

    return l
}

このxDSサーバはHTTPサーバとしての機能も備えていて、HTTP経由でこの設定を追加や削除できるようになっているので、外から設定を投入してそれをEnvoyが取りに行く、というようなことができる。

EnvoyのTraffic tapping機能を利用する

こちらが本編です。EnvoyにはTraffic tapping機能があり、experimentalではあるもののListenerもしくはClusterのExtensionにある、Transport Socketのtapを指定することでその間の通信をキャプチャすることができる。 これを使うことで、動的にTCPをプロキシしつつ、必要に応じてパケットのキャプチャもできるのではないかというところでxDSサーバを書いていた。

基本的に設定はリファレンスの通りである。これを先ほどのListenerに適用するためには、FilterChainのTransportSocketにTapの設定を入れる。

このTraffic tappingには2種類あり、、トレース内容のProtocol Bufferを 直接ファイルとして保存する方法と、キャプチャ結果をEnvoyのadminのHTTPエンドポイントからProtocol Buffer (JSON)として受け取る方法がある。

まずファイルとして保存する方の設定は以下のようになる。

tapConf := &tap.Tap{
        CommonConfig: &tapv3.CommonExtensionConfig{
            ConfigType: &tapv3.CommonExtensionConfig_StaticConfig{
                StaticConfig: &tapconfigv3.TapConfig{
                    Match: &matcher.MatchPredicate{
                        Rule: &matcher.MatchPredicate_AnyMatch{AnyMatch: true},
                    },
                    OutputConfig: &tapconfigv3.OutputConfig{
                        Sinks: []*tapconfigv3.OutputSink{
                            {
                                Format:         tapconfigv3.OutputSink_PROTO_BINARY,
                                OutputSinkType: &tapconfigv3.OutputSink_FilePerTap{FilePerTap: &tapconfigv3.FilePerTapSink{PathPrefix: fmt.Sprintf("/tmp/%s", "<name>")}},
                            },
                        },
                    },
                },
            },
        },
        TransportSocket: &core.TransportSocket{
            Name: wellknown.TransportSocketRawBuffer,
        },
}

Envoy adminから受け取る場合は以下のようになる。

tapConf := &tap.Tap{
        CommonConfig: &tapv3.CommonExtensionConfig{
            ConfigType: &tapv3.CommonExtensionConfig_AdminConfig{
                AdminConfig: &tapv3.AdminConfig{ConfigId: "<name>"},
            },
        },
        TransportSocket: &core.TransportSocket{
            Name: wellknown.TransportSocketRawBuffer,
        },
}

このtapConfをListenerのFilterChainに突っ込む。

pbtap, err := ptypes.MarshalAny(tapConf)
if err != nil {
    return nil, err
}
tpConfig = &core.TransportSocket{
    Name:       wellknown.TransportSocketTap,
    ConfigType: &core.TransportSocket_TypedConfig{TypedConfig: pbtap},
}

ls := &listener.Listener{
    Name: l.Name,
    Address: &core.Address{
        Address: &core.Address_SocketAddress{
            SocketAddress: &core.SocketAddress{
                Protocol: core.SocketAddress_TCP,
                Address:  "0.0.0.0",
                PortSpecifier: &core.SocketAddress_PortValue{
                    PortValue: <port>,
                },
            },
        },
    },
    FilterChains: []*listener.FilterChain{
        {
            TransportSocket: tpConfig,
            Filters: []*listener.Filter{

                {
                    Name: wellknown.TCPProxy,
                    ConfigType: &listener.Filter_TypedConfig{
                        TypedConfig: pbst,
                    },
                },
            },
        },
    },
    }

これでこのListenerに対してのTraffic tappingが有効になる。

EnvoyのTraffic tappingの結果をデコードする(Protocol Buffers -> PCAP編)

ここまでで、Traffic tappingができるようになった。結果はProtocol Buffersのファイルとして保存される。公式のリファレンスによると、envoyproxy/envoyにあるtap2pcapというスクリプトを使ってPCAPに変換できるとある。このスクリプトを読んでみたところ、トレースからパケットを合成して、text2pcapに食べさせるというスクリプトになっていた。

EnvoyのリポジトリはBazel化されているので、特段システムのPythonに依存せず実行できるものの、さらっと見たところシステムに入っているodやtext2pcapに依存していたので、Golangでなんとか合成したくてgopacketを使ってPCAPファイルを合成してみた。gotap2pcapとしてスクリプトを書いた。

TraceWrapperをUnmarshalした後、Read/Writeというフィールドにパケットが入っているのでそれをPCAPに合成すれば良い。TCPのストリームはそれっぽく作る必要があるので、以下のように雑でああるがIPとTCPのヘッダを突っ込んでから、Protocol BuffersからUnmarshalしたパケットを突っ込む。gopacketの使い方全然知らなかったので勉強になった。この状態だとEthernetヘッダないけど要らないので省いてます。

type LayerInfo struct {
    SrcPort layers.TCPPort
    DstPort layers.TCPPort
    SrcIP   net.IP
    DstIP   net.IP
}

func (w *TCPWriter) WritePacket(t time.Time, l *LayerInfo, b []byte, write bool) error {
    buf := gopacket.NewSerializeBuffer()
    err := gopacket.SerializeLayers(buf, gopacket.SerializeOptions{FixLengths: true}, &layers.IPv4{
        Version:  4,
        TTL:      64,
        SrcIP:    l.SrcIP,
        DstIP:    l.DstIP,
        Protocol: layers.IPProtocolTCP,
    }, &layers.TCP{
        Window:  8192,
        SrcPort: l.SrcPort,
        DstPort: l.DstPort,
        SYN:     false,
        ACK:     true,
        PSH:     false,
        Seq:     w.Seq,
        Ack:     w.Ack,
    },
    gopacket.Payload(b),
    )
    if err != nil {
        return err
    }
    captureInfo := gopacket.CaptureInfo{
        Timestamp:      t,
        CaptureLength:  len(buf.Bytes()),
        Length:         len(buf.Bytes()),
        InterfaceIndex: 0,
        AncillaryData:  nil,
    }
    if !write {
        w.Ack += uint32(len(b))
    } else {
        w.Seq += w.Ack
    }
    return w.w.WritePacket(captureInfo, buf.Bytes())
}

こうして、スクリプトを実行するとWiresharkとかで開ける普通のPCAPファイルをゲットできるようになった。

EnvoyのTraffic tappingの結果をデコードする(Admin編)

本来はHTTP Filter向けっぽいのだが、Traffic tappingの結果をEnvoyのAdminからストリームで受ける設定も可能である。設定は上記で述べた通りなので、設定したListenerに対してtapしたい場合は、以下のようなリクエストをAdminにPOST /tapで投げる。

config_id: <config_id>
tap_config:
  match_config:
    any_match: true
  output_config:
    sinks:
      - streaming_admin: {}

こうすると、JSONの形式でtap結果を受信できる。最初cURLデバッグしていたところ、Ctrl-Cで抜けても次にPOSTリクエストを投げるとAn attached /tap admin stream already exists. Detach it.と表示されてしまいエンドポイントが利用できなくなってしまった。これについてはGitHub Issueにも上げられていて、端的に言うとHTTP/2を使えば解決できるという感じであった。

なので、HTTP/2でこのエンドポイントに叩きつつ、受信した結果をデコードしてログに出力するクライアントを書いてみた。admintapclientとして書いた。HTTP/2ではあるがTLSに行くを無効にしたい場合はどうすれば良いのか全く分からなかったが、http2.Transportにしつつ、DialTLSを普通のDialで被せれば良いらしいのでそうした。ローカルで試すときはこれでいいかなという感じ。

Adminの負荷がどうかなどは全くみてないのでその辺を検証する必要があるが、手軽にtapできそうだなと感じた。一方で、おそらくファイルとAdminは排他で利用する必要があるのでその辺の設定を上手く切り替えられると良いなと思った。(xDSサーバに登録する際に切り替えられる機能をつけた)

おわり

今回はEnvoyのxDSサーバと、Traffic tapping機能についてまとめてみた。Traffic tapping機能があるので、普通のTCPプロキシとしてだけではないような用途にも利用できそうだった。xDSサーバも実装したので動的に対象を切り替えたり、フィルタとかもできるのではないかというところでもうちょっと実装していきたい。

なかなか面白い機能で、コメントアウトされていたものの、tapの出力先にgRPC Streamingなどもサポートしそうだったので今後もこの機能には注目したいなと思った。

あとEnvoyのコントロールプレーンとしてIstioではないような、他の実装ないかなと思って探している。あとEnvoyのアーキテクチャをもうちょい理解する必要がありそう。

参考にした記事