diff --git a/engines/shaarli.py b/engines/shaarli.py
new file mode 100644
index 0000000..8735021
--- /dev/null
+++ b/engines/shaarli.py
@@ -0,0 +1,156 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""Shaarli (self-hosted bookmark manager)
+
+Searches a Shaarli instance through its REST API (/api/v1/links),
+authenticating with a short-lived JWT signed with the API secret.
+"""
+
+from collections.abc import Iterable
+from json import loads
+from urllib.parse import urlencode, quote
+import calendar
+import time
+
+import jwt
+
+from searx.utils import to_string, html_to_text
+
+base_url = ''
+visibility = 'private'
+content_html_to_text = False
+title_html_to_text = False
+paging = False
+suggestion_query = ''
+shaarli_api_secret = ''
+
+
+cookies = {}
+headers = {
+    'Authorization': ''
+}
+'''Some engines might offer different results based on cookies or headers.
+Possible use-case: set a safesearch cookie or header to moderate.'''
+
+# parameters for engines with paging support
+#
+# number of results on each page; the Shaarli API takes an offset
+# rather than a page number, so the request offset is a multiple of page_size
+page_size = 1
+
+
+def iterate(iterable):
+    """Yield (key, value) pairs from a dict, (index, value) pairs otherwise."""
+    if isinstance(iterable, dict):
+        it = iterable.items()
+    else:
+        it = enumerate(iterable)
+    for index, value in it:
+        yield str(index), value
+
+
+def is_iterable(obj):
+    """Return True for containers worth recursing into (not plain strings)."""
+    if isinstance(obj, str):
+        return False
+    return isinstance(obj, Iterable)
+
+
+def parse(query):
+    """Split a '/'-separated query path into its non-empty components."""
+    return [part for part in query.split('/') if part]
+
+
+def do_query(data, q):
+    """Recursively collect every value reachable through the key path q."""
+    ret = []
+    if not q:
+        return ret
+
+    qkey = q[0]
+
+    for key, value in iterate(data):
+        if len(q) == 1:
+            if key == qkey:
+                ret.append(value)
+            elif is_iterable(value):
+                ret.extend(do_query(value, q))
+        else:
+            if not is_iterable(value):
+                continue
+            if key == qkey:
+                ret.extend(do_query(value, q[1:]))
+            else:
+                ret.extend(do_query(value, q))
+    return ret
+
+
+def query(data, query_string):
+    q = parse(query_string)
+    return do_query(data, q)
+
+
+def request(query, params):
+    # generate the Shaarli authentication token: a JWT signed with the API
+    # secret, carrying only its issue time
+    encoded_token = jwt.encode(
+        {'iat': calendar.timegm(time.gmtime())},
+        shaarli_api_secret,
+        algorithm='HS512',
+    )
+
+    search_url = (
+        base_url
+        + '/api/v1/links?offset={pageno}&searchterm={query}&searchtags={tag}'
+        + '&visibility=%s&limit=%d' % (visibility, page_size)
+    )
+
+    headers['Authorization'] = 'Bearer %s' % encoded_token
+
+    # split the URL-encoded query on '%23' ('#'): the first part is the
+    # search term, the remaining parts are used as tags
+    query = (urlencode({'q': query})[2:]).split(quote('#'))
+
+    fp = {'query': query[0], 'tag': '+'.join(query[1:]) if len(query) > 1 else ''}
+    if paging:
+        fp['pageno'] = (params['pageno'] - 1) * page_size
+    else:
+        # {pageno} is always present in the URL template, so it must be
+        # filled even when paging is off, or str.format() raises a KeyError
+        fp['pageno'] = 0
+
+    params['cookies'].update(cookies)
+    params['headers'].update(headers)
+
+    params['url'] = search_url.format(**fp)
+    params['query'] = query[0]
+
+    return params
+
+
+def identity(arg):
+    return arg
+
+
+def response(resp):
+    results = []
+    json = loads(resp.text)
+
+    title_filter = html_to_text if title_html_to_text else identity
+    content_filter = html_to_text if content_html_to_text else identity
+
+    for url, title, content in zip(query(json, 'url'), query(json, 'title'), query(json, 'description')):
+        results.append(
+            {
+                'url': to_string(url),
+                'title': title_filter(to_string(title)),
+                'content': content_filter(to_string(content)),
+            }
+        )
+
+    if not suggestion_query:
+        return results
+
+    for suggestion in query(json, suggestion_query):
+        results.append({'suggestion': suggestion})
+
+    return results
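
A note for reviewers: the parse()/query() helpers (apparently adapted from
the generic json_engine) walk arbitrarily nested JSON by a '/'-separated
key path, which is how response() pulls url/title/description out of the
API payload. A quick illustration with made-up data:

    data = [{'url': 'https://example.org',
             'nested': {'url': 'https://example.com'}}]
    query(data, 'url')         # -> ['https://example.org', 'https://example.com']
    query(data, 'nested/url')  # -> ['https://example.com']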
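
To sanity-check the authentication flow outside searx, here is a minimal
sketch; the instance URL and API secret are placeholders, not part of this
patch:

    import calendar
    import time

    import jwt
    import requests

    # same claim set as request() above: the token only carries its issue time
    token = jwt.encode({'iat': calendar.timegm(time.gmtime())},
                       'placeholder-api-secret', algorithm='HS512')
    resp = requests.get(
        'https://shaarli.example.org/api/v1/links',
        params={'offset': 0, 'limit': 1, 'visibility': 'private'},
        headers={'Authorization': 'Bearer %s' % token},
        timeout=10,
    )
    print(resp.status_code, resp.json())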