SMTP・API連携で高速メール配信するならブラストエンジン

Webhookの実装方法をPython・Node.jsコード付きで解説

更新日: API連携

現代のWebサービス開発において「リアルタイム性」はますます重要になっています。ユーザーの行動を瞬時に捉え、即座にサービスへ反映させることが、顧客満足度やビジネス成果を左右する時代です。こうしたニーズを支える技術のひとつがWebhookです。

Webhookはアプリケーション間でデータをリアルタイムに連携する仕組みであり、イベントが発生した瞬間に外部へ通知を送る「プッシュ型」の通信を実現します。例えば、メール配信システムでエラーが起きた際に即座に通知を受け取り、自動で対応フローを走らせるなどビジネスロジックと緊密に連携できるのが大きな魅力です。

一方で、Webhookの導入には「セキュリティ対策」や「異常系ハンドリング」など、実装上のハードルが少なくありません。正しく実装しなければ、再送ループによるシステム負荷や認証不足によるセキュリティリスクが発生する恐れもあります。

本記事では「Webhookの実装方法」 をテーマに、次のような開発者が知りたいポイントを徹底解説します。

  • Webhookの仕組みやメリット
  • blastengine Webhookの仕様と実装例(Python・Node.js)
  • 実装前に整えるべき環境や準備
  • 正常系・異常系それぞれのハンドリング
  • テスト方法と運用・監視のベストプラクティス

ただのコードサンプル紹介に留まらず、実運用に耐えうるノウハウを詰め込んだ内容です。Webhookを自社のシステムへ安全かつ効果的に取り入れたい方は、ぜひ最後までご覧ください。

ブラストエンジンのバナー画像

Webhookとは?

Webhookとはアプリケーション間でリアルタイムにデータを通知する仕組みです。APIでデータを取得する場合はポーリングが必要ですが、Webhookではイベント発生時に自動で通知が送信されます。

例えば、メール配信システムの場合、ドロップやエラーが発生した瞬間に指定したURLへデータを送信してくれるため、リアルタイム性の高い処理が可能です。

技術的には特定のイベントが発生した際にHTTP POSTリクエストを事前登録されたエンドポイントに送信する仕組みで、プッシュ型通知とも呼ばれています。

関連記事:Webhookとは?仕組みやメリット、APIとの違い、利用方法について分かりやすく解説

blastengine Event Webhookの仕様

Webhookを実装する前にblastengineが提供する仕様を確認しておきましょう。blastengineのWebhookは、次のようなイベントに対応しています。

イベントタイプ発生ケース補足
DROPエラー停止リスト内のメールアドレスへの送信配信リクエストの内容や、配信停止リクエストを確認してください。
SOFTERROR  宛先の問題などによる一次的なエラーリトライにより一定時間経過後に到達する場合があります。
HARDERRORメールアドレスが存在しないなど恒久的なエラー当該アドレスの削除又は修正を実施してください。

イベントはJSON形式で通知されます。パラメータは以下の通りです。

項目説明
type通知するイベントタイプ
datetimeバウンスメールを受信した時間(ISO 8601拡張形式)
detail通知内容の詳細
mailaddressエラーになったメールアドレス
subject配信したメールの件名
error_code配信先からのレスポンスコード
error_messageエラーメッセージ(blastengine内で解析した文章)
delivery_id配信ID
insert_codes通知差し込みコード(未設定時は含まれません)
key管理画面で設定した差し込みの対象となるキー文字列
value差し込みする値

例えば以下のようなデータが送られてきます。

{
  "events": [
    {
      "event": {
        "type": "DROP",
        "datetime": "2025-06-19T03:30:00+09:00",
        "detail": {
          "mailaddress": "drop@example.com",
          "subject": "テスト DROP",
          "error_code": "554",
          "error_message": "エラー停止リストに含まれる宛先であるため、配信がドロップされました",
          "delivery_id": 12345
        }
      }
    },
    {
      "event": {
        "type": "HARDERROR",
        "datetime": "2025-06-19T03:31:00+09:00",
        "detail": {
          "mailaddress": "hard@example.com",
          "subject": "テスト HARDERROR",
          "error_code": "550",
          "error_message": "宛先のメールアドレスがありません",
          "delivery_id": 12346
        }
      }
    }
  ]
}

関連記事:Webhook設定方法について

実装前の事前準備(環境構築)

Webhook実装を始める前に次の環境を用意しておきましょう。

  • 受信エンドポイントを一時公開できるツール
     例:ngrok、Cloudflare Tunnel、ポート開放+Let’s Encrypt
  • 開発環境(Python、Node.jsなど)
  • テスト環境(ローカル、ステージング)
  • 本番環境(独自ドメイン+常時HTTPS、Basic認証を設定できる公開サーバー)

こうした準備を整えておくことで開発から本番環境への移行までスムーズに進められます。

受信エンドポイントの実装例

受信エンドポイントはblastengineから送信されるイベント通知を受け取り、必要に応じてビジネスロジックを実行する役割を持ちます。実装言語としては、Python、Node.js、PHP、Rubyなど様々な選択肢がありますが、本記事ではPythonとNode.jsの実装例に絞って解説します。

例えば、以下のようなポイントを意識して実装すると良いでしょう。

  • JSON形式で送られてくるリクエストボディを正しくパースする
  • 正常に受け取った場合はHTTPステータス200を返す
  • セキュリティ対策として署名検証やBasic認証の導入を検討する
  • エラー時のレスポンス内容を決めておく

Pythonの基本実装例

まずはPythonでの基本的な実装例を見ていきましょう。ここでは blastengine からのイベント通知を受信しシンプルに「accepted」というメッセージを返すだけの最小構成です。

from fastapi import FastAPI, Request, HTTPException, status
from fastapi.responses import JSONResponse

app = FastAPI()

@app.post("/webhook", status_code=status.HTTP_200_OK)
async def blastengine_webhook(request: Request):
    """blastengine からの POST を受信する最小エンドポイント."""
    try:
        payload = await request.json()  # application/json を想定
    except Exception:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request payload must be valid JSON"
        )

    return JSONResponse({"status": "accepted"})

この実装例はblastengine から送信されるWebhook通知を単純に受け取るだけの構成です。JSON形式のリクエストのみを想定しており、バリデーションや認証処理は含まれていません。実際の運用では次のような対応が必須です。

  • リクエスト内容の構造や必須項目の検証
  • 不正アクセスを防ぐための Basic 認証や署名検証
  • payload の中身を解析し、イベントごとのビジネスロジックを実装

例えば、DROP や HARDERROR といったイベントごとに処理を分けることが多いでしょう。

Node.js の基本実装例

Node.js での実装例は以下のようになります。

import express from 'express';

const app = express();
const PORT = process.env.PORT || 8000;

// ─── JSON パーサー(strict モード) ──
app.use(
  express.json({
    strict: true, // 配列先頭に BOM などがあると 400
  })
);

// ─── ルーティング ──
app.post('/webhook', (req, res) => {
  // ここまで来れば JSON の読み取りに成功
  return res.status(200).json({ status: 'accepted' });
});

// ─── JSON パースエラーを捕捉 ──
app.use((err, _req, res, _next) => {
  if (err instanceof SyntaxError && 'body' in err) {
    // 無効な JSON
    return res
      .status(400)
      .json({ detail: 'Request payload must be valid JSON' });
  }
  // それ以外のエラーは Express の既定処理へ
  return res.status(500).end();
});

// ─── サーバー起動 ──
app.listen(PORT, () => {
  console.log(`Server listening on http://localhost:${PORT}`);
});

このコードでは express.json() を使うことで、リクエストボディを自動的に JSON としてパースしています。blastengine の Webhook は Content-Type: application/json で送信されるため、この方法で問題なく受信できます。Python の例と同様に、実際の運用では以下のような対策が必要です。

  • 認証(Basic 認証や署名検証など)
  • 想定外の構造や欠落項目に対するバリデーション
  • イベントタイプごとのビジネスロジック実装

例えば、特定のイベントタイプ(DROP、HARDERRORなど)が発生した際に、自動で監視アラートを飛ばすなどの処理を組み込むケースも多いです。

ビジネスロジックの追加

Webhookを受信したら単にログを記録するだけではなく、実際にイベントを処理するロジックを実装する必要があります。これらのロジックは blastengine の利用用途によって大きく異なるため、自社の要件に合わせて適切に実装することが重要です。例えば、次のようなケースがあります。

  • DROP イベントを検知したら該当アドレスを顧客DBから除外する
  • HARDERROR の際に運用チームへアラートを送る
  • 配信結果を分析用のシステムへ送信する

blastengineのWebhook は1回の受信で複数のイベントがまとめて通知される場合があるため、forや forEachなどを使って全てのイベントを処理する必要があります。

Python の実装例

以下はPythonの例です。ここではイベントごとに処理を分け、match/case 構文(Python 3.10 以上)を利用しています。

from fastapi import FastAPI, Request, HTTPException

app = FastAPI()

@app.post("/webhook")
async def blastengine_webhook(request: Request):
    try:
        data = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="invalid JSON")

    for wrap in data.get("events", []):
        evt = wrap.get("event", {})
        evt_type = evt.get("type")

        match evt_type:  # Python 3.10+ の match/case
            case "DROP":
                # DROP 受信時の処理をここに実装
                print("[DROP] process done.")
            case "SOFTERROR":
                # SOFTERROR 受信時の処理をここに実装
                print("[SOFTERROR] process done.")
            case "HARDERROR":
                # HARDERROR 受信時の処理をここに実装
                print("[HARDERROR] process done.")
            case _:
                print(f"[UNKNOWN] {evt_type}")

    return {"status": "accepted"}  # 処理成功として 200 OK を返す

Node.js の実装例

次に Node.js の例です。こちらでも複数のイベントを順に処理するよう実装しています。

import express from 'express';

const app = express();
const PORT = process.env.PORT || 8000;

// JSON だけ受け付ける
app.use(express.json());

// --- Webhook --- //
app.post('/webhook', (req, res) => {
  const { events } = req.body ?? {};

  if (!Array.isArray(events)) {
    // 想定フォーマットでない場合は 400
    return res.status(400).json({ detail: '"events" must be an array' });
  }

  // 複数イベントを順に処理
  events.forEach(({ event = {} }) => {
    const { type, detail = {} } = event;

    switch (type) {
      case 'DROP':
        // DROP 受信時の処理をここに実装
        console.log('[DROP] process done.');
        break;
      case 'SOFTERROR':
        // SOFTERROR 受信時の処理をここに実装
        console.log('[SOFTERROR] process done.');
        break;
      case 'HARDERROR':
        // HARDERROR 受信時の処理をここに実装
        console.log('[HARDERROR] process done.');
        break;
      default:
        console.log('[UNKNOWN] process done.');
    }
  });

  // 最後に 200 OK を返却
  return res.json({ status: 'accepted' });
});

// --- JSON パースエラー捕捉 --- //
app.use((err, _req, res, _next) => {
  if (err instanceof SyntaxError && 'body' in err) {
    return res
      .status(400)
      .json({ detail: 'Request payload must be valid JSON' });
  }
  return res.status(500).end();
});

// --- 起動 --- //
app.listen(PORT, () =>
  console.log(`Server listening on http://localhost:${PORT}`)
);

このコードでは、express.json() によりリクエストボディを JSON としてパースし、イベントタイプごとに処理を分岐させています。blastengine の Webhook は Content-Type: application/json で送信されるため、この方法で問題なく受け取ることができます。

ブラストエンジンのバナー画像

異常系ハンドリング

ここまでは正常系の実装を紹介してきましたが、Webhook の開発において異常系のハンドリングは、安定したシステム運用のために不可欠です。ここからは、実装すべき代表的な異常系について解説します。

リクエストバリデーションエラー

JSON の構文エラーや、必須フィールドが欠落している場合には、明示的に 400 Bad Request422 Unprocessable Entity を返すようにしましょう。不正なリクエストをそのまま処理してしまうことを防ぐためです。

認証エラー

Webhook のエンドポイントはインターネット上に公開されるため、誰でも curl コマンドなどでアクセスできてしまいます。そのため、Webhook の認証は必須です。blastengine ではBasic 認証がサポートされており、受信したリクエストが blastengine から送られたものであることを確認できます。

Basic 認証の設定方法

blastengine で Webhook に Basic 認証を設定する場合は、管理画面の「Webhook」設定画面でエンドポイント URL に次のように認証情報を含めて保存します。

https://username:password@example.com/webhook

例えば上記のように、ユーザー名とパスワードを URL に埋め込む形式です。注意点として、ユーザー名とパスワードには任意の英数字を設定できますが、blastengine のログイン情報と同じ値を使うことは避けましょう。また、記号を含む場合は URL エンコードが必要です。サーバー側では、受信リクエストに含まれる Authorization ヘッダーを検証することで、認証が完了します。これにより、不正なリクエストを排除し、Webhook のセキュリティを高められます。

ビジネスロジックの例外処理

Webhook の受信後、多くの場合は外部 API 連携やデータベース更新、アラート通知などのビジネスロジックを実行することになります。その際に重要なのが例外処理です。想定外のエラーが発生した場合も、システム全体が停止しないように備えておきましょう。

特に重要なのは、ビジネスロジック内でエラーが起きた場合も Webhook のレスポンス自体は必ず 200 OK を返す という点です。レスポンスがエラーになると、blastengine 側では配信エラーと判定され、再送を繰り返してしまうためです。Webhook の受信成功とビジネスロジックの成功は別々に考える必要があります。

また、ビジネスロジックで発生した例外は、ログやアラート通知を通じてオペレーターに即座に知らせる仕組みを作ることが推奨されます。

Python での実装例

以下は、ここまでの異常系ハンドリングを全て盛り込んだ Python 実装例です。

from typing import List, Literal, Set, Callable, Awaitable
from fastapi import (
    FastAPI,
    HTTPException,
    Request,
    Depends,
    status,
)
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from pydantic import BaseModel, Field, ValidationError
import logging, os, secrets

# -------------------------------------------------
app = FastAPI()
security = HTTPBasic()

# ---------- ロガー ----------
logger = logging.getLogger("webhook")
logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(levelname)s - %(message)s",
)

# ---------- 認証情報 ----------
BASIC_USER = os.getenv("BASIC_USER", "user")
BASIC_PASS = os.getenv("BASIC_PASS", "password")

# ---------- 冪等性メモリ ----------
processed_ids: Set[int] = set()

# ========== Pydantic スキーマ ==========
class EventDetail(BaseModel):
    mailaddress: str
    subject: str | None = None
    error_code: str | None = None
    error_message: str | None = None
    delivery_id: int

class Event(BaseModel):
    type: Literal["DROP", "SOFTERROR", "HARDERROR"]
    datetime: str | None = None
    detail: EventDetail

class EventWrapper(BaseModel):
    event: Event  # ← ラッパー層を表現

class WebhookBody(BaseModel):
    events: List[EventWrapper] = Field(..., min_items=1)

# ========== ログ用ミドルウェア ==========
@app.middleware("http")
async def log_request(request: Request, call_next: Callable[[Request], Awaitable]):
    body = await request.body()
    logger.info(
        "REQUEST %s %s   Headers=%s   Body=%s...",
        request.method,
        request.url.path,
        {k: v for k, v in request.headers.items()},
        body[:128].decode("utf-8", "ignore"),
    )
    request = Request(request.scope, receive=lambda: {"type": "http.request", "body": body})
    return await call_next(request)

# ========== Basic 認証 ==========
def check_basic(credentials: HTTPBasicCredentials = Depends(security)):
    if not (
        secrets.compare_digest(credentials.username, BASIC_USER)
        and secrets.compare_digest(credentials.password, BASIC_PASS)
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": 'Basic realm="Webhook"'},
        )

# ========== メインエンドポイント ==========
@app.post("/webhook", status_code=status.HTTP_200_OK, dependencies=[Depends(check_basic)])
async def blastengine_webhook(request: Request):
    try:
        payload = WebhookBody(**await request.json())
    except ValidationError as ve:
        raise HTTPException(status_code=422, detail=ve.errors())
    except Exception:
        raise HTTPException(status_code=400, detail="invalid JSON")

    # -------- ビジネスロジック(例外は握りつぶす) --------
    for wrap in payload.events:
        evt = wrap.event  # ← unwrap
        try:
            if evt.detail.delivery_id in processed_ids:
                logger.info("Skip duplicate id=%s", evt.detail.delivery_id)
                continue
            processed_ids.add(evt.detail.delivery_id)

            match evt.type:
                case "DROP":
                    logger.info("[DROP] %s", evt.detail.mailaddress)
                case "SOFTERROR":
                    logger.info("[SOFTERROR] %s", evt.detail.mailaddress)
                case "HARDERROR":
                    logger.info("[HARDERROR] %s", evt.detail.mailaddress)
        except Exception as exc:
            logger.exception(
                "Business logic failed for id=%s: %s",
                evt.detail.delivery_id,
                exc,
            )

    return {"status": "accepted"}

※ 認証情報は、例として username を “user”, password を “password” に設定しています。本番環境では必ずより複雑で推測されにくい文字列に置き換えてください。

テストのやり方

Webhook を実装したら、本番環境に展開する前に必ず十分なテストを行いましょう。テストにはローカル環境で行う方法と、ステージング環境で行う方法があります。

ローカル環境でのテスト方法

ローカル環境でのテストは、以下の手順で比較的簡単に実施できます。

【curl での擬似的なイベント通知】

  • FastAPI サーバーをローカルポート(例:8080)で起動する
  • curl や Postman を使ってサンプル JSON を POST し、サーバーログを確認する
  • Basic 認証、重複 delivery_id、スキーマエラーなど、想定ケースを順に送信し、HTTP ステータス(400、401、422 など)のレスポンスが期待どおりか検証する

【blastengine から実際のイベントを通知】

  • ngrok http 8080 などで HTTPS トンネルを発行し、外部アクセス用の URL を取得する
  • blastengine の管理画面で Webhook URL を「Basic 認証情報 + ngrok URL + /webhook」の形式で設定し保存する
  • curl を使って blastengine の API を叩き、配信停止リストや存在しないアドレスにメールを送信し、DROP やエラーを意図的に発生させる
  • Webhook がイベントを正しく受信・処理できているか、ログなどを確認する

例えば、curl を使った POST の例は以下のようになります。

curl -X POST "http://localhost:8080/webhook" \
  -H "Content-Type: application/json" \
  -u user:password \
  -d '{
    "events": [
      {
        "event": {
          "type": "DROP",
          "datetime": "2025-06-19T03:30:00+09:00",
          "detail": {
            "mailaddress": "test@example.com",
            "subject": "テスト",
            "delivery_id": 12345
          }
        }
      }
    ]
  }'

ステージング環境でのテスト方法

ステージング環境でテストを行うことで、本番に近いネットワーク構成や認証設定で動作確認でき、環境依存の不具合や設定ミスを事前に防ぐことができます。基本的な流れは、先述したローカルテストの 【blastengine から実際のイベントを通知】 の方法をステージング環境で行うだけです。

以下の点に注意しながらテストを進めましょう。

  • 本番と同じエンドポイント構成(URL スキーム、パス、認証情報)で Webhook を受信できるようにする
  • blastengine 側で専用のテスト用送信元アドレスを用意し、DROP やエラーを意図的に発生させる
  • 受信ログやレスポンスコード(200、401、422 など)を確認し、実装したバリデーションや認証処理が正しく機能しているかを検証する
  • 実際の再送挙動(リトライ)や delivery_id の冪等性チェックも含めて検証し、ログやアラート通知の動作も確認する
  • 不要な影響を避けるため、ステージング環境の URL が誤って本番環境に設定されていないかを必ず再確認する

こうしたテストを徹底することで、想定外のエラーを未然に防ぎ、安心して本番環境へリリースできます。

運用・監視のベストプラクティス

Webhook 受信機能を安定して運用するためには、実装だけでなく監視と障害対応の体制を整えておくことが非常に重要です。とくに Webhook は非同期かつ外部からの通信に依存するため、もし失敗しても即座に表面化しないリスクがあります。ここでは、代表的な運用・監視のベストプラクティスをご紹介します。

メトリクスの収集と可視化

FastAPI を Prometheus と統合することで、以下のようなメトリクスを収集できます。

  • リクエスト数
  • レスポンスステータス(200、401、422、5xx など)
  • 処理時間

例えば、prometheus-fastapi-instrumentator のような Exporter を導入すれば、Webhook の応答状況を簡単に可視化でき、Grafana などを使ってダッシュボードを構築することが可能です。

アラート設定

次のような状況を監視し、Prometheus Alertmanager でアラートを設定しておくと安心です。

  • 200 以外のレスポンスが一定回数以上連続して発生した場合
  • リクエスト数が急増または急減した場合

アラートはSlack やメールへ通知するのがおすすめです。再送ループや認証エラーなどを早期に検知し、迅速に対応することが可能になります。

ログの一元管理と検索

Webhook の運用では以下のようなログを詳細に記録し、後で検索できるようにすることが大切です。

  • リクエスト・レスポンスの内容
  • 認証失敗
  • バリデーションエラー
  • ビジネスロジック内の例外

こうしたログを構造化して Elasticsearch や Cloud Logging に集約しておけば、トラブル発生時の原因調査や過去データの分析が圧倒的に効率化します。

再送判定と通知

blastengine では 2xx 以外のレスポンスが返ると自動的に再送が行われます。そのため、アプリケーション側で次のような仕組みを用意しておくことが重要です。

  • 同じ delivery_id が複数回送られてきたかを記録する
  • 異常と判定した場合には即座にアラートを飛ばす

例えば、冪等性の確保や再送ループの早期検知に役立ちます。

まとめ

Webhookは、リアルタイムに外部サービスと連携する強力な仕組みです。特に blastengine のようなメール配信システムと組み合わせることで、配信結果を即時に把握し、ビジネスロジックへダイレクトに反映させることが可能になります。

しかし、その利便性の裏には、認証や異常系処理、運用監視といった設計上のポイントが数多く存在します。今回ご紹介した実装例やテスト手法、運用ノウハウを参考に、自社の要件にあった Webhook の構築を進めてみてください。

最初は小さく始めても構いません。重要なのは、異常を見える化し、素早く気付ける体制を作ること です。しっかりと準備を整えれば、Webhook はあなたのサービスをさらに強くする武器になるでしょう。

「Webhook の実装方法」を探している方の参考になれば幸いです。

Gmail送信者ガイドライン対応バナー

blastengine(プラストエンジン)ロゴ

エンジニアを面倒なメールに関するトラブルから解放するために作られたブラストエンジン
まずは無料トライアルで、その使いやすさを実感してみませんか?

\メールアドレス入力のみ/

無料トライアル