在Python开发过程中,文件读写操作几乎是每个项目都会涉及的核心功能。无论是处理配置文件、读取数据集、还是进行日志记录,掌握文件操作技巧都是Python开发者的必备技能。
很多初学者在文件操作时经常遇到编码问题、路径错误、文件句柄未关闭等困扰。本文将从实际开发角度出发,为你详细讲解Python文件读写的各种场景和最佳实践,让你彻底掌握这项关键技术。
在Windows环境下,默认编码通常是GBK,而很多文本文件采用UTF-8编码,这经常导致乱码问题:
Python# ❌ 错误示例:可能出现编码问题
with open('data.txt', 'r') as f:
content = f.read() # 可能出现UnicodeDecodeError
Windows的反斜杠路径分隔符经常让开发者头疼,特别是在跨平台开发时:
Python# ❌ 不推荐的路径写法
file_path = "C:\data\file.txt" # 转义字符问题
忘记关闭文件句柄是新手常犯的错误,可能导致内存泄漏:
Python# ❌ 危险的写法
f = open('file.txt', 'r')
content = f.read()
# 忘记调用 f.close()
Python 3.4+推荐使用pathlib模块,它提供了跨平台的路径操作:
Pythonfrom pathlib import Path
# ✅ 推荐的路径处理方式
data_dir = Path("data")
file_path = data_dir / "config.txt"
print(file_path) # 自动适配操作系统路径分隔符

明确指定编码格式,避免编码问题:
Python# ✅ 正确的编码处理
with open('data.txt', 'r', encoding='utf-8') as f:
content = f.read()
使用with语句确保文件句柄正确关闭:
Python# ✅ 安全的资源管理
with open('file.txt', 'r', encoding='utf-8') as f:
content = f.read()
# 文件自动关闭,无需手动调用close()
Pythonfrom pathlib import Path
def safe_read_file(file_path, encoding='utf-8'):
"""
安全读取文件内容
Args:
file_path: 文件路径
encoding: 文件编码,默认utf-8
Returns:
str: 文件内容,如果读取失败返回None
"""
try:
path = Path(file_path)
if not path.exists():
print(f"文件不存在: {file_path}")
return None
with open(path, 'r', encoding=encoding) as f:
return f.read()
except UnicodeDecodeError:
print(f"编码错误,尝试使用GBK编码读取: {file_path}")
try:
with open(path, 'r', encoding='gbk') as f:
return f.read()
except Exception as e:
print(f"读取文件失败: {e}")
return None
except Exception as e:
print(f"读取文件出错: {e}")
return None
# 使用示例
content = safe_read_file("config.txt")
if content:
print("文件内容读取成功")

Pythonimport json
from datetime import datetime
from pathlib import Path
def safe_write_file(file_path, content, encoding='utf-8', mode='w'):
"""
安全写入文件
Args:
file_path: 文件路径
content: 要写入的内容
encoding: 编码格式
mode: 写入模式 ('w': 覆盖, 'a': 追加)
Returns:
bool: 写入是否成功
"""
try:
path = Path(file_path)
# 确保目录存在
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, mode, encoding=encoding) as f:
f.write(content)
return True
except Exception as e:
print(f"写入文件失败: {e}")
return False
# 实用示例:日志记录函数
def write_log(message, log_file="logs/app.log"):
"""写入日志信息"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}\n"
return safe_write_file(log_file, log_entry, mode='a')
# 使用示例
write_log("应用程序启动")
write_log("用户登录成功")

Pythondef process_large_file(file_path, chunk_size=8192):
"""
按块读取大文件,避免内存溢出
Args:
file_path: 文件路径
chunk_size: 每次读取的字节数
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
# 处理每个数据块
yield chunk
except Exception as e:
print(f"处理大文件出错: {e}")
# 使用示例:统计大文件行数
def count_lines(file_path):
"""统计文件行数"""
line_count = 0
try:
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
line_count += 1
return line_count
except Exception as e:
print(f"统计行数失败: {e}")
return 0
print(f"文件行数: {count_lines('large_data.txt')}")

Pythonimport json
import csv
from pathlib import Path
class DataManager:
"""数据文件管理器"""
@staticmethod
def read_json(file_path):
"""读取JSON文件"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
print(f"JSON文件不存在: {file_path}")
return {}
except json.JSONDecodeError as e:
print(f"JSON格式错误: {e}")
return {}
@staticmethod
def write_json(file_path, data, indent=4):
"""写入JSON文件"""
try:
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=indent)
return True
except Exception as e:
print(f"写入JSON失败: {e}")
return False
@staticmethod
def read_csv(file_path):
"""读取CSV文件"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
return list(reader)
except Exception as e:
print(f"读取CSV失败: {e}")
return []
@staticmethod
def write_csv(file_path, data, fieldnames=None):
"""写入CSV文件"""
if not data:
return False
try:
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
with open(file_path, 'w', newline='', encoding='utf-8') as f:
if fieldnames is None:
fieldnames = data[0].keys()
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
return True
except Exception as e:
print(f"写入CSV失败: {e}")
return False
# 使用示例
config_data = {
"database": {
"host": "localhost",
"port": 3306,
"username": "admin"
},
"app_settings": {
"debug": True,
"log_level": "INFO"
}
}
# 保存配置
DataManager.write_json("config/app_config.json", config_data)
# 读取配置
config = DataManager.read_json("config/app_config.json")
print(f"数据库主机: {config.get('database', {}).get('host', 'localhost')}")

Pythonimport os
import shutil
import tempfile
from datetime import datetime
from pathlib import Path
class FileOperator:
"""文件操作工具类"""
@staticmethod
def check_file_status(file_path):
"""检查文件状态"""
path = Path(file_path)
status = {
"exists": path.exists(),
"is_file": path.is_file(),
"is_dir": path.is_dir(),
"size": 0,
"readable": False,
"writable": True
}
if path.exists():
try:
status["size"] = path.stat().st_size
status["readable"] = os.access(path, os.R_OK)
status["writable"] = os.access(path, os.W_OK)
except Exception as e:
print(f"获取文件状态失败: {e}")
return status
@staticmethod
def backup_file(source_path, backup_dir="backups"):
"""创建文件备份"""
try:
source = Path(source_path)
if not source.exists():
print(f"源文件不存在: {source_path}")
return False
backup_path = Path(backup_dir)
backup_path.mkdir(parents=True, exist_ok=True)
# 生成备份文件名(包含时间戳)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = backup_path / f"{source.stem}_{timestamp}{source.suffix}"
shutil.copy2(source, backup_file)
print(f"备份成功: {backup_file}")
return True
except Exception as e:
print(f"备份失败: {e}")
return False
@staticmethod
def safe_write_file(file_path, content, encoding='utf-8'):
"""安全写入文件内容"""
try:
# 创建临时文件
with tempfile.NamedTemporaryFile('w', delete=False, encoding=encoding) as temp_file:
temp_file.write(content)
temp_file_path = temp_file.name
# 替换原文件
shutil.move(temp_file_path, file_path)
print(f"文件内容已安全写入: {file_path}")
return True
except Exception as e:
print(f"写入文件失败: {e}")
return False
@staticmethod
def safe_replace_file(file_path, new_content, encoding='utf-8'):
"""安全替换文件内容(先备份再替换)"""
try:
# 先备份原文件
if Path(file_path).exists():
FileOperator.backup_file(file_path)
# 写入新内容
return FileOperator.safe_write_file(file_path, new_content, encoding)
except Exception as e:
print(f"安全替换文件失败: {e}")
return False
# 使用示例
file_info = FileOperator.check_file_status("important_data.txt")
print(f"文件信息: {file_info}")
if file_info["exists"] and file_info["writable"]:
FileOperator.safe_replace_file("important_data.txt", "新的重要数据")

Pythonfrom pathlib import Path
import json
from pathlib import Path
class DataManager:
@staticmethod
def read_json(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"读JSON出错: {e}")
return None
@staticmethod
def write_json(file_path, data):
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
print(f"JSON 成功写入 {file_path}")
return True
except Exception as e:
print(f"JSON 出错: {e}")
return False
class ConfigManager:
"""配置文件管理器 - 实际项目中的应用示例"""
def __init__(self, config_file="config/settings.json"):
self.config_file = Path(config_file)
self.config_data = self.load_config()
def load_config(self):
"""加载配置文件"""
if self.config_file.exists():
return DataManager.read_json(self.config_file)
else:
# 创建默认配置
default_config = {
"app": {
"name": "MyApp",
"version": "1.0.0",
"debug": False
},
"database": {
"host": "localhost",
"port": 5432,
"name": "mydb"
},
"logging": {
"level": "INFO",
"file": "logs/app.log"
}
}
self.save_config(default_config)
return default_config
def get(self, key, default=None):
"""获取配置值"""
keys = key.split('.')
value = self.config_data
try:
for k in keys:
value = value[k]
return value
except (KeyError, TypeError):
return default
def set(self, key, value):
"""设置配置值"""
keys = key.split('.')
config = self.config_data
# 导航到目标位置
for k in keys[:-1]:
if k not in config:
config[k] = {}
config = config[k]
# 设置值
config[keys[-1]] = value
# 保存到文件
self.save_config()
def save_config(self, config_data=None):
"""保存配置到文件"""
data = config_data or self.config_data
return DataManager.write_json(self.config_file, data)
# 使用示例
config = ConfigManager()
# 读取配置
app_name = config.get("app.name", "DefaultApp")
db_host = config.get("database.host", "localhost")
print(f"应用名称: {app_name}")
print(f"数据库主机: {db_host}")
# 修改配置
config.set("app.debug", True)
config.set("database.port", 3306)
print("配置已更新并保存")

Pythondef batch_process_files(file_pattern, processor_func):
"""批量处理文件"""
from glob import glob
files = glob(file_pattern)
results = []
for file_path in files:
try:
result = processor_func(file_path)
results.append({
"file": file_path,
"result": result,
"status": "success"
})
except Exception as e:
results.append({
"file": file_path,
"error": str(e),
"status": "failed"
})
return results
def count_lines(file_path):
"""统计文件行数"""
with open(file_path, 'r', encoding='utf-8') as file:
return sum(1 for _ in file)
# 使用示例:批量统计文件行数
results = batch_process_files("*.txt", count_lines)
for result in results:
if result["status"] == "success":
print(f"{result['file']}: {result['result']} 行")

Pythondef compare_files(file1, file2, chunk_size=8192):
"""比较两个文件是否相同"""
try:
with open(file1, 'rb') as f1, open(file2, 'rb') as f2:
while True:
chunk1 = f1.read(chunk_size)
chunk2 = f2.read(chunk_size)
if chunk1 != chunk2:
return False
if not chunk1: # 文件结束
return True
except Exception as e:
print(f"文件比较失败: {e}")
return False
file1 = 'file1.txt'
file2 = 'file2.txt'
if compare_files(file1, file2):
print(f"{file1} 和 {file2} 是相同的文件。")
else:
print(f"{file1} 和 {file2} 是不同的文件。")

通过本文的详细讲解,我们掌握了Python文件读写操作的核心技能。总结三个关键要点:首先,始终使用pathlib处理路径和with语句管理文件资源,这能避免90%的常见问题;其次,明确指定文件编码格式,特别是在Windows环境下开发时,UTF-8编码能确保跨平台兼容性;最后,构建完善的异常处理机制,让你的程序更加健壮可靠。
掌握这些技巧后,你就能在Python开发和上位机开发项目中游刃有余地处理各种文件操作需求。无论是配置文件管理、数据处理,还是日志记录,这些编程技巧都将成为你开发工具箱中的重要武器。
继续深入学习建议:可以进一步了解异步文件操作(aiofiles)、文件监控(watchdog)、以及数据库文件操作等高级主题,让你的Python开发技能更上一层楼。
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!