Cloud Pub/SubでFirestore triggersっぽいことをと思って、
いろいろ調べてみたときの備忘録(*´ω`*)
流れとしてはこんな感じ。
- Firestoreの監査ロギング情報を有効化する
- Cloud LoggingのログルータでPub/Subに転送
- Pub/Subでpullサブスクリプションを使って実行
Cloud Pub/Subの概要
Pub/Subサービスとは、メッセージの送信者とメッセージの受信者を切り離す方式のメッセージサービスのこと
メッセージは、サービスを通って移動するデータのこと。
PublisherとSubscriberは任意のアプリケーション。
メッセージをTopicに送信すると、
TopicからSubscriptionに配信される。
メッセージを受け取るアプリケーション(Subscriber)が、
Subscriptionからメッセージを受信する。
メッセージのステータス
- 確認済みメッセージ(確認済み)
- Subscriberアプリが確認応答をPub/Subに返信したメッセージ
- 全Subscriptionが確認済みになるとメッセージは削除される
- 確認応答されていないメッセージ(確認応答なし)
- 確認応答期限内に確認応答の返信がないメッセージ
- 複数回配信される可能性がある。一時的なネットワーク問題のときなど
- 否定応答メッセージ(否定応答)
- Subscriberアプリが否定応答をPub/Subに返信したメッセージ
- 無効なメッセージを否定応答する場合やメッセージを処理できない場合に利用
- メッセージが失われないようにして、最終的に正常に処理されるようにできる
Pub/Subのパターン
- ファンイン(多対 1)
- 複数のPublisherからのメッセージをまとめる
- 負荷分散(多対多)
- 複数のSubscriberで並列化する
- ファンアウト(1 対多)
- 同じメッセージに対して異なる処理を行う
Subscriptionのタイプ
pullサブスクリプション
Subscriberアプリが主体のタイプ。
PullとStreamingPullの2種類がある。
さらに、非同期pullモードと同期pullモードがある。
pushサブスクリプション
Subscriptionが主体のタイプ。
HTTPリクエストとして、Subscriberアプリのエンドポイントにメッセージを送信する。
使い方
Topicを作る
まずはメッセージを受け取るTopicを作る
Subscriptionを作る
次に作成したTopicに対し、
Subscriptionを作る。
Cloud Loggingからメッセージを送る
Firestore triggersっぽく使いたいので、
Cloud LoggingをPublisherアプリとして使う。
監査ロギングの有効化
Cloud LoggingにFirestoreの書き込みログが出力されるように、
Firestoreの監査ロギング情報を有効化する。
(IAMと管理 > 監査ログ
のFirestore/Datastore API
の「データ書き込み」をON)
ログルーターの作成
特定のログをPub/SubのTopicに送るためにログルータを利用する。
シンクの宛先に、作成したTopicを指定して、
protoPayload.request.writes.update.name
に
メッセージを送りたいfirestoreのパスがヒットする正規表現でフィルタリングする。
包含フィルタはこんな感じ。COLLECTION_ID_A
などは任意で。
## Firestoreの監査ロギング情報 logName =~ "\/logs\/cloudaudit.googleapis.com%2Fdata_access" ## "/COLLECTION_ID_A/<id>/COLLECTION_ID_B/<id>"のパスのみ AND protoPayload.request.writes.update.name =~ "\/COLLECTION_ID_A\/[^/]+\/COLLECTION_ID_B\/[^/]+$" ## 既存のドキュメントがない=createの場合のみ AND (NOT protoPayload.request.writes.currentDocument:* OR protoPayload.request.writes.currentDocument.exists = false)
これで、ログルーター作成以降は、Pub/Subにメッセージが送られるようになる。
クライアントライブラリでメッセージを受け取る
最後にクライアントライブラリでメッセージを受け取ってみる。
$ npm install @google-cloud/pubsub
継続的に取得するStreaming Pullの場合はこんな感じ。
認証はgoogle-auth-libraryと同じ。環境変数で指定するでもOK
import { Message, PubSub } from "@google-cloud/pubsub"; // pub/subクライアントのインスタンスを取得 const pubSubClient = new PubSub({ projectId: "YOUR_PROJECT_ID", credentials: { client_email: "YOUR_CLIENT_EMAIL", private_key: "YOUR_PRIVATE_KEY", } }); // 対象のサブスクリプションを取得 const subscription = pubSubClient.subscription( "projects/<YOUR_PROJECT_ID>/subscriptions/<YOUR_SUBSCRIPTION_NAME>" ); // 受け取ったメッセージを処理する関数 const messageHandler = async (message: Message) => { // message.dataはBufferなので、JSONに変換する const jsonData = JSON.parse(Buffer.from(message.data).toString()); // cloud loggingの情報がそのまま入っているので、 // その中から、document pathを取得する const dataWrites = data["protoPayload"]["request"]["writes"]; const docPath = dataWrites.flatMap((v: any) => v["update"]["name"])[0]; // document pathから各document idを取得する const pattern = /\/COLLECTION_ID_A\/([^\/]+)\/COLLECTION_ID_B\/([^\/]+)$/; const result = path.match(pattern); if (!result) return; const aId = result[1]; const bId = result[2]; // TODO: なにかの処理 // メッセージを確認済みにする message.ack(); }; // メッセージの受信を開始 subscription.on("message", messageHandler); // メッセージの受信を停止 subscription.removeListener('message', messageHandler);
また、メッセージを受け取るには、
「Pub/Sub サブスクライバー」権限が必要なので、
IAMのロールを確認する。
以上!! なんとかそれっぽい感じのことができた(*´ω`*)
参考にしたサイトさま
- Firestore triggers Cloud Run を実現する
- Firestore の監査ロギング情報 | Google Cloud
- pullサブスクリプション | Cloud Pub/Sub ドキュメント | Google Cloud
- 転送とストレージの概要 | Cloud Logging | Google Cloud
- Logging のクエリ言語を使用したクエリの作成 | Google Cloud
- 料金 | Cloud Pub/Sub | Google Cloud
- Pub/Sub または Pub/Sub Lite の選択 | Cloud Pub/Sub ドキュメント | Google Cloud
- Pub/Sub client libraries | Cloud Pub/Sub Documentation | Google Cloud