打點(diǎn)統(tǒng)計(jì)——3(go日志讀取分析寫入)

uid是服務(wù)端給客戶端種下的cookie。比如訪問百度,同一臺電腦同一個瀏覽器,不管是百度哪個頁面,都是這個uid:
打點(diǎn)統(tǒng)計(jì)——3(go日志讀取分析寫入)

創(chuàng)新互聯(lián)建站-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價(jià)比河源網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式河源網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋河源地區(qū)。費(fèi)用合理售后完善,十余年實(shí)體公司更值得信賴。


區(qū)分PV、IV、UV如下:
1、pv訪問量(Page View),即頁面訪問量,每打開一次頁面PV計(jì)數(shù)+1,刷新頁面也是。

2、UV訪問數(shù)(Unique Visitor)指獨(dú)立訪客訪問數(shù),一臺電腦終端為一個訪客。

3、IV是初始向量(IV,Initialization Vector)。


redis數(shù)據(jù)結(jié)構(gòu)HyperLogLog
如果我們要實(shí)現(xiàn)記錄網(wǎng)站每天訪問的獨(dú)立IP數(shù)量這樣的一個功能

集合實(shí)現(xiàn):

使用集合來儲存每個訪客的 IP ,通過集合性質(zhì)(集合中的每個元素都各不相同)來得到多個獨(dú)立 IP ,
然后通過調(diào)用 SCARD 命令來得出獨(dú)立 IP 的數(shù)量。
舉個例子,程序可以使用以下代碼來記錄 2014 年 8 月 15 日,每個網(wǎng)站訪客的 IP :
ip = get_vistor_ip()
SADD '2014.8.15::unique::ip' ip
然后使用以下代碼來獲得當(dāng)天的唯一 IP 數(shù)量:
SCARD '2014.8.15::unique::ip'

集合實(shí)現(xiàn)的問題

使用字符串來儲存每個 IPv4 地址最多需要耗費(fèi) 15 字節(jié)(格式為 'XXX.XXX.XXX.XXX' ,比如
'202.189.128.186')。
下表給出了使用集合記錄不同數(shù)量的獨(dú)立 IP 時,需要耗費(fèi)的內(nèi)存數(shù)量:
獨(dú)立 IP 數(shù)量一天一個月一年
一百萬15 MB 450 MB 5.4 GB
一千萬150 MB 4.5 GB 54 GB
一億1.5 GB 45 GB 540 GB
隨著集合記錄的 IP 越來越多,消耗的內(nèi)存也會越來越多。
另外如果要儲存 IPv6 地址的話,需要的內(nèi)存還會更多一些

為了更好地解決像獨(dú)立 IP 地址計(jì)算這種問題,
Redis 在 2.8.9 版本添加了 HyperLogLog 結(jié)構(gòu)。

HyperLogLog介紹

HyperLogLog 可以接受多個元素作為輸入,并給出輸入元素的基數(shù)估算值:
? 基數(shù):集合中不同元素的數(shù)量。比如 {'apple', 'banana', 'cherry', 'banana', 'apple'} 的基數(shù)就是 3 。
? 估算值:算法給出的基數(shù)并不是精確的,可能會比實(shí)際稍微多一些或者稍微少一些,但會控制在合
理的范圍之內(nèi)。
HyperLogLog 的優(yōu)點(diǎn)是,即使輸入元素的數(shù)量或者體積非常非常大,計(jì)算基數(shù)所需的空間總是固定
的、并且是很小的。
在 Redis 里面,每個 HyperLogLog 鍵只需要花費(fèi) 12 KB 內(nèi)存,就可以計(jì)算接近 2^64 個不同元素的基
數(shù)。這和計(jì)算基數(shù)時,元素越多耗費(fèi)內(nèi)存就越多的集合形成鮮明對比。
但是,因?yàn)?HyperLogLog 只會根據(jù)輸入元素來計(jì)算基數(shù),而不會儲存輸入元素本身,所以
HyperLogLog 不能像集合那樣,返回輸入的各個元素。

將元素添加至 HyperLogLog
PFADD key element [element ...]
將任意數(shù)量的元素添加到指定的 HyperLogLog 里面。
這個命令可能會對 HyperLogLog 進(jìn)行修改,以便反映新的基數(shù)估算值,如果 HyperLogLog 的基數(shù)估算
值在命令執(zhí)行之后出現(xiàn)了變化, 那么命令返回 1 , 否則返回 0 。
命令的復(fù)雜度為 O(N) ,N 為被添加元素的數(shù)量。

返回給定 HyperLogLog 的基數(shù)估算值
PFCOUNT key [key ...]
當(dāng)只給定一個 HyperLogLog 時,命令返回給定 HyperLogLog 的基數(shù)估算值。
當(dāng)給定多個 HyperLogLog 時,命令會先對給定的 HyperLogLog 進(jìn)行并集計(jì)算,得出一個合并后的
HyperLogLog ,然后返回這個合并 HyperLogLog 的基數(shù)估算值作為命令的結(jié)果(合并得出的
HyperLogLog 不會被儲存,使用之后就會被刪掉)。
當(dāng)命令作用于單個 HyperLogLog 時, 復(fù)雜度為 O(1) , 并且具有非常低的平均常數(shù)時間。
當(dāng)命令作用于多個 HyperLogLog 時, 復(fù)雜度為 O(N) ,并且常數(shù)時間也比處理單個 HyperLogLog 時要
大得多。

PFADD 和 PFCOUNT 的使用示例
redis> PFADD unique::ip::counter '192.168.0.1'
(integer) 1
redis> PFADD unique::ip::counter '127.0.0.1'
(integer) 1
redis> PFADD unique::ip::counter '255.255.255.255'
(integer) 1
redis> PFCOUNT unique::ip::counter
(integer) 3

合并多個 HyperLogLog
PFMERGE destkey sourcekey [sourcekey ...]
將多個 HyperLogLog 合并為一個 HyperLogLog ,合并后的 HyperLogLog 的基數(shù)估算值是通過對所有
給定 HyperLogLog 進(jìn)行并集計(jì)算得出的。
命令的復(fù)雜度為 O(N) , 其中 N 為被合并的 HyperLogLog 數(shù)量, 不過這個命令的常數(shù)復(fù)雜度比較高。

PFMERGE 的使用示例
redis> PFADD str1 "apple" "banana" "cherry"
(integer) 1
redis> PFCOUNT str1
(integer) 3
redis> PFADD str2 "apple" "cherry" "durian" "mongo"
(integer) 1
redis> PFCOUNT str2
(integer) 4
redis> PFMERGE str1&2 str1 str2
OK
redis> PFCOUNT str1&2
(integer) 5

HyperLogLog 實(shí)現(xiàn)獨(dú)立 IP 計(jì)算功能

獨(dú)立 IP 數(shù)量一天一個月一年一年(使用集合)
一百萬12 KB 360 KB 4.32 MB 5.4 GB
一千萬12 KB 360 KB 4.32 MB 54 GB
一億12 KB 360 KB 4.32 MB 540 GB
下表列出了使用 HyperLogLog 記錄不同數(shù)量的獨(dú)立 IP 時,需要耗費(fèi)的內(nèi)存數(shù)量:
可以看到,要統(tǒng)計(jì)相同數(shù)量的獨(dú)立 IP ,HyperLogLog 所需的內(nèi)存要比集合少得多。


打點(diǎn)統(tǒng)計(jì)——3(go日志讀取分析寫入)


package main

import (
    "flag"
    "github.com/sirupsen/logrus"
    "time"
    "os"
    "bufio"
    "io"
    "strings"
    "github.com/mgutz/str"
    "net/url"
    "crypto/md5"
    "encoding/hex"
    "github.com/mediocregopher/radix.v2/pool"
    "strconv"
)

const HANDLE_DIG = " /dig?"
const HANDLE_MOVIE = "/movie/"
const HANDLE_LIST = "/list/"
const HANDLE_HTML = ".html"

type cmdParams struct {
    logFilePath string
    routineNum int
}
type digData struct{
    time   string
    url    string
    refer  string
    ua        string
}
type urlData struct {
    data   digData
    uid    string
    unode  urlNode
}
type urlNode struct {
    unType     string // 詳情頁 或者 列表頁 或者 首頁
    unRid  int       // Resource ID 資源ID
    unUrl  string // 當(dāng)前這個頁面的url
    unTime  string // 當(dāng)前訪問這個頁面的時間
}
type storageBlock struct {
    counterType       string
    storageModel   string
    unode        urlNode
}

var log = logrus.New()

func init() {
    log.Out = os.Stdout //聲明用什么輸出日志
    log.SetLevel( logrus.DebugLevel ) //設(shè)置日志的等級
}

func main() {
    // 獲取參數(shù)
    logFilePath := flag.String( "logFilePath", "F:/phpStudy/PHPTutorial/nginx/logs/access.log", "log file path" ) //日志文件路徑
    routineNum := flag.Int( "routineNum", 5, "consumer numble by goroutine" ) //routine數(shù)量,默認(rèn)為5
    l := flag.String( "l", "./log.log", "this programe runtime log target file path" ) //go生成的日志存放路徑
    flag.Parse()

    params := cmdParams{ *logFilePath, *routineNum }

    // 打日志
    logFd, err := os.OpenFile( *l, os.O_CREATE|os.O_WRONLY, 0644 ) //打開go生成的日志
    if err == nil {
        log.Out = logFd //打開出錯,則用日志文件存錯誤信息
        defer logFd.Close() //關(guān)閉文件
    }
    log.Infof( "Exec start." ) //提示日志文件啟動
    log.Infof( "Params: logFilePath=%s, routineNum=%d", params.logFilePath, params.routineNum ) //提示輸入的/默認(rèn)參數(shù)

    // 初始化一些channel,用于數(shù)據(jù)傳遞
    var logChannel = make(chan string, 3*params.routineNum) //讀取日志文件量更大,設(shè)置為3倍
    var pvChannel = make(chan urlData, params.routineNum)
    var uvChannel = make(chan urlData, params.routineNum)
    var storageChannel = make(chan storageBlock, params.routineNum)

    // Redis Pool
    redisPool, err := pool.New( "tcp", "localhost:6379", 2*params.routineNum ); //連接池,2*params.routineNum是連接池?cái)?shù)
    if err != nil{
        log.Fatalln( "Redis pool created failed." )
        panic(err)
    } else {
        //空閑時間過了后,客戶端(也就是連接池和遠(yuǎn)端服務(wù)器會斷開)。所以以一定的間隔去ping
        go func(){
            for{
                redisPool.Cmd( "PING" )
                time.Sleep( 3*time.Second )
            }
        }()
    }

    // 日志消費(fèi)者
    go readFileLinebyLine( params, logChannel )

    // 創(chuàng)建一組日志處理
    for i:=0; i<params.routineNum; i++ {
        go logConsumer( logChannel, pvChannel, uvChannel )
    }

    // 創(chuàng)建PV UV 統(tǒng)計(jì)器
    go pvCounter( pvChannel, storageChannel )
    go uvCounter( uvChannel, storageChannel, redisPool )
    // 可擴(kuò)展的 xxxCounter(如果還有別的要統(tǒng)計(jì)的,則:go xxCounter(...))

    // 創(chuàng)建 存儲器
    go dataStorage( storageChannel, redisPool )

    time.Sleep( 1000*time.Second )
}

// HBase 劣勢:列簇需要聲明清楚。所以這里用redis來存儲
func dataStorage( storageChannel chan storageBlock, redisPool *pool.Pool) {
    for block := range storageChannel {
        prefix := block.counterType + "_"

        // 逐層添加,加洋蔥皮的過程
        // 維度: 天-小時-分鐘
        // 層級: 定級-大分類-小分類-終極頁面
        // 存儲模型: Redis  SortedSet
        setKeys := []string{
            prefix+"day_"+getTime(block.unode.unTime, "day"),
            prefix+"hour_"+getTime(block.unode.unTime, "hour"),
            prefix+"min_"+getTime(block.unode.unTime, "min"),
            prefix+block.unode.unType+"_day_"+getTime(block.unode.unTime, "day"),
            prefix+block.unode.unType+"_hour_"+getTime(block.unode.unTime, "hour"),
            prefix+block.unode.unType+"_min_"+getTime(block.unode.unTime, "min"),
        }

        rowId := block.unode.unRid

        for _,key := range setKeys {
            ret, err := redisPool.Cmd( block.storageModel, key, 1, rowId ).Int()
            if ret<=0 || err!=nil {
                log.Errorln( "DataStorage redis storage error.", block.storageModel, key, rowId )
            }
        }
    }
}

func pvCounter( pvChannel chan urlData, storageChannel chan storageBlock ) {
    for data := range pvChannel {
        sItem := storageBlock{ "pv", "ZINCRBY", data.unode }
        storageChannel <- sItem
    }
}

func uvCounter( uvChannel chan urlData, storageChannel chan storageBlock, redisPool *pool.Pool ) {
    for data := range uvChannel {
        //HyperLoglog redis
        hyperLogLogKey := "uv_hpll_" + getTime(data.data.time, "day") //uv_hpll_ + 天級別的時間 組成集合中的鍵
        ret, err := redisPool.Cmd( "PFADD", hyperLogLogKey, data.uid, "EX", 86400 ).Int()
        if err!=nil {
            log.Warningln( "UvCounter check redis hyperloglog failed, ", err )
        }
        if ret!=1 {
            continue
        }

        sItem := storageBlock{ "uv", "ZINCRBY", data.unode }
        storageChannel <- sItem
    }
}

//消費(fèi)一行行讀取到的日志
func logConsumer( logChannel chan string, pvChannel, uvChannel chan urlData ) error {
    for logStr := range logChannel {
        // 切割日志字符串,扣出打點(diǎn)上報(bào)的數(shù)據(jù)
        data := cutLogFetchData( logStr )

        // uid
        // 說明: 課程中模擬生成uid(不是現(xiàn)實(shí)環(huán)境中服務(wù)器給瀏覽器種下的cookie中的uid), md5(refer+ua)
        hasher := md5.New()
        hasher.Write( []byte( data.refer+data.ua ) )
        uid := hex.EncodeToString( hasher.Sum(nil) )

        // 很多解析的工作都可以放到這里完成
        // ...
        // ...

        uData := urlData{ data, uid, formatUrl( data.url, data.time ) }

        pvChannel <- uData
        uvChannel <- uData
        /* 如果有其他要塞入的:xxChannel <- uData */

    }
    return nil
}

func cutLogFetchData( logStr string ) digData {
    logStr = strings.TrimSpace( logStr )
    pos1 := str.IndexOf( logStr,  HANDLE_DIG, 0)
    if pos1==-1 {
        return digData{}
    }
    pos1 += len( HANDLE_DIG )
    pos2 := str.IndexOf( logStr, " HTTP/", pos1 )
    d := str.Substr( logStr, pos1, pos2-pos1 )

    urlInfo, err := url.Parse( "http://localhost/?"+d ) //url.Parse只認(rèn)完整的網(wǎng)址,所以 加上:http://localhost/?
    if err != nil {
        return digData{}
    }
    data := urlInfo.Query()
    return digData{
        data.Get("time"),
        data.Get("refer"),
        data.Get("url"),
        data.Get("ua"),
    }
}

func readFileLinebyLine( params cmdParams, logChannel chan string ) error {
    fd, err := os.Open( params.logFilePath ) //打開nginx日志文件
    if err != nil {
        log.Warningf( "ReadFileLinebyLine can't open file:%s", params.logFilePath )
        return err
    }

    defer fd.Close() //關(guān)閉是好習(xí)慣

    count := 0
    bufferRead := bufio.NewReader( fd )
    for {
        line, err := bufferRead.ReadString( '\n' ) //一行行讀
        logChannel <- line //讀出一行寫入一次logChannel
        count++

        if count%(1000*params.routineNum) == 0 { //每1000*params.routineNum行日志輸出一次信息到控制臺
            log.Infof( "ReadFileLinebyLine line: %d", count )
        }
        if err != nil { //error部位空有兩種情況,一種是錯誤,一種是讀到尾部了
            if err == io.EOF { //讀到尾部了(讀完了),休息3秒鐘
                time.Sleep( 3*time.Second )
                log.Infof( "ReadFileLinebyLine wait, raedline:%d", count ) //提醒在等待,已經(jīng)讀到了第n行
            } else {
                log.Warningf( "ReadFileLinebyLine read log error" ) //錯誤則打出錯誤
            }
        }
    }
    return nil
}

func formatUrl( url, t string ) urlNode{
    // 一定從量大的著手,  詳情頁>列表頁≥首頁
    pos1 := str.IndexOf( url, HANDLE_MOVIE, 0)
    if pos1!=-1 {
        pos1 += len( HANDLE_MOVIE )
        pos2 := str.IndexOf( url, HANDLE_HTML, 0 )
        idStr := str.Substr( url , pos1, pos2-pos1 )
        id, _ := strconv.Atoi( idStr )
        return urlNode{ "movie", id, url, t }
    } else {
        pos1 = str.IndexOf( url, HANDLE_LIST, 0 )
        if pos1!=-1 {
            pos1 += len( HANDLE_LIST )
            pos2 := str.IndexOf( url, HANDLE_HTML, 0 )
            idStr := str.Substr( url , pos1, pos2-pos1 )
            id, _ := strconv.Atoi( idStr )
            return urlNode{ "list", id, url, t }
        } else {
            return urlNode{ "home", 1, url, t}
        } // 如果頁面url有很多種,就不斷在這里擴(kuò)展
    }
}

//去重需要在一定的時間內(nèi)
func getTime( logTime, timeType string ) string {
    var item string
    switch timeType {
    case "day":
        item = "2006-01-02"
        break
    case "hour":
        item = "2006-01-02 15"
        break
    case "min":
        item = "2006-01-02 15:04"
        break
    }
    t, _ := time.Parse( item, time.Now().Format(item) )
    return strconv.FormatInt( t.Unix(), 10 ) //將unix時間戳轉(zhuǎn)換為10位字符串
}

文章題目:打點(diǎn)統(tǒng)計(jì)——3(go日志讀取分析寫入)
文章起源:http://muchs.cn/article16/ihssgg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站制作、軟件開發(fā)網(wǎng)站營銷、營銷型網(wǎng)站建設(shè)云服務(wù)器、定制開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)公司