Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ dist/
.venv/
venv/
.idea/
.vscode/
mcp_server_debug.log

# ---- macOS clutter -----------------------------------------------------
.DS_Store

# ---- Secrets -----------------------------------------------------------
client_secret.json

.credentials/*.json*
23 changes: 23 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Run in Debug Mode",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/main.py",
"args": [
"--transport",
"streamable-http",
"--tools",
"docs",
"--single-user"
],
"env": {
"WORKSPACE_MCP_PORT": "8000"
},
"justMyCode": false,
"console": "integratedTerminal"
}
]
}
9 changes: 9 additions & 0 deletions .vscode/mcp.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"servers": {
"google_workspace": {
"type": "stdio",
"command": "uvx",
"args": ["workspace-mcp", "--single-user"]
}
}
}
43 changes: 32 additions & 11 deletions auth/google_auth.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
# auth/google_auth.py

import os
import asyncio
import json
import logging
import asyncio
from typing import List, Optional, Tuple, Dict, Any, Callable
import os
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import jwt
from google.auth.exceptions import RefreshError
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import Flow, InstalledAppFlow
from google.auth.transport.requests import Request
from google.auth.exceptions import RefreshError
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from auth.scopes import OAUTH_STATE_TO_SESSION_ID_MAP, SCOPES

# Configure logging
Expand Down Expand Up @@ -43,6 +46,7 @@ def _find_any_credentials(base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[C
"""
Find and load any valid credentials from the credentials directory.
Used in single-user mode to bypass session-to-OAuth mapping.
Supports both OAuth2 and service account credentials.

Returns:
First valid Credentials object found, or None if none exist.
Expand All @@ -56,8 +60,24 @@ def _find_any_credentials(base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[C
if filename.endswith('.json'):
filepath = os.path.join(base_dir, filename)
try:
# Check if this is a service account file
if "iam.gserviceaccount.com" in filename:
logger.info(f"[single-user] Found service account file: {filepath}")
try:
credentials = service_account.Credentials.from_service_account_file(filepath,scopes=SCOPES)
logger.info(f"[single-user] Successfully loaded service account credentials from {filepath}")

# Force refresh to get a token, since by default it's not set and the library considers the credentials to be invalid
credentials.refresh(Request())
return credentials
except Exception as e:
logger.warning(f"[single-user] Error loading service account credentials from {filepath}: {e}")
continue
Comment on lines +64 to +75
Copy link

Copilot AI Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relying solely on the filename to identify service account credentials might be fragile; consider validating the file contents for expected service account keys.

Suggested change
if "iam.gserviceaccount.com" in filename:
logger.info(f"[single-user] Found service account file: {filepath}")
try:
credentials = service_account.Credentials.from_service_account_file(filepath,scopes=SCOPES)
logger.info(f"[single-user] Successfully loaded service account credentials from {filepath}")
# Force refresh to get a token, since by default it's not set and the library considers then credentials to be invalid
credentials.refresh(Request())
return credentials
except Exception as e:
logger.warning(f"[single-user] Error loading service account credentials from {filepath}: {e}")
continue
try:
with open(filepath, 'r') as f:
creds_data = json.load(f)
# Validate required keys for service account credentials
if all(key in creds_data for key in ["client_email", "private_key", "project_id"]):
logger.info(f"[single-user] Found valid service account file: {filepath}")
try:
credentials = service_account.Credentials.from_service_account_file(filepath, scopes=SCOPES)
logger.info(f"[single-user] Successfully loaded service account credentials from {filepath}")
# Force refresh to get a token, since by default it's not set and the library considers the credentials to be invalid
credentials.refresh(Request())
return credentials
except Exception as e:
logger.warning(f"[single-user] Error loading service account credentials from {filepath}: {e}")
continue
else:
logger.warning(f"[single-user] File {filepath} does not contain valid service account keys.")
continue
except (IOError, json.JSONDecodeError) as e:
logger.warning(f"[single-user] Error reading or parsing file {filepath}: {e}")
continue

Copilot uses AI. Check for mistakes.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's actually correct. I pondered it for a moment and decided it's a tradeoff between this and reading the file from disk twice; for a single-user configuration it will do.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @intval - hm, when I create a service account key it does stub out an email with iam.gserviceaccount.com in it, but the actual generated keyfile name is open-webui-444821-b145f5cb467e.json which would not match this check. A user would have to manually rename the key for this logic to work. Thoughts on splitting the service account PR out from the copy & template bits so we can get that merged and figure out the best approach for service accounts separately?


Comment on lines +64 to +76
Copy link

Copilot AI Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relying solely on the filename to determine if a file contains service account credentials may be brittle; consider inspecting the JSON content for keys unique to service accounts to improve reliability.

Suggested change
if "iam.gserviceaccount.com" in filename:
logger.info(f"[single-user] Found service account file: {filepath}")
try:
credentials = service_account.Credentials.from_service_account_file(filepath,scopes=SCOPES)
logger.info(f"[single-user] Successfully loaded service account credentials from {filepath}")
# Force refresh to get a token, since by default it's not set and the library considers then credentials to be invalid
credentials.refresh(Request())
return credentials
except Exception as e:
logger.warning(f"[single-user] Error loading service account credentials from {filepath}: {e}")
continue
try:
with open(filepath, 'r') as f:
creds_data = json.load(f)
# Check if this is a service account file based on JSON content
if creds_data.get('type') == 'service_account' and 'client_email' in creds_data and 'private_key' in creds_data:
logger.info(f"[single-user] Found service account file: {filepath}")
try:
credentials = service_account.Credentials.from_service_account_info(creds_data, scopes=SCOPES)
logger.info(f"[single-user] Successfully loaded service account credentials from {filepath}")
# Force refresh to get a token, since by default it's not set and the library considers the credentials to be invalid
credentials.refresh(Request())
return credentials
except Exception as e:
logger.warning(f"[single-user] Error loading service account credentials from {filepath}: {e}")
continue
except (IOError, json.JSONDecodeError) as e:
logger.warning(f"[single-user] Error reading or parsing JSON from {filepath}: {e}")
continue

Copilot uses AI. Check for mistakes.
# Handle OAuth2 credentials
with open(filepath, 'r') as f:
creds_data = json.load(f)

credentials = Credentials(
token=creds_data.get('token'),
refresh_token=creds_data.get('refresh_token'),
Expand All @@ -66,7 +86,7 @@ def _find_any_credentials(base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[C
client_secret=creds_data.get('client_secret'),
scopes=creds_data.get('scopes')
)
logger.info(f"[single-user] Found credentials in {filepath}")
logger.info(f"[single-user] Found OAuth2 credentials in {filepath}")
return credentials
except (IOError, json.JSONDecodeError, KeyError) as e:
logger.warning(f"[single-user] Error loading credentials from {filepath}: {e}")
Expand Down Expand Up @@ -122,7 +142,6 @@ def load_credentials_from_file(user_google_email: str, base_dir: str = DEFAULT_C
expiry = None
if creds_data.get('expiry'):
try:
from datetime import datetime
expiry = datetime.fromisoformat(creds_data['expiry'])
except (ValueError, TypeError) as e:
logger.warning(f"Could not parse expiry time for {user_google_email}: {e}")
Expand Down Expand Up @@ -494,7 +513,6 @@ async def get_authenticated_google_service(
session_id=None, # Session ID not available in service layer
)


if not credentials or not credentials.valid:
logger.warning(
f"[{tool_name}] No valid credentials. Email: '{user_google_email}'."
Expand Down Expand Up @@ -525,10 +543,13 @@ async def get_authenticated_google_service(
service = build(service_name, version, credentials=credentials)
log_user_email = user_google_email

# Try to get email from credentials if needed for validation
if credentials and credentials.id_token:
# For service accounts, use the service account email
if hasattr(credentials, 'service_account_email'):
log_user_email = credentials.service_account_email
logger.info(f"[{tool_name}] Using service account: {log_user_email}")
# For OAuth2 credentials, try to get email from id_token
elif credentials and hasattr(credentials, 'id_token') and credentials.id_token:
try:
import jwt
# Decode without verification (just to get email for logging)
decoded_token = jwt.decode(credentials.id_token, options={"verify_signature": False})
token_email = decoded_token.get("email")
Expand Down
146 changes: 144 additions & 2 deletions gdocs/docs_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
import logging
import asyncio
import io
from typing import List
from typing import List, Annotated, Optional, Dict

from mcp import types
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload

Expand All @@ -17,6 +16,8 @@
from core.utils import extract_office_xml_text, handle_http_errors
from core.server import server

from pydantic import Field

logger = logging.getLogger(__name__)

@server.tool()
Expand Down Expand Up @@ -214,3 +215,144 @@ async def create_doc(
msg = f"Created Google Doc '{title}' (ID: {doc_id}) for {user_google_email}. Link: {link}"
logger.info(f"Successfully created Google Doc '{title}' (ID: {doc_id}) for {user_google_email}. Link: {link}")
return msg


@server.tool()
@require_google_service("drive", "drive_read")
async def copy_google_doc(
    service,
    user_google_email: str,
    template_id: str,
    new_title: str,
    target_folder_id: Optional[str] = None,
) -> str:
    """
    Creates a new Google Doc by making a copy of an existing document. This is useful for creating documents from templates
    or duplicating existing documents while preserving their formatting and content.

    The tool will:
    1. Create an exact copy of the source document
    2. Give it the specified new title
    3. Place it in the specified folder (or root if no folder specified)
    4. Return the ID and view link of the new document

    Args:
        service: Authenticated Google Drive service instance.
        user_google_email: Email of the user making the request.
        template_id: The Google Drive ID of the source document that will be used as a template. This is the document you want to copy from.
        new_title: The title/name that will be given to the new copy of the document. This is what the document will be called in Google Drive.
        target_folder_id: Optional Google Drive folder ID where the new document should be created. If not provided, the document will be created in the root of the user's Google Drive.
    Returns:
        str: A message containing the new document's ID and view link.
    Raises:
        Exception: If the copy fails (with the HTTP status folded into the message).
    """
    logger.info(f"[copy_google_doc] Copying document {template_id} with new title {new_title}. Email: '{user_google_email}'")

    try:
        # Only include 'parents' when a target folder was supplied so the
        # Drive API defaults to the user's root folder otherwise.
        copy_metadata = {
            'name': new_title,
        }

        if target_folder_id:
            copy_metadata['parents'] = [target_folder_id]

        # Execute the copy, requesting only the fields used in the reply.
        response = service.files().copy(
            fileId=template_id,
            body=copy_metadata,
            fields='id,name,webViewLink'
        ).execute()

        document_id = response['id']
        document_name = response['name']
        view_link = response.get('webViewLink')

        return f'Successfully created document "{document_name}" with ID: {document_id}\nView Link: {view_link}'

    except HttpError as e:
        status = e.resp.status
        logger.error(f"Error copying document: {str(e)}")
        # Chain with `from e` so the original HttpError is preserved as
        # __cause__ (consistent with replace_text_in_google_doc).
        if status == 404:
            raise Exception("Template document or parent folder not found. Check the IDs. HTTP Status: 404") from e
        elif status == 403:
            raise Exception("Permission denied. Make sure you have read access to the template and write access to the destination folder. HTTP Status: 403") from e
        else:
            raise Exception(f"Failed to copy document: {e._get_reason() or 'Unknown error'} HTTP Status: {status}") from e

    except Exception as e:
        logger.error(f"Unhandled error: {str(e)}")
        # Bare `raise` re-raises with the original traceback intact.
        raise


@server.tool()
@require_google_service("docs", "docs_write")
async def replace_text_in_google_doc(
    service,
    user_google_email: Annotated[str, Field(description="Email of the user making the request")],
    document_id: Annotated[str, Field(description="The Google Drive ID of the document where text replacements should be performed")],
    replacements: Annotated[Dict[str, str], Field(
        description="Dictionary mapping text patterns to their replacements. Each key is the text to find (case-insensitive), and each value is the text to replace it with",
        json_schema_extra={"additionalProperties": {"type": "string"}}
    )],
) -> str:
    """
    Performs multiple text replacements within a Google Doc in a single operation. This is useful for:
    - Replacing template placeholders with actual content
    - Updating multiple instances of the same text
    - Making bulk text changes across the document

    The tool will:
    1. Find all instances of each specified text pattern (case-insensitive)
    2. Replace them with their corresponding replacement text
    3. Perform all replacements in a single batch operation
    4. Return a summary of how many replacements were made

    Args:
        service: Authenticated Google Docs service instance.
        user_google_email: Email of the user making the request.
        document_id: The Google Drive ID of the document where text replacements should be performed. This is the document you want to modify.
        replacements: A dictionary mapping text patterns to their replacements. Each key is the text to find (case-insensitive),
            and each value is the text to replace it with. Example: {'{{NAME}}': 'John Doe', '(% DATE %)': '2025-01-01'}
            will replace all instances of '{{NAME}}' with 'John Doe' and '(% DATE %)' with '2025-01-01'.
    Returns:
        str: A message confirming the number of replacements that were successfully applied.
    Raises:
        Exception: If `replacements` is empty, or if the batch update fails.
    """
    logger.info(f'Replacing text in document {document_id}. Amount of replacements: {len(replacements)}')

    # Validate up front, outside the try block, so a deliberate validation
    # error is not logged as "Unhandled error" and no API call is attempted.
    if not replacements:
        raise Exception("Error: The replacements dictionary is empty. Please provide at least one replacement.")

    try:
        # One replaceAllText request per pattern; matchCase=False makes the
        # search case-insensitive, matching the documented contract.
        requests = [
            {
                "replaceAllText": {
                    "containsText": {
                        "text": search_text,
                        "matchCase": False
                    },
                    "replaceText": replace_text
                }
            }
            for search_text, replace_text in replacements.items()
        ]

        service.documents().batchUpdate(
            documentId=document_id,
            body={"requests": requests}
        ).execute()

        count = len(requests)
        return f"Successfully applied {count} text replacement{'s' if count != 1 else ''} to the document."

    except HttpError as e:
        status = e.resp.status
        logger.error(f"Error replacing text in document: {str(e)}")
        if status == 404:
            raise Exception("Document not found. Check the document ID. HTTP Status: 404") from e
        elif status == 403:
            raise Exception("Permission denied. Make sure you have write access to the document. HTTP Status: 403") from e
        else:
            raise Exception(f"Failed to replace text: {e._get_reason() or 'Unknown error'} HTTP Status: {status}") from e

    except Exception as e:
        logger.error(f"Unhandled error: {str(e)}")
        # Bare `raise` re-raises with the original traceback intact.
        raise