-
Notifications
You must be signed in to change notification settings - Fork 0
/
export_entries.py
111 lines (93 loc) · 2.94 KB
/
export_entries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Copyright (c) 2022-2024 Linh Pham
# podcast-bot is released under the terms of the MIT License
# SPDX-License-Identifier: MIT
#
# vim: set noai syntax=python ts=4 sw=4:
"""Export Podcast Feed Database Entries to a JSON File Script."""
import json
import sqlite3
import sys
from argparse import ArgumentParser, Namespace
from pathlib import Path
from sqlite3 import Connection
from typing import Any
def command_parse() -> Namespace:
    """Build the command-line parser and parse the process arguments.

    Three string options are defined, all of them required:
    ``--database``/``--db`` (stored as ``db_file``), ``--json-file``/``--json``
    (stored as ``json_file``) and ``--podcast-name``/``--podcast`` (stored
    as ``podcast_name``).

    Returns:
        The parsed arguments as an ``argparse.Namespace``.
    """
    parser: ArgumentParser = ArgumentParser(
        description="Export entries from a podcast feed database to a JSON file."
    )

    # (option flags, destination attribute, help text) for each option, in
    # the order they should appear in the generated help output.
    option_table = (
        (
            ("--database", "--db"),
            "db_file",
            "Path to the source podcast feed database file",
        ),
        (
            ("--json-file", "--json"),
            "json_file",
            "Path to the destination podcast feed JSON file",
        ),
        (
            ("--podcast-name", "--podcast"),
            "podcast_name",
            "Podcast name used to tag the exported entries",
        ),
    )
    for flags, destination, help_text in option_table:
        parser.add_argument(
            *flags,
            dest=destination,
            help=help_text,
            type=str,
            required=True,
        )

    return parser.parse_args()
def get_entries(db_file: str, podcast_name: str = None) -> list[dict[str, Any]] | None:
"""Retrieve entries from a podcast feed database file."""
db_file_path = Path(db_file)
if not db_file_path.exists():
print(f"ERROR: Podcast feed database file {db_file} not found.")
sys.exit(1)
database: Connection = sqlite3.connect(db_file)
database.row_factory = sqlite3.Row
cursor = database.execute(
"""SELECT guid, enclosure_url, processed FROM episodes ORDER BY processed ASC"""
)
records = cursor.fetchall()
cursor.close()
if not records:
return
entries = []
for record in records:
entries.append(
{
"podcast_name": (
podcast_name if podcast_name else record["podcast_name"]
),
"guid": record["guid"],
"enclosure_url": record["enclosure_url"],
"processed_date": record["processed"],
}
)
return entries
def export_json(entries: list[dict[str, Any]], json_file: str) -> None:
    """Write the given entries to a JSON file.

    Nothing is written, and the destination file is not touched, when
    ``entries`` is empty. Otherwise the entries are serialized with a
    two-space indent, keeping each dictionary's key order, into a UTF-8
    text file at ``json_file``.
    """
    if entries:
        destination = Path(json_file)
        with destination.open(mode="wt", encoding="utf-8") as handle:
            json.dump(entries, handle, sort_keys=False, indent=2)
def _main() -> None:
    """Run the export: parse arguments, read the entries, write the JSON file.

    Prints a notice and writes nothing when the database yields no entries.
    """
    arguments = command_parse()
    entries = get_entries(
        db_file=arguments.db_file,
        podcast_name=arguments.podcast_name,
    )

    if entries:
        export_json(entries=entries, json_file=arguments.json_file)
    else:
        print("No entries to export.")


# Only run the export when this file is executed as a script.
if __name__ == "__main__":
    _main()