You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
137 lines
4.3 KiB
137 lines
4.3 KiB
#!/usr/bin/env python3
|
|
import csv
import json
from typing import Any, Dict, List, Optional, Set
|
|
|
|
def extract_headers(
    obj: Any,
    prefix: str = "",
    headers: Optional[Set[str]] = None,
) -> Set[str]:
    """Recursively collect the flattened field paths of a decoded JSON value.

    Dict keys are joined with ``.``. A non-empty list whose first element is
    a dict or list contributes that element's paths under ``<prefix>[0]``;
    a non-empty list of scalars contributes ``prefix`` itself. Only the first
    element of a list is inspected, and empty lists and empty dicts add
    nothing.

    Args:
        obj: The decoded JSON value to inspect.
        prefix: Path of ``obj`` inside the document; empty for the root.
        headers: Set that paths are added to. A new set is created when
            ``None`` is passed.

    Returns:
        The set of collected paths (the same object as ``headers`` when one
        was supplied).
    """
    if headers is None:
        headers = set()

    if isinstance(obj, dict):
        for key, value in obj.items():
            new_prefix = f"{prefix}.{key}" if prefix else key
            extract_headers(value, new_prefix, headers)
    elif isinstance(obj, list):
        if obj:
            # Infer the list's structure from its first element only.
            first_item = obj[0]
            if isinstance(first_item, (dict, list)):
                # Complex element: recurse under an explicit [0] index.
                extract_headers(first_item, f"{prefix}[0]", headers)
            else:
                # Scalar elements: the list itself becomes one column.
                headers.add(prefix)
    elif prefix:
        # Scalar leaf: record the path that leads to it.
        headers.add(prefix)

    return headers
|
|
|
|
def _split_path(path: str) -> List[str]:
    """Split a header path such as ``a.b[0].c`` into ``['a', 'b', '[0]', 'c']``."""
    parts: List[str] = []
    length = len(path)
    i = 0
    while i < length:
        ch = path[i]
        if ch == ".":
            # Separators carry no information of their own.
            i += 1
        elif ch == "[":
            end = path.find("]", i)
            if end == -1:
                # Unterminated index: ignore the rest of the path.
                break
            parts.append(path[i:end + 1])
            i = end + 1
        else:
            # A plain key runs up to the next '.' or '[' (or the end).
            stops = [
                pos
                for pos in (path.find(".", i), path.find("[", i))
                if pos != -1
            ]
            end = min(stops) if stops else length
            parts.append(path[i:end])
            i = end
    return parts


def _resolve_path(obj: Any, parts: List[str]) -> Any:
    """Follow ``parts`` through ``obj``; return ``""`` when a step is missing."""
    current = obj
    for part in parts:
        if part.startswith("[") and part.endswith("]"):
            # List index step.
            try:
                idx = int(part[1:-1])
            except ValueError:
                return ""
            if not isinstance(current, list):
                return ""
            try:
                current = current[idx]
            except IndexError:
                return ""
        elif isinstance(current, dict) and part in current:
            # Dict key step.
            current = current[part]
        else:
            return ""

    # A list at the end of the path is flattened into one comma-separated cell.
    if isinstance(current, list):
        return ", ".join(str(x) for x in current)
    return current


def extract_values(obj: Any, headers: List[str]) -> Dict[str, Any]:
    """Look up the value for each header path in a decoded JSON value.

    Paths use the format produced by ``extract_headers``: dict keys joined
    with ``.`` and list positions written as ``[n]``.

    Args:
        obj: The decoded JSON value to read from.
        headers: Paths to resolve, in output order.

    Returns:
        A dict mapping each header to the value found at that path. A path
        that cannot be followed maps to ``""``; a list found at the end of a
        path is rendered as its items joined with ``", "``.
    """
    return {
        header: _resolve_path(obj, _split_path(header))
        for header in headers
    }
|
|
|
|
def main(
    csv_filename: str = "/root/.openclaw/workspace/code-generate/json_headers.csv",
) -> None:
    """Flatten a sample JSON document and write it to a CSV file.

    The sample is parsed, its field paths are collected with
    ``extract_headers`` and sorted to form the CSV header, and one row per
    record is written using ``extract_values``. A top-level JSON array is
    treated as a list of records; any other value is a single record.

    Args:
        csv_filename: Path of the CSV file to write. The file is written as
            UTF-8 with a BOM (``utf-8-sig``).
    """
    # Sample JSON used as test input.
    json_str = '''
    {
        "name": "张三",
        "contact": {
            "email": "zhang@example.com",
            "phone": "13800138000"
        },
        "tags": ["A", "B"]
    }
    '''

    data = json.loads(json_str)

    if isinstance(data, list):
        # Headers must be computed per record: paths taken from the array
        # itself would start with "[0]" and never match the records that
        # are looked up row by row below.
        rows = data
        header_set: Set[str] = set()
        for item in rows:
            if isinstance(item, (dict, list)):
                extract_headers(item, "", header_set)
            else:
                # Scalar records go into a single unnamed column.
                header_set.add("")
        headers = sorted(header_set)
    else:
        rows = [data]
        headers = sorted(extract_headers(data))

    # Write the CSV file: one header line, then one line per record.
    with open(csv_filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=headers)
        writer.writeheader()
        for item in rows:
            writer.writerow(extract_values(item, headers))

    print(f"✅ CSV 文件已生成:{csv_filename}")
    print(f"\n📊 表头:{', '.join(headers)}")
|
|
|
|
# Run the conversion only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|
|