python编译问题
最近研究域,不想自己维护员工信息,就想同步企业微信通讯录。刚好看到一篇文章满足需求。但是我是小白,按文中代码编译时各种出错,求python大佬,帮忙编译下。我只要exe文件既可以。原文链接如下https://blog.csdn.net/qq_35209305/article/details/143747690,有精通AD域的大神么,求带。 https://s21.ax1x.com/2025/05/28/pVpm9zt.jpg
原文和你代码没有复制完呢,后面还有很多呢 直接发错误代码 zhuguiyong 发表于 2025-5-21 22:49
直接发错误代码
C:\Users\Administrator\Desktop\ad1\.venv\Scripts\python.exe C:\Users\Administrator\Desktop\ad1\AD_Sync_WeCom.py
File "C:\Users\Administrator\Desktop\ad1\AD_Sync_WeCom.py", line 681
logger.info(f"处理用户: {display_name}, 用户ID: {username}, 邮箱: {email}, 选定
^
SyntaxError: EOL while scanning string literal
进程已结束,退出代码为 1 马克 发表于 2025-5-22 08:48
C:%users\Administrator\Desktop\ad1\.venv\Scripts\python.exe C:%users\Administrator\Desktop\ad1\AD_ ...
把错误扔到deepseek里面 greatzdl 发表于 2025-5-22 09:38
把错误扔到deepseek里面
试过了,扣子空间,腾讯混元,deepseek都试过,可能我太菜了吧 马克 发表于 2025-5-22 08:48
C:%users\Administrator\Desktop\ad1\.venv\Scripts\python.exe C:%users\Administrator\Desktop\ad1\AD_ ...
行尾没有括号结尾?)去哪了?没复制到吗
还有其他错误吗有的话你吧源码发个盘 zhuguiyong 发表于 2025-5-22 12:48
还有其他错误吗有的话你吧源码发个盘
import os
import sys
import logging
import json
import csv
import requests
import subprocess
import configparser
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple, Any
import time
# 设置标准输入输出的UTF8编码
sys.stdin.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
def setup_logging() -> logging.Logger:
"""设置日志配置"""
global log_filename# 使变量全局可访问
log_filename = f"ad_wecom_sync_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_filename, encoding='utf-8'),
logging.StreamHandler()
]
)
return logging.getLogger(__name__)
class WeComAPI:
"""企业微信API接口类"""
def __init__(self, corpid: str, corpsecret: str):
"""初始化企业微信API"""
self.corpid = corpid
self.corpsecret = corpsecret
self.access_token = self._get_access_token()
self.logger = logging.getLogger(__name__)
def _get_access_token(self) -> str:
"""获取企业微信API访问token"""
url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={self.corpid}&corpsecret={self.corpsecret}"
response = requests.get(url, timeout=10)
result = response.json()
if result.get('errcode') == 0:
return result['access_token']
else:
error_msg = f"获取access_token失败: {result.get('errmsg')}"
self.logger.error(error_msg)
raise Exception(error_msg)
def get_department_list(self) -> List]:
"""获取部门列表"""
url = f"https://qyapi.weixin.qq.com/cgi-bin/department/list?access_token={self.access_token}"
response = requests.get(url, timeout=10)
result = response.json()
if result.get('errcode') == 0:
return result.get("department", [])
else:
error_msg = f"获取部门列表失败: {result.get('errmsg')}"
self.logger.error(error_msg)
raise Exception(error_msg)
def get_department_users(self, department_id: int) -> List]:
"""获取部门成员详情"""
url = f"https://qyapi.weixin.qq.com/cgi-bin/user/list?access_token={self.access_token}&department_id={department_id}&fetch_child=0"
response = requests.get(url, timeout=10)
result = response.json()
if result.get('errcode') == 0:
return result.get("userlist", [])
else:
error_msg = f"获取部门成员失败: {result.get('errmsg')}"
self.logger.error(error_msg)
raise Exception(error_msg)
def get_user_detail(self, userid: str) -> Dict:
"""获取成员详情"""
url = f"https://qyapi.weixin.qq.com/cgi-bin/user/get?access_token={self.access_token}&userid={userid}"
response = requests.get(url, timeout=10)
result = response.json()
if result.get('errcode') == 0:
return result
else:
error_msg = f"获取用户详情失败: {userid}, {result.get('errmsg')}"
self.logger.error(error_msg)
return {}
def get_all_users(self) -> List]:
"""获取所有企业微信用户"""
all_users: List] = []
try:
departments = self.get_department_list()
for dept in departments:
users = self.get_department_users(dept['id'])
all_users.extend(users)
# 去重处理
seen_userids: Set = set()
unique_users: List] = []
for user in all_users:
if user['userid'] not in seen_userids:
seen_userids.add(user['userid'])
unique_users.append(user)
return unique_users
except Exception as e:
self.logger.error(f"获取所有用户失败: {str(e)}")
return []
class ADSync:
"""AD域控同步类"""
def __init__(self, domain: str, exclude_departments: Optional] = None, exclude_accounts: Optional] = None):
"""初始化AD同步类"""
self.domain = domain
self.exclude_departments = exclude_departments or []
self.exclude_accounts = exclude_accounts or []
self.logger = logging.getLogger(__name__)
self.init_powershell_encoding()
self.ensure_disabled_users_ou()# 初始化时确保 Disabled Users OU 存在
def init_powershell_encoding(self) -> None:
"""初始化PowerShell的UTF8编码支持"""
commands = [
"$OutputEncoding = ::OutputEncoding = ::UTF8",
"chcp 65001"
]
for cmd in commands:
self.run_powershell(cmd)
def run_powershell(self, command: str) -> Tuple:
"""执行PowerShell命令并返回结果"""
try:
full_command = f"""
$OutputEncoding = ::OutputEncoding = ::UTF8
::OutputEncoding = ::UTF8
{command}
"""
process = subprocess.run(
["powershell", "-Command", full_command],
capture_output=True,
text=True,
encoding='utf-8'
)
return process.returncode == 0, process.stdout.strip()
except Exception as e:
self.logger.error(f"执行PowerShell命令失败: {str(e)}")
return False, str(e)
def get_ou_dn(self, ou_path: List) -> str:
"""获取OU的Distinguished Name"""
return ','.join() + f',DC={self.domain.replace(".", ",DC=")}'
def ou_exists(self, ou_dn: str) -> bool:
"""检查OU是否存在"""
command = f"Get-ADOrganizationalUnit -Identity '{ou_dn}' -ErrorAction SilentlyContinue"
success, _ = self.run_powershell(command)
return success
def create_ou(self, ou_name: str, parent_dn: str) -> bool:
"""创建OU和对应的安全组"""
try:
# 检查是否在排除列表中
if ou_name in self.exclude_departments:
self.logger.info(f"跳过创建OU: {ou_name} (在排除列表中)")
return True
if not self.ou_exists(f"OU={ou_name},{parent_dn}"):
command = f"""
New-ADOrganizationalUnit `
-Name "{ou_name}" `
-Path "{parent_dn}" `
-ProtectedFromAccidentalDeletion $false
"""
success, output = self.run_powershell(command)
if success:
self.logger.info(f"创建OU成功: {ou_name}")
ou_dn = f"OU={ou_name},{parent_dn}"
group_command = f"""
New-ADGroup `
-Name "{ou_name}" `
-GroupScope Global `
-GroupCategory Security `
-Path "{ou_dn}"
"""
group_success, group_output = self.run_powershell(group_command)
if group_success:
self.logger.info(f"创建同名安全组成功: {ou_name}")
return True
else:
self.logger.error(f"创建安全组失败: {ou_name}, 错误: {group_output}")
else:
self.logger.error(f"创建OU失败: {ou_name}, 错误: {output}")
else:
self.logger.info(f"OU已存在: {ou_name}")
return False
except Exception as e:
self.logger.error(f"创建OU过程出错: {str(e)}")
return False
def get_user(self, username: str) -> Optional]:
"""获取AD用户信息"""
command = f"""
Get-ADUser -Identity '{username}' -Properties * |
Select-Object * |
ConvertTo-Json
"""
success, output = self.run_powershell(command)
if success and output:
try:
return json.loads(output)
except json.JSONDecodeError:
return None
return None
def check_email_exists(self, email: str, exclude_user: Optional = None) -> bool:
"""检查邮箱是否已被其他用户使用"""
try:
exclude_condition = f"-and SamAccountName -ne '{exclude_user}'" if exclude_user else ""
command = f"""
Get-ADUser -Filter {{Mail -eq '{email}' {exclude_condition}}} |
Select-Object -First 1 |
Select-Object -ExpandProperty SamAccountName
"""
success, output = self.run_powershell(command)
return success and bool(output.strip())
except Exception as e:
self.logger.error(f"检查邮箱是否存在时出错: {str(e)}")
return False
def get_user_email(self, username: str) -> str:
"""获取AD用户当前的邮箱地址"""
try:
command = f"""
Get-ADUser -Identity '{username}' -Properties Mail |
Select-Object -ExpandProperty Mail
"""
success, output = self.run_powershell(command)
return output.strip() if success and output.strip() else ""
except Exception as e:
self.logger.error(f"获取用户邮箱失败 {username}: {str(e)}")
return ""
def create_user(self, username: str, display_name: str, email: str, ou_dn: str) -> bool:
"""创建AD用户"""
try:
command = f"""
$securePassword = ConvertTo-SecureString -String 'Notting8899' -AsPlainText -Force
New-ADUser `
-SamAccountName '{username}' `
-Name '{display_name}' `
-DisplayName '{display_name}' `
-EmailAddress '{email}' `
-Enabled $true `
-Path '{ou_dn}' `
-AccountPassword $securePassword `
-ChangePasswordAtLogon $true
"""
success, output = self.run_powershell(command)
if success:
self.logger.info(f"创建用户成功: {username} ({display_name})")
return True
else:
self.logger.error(f"创建用户失败: {username} ({display_name}), 错误: {output}")
return False
except Exception as e:
self.logger.error(f"创建用户过程出错: {str(e)}")
return False
def update_user(self, username: str, display_name: str, email: str, ou_dn: str) -> bool:
"""更新AD用户信息"""
try:
# 检查用户当前是否已有邮箱
current_email = self.get_user_email(username)
if current_email:
self.logger.info(f"用户 {username} 已有邮箱 {current_email},保持不变")
command = f"""
Get-ADUser -Identity '{username}' |
Set-ADUser `
-DisplayName '{display_name}'
"""
else:
self.logger.info(f"用户 {username} 无邮箱,设置新邮箱: {email}")
command = f"""
Get-ADUser -Identity '{username}' |
Set-ADUser `
-DisplayName '{display_name}' `
-EmailAddress '{email}'
"""
success, output = self.run_powershell(command)
if success:
# 移动用户到指定OU
move_command = f"""
Move-ADObject `
-Identity (Get-ADUser -Identity '{username}').DistinguishedName `
-TargetPath '{ou_dn}'
"""
move_success, move_output = self.run_powershell(move_command)
if move_success:
self.logger.info(f"更新用户成功: {username} ({display_name})")
return True
else:
self.logger.error(f"移动用户失败: {username}, 错误: {move_output}")
else:
self.logger.error(f"更新用户信息失败: {username}, 错误: {output}")
return False
except Exception as e:
self.logger.error(f"更新用户过程出错: {str(e)}")
return False
def add_user_to_group(self, username: str, group_name: str) -> bool:
"""将用户添加到安全组"""
try:
command = f"""
Add-ADGroupMember `
-Identity '{group_name}' `
-Members '{username}' `
-ErrorAction SilentlyContinue
"""
success, output = self.run_powershell(command)
if success:
self.logger.info(f"添加用户到组成功: {username} -> {group_name}")
return True
else:
self.logger.error(f"添加用户到组失败: {username} -> {group_name}, 错误: {output}")
return False
except Exception as e:
self.logger.error(f"添加用户到组过程出错: {str(e)}")
return False
def get_all_enabled_users(self) -> List:
"""获取所有启用状态的AD用户账户(排除系统账户和配置的排除账户)"""
try:
# 构建排除账户的过滤条件
exclude_accounts = '|'.join(self.exclude_accounts)
command = f"""
Get-ADUser -Filter {{Enabled -eq $true}} -Properties SamAccountName |
Where-Object {{
$_.SamAccountName -notmatch '^({exclude_accounts})$' -and
$_.SamAccountName -notlike '*$'
}} |
Select-Object -ExpandProperty SamAccountName |
ConvertTo-Json
"""
success, output = self.run_powershell(command)
if success and output:
try:
return json.loads(output)
except json.JSONDecodeError:
self.logger.error("解析AD用户列表失败")
return []
return []
except Exception as e:
self.logger.error(f"获取AD用户列表失败: {str(e)}")
return []
def is_user_active(self, username: str) -> bool:
"""检查用户是否处于启用状态"""
try:
command = f"""
(Get-ADUser -Identity '{username}' -Properties Enabled).Enabled
"""
success, output = self.run_powershell(command)
return success and output.strip().lower() == 'true'
except Exception as e:
self.logger.error(f"检查用户状态失败 {username}: {str(e)}")
return False
def disable_user(self, username: str) -> bool:
"""禁用AD用户账户"""
try:
# 确保 Disabled Users OU 存在
if not self.ensure_disabled_users_ou():
self.logger.error("无法确保 Disabled Users OU 存在,禁用用户操作可能会失败")
# 首先检查用户是否存在且处于启用状态
if not self.is_user_active(username):
self.logger.info(f"用户 {username} 已经处于禁用状态或不存在")
return True
# 禁用账户
disable_command = f"""
$user = Get-ADUser -Identity '{username}'
if ($user) {{
Disable-ADAccount -Identity $user
Set-ADUser -Identity $user `
-Description "Account disabled - Not found in WeChat Work - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
Write-Output "Success"
}} else {{
Write-Error "User not found"
}}
"""
success, output = self.run_powershell(disable_command)
if success and "Success" in output:
self.logger.info(f"成功禁用账户: {username}")
# 移动到禁用用户OU
disabled_ou = f"OU=Disabled Users,DC={self.domain.replace('.', ',DC=')}"
move_command = f"""
$user = Get-ADUser -Identity '{username}'
if ($user) {{
Move-ADObject -Identity $user.DistinguishedName -TargetPath '{disabled_ou}'
Write-Output "Moved"
}}
"""
move_success, _ = self.run_powershell(move_command)
if move_success:
self.logger.info(f"已将禁用账户 {username} 移动到 Disabled Users OU")
return True
else:
self.logger.error(f"禁用账户失败 {username}: {output}")
return False
except Exception as e:
self.logger.error(f"禁用账户过程出错 {username}: {str(e)}")
return False
def get_user_details(self, username: str) -> Dict:
"""获取AD用户的详细信息"""
try:
command = f"""
Get-ADUser -Identity '{username}' -Properties DisplayName, Mail, Created, Modified, LastLogonDate, Description |
Select-Object SamAccountName, DisplayName, Mail, Created, Modified, LastLogonDate, Description |
ConvertTo-Json
"""
success, output = self.run_powershell(command)
if success and output:
try:
return json.loads(output)
except json.JSONDecodeError:
return {}
return {}
except Exception as e:
self.logger.error(f"获取用户详情失败: {str(e)}")
return {}
def ensure_disabled_users_ou(self) -> bool:
"""确保 Disabled Users OU 存在,不存在则创建"""
try:
disabled_ou = f"OU=Disabled Users,DC={self.domain.replace('.', ',DC=')}"
if not self.ou_exists(disabled_ou):
self.logger.info("Disabled Users OU 不存在,正在创建...")
command = f"""
New-ADOrganizationalUnit `
-Name "Disabled Users" `
-Path "DC={self.domain.replace('.', ',DC=')}" `
-Description "存放已禁用的用户账户" `
-ProtectedFromAccidentalDeletion $false
"""
success, output = self.run_powershell(command)
if success:
self.logger.info("成功创建 Disabled Users OU")
return True
else:
self.logger.error(f"创建 Disabled Users OU 失败: {output}")
return False
else:
self.logger.info("Disabled Users OU 已存在")
return True
except Exception as e:
self.logger.error(f"检查/创建 Disabled Users OU 时出错: {str(e)}")
return False
class WeChatBot:
"""企业微信机器人通知类"""
def __init__(self, webhook_url: str):
"""初始化企业微信机器人"""
self.webhook_url = webhook_url
self.logger = logging.getLogger(__name__)
self.logger.info(f"初始化企业微信机器人: {webhook_url}")
def send_message(self, content: str) -> bool:
"""发送消息到企业微信机器人"""
try:
self.logger.info("开始发送企业微信机器人消息")
self.logger.debug(f"消息内容: {content}")
data = {
"msgtype": "markdown",
"markdown": {
"content": content
}
}
response = requests.post(
self.webhook_url,
json=data,
timeout=10# 添加超时设置
)
response.raise_for_status()# 抛出HTTP错误
result = response.json()
if result.get('errcode') == 0:
self.logger.info("机器人消息发送成功")
return True
else:
self.logger.error(f"机器人消息发送失败: {result}")
return False
except requests.RequestException as e:
self.logger.error(f"发送请求失败: {str(e)}")
return False
except json.JSONDecodeError as e:
self.logger.error(f"解析响应JSON失败: {str(e)}")
return False
except Exception as e:
self.logger.error(f"发送机器人消息时出错: {str(e)}")
return False
def format_time_duration(seconds: float) -> str:
"""格式化时间间隔"""
minutes, seconds = divmod(int(seconds), 60)
hours, minutes = divmod(minutes, 60)
if hours > 0:
return f"{hours}小时{minutes}分钟{seconds}秒"
elif minutes > 0:
return f"{minutes}分钟{seconds}秒"
else:
return f"{seconds}秒"
def main() -> None:
"""主函数"""
start_time = time.time()
sync_stats: Dict = {
'total_users': 0,
'processed_users': 0,
'disabled_users': [],
'error_count': 0,
'log_file': ''# 添加日志文件路径
}
# 设置日志
logger = setup_logging()
sync_stats['log_file'] = log_filename# 保存日志文件名
try:
# 读取配置文件
config_parser = configparser.ConfigParser()
config_parser.read('config.ini', encoding='utf-8')
# 配置信息
config = {
'wecom': {
'corpid': config_parser.get('WeChat', 'CorpID'),
'corpsecret': config_parser.get('WeChat', 'CorpSecret')
},
'domain': config_parser.get('Domain', 'Name'),
'exclude_departments': ,
'exclude_accounts': [
*,
*
],
'webhook_url': config_parser.get('WeChatBot', 'WebhookUrl')
}
# 验证机器人webhook地址
if not config['webhook_url'] or 'key=' not in config['webhook_url']:
logger.error("企业微信机器人webhook地址无效")
raise ValueError("无效的webhook地址")
bot = WeChatBot(config['webhook_url'])
# 发送开始执行通知
start_message = f"""## 企业微信-AD同步开始执行
> 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
> 域名: {config['domain']}
"""
bot.send_message(start_message)
# 初始化企业微信API和AD同步
wecom = WeComAPI(config['wecom']['corpid'], config['wecom']['corpsecret'])
ad_sync = ADSync(
config['domain'],
config['exclude_departments'],
config['exclude_accounts']
)
# 获取所有企业微信用户ID
wecom_users: Set = set()
departments = wecom.get_department_list()
for dept in departments:
users = wecom.get_department_users(dept['id'])
wecom_users.update(user['userid'] for user in users)
logger.info(f"企业微信中共有 {len(wecom_users)} 个用户账户")
# 构建部门树和计算路径的逻辑需要修改
dept_tree: Dict] = {}
for dept in departments:
dept_tree] = {
'name': dept['name'],
'parentid': dept['parentid'],
'path': []
}
# 计算每个部门的完整路径
for dept_id in dept_tree:
path: List = []
current_id = dept_id
while current_id != 0:# 根部门的 parentid 为 0
if current_id not in dept_tree:
break
path.insert(0, dept_tree['name'])
current_id = dept_tree['parentid']
dept_tree['path'] = path
# 同步OU结构
for dept_id, dept_info in dept_tree.items():
current_path: List = []
for ou_name in dept_info['path']:
current_path.append(ou_name)
if len(current_path) > 1:
parent_path = current_path[:-1]
parent_dn = ad_sync.get_ou_dn(parent_path)
else:
parent_dn = f"DC={config['domain'].replace('.', ',DC=')}"
ad_sync.create_ou(ou_name, parent_dn)
# 同步用户
logger.info("开始同步用户...")
processed_users: Set = set()
# 收集用户的所有部门信息
user_departments: Dict] = {}# userid -> {'user_info': Dict, 'departments': List}
for dept_id in dept_tree:
users = wecom.get_department_users(dept_id)
for user in users:
userid = user['userid']
if userid not in user_departments:
user_departments = {
'user_info': user,
'departments': []
}
user_departments['departments'].append(dept_tree)
# 处理所有用户
for userid, info in user_departments.items():
if userid in processed_users:
continue
user = info['user_info']
departments = info['departments']
username = user['userid']
display_name = user['name']
# 选择合适的部门作为用户的OU
target_dept = None
for dept in departments:
# 如果部门不在排除列表中,则选择该部门
if dept['path'] and dept['path'][-1] not in config['exclude_departments']:
target_dept = dept
break
if not target_dept:
logger.warning(f"用户 {username} ({display_name}) 所有部门都在排除列表中,跳过处理")
processed_users.add(userid)
continue
ou_path = target_dept['path']
ou_dn = ad_sync.get_ou_dn(ou_path)
# 获取用户详细信息
user_detail = wecom.get_user_detail(username)
email = user_detail.get('email', '')
# 如果企业微信没有设置邮箱,则使用默认规则生成
if not email:
email = f"{username}@{config['domain']}"
logger.warning(f"用户 {display_name}({username}) 在企业微信中未设置邮箱,使用默认邮箱: {email}")
logger.info(f"处理用户: {display_name}, 用户ID: {username}, 邮箱: {email}, 选定 这贴要沉了么,求个好心人帮编译一下啊
页:
[1]
2