You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

137 lines
4.3 KiB

#!/usr/bin/env python3
import json
import csv
from typing import Any, Dict, List, Set
def extract_headers(obj: Any, prefix: str = "", headers: Set[str] = None) -> Set[str]:
"""递归提取 JSON 的所有字段作为表头"""
if headers is None:
headers = set()
if isinstance(obj, dict):
for key, value in obj.items():
new_prefix = f"{prefix}.{key}" if prefix else key
extract_headers(value, new_prefix, headers)
elif isinstance(obj, list):
if obj:
# 用第一个元素来推断结构
first_item = obj[0]
if isinstance(first_item, (dict, list)):
# 如果是复杂对象,递归处理
extract_headers(first_item, f"{prefix}[0]", headers)
else:
# 如果是简单类型,用泛用名
headers.add(f"{prefix}")
else:
# 基础类型,添加当前路径
if prefix:
headers.add(prefix)
return headers
def extract_values(obj: Any, headers: List[str]) -> Dict[str, Any]:
"""根据表头从 JSON 中提取对应的值"""
result = {}
def get_value(path: str, current_obj: Any) -> Any:
"""根据路径获取值"""
parts = []
i = 0
while i < len(path):
if path[i] == '[':
# 处理数组索引
end = path.find(']', i)
if end != -1:
parts.append(path[i:end+1])
i = end + 1
if i < len(path) and path[i] == '.':
i += 1
else:
break
elif path[i] == '.':
i += 1
else:
# 处理普通键
dot = path.find('.', i)
bracket = path.find('[', i)
if dot == -1 and bracket == -1:
parts.append(path[i:])
break
elif dot == -1:
parts.append(path[i:bracket])
i = bracket
elif bracket == -1:
parts.append(path[i:dot])
i = dot
else:
split = min(dot, bracket)
parts.append(path[i:split])
i = split
current = current_obj
for part in parts:
if part.startswith('[') and part.endswith(']'):
# 数组索引
try:
idx = int(part[1:-1])
if isinstance(current, list) and idx < len(current):
current = current[idx]
else:
return ""
except (ValueError, IndexError):
return ""
else:
# 字典键
if isinstance(current, dict) and part in current:
current = current[part]
else:
return ""
# 处理列表,转为逗号分隔的字符串
if isinstance(current, list):
return ", ".join(str(x) for x in current)
return current
for header in headers:
result[header] = get_value(header, obj)
return result
def main():
# 测试 JSON
json_str = '''
{
"name": "张三",
"contact": {
"email": "zhang@example.com",
"phone": "13800138000"
},
"tags": ["A", "B"]
}
'''
data = json.loads(json_str)
headers = sorted(extract_headers(data))
# 生成 CSV 文件
csv_filename = "/root/.openclaw/workspace/code-generate/json_headers.csv"
with open(csv_filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=headers)
writer.writeheader()
# 如果是数组,处理每一行;否则处理单个对象
if isinstance(data, list):
for item in data:
row = extract_values(item, headers)
writer.writerow(row)
else:
row = extract_values(data, headers)
writer.writerow(row)
print(f"✅ CSV 文件已生成:{csv_filename}")
print(f"\n📊 表头:{', '.join(headers)}")
if __name__ == "__main__":
main()