Coverage for src/oai/protocol.py: 82%
222 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-05 09:56 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-05 09:56 +0000
1import time
2from datetime import datetime
4from django.conf import settings
5from django.shortcuts import render
6from django.template import RequestContext
7from django.utils.html import escape
9CONTENT_TYPE = "text/xml; charset=UTF-8"
class BadArgument(Exception):
    """Signal that the request arguments are illegal, repeated, missing or malformed."""
class BadResumptionToken(Exception):
    """Signal that the supplied ``resumptionToken`` argument was rejected."""
class CanNotDisseminateFormat(Exception):
    """Signal that the requested ``metadataPrefix`` is not supported."""
class NoRecordsMatch(Exception):
    """Signal that the request arguments select no record at all."""
class OAIRequest:
    """Build the ``<request>`` element that echoes the incoming query.

    Attributes:
        rq: mapping of the arguments received with the request.
        baseurl: text placed inside the ``<request>`` element.
        argdescr: mapping whose keys are the argument names accepted by the verb.
    """

    def __init__(self, rq, baseurl, argdescr):
        self.rq = rq
        self.baseurl = baseurl
        self.argdescr = argdescr

    def __str__(self):
        """Return the element as a string, one XML attribute per accepted argument."""
        pieces = []
        for name, raw in self.rq.items():
            # Only "verb" and the arguments declared for the verb are echoed.
            if name != "verb" and name not in self.argdescr:
                continue
            pieces.append(f' {name}="{escape(raw)}"')
        attributes = "".join(pieces)
        return f"<request{attributes}>{self.baseurl}</request>"
class Request:
    """Base handler for one OAI verb: checks the arguments, then renders a response.

    Subclasses override ``real_run`` (produce the response) and optionally
    ``_validate`` (verb-specific argument checks). Calling the instance with
    an HTTP request runs the whole pipeline.

    Attributes set by ``__call__`` and used by the other methods:
        context: dict handed to the templates.
        request: the HTTP request being served.
        rq: a ``RequestContext`` built from the request.
    """

    # Error codes answered with HTTP 405; every other code is answered with 200.
    _STATUS_405_CODES = (
        "BadArgument",
        "BadResumptionToken",
        "badVerb",
        "CanNotDisseminateFormat",
    )

    def __init__(self, verb, argdescr, repository):
        """Store the verb, its argument description and the repository.

        ``argdescr`` maps an argument name to "required", "optional" or
        "exclusive"; the name of the exclusive argument (if any) is remembered.
        """
        self._verb = verb
        self._argdescr = argdescr
        self._repository = repository
        self._exclusive = None
        for aname, atype in argdescr.items():
            if atype == "exclusive":
                self._exclusive = aname

    def _processArgs(self, form):
        """Check ``form`` against the argument description, then call ``_validate``.

        Raises:
            BadArgument: on an undeclared argument, a repeated argument, extra
                arguments next to the exclusive one, or a missing required one.
        """
        argdescr = self._argdescr
        for key, value in form.items():
            if key == "verb":
                continue
            if key not in argdescr:
                raise BadArgument("Request contains illegal arguments")
            if isinstance(value, list):
                raise BadArgument("Request contains repeated arguments")
        if self._exclusive and self._exclusive in form:
            # The exclusive argument may only be accompanied by one other key
            # (presumably "verb" -- verify against the caller).
            if len(form.keys()) > 2:
                raise BadArgument("Request contains extra arguments")
        else:
            for aname, atype in argdescr.items():
                if atype == "required" and aname not in form:
                    raise BadArgument("Required argument missing")
        self._validate(form)

    def _validate(self, request):
        """Hook for verb-specific argument checks; the base class accepts everything."""
        pass

    def error_response(self, code, mess):
        """Render ``oai/error.html`` for the error ``code`` with message ``mess``.

        Codes listed in ``_STATUS_405_CODES`` are sent with HTTP status 405.
        Any other code, including ones this class does not know about, is sent
        with status 200 so that the status is always defined.
        """
        self.context["code"] = code
        self.context["message"] = mess
        status = 405 if code in self._STATUS_405_CODES else 200
        return render(
            self.request,
            template_name="oai/error.html",
            context=self.context,
            status=status,
            content_type=CONTENT_TYPE,
        )

    def response(self, template):
        """Render ``template`` with the current context as an XML response."""
        return render(
            request=self.request,
            template_name=template,
            context=self.context,
            content_type=CONTENT_TYPE,
        )

    def __call__(self, request):
        """Serve ``request``: build the context, check the arguments, run the verb.

        Returns the response of ``real_run``, or an error response when the
        argument checks raise one of the protocol exceptions.
        """
        context = {}
        baseurl = self._repository.base_url
        granularity = self._repository.Identify()["granularity"]
        context["baseurl"] = baseurl

        # A granularity string longer than 10 characters switches both formats
        # from date-only to date plus time.
        granularity_format = "Y-m-d"
        format_date = "%Y-%m-%d"
        if len(granularity) > 10:
            granularity_format = granularity_format + r"\TH:i:sZ"
            format_date = "%Y-%m-%dT%H:%M:%S.%fZ"
        context["granularity"] = granularity_format
        # NOTE(review): datetime.now() is naive local time although the long
        # format ends with "Z" -- confirm the server runs in UTC.
        context["now"] = datetime.now().strftime(format_date)
        # NOTE(review): request.REQUEST is presumably supplied by the calling
        # view or a middleware -- confirm.
        context["oairequest"] = OAIRequest(request.REQUEST, baseurl, self._argdescr)

        self.context = context
        self.request = request
        self.rq = RequestContext(request)
        try:
            self._processArgs(request.REQUEST)
        except BadArgument as mess:
            return self.error_response("BadArgument", mess)
        except BadResumptionToken as mess:
            return self.error_response("BadResumptionToken", mess)
        except CanNotDisseminateFormat as mess:
            return self.error_response("CanNotDisseminateFormat", mess)
        except NoRecordsMatch as mess:
            return self.error_response("NoRecordsMatch", mess)
        return self.real_run()

    def real_run(self):
        """Produce the response for the verb; overridden by subclasses."""
        pass
class IdentifyRequest(Request):
    """Handler for the ``Identify`` verb."""

    def real_run(self):
        """Expose the repository description to ``oai/identify.html``."""
        ctx = self.context
        ctx["identify"] = info = self._repository.Identify()
        # Overrides the formatting pattern stored under the same key by the
        # base class with the raw granularity value.
        ctx["granularity"] = info["granularity"]
        return self.response("oai/identify.html")
class ListMetadataFormatsRequest(Request):
    """Handler for the ``ListMetadataFormats`` verb."""

    def real_run(self):
        """List the metadata formats, optionally restricted to one identifier.

        Returns an ``idDoesNotExist`` error response when an identifier is
        given but unknown to the repository.
        """
        params = self.request.REQUEST
        ident = None
        if "identifier" in params:
            candidate = params["identifier"]
            if not self._repository.has_identifier(candidate):
                return self.error_response("idDoesNotExist", "identifier maps to no item")
            ident = candidate
        self.context["formats"] = self._repository.listmetadataformats(ident)
        return self.response("oai/listmetadataformats.html")
163# from ptf import models
class GetRecordRequest(Request):
    """Handler for the ``GetRecord`` verb."""

    def real_run(self):
        """Render the record selected by ``identifier`` in format ``metadataPrefix``.

        Returns an error response instead when the identifier is unknown
        (``idDoesNotExist``), when the repository yields no record for it
        (``idDoesNotExist``), or when the record cannot be served in the
        requested format (``CanNotDisseminateFormat``).
        """
        params = self.request.REQUEST
        prefix = params["metadataPrefix"]
        identifier = params["identifier"]
        if not self._repository.has_identifier(identifier):
            return self.error_response("idDoesNotExist", "identifier maps to no item")

        self._repository.setmetaDataFormat(prefix)
        # Keep the last entry yielded by the repository; an empty result must
        # not leave the record undefined.
        record = None
        for record in self._repository.get(identifier):
            pass
        if record is None:
            return self.error_response("idDoesNotExist", "identifier maps to no item")

        if not self._repository.has_format(prefix, record["item"]):
            return self.error_response(
                "CanNotDisseminateFormat", "This server does not support the requested format"
            )

        self.context["record"] = record
        self.context["base_url"] = settings.SITE_DOMAIN
        self.context["is_authorized"] = is_authorized(self.request)
        return self.response("oai/getrecord.xml")
class ListSetsRequest(Request):
    """Handler for the ``ListSets`` verb."""

    def _validate(self, _dict):
        """Reject any resumption token: this verb never accepts one."""
        if "resumptionToken" not in _dict:
            return
        raise BadResumptionToken("Invalid resumption token")

    def real_run(self):
        """Render the repository sets, or ``noSetHierarchy`` when there are none."""
        available = self._repository.listsets()
        if available:
            self.context["sets"] = available
            return self.response("oai/listsets.html")
        return self.error_response("noSetHierarchy", "This repository does not support sets")
class ListRequest(Request):
    """Shared argument validation for the list verbs (set, dates, token, format)."""

    def _validated_date(self, request, key):
        """Return the ``YYYY-MM-DD`` value of ``key``, or "" when it is absent.

        Raises:
            BadArgument: when the value carries a time part or is not a valid date.
        """
        if key not in request:
            return ""
        datestr = request[key]
        if "T" in datestr:
            raise BadArgument("This repository only supports YYYY-MM-DD granularity")
        try:
            time.strptime(datestr, "%Y-%m-%d")
        except ValueError as err:
            raise BadArgument(f"Invalid {key} date") from err
        return datestr

    def _validate(self, request):
        """Check the selective-harvesting arguments and configure the repository.

        Raises:
            BadArgument: unknown set, malformed date, or ``from`` later than ``until``.
            CanNotDisseminateFormat: unsupported ``metadataPrefix``.
            NoRecordsMatch: ``until`` is earlier than the earliest datestamp.
            BadResumptionToken: the repository rejects the resumption token.
        """
        super()._validate(request)

        if "set" in request:
            set_spec = request["set"]
            if not self._repository.has_set(set_spec):
                raise BadArgument("Unknown set specification")
            px = request["metadataPrefix"]
            if not self._repository.has_format(px, None, set_spec):
                raise CanNotDisseminateFormat(
                    f"This server does not support the requested format for set {set_spec}"
                )
            self._repository.setSet(set_spec)

        # A validated date is never empty, so truthiness means "argument given".
        fromdate = self._validated_date(request, "from")
        if fromdate:
            self._repository.setfromDate(fromdate)
        untildate = self._validated_date(request, "until")
        if untildate:
            self._repository.setuntilDate(untildate)

        # Dates are compared as strings.
        if fromdate and untildate and fromdate > untildate:
            raise BadArgument("Invalid from date and until date combination")
        if untildate:
            edt = self._repository.Identify()["earliest_datestamp"]
            if untildate < edt:
                raise NoRecordsMatch("Nothing to list")

        if "resumptionToken" in request:
            token = request["resumptionToken"]
            try:
                success = self._repository.setresumptionToken(token)
            except Exception as err:
                # Any failure while decoding the token is reported as a bad token.
                raise BadResumptionToken("Invalid resumption token") from err
            if not success:
                raise BadResumptionToken("Invalid resumption token")
        else:
            px = request["metadataPrefix"]
            # TODO: duplicate of the set-specific format check above?
            if not self._repository.has_format(px):
                raise CanNotDisseminateFormat("This server does not support the requested format")
            self._repository.setmetaDataFormat(px)

        self.context["by_date_published"] = getattr(settings, "OAI_BY_DATE_PUBLISHED", False)
class ListIdsRequest(ListRequest):
    """Handler for the ``ListIdentifiers`` verb."""

    def real_run(self):
        """Render the matching identifiers, or ``NoRecordsMatch`` when there are none."""
        listing = self._repository.listids()
        if listing.total != 0:
            self.context["records"] = listing
            return self.response("oai/listidentifiers.xml")
        return self.error_response("NoRecordsMatch", "Nothing to list")
class ListRecordsRequest(ListRequest):
    """Handler for the ``ListRecords`` verb."""

    def real_run(self):
        """Render the matching records, or ``NoRecordsMatch`` when there are none."""
        listing = self._repository.listrecs()
        if listing.total != 0:
            ctx = self.context
            ctx["records"] = listing
            ctx["base_url"] = settings.SITE_DOMAIN
            ctx["is_authorized"] = is_authorized(self.request)
            return self.response("oai/listrecords.xml")
        return self.error_response("NoRecordsMatch", "Nothing to list")
# Verb table: verb name -> (argument description, handler class).
# An argument description maps each accepted argument name to one of
# "required", "optional" or "exclusive".
argdescrs = {
    "Identify": ({}, IdentifyRequest),
    "ListMetadataFormats": (
        {"identifier": "optional"},
        ListMetadataFormatsRequest,
    ),
    "GetRecord": (
        {"identifier": "required", "metadataPrefix": "required"},
        GetRecordRequest,
    ),
    "ListSets": (
        {"resumptionToken": "exclusive"},
        ListSetsRequest,
    ),
    "ListIdentifiers": (
        {
            "from": "optional",
            "until": "optional",
            "metadataPrefix": "required",
            "set": "optional",
            "resumptionToken": "exclusive",
        },
        ListIdsRequest,
    ),
    "ListRecords": (
        {
            "from": "optional",
            "until": "optional",
            "set": "optional",
            "resumptionToken": "exclusive",
            "metadataPrefix": "required",
        },
        ListRecordsRequest,
    ),
}
def is_authorized(request):
    """Return True when the client IP is listed in ``settings.EUDML_AUTHORIZED_IP``.

    The client IP is the first entry of the ``X-Forwarded-For`` header when
    that header is present, otherwise ``REMOTE_ADDR``. A missing setting
    authorizes nobody.
    """
    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        # Entries may be padded with spaces around the commas; strip them so
        # the membership test below compares bare addresses.
        ip = forwarded.split(",")[0].strip()
    else:
        ip = request.META.get("REMOTE_ADDR")
    # NOTE(review): X-Forwarded-For is client-supplied unless a trusted proxy
    # overwrites it -- confirm the deployment before relying on this check.
    return ip in getattr(settings, "EUDML_AUTHORIZED_IP", ())