add: post-to-xhs skills

This commit is contained in:
Angiin
2026-02-05 19:25:53 +08:00
parent c596ab1be3
commit b50f0aa633
14 changed files with 2066 additions and 0 deletions

View File

@@ -0,0 +1,309 @@
"""
Multi-account manager for Xiaohongshu publishing.
Manages multiple Xiaohongshu accounts with separate Chrome profiles:
- Each account has its own user-data-dir for cookie isolation
- Accounts are stored in a JSON config file
- Supports add/remove/list/switch operations
Usage:
python account_manager.py list
python account_manager.py add <name> [--alias <alias>]
python account_manager.py remove <name>
python account_manager.py info <name>
python account_manager.py set-default <name>
"""
import json
import os
import sys
import shutil
from typing import Optional
# Config file location
CONFIG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "config")
ACCOUNTS_FILE = os.path.join(CONFIG_DIR, "accounts.json")
# Base directory for account profiles
PROFILES_BASE = os.path.join(os.environ.get("LOCALAPPDATA", os.path.expanduser("~")),
"Google", "Chrome", "XiaohongshuProfiles")
# Default account name (for backward compatibility)
DEFAULT_PROFILE_NAME = "default"
def _ensure_config_dir():
"""Ensure the config directory exists."""
os.makedirs(CONFIG_DIR, exist_ok=True)
def _load_accounts() -> dict:
"""Load accounts from config file."""
_ensure_config_dir()
if os.path.exists(ACCOUNTS_FILE):
try:
with open(ACCOUNTS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
pass
# Default structure
return {
"default_account": DEFAULT_PROFILE_NAME,
"accounts": {
DEFAULT_PROFILE_NAME: {
"alias": "默认账号",
"profile_dir": os.path.join(PROFILES_BASE, DEFAULT_PROFILE_NAME),
"created_at": None,
}
}
}
def _save_accounts(data: dict):
"""Save accounts to config file."""
_ensure_config_dir()
with open(ACCOUNTS_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def get_profile_dir(account_name: Optional[str] = None) -> str:
"""
Get the Chrome profile directory for a given account.
Args:
account_name: Account name. If None, uses the default account.
Returns:
Path to the Chrome user-data-dir for this account.
"""
data = _load_accounts()
if account_name is None:
account_name = data.get("default_account", DEFAULT_PROFILE_NAME)
if account_name not in data["accounts"]:
# Fallback to default
account_name = DEFAULT_PROFILE_NAME
if account_name not in data["accounts"]:
# Create default account entry
data["accounts"][account_name] = {
"alias": "默认账号",
"profile_dir": os.path.join(PROFILES_BASE, account_name),
"created_at": None,
}
_save_accounts(data)
return data["accounts"][account_name]["profile_dir"]
def get_default_account() -> str:
"""Get the name of the default account."""
data = _load_accounts()
return data.get("default_account", DEFAULT_PROFILE_NAME)
def set_default_account(account_name: str) -> bool:
"""
Set the default account.
Returns True if successful, False if account doesn't exist.
"""
data = _load_accounts()
if account_name not in data["accounts"]:
return False
data["default_account"] = account_name
_save_accounts(data)
return True
def list_accounts() -> list[dict]:
"""
List all registered accounts.
Returns a list of dicts with account info.
"""
data = _load_accounts()
default = data.get("default_account", DEFAULT_PROFILE_NAME)
result = []
for name, info in data["accounts"].items():
result.append({
"name": name,
"alias": info.get("alias", ""),
"profile_dir": info.get("profile_dir", ""),
"is_default": name == default,
})
return result
def add_account(name: str, alias: Optional[str] = None) -> bool:
"""
Add a new account.
Args:
name: Unique account name (used as identifier)
alias: Display name / description
Returns True if added, False if name already exists.
"""
data = _load_accounts()
if name in data["accounts"]:
return False
from datetime import datetime
profile_dir = os.path.join(PROFILES_BASE, name)
os.makedirs(profile_dir, exist_ok=True)
data["accounts"][name] = {
"alias": alias or name,
"profile_dir": profile_dir,
"created_at": datetime.now().isoformat(),
}
_save_accounts(data)
return True
def remove_account(name: str, delete_profile: bool = False) -> bool:
"""
Remove an account.
Args:
name: Account name to remove
delete_profile: If True, also delete the Chrome profile directory
Returns True if removed, False if not found or is default.
"""
data = _load_accounts()
if name not in data["accounts"]:
return False
# Don't allow removing the default account if it's the only one
if name == data.get("default_account") and len(data["accounts"]) == 1:
return False
profile_dir = data["accounts"][name].get("profile_dir", "")
del data["accounts"][name]
# If we removed the default, set a new default
if name == data.get("default_account"):
data["default_account"] = next(iter(data["accounts"].keys()))
_save_accounts(data)
# Optionally delete the profile directory
if delete_profile and profile_dir and os.path.isdir(profile_dir):
try:
shutil.rmtree(profile_dir)
except Exception:
pass
return True
def get_account_info(name: str) -> Optional[dict]:
"""Get info for a specific account."""
data = _load_accounts()
if name not in data["accounts"]:
return None
info = data["accounts"][name].copy()
info["name"] = name
info["is_default"] = name == data.get("default_account")
return info
def account_exists(name: str) -> bool:
"""Check if an account exists."""
data = _load_accounts()
return name in data["accounts"]
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
import argparse
parser = argparse.ArgumentParser(description="Xiaohongshu Account Manager")
sub = parser.add_subparsers(dest="command", required=True)
# list
sub.add_parser("list", help="List all accounts")
# add
p_add = sub.add_parser("add", help="Add a new account")
p_add.add_argument("name", help="Account name (unique identifier)")
p_add.add_argument("--alias", help="Display name / description")
# remove
p_rm = sub.add_parser("remove", help="Remove an account")
p_rm.add_argument("name", help="Account name to remove")
p_rm.add_argument("--delete-profile", action="store_true",
help="Also delete the Chrome profile directory")
# info
p_info = sub.add_parser("info", help="Show account info")
p_info.add_argument("name", help="Account name")
# set-default
p_def = sub.add_parser("set-default", help="Set the default account")
p_def.add_argument("name", help="Account name to set as default")
# get-profile-dir (for internal use)
p_dir = sub.add_parser("get-profile-dir", help="Get profile directory for an account")
p_dir.add_argument("--account", help="Account name (default: default account)")
args = parser.parse_args()
if args.command == "list":
accounts = list_accounts()
if not accounts:
print("No accounts configured.")
return
print(f"{'Name':<20} {'Alias':<20} {'Default':<10}")
print("-" * 50)
for acc in accounts:
default_mark = "*" if acc["is_default"] else ""
print(f"{acc['name']:<20} {acc['alias']:<20} {default_mark:<10}")
elif args.command == "add":
if add_account(args.name, args.alias):
print(f"Account '{args.name}' added.")
print(f"Profile dir: {get_profile_dir(args.name)}")
print("\nTo log in to this account, run:")
print(f" python cdp_publish.py --account {args.name} login")
else:
print(f"Error: Account '{args.name}' already exists.", file=sys.stderr)
sys.exit(1)
elif args.command == "remove":
if remove_account(args.name, args.delete_profile):
print(f"Account '{args.name}' removed.")
else:
print(f"Error: Cannot remove account '{args.name}'.", file=sys.stderr)
sys.exit(1)
elif args.command == "info":
info = get_account_info(args.name)
if info:
print(f"Name: {info['name']}")
print(f"Alias: {info.get('alias', '')}")
print(f"Profile dir: {info.get('profile_dir', '')}")
print(f"Default: {'Yes' if info.get('is_default') else 'No'}")
print(f"Created: {info.get('created_at', 'Unknown')}")
else:
print(f"Error: Account '{args.name}' not found.", file=sys.stderr)
sys.exit(1)
elif args.command == "set-default":
if set_default_account(args.name):
print(f"Default account set to '{args.name}'.")
else:
print(f"Error: Account '{args.name}' not found.", file=sys.stderr)
sys.exit(1)
elif args.command == "get-profile-dir":
print(get_profile_dir(args.account))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,686 @@
"""
CDP-based Xiaohongshu publisher.
Connects to a Chrome instance via Chrome DevTools Protocol to automate
publishing articles on Xiaohongshu (RED) creator center.
CLI usage:
# Basic commands
python cdp_publish.py check-login [--headless] [--account NAME]
python cdp_publish.py fill --title "标题" --content "正文" --images img1.jpg [--headless] [--account NAME]
python cdp_publish.py publish --title "标题" --content "正文" --images img1.jpg [--headless] [--account NAME]
python cdp_publish.py click-publish [--headless] [--account NAME]
# Account management
python cdp_publish.py login [--account NAME] # open browser for QR login
python cdp_publish.py re-login [--account NAME] # clear cookies and re-login same account
python cdp_publish.py switch-account [--account NAME] # clear cookies + open login for new account
python cdp_publish.py list-accounts # list all configured accounts
python cdp_publish.py add-account NAME [--alias ALIAS] # add a new account
python cdp_publish.py remove-account NAME # remove an account
Library usage:
from cdp_publish import XiaohongshuPublisher
publisher = XiaohongshuPublisher()
publisher.connect()
publisher.check_login()
publisher.publish(
title="Article title",
content="Article body text",
image_paths=["/path/to/img1.jpg", "/path/to/img2.jpg"],
)
"""
import json
import os
import time
import sys
from typing import Any
# Ensure UTF-8 output on Windows consoles
if sys.platform == "win32":
os.environ.setdefault("PYTHONIOENCODING", "utf-8")
try:
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
import requests
import websockets.sync.client as ws_client
# ---------------------------------------------------------------------------
# Configuration - centralised selectors and URLs for easy maintenance
# ---------------------------------------------------------------------------
CDP_HOST = "127.0.0.1"
CDP_PORT = 9222
# Xiaohongshu URLs
XHS_CREATOR_URL = "https://creator.xiaohongshu.com/publish/publish"
XHS_HOME_URL = "https://www.xiaohongshu.com"
XHS_LOGIN_CHECK_URL = "https://creator.xiaohongshu.com"
# DOM selectors (update these when Xiaohongshu changes their page structure)
# Last verified: 2026-02
SELECTORS = {
# "上传图文" tab - must click before uploading images
"image_text_tab": "div.creator-tab",
"image_text_tab_text": "上传图文",
# Upload area - the file input element for images (visible after clicking tab)
"upload_input": "input.upload-input",
"upload_input_alt": 'input[type="file"]',
# Title input field (visible after image upload)
"title_input": 'input[placeholder*="填写标题"]',
"title_input_alt": "input.d-text",
# Content editor area - TipTap/ProseMirror contenteditable div
"content_editor": "div.tiptap.ProseMirror",
"content_editor_alt": 'div.ProseMirror[contenteditable="true"]',
# Publish button
"publish_button_text": "发布",
# Login indicator - URL-based check (redirect to /login if not logged in)
"login_indicator": '.user-info, .creator-header, [class*="user"]',
}
# Timing
PAGE_LOAD_WAIT = 3 # seconds to wait after navigation
TAB_CLICK_WAIT = 2 # seconds to wait after clicking tab
UPLOAD_WAIT = 6 # seconds to wait after image upload for editor to appear
ACTION_INTERVAL = 1 # seconds between actions
class CDPError(Exception):
"""Error communicating with Chrome via CDP."""
class XiaohongshuPublisher:
"""Automates publishing to Xiaohongshu via CDP."""
def __init__(self, host: str = CDP_HOST, port: int = CDP_PORT):
self.host = host
self.port = port
self.ws = None
self._msg_id = 0
# ------------------------------------------------------------------
# CDP connection management
# ------------------------------------------------------------------
def _get_targets(self) -> list[dict]:
"""Get list of available browser targets (tabs). Retries once on failure."""
url = f"http://{self.host}:{self.port}/json"
for attempt in range(2):
try:
resp = requests.get(url, timeout=5)
resp.raise_for_status()
return resp.json()
except Exception as e:
if attempt == 0:
print(f"[cdp_publish] CDP connection failed ({e}), restarting Chrome...")
from chrome_launcher import ensure_chrome
ensure_chrome(self.port)
time.sleep(2)
else:
raise CDPError(f"Cannot reach Chrome on {self.host}:{self.port}: {e}")
def _find_or_create_tab(self, target_url_prefix: str = "") -> str:
"""Find an existing tab matching the URL prefix, or return the first page tab."""
targets = self._get_targets()
pages = [t for t in targets if t.get("type") == "page"]
if target_url_prefix:
for t in pages:
if t.get("url", "").startswith(target_url_prefix):
return t["webSocketDebuggerUrl"]
# Create a new tab
resp = requests.put(
f"http://{self.host}:{self.port}/json/new?{XHS_CREATOR_URL}",
timeout=5,
)
if resp.ok:
return resp.json().get("webSocketDebuggerUrl", "")
# Fallback: use first available page
if pages:
return pages[0]["webSocketDebuggerUrl"]
raise CDPError("No browser tabs available.")
def connect(self, target_url_prefix: str = ""):
"""Connect to a Chrome tab via WebSocket."""
ws_url = self._find_or_create_tab(target_url_prefix)
if not ws_url:
raise CDPError("Could not obtain WebSocket URL for any tab.")
print(f"[cdp_publish] Connecting to {ws_url}")
self.ws = ws_client.connect(ws_url)
print("[cdp_publish] Connected to Chrome tab.")
def disconnect(self):
"""Close the WebSocket connection."""
if self.ws:
self.ws.close()
self.ws = None
# ------------------------------------------------------------------
# CDP command helpers
# ------------------------------------------------------------------
def _send(self, method: str, params: dict | None = None) -> dict:
"""Send a CDP command and return the result."""
if not self.ws:
raise CDPError("Not connected. Call connect() first.")
self._msg_id += 1
msg = {"id": self._msg_id, "method": method}
if params:
msg["params"] = params
self.ws.send(json.dumps(msg))
# Wait for the matching response
while True:
raw = self.ws.recv()
data = json.loads(raw)
if data.get("id") == self._msg_id:
if "error" in data:
raise CDPError(f"CDP error: {data['error']}")
return data.get("result", {})
# else: it's an event, skip it
def _evaluate(self, expression: str) -> Any:
"""Execute JavaScript in the page and return the result value."""
result = self._send("Runtime.evaluate", {
"expression": expression,
"returnByValue": True,
"awaitPromise": True,
})
remote_obj = result.get("result", {})
if remote_obj.get("subtype") == "error":
raise CDPError(f"JS error: {remote_obj.get('description', remote_obj)}")
return remote_obj.get("value")
def _navigate(self, url: str):
"""Navigate the current tab to the given URL and wait for load."""
print(f"[cdp_publish] Navigating to {url}")
self._send("Page.enable")
self._send("Page.navigate", {"url": url})
time.sleep(PAGE_LOAD_WAIT)
# ------------------------------------------------------------------
# Login check
# ------------------------------------------------------------------
def check_login(self) -> bool:
"""
Navigate to Xiaohongshu creator center and check if the user is logged in.
Returns True if logged in. If not logged in, prints instructions
and returns False.
"""
self._navigate(XHS_LOGIN_CHECK_URL)
time.sleep(2)
# Check if we got redirected to a login page
current_url = self._evaluate("window.location.href")
print(f"[cdp_publish] Current URL: {current_url}")
if "login" in current_url.lower():
print(
"\n[cdp_publish] NOT LOGGED IN.\n"
" Please scan the QR code in the Chrome window to log in,\n"
" then run this script again.\n"
)
return False
print("[cdp_publish] Login confirmed.")
return True
def clear_cookies(self, domain: str = ".xiaohongshu.com"):
"""
Clear all cookies for the given domain to force re-login.
Used when switching accounts.
"""
print(f"[cdp_publish] Clearing cookies for {domain}...")
self._send("Network.enable")
self._send("Network.clearBrowserCookies")
# Also clear storage
self._send("Storage.clearDataForOrigin", {
"origin": "https://www.xiaohongshu.com",
"storageTypes": "cookies,local_storage,session_storage",
})
self._send("Storage.clearDataForOrigin", {
"origin": "https://creator.xiaohongshu.com",
"storageTypes": "cookies,local_storage,session_storage",
})
print("[cdp_publish] Cookies and storage cleared.")
def open_login_page(self):
"""
Navigate to the Xiaohongshu login page for QR code scanning.
Used for initial login or after clearing cookies for account switch.
"""
self._navigate(XHS_LOGIN_CHECK_URL)
time.sleep(2)
current_url = self._evaluate("window.location.href")
if "login" not in current_url.lower():
# Already logged in, navigate to login page explicitly
self._navigate("https://creator.xiaohongshu.com/login")
time.sleep(2)
print(
"\n[cdp_publish] Login page is open.\n"
" Please scan the QR code in the Chrome window to log in.\n"
)
# ------------------------------------------------------------------
# Publishing actions
# ------------------------------------------------------------------
def _click_image_text_tab(self):
"""Click the '上传图文' tab to switch to image+text publish mode."""
print("[cdp_publish] Clicking '上传图文' tab...")
tab_text = SELECTORS["image_text_tab_text"]
selector = SELECTORS["image_text_tab"]
clicked = self._evaluate(f"""
(function() {{
var tabs = document.querySelectorAll('{selector}');
for (var i = 0; i < tabs.length; i++) {{
if (tabs[i].textContent.trim() === '{tab_text}') {{
tabs[i].click();
return true;
}}
}}
return false;
}})();
""")
if not clicked:
raise CDPError(
f"Could not find '{tab_text}' tab. "
"The page structure may have changed."
)
print("[cdp_publish] Tab clicked, waiting for upload area...")
time.sleep(TAB_CLICK_WAIT)
def _upload_images(self, image_paths: list[str]):
"""Upload images via the file input element."""
if not image_paths:
print("[cdp_publish] No images to upload, skipping.")
return
# Normalize paths (forward slashes for CDP)
normalized = [p.replace("\\", "/") for p in image_paths]
print(f"[cdp_publish] Uploading {len(image_paths)} image(s)...")
# Enable DOM domain
self._send("DOM.enable")
# Get the document root
doc = self._send("DOM.getDocument")
root_id = doc["root"]["nodeId"]
# Try primary selector, then fallback
node_id = 0
for selector in (SELECTORS["upload_input"], SELECTORS["upload_input_alt"]):
result = self._send("DOM.querySelector", {
"nodeId": root_id,
"selector": selector,
})
node_id = result.get("nodeId", 0)
if node_id:
break
if not node_id:
raise CDPError(
"Could not find file input element.\n"
"The page structure may have changed. Check references/publish-workflow.md."
)
# Use DOM.setFileInputFiles to set the files
self._send("DOM.setFileInputFiles", {
"nodeId": node_id,
"files": normalized,
})
print("[cdp_publish] Images uploaded. Waiting for editor to appear...")
time.sleep(UPLOAD_WAIT)
def _fill_title(self, title: str):
"""Fill in the article title."""
print(f"[cdp_publish] Setting title: {title[:40]}...")
time.sleep(ACTION_INTERVAL)
for selector in (SELECTORS["title_input"], SELECTORS["title_input_alt"]):
found = self._evaluate(f"!!document.querySelector('{selector}')")
if found:
escaped_title = json.dumps(title)
self._evaluate(f"""
(function() {{
var el = document.querySelector('{selector}');
var nativeSetter = Object.getOwnPropertyDescriptor(
window.HTMLInputElement.prototype, 'value'
).set;
el.focus();
nativeSetter.call(el, {escaped_title});
el.dispatchEvent(new Event('input', {{ bubbles: true }}));
el.dispatchEvent(new Event('change', {{ bubbles: true }}));
}})();
""")
print("[cdp_publish] Title set.")
return
raise CDPError("Could not find title input element.")
def _fill_content(self, content: str):
"""Fill in the article body content using the TipTap/ProseMirror editor."""
print(f"[cdp_publish] Setting content ({len(content)} chars)...")
time.sleep(ACTION_INTERVAL)
for selector in (SELECTORS["content_editor"], SELECTORS["content_editor_alt"]):
found = self._evaluate(f"!!document.querySelector('{selector}')")
if found:
escaped = json.dumps(content)
self._evaluate(f"""
(function() {{
var el = document.querySelector('{selector}');
el.focus();
var text = {escaped};
var paragraphs = text.split('\\n').filter(function(p) {{ return p.trim(); }});
var html = [];
for (var i = 0; i < paragraphs.length; i++) {{
html.push('<p>' + paragraphs[i] + '</p>');
if (i < paragraphs.length - 1) {{
html.push('<p><br></p>');
}}
}}
el.innerHTML = html.join('');
el.dispatchEvent(new Event('input', {{ bubbles: true }}));
}})();
""")
print("[cdp_publish] Content set.")
return
raise CDPError("Could not find content editor element.")
def _click_publish(self):
"""Click the publish button (found by text content)."""
print("[cdp_publish] Clicking publish button...")
time.sleep(ACTION_INTERVAL)
btn_text = SELECTORS["publish_button_text"]
clicked = self._evaluate(f"""
(function() {{
// Strategy 1: search <button> elements by text
var buttons = document.querySelectorAll('button');
for (var i = 0; i < buttons.length; i++) {{
var t = buttons[i].textContent.trim();
if (t === '{btn_text}') {{
buttons[i].click();
return true;
}}
}}
// Strategy 2: search d-button-content / d-text spans
var spans = document.querySelectorAll('.d-button-content .d-text, .d-button-content span');
for (var i = 0; i < spans.length; i++) {{
if (spans[i].textContent.trim() === '{btn_text}') {{
var el = spans[i].closest('button, [role="button"], .d-button, [class*="btn"], [class*="button"]');
if (!el) el = spans[i].closest('.d-button-content');
if (!el) el = spans[i];
el.click();
return true;
}}
}}
return false;
}})();
""")
if clicked:
print("[cdp_publish] Publish button clicked.")
else:
raise CDPError(
"Could not find publish button. "
"Please click it manually in the browser."
)
# ------------------------------------------------------------------
# Main publish workflow
# ------------------------------------------------------------------
def publish(
self,
title: str,
content: str,
image_paths: list[str] | None = None,
):
"""
Execute the full publish workflow:
1. Navigate to creator publish page
2. Click '上传图文' tab
3. Upload images (this triggers the editor to appear)
4. Fill title
5. Fill content
Args:
title: Article title
content: Article body text (paragraphs separated by newlines)
image_paths: List of local file paths to images to upload
"""
if not self.ws:
raise CDPError("Not connected. Call connect() first.")
if not image_paths:
raise CDPError("At least one image is required to publish on Xiaohongshu.")
# Step 1: Navigate to publish page
self._navigate(XHS_CREATOR_URL)
time.sleep(2)
# Step 2: Click '上传图文' tab
self._click_image_text_tab()
# Step 3: Upload images (editor appears after upload)
self._upload_images(image_paths)
# Step 4: Fill title
self._fill_title(title)
# Step 5: Fill content
self._fill_content(content)
print(
"\n[cdp_publish] Content has been filled in.\n"
" Please review in the browser before publishing.\n"
)
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main():
import argparse
from chrome_launcher import ensure_chrome, restart_chrome
parser = argparse.ArgumentParser(description="Xiaohongshu CDP Publisher")
parser.add_argument("--headless", action="store_true",
help="Use headless Chrome (no GUI window)")
parser.add_argument("--account", help="Account name to use (default: default account)")
sub = parser.add_subparsers(dest="command", required=True)
# check-login
sub.add_parser("check-login", help="Check login status (exit 0=logged in, 1=not)")
# fill - fill form without clicking publish
p_fill = sub.add_parser("fill", help="Fill title/content/images without publishing")
p_fill.add_argument("--title", required=True)
p_fill.add_argument("--content", default=None)
p_fill.add_argument("--content-file", default=None, help="Read content from file")
p_fill.add_argument("--images", nargs="+", required=True)
# publish - fill form and click publish
p_pub = sub.add_parser("publish", help="Fill form and click publish")
p_pub.add_argument("--title", required=True)
p_pub.add_argument("--content", default=None)
p_pub.add_argument("--content-file", default=None, help="Read content from file")
p_pub.add_argument("--images", nargs="+", required=True)
# click-publish - just click the publish button on current page
sub.add_parser("click-publish", help="Click publish button on already-filled page")
# login - open browser for QR code login (always headed)
sub.add_parser("login", help="Open browser for QR code login (always headed mode)")
# re-login - clear cookies and re-login the same account (always headed)
sub.add_parser("re-login", help="Clear cookies and re-login same account (always headed)")
# switch-account - clear cookies and open login page (always headed)
sub.add_parser("switch-account",
help="Clear cookies and open login page for new account (always headed)")
# list-accounts - list all configured accounts
sub.add_parser("list-accounts", help="List all configured accounts")
# add-account - add a new account
p_add = sub.add_parser("add-account", help="Add a new account")
p_add.add_argument("name", help="Account name (unique identifier)")
p_add.add_argument("--alias", help="Display name / description")
# remove-account - remove an account
p_rm = sub.add_parser("remove-account", help="Remove an account")
p_rm.add_argument("name", help="Account name to remove")
p_rm.add_argument("--delete-profile", action="store_true",
help="Also delete the Chrome profile directory")
# set-default-account - set default account
p_def = sub.add_parser("set-default-account", help="Set the default account")
p_def.add_argument("name", help="Account name to set as default")
args = parser.parse_args()
headless = args.headless
account = args.account
# Account management commands that don't need Chrome
if args.command == "list-accounts":
from account_manager import list_accounts
accounts = list_accounts()
if not accounts:
print("No accounts configured.")
return
print(f"{'Name':<20} {'Alias':<25} {'Default':<10}")
print("-" * 55)
for acc in accounts:
default_mark = "*" if acc["is_default"] else ""
print(f"{acc['name']:<20} {acc['alias']:<25} {default_mark:<10}")
return
elif args.command == "add-account":
from account_manager import add_account, get_profile_dir
if add_account(args.name, args.alias):
print(f"Account '{args.name}' added.")
print(f"Profile dir: {get_profile_dir(args.name)}")
print("\nTo log in to this account, run:")
print(f" python cdp_publish.py --account {args.name} login")
else:
print(f"Error: Account '{args.name}' already exists.", file=sys.stderr)
sys.exit(1)
return
elif args.command == "remove-account":
from account_manager import remove_account
if remove_account(args.name, args.delete_profile):
print(f"Account '{args.name}' removed.")
else:
print(f"Error: Cannot remove account '{args.name}'.", file=sys.stderr)
sys.exit(1)
return
elif args.command == "set-default-account":
from account_manager import set_default_account
if set_default_account(args.name):
print(f"Default account set to '{args.name}'.")
else:
print(f"Error: Account '{args.name}' not found.", file=sys.stderr)
sys.exit(1)
return
# Commands that require Chrome - login/re-login/switch-account always headed
if args.command in ("login", "re-login", "switch-account"):
headless = False
if not ensure_chrome(headless=headless, account=account):
print("Failed to start Chrome. Exiting.")
sys.exit(1)
publisher = XiaohongshuPublisher()
try:
if args.command == "check-login":
publisher.connect()
logged_in = publisher.check_login()
if not logged_in and headless:
print(
"[cdp_publish] Headless mode: cannot scan QR code.\n"
" Run with 'login' command or without --headless to log in."
)
sys.exit(0 if logged_in else 1)
elif args.command in ("fill", "publish"):
content = args.content
if args.content_file:
with open(args.content_file, encoding="utf-8") as f:
content = f.read().strip()
if not content:
print("Error: --content or --content-file required.", file=sys.stderr)
sys.exit(1)
publisher.connect()
publisher.publish(title=args.title, content=content, image_paths=args.images)
print("FILL_STATUS: READY_TO_PUBLISH")
if args.command == "publish":
publisher._click_publish()
print("PUBLISH_STATUS: PUBLISHED")
elif args.command == "click-publish":
publisher.connect(target_url_prefix="https://creator.xiaohongshu.com/publish")
publisher._click_publish()
print("PUBLISH_STATUS: PUBLISHED")
elif args.command == "login":
# Ensure headed mode for QR scanning
restart_chrome(headless=False, account=account)
publisher.connect()
publisher.open_login_page()
print("LOGIN_READY")
elif args.command == "re-login":
# Ensure headed mode, clear cookies, re-open login page for same account
restart_chrome(headless=False, account=account)
publisher.connect()
publisher.clear_cookies()
time.sleep(1)
publisher.open_login_page()
print("RE_LOGIN_READY")
elif args.command == "switch-account":
# Ensure headed mode, clear cookies, open login page
restart_chrome(headless=False, account=account)
publisher.connect()
publisher.clear_cookies()
time.sleep(1)
publisher.open_login_page()
print("SWITCH_ACCOUNT_READY")
finally:
publisher.disconnect()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,296 @@
"""
Chrome launcher with CDP remote debugging support.
Manages a dedicated Chrome instance for Xiaohongshu publishing:
- Detects if Chrome is already listening on the debug port
- Launches Chrome with a dedicated user-data-dir for login persistence
- Waits for the debug port to become available
- Supports headless mode for automated publishing without GUI
- Supports switching between headless and headed mode (e.g. for login)
- Supports multiple accounts with separate profile directories
"""
import os
import sys
import time
import socket
import subprocess
import platform
import signal
from typing import Optional
CDP_PORT = 9222
PROFILE_DIR_NAME = "XiaohongshuProfile"
STARTUP_TIMEOUT = 15 # seconds to wait for Chrome to start
# Track the Chrome process we launched so we can kill it later
_chrome_process: subprocess.Popen | None = None
# Track the current account being used
_current_account: Optional[str] = None
def get_chrome_path() -> str:
"""Find Chrome executable on Windows."""
candidates = []
# Standard install locations
for env_var in ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA"):
base = os.environ.get(env_var, "")
if base:
candidates.append(os.path.join(base, "Google", "Chrome", "Application", "chrome.exe"))
for path in candidates:
if os.path.isfile(path):
return path
# Fallback: check PATH
import shutil
found = shutil.which("chrome") or shutil.which("chrome.exe")
if found:
return found
raise FileNotFoundError(
"Chrome not found. Please install Google Chrome or set its path manually."
)
def get_user_data_dir(account: Optional[str] = None) -> str:
"""
Return the Chrome profile directory path for a given account.
Args:
account: Account name. If None, uses the default account from account_manager.
Returns:
Path to the Chrome user-data-dir for this account.
"""
try:
from account_manager import get_profile_dir
return get_profile_dir(account)
except ImportError:
# Fallback if account_manager not available
local_app_data = os.environ.get("LOCALAPPDATA", "")
if not local_app_data:
local_app_data = os.path.expanduser("~")
return os.path.join(local_app_data, "Google", "Chrome", PROFILE_DIR_NAME)
def is_port_open(port: int, host: str = "127.0.0.1") -> bool:
"""Check if a TCP port is accepting connections."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1)
try:
s.connect((host, port))
return True
except (ConnectionRefusedError, socket.timeout, OSError):
return False
def launch_chrome(port: int = CDP_PORT, headless: bool = False, account: Optional[str] = None) -> subprocess.Popen | None:
"""
Launch Chrome with remote debugging enabled.
Args:
port: CDP remote debugging port.
headless: If True, launch Chrome in headless mode (no GUI window).
account: Account name to use. If None, uses the default account.
Returns the Popen object if a new process was started, or None if Chrome
was already running on the target port.
"""
global _chrome_process, _current_account
if is_port_open(port):
print(f"[chrome_launcher] Chrome already running on port {port}.")
return None
chrome_path = get_chrome_path()
user_data_dir = get_user_data_dir(account)
_current_account = account
cmd = [
chrome_path,
f"--remote-debugging-port={port}",
f"--user-data-dir={user_data_dir}",
"--no-first-run",
"--no-default-browser-check",
]
if headless:
cmd.append("--headless=new")
mode_label = "headless" if headless else "headed"
account_label = account or "default"
print(f"[chrome_launcher] Launching Chrome ({mode_label}, account: {account_label})...")
print(f" executable : {chrome_path}")
print(f" profile dir: {user_data_dir}")
print(f" debug port : {port}")
proc = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
_chrome_process = proc
# Wait for the debug port to become available
deadline = time.time() + STARTUP_TIMEOUT
while time.time() < deadline:
if is_port_open(port):
print(f"[chrome_launcher] Chrome is ready on port {port}.")
return proc
time.sleep(0.5)
print(
f"[chrome_launcher] WARNING: Chrome started but port {port} not responding "
f"after {STARTUP_TIMEOUT}s. It may still be initializing.",
file=sys.stderr,
)
return proc
def kill_chrome(port: int = CDP_PORT):
"""
Kill the Chrome instance on the given debug port.
Tries multiple strategies:
1. Send CDP Browser.close command via HTTP
2. Terminate the tracked subprocess
3. Kill by port on Windows (taskkill)
"""
global _chrome_process
# Strategy 1: CDP Browser.close
try:
import requests
resp = requests.get(f"http://127.0.0.1:{port}/json/version", timeout=2)
if resp.ok:
ws_url = resp.json().get("webSocketDebuggerUrl")
if ws_url:
import websockets.sync.client as ws_client
ws = ws_client.connect(ws_url)
ws.send('{"id":1,"method":"Browser.close"}')
try:
ws.recv(timeout=2)
except Exception:
pass
ws.close()
print("[chrome_launcher] Sent Browser.close via CDP.")
except Exception:
pass
# Wait briefly for Chrome to shut down
time.sleep(1)
# Strategy 2: Terminate tracked subprocess
if _chrome_process and _chrome_process.poll() is None:
try:
_chrome_process.terminate()
_chrome_process.wait(timeout=5)
print("[chrome_launcher] Terminated tracked Chrome process.")
except Exception:
try:
_chrome_process.kill()
except Exception:
pass
_chrome_process = None
# Strategy 3: Windows taskkill by port (fallback)
if sys.platform == "win32" and is_port_open(port):
try:
result = subprocess.run(
["netstat", "-ano"],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.splitlines():
if f":{port}" in line and "LISTENING" in line:
pid = line.strip().split()[-1]
subprocess.run(
["taskkill", "/F", "/PID", pid],
capture_output=True, timeout=5
)
print(f"[chrome_launcher] Killed process {pid} via taskkill.")
break
except Exception:
pass
# Wait for port to be released
deadline = time.time() + 5
while time.time() < deadline:
if not is_port_open(port):
return
time.sleep(0.5)
if is_port_open(port):
print(f"[chrome_launcher] WARNING: port {port} still open after kill attempt.",
file=sys.stderr)
def restart_chrome(port: int = CDP_PORT, headless: bool = False, account: Optional[str] = None) -> subprocess.Popen | None:
"""
Kill the current Chrome instance and relaunch with the specified mode.
Useful for switching between headless and headed mode (e.g. when login
is needed during a headless session), or switching accounts.
Args:
port: CDP remote debugging port.
headless: If True, relaunch in headless mode.
account: Account name to use. If None, uses the default account.
Returns the Popen object for the new Chrome process.
"""
account_label = account or "default"
print(f"[chrome_launcher] Restarting Chrome ({'headless' if headless else 'headed'}, account: {account_label})...")
kill_chrome(port)
time.sleep(1)
return launch_chrome(port, headless=headless, account=account)
def ensure_chrome(port: int = CDP_PORT, headless: bool = False, account: Optional[str] = None) -> bool:
"""
Ensure Chrome is running with remote debugging on the given port.
Args:
port: CDP remote debugging port.
headless: If True, launch in headless mode when starting a new instance.
If Chrome is already running, this parameter is ignored.
account: Account name to use. If None, uses the default account.
Returns True if Chrome is available, False otherwise.
"""
if is_port_open(port):
return True
try:
launch_chrome(port, headless=headless, account=account)
return is_port_open(port)
except FileNotFoundError as e:
print(f"[chrome_launcher] Error: {e}", file=sys.stderr)
return False
def get_current_account() -> Optional[str]:
"""Get the name of the currently active account."""
return _current_account
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Chrome Launcher for CDP")
parser.add_argument("--headless", action="store_true", help="Launch in headless mode")
parser.add_argument("--kill", action="store_true", help="Kill the running Chrome instance")
parser.add_argument("--restart", action="store_true", help="Restart Chrome")
parser.add_argument("--account", help="Account name to use (default: default account)")
args = parser.parse_args()
if args.kill:
kill_chrome()
print("[chrome_launcher] Chrome killed.")
elif args.restart:
restart_chrome(headless=args.headless, account=args.account)
print("[chrome_launcher] Chrome restarted.")
elif ensure_chrome(headless=args.headless, account=args.account):
print("[chrome_launcher] Chrome is ready for CDP connections.")
else:
print("[chrome_launcher] Failed to start Chrome.", file=sys.stderr)
sys.exit(1)

View File

@@ -0,0 +1,5 @@
微软向 Canary 通道推送了 Windows 11 Insider Preview Build 28020.1546 更新,补丁编号 KB5074176。
本次更新为常规改进与修复,属于小幅迭代更新,没有重大功能变化。
Canary 通道是 Windows Insider 最前沿的测试分支,适合愿意尝鲜和接受不稳定性的用户。

View File

@@ -0,0 +1,141 @@
"""
Image downloader for Xiaohongshu publishing.
Downloads images from URLs to a local temp directory for upload,
and cleans up after publishing is complete.
"""
import os
import sys
import tempfile
import shutil
import uuid
from urllib.parse import urlparse, unquote
import requests
DEFAULT_TIMEOUT = 30 # seconds per download
TEMP_DIR_PREFIX = "xhs_images_"
class ImageDownloader:
"""Download images from URLs and manage a temporary directory for them."""
def __init__(self, temp_dir: str | None = None):
if temp_dir:
self.temp_dir = temp_dir
os.makedirs(self.temp_dir, exist_ok=True)
self._owns_dir = False
else:
self.temp_dir = tempfile.mkdtemp(prefix=TEMP_DIR_PREFIX)
self._owns_dir = True
self.downloaded_files: list[str] = []
def _guess_extension(self, url: str, content_type: str | None) -> str:
"""Guess file extension from URL path or Content-Type header."""
# Try URL path first
path = urlparse(url).path
_, ext = os.path.splitext(unquote(path))
if ext and ext.lower() in (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"):
return ext.lower()
# Fall back to Content-Type
ct_map = {
"image/jpeg": ".jpg",
"image/png": ".png",
"image/gif": ".gif",
"image/webp": ".webp",
"image/bmp": ".bmp",
}
if content_type:
for mime, ext in ct_map.items():
if mime in content_type:
return ext
return ".jpg" # safe default
def download(self, url: str, referer: str | None = None) -> str:
"""
Download a single image and return the local file path.
Args:
url: Image URL to download
referer: Optional Referer header. If None, auto-generates from URL domain.
Raises requests.RequestException on network errors.
"""
# Build headers with Referer to bypass hotlink protection
parsed = urlparse(url)
if referer is None:
referer = f"{parsed.scheme}://{parsed.netloc}/"
headers = {
"Referer": referer,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
}
resp = requests.get(url, timeout=DEFAULT_TIMEOUT, stream=True, headers=headers)
resp.raise_for_status()
ext = self._guess_extension(url, resp.headers.get("Content-Type"))
filename = f"{uuid.uuid4().hex[:12]}{ext}"
filepath = os.path.join(self.temp_dir, filename)
with open(filepath, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
f.write(chunk)
self.downloaded_files.append(filepath)
print(f"[image_downloader] Downloaded: {url}")
print(f" -> {filepath} ({os.path.getsize(filepath)} bytes)")
return filepath
def download_all(self, urls: list[str]) -> list[str]:
"""
Download multiple images. Returns list of local file paths.
Skips URLs that fail to download (logs the error, continues).
"""
paths = []
for url in urls:
try:
path = self.download(url)
paths.append(path)
except Exception as e:
print(f"[image_downloader] Failed to download {url}: {e}", file=sys.stderr)
return paths
def cleanup(self):
"""Remove all downloaded files and the temp directory."""
if self._owns_dir and os.path.isdir(self.temp_dir):
shutil.rmtree(self.temp_dir, ignore_errors=True)
print(f"[image_downloader] Cleaned up temp dir: {self.temp_dir}")
else:
for f in self.downloaded_files:
try:
os.remove(f)
except OSError:
pass
print(f"[image_downloader] Cleaned up {len(self.downloaded_files)} files.")
self.downloaded_files.clear()
def __enter__(self):
return self
def __exit__(self, *_):
self.cleanup()
if __name__ == "__main__":
# Quick test: download URLs passed as command-line arguments
if len(sys.argv) < 2:
print("Usage: python image_downloader.py <url1> [url2] ...")
sys.exit(1)
dl = ImageDownloader()
paths = dl.download_all(sys.argv[1:])
print(f"\nDownloaded {len(paths)} image(s):")
for p in paths:
print(f" {p}")
print(f"Temp dir: {dl.temp_dir}")
print("Files will remain until manually cleaned up.")

View File

@@ -0,0 +1,213 @@
"""
Unified publish pipeline for Xiaohongshu.
Single CLI entry point that orchestrates:
chrome_launcher → login check → image download → form fill → (optional) publish
Usage:
# Fill form only (default) - review in browser before publishing
python publish_pipeline.py --title "标题" --content "正文" --image-urls URL1 URL2
python publish_pipeline.py --title-file t.txt --content-file body.txt --image-urls URL1
# Headless mode (no GUI window) - faster for automated publishing
python publish_pipeline.py --headless --title-file t.txt --content-file body.txt --image-urls URL1
# Publish to a specific account
python publish_pipeline.py --account myaccount --title "标题" --content "正文" --image-urls URL1
# Fill and auto-publish in one step
python publish_pipeline.py --title "标题" --content "正文" --image-urls URL1 --auto-publish
# Use local image files instead of URLs
python publish_pipeline.py --title "标题" --content "正文" --images img1.jpg img2.jpg
Exit codes:
0 = success (READY_TO_PUBLISH or PUBLISHED)
1 = not logged in (NOT_LOGGED_IN) - headless auto-fallback will restart headed
2 = error (see stderr)
"""
import argparse
import os
import sys
# Ensure UTF-8 output on Windows consoles
if sys.platform == "win32":
os.environ.setdefault("PYTHONIOENCODING", "utf-8")
try:
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
# Add scripts dir to path so sibling modules can be imported
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPT_DIR not in sys.path:
sys.path.insert(0, SCRIPT_DIR)
from chrome_launcher import ensure_chrome, restart_chrome
from cdp_publish import XiaohongshuPublisher, CDPError
from image_downloader import ImageDownloader
def main():
parser = argparse.ArgumentParser(
description="Xiaohongshu publish pipeline - unified entry point"
)
# Title
title_group = parser.add_mutually_exclusive_group(required=True)
title_group.add_argument("--title", help="Article title text")
title_group.add_argument("--title-file", help="Read title from UTF-8 file")
# Content
content_group = parser.add_mutually_exclusive_group(required=True)
content_group.add_argument("--content", help="Article body text")
content_group.add_argument("--content-file", help="Read content from UTF-8 file")
# Images
img_group = parser.add_mutually_exclusive_group(required=True)
img_group.add_argument(
"--image-urls", nargs="+", help="Image URLs to download"
)
img_group.add_argument(
"--images", nargs="+", help="Local image file paths"
)
# Publish mode
parser.add_argument(
"--auto-publish",
action="store_true",
default=False,
help="Click publish button after filling (default: fill only)",
)
# Headless mode
parser.add_argument(
"--headless",
action="store_true",
default=False,
help="Run Chrome in headless mode (no GUI). Auto-falls back to headed if login is needed.",
)
# Optional temp dir for downloaded images
parser.add_argument(
"--temp-dir",
default=None,
help="Directory for downloaded images (default: auto-created temp dir)",
)
# Account selection
parser.add_argument(
"--account",
default=None,
help="Account name to publish to (default: default account)",
)
args = parser.parse_args()
headless = args.headless
account = args.account
# --- Resolve title ---
if args.title_file:
with open(args.title_file, encoding="utf-8") as f:
title = f.read().strip()
else:
title = args.title
if not title:
print("Error: title is empty.", file=sys.stderr)
sys.exit(2)
# --- Resolve content ---
if args.content_file:
with open(args.content_file, encoding="utf-8") as f:
content = f.read().strip()
else:
content = args.content
if not content:
print("Error: content is empty.", file=sys.stderr)
sys.exit(2)
# --- Step 1: Ensure Chrome is running ---
mode_label = "headless" if headless else "headed"
account_label = account or "default"
print(f"[pipeline] Step 1: Ensuring Chrome is running ({mode_label}, account: {account_label})...")
if not ensure_chrome(headless=headless, account=account):
print("Error: Failed to start Chrome.", file=sys.stderr)
sys.exit(2)
# --- Step 2: Connect and check login ---
print("[pipeline] Step 2: Checking login status...")
publisher = XiaohongshuPublisher()
try:
publisher.connect()
logged_in = publisher.check_login()
if not logged_in:
publisher.disconnect()
if headless:
# Auto-fallback: restart Chrome in headed mode for QR login
print("[pipeline] Headless mode: not logged in. Switching to headed mode for login...")
restart_chrome(headless=False, account=account)
publisher.connect()
publisher.open_login_page()
print("NOT_LOGGED_IN")
sys.exit(1)
except CDPError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(2)
# --- Step 3: Prepare images ---
image_paths = []
downloader = None
if args.image_urls:
print(f"[pipeline] Step 3: Downloading {len(args.image_urls)} image(s)...")
downloader = ImageDownloader(temp_dir=args.temp_dir)
image_paths = downloader.download_all(args.image_urls)
if not image_paths:
print("Error: All image downloads failed.", file=sys.stderr)
sys.exit(2)
else:
image_paths = args.images
# Verify local files exist
for p in image_paths:
if not os.path.isfile(p):
print(f"Error: Image file not found: {p}", file=sys.stderr)
sys.exit(2)
print(f"[pipeline] Step 3: Using {len(image_paths)} local image(s).")
# --- Step 4: Fill form ---
print("[pipeline] Step 4: Filling form...")
try:
publisher.publish(title=title, content=content, image_paths=image_paths)
print("FILL_STATUS: READY_TO_PUBLISH")
except CDPError as e:
print(f"Error during form fill: {e}", file=sys.stderr)
if downloader:
downloader.cleanup()
sys.exit(2)
# --- Step 5: Publish (optional) ---
if args.auto_publish:
print("[pipeline] Step 5: Clicking publish button...")
try:
publisher._click_publish()
print("PUBLISH_STATUS: PUBLISHED")
except CDPError as e:
print(f"Error clicking publish: {e}", file=sys.stderr)
if downloader:
downloader.cleanup()
sys.exit(2)
# --- Cleanup ---
publisher.disconnect()
if downloader:
downloader.cleanup()
print("[pipeline] Done.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1 @@
Win11 Build 28020 Canary通道更新