Hibito 営業組織を変革する

Pythonで営業自動化|GTMエンジニアの実践スクリプト集

Pythonで営業業務を自動化する実践スクリプトを解説。CRMデータ取得、リード振り分け、レポート生成、メール自動送信など、GTMエンジニアが現場で使うコード例を紹介します。

W

渡邊悠介


Pythonは、営業オペレーションを自動化するうえで最も汎用性の高いプログラミング言語である。iPaaSツールがノーコードで簡易な連携を実現する一方で、複数APIの結果を結合して独自ロジックで加工する、数万件のリードデータをバッチ処理する、CRMのデータを集計してカスタムレポートをSlackに自動配信する——こうした「iPaaSの守備範囲を超える自動化」にはコードが必要であり、そのコードを書く言語としてPythonが選ばれる理由は明確だ。豊富なライブラリ、読みやすい構文、そしてデータ処理との相性の良さ。本記事では、GTMエンジニアが営業現場で実際に使っているPythonスクリプトの実践例を、コード付きで解説する。

なぜ営業自動化にPythonを使うのか

営業自動化のツールは数多く存在する。Zapier、Make、n8nといったiPaaS、HubSpotのワークフロー機能、Salesforce Flow。これらで十分なケースは確かに多い。では、なぜわざわざPythonを持ち出す必要があるのか。

理由は3つある。

第一に、データ加工の自由度。 iPaaSのデータ変換はフィルタリングや簡単なマッピングが中心だ。「過去6ヶ月の商談データをリードソース別・担当者別にクロス集計し、前年同期比を算出して、閾値を下回ったセグメントだけをアラート対象にする」——このレベルの加工をノーコードで組むのは現実的ではない。Pythonのpandasなら数十行で書ける。

第二に、バッチ処理への対応。 CRMに5万件のコンタクトがあり、全件に対してデータエンリッチメントAPIを叩いてスコアを再計算したい場合、iPaaSではレートリミットの管理やエラーハンドリングに限界がある。Pythonならリトライ制御、レートリミット遵守のためのスリープ処理、途中経過の保存まで自在にコントロールできる。

第三に、複数データソースの統合。 CRMのデータ、MAツールのデータ、CSツールのデータ、Googleスプレッドシートの予算データ——これらを一つのスクリプト内で統合し、整合性を保ったまま処理できるのはプログラミング言語の強みだ。営業データ基盤の構築においても、Pythonはデータパイプラインの中核を担う。

誤解のないように補足しておくと、iPaaSが不要だと言いたいわけではない。むしろ、シンプルな連携はiPaaSに任せ、複雑な処理をPythonで担う——この使い分けがGTMエンジニアの設計力そのものだ。

環境構築——最小限のセットアップ

Pythonで営業自動化を始めるための環境構築は、驚くほどシンプルだ。必要なのは、Python本体、仮想環境、そして数個のライブラリだけである。

# Python 3.10以上を推奨
python3 --version

# プロジェクトディレクトリの作成
mkdir sales-automation && cd sales-automation

# 仮想環境の作成と有効化
python3 -m venv venv
source venv/bin/activate  # Windowsの場合: venv\Scripts\activate

# 必要ライブラリのインストール
pip install requests pandas python-dotenv schedule

各ライブラリの役割を整理する。

ライブラリ用途営業自動化での使い所
requestsHTTP通信CRM API呼び出し、Webhook送信
pandasデータ処理商談データの集計・加工・分析
python-dotenv環境変数管理APIキーの安全な管理
schedule定期実行日次・週次の自動レポート

APIキーはコードにハードコーディングしてはならない。.envファイルに分離し、.gitignoreに追加するのが鉄則だ。

# .env
HUBSPOT_API_KEY=pat-na1-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/XXXXX/XXXXX/XXXXX
# config.py
import os
from dotenv import load_dotenv

load_dotenv()

HUBSPOT_API_KEY = os.getenv("HUBSPOT_API_KEY")
SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL")

この構成を基盤として、以降の実践スクリプトを構築していく。

実践1: CRMからのデータ取得と加工

営業自動化の起点は、CRMからデータを取得することだ。ここではHubSpot APIを例に、商談(Deal)データを取得してpandasで集計するスクリプトを示す。

import requests
import pandas as pd
from config import HUBSPOT_API_KEY

def fetch_deals():
    """HubSpot APIから全商談データを取得する"""
    url = "https://api.hubapi.com/crm/v3/objects/deals"
    headers = {"Authorization": f"Bearer {HUBSPOT_API_KEY}"}
    params = {
        "limit": 100,
        "properties": "dealname,amount,dealstage,closedate,pipeline,hubspot_owner_id"
    }

    all_deals = []
    after = None

    while True:
        if after:
            params["after"] = after
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
        all_deals.extend(data["results"])

        # ページネーション処理
        paging = data.get("paging")
        if paging and "next" in paging:
            after = paging["next"]["after"]
        else:
            break

    return all_deals

def analyze_pipeline(deals):
    """パイプラインをステージ別に集計する"""
    records = []
    for deal in deals:
        props = deal["properties"]
        records.append({
            "name": props.get("dealname", ""),
            "amount": float(props.get("amount", 0) or 0),
            "stage": props.get("dealstage", ""),
            "close_date": props.get("closedate", ""),
            "owner_id": props.get("hubspot_owner_id", "")
        })

    df = pd.DataFrame(records)

    # ステージ別サマリー
    summary = df.groupby("stage").agg(
        deal_count=("name", "count"),
        total_amount=("amount", "sum"),
        avg_amount=("amount", "mean")
    ).round(0)

    return summary

if __name__ == "__main__":
    deals = fetch_deals()
    summary = analyze_pipeline(deals)
    print(summary)

このスクリプトのポイントは3つある。ページネーション対応でAPIのレスポンス上限(100件)を超えるデータも全件取得できること。pandasのgroupbyでSQLと同等の集計をPython側で実行できること。そしてraise_for_status()でAPIエラーを即座に検知できることだ。

取得したデータは、そのまま次のステップ——レポート生成や通知配信——の入力として使える。

実践2: リード自動振り分けスクリプト

新規リードが登録されたとき、業種・企業規模・地域に応じて適切な営業担当者に自動でアサインする。Webhookトリガーと組み合わせれば、リード登録からアサインまでをリアルタイムで完結できる。

import requests
from config import HUBSPOT_API_KEY

# 振り分けルール定義
ASSIGNMENT_RULES = {
    "enterprise": {
        "condition": lambda lead: lead.get("annualrevenue", 0) >= 1_000_000_000,
        "owner_id": "12345678"  # エンタープライズ担当
    },
    "mid_market": {
        "condition": lambda lead: 100_000_000 <= lead.get("annualrevenue", 0) < 1_000_000_000,
        "owner_id": "23456789"  # ミッドマーケット担当
    },
    "smb": {
        "condition": lambda lead: lead.get("annualrevenue", 0) < 100_000_000,
        "owner_id": "34567890"  # SMB担当
    }
}

def assign_lead(contact_id: str, contact_properties: dict) -> str:
    """リードを条件に基づいて担当者にアサインする"""
    revenue = float(contact_properties.get("annualrevenue", 0) or 0)
    lead_data = {"annualrevenue": revenue}

    assigned_owner = None
    for segment, rule in ASSIGNMENT_RULES.items():
        if rule["condition"](lead_data):
            assigned_owner = rule["owner_id"]
            break

    if not assigned_owner:
        assigned_owner = ASSIGNMENT_RULES["smb"]["owner_id"]

    # HubSpotのコンタクトを更新
    url = f"https://api.hubapi.com/crm/v3/objects/contacts/{contact_id}"
    headers = {
        "Authorization": f"Bearer {HUBSPOT_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "properties": {
            "hubspot_owner_id": assigned_owner
        }
    }

    response = requests.patch(url, headers=headers, json=payload)
    response.raise_for_status()

    return assigned_owner

振り分けルールをディクショナリで定義している点がこのスクリプトの設計上の工夫だ。ルールの追加や変更はコードのロジックに手を入れず、ASSIGNMENT_RULESの定義を更新するだけで済む。実際の運用では、このルール定義をJSONファイルやデータベースに外出しすることで、非エンジニアでもルール変更が可能になる。

実践3: 自動レポート生成とSlack配信

毎朝9時にパイプラインサマリーを集計し、Slackチャンネルに自動投稿するスクリプトだ。営業レポーティングの自動化の最もシンプルかつ効果的な実装パターンである。

import requests
import pandas as pd
from datetime import datetime, timedelta
from config import HUBSPOT_API_KEY, SLACK_WEBHOOK_URL

def generate_daily_report():
    """日次パイプラインレポートを生成してSlackに投稿する"""
    # 商談データの取得(前述のfetch_deals関数を使用)
    deals = fetch_deals()

    records = []
    for deal in deals:
        props = deal["properties"]
        records.append({
            "stage": props.get("dealstage", "unknown"),
            "amount": float(props.get("amount", 0) or 0),
            "close_date": props.get("closedate", "")
        })

    df = pd.DataFrame(records)

    # 今月クローズ予定の商談を抽出
    current_month = datetime.now().strftime("%Y-%m")
    df_this_month = df[df["close_date"].str.startswith(current_month)]

    # サマリー作成
    total_pipeline = df["amount"].sum()
    this_month_forecast = df_this_month["amount"].sum()
    deal_count = len(df)

    # Slack投稿用メッセージの構築
    message = {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"📊 日次パイプラインレポート({datetime.now().strftime('%Y/%m/%d')})"
                }
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*総パイプライン金額*\n¥{total_pipeline:,.0f}"},
                    {"type": "mrkdwn", "text": f"*今月クローズ予定*\n¥{this_month_forecast:,.0f}"},
                    {"type": "mrkdwn", "text": f"*商談件数*\n{deal_count}件"},
                    {"type": "mrkdwn", "text": f"*平均商談単価*\n¥{total_pipeline/max(deal_count,1):,.0f}"}
                ]
            }
        ]
    }

    # Slack Webhook送信
    response = requests.post(SLACK_WEBHOOK_URL, json=message)
    response.raise_for_status()

    return message

if __name__ == "__main__":
    generate_daily_report()

このスクリプトをcronで毎朝9時に実行すれば、営業マネージャーは出社した瞬間にSlackでパイプラインの状態を把握できる。手動レポート作成の工数はゼロになる。

定期実行の設定はOS標準のcron(Linux/Mac)を使うのが最もシンプルだ。

# crontab -e で以下を追加
0 9 * * 1-5 cd /path/to/sales-automation && /path/to/venv/bin/python daily_report.py

実践4: データエンリッチメントの自動化

CRMに登録されたリードの情報を外部APIで自動補完するスクリプトだ。企業名からドメインを特定し、企業情報APIで業種・従業員数・売上規模を取得してCRMに書き戻す。API連携の実践的な応用パターンである。

import requests
import time
from config import HUBSPOT_API_KEY

def enrich_contacts(contact_ids: list, enrichment_api_key: str):
    """コンタクトリストに対してデータエンリッチメントを実行する"""
    enriched_count = 0
    error_count = 0

    for i, contact_id in enumerate(contact_ids):
        try:
            # 1. HubSpotからコンタクト情報を取得
            contact = get_contact(contact_id)
            company_domain = contact["properties"].get("company_domain", "")

            if not company_domain:
                continue

            # 2. エンリッチメントAPIで企業情報を取得
            enrichment_data = fetch_enrichment(company_domain, enrichment_api_key)

            if enrichment_data:
                # 3. HubSpotのコンタクトを更新
                update_payload = {
                    "properties": {
                        "industry": enrichment_data.get("industry", ""),
                        "numberofemployees": str(enrichment_data.get("employee_count", "")),
                        "annualrevenue": str(enrichment_data.get("revenue", ""))
                    }
                }
                update_contact(contact_id, update_payload)
                enriched_count += 1

            # レートリミット対策: 100ms間隔
            time.sleep(0.1)

            # 進捗ログ
            if (i + 1) % 100 == 0:
                print(f"Progress: {i+1}/{len(contact_ids)} processed")

        except requests.exceptions.HTTPError as e:
            error_count += 1
            print(f"Error enriching {contact_id}: {e}")
            if e.response.status_code == 429:  # Rate limit
                print("Rate limited. Waiting 10 seconds...")
                time.sleep(10)

    return {"enriched": enriched_count, "errors": error_count}

def get_contact(contact_id: str) -> dict:
    url = f"https://api.hubapi.com/crm/v3/objects/contacts/{contact_id}"
    headers = {"Authorization": f"Bearer {HUBSPOT_API_KEY}"}
    params = {"properties": "company_domain,industry,numberofemployees"}
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()
    return response.json()

def update_contact(contact_id: str, payload: dict):
    url = f"https://api.hubapi.com/crm/v3/objects/contacts/{contact_id}"
    headers = {
        "Authorization": f"Bearer {HUBSPOT_API_KEY}",
        "Content-Type": "application/json"
    }
    response = requests.patch(url, headers=headers, json=payload)
    response.raise_for_status()

def fetch_enrichment(domain: str, api_key: str) -> dict:
    """外部エンリッチメントAPIの呼び出し(例: Clearbit, Apollo等)"""
    # 実際のAPI仕様に合わせて実装する
    url = f"https://api.enrichment-service.com/v1/company?domain={domain}"
    headers = {"Authorization": f"Bearer {api_key}"}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.json()
    return None

このスクリプトで注目すべきはレートリミット対策だ。time.sleep(0.1)で100ms間隔を確保し、429エラー(レートリミット超過)を受けたら10秒待機する。数万件のバッチ処理を安定して実行するには、このような防御的なコーディングが不可欠である。iPaaSでは、このレベルのエラーハンドリングをきめ細かく制御するのが難しい。

実践5: 商談停滞アラートの自動検知

一定期間ステージが動いていない商談を自動検知し、担当者にアラートを送る。営業マネジメントにおいて、停滞商談の早期発見は受注率を左右する重要なオペレーションだ。

import requests
import pandas as pd
from datetime import datetime, timedelta
from config import HUBSPOT_API_KEY, SLACK_WEBHOOK_URL

# ステージごとの最大滞留日数
STAGE_THRESHOLDS = {
    "appointmentscheduled": 7,    # 初回商談設定: 7日
    "qualifiedtobuy": 14,         # 見込み評価: 14日
    "presentationscheduled": 10,  # 提案: 10日
    "decisionmakerboughtin": 21,  # 意思決定者合意: 21日
    "contractsent": 7             # 契約書送付: 7日
}

def detect_stale_deals():
    """停滞商談を検知してSlackにアラートを送る"""
    deals = fetch_deals()  # 前述の関数を使用

    stale_deals = []
    now = datetime.now()

    for deal in deals:
        props = deal["properties"]
        stage = props.get("dealstage", "")

        # クローズ済み商談はスキップ
        if stage in ("closedwon", "closedlost"):
            continue

        threshold = STAGE_THRESHOLDS.get(stage)
        if not threshold:
            continue

        # ステージ更新日から経過日数を算出
        last_modified = props.get("hs_lastmodifieddate", "")
        if last_modified:
            modified_date = datetime.fromisoformat(last_modified.replace("Z", "+00:00"))
            days_in_stage = (now.astimezone() - modified_date).days

            if days_in_stage > threshold:
                stale_deals.append({
                    "name": props.get("dealname", "不明"),
                    "stage": stage,
                    "days": days_in_stage,
                    "threshold": threshold,
                    "amount": float(props.get("amount", 0) or 0),
                    "owner": props.get("hubspot_owner_id", "")
                })

    if stale_deals:
        send_stale_alert(stale_deals)

    return stale_deals

def send_stale_alert(stale_deals: list):
    """停滞商談のアラートをSlackに送信する"""
    deal_lines = []
    for d in sorted(stale_deals, key=lambda x: x["amount"], reverse=True)[:10]:
        deal_lines.append(
            f"• *{d['name']}*(¥{d['amount']:,.0f})— {d['stage']}{d['days']}日停滞(上限{d['threshold']}日)"
        )

    message = {
        "text": f"⚠️ 停滞商談アラート: {len(stale_deals)}件の商談が滞留しています",
        "blocks": [
            {
                "type": "header",
                "text": {"type": "plain_text", "text": f"⚠️ 停滞商談アラート({len(stale_deals)}件)"}
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": "\n".join(deal_lines)}
            }
        ]
    }

    requests.post(SLACK_WEBHOOK_URL, json=message)

STAGE_THRESHOLDSのディクショナリで、ステージごとの許容滞留日数を定義している。この閾値は組織ごとに異なるため、自社の営業サイクルに合わせて調整する必要がある。GTMエンジニアのスキルとして、営業プロセスの理解とコーディング力の両方が求められる好例だ。

運用設計——スクリプトを「仕組み」にする

スクリプトを書いただけでは自動化にならない。書いたコードを安定的に運用し続けるための設計が不可欠だ。

ディレクトリ構成。プロジェクトのファイル構成は以下のようにする。

sales-automation/
├── .env                  # APIキー(.gitignoreに追加)
├── .gitignore
├── config.py             # 環境変数の読み込み
├── fetch_deals.py        # CRMデータ取得
├── assign_leads.py       # リード振り分け
├── daily_report.py       # 日次レポート
├── enrich_contacts.py    # データエンリッチメント
├── stale_deal_alert.py   # 停滞商談アラート
├── requirements.txt      # ライブラリ一覧
└── logs/                 # 実行ログ

ログ出力。print文ではなくloggingモジュールを使う。いつ、何が、どう実行されたかを追跡できなければ、障害対応ができない。

import logging

logging.basicConfig(
    filename="logs/automation.log",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

エラー通知。スクリプトが失敗したとき、Slackに即座にエラー通知を飛ばす仕組みを入れておく。try-exceptで全体を囲み、exceptブロック内でSlack通知を送信するパターンが基本だ。

バージョン管理。スクリプトはGitで管理する。振り分けルールの変更履歴、閾値の変更理由——これらが追跡できないと、「なぜこのルールになっているのか」がブラックボックス化する。

この運用設計を怠ると、スクリプトは「書いた人しか触れない属人的なツール」に堕落する。GTMエンジニアの仕事は、コードを書くことではなく、営業組織が持続的に使える「仕組み」を構築することだ。

まとめ——Pythonで営業オペレーションを構造化する

Pythonによる営業自動化の本質は、営業オペレーションを「構造化」することにある。属人的な手作業をコードに落とし込むことで、再現性・正確性・速度のすべてが向上する。

本記事で紹介した5つのスクリプトは、それぞれ独立して動作するが、組み合わせることで真価を発揮する。CRMからデータを取得し(実践1)、リードを振り分け(実践2)、日次でパイプラインを可視化し(実践3)、データを自動補完し(実践4)、停滞を検知する(実践5)。この一連のフローが自動で回る状態を作れれば、営業チームはデータ入力や集計作業から解放され、本来の仕事——顧客との対話——に集中できる。

まずは最もインパクトの大きい1つのスクリプトから始めてほしい。日次レポートの自動化か、停滞商談アラートか。小さく始めて、効果を実感してから拡張する。それが営業自動化を定着させる唯一の方法だ。

参考文献

よくある質問

QPythonで営業自動化を始めるには何から学ぶべきですか?
まずrequestsライブラリでCRMのAPIを叩いてデータを取得する方法を覚えるのが最短ルートです。次にpandasでデータ加工、最後にcronやタスクスケジューラで定期実行を設定すれば、実務レベルの自動化が構築できます。
QiPaaSがあればPythonは不要ではないですか?
単純なツール間連携はiPaaSで十分ですが、複数APIの結果を結合して独自の加工ロジックを適用する、大量データのバッチ処理を行う、といった場面ではPythonが不可欠です。iPaaSとPythonは競合ではなく補完関係にあります。
Q営業担当者もPythonを学ぶべきですか?
営業担当者自身がPythonを書く必要はありません。GTMエンジニアがスクリプトを構築し、営業はその恩恵を受ける分業体制が最も効率的です。ただし営業企画担当者がPythonの基礎を理解していると、自動化の要件定義の精度が格段に上がります。
QPythonスクリプトはどのCRMでも使えますか?
はい。HubSpot、Salesforce、Pipedrive、Zoho CRMなど主要CRMはすべてREST APIを提供しており、Pythonのrequestsライブラリで共通的にアクセスできます。CRMごとにエンドポイントと認証方式が異なるだけで、基本的な実装パターンは同じです。
QPythonスクリプトの実行環境はどう整えるべきですか?
ローカルPCでの手動実行から始め、安定したらクラウド(AWS Lambda、Google Cloud Functions)またはcron(Linux/Mac)で定期実行に移行するのが定石です。仮想環境(venv)でライブラリを管理し、.envファイルでAPIキーを分離するのが運用の基本です。
渡邊悠介

渡邊悠介

代表取締役 / 株式会社Hibito

株式会社Hibito代表取締役。営業企画とAIを掛け合わせた「GTMエンジニア」として、営業組織の仕組み化・自動化を支援。CRMと生成AIを活用し、営業推進機能のAI化を推進する。「全ての人が自分の未来を自分の手で描ける社会」の実現を目指し、組織・個人コーチングも提供。

YouTubeでも発信中