Coverage for src/oai/protocol.py: 82%

222 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-05 09:56 +0000

1import time 

2from datetime import datetime 

3 

4from django.conf import settings 

5from django.shortcuts import render 

6from django.template import RequestContext 

7from django.utils.html import escape 

8 

9CONTENT_TYPE = "text/xml; charset=UTF-8" 

10 

11 

class BadArgument(Exception):
    """Raised when a request's arguments are illegal, repeated, missing or invalid."""

14 

15 

class BadResumptionToken(Exception):
    """Raised when a resumption token is rejected."""

18 

19 

class CanNotDisseminateFormat(Exception):
    """Raised when the requested metadata format is not supported."""

22 

23 

class NoRecordsMatch(Exception):
    """Raised when the request's selection criteria match nothing to list."""

26 

27 

class OAIRequest:
    """Serialises the incoming request as a ``<request>`` XML element.

    ``rq`` is the mapping of request parameters, ``baseurl`` becomes the
    element's text and ``argdescr`` names the arguments that may be echoed
    back as attributes.
    """

    def __init__(self, rq, baseurl, argdescr):
        self.rq = rq
        self.baseurl = baseurl
        self.argdescr = argdescr

    def __str__(self):
        """Return the element; only ``verb`` and described arguments become attributes."""
        rendered = []
        for name, raw in self.rq.items():
            if name != "verb" and name not in self.argdescr:
                # Unknown parameters are not echoed back.
                continue
            # Values come from the client, so they are HTML-escaped.
            rendered.append(f' {name}="{escape(raw)}"')
        attrs = "".join(rendered)
        return f"<request{attrs}>{self.baseurl}</request>"

43 

44 

class Request:
    """Base handler for one OAI verb.

    An instance is built with the verb name, the description of the arguments
    the verb accepts (``{name: "required" | "optional" | "exclusive"}``) and the
    repository object that answers the query. Calling the instance with a
    request validates the arguments and then delegates to :meth:`real_run`,
    which subclasses override.

    Per-request state (``context``, ``request``, ``rq``) is stored on the
    instance by :meth:`__call__`.
    """

    # Error codes that error_response answers with HTTP 405; every other code
    # is answered with HTTP 200.
    _STATUS_405_CODES = frozenset(
        {"BadArgument", "BadResumptionToken", "badVerb", "CanNotDisseminateFormat"}
    )

    def __init__(self, verb, argdescr, repository):
        self._verb = verb
        self._argdescr = argdescr
        self._repository = repository
        # Name of the argument that must be used alone (besides "verb"), if any.
        self._exclusive = None
        for aname, atype in argdescr.items():
            if atype == "exclusive":
                self._exclusive = aname

    def _processArgs(self, form):
        """Check the request parameters against the argument description.

        Raises:
            BadArgument: on an unknown argument, a repeated argument, an
                argument accompanying the exclusive one, or a missing
                required argument.

        Whatever ``_validate`` raises propagates unchanged.
        """
        argdescr = self._argdescr
        # Multi-value mappings (e.g. Django's QueryDict) expose only one value
        # per key through items(); getlist() is needed to see repetitions.
        getlist = getattr(form, "getlist", None)
        for key, value in form.items():
            if key == "verb":
                continue
            if key not in argdescr:
                raise BadArgument("Request contains illegal arguments")
            if isinstance(value, list) or (callable(getlist) and len(getlist(key)) > 1):
                raise BadArgument("Request contains repeated arguments")

        if self._exclusive and self._exclusive in form:
            # The exclusive argument tolerates no companion other than "verb".
            if any(key not in ("verb", self._exclusive) for key in form):
                raise BadArgument("Request contains extra arguments")
        else:
            for aname, atype in argdescr.items():
                if atype == "required" and aname not in form:
                    raise BadArgument("Required argument missing")
        self._validate(form)

    def _validate(self, request):
        """Hook for verb-specific validation; the base implementation accepts everything."""

    def error_response(self, code, mess):
        """Render ``oai/error.html`` with the given error code and message.

        Codes listed in ``_STATUS_405_CODES`` are answered with HTTP 405; any
        other code, including ones this class does not know, with HTTP 200.
        """
        self.context["code"] = code
        self.context["message"] = mess
        status = 405 if code in self._STATUS_405_CODES else 200
        return render(
            self.request,
            template_name="oai/error.html",
            context=self.context,
            status=status,
            content_type=CONTENT_TYPE,
        )

    def response(self, template):
        """Render ``template`` with the current context as an XML response."""
        return render(
            request=self.request,
            template_name=template,
            context=self.context,
            content_type=CONTENT_TYPE,
        )

    def __call__(self, request):
        """Handle ``request``: build the context, validate arguments, run the verb.

        Returns the rendered error response when argument validation raises one
        of the protocol exceptions, otherwise whatever ``real_run`` returns.
        """
        baseurl = self._repository.base_url
        granularity = self._repository.Identify()["granularity"]
        # granularity_format is a Django date-filter pattern, format_date its
        # strftime counterpart.
        granularity_format = "Y-m-d"
        format_date = "%Y-%m-%d"
        if len(granularity) > 10:
            granularity_format = granularity_format + r"\TH:i:sZ"
            # NOTE(review): datetime.now() is naive local time while this format
            # ends with a literal "Z" and includes microseconds — confirm that
            # this is the intended timestamp.
            format_date = "%Y-%m-%dT%H:%M:%S.%fZ"

        self.context = {
            "baseurl": baseurl,
            "granularity": granularity_format,
            "now": datetime.now().strftime(format_date),
            "oairequest": OAIRequest(request.REQUEST, baseurl, self._argdescr),
        }
        self.request = request
        self.rq = RequestContext(request)
        try:
            self._processArgs(request.REQUEST)
        except BadArgument as mess:
            return self.error_response("BadArgument", mess)
        except BadResumptionToken as mess:
            return self.error_response("BadResumptionToken", mess)
        except CanNotDisseminateFormat as mess:
            return self.error_response("CanNotDisseminateFormat", mess)
        except NoRecordsMatch as mess:
            return self.error_response("NoRecordsMatch", mess)
        return self.real_run()

    def real_run(self):
        """Produce the verb's response; the base implementation returns ``None``."""

141 

142 

class IdentifyRequest(Request):
    """Handler that renders the repository's ``Identify()`` description."""

    def real_run(self):
        description = self._repository.Identify()
        # "granularity" is overwritten with the repository's own value here.
        self.context.update(
            identify=description,
            granularity=description["granularity"],
        )
        return self.response("oai/identify.html")

149 

150 

class ListMetadataFormatsRequest(Request):
    """Handler that lists metadata formats, optionally for a single identifier."""

    def real_run(self):
        params = self.request.REQUEST
        identifier = None
        if "identifier" in params:
            identifier = params["identifier"]
            if not self._repository.has_identifier(identifier):
                return self.error_response("idDoesNotExist", "identifier maps to no item")
        # identifier stays None when the argument was not supplied.
        self.context["formats"] = self._repository.listmetadataformats(identifier)
        return self.response("oai/listmetadataformats.html")

161 

162 

163# from ptf import models 

164 

165 

class GetRecordRequest(Request):
    """Handler that renders a single record in the requested metadata format."""

    def real_run(self):
        """Render ``oai/getrecord.xml`` for the requested identifier.

        Answers ``idDoesNotExist`` when the repository does not know the
        identifier or yields no record for it, and ``CanNotDisseminateFormat``
        when the record's item is not available in the requested format.
        """
        params = self.request.REQUEST
        prefix = params["metadataPrefix"]
        identifier = params["identifier"]
        if not self._repository.has_identifier(identifier):
            return self.error_response("idDoesNotExist", "identifier maps to no item")

        self._repository.setmetaDataFormat(prefix)
        # The repository returns an iterable; the last element is the one used.
        record = None
        for candidate in self._repository.get(identifier):
            record = candidate
        if record is None:
            # Nothing was yielded: report it instead of failing on an unbound name.
            return self.error_response("idDoesNotExist", "identifier maps to no item")

        if not self._repository.has_format(prefix, record["item"]):
            return self.error_response(
                "CanNotDisseminateFormat", "This server does not support the requested format"
            )

        self.context["record"] = record
        self.context["base_url"] = settings.SITE_DOMAIN
        self.context["is_authorized"] = is_authorized(self.request)
        return self.response("oai/getrecord.xml")

188 

189 

class ListSetsRequest(Request):
    """Handler that lists the repository's sets."""

    def _validate(self, _dict):
        """Reject any resumption token for this verb."""
        if "resumptionToken" in _dict:
            raise BadResumptionToken("Invalid resumption token")

    def real_run(self):
        available = self._repository.listsets()
        if available:
            self.context["sets"] = available
            return self.response("oai/listsets.html")
        # A falsy result is reported as "no set hierarchy".
        return self.error_response("noSetHierarchy", "This repository does not support sets")

201 

202 

class ListRequest(Request):
    """Shared argument validation for the list verbs.

    ``_validate`` checks the ``set``, ``from``, ``until``, ``resumptionToken``
    and ``metadataPrefix`` arguments and pushes the accepted values into the
    repository object.

    NOTE(review): this block was recovered from a listing without indentation.
    The nesting of the ``setfromDate`` / ``setuntilDate`` calls (placed inside
    their ``if``) and of the ``by_date_published`` assignment (placed at method
    level) is a reconstruction — confirm against the original file.
    """

    @staticmethod
    def _normalized_day(datestr, label):
        """Validate a ``YYYY-MM-DD`` argument and return it zero-padded.

        Raises:
            BadArgument: if the value carries a time part or is not a valid date.
        """
        if "T" in datestr:
            raise BadArgument("This repository only supports YYYY-MM-DD granularity")
        try:
            parsed = time.strptime(datestr, "%Y-%m-%d")
        except ValueError as err:
            raise BadArgument(f"Invalid {label} date") from err
        # strptime accepts unpadded months and days ("2020-9-1"); padding makes
        # the result safe to compare as a string.
        return f"{parsed.tm_year:04d}-{parsed.tm_mon:02d}-{parsed.tm_mday:02d}"

    def _validate(self, request):
        """Validate list arguments and configure the repository accordingly.

        Raises:
            BadArgument: unknown set, malformed date, or ``from`` later than ``until``.
            CanNotDisseminateFormat: the metadata prefix is not supported.
            NoRecordsMatch: ``until`` precedes the repository's earliest datestamp.
            BadResumptionToken: the repository rejects the resumption token.
        """
        super()._validate(request)
        if "set" in request:
            setspec = request["set"]
            if not self._repository.has_set(setspec):
                raise BadArgument("Unknown set specification")
            px = request["metadataPrefix"]
            if not self._repository.has_format(px, None, setspec):
                raise CanNotDisseminateFormat(
                    f"This server does not support the requested format for set {setspec}"
                )
            self._repository.setSet(setspec)

        from_day = until_day = ""
        if "from" in request:
            fromdate = request["from"]
            from_day = self._normalized_day(fromdate, "from")
            # The repository receives the value exactly as the client sent it.
            self._repository.setfromDate(fromdate)
        if "until" in request:
            untildate = request["until"]
            until_day = self._normalized_day(untildate, "until")
            self._repository.setuntilDate(untildate)

        if from_day and until_day and from_day > until_day:
            raise BadArgument("Invalid from date and until date combination")
        if until_day:
            # Assumes earliest_datestamp is a string starting with YYYY-MM-DD —
            # TODO confirm against the repository implementation.
            edt = self._repository.Identify()["earliest_datestamp"]
            if until_day < edt:
                raise NoRecordsMatch("Nothing to list")

        if "resumptionToken" in request:
            token = request["resumptionToken"]
            try:
                success = self._repository.setresumptionToken(token)
            except Exception as err:
                # Any failure while decoding the token is reported as a bad token.
                raise BadResumptionToken("Invalid resumption token") from err
            if not success:
                raise BadResumptionToken("Invalid resumption token")
        else:
            px = request["metadataPrefix"]
            if not self._repository.has_format(px):  # TODO: duplicate of the set check?
                raise CanNotDisseminateFormat("This server does not support the requested format")
            self._repository.setmetaDataFormat(px)
        self.context["by_date_published"] = getattr(settings, "OAI_BY_DATE_PUBLISHED", False)

264 

265 

class ListIdsRequest(ListRequest):
    """Handler that renders the identifiers selected by the list arguments."""

    def real_run(self):
        listing = self._repository.listids()
        if listing.total == 0:
            # An empty selection is answered with the NoRecordsMatch error.
            return self.error_response("NoRecordsMatch", "Nothing to list")

        self.context["records"] = listing
        return self.response("oai/listidentifiers.xml")

274 

275 

class ListRecordsRequest(ListRequest):
    """Handler that renders the records selected by the list arguments."""

    def real_run(self):
        listing = self._repository.listrecs()
        if listing.total == 0:
            # An empty selection is answered with the NoRecordsMatch error.
            return self.error_response("NoRecordsMatch", "Nothing to list")

        self.context.update(
            records=listing,
            base_url=settings.SITE_DOMAIN,
            is_authorized=is_authorized(self.request),
        )
        return self.response("oai/listrecords.xml")

286 

287 

# Verb name -> (argument description, handler class).
#
# The argument description maps each accepted argument to its kind:
#   "required"  - must be present unless the exclusive argument is used;
#   "optional"  - may be present;
#   "exclusive" - when present, no other argument besides "verb" is allowed.
argdescrs = {
    "Identify": ({}, IdentifyRequest),
    "ListMetadataFormats": ({"identifier": "optional"}, ListMetadataFormatsRequest),
    "GetRecord": (
        {"identifier": "required", "metadataPrefix": "required"},
        GetRecordRequest,
    ),
    "ListSets": ({"resumptionToken": "exclusive"}, ListSetsRequest),
    "ListIdentifiers": (
        {
            "from": "optional",
            "until": "optional",
            "metadataPrefix": "required",
            "set": "optional",
            "resumptionToken": "exclusive",
        },
        ListIdsRequest,
    ),
    "ListRecords": (
        {
            "from": "optional",
            "until": "optional",
            "set": "optional",
            "resumptionToken": "exclusive",
            "metadataPrefix": "required",
        },
        ListRecordsRequest,
    ),
}

326 

327 

def is_authorized(request):
    """Return True when the client address is listed in ``settings.EUDML_AUTHORIZED_IP``.

    The address is the first entry of the ``X-Forwarded-For`` header when that
    header is present, otherwise ``REMOTE_ADDR``. A missing setting means
    nobody is authorized.
    """
    # NOTE(review): X-Forwarded-For can be set by the client unless a trusted
    # front proxy overwrites it — confirm the deployment before relying on it.
    forwarded = request.headers.get("x-forwarded-for")
    ip = forwarded.split(",")[0] if forwarded else request.META.get("REMOTE_ADDR")
    return ip in getattr(settings, "EUDML_AUTHORIZED_IP", ())