go 使用http长连接读写influxdb
发布日期:2021-05-10 23:13:02 浏览次数:22 分类:精选文章

本文共 3606 字,大约阅读时间需要 12 分钟。

由于服务持续向InfluxDB写入数据,使用短连接会导致频繁的连接创建与关闭。为了优化性能,建议改用长连接方式。

以下是优化后的代码说明:

import (
"context"
"crypto/tls"
"fmt"
"github.com/influxdata/influxdb1-client"
"io/ioutil"
"net"
"net/http"
"net/url"
"path"
"strings"
"time"
)
type Client struct {
url url.URL
unixSocket string
username string
password string
httpClient *http.Client
userAgent string
precision string
}
func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*client.Response, error) {
u := c.url
u.Path = path.Join(u.Path, "write")
r := strings.NewReader(data)
req, err := http.NewRequest("POST", u.String(), r)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.userAgent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", database)
params.Set("rp", retentionPolicy)
params.Set("precision", precision)
params.Set("consistency", writeConsistency)
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var response client.Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
err = fmt.Errorf(string(body))
response.Err = err
return &response, err
}
return nil, nil
}
func longConnectionInfluxClient(c client.Config) (*Client, error) {
tlsConfig := new(tls.Config)
if c.TLS != nil {
tlsConfig = c.TLS.Clone()
}
tlsConfig.InsecureSkipVerify = c.UnsafeSsl
tr := &http.Transport{
Proxy: c.Proxy,
TLSClientConfig: tlsConfig,
DialContext: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 120 * time.Second,
DualStack: true,
}).DialContext,
ForceAttemptHTTP2: true,
MaxConnsPerHost: 100,
MaxIdleConns: 1000,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 300 * time.Second,
}
if c.UnixSocket != "" {
tr.DisableCompression = true
tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", c.UnixSocket)
}
}
cl := Client{
url: c.URL,
unixSocket: c.UnixSocket,
username: c.Username,
password: c.Password,
httpClient: &http.Client{
Timeout: c.Timeout,
Transport: tr,
},
userAgent: c.UserAgent,
precision: c.Precision,
}
if cl.userAgent == "" {
cl.userAgent = "InfluxDBClient"
}
return &cl, nil
}
func initClient(hostIp, userName, password string, timeout int) *Client {
host, err := url.Parse(fmt.Sprintf("http://%s", hostIp))
conf := client.Config{
URL: *host,
Username: userName,
Password: password,
Timeout: time.Duration(timeout) * time.Second,
}
cl, err := longConnectionInfluxClient(conf)
if err != nil {
panic(err)
}
return cl
}
func main() {
cl := initClient("10.10.25.50:8086", "", "", 5)
str := "cpu,host=localhost value=10 1564387285"
cl.WriteLineProtocol(str, "test", "autogen", "s", "")
}

这段代码对现有InfluxDB客户端进行了优化,改用长连接方式以减少连接频繁创建与关闭的问题,同时保持了功能的完整性和性能。

上一篇:golang bufio浅析
下一篇:golang解析linux /proc/net/tcp /proc/net/tcp6文件中的IP地址

发表评论

最新留言

做的很好,不错不错
[***.243.131.199]2025年04月21日 09时53分33秒