ビジネスパーソン・ガジェット置場 empty lot for business

営業や仕事、それに伴う生活を便利に楽にするツール、ガジェットを作ります。既にあるツールも自分用にカスタマイズ。

pythonでメール取得アプリ【with Gmail api】

Gmailの受信日時、送付元、宛先、件名、既読、未読で検索し、受信したメール情報を取得してcsvファイルとしてまとめることができるガジェットです。pythonを使いgmail apiを利用したコードになっています。

f:id:kslabo51:20220410193023j:plain

例えばこんな状況で

毎日何通、何十通ものやり取りが行われているメールボックス。その中から必要なメールを探すとき、もちろん、検索などで絞っていると思いますが最後はやはり、受信メールを遡りながら1通ずつ開いて閉じて内容を確認しますよね。メールをいちいち開いて閉じてを繰り返すのがめんどくさいです。今回は、必要なメールを一覧で一度に確認できたらとてもいいなと思いまして作ったガジェットです。

 

f:id:kslabo51:20220410193518p:plain

 

このガジェットでできることはこちら

  • 受信メールの内容を取得しcsvファイルでまとめることができます。
  • まとめるメールは受信日時、送付元、宛先、件名、既読、未読で検索することができます。
  • 取得するメールの件数も設定可能です。
  • 検索はstreamlitのインターフェイスで入力するので操作性も問題ありません。

 

評価(自己)

役立ち度  ★★★★(良し)
効率化   ★★★★(良し)
ミス防止  ★★★(平均)
楽しさ   ★★★(平均)
操作    ★★★★(良し)

 

実行環境

使用環境    gmail.api、streamlit 
使用言語    python
使用ライブラリ  _future__、os、googleapiclient.discovery、google_auth_oauthlib.flow、google.auth.transport.requests、google.oauth2.credentials、apiclient、base64csv、email、dateutil.parser

 

コード

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from apiclient import errors
import base64
import csv
import email
import dateutil.parser

import streamlit as st

SCOPE = ['https://www.googleapis.com/auth/gmail.readonly']
tokenPath = 'token.json'
credentialPath = '[your_json_file]'

def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPE)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(credentialPath, SCOPE)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
            
    service = build('gmail', 'v1', credentials=creds)
    return service

def get_message_list(service, user_id, query, count):
    messages = []
    try:
        message_ids = (
            service.users()
            .messages()
            .list(userId=user_id, maxResults=count, q=query)
            .execute()
        )
        
        if message_ids["resultSizeEstimate"]==0:
            print("No Data")
            return []
        
        for message_id in message_ids["messages"]:
            detail = (
                service.users()
                .messages()
                .get(userId='me', id=message_id['id'])
                .execute()
            )
            message={}
            message['id'] = message_id['id']
            
            if 'data' in detail['payload']['body']:
                decoded_bytes = base64.urlsafe_b64decode(
                    detail['payload']['body']['data'])
                decoded_message = decoded_bytes.decode('UTF-8')
                message['body'] = decoded_message
            else:
                message['body'] = ""
            
            message['subject'] = [
                header['value']
                for header in detail['payload']['headers']
                if header['name'] =='Subject'
            ][0]
            
            message['from'] = [
                header['value']
                for header in detail['payload']['headers']
                if header['name'] == 'From'
            ][0]
            
            message['date'] = [
                header['value']
                for header in detail['payload']['headers']
                if header['name'] == 'Date'
            ][0]
            messages.append(message)
        return messages
    
    except errors.HttpError as error:
        print("An error occured: %s" % error)
        
def main(query, count=10):
    service = gmail_init()
    messages = get_message_list(service, 'me', query, count=count)
    
    if messages:
        field_names = ['date', 'id', 'body', 'subject', 'from']
        with open('gmails.csv', 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=field_names)
            writer.writeheader()
            writer.writerows(messages)
            
    else:
        return None

st.title('メール取得アプリ')
st.write('検索条件を決定します')

li = ['subject', 'from', 'to', 'is', 'after']
li_state = ['unread', 'read']
search = st.selectbox('検索の種類を選んでください', options=li)
if search is li[0]:
    query_text = st.text_input('検索文字を入力してください')
    query = ':'.join([search, query_text])
    if query != '':
        count = st.number_input('検索数を入力してください')
        if st.button('検索'):
            main(query=query, count=count)
elif search is li[1]:
    query_text = st.text_input('メールアドレスを入力してください')
    query = ':'.join([search, query_text])
    if query != '':
        count = st.number_input('検索数を入力してください')
        if st.button('検索'):
            main(query=query, count=count)
elif search is li[2]:
    query_text = st.text_input('宛先を入力してください')
    query = ':'.join([search, query_text])
    if query != '':
        count = st.number_input('検索数を入力してください')
        if st.button('検索'):
            main(query=query, count=count)
elif search is li[3]:
    query_text = st.selectbox('メールの状態を選択してください', li_state)
    query = ':'.join([search, query_text])
    if query != '':
        count = st.number_input('検索数を入力してください')
        if st.button('検索'):
            main(query=query, count=count)
elif search is li[4]:
    query_text = st.text_input('日付を入力してください/例)2022/03/26')
    query = ':'.join([search, query_text])
    if query != '':
        count = st.number_input('検索数を入力してください')
        if st.button('検索'):
            main(query=query, count=count)