くらげになりたい。

くらげのようにふわふわ生きたい日曜プログラマなブログ。趣味の備忘録です。

GCPのCloud Pub/Subに入門してみた

Cloud Pub/SubでFirestore triggersっぽいことをと思って、
いろいろ調べてみたときの備忘録(*´ω`*)

流れとしてはこんな感じ。

  • Firestoreの監査ロギング情報を有効化する
  • Cloud LoggingのログルータでPub/Subに転送
  • Pub/Subでpullサブスクリプションを使って実行

Cloud Pub/Subの概要

Pub/Subサービスとは、メッセージの送信者とメッセージの受信者を切り離す方式のメッセージサービスのこと

メッセージは、サービスを通って移動するデータのこと。
PublisherとSubscriberは任意のアプリケーション。

pub/sub overview

メッセージをTopicに送信すると、
TopicからSubscriptionに配信される。

メッセージを受け取るアプリケーション(Subscriber)が、
Subscriptionからメッセージを受信する。

メッセージのステータス

  • 確認済みメッセージ(確認済み)
    • Subscriberアプリが確認応答をPub/Subに返信したメッセージ
    • 全Subscriptionが確認済みになるとメッセージは削除される
  • 確認応答されていないメッセージ(確認応答なし)
    • 確認応答期限内に確認応答の返信がないメッセージ
    • 複数回配信される可能性がある。一時的なネットワーク問題のときなど
  • 否定応答メッセージ(否定応答)
    • Subscriberアプリが否定応答をPub/Subに返信したメッセージ
    • 無効なメッセージを否定応答する場合やメッセージを処理できない場合に利用
    • メッセージが失われないようにして、最終的に正常に処理されるようにできる

Pub/Subのパターン

pub/sub pattern

  • ファンイン(多対 1)
    • 複数のPublisherからのメッセージをまとめる
  • 負荷分散(多対多)
    • 複数のSubscriberで並列化する
  • ファンアウト(1 対多)
    • 同じメッセージに対して異なる処理を行う

Subscriptionのタイプ

pullサブスクリプション

Subscriberアプリが主体のタイプ。
PullとStreamingPullの2種類がある。

pull subscription stream pull subscription

さらに、非同期pullモードと同期pullモードがある。

pushサブスクリプション

Subscriptionが主体のタイプ。
HTTPリクエストとして、Subscriberアプリのエンドポイントにメッセージを送信する。

push subscription

使い方

Topicを作る

まずはメッセージを受け取るTopicを作る

Subscriptionを作る

次に作成したTopicに対し、
Subscriptionを作る。

Cloud Loggingからメッセージを送る

Firestore triggersっぽく使いたいので、
Cloud LoggingをPublisherアプリとして使う。

監査ロギングの有効化

Cloud LoggingにFirestoreの書き込みログが出力されるように、
Firestoreの監査ロギング情報を有効化する。 (IAMと管理 > 監査ログFirestore/Datastore APIの「データ書き込み」をON)

ログルーターの作成

特定のログをPub/SubのTopicに送るためにログルータを利用する。

シンクの宛先に、作成したTopicを指定して、
protoPayload.request.writes.update.name
メッセージを送りたいfirestoreのパスがヒットする正規表現でフィルタリングする。

包含フィルタはこんな感じ。COLLECTION_ID_Aなどは任意で。

## Firestoreの監査ロギング情報
logName =~ "\/logs\/cloudaudit.googleapis.com%2Fdata_access"
## "/COLLECTION_ID_A/<id>/COLLECTION_ID_B/<id>"のパスのみ
AND protoPayload.request.writes.update.name =~ "\/COLLECTION_ID_A\/[^/]+\/COLLECTION_ID_B\/[^/]+$"
## 既存のドキュメントがない=createの場合のみ
AND (NOT protoPayload.request.writes.currentDocument:* OR protoPayload.request.writes.currentDocument.exists = false)

これで、ログルーター作成以降は、Pub/Subにメッセージが送られるようになる。

クライアントライブラリでメッセージを受け取る

最後にクライアントライブラリでメッセージを受け取ってみる。

$ npm install @google-cloud/pubsub

継続的に取得するStreaming Pullの場合はこんな感じ。
認証はgoogle-auth-libraryと同じ。環境変数で指定するでもOK

import { Message, PubSub } from "@google-cloud/pubsub";

// pub/subクライアントのインスタンスを取得
const pubSubClient = new PubSub({
  projectId: "YOUR_PROJECT_ID",
  credentials: {
    client_email: "YOUR_CLIENT_EMAIL",
    private_key: "YOUR_PRIVATE_KEY",
  }
});

// 対象のサブスクリプションを取得
const subscription = pubSubClient.subscription(
  "projects/<YOUR_PROJECT_ID>/subscriptions/<YOUR_SUBSCRIPTION_NAME>"
);

// 受け取ったメッセージを処理する関数
const messageHandler = async (message: Message) => {
  // message.dataはBufferなので、JSONに変換する
  const jsonData = JSON.parse(Buffer.from(message.data).toString());
  
  // cloud loggingの情報がそのまま入っているので、
  // その中から、document pathを取得する
  const dataWrites = data["protoPayload"]["request"]["writes"];
  const docPath = dataWrites.flatMap((v: any) => v["update"]["name"])[0];
  
  // document pathから各document idを取得する
  const pattern = /\/COLLECTION_ID_A\/([^\/]+)\/COLLECTION_ID_B\/([^\/]+)$/;
  const result = path.match(pattern);
  if (!result) return;
  const aId = result[1];
  const bId = result[2];
  
  // TODO: なにかの処理
  
  // メッセージを確認済みにする
  message.ack();
};

// メッセージの受信を開始
subscription.on("message", messageHandler);

// メッセージの受信を停止
subscription.removeListener('message', messageHandler);

また、メッセージを受け取るには、
「Pub/Sub サブスクライバー」権限が必要なので、
IAMのロールを確認する。


以上!! なんとかそれっぽい感じのことができた(*´ω`*)

参考にしたサイトさま