nix-config/hosts/hedgedoc/hedgedoc-util-postgres.diff

167 lines
8.0 KiB
Diff

--- a/files/hedgedoc-util.py 2024-02-03 23:19:47.720921062 +0100
+++ b/files/hedgedoc-util.py 2024-02-03 23:19:42.926928390 +0100
@@ -7,9 +7,7 @@
from subprocess import Popen, PIPE
import click
-import pymysql
-import pymysql.cursors
-import configparser
+import psycopg2
class GlobalState():
def __init__(self, options):
@@ -21,23 +19,17 @@
except Exception as e:
click.echo("Database connection failed: {}".format(repr(e)))
sys.exit(2)
- self._check_schema()
def _get_connection(self):
- return pymysql.connect(host=self.config['dbhost'],
- user=self.config['dbuser'],
- password=self.config['dbpw'],
- database=self.config['dbname'],
- charset='utf8mb4',
- cursorclass=pymysql.cursors.DictCursor)
+ dsn = {'host': self.config['dbhost']}  # keyword arguments: psycopg2 separates and quotes the values itself
+ if 'dbuser' in self.config:
+ dsn['user'] = self.config['dbuser']
+ if 'dbpw' in self.config:
+ dsn['password'] = self.config['dbpw']
+ if 'dbname' in self.config:
+ dsn['dbname'] = self.config['dbname']
- def _check_schema(self):
- with self.db.cursor() as cursor:
- cursor.execute('SELECT name from SequelizeMeta ORDER BY name ASC')
- schema = ','.join([i['name'] for i in cursor.fetchall()])
- if schema != self.config['dbschema']:
- click.echo("Unsupportet db schema: {}".format(schema))
- sys.exit(2)
+ return psycopg2.connect(**dsn)
def _load_config(self, path):
result = {}
@@ -69,6 +61,9 @@
def _decode_nested_json(data, fieldnames):
for i in data:
+ if i == None:
+ continue
+
for fieldname in fieldnames:
if fieldname in i:
if i[fieldname] == None:
@@ -84,34 +79,40 @@
def note_id_encode_to_url(input_id):
return base64.urlsafe_b64encode(binascii.unhexlify(input_id.replace('-', '').encode())).decode().replace('=', '')
-def pad_list(db, columns, last_change_older=0, owner=0):
+def pad_list(db, columns, last_change_older=0, owner=None):
+ owner = owner or None  # '' and the old default 0 mean "no owner filter"; sent to SQL as NULL
+ quoted_columns = ','.join('"{}"'.format(c) for c in columns)  # mixed-case names (ownerId, createdAt, ...) must be quoted in PostgreSQL
with db.cursor() as cursor:
# this is no sql injection vulnerability because we let click verify the content of "columns" to match a whitelist
- cursor.execute(F"SELECT {','.join(columns)} FROM Notes WHERE (%(last_change_older)s = 0 OR DATEDIFF(NOW(), lastchangeAt) > %(last_change_older)s OR (lastchangeAt IS NULL AND DATEDIFF(NOW(), createdAt) > %(last_change_older)s) AND (%(owner)s = '' OR ownerId = %(owner)s)) ORDER BY id", {'last_change_older': last_change_older, 'owner': owner})
+ cursor.execute(F"SELECT {quoted_columns} FROM \"Notes\" WHERE (%(last_change_older)s = 0 OR NOW() - \"lastchangeAt\" > %(last_change_older)s * interval '1 day' OR (\"lastchangeAt\" IS NULL AND NOW() - \"createdAt\" > %(last_change_older)s * interval '1 day')) AND (%(owner)s::uuid IS NULL OR \"ownerId\" = %(owner)s::uuid) ORDER BY id", {'last_change_older': last_change_older, 'owner': owner})
return _decode_nested_json(cursor.fetchall(), ['authorship'])
def pad_get(db, id):
with db.cursor() as cursor:
- cursor.execute('SELECT * FROM Notes WHERE id=%s', (id))
+ cursor.execute('SELECT * FROM "Notes" WHERE id=%s', (id,))
return _decode_nested_json([cursor.fetchone()], ['authorship'])[0]
def pad_get_content(db, id):
return pad_get(db, id).get('content', '')
-def pad_delete(db, id):
- pad = pad_get(db, id)
- urlid = note_id_encode_to_url(id)
- with db:
- with db.cursor() as cursor:
- cursor.execute('DELETE FROM Revisions WHERE noteId=%s', (id))
- cursor.execute('DELETE FROM Notes WHERE id=%s', (id))
- cursor.execute('SELECT id,history FROM Users WHERE JSON_SEARCH(history, "one", %s, "", "$[*].id") is not null;', (urlid))
- with db.cursor() as usercursor:
- for i in cursor:
- history = json.loads(i['history'] or '[]')
- history = [ j for j in history if not j.get('id') == urlid ]
- usercursor.execute('UPDATE Users set history=%s WHERE id=%s;', (json.dumps(history), i['id']))
- db.commit()
+def pad_delete(db, ids):
+ for id in ids:
+ if id == "--":
+ continue
+
+ pad = pad_get(db, id)
+ urlid = note_id_encode_to_url(id)
+ with db:
+ with db.cursor() as cursor:
+ cursor.execute('DELETE FROM "Revisions" WHERE "noteId"=%s', (id,))
+ cursor.execute('DELETE FROM "Notes" WHERE id=%s', (id,))
+ cursor.execute('SELECT id,history FROM "Users" u WHERE EXISTS (SELECT 1 FROM json_array_elements(u.history::json) h WHERE h->>\'id\'=%s);', (urlid,))  # only users whose history references this pad, one row each
+ with db.cursor() as usercursor:
+ for i in cursor:
+ history = json.loads(i[1] or '[]')
+ history = [ j for j in history if not j.get('id') == urlid ]
+ usercursor.execute('UPDATE "Users" set history=%s WHERE id=%s;', (json.dumps(history), i[0]))
+ db.commit()
def pad_mail(db, id, template, formats):
with db.cursor() as cursor:
@@ -144,12 +145,12 @@
def user_list(db, columns):
with db.cursor() as cursor:
# this is no sql injection vulnerability because we let click verify the content of "columns" to match a whitelist
- cursor.execute('SELECT {} FROM Users ORDER BY id'.format(','.join(columns)))
+ cursor.execute('SELECT {} FROM "Users" ORDER BY id'.format(','.join('"{}"'.format(c) for c in columns)))  # quote names so mixed-case columns are not folded to lower case
return _decode_nested_json(cursor.fetchall(), ['profile', 'history'])
def user_get(db, id):
with db.cursor() as cursor:
- cursor.execute('SELECT * FROM Users WHERE id=%s', (id))
+ cursor.execute('SELECT * FROM "Users" WHERE id=%s', (id,))
return _decode_nested_json([cursor.fetchone()], ['profile', 'history'])[0]
def user_get_mail(db, id):
@@ -192,12 +193,11 @@
@click.group()
@click.option('-o', '--output', type=click.Choice(['text', 'json', 'tsv', 'tsv-noheader']), default='text', help='Select output format', show_default=True, show_envvar=True)
-@click.option('--config', default='/usr/local/etc/hedgedoc-util/hedgedoc-util.cfg', type=click.Path(), help='Config to load db and template default settings from', show_envvar=True, show_default=True)
+@click.option('--config', default='/etc/hedgedoc-util/hedgedoc-util.cfg', type=click.Path(), help='Config to load db and template default settings from', show_envvar=True, show_default=True)
@click.option('--dbuser', help='User name used for the db connection', show_envvar=True)
@click.option('--dbpw', help='Password used for the db connection', show_envvar=True)
@click.option('--dbname', help='Database used', show_envvar=True)
-@click.option('--dbhost', help='Host the db is running on', show_envvar=True)
-@click.option('--dbschema', help='Schema string to verify the db schema against', show_envvar=True)
+@click.option('--dbhost', help='Host the db is running on', default='/run/postgresql/', show_envvar=True)
@click.pass_context
def cli(ctx, **kwargs):
ctx.obj = GlobalState(kwargs)
@@ -219,7 +219,7 @@
@cli_pad.command(name="list", help="List all pads")
@click.option('-c', '--columns', default=['id'], type=click.Choice(['id', 'title', 'content', 'ownerId', 'createdAt', 'updatedAt', 'shortid', 'permission', 'viewcount', 'lastchangeuserId', 'lastchangeAt', 'alias', 'deletedAt', 'authorship']), help="Select what data to display. Can be passed multiple times.", multiple=True, show_default=True, show_envvar=True)
@click.option('--last-change-older', type=click.INT, default=0, help='Only list those pads which are older than this value. In days.', show_envvar=True)
-@click.option('--owner', type=click.STRING, default='', help='Only list pads with this owner, pass the user id', show_envvar=True)
+@click.option('--owner', type=click.STRING, help='Only list pads with this owner, pass the user id', show_envvar=True)
@click.pass_obj
def _pad_list(obj, columns, last_change_older, owner):
output_object(pad_list(obj.db, columns, last_change_older=last_change_older, owner=owner))
@@ -231,10 +231,10 @@
output_object(pad_get(obj.db, id))
@cli_pad.command(name="delete", help="Deletes a pad")
-@click.argument('id')
+@click.argument('ids', nargs=-1)
@click.pass_obj
-def _pad_delete(obj, id):
- pad_delete(obj.db, id)
+def _pad_delete(obj, ids):
+ pad_delete(obj.db, ids)
@cli_pad.command(name="get-content", help="Get the content of one pad by its id")
@click.argument('id')