Consuming Kafka data into ClickHouse in real time


I have a large volume of IPFIX (NetFlow) records being written to Kafka, and I built this consumer in Go:

package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "flag"
    "log"
    "sync"
    "time"

    "github.com/ClickHouse/clickhouse-go"
    "github.com/segmentio/kafka-go"
)

type options struct {
    Broker  string
    Topic   string
    Debug   bool
    Workers int
}

type dataField struct {
    I int
    V interface{}
}

type Header struct {
    Version    int
    Length     int
    ExportTime int64
    SequenceNo int
    DomainID   int
}

type ipfix struct {
    AgentID  string
    Header   Header
    DataSets [][]dataField
}

type dIPFIXSample struct {
    device                          string
    sourceIPv4Address               string
    sourceTransportPort             uint64
    postNATSourceIPv4Address        string
    postNATSourceTransportPort      uint64
    destinationIPv4Address          string
    postNATDestinationIPv4Address   string
    postNATDestinationTransportPort uint64
    dstport                         uint64
    timestamp                       string
    postNATSourceIPv6Address        string
    postNATDestinationIPv6Address   string
    sourceIPv6Address               string
    destinationIPv6Address          string
    proto                           uint8
    login                           string
    sessionid                       uint64
}

var opts options

func init() {
    flag.StringVar(&opts.Broker, "broker", "172.18.0.4:9092", "broker ipaddress:port")
    flag.StringVar(&opts.Topic, "topic", "vflow.ipfix", "kafka topic")
    flag.BoolVar(&opts.Debug, "debug", true, "enabled/disabled debug")
    flag.IntVar(&opts.Workers, "workers", 16, "workers number / partition number")

    flag.Parse()
}


func main() {
    var (
        wg sync.WaitGroup
        ch = make(chan ipfix, 10000)
    )

    for i := 0; i < 5; i++ {
        go ingestClickHouse(ch)
    }

    wg.Add(opts.Workers)

    for i := 0; i < opts.Workers; i++ {
        go func(ti int) {
            defer wg.Done()

            // create a new kafka reader with the broker and topic,
            // joining the "mygroup" consumer group
            r := kafka.NewReader(kafka.ReaderConfig{
                Brokers: []string{opts.Broker},
                Topic:   opts.Topic,
                GroupID: "mygroup",
                // start consuming from the earliest available message
                StartOffset: kafka.FirstOffset,
            })
            defer r.Close()

            pCount := 0
            count := 0
            tik := time.Tick(10 * time.Second)

            for {
                select {
                case <-tik:
                    if opts.Debug {
                        log.Printf("partition GroupId#%d,  rate=%d\n", ti, (count-pCount)/10)
                    }
                    pCount = count
                default:
                    // read the next message from kafka
                    m, err := r.ReadMessage(context.Background())
                    if err != nil {
                        if err == kafka.ErrGenerationEnded {
                            log.Println("generation ended")
                            return
                        }
                        log.Println(err)
                        continue
                    }
                    // unmarshal the message into an ipfix struct
                    objmap := ipfix{}
                    if err := json.Unmarshal(m.Value, &objmap); err != nil {
                        log.Println(err)
                        continue
                    }

                    // hand the decoded record to the ingestClickHouse workers
                    ch <- objmap

                    // mark the message as processed
                    if err := r.CommitMessages(context.Background(), m); err != nil {
                        log.Println(err)
                        continue
                    }

                    count++
                }
            }
        }(i)
    }

    wg.Wait()
}


func ingestClickHouse(ch chan ipfix) {
    var sample ipfix

    connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=true&username=default&password=wawa123")
    if err != nil {
        log.Fatal(err)
    }
    if err := connect.Ping(); err != nil {
        if exception, ok := err.(*clickhouse.Exception); ok {
            log.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
        } else {
            log.Println(err)
        }
        return
    }
    defer connect.Close()
    for {
        tx, err := connect.Begin()
        if err != nil {
            log.Fatal(err)
        }
        stmt, err := tx.Prepare("INSERT INTO natdb.natlogs (timestamp, router_ip, sourceIPv4Address, sourceTransportPort, postNATSourceIPv4Address, postNATSourceTransportPort, destinationIPv4Address, dstport, postNATDestinationIPv4Address, postNATDestinationTransportPort, postNATSourceIPv6Address, postNATDestinationIPv6Address, sourceIPv6Address, destinationIPv6Address, proto, login) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
        if err != nil {
            log.Fatal(err)
        }


        // drain up to 10000 samples from the channel into one transaction
        for i := 0; i < 10000; i++ {
            sample = <-ch
            for _, data := range sample.DataSets {
                s := dIPFIXSample{}
                for _, dd := range data {
                    // map IPFIX information element IDs to struct fields
                    switch dd.I {
                    case 4:
                        s.proto = uint8(dd.V.(float64))
                    case 7:
                        s.sourceTransportPort = uint64(dd.V.(float64))
                    case 8:
                        s.sourceIPv4Address = dd.V.(string)
                    case 11:
                        s.dstport = uint64(dd.V.(float64))
                    case 12:
                        s.destinationIPv4Address = dd.V.(string)
                    case 27:
                        s.sourceIPv6Address = dd.V.(string)
                    case 28:
                        s.destinationIPv6Address = dd.V.(string)
                    case 225:
                        s.postNATSourceIPv4Address = dd.V.(string)
                    case 226:
                        s.postNATDestinationIPv4Address = dd.V.(string)
                    case 227:
                        s.postNATSourceTransportPort = uint64(dd.V.(float64))
                    case 228:
                        s.postNATDestinationTransportPort = uint64(dd.V.(float64))
                    case 281:
                        s.postNATSourceIPv6Address = dd.V.(string)
                    case 282:
                        s.postNATDestinationIPv6Address = dd.V.(string)
                    case 2003:
                        s.login = dd.V.(string)
                    }
                }
                timestamp := time.Unix(sample.Header.ExportTime, 0).Format("2006-01-02 15:04:05")

                if _, err := stmt.Exec(
                    timestamp,
                    sample.AgentID,
                    s.sourceIPv4Address,
                    s.sourceTransportPort,
                    s.postNATSourceIPv4Address,
                    s.postNATSourceTransportPort,
                    s.destinationIPv4Address,
                    s.dstport,
                    s.postNATDestinationIPv4Address,
                    s.postNATDestinationTransportPort,
                    s.postNATSourceIPv6Address,
                    s.postNATDestinationIPv6Address,
                    s.sourceIPv6Address,
                    s.destinationIPv6Address,
                    s.proto,
                    s.login,
                ); err != nil {
                    log.Fatal(err)
                }
            }
        }

        // commit synchronously: committing in a goroutine can interleave
        // with the next transaction and gains nothing here
        if err := tx.Commit(); err != nil {
            log.Fatal(err)
        }
    }
}

The code works and rows do reach ClickHouse, but because of the high traffic and the sheer volume of data being written to Kafka, there is a lag between Kafka and ClickHouse that grows with traffic; right now the lag is more than 20 hours. Can you recommend any way to make this faster? Here is my ClickHouse table:

CREATE TABLE natdb.natlogs
(
    `timestamp` DateTime,
    `router_ip` String,
    `sourceIPv4Address` String,
    `sourceTransportPort` UInt64,
    `postNATSourceIPv4Address` String,
    `postNATSourceTransportPort` UInt64,
    `destinationIPv4Address` String,
    `dstport` UInt64,
    `postNATDestinationIPv4Address` String,
    `postNATDestinationTransportPort` UInt64,
    `proto` UInt8,
    `login` String,
    `sessionid` String,
    `sourceIPv6Address` String,
    `destinationIPv6Address` String,
    `postNATSourceIPv6Address` String,
    `postNATDestinationIPv6Address` String,
    INDEX idx_natlogs_router_source_time_postnat (router_ip, sourceIPv4Address, timestamp, postNATSourceIPv4Address) TYPE minmax GRANULARITY 1
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY router_ip
SETTINGS index_granularity = 8192

I am looking for a faster way to insert the data into ClickHouse. Thanks in advance.

I tried the Go consumer and inserting works fine: it loads more than 2 million records in 5 minutes. The problem is that more than 20 million records arrive in Kafka every 5 minutes, roughly ten times what the consumer keeps up with, so a large lag builds up between Kafka and ClickHouse.
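One common way to raise insert throughput without letting rows wait on a fixed batch size is a flush-on-timeout loop: collect samples from the channel and flush either when the batch is full or when an interval expires, whichever comes first. The sketch below is only illustrative: it reuses the ipfix channel from the code above, batchSize and flushInterval are assumed values to tune, and flushBatch is a hypothetical helper standing in for the transaction/prepared-statement INSERT shown earlier.

// Illustrative sketch: batch rows, flush on size or timeout.
// flushBatch is a hypothetical helper wrapping the INSERT logic above.
func batchLoop(ch chan ipfix) {
    const batchSize = 10000               // assumed; tune to traffic
    const flushInterval = 5 * time.Second // assumed; bounds insert delay

    rows := make([]ipfix, 0, batchSize)
    tick := time.NewTicker(flushInterval)
    defer tick.Stop()

    for {
        select {
        case sample := <-ch:
            rows = append(rows, sample)
            if len(rows) >= batchSize {
                if err := flushBatch(rows); err != nil {
                    log.Println(err)
                }
                rows = rows[:0]
            }
        case <-tick.C:
            // flush a partial batch so no row waits longer than flushInterval
            if len(rows) > 0 {
                if err := flushBatch(rows); err != nil {
                    log.Println(err)
                }
                rows = rows[:0]
            }
        }
    }
}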

go apache-kafka kafka-consumer-api clickhouse
1 Answer

After a lot of research, I added partitions to my Kafka topic, which let the consumers work in parallel; now the data reaches ClickHouse in near real time. I just ran this command against Kafka and it worked like a charm:

kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic vflow.ipfix --partitions 16
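For reference, the new partition count can also be verified programmatically; consumer-group readers beyond the partition count sit idle, so -workers should match it. This is a minimal standalone sketch, assuming the broker address and topic from the question, using kafka-go's Dial and ReadPartitions:

package main

import (
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // any single broker can serve topic metadata
    conn, err := kafka.Dial("tcp", "172.18.0.4:9092")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // list the partitions of the topic and count them
    partitions, err := conn.ReadPartitions("vflow.ipfix")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("vflow.ipfix now has %d partitions\n", len(partitions))
}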