Files
xiaohongshu-mcp/skills/post-to-xhs/scripts/cdp_publish.py
Angiin 8572c8c5e0 feat: add long article publish mode to post-to-xhs skill
Add "写长文" workflow with template selection support:
- cdp_publish.py: new commands (long-article, select-template, click-next-step)
- publish_pipeline.py: add --mode parameter (image-text / long-article)
- SKILL.md: document long article flow (B.1-B.5 steps)
- publish-workflow.md: add long article selectors, CLI usage, detailed steps
2026-02-27 16:27:16 +08:00

1057 lines
40 KiB
Python

"""
CDP-based Xiaohongshu publisher.
Connects to a Chrome instance via Chrome DevTools Protocol to automate
publishing articles on Xiaohongshu (RED) creator center.
CLI usage:
# Basic commands (image-text mode)
python cdp_publish.py check-login [--headless] [--account NAME]
python cdp_publish.py fill --title "标题" --content "正文" --images img1.jpg [--headless] [--account NAME]
python cdp_publish.py publish --title "标题" --content "正文" --images img1.jpg [--headless] [--account NAME]
python cdp_publish.py click-publish [--headless] [--account NAME]
# Long article mode
python cdp_publish.py long-article --title "标题" --content "正文" [--images img1.jpg] [--account NAME]
python cdp_publish.py click-next-step [--account NAME]
# Account management
python cdp_publish.py login [--account NAME] # open browser for QR login
python cdp_publish.py re-login [--account NAME] # clear cookies and re-login same account
python cdp_publish.py switch-account [--account NAME] # clear cookies + open login for new account
python cdp_publish.py list-accounts # list all configured accounts
python cdp_publish.py add-account NAME [--alias ALIAS] # add a new account
python cdp_publish.py remove-account NAME # remove an account
Library usage:
from cdp_publish import XiaohongshuPublisher
publisher = XiaohongshuPublisher()
publisher.connect()
publisher.check_login()
publisher.publish(
title="Article title",
content="Article body text",
image_paths=["/path/to/img1.jpg", "/path/to/img2.jpg"],
)
"""
import json
import os
import time
import sys
from typing import Any
# Ensure UTF-8 output on Windows consoles
if sys.platform == "win32":
os.environ.setdefault("PYTHONIOENCODING", "utf-8")
try:
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
import requests
import websockets.sync.client as ws_client
# ---------------------------------------------------------------------------
# Configuration - centralised selectors and URLs for easy maintenance
# ---------------------------------------------------------------------------
CDP_HOST = "127.0.0.1"
CDP_PORT = 9222
# Xiaohongshu URLs
XHS_CREATOR_URL = "https://creator.xiaohongshu.com/publish/publish"
XHS_HOME_URL = "https://www.xiaohongshu.com"
XHS_LOGIN_CHECK_URL = "https://creator.xiaohongshu.com"
# DOM selectors (update these when Xiaohongshu changes their page structure)
# Last verified: 2026-02
SELECTORS = {
# "上传图文" tab - must click before uploading images
"image_text_tab": "div.creator-tab",
"image_text_tab_text": "上传图文",
# Upload area - the file input element for images (visible after clicking tab)
"upload_input": "input.upload-input",
"upload_input_alt": 'input[type="file"]',
# Title input field (visible after image upload)
"title_input": 'input[placeholder*="填写标题"]',
"title_input_alt": "input.d-text",
# Content editor area - TipTap/ProseMirror contenteditable div
"content_editor": "div.tiptap.ProseMirror",
"content_editor_alt": 'div.ProseMirror[contenteditable="true"]',
# Publish button
"publish_button_text": "发布",
# Login indicator - URL-based check (redirect to /login if not logged in)
"login_indicator": '.user-info, .creator-header, [class*="user"]',
# Long article mode
"long_article_tab_text": "写长文",
"new_creation_btn_text": "新的创作",
"long_title_input": 'textarea.d-text[placeholder="输入标题"]',
"auto_format_btn_text": "一键排版",
"next_step_btn_text": "下一步",
"template_card": ".template-card",
}
# Timing
PAGE_LOAD_WAIT = 3 # seconds to wait after navigation
TAB_CLICK_WAIT = 2 # seconds to wait after clicking tab
UPLOAD_WAIT = 6 # seconds to wait after image upload for editor to appear
ACTION_INTERVAL = 1 # seconds between actions
AUTO_FORMAT_WAIT = 5 # seconds to wait after clicking auto-format
TEMPLATE_WAIT = 10 # seconds max to wait for template cards to appear
class CDPError(Exception):
"""Error communicating with Chrome via CDP."""
class XiaohongshuPublisher:
"""Automates publishing to Xiaohongshu via CDP."""
def __init__(self, host: str = CDP_HOST, port: int = CDP_PORT):
self.host = host
self.port = port
self.ws = None
self._msg_id = 0
# ------------------------------------------------------------------
# CDP connection management
# ------------------------------------------------------------------
def _get_targets(self) -> list[dict]:
"""Get list of available browser targets (tabs). Retries once on failure."""
url = f"http://{self.host}:{self.port}/json"
for attempt in range(2):
try:
resp = requests.get(url, timeout=5)
resp.raise_for_status()
return resp.json()
except Exception as e:
if attempt == 0:
print(f"[cdp_publish] CDP connection failed ({e}), restarting Chrome...")
from chrome_launcher import ensure_chrome
ensure_chrome(self.port)
time.sleep(2)
else:
raise CDPError(f"Cannot reach Chrome on {self.host}:{self.port}: {e}")
def _find_or_create_tab(self, target_url_prefix: str = "") -> str:
"""Find an existing tab matching the URL prefix, or return the first page tab."""
targets = self._get_targets()
pages = [t for t in targets if t.get("type") == "page"]
if target_url_prefix:
for t in pages:
if t.get("url", "").startswith(target_url_prefix):
return t["webSocketDebuggerUrl"]
# Create a new tab
resp = requests.put(
f"http://{self.host}:{self.port}/json/new?{XHS_CREATOR_URL}",
timeout=5,
)
if resp.ok:
return resp.json().get("webSocketDebuggerUrl", "")
# Fallback: use first available page
if pages:
return pages[0]["webSocketDebuggerUrl"]
raise CDPError("No browser tabs available.")
def connect(self, target_url_prefix: str = ""):
"""Connect to a Chrome tab via WebSocket."""
ws_url = self._find_or_create_tab(target_url_prefix)
if not ws_url:
raise CDPError("Could not obtain WebSocket URL for any tab.")
print(f"[cdp_publish] Connecting to {ws_url}")
self.ws = ws_client.connect(ws_url)
print("[cdp_publish] Connected to Chrome tab.")
def disconnect(self):
"""Close the WebSocket connection."""
if self.ws:
self.ws.close()
self.ws = None
# ------------------------------------------------------------------
# CDP command helpers
# ------------------------------------------------------------------
def _send(self, method: str, params: dict | None = None) -> dict:
"""Send a CDP command and return the result."""
if not self.ws:
raise CDPError("Not connected. Call connect() first.")
self._msg_id += 1
msg = {"id": self._msg_id, "method": method}
if params:
msg["params"] = params
self.ws.send(json.dumps(msg))
# Wait for the matching response
while True:
raw = self.ws.recv()
data = json.loads(raw)
if data.get("id") == self._msg_id:
if "error" in data:
raise CDPError(f"CDP error: {data['error']}")
return data.get("result", {})
# else: it's an event, skip it
def _evaluate(self, expression: str) -> Any:
"""Execute JavaScript in the page and return the result value."""
result = self._send("Runtime.evaluate", {
"expression": expression,
"returnByValue": True,
"awaitPromise": True,
})
remote_obj = result.get("result", {})
if remote_obj.get("subtype") == "error":
raise CDPError(f"JS error: {remote_obj.get('description', remote_obj)}")
return remote_obj.get("value")
def _navigate(self, url: str):
"""Navigate the current tab to the given URL and wait for load."""
print(f"[cdp_publish] Navigating to {url}")
self._send("Page.enable")
self._send("Page.navigate", {"url": url})
time.sleep(PAGE_LOAD_WAIT)
# ------------------------------------------------------------------
# Login check
# ------------------------------------------------------------------
def check_login(self) -> bool:
"""
Navigate to Xiaohongshu creator center and check if the user is logged in.
Returns True if logged in. If not logged in, prints instructions
and returns False.
"""
self._navigate(XHS_LOGIN_CHECK_URL)
time.sleep(2)
# Check if we got redirected to a login page
current_url = self._evaluate("window.location.href")
print(f"[cdp_publish] Current URL: {current_url}")
if "login" in current_url.lower():
print(
"\n[cdp_publish] NOT LOGGED IN.\n"
" Please scan the QR code in the Chrome window to log in,\n"
" then run this script again.\n"
)
return False
print("[cdp_publish] Login confirmed.")
return True
def clear_cookies(self, domain: str = ".xiaohongshu.com"):
"""
Clear all cookies for the given domain to force re-login.
Used when switching accounts.
"""
print(f"[cdp_publish] Clearing cookies for {domain}...")
self._send("Network.enable")
self._send("Network.clearBrowserCookies")
# Also clear storage
self._send("Storage.clearDataForOrigin", {
"origin": "https://www.xiaohongshu.com",
"storageTypes": "cookies,local_storage,session_storage",
})
self._send("Storage.clearDataForOrigin", {
"origin": "https://creator.xiaohongshu.com",
"storageTypes": "cookies,local_storage,session_storage",
})
print("[cdp_publish] Cookies and storage cleared.")
def open_login_page(self):
"""
Navigate to the Xiaohongshu login page for QR code scanning.
Used for initial login or after clearing cookies for account switch.
"""
self._navigate(XHS_LOGIN_CHECK_URL)
time.sleep(2)
current_url = self._evaluate("window.location.href")
if "login" not in current_url.lower():
# Already logged in, navigate to login page explicitly
self._navigate("https://creator.xiaohongshu.com/login")
time.sleep(2)
print(
"\n[cdp_publish] Login page is open.\n"
" Please scan the QR code in the Chrome window to log in.\n"
)
# ------------------------------------------------------------------
# Publishing actions
# ------------------------------------------------------------------
def _click_image_text_tab(self):
"""Click the '上传图文' tab to switch to image+text publish mode."""
print("[cdp_publish] Clicking '上传图文' tab...")
tab_text = SELECTORS["image_text_tab_text"]
selector = SELECTORS["image_text_tab"]
clicked = self._evaluate(f"""
(function() {{
var tabs = document.querySelectorAll('{selector}');
for (var i = 0; i < tabs.length; i++) {{
if (tabs[i].textContent.trim() === '{tab_text}') {{
tabs[i].click();
return true;
}}
}}
return false;
}})();
""")
if not clicked:
raise CDPError(
f"Could not find '{tab_text}' tab. "
"The page structure may have changed."
)
print("[cdp_publish] Tab clicked, waiting for upload area...")
time.sleep(TAB_CLICK_WAIT)
def _upload_images(self, image_paths: list[str]):
"""Upload images via the file input element."""
if not image_paths:
print("[cdp_publish] No images to upload, skipping.")
return
# Normalize paths (forward slashes for CDP)
normalized = [p.replace("\\", "/") for p in image_paths]
print(f"[cdp_publish] Uploading {len(image_paths)} image(s)...")
# Enable DOM domain
self._send("DOM.enable")
# Get the document root
doc = self._send("DOM.getDocument")
root_id = doc["root"]["nodeId"]
# Try primary selector, then fallback
node_id = 0
for selector in (SELECTORS["upload_input"], SELECTORS["upload_input_alt"]):
result = self._send("DOM.querySelector", {
"nodeId": root_id,
"selector": selector,
})
node_id = result.get("nodeId", 0)
if node_id:
break
if not node_id:
raise CDPError(
"Could not find file input element.\n"
"The page structure may have changed. Check references/publish-workflow.md."
)
# Use DOM.setFileInputFiles to set the files
self._send("DOM.setFileInputFiles", {
"nodeId": node_id,
"files": normalized,
})
print("[cdp_publish] Images uploaded. Waiting for editor to appear...")
time.sleep(UPLOAD_WAIT)
def _fill_title(self, title: str):
"""Fill in the article title."""
print(f"[cdp_publish] Setting title: {title[:40]}...")
time.sleep(ACTION_INTERVAL)
for selector in (SELECTORS["title_input"], SELECTORS["title_input_alt"]):
found = self._evaluate(f"!!document.querySelector('{selector}')")
if found:
escaped_title = json.dumps(title)
self._evaluate(f"""
(function() {{
var el = document.querySelector('{selector}');
var nativeSetter = Object.getOwnPropertyDescriptor(
window.HTMLInputElement.prototype, 'value'
).set;
el.focus();
nativeSetter.call(el, {escaped_title});
el.dispatchEvent(new Event('input', {{ bubbles: true }}));
el.dispatchEvent(new Event('change', {{ bubbles: true }}));
}})();
""")
print("[cdp_publish] Title set.")
return
raise CDPError("Could not find title input element.")
def _fill_content(self, content: str):
"""Fill in the article body content using the TipTap/ProseMirror editor."""
print(f"[cdp_publish] Setting content ({len(content)} chars)...")
time.sleep(ACTION_INTERVAL)
for selector in (SELECTORS["content_editor"], SELECTORS["content_editor_alt"]):
found = self._evaluate(f"!!document.querySelector('{selector}')")
if found:
escaped = json.dumps(content)
self._evaluate(f"""
(function() {{
var el = document.querySelector('{selector}');
el.focus();
var text = {escaped};
var paragraphs = text.split('\\n').filter(function(p) {{ return p.trim(); }});
var html = [];
for (var i = 0; i < paragraphs.length; i++) {{
html.push('<p>' + paragraphs[i] + '</p>');
if (i < paragraphs.length - 1) {{
html.push('<p><br></p>');
}}
}}
el.innerHTML = html.join('');
el.dispatchEvent(new Event('input', {{ bubbles: true }}));
}})();
""")
print("[cdp_publish] Content set.")
return
raise CDPError("Could not find content editor element.")
def _click_publish(self):
"""Click the publish button (found by text content)."""
print("[cdp_publish] Clicking publish button...")
time.sleep(ACTION_INTERVAL)
btn_text = SELECTORS["publish_button_text"]
clicked = self._evaluate(f"""
(function() {{
// Strategy 1: search <button> elements by text
var buttons = document.querySelectorAll('button');
for (var i = 0; i < buttons.length; i++) {{
var t = buttons[i].textContent.trim();
if (t === '{btn_text}') {{
buttons[i].click();
return true;
}}
}}
// Strategy 2: search d-button-content / d-text spans
var spans = document.querySelectorAll('.d-button-content .d-text, .d-button-content span');
for (var i = 0; i < spans.length; i++) {{
if (spans[i].textContent.trim() === '{btn_text}') {{
var el = spans[i].closest('button, [role="button"], .d-button, [class*="btn"], [class*="button"]');
if (!el) el = spans[i].closest('.d-button-content');
if (!el) el = spans[i];
el.click();
return true;
}}
}}
return false;
}})();
""")
if clicked:
print("[cdp_publish] Publish button clicked.")
else:
raise CDPError(
"Could not find publish button. "
"Please click it manually in the browser."
)
# ------------------------------------------------------------------
# Long article actions
# ------------------------------------------------------------------
def _click_long_article_tab(self):
"""Click the '写长文' tab to switch to long article mode."""
print("[cdp_publish] Clicking '写长文' tab...")
tab_text = SELECTORS["long_article_tab_text"]
selector = SELECTORS["image_text_tab"] # same container: div.creator-tab
clicked = self._evaluate(f"""
(function() {{
var tabs = document.querySelectorAll('{selector}');
for (var i = 0; i < tabs.length; i++) {{
if (tabs[i].textContent.trim() === '{tab_text}') {{
tabs[i].click();
return true;
}}
}}
return false;
}})();
""")
if not clicked:
raise CDPError(
f"Could not find '{tab_text}' tab. "
"The page structure may have changed."
)
print("[cdp_publish] '写长文' tab clicked.")
time.sleep(TAB_CLICK_WAIT)
def _click_new_creation(self):
"""Click the '新的创作' button to start a new long article."""
print("[cdp_publish] Clicking '新的创作' button...")
btn_text = SELECTORS["new_creation_btn_text"]
clicked = self._evaluate(f"""
(function() {{
// Search all elements for text match
var candidates = document.querySelectorAll(
'.center span, .center div, .center button, .center a, '
+ 'button, [role="button"], [class*="btn"], [class*="creation"]'
);
for (var i = 0; i < candidates.length; i++) {{
if (candidates[i].textContent.trim() === '{btn_text}') {{
candidates[i].click();
return true;
}}
}}
return false;
}})();
""")
if not clicked:
raise CDPError(
f"Could not find '{btn_text}' button. "
"The page structure may have changed."
)
print("[cdp_publish] '新的创作' button clicked.")
time.sleep(PAGE_LOAD_WAIT)
def _fill_long_title(self, title: str):
"""Fill in the long article title (textarea element)."""
print(f"[cdp_publish] Setting long article title: {title[:40]}...")
time.sleep(ACTION_INTERVAL)
selector = SELECTORS["long_title_input"]
found = self._evaluate(f"!!document.querySelector('{selector}')")
if not found:
raise CDPError(
f"Could not find long title textarea ('{selector}'). "
"The page structure may have changed."
)
escaped_title = json.dumps(title)
self._evaluate(f"""
(function() {{
var el = document.querySelector('{selector}');
var nativeSetter = Object.getOwnPropertyDescriptor(
window.HTMLTextAreaElement.prototype, 'value'
).set;
el.focus();
nativeSetter.call(el, {escaped_title});
el.dispatchEvent(new Event('input', {{ bubbles: true }}));
el.dispatchEvent(new Event('change', {{ bubbles: true }}));
}})();
""")
print("[cdp_publish] Long article title set.")
def _click_auto_format(self):
"""Click the '一键排版' button."""
print("[cdp_publish] Clicking '一键排版' button...")
btn_text = SELECTORS["auto_format_btn_text"]
clicked = self._evaluate(f"""
(function() {{
var elems = document.querySelectorAll(
'button, [role="button"], span, div, a, [class*="btn"]'
);
for (var i = 0; i < elems.length; i++) {{
if (elems[i].textContent.trim() === '{btn_text}') {{
elems[i].click();
return true;
}}
}}
return false;
}})();
""")
if not clicked:
raise CDPError(
f"Could not find '{btn_text}' button. "
"The page structure may have changed."
)
print("[cdp_publish] '一键排版' button clicked. Waiting for templates...")
time.sleep(AUTO_FORMAT_WAIT)
def _wait_for_templates(self) -> bool:
"""Wait for template cards to appear after clicking auto-format."""
print("[cdp_publish] Waiting for template cards to load...")
selector = SELECTORS["template_card"]
for attempt in range(TEMPLATE_WAIT):
found = self._evaluate(
f"document.querySelectorAll('{selector}').length"
)
if found and found > 0:
print(f"[cdp_publish] Found {found} template card(s).")
return True
time.sleep(1)
print("[cdp_publish] Warning: No template cards found within timeout.")
return False
def get_template_names(self) -> list[str]:
"""Get the list of available template names from the page."""
selector = SELECTORS["template_card"]
names = self._evaluate(f"""
(function() {{
var cards = document.querySelectorAll('{selector}');
var names = [];
for (var i = 0; i < cards.length; i++) {{
var title = cards[i].querySelector('.template-title');
names.push(title ? title.textContent.trim() : 'Template ' + i);
}}
return names;
}})();
""")
return names or []
def select_template(self, name: str) -> bool:
"""Select a template by clicking the card with the matching name."""
print(f"[cdp_publish] Selecting template: {name}...")
selector = SELECTORS["template_card"]
clicked = self._evaluate(f"""
(function() {{
var cards = document.querySelectorAll('{selector}');
for (var i = 0; i < cards.length; i++) {{
var title = cards[i].querySelector('.template-title');
if (title && title.textContent.trim() === {json.dumps(name)}) {{
cards[i].click();
return true;
}}
}}
return false;
}})();
""")
if clicked:
print(f"[cdp_publish] Template '{name}' selected.")
time.sleep(ACTION_INTERVAL)
else:
print(f"[cdp_publish] Warning: Template '{name}' not found.")
return bool(clicked)
def _click_next_step(self):
"""Click the '下一步' button."""
print("[cdp_publish] Clicking '下一步' button...")
btn_text = SELECTORS["next_step_btn_text"]
clicked = self._evaluate(f"""
(function() {{
var elems = document.querySelectorAll(
'button, [role="button"], span, div, a, [class*="btn"]'
);
for (var i = 0; i < elems.length; i++) {{
if (elems[i].textContent.trim() === '{btn_text}') {{
elems[i].click();
return true;
}}
}}
return false;
}})();
""")
if not clicked:
raise CDPError(
f"Could not find '{btn_text}' button. "
"The page structure may have changed."
)
print("[cdp_publish] '下一步' button clicked.")
time.sleep(PAGE_LOAD_WAIT)
def publish_long_article(
self,
title: str,
content: str,
image_paths: list[str] | None = None,
) -> list[str]:
"""
Execute the full long article publish workflow:
1. Navigate to creator publish page
2. Click '写长文' tab
3. Click '新的创作' button
4. Fill title (textarea)
5. Fill content (TipTap editor)
6. (Optional) Insert images into editor
7. Click '一键排版'
8. Wait for templates
Returns list of available template names for the caller to
present to the user for selection.
Args:
title: Article title
content: Article body text (paragraphs separated by newlines)
image_paths: Optional list of local file paths to images
"""
if not self.ws:
raise CDPError("Not connected. Call connect() first.")
# Step 1: Navigate to publish page
self._navigate(XHS_CREATOR_URL)
time.sleep(2)
# Step 2: Click '写长文' tab
self._click_long_article_tab()
# Step 3: Click '新的创作'
self._click_new_creation()
# Step 4: Fill title
self._fill_long_title(title)
# Step 5: Fill content
self._fill_content(content)
# Step 6: Upload images into editor (if provided)
if image_paths:
print(f"[cdp_publish] Inserting {len(image_paths)} image(s) into editor...")
for img_path in image_paths:
normalized = img_path.replace("\\", "/")
self._evaluate(f"""
(function() {{
var editor = document.querySelector('{SELECTORS["content_editor"]}');
if (!editor) return false;
var img = document.createElement('img');
img.src = 'file:///{normalized}';
editor.appendChild(img);
editor.dispatchEvent(new Event('input', {{ bubbles: true }}));
return true;
}})();
""")
time.sleep(ACTION_INTERVAL)
# Step 7: Click '一键排版'
self._click_auto_format()
# Step 8: Wait for templates and return names
self._wait_for_templates()
template_names = self.get_template_names()
print(
"\n[cdp_publish] Templates loaded.\n"
" Available templates: " + ", ".join(template_names) + "\n"
)
return template_names
def click_next_and_prepare_publish(self, content: str = ""):
"""After user selects a template, click '下一步' and fill the publish page description."""
self._click_next_step()
# The publish page has a separate content editor for the post description
if content:
time.sleep(ACTION_INTERVAL)
self._fill_content(content)
print(
"\n[cdp_publish] Ready to publish.\n"
" Please review in the browser before confirming publish.\n"
)
# ------------------------------------------------------------------
# Main publish workflow (image-text mode)
# ------------------------------------------------------------------
def publish(
self,
title: str,
content: str,
image_paths: list[str] | None = None,
):
"""
Execute the full publish workflow:
1. Navigate to creator publish page
2. Click '上传图文' tab
3. Upload images (this triggers the editor to appear)
4. Fill title
5. Fill content
Args:
title: Article title
content: Article body text (paragraphs separated by newlines)
image_paths: List of local file paths to images to upload
"""
if not self.ws:
raise CDPError("Not connected. Call connect() first.")
if not image_paths:
raise CDPError("At least one image is required to publish on Xiaohongshu.")
# Step 1: Navigate to publish page
self._navigate(XHS_CREATOR_URL)
time.sleep(2)
# Step 2: Click '上传图文' tab
self._click_image_text_tab()
# Step 3: Upload images (editor appears after upload)
self._upload_images(image_paths)
# Step 4: Fill title
self._fill_title(title)
# Step 5: Fill content
self._fill_content(content)
print(
"\n[cdp_publish] Content has been filled in.\n"
" Please review in the browser before publishing.\n"
)
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main():
import argparse
from chrome_launcher import ensure_chrome, restart_chrome
parser = argparse.ArgumentParser(description="Xiaohongshu CDP Publisher")
parser.add_argument("--headless", action="store_true",
help="Use headless Chrome (no GUI window)")
parser.add_argument("--account", help="Account name to use (default: default account)")
sub = parser.add_subparsers(dest="command", required=True)
# check-login
sub.add_parser("check-login", help="Check login status (exit 0=logged in, 1=not)")
# fill - fill form without clicking publish
p_fill = sub.add_parser("fill", help="Fill title/content/images without publishing")
p_fill.add_argument("--title", required=True)
p_fill.add_argument("--content", default=None)
p_fill.add_argument("--content-file", default=None, help="Read content from file")
p_fill.add_argument("--images", nargs="+", required=True)
# publish - fill form and click publish
p_pub = sub.add_parser("publish", help="Fill form and click publish")
p_pub.add_argument("--title", required=True)
p_pub.add_argument("--content", default=None)
p_pub.add_argument("--content-file", default=None, help="Read content from file")
p_pub.add_argument("--images", nargs="+", required=True)
# long-article - long article mode
p_long = sub.add_parser("long-article", help="Fill long article content with auto-format and template selection")
p_long.add_argument("--title", default=None)
p_long.add_argument("--title-file", default=None, help="Read title from file")
p_long.add_argument("--content", default=None)
p_long.add_argument("--content-file", default=None, help="Read content from file")
p_long.add_argument("--images", nargs="+", default=None, help="Optional image file paths")
# select-template - select a template by name
p_tpl = sub.add_parser("select-template", help="Select a long article template by name")
p_tpl.add_argument("--name", required=True, help="Template name to select")
# click-next-step - click next step button (for long article after template selection)
p_next = sub.add_parser("click-next-step", help="Click '下一步' button after template selection")
p_next.add_argument("--content", default=None, help="Post description text")
p_next.add_argument("--content-file", default=None, help="Read post description from file")
# click-publish - just click the publish button on current page
sub.add_parser("click-publish", help="Click publish button on already-filled page")
# login - open browser for QR code login (always headed)
sub.add_parser("login", help="Open browser for QR code login (always headed mode)")
# re-login - clear cookies and re-login the same account (always headed)
sub.add_parser("re-login", help="Clear cookies and re-login same account (always headed)")
# switch-account - clear cookies and open login page (always headed)
sub.add_parser("switch-account",
help="Clear cookies and open login page for new account (always headed)")
# list-accounts - list all configured accounts
sub.add_parser("list-accounts", help="List all configured accounts")
# add-account - add a new account
p_add = sub.add_parser("add-account", help="Add a new account")
p_add.add_argument("name", help="Account name (unique identifier)")
p_add.add_argument("--alias", help="Display name / description")
# remove-account - remove an account
p_rm = sub.add_parser("remove-account", help="Remove an account")
p_rm.add_argument("name", help="Account name to remove")
p_rm.add_argument("--delete-profile", action="store_true",
help="Also delete the Chrome profile directory")
# set-default-account - set default account
p_def = sub.add_parser("set-default-account", help="Set the default account")
p_def.add_argument("name", help="Account name to set as default")
args = parser.parse_args()
headless = args.headless
account = args.account
# Account management commands that don't need Chrome
if args.command == "list-accounts":
from account_manager import list_accounts
accounts = list_accounts()
if not accounts:
print("No accounts configured.")
return
print(f"{'Name':<20} {'Alias':<25} {'Default':<10}")
print("-" * 55)
for acc in accounts:
default_mark = "*" if acc["is_default"] else ""
print(f"{acc['name']:<20} {acc['alias']:<25} {default_mark:<10}")
return
elif args.command == "add-account":
from account_manager import add_account, get_profile_dir
if add_account(args.name, args.alias):
print(f"Account '{args.name}' added.")
print(f"Profile dir: {get_profile_dir(args.name)}")
print("\nTo log in to this account, run:")
print(f" python cdp_publish.py --account {args.name} login")
else:
print(f"Error: Account '{args.name}' already exists.", file=sys.stderr)
sys.exit(1)
return
elif args.command == "remove-account":
from account_manager import remove_account
if remove_account(args.name, args.delete_profile):
print(f"Account '{args.name}' removed.")
else:
print(f"Error: Cannot remove account '{args.name}'.", file=sys.stderr)
sys.exit(1)
return
elif args.command == "set-default-account":
from account_manager import set_default_account
if set_default_account(args.name):
print(f"Default account set to '{args.name}'.")
else:
print(f"Error: Account '{args.name}' not found.", file=sys.stderr)
sys.exit(1)
return
# Commands that require Chrome - login/re-login/switch-account always headed
if args.command in ("login", "re-login", "switch-account"):
headless = False
if not ensure_chrome(headless=headless, account=account):
print("Failed to start Chrome. Exiting.")
sys.exit(1)
publisher = XiaohongshuPublisher()
try:
if args.command == "check-login":
publisher.connect()
logged_in = publisher.check_login()
if not logged_in and headless:
print(
"[cdp_publish] Headless mode: cannot scan QR code.\n"
" Run with 'login' command or without --headless to log in."
)
sys.exit(0 if logged_in else 1)
elif args.command in ("fill", "publish"):
content = args.content
if args.content_file:
with open(args.content_file, encoding="utf-8") as f:
content = f.read().strip()
if not content:
print("Error: --content or --content-file required.", file=sys.stderr)
sys.exit(1)
publisher.connect()
publisher.publish(title=args.title, content=content, image_paths=args.images)
print("FILL_STATUS: READY_TO_PUBLISH")
if args.command == "publish":
publisher._click_publish()
print("PUBLISH_STATUS: PUBLISHED")
elif args.command == "long-article":
title = args.title
if args.title_file:
with open(args.title_file, encoding="utf-8") as f:
title = f.read().strip()
if not title:
print("Error: --title or --title-file required.", file=sys.stderr)
sys.exit(1)
content = args.content
if args.content_file:
with open(args.content_file, encoding="utf-8") as f:
content = f.read().strip()
if not content:
print("Error: --content or --content-file required.", file=sys.stderr)
sys.exit(1)
publisher.connect()
template_names = publisher.publish_long_article(
title=title, content=content, image_paths=args.images,
)
# Print template names as JSON for programmatic consumption
print("TEMPLATES: " + json.dumps(template_names, ensure_ascii=False))
print("LONG_ARTICLE_STATUS: TEMPLATE_SELECTION")
elif args.command == "select-template":
publisher.connect(target_url_prefix="https://creator.xiaohongshu.com/publish")
if publisher.select_template(args.name):
print(f"TEMPLATE_SELECTED: {args.name}")
else:
print(f"Error: Template '{args.name}' not found.", file=sys.stderr)
sys.exit(1)
elif args.command == "click-next-step":
content = getattr(args, 'content', None)
if getattr(args, 'content_file', None):
with open(args.content_file, encoding="utf-8") as f:
content = f.read().strip()
publisher.connect(target_url_prefix="https://creator.xiaohongshu.com/publish")
publisher.click_next_and_prepare_publish(content=content or "")
print("LONG_ARTICLE_STATUS: READY_TO_PUBLISH")
elif args.command == "click-publish":
publisher.connect(target_url_prefix="https://creator.xiaohongshu.com/publish")
publisher._click_publish()
print("PUBLISH_STATUS: PUBLISHED")
elif args.command == "login":
# Ensure headed mode for QR scanning
restart_chrome(headless=False, account=account)
publisher.connect()
publisher.open_login_page()
print("LOGIN_READY")
elif args.command == "re-login":
# Ensure headed mode, clear cookies, re-open login page for same account
restart_chrome(headless=False, account=account)
publisher.connect()
publisher.clear_cookies()
time.sleep(1)
publisher.open_login_page()
print("RE_LOGIN_READY")
elif args.command == "switch-account":
# Ensure headed mode, clear cookies, open login page
restart_chrome(headless=False, account=account)
publisher.connect()
publisher.clear_cookies()
time.sleep(1)
publisher.open_login_page()
print("SWITCH_ACCOUNT_READY")
finally:
publisher.disconnect()
if __name__ == "__main__":
main()