リアルタイムでスコアランキングを登録・表示するツール

はじめに

この記事はGo3 Advent Calendar 2020の2日目の記事として記載してます。 リアルタイムでランキングの更新・表示ができるpackageをつくったのでご紹介させていただきます。

スコア降順のランキングシステム

例えばユーザーであれギルドのようなグループ単位であれ、スコアが高い順に並べるランキングがあるとします。複数スコアラーが同点である場合も加味すると大体下記のような仕様であることが多いかと思われます。

  1. ランキングはスコアの降順で高くなります
  2. 同じスコアの場合、登録時間が早いスコアラーがより高いランクになります

問題点

そのとき単にスコアの降順で表示するだけ(上記の1の仕様だけ)ならRedisのSortedSetsのZREVRANGEを使用すれば一発で解決します。

では2の同点であった場合は早く登録したスコアラーを上位にする仕様はどうでしょうか?

f:id:muroon:20201201210621p:plain

RedisのSortedSetsのZREVRANGEではスコアが同点の場合、キー(uid)の降順で並べてしまいます。よって例えばuid:102, 103が同点の場合、そのスコアの更新時間に関わらず103が上位になってしまいます。このようなときは本当は更新時間の昇順に並んでほしいところです。

このようなとき、リレーショナルデーターベースを使用すれば取得可能ですが件数が多いときのパフォーマンスに難点があります。またわざわざバッチで並べ替えるようなことも避けたいところです。

esrank

github.com

そこでこちらのアプリをつくってみました。

しくみ

esrankではやはりRedisのSortedSetsを内部で使用しています。ではどのように理想の通りに実現しているかというと各スコアラーのキーを工夫しています。

f:id:muroon:20201201213614p:plain

64bitのキーを使用してますが上位32bitに登録時間(厳密にはMAX値から登録時間を引いたもの)、下位32bitにスコアラーのキーを配置してます。 これをスコア登録時に使用すれば、ZREVRANGEにてリスト取得すると都合よく仕様が満たせます。

パフォーマンス

RankingList(ランキング1〜100位のリストを取得)とGetRanking(個人のランキング・スコアを取得)それぞれのレスポンス時間になります。

response time

- esrank(redis) (ms) DB(mysql) (ms) esrank/DB (%)
RankingList 0.02816603333 0.4238205667 6
GetRanking 0.01488041333 0.1489615633 10

使用法

import(
    goredislib "github.com/go-redis/redis/v8"
    "github.com/muroon/esrank"
)
    
    # using go-redis/v8
    client = goredislib.NewClient(&goredislib.Options{
        Addr:     "localhost:6379",
    })

    # new monthly ranking
    now := time.Now()
    st := time.Date(now.Year(), now.Month(), 1, 0, 0, 0,0, time.UTC)
    rank := NewRanking(
        client, // redis client
        Name("my_monthly_ranking"), // ranking name
        StartTime(st), // start time 
    )

スコアの追加

 userID := 1
    score := 100
    err := rank.AddRankingScore(ctx, userID, score)

ランキング1〜100位のリストを取得

 rankingList, err := rank.RankingList(ctx, 0, 99)

個人のランキング・スコアを取得

 rank, score, err := rank.GetRanking(ctx, userID)

規制

使用するにあたり意識してもらうことが2つあります。

  • StartTime(ランキング計測・表示開始時間)
  • TimeMode

StartTime

上記で示している通りある時間からの経過を元にランキングのキーとしています。 どんなに多くても最大32bitしか取れないため、該当するランキングに適切な開始時間を設定する日通用があります。

TimeMode

ランキング計測開始(StartTime)からの最大経過時間は、TimeModeによって異なります。

TimeModeがTimeModeMilliSecの場合、同じスコアが複数ある場合は1ミリ秒以上のスコア登録時間の差を比較してソートします。 1ミリ秒未満の時差は処理できません。 同様に、TimeModeSecの場合は1秒以上の時差を比較し、TimeModeMicroSecの場合は1マイクロ秒です。

TimeMode 同一スコア精度 最大計測期間
TimeModeSec 秒単位 136 年
TimeModeMilliSec (Default) ミリ秒単位 1.36 年
TimeModeMicroSec マイクロ秒単位 4.97 日
 rank := NewRanking(
        client,
        Name("my_monthly_ranking"),
        StartTime(st),
        SetTimeMode(TimeModeMicroSec), // timemode(microsec)
    )

最後に

上記のようなシンプルな仕様のみであればバッチによる集計を必要とせずにランタイムで対応可能だと思います。機会があったら使用していただけたら幸いです。

goroutineでハマったこと

最近、goroutine関連ではミスったことを記載します。

channelの容量(capacity)

1. 容量0(cap:0)のchannelを送信するとブロックする

ソース

2. 容量を1に指定すると問題なく動作する

ソース

3. 容量0(cap:0)のchannelでもgoroutineから呼び出すと問題ない

ソース

1のような使い方は一見すると普通はしないだろうと思うかもしれませんが、こちらのような例では使用した

goroutine の引数

またまたこちらのソースからだが、 goroutine内の低数taskがないと不具合が発生する

    for _, task := range tasks {
        go func(t *Task) {
            defer wg.Done()
        t.Run()
            for {
                select {
                case <-t.doneCh:
                    fmt.Printf("id:%d is done\n", t.id)
            return
        case err := <-t.errCh:
            log.Fatal(err)
            return
                }
            }
        }(task)    
    }

Goのcontainerパッケージを見る

はじめに

Goにcontainerパッケージというものがあったというのを最近知りました。 結構面白いなと思い書いてみました。 予め申し上げておくと個人的な備忘録用に書いているので中身は薄めです。

containerパッケージには下記のデータ構造を扱える機能が存在します。

  • heap (ヒープソート)
  • list (Linked List)
  • ring (Circular Linked List ※末尾の次は先頭のLinked List)

ヒープソートを中心に見ていきたいと思います。

ヒープソートとは

ヒープソート

  • 一般的に一番ポピュラーなソートアルゴリズムはクィックソートであるがデータの中身による計算量の変化量が大聞くなる可能性がある。ヒープソートは平均的にはクィックソート遅いがデータにより計算量の変化が少ない
  • プライオリティのあるキューイングに使用される

どんな思想でつくられたか?

Russ Coxさんが書かれた別の趣旨の記事からの引用ですが

For example, container/heap provides heap-maintenance algorithms as ordinary functions that operate on a heap.Interface, making them applicable to any backing storage, not just a slice of values. This can be very powerful.

At the same time, most programmers who want a priority queue don’t want to implement the underlying storage for it and then invoke the heap algorithms. They would prefer to let the implementation manage its own array, but Go does not permit expressing that in a type-safe way. The closest one can come is to make a priority queue of interface{} values and use type assertions after fetching each element. (The standard container/list and container/ring implementations take this approach.)

  • 数値などの値スライスだけでなく、(heap.Interfaceを継承する)任意の型にて使用可能
  • ストレージ不要でプライオリティキューを使用できる
  • フェッチ(heapで言えばPop()メソッド)してから型変換を行う

使用例

// An IntHeap is a min-heap of ints.
type IntHeap []int

func (h IntHeap) Len() int           { return len(h) }
func (h IntHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h IntHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *IntHeap) Push(x interface{}) {
    // Push and Pop use pointer receivers because they modify the slice's length,
    // not just its contents.
    *h = append(*h, x.(int))
}

func (h *IntHeap) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[0 : n-1]
    return x
}

// This example inserts several ints into an IntHeap, checks the minimum,
// and removes them in order of priority.
func Example_intHeap() {
    h := &IntHeap{2, 1, 5}
    heap.Init(h)
    heap.Push(h, 3)
    fmt.Printf("minimum: %d\n", (*h)[0])
    for h.Len() > 0 {
        fmt.Printf("%d ", heap.Pop(h))
    }
    // Output:
    // minimum: 1
    // 1 2 3 5
}

参照

プライオリティアイテムの場合はこちら

内部

https://github.com/golang/go/blob/master/src/container/heap/heap.go

要素(heap.Interface)

例題からもわかるように扱う要素はheap.Interfaceを継承している必要があります。 heap.Interfaceはさらにsort.Interfaceが埋め込まれています。

// heap.Interface
type Interface interface {
    sort.Interface
    Push(x interface{}) // add x as element Len()
    Pop() interface{}   // remove and return element Len() - 1.
}
// sort.Interface
type Interface interface {
    // Len is the number of elements in the collection.
    Len() int
    // Less reports whether the element with
    // index i should sort before the element with index j.
    Less(i, j int) bool
    // Swap swaps the elements with indexes i and j.
    Swap(i, j int)
}

ヒープソートアルゴリズム

ヒープソートアルゴリズムはプライベートファンクションとして定義されています。

func up(h Interface, j int) {
    for {
        i := (j - 1) / 2 // parent
        if i == j || !h.Less(j, i) {
            break
        }
        h.Swap(i, j)
        j = i
    }
}

func down(h Interface, i0, n int) bool {
    i := i0
    for {
        j1 := 2*i + 1
        if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
            break
        }
        j := j1 // left child
        if j2 := j1 + 1; j2 < n && h.Less(j2, j1) {
            j = j2 // = 2*i + 2  // right child
        }
        if !h.Less(j, i) {
            break
        }
        h.Swap(i, j)
        i = j
    }
    return i > i0
}

要素の操作

// Init establishes the heap invariants required by the other routines in this package.
// Init is idempotent with respect to the heap invariants
// and may be called whenever the heap invariants may have been invalidated.
// The complexity is O(n) where n = h.Len().
func Init(h Interface) {
    // heapify
    n := h.Len()
    for i := n/2 - 1; i >= 0; i-- {
        down(h, i, n)
    }
}

// Push pushes the element x onto the heap.
// The complexity is O(log n) where n = h.Len().
func Push(h Interface, x interface{}) {
    h.Push(x)
    up(h, h.Len()-1)
}

// Pop removes and returns the minimum element (according to Less) from the heap.
// The complexity is O(log n) where n = h.Len().
// Pop is equivalent to Remove(h, 0).
func Pop(h Interface) interface{} {
    n := h.Len() - 1
    h.Swap(0, n)
    down(h, 0, n)
    return h.Pop()
}

// Remove removes and returns the element at index i from the heap.
// The complexity is O(log n) where n = h.Len().
func Remove(h Interface, i int) interface{} {
    n := h.Len() - 1
    if n != i {
        h.Swap(i, n)
        if !down(h, i, n) {
            up(h, i)
        }
    }
    return h.Pop()
}

// Fix re-establishes the heap ordering after the element at index i has changed its value.
// Changing the value of the element at index i and then calling Fix is equivalent to,
// but less expensive than, calling Remove(h, i) followed by a Push of the new value.
// The complexity is O(log n) where n = h.Len().
func Fix(h Interface, i int) {
    if !down(h, i, h.Len()) {
        up(h, i)
    }
}

最後に

heapだけでなくlist, ringも一度見ておくといいかと思います。 どちらもフェッチ後に型変換を行うことでどんな型でも使用できます。

GoではこのようにInterfaceで引数の間口を広げてしまうソースはあまり好まれません。なので上記のRuss Coxさんの記事ではGenericの有効例としてあげられているのだと思います。

「Webエンジニアのための監視システム実装ガイド」を読んで

はじめに

f:id:muroon:20200414160209j:plain

Webエンジニアのための監視システム実装ガイドについていい本だったのでネタバレしない程度にご紹介させていただきたいと思います。

下記の点で非常に良い本だと思います。

  • Datadog、Prometheus、GrafanaなどのObservabilityに関連する技術の紹介は各所で見かけるようになりましたがそれぞれなんのために存在するか、また監視技術におけるどこの位置づけなのかを非常にわかりやすく教えてくれます。

  • システムの監視環境を構築について必要なことが目標設定から使用技術、組織・人為的な面に至るまで触れられてます。

  • トラブルに見舞われたときのエンジニアの対応方法や心構えなどについても記載されています。

各章について

1章 監視テクノロジーの動向

まず監視には異常検知・状況把握があります。監視とは狭義・広義それぞれ下記を示しますが、この本ではどちらも大事だと述べてます。

監視とは
内容
狭義 定期的・継続的に観測し異常を検知・復旧させること
広義 定期的・継続的に観測しシステムの価値を維持・向上させる営みすべて

また、監視テクノロジーが今日まで発展してきた経緯を〜2000年代、2000年代後半〜、2010年代〜、2010年代中盤〜に分けて説明しています。

2章 監視テクノロジーの概要

可用性(Availability)の計算方法や監視システムの各構成要素(チェック・メトリクス・ログ・トレース・APM)の位置づけを定義しそれぞれどのような関連があるかを説明しています。さらには自己修復機能とその必要性についても述べています。ちなみにChaos Engineeringとは自己修復システムを継続運用させるためのものです。

  • 可用性の測定方法
  • 監視システムの種類・構成要素
  • 自己修復機構について

3章 監視テクノロジーの基礎

メトリクスの種類・データ収集方式の基礎を述べた上で観測誤差や欠損が生じてしまうことなどメトリクスツールの基礎を教えてくれてます。 それと時系列DBとメトリクス結果の保存方法などメトリクスシステムの裏側の概要が説明されてます。 また、メトリクスだけでなくログの基礎技術についても触れています。

  • メトリクスの種類(ゲージ、カウンター)について
  • 観測誤差・欠損について
  • メトリクスのデータ収集方式(Push型、Pull型)
  • 時系列DBとメトリクス結果の保存・閲覧方法について
  • ログの基礎技術

4章 監視テクノロジーの導入

SLIやSLOなどの数値的なものも含めて目標設定の必要性、それに対してどのような通知計画を行うべきなどを説明してます。 そのためにはステークホルダーの洗い出しなど監視システム設計を行うための情報収集・整理が大事です。 目標設定、ステークホルダーの選定、ドキュメント整理、各監視ツールの選定のポイントなどがまとめられています。

  • 異常検知も大事だがそれのみに注意を向けるのではなくSLI導入が大事
  • アラート通知の出し過ぎは注意
    • 通常・正常・異常の判断
  • SLIやSLOなどは元となる情報がないと設定できないので闇雲にはつくらない
    • 中度半端な情報をベースにとりあえずの感じで作らない
  • ステークホルダーの洗い出し
    • 組織・体制観点
    • 製品・サービス観点
  • ドキュメント
  • テスト(検知・発報)
  • 監視ツールの選定
    • Agentのインストール
    • 監視対象からのデータ収集方式
    • アラーティング機能
    • コミュニケーションツールの選定
    • ドキュメントツールの選定
    • チケットツールの選定

5章 監視テクノロジーの実装

監視システムを実装していくために抑えておくべき項目を説明してくれてます。 まずはアラートの目的と観測項目(無尽蔵なアラートはかえって形骸化を生む)。 そして、レスポンスタイムなどエンドポイントのモニタリングからCPU使用率などのOSのモニタリングなど定番の観測項目について取り上げられています。

  • アラート通知の有無の判断
  • アラートの目的と観測項目を決める
  • 定番の観測項目
    • エンドポイント(レスポンスタイムなど)
    • 物理インフラ層(CPUファンの回転数など)
    • ネットワーク(送信・受信データ量・パケット量など)
    • OS(CPU、メモリ…)
    • ミドルウェア(Webサーバー・DBなど)
    • フロントエンド(Javascript、コンテンツ)
    • ビジネス(KPIなど)

6章 インシデント対応実践編

インシデント(トラブルなどの事象)対応に対してエンジニアはどのように準備、対応を行えばいいかを技術面だけでなく精神的側面について述べられてます。 どういう理由でどういう人事配置が大事か、それぞれがどういう心構えや行動をすべきかなどが記載されています。 また、p162のトラブルシューティング時の思考について書かれたコラムはおすすめです。プレッシャーがかかるときこそ論理的思考が大事だということがわかります。

  • インシデント対応の基礎知識
  • インシデント対応の心構え
  • インシデントの各ステータスOpenで行うこと
    • 事実確認
    • 暫定対応
  • インシデントの各ステータスResolvedで行うこと
    • 振り返り
  • 恒久的対応・改善対応

7章 監視構成例

チェック・メトリクス・ログ・トレース・APMの構成を旧来型OSS、最新型OSSSaas型のツール・サービスを使用した場合を構成図にまとめて説明してくれています。 例えば、旧来型OSSならNagiosを中心としてOSSを使用した構成、最新型OSSならPrometheus、Jaegerなどを使用した構成です。 各ツールがどういう位置づけにあるかがひと目でわかります。

  • チェック・メトリクス・ログ・トレース・APMの構成図
  • 通知・コミュニケーション・ドキュメント・チケット例

最後に

個人的にはk8sを使用した自己修復機能やカオスエンジニアリングなんかに興味があり、いつか携わってみたいと思ってます。 ではなぜそれが必要になるのでしょうか? かえってコストがかかる可能性もありますし、リスクを犯して導入する必要はあるのでしょうか?

この本を読んでいて1章の「監視とは"定期的・継続的に観測しシステムの価値を維持・向上させる営みすべて"」とあるようにシステムの価値の維持・向上の為だと思いました。 そのためにもSLIなどの目標設定を設け、そのシステムが有るべき価値を定めてから監視システムを導入・運用していく必要性があると感じました。

Datadog APMをGoで使う

はじめに

Datadog APMの簡単な特徴とGoで使用するための方法をまとめました。

ちなみにQiitaにも同様な内容を記載してます。

公式サイトのドキュメントを見ていただくことを勧めますが、 補足資料として見ていただければと思います。

公式ドキュメント TraceパッケージのGoDoc

また、ここではDatadogのAPM以外の機能については割愛します。

APMとは

Application Performance Managementの頭文字をとった略語で、一般的にはアプリケーションやシステムの性能を管理・監視するものです。

DatadogのAPMではマネージドクライド、オンプレミスに関わらず分散トレーシングに対応しています。 Flame GraphやSpan Listなど整理された形でトレース結果を取得できるようになっています。

  • Flame Graph FlameGraph.png
  • Span List SpanList.png

使用するには

  • Agentのインストールと起動
  • APM用パッケージを取得

Agentのインストール

下記のページから各環境に合わせてインストールとAgentの起動を実施します。 https://app.datadoghq.com/account/settings#agent

基本的には上記のページに従ってインストールを実施すればAgentプロセス起動も実施してくれるが、再起動等を行う場合や設定を変更する場合は下記を参照してくだしあ

APM用パッケージを取得

go get gopkg.in/DataDog/dd-trace-go.v1/ddtrace

トレースの仕方

TraceとSpan

トレースを行うにあたり下記の要素が存在します。

用語 内容
Trace アプリケーションにおけるトレースの開始・実行の単位
Span 1ロジックの開始・終了の実行時間を計測する単位。Traceは1つ以上のSpanを持つ。Span同士は親子関係をもたせられる
Service 処理の名前。複数Spanを一つのService名にすることも、各々独自のService名にすることも可能
Resource 処理の中身を示す。DBにおけるSQLクエリやHTTPにおけるクエリなどを格納する

Span

トレースしたい処理の開始から終了までの計測単位を指します。 Spanはさらに細かいSpanを配下に持つことが可能です。

トレースの中における親子Spanの関係 親子Span.png

例えばWebの1リクエストにおけるトランザクションを親Spanとし その配下にDBのSpan、キャッシュのSpan、外部通信のSpanやさらには特定のロジックなどを子Spanとすること可能です。

APM.png

ソース

Datadog APM用のパッケージは下記の通りです。

GitHub 説明

こちらをトレース開始・終了、必要Spanの開始終了に伴い実行していく

1. トレース開始

tracer.Start()   
defer trace.Stop() // Close 

https://godoc.org/gopkg.in/DataDog/dd-trace-go.v1/ddtrace

2. 汎用的なSpan

Spanの開始と終了

span, ctx := tracer.StartSpanFromContext(ctx, 
    "parent-op", // operation Name
    tracer.ServiceName("parent-service"), // Service Name(必要であれば入れる)
    tracer.ResourceName("parent-resource"), // Resource Name(必要であれば入れる)
)

defer span.Finish()

子Spanの使用

先程作成したSpanの配下に子Spanを設ける場合 親が投入されたcontext.Contextを使用して子Spanの開始すれば親Span情報が継承されます。

子Spanの開始と終了

span, ctx := tracer.StartSpanFromContext(ctx, 
    "child-op", // operation Name
    tracer.ServiceName("child-service"), // Service Name(必要であれば入れる)
    tracer.ResourceName("child-resource"), // Resource Name(必要であれば入れる)
)

defer span.Finish()

専用Spanパッケージを使用する

各パッケージのGoDoc一覧

Web(HTTP Routing)用Span

アプリケーションのHTTPのRoutingで下記のいずれかのパッケージを使用していた場合は専用のパッケージが存在します。

もともと使用しているpackage Datadog APM package
net/http https://godoc.org/gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http
gorilla/mux https://godoc.org/gopkg.in/DataDog/dd-trace-go.v1/contrib/gorilla/mux
julienschmidt/httprouter https://godoc.org/gopkg.in/DataDog/dd-trace-go.v1/contrib/julienschmidt/httprouter

gorilla/mux使用の場合

router = muxtrace.NewRouter( 
    muxtrace.WithServiceName("web-service"),  // ServiceName設定
)

router.HandleFunc("/", handler)
http.ListenAndServe(":8080", router)

DB用Span

database/sql使用の場合

// 登録(DBはMySQLの場合、ServiceNameは省略可)
sqltrace.Register("mysql", &mysql.MySQLDriver{}, sqltrace.WithServiceName("db"))

// traceに登録済のdriverを使用してDBをオープン
db, err := sqltrace.Open("mysql", "user:password@/dbname")
if err != nil {
    log.Fatal(err)
}

// Span作成・開始
span, ctx := tracer.StartSpanFromContext(ctx, 
    "db", // operation Name(必要であれば入れる)
    tracer.SpanType(ext.SpanTypeSQL),
    tracer.ServiceName("db"), // DB用Service Name(必要であれば入れる)
    tracer.ResourceName("db-access"), // Resource Name(必要であれば入れる)
)

// contextを通じて親Spanを継承している
rows, err := db.QueryContext(ctx, "SELECT * FROM memo")
if err != nil {
    log.Fatal(err)
}
rows.Close()
span.Finish(tracer.WithError(err)) // Span終了

その他

Agent関連

Agentのインストールの詳細

Agentのインストールを実施するとdatadog.yamlが指定の場所にダウンロードされます。Agentはそのyamlファイルを設定ファイルとして使用します。

datadog.yamlの設定サンプルはこちら

ダウンロードしたデフォルト設定のままでAPMは使用可能ですが、 API Keyを変えたりAPM関連の設定を変えたりした場合はdatadog.yamlの中身をチェックする必要があります。

  • API Keyが正しく指定されているか
  • APM関連の各種設定

設定変更後はAgentの再起動を行う 再起動コマンドはこちら

Agentコマンド

Agentの起動・終了・再起動コマンド https://docs.datadoghq.com/ja/agent/guide/agent-commands/?tab=agentv6v7

Grpcの場合

datadog_sample_simple_image.png

こちらを参考にgRPC Client側/Server側それぞれにTraceとSpanを設けた場合

下記のようにトレースされます。 FlameGraph_grpc2_exp.png

OpenTracing, OpenCensus, OpenTelemetry

分散トレーシング標準仕様でOpenTracing, OpenCensusに対応済み

OpenTelemetryはまだ正式なRegistoryには入っていない模様(2020/04/10現在) https://opentelemetry.io/registry/

この辺は今後も見ていきたいです。

最後に

以上のようにセットアップも非常に簡単です。 また、有料サービスだけあって専用サーバー管理がいらないのは楽で良いです。

ただ、もし実導入するようであればトレーシングによるアプリの負荷については注意深く見ておく必要があると思います。(Datadogに限った話ではないのですが)