1#!/usr/local/bin/python3
2#
3# Copyright (c) 2014 The FreeBSD Foundation
4# All rights reserved.
5#
6# This software was developed by John-Mark Gurney under
7# the sponsorship from the FreeBSD Foundation.
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions
10# are met:
11# 1.  Redistributions of source code must retain the above copyright
12#     notice, this list of conditions and the following disclaimer.
13# 2.  Redistributions in binary form must reproduce the above copyright
14#     notice, this list of conditions and the following disclaimer in the
15#     documentation and/or other materials provided with the distribution.
16#
17# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27# SUCH DAMAGE.
28#
29# $FreeBSD: stable/12/tests/sys/opencrypto/cryptotest.py 367527 2020-11-09 15:37:57Z markj $
30#
31
32
33
34import binascii
35import errno
36import cryptodev
37import itertools
38import os
39import struct
40import unittest
41from cryptodev import *
42from glob import iglob
43
# Directory the nist-kat package installs the NIST Known Answer Test
# vectors into.
katdir = '/usr/local/share/nist-kat'

def katg(base, glob):
	"""Return an iterator over the vector files matching glob under base.

	base is a directory name relative to katdir and glob is a shell-style
	pattern applied inside that directory.

	Fails an assertion when katdir itself does not exist, and raises
	unittest.SkipTest when only the base directory is absent.
	"""
	assert os.path.exists(katdir), "Please 'pkg install nist-kat'"

	vectordir = os.path.join(katdir, base)
	if os.path.exists(vectordir):
		return iglob(os.path.join(katdir, base, glob))

	raise unittest.SkipTest("Missing %s test vectors" % (base))
51
# Driver instance names each algorithm family is run against; the
# skipIf decorators on the generated test methods skip any driver that is
# not listed for the family under test.
aesmodules = [
	'cryptosoft0',
	'aesni0',
	'ccr0',
	'ccp0',
	'safexcel0',
	'qat0',
]
desmodules = ['cryptosoft0']
shamodules = [
	'cryptosoft0',
	'aesni0',
	'ccr0',
	'ccp0',
	'safexcel0',
	'qat0',
]
55
def GenTestCase(cname):
	"""Build a TestCase class that runs the KAT vectors on one driver.

	cname is a crypto driver instance name such as 'cryptosoft0'.  It is
	resolved with cryptodev.Crypto.findcrid() and the resulting crid is
	passed to every Crypto session the generated tests open.

	Returns the generated unittest.TestCase subclass, or None when
	findcrid() raises IOError for cname.
	"""
	try:
		crid = cryptodev.Crypto.findcrid(cname)
	except IOError:
		return None

	class GendCryptoTestCase(unittest.TestCase):
		"""Known Answer Tests bound to the driver named by cname."""

		# NOTE: the test_* methods below carry comments instead of
		# docstrings on purpose; unittest includes a test's docstring
		# in its verbose output, which would change what is printed.

		###############
		##### AES #####
		###############

		# AES-XTS vectors (data unit sequence number tweak format).
		@unittest.skipIf(cname not in aesmodules, 'skipping AES-XTS on %s' % (cname))
		def test_xts(self):
			for i in katg('XTSTestVectors/format tweak value input - data unit seq no', '*.rsp'):
				self.runXTS(i, cryptodev.CRYPTO_AES_XTS)

		# AES-CBC vectors whose file names start with CBCG, CBCK or CBCV.
		@unittest.skipIf(cname not in aesmodules, 'skipping AES-CBC on %s' % (cname))
		def test_cbc(self):
			for i in katg('KAT_AES', 'CBC[GKV]*.rsp'):
				self.runCBC(i)

		# AES-CCM: V*.rsp files drive encryption, D*.rsp decryption.
		@unittest.skipIf(cname not in aesmodules, 'skipping AES-CCM on %s' % (cname))
		def test_ccm(self):
			for i in katg('ccmtestvectors', 'V*.rsp'):
				self.runCCMEncrypt(i)

			for i in katg('ccmtestvectors', 'D*.rsp'):
				self.runCCMDecrypt(i)

		# AES-GCM encryption and decryption vectors.
		@unittest.skipIf(cname not in aesmodules, 'skipping AES-GCM on %s' % (cname))
		def test_gcm(self):
			for i in katg('gcmtestvectors', 'gcmEncrypt*'):
				self.runGCM(i, 'ENCRYPT')

			for i in katg('gcmtestvectors', 'gcmDecrypt*'):
				self.runGCM(i, 'DECRYPT')

		# Maps the AES key length in bytes to the matching GMAC algorithm.
		_gmacsizes = { 32: cryptodev.CRYPTO_AES_256_NIST_GMAC,
			24: cryptodev.CRYPTO_AES_192_NIST_GMAC,
			16: cryptodev.CRYPTO_AES_128_NIST_GMAC,
		}
		def runGCM(self, fname, mode):
			"""Check every AES-GCM vector in the response file fname.

			mode selects the direction under test and must be
			'ENCRYPT' or 'DECRYPT'; anything else raises RuntimeError.
			Vectors the driver cannot handle (EOPNOTSUPP when opening
			the session, EINVAL when running the operation) are
			silently skipped.
			"""
			if mode not in ('ENCRYPT', 'DECRYPT'):
				raise RuntimeError('unknown mode: %r' % repr(mode))

			# The section name yielded by the parser is not used; the
			# direction comes from the mode argument instead.
			for bogusmode, lines in cryptodev.KATParser(fname,
			    [ 'Count', 'Key', 'IV', 'CT', 'AAD', 'Tag', 'PT', ]):
				for data in lines:
					cipherkey = binascii.unhexlify(data['Key'])
					iv = binascii.unhexlify(data['IV'])
					aad = binascii.unhexlify(data['AAD'])
					tag = binascii.unhexlify(data['Tag'])
					# Vectors marked FAIL are not read for a PT value.
					if 'FAIL' not in data:
						pt = binascii.unhexlify(data['PT'])
					ct = binascii.unhexlify(data['CT'])

					if len(iv) != 12:
						# XXX - isn't supported
						continue

					try:
						c = Crypto(cryptodev.CRYPTO_AES_NIST_GCM_16,
						    cipherkey,
						    mac=self._gmacsizes[len(cipherkey)],
						    mackey=cipherkey, crid=crid,
						    maclen=16)
					except EnvironmentError as e:
						# Can't test algorithms the driver does not support.
						if e.errno != errno.EOPNOTSUPP:
							raise
						continue

					if mode == 'ENCRYPT':
						try:
							rct, rtag = c.encrypt(pt, iv, aad)
						except EnvironmentError as e:
							# Can't test inputs the driver does not support.
							if e.errno != errno.EINVAL:
								raise
							continue
						# The vector's tag may be shorter than the
						# one returned; compare the common prefix.
						rtag = rtag[:len(tag)]
						# Stash the results so that a failure message
						# (repr(data)) shows them next to the inputs.
						data['rct'] = binascii.hexlify(rct)
						data['rtag'] = binascii.hexlify(rtag)
						self.assertEqual(rct, ct, repr(data))
						self.assertEqual(rtag, tag, repr(data))
					else:
						# Only full 16 byte tags are exercised here.
						if len(tag) != 16:
							continue
						args = (ct, iv, aad, tag)
						if 'FAIL' in data:
							self.assertRaises(IOError,
								c.decrypt, *args)
						else:
							try:
								rpt, rtag = c.decrypt(*args)
							except EnvironmentError as e:
								# Can't test inputs the driver does not support.
								if e.errno != errno.EINVAL:
									raise
								continue
							data['rpt'] = binascii.hexlify(rpt)
							data['rtag'] = binascii.hexlify(rtag)
							self.assertEqual(rpt, pt,
							    repr(data))

		def runCBC(self, fname):
			"""Check every AES-CBC vector in the response file fname.

			The parser yields (mode, lines) sections; mode must be
			'ENCRYPT' or 'DECRYPT', otherwise RuntimeError is raised.
			"""
			for mode, lines in cryptodev.KATParser(fname,
			    [ 'COUNT', 'KEY', 'IV', 'PLAINTEXT', 'CIPHERTEXT', ]):
				if mode == 'ENCRYPT':
					swapptct = False
					curfun = Crypto.encrypt
				elif mode == 'DECRYPT':
					swapptct = True
					curfun = Crypto.decrypt
				else:
					raise RuntimeError('unknown mode: %r' % repr(mode))

				for data in lines:
					cipherkey = binascii.unhexlify(data['KEY'])
					iv = binascii.unhexlify(data['IV'])
					pt = binascii.unhexlify(data['PLAINTEXT'])
					ct = binascii.unhexlify(data['CIPHERTEXT'])

					# When decrypting, the ciphertext is the input
					# and the plaintext the expected output.
					if swapptct:
						pt, ct = ct, pt
					# run the fun
					c = Crypto(cryptodev.CRYPTO_AES_CBC, cipherkey, crid=crid)
					r = curfun(c, pt, iv)
					self.assertEqual(r, ct)

		def runXTS(self, fname, meth):
			"""Check every AES-XTS vector in the response file fname.

			meth is the cipher algorithm constant used to open the
			session.  Sections must be tagged 'ENCRYPT' or 'DECRYPT',
			otherwise RuntimeError is raised.  Vectors whose data unit
			length is not a multiple of 128 bits, and vectors the
			driver rejects with EOPNOTSUPP, are skipped.
			"""
			for mode, lines in cryptodev.KATParser(fname,
			    [ 'COUNT', 'DataUnitLen', 'Key', 'DataUnitSeqNumber', 'PT',
			    'CT' ]):
				if mode == 'ENCRYPT':
					swapptct = False
					curfun = Crypto.encrypt
				elif mode == 'DECRYPT':
					swapptct = True
					curfun = Crypto.decrypt
				else:
					raise RuntimeError('unknown mode: %r' % repr(mode))

				for data in lines:
					nbits = int(data['DataUnitLen'])
					cipherkey = binascii.unhexlify(data['Key'])
					# The IV is the sequence number followed by a
					# zero 64-bit word.
					# NOTE(review): 'QQ' packs in native byte
					# order - confirm this is what the driver
					# expects on big-endian hosts.
					iv = struct.pack('QQ', int(data['DataUnitSeqNumber']), 0)
					pt = binascii.unhexlify(data['PT'])
					ct = binascii.unhexlify(data['CT'])

					if nbits % 128 != 0:
						# XXX - mark as skipped
						continue
					if swapptct:
						pt, ct = ct, pt
					# run the fun
					try:
						c = Crypto(meth, cipherkey, crid=crid)
						r = curfun(c, pt, iv)
					except EnvironmentError as e:
						# Can't test ciphers the driver does not support.
						if e.errno != errno.EOPNOTSUPP:
							raise
						continue
					self.assertEqual(r, ct)

		def runCCMEncrypt(self, fname):
			"""Check every AES-CCM encryption vector in fname.

			Vectors with a nonce length other than 12 bytes, and
			vectors the driver rejects with EOPNOTSUPP, are skipped.
			The expected CT field is compared against the returned
			ciphertext with the returned tag appended.
			"""
			for data in cryptodev.KATCCMParser(fname):
				Nlen = int(data['Nlen'])
				if Nlen != 12:
					# OCF only supports 12 byte IVs
					continue
				key = binascii.unhexlify(data['Key'])
				nonce = binascii.unhexlify(data['Nonce'])
				Alen = int(data['Alen'])
				# The Adata field is only decoded when Alen says
				# there is associated data.
				if Alen != 0:
					aad = binascii.unhexlify(data['Adata'])
				else:
					aad = None
				payload = binascii.unhexlify(data['Payload'])
				ct = binascii.unhexlify(data['CT'])

				try:
					c = Crypto(crid=crid,
					    cipher=cryptodev.CRYPTO_AES_CCM_16,
					    key=key,
					    mac=cryptodev.CRYPTO_AES_CCM_CBC_MAC,
					    mackey=key, maclen=16)
					r, tag = Crypto.encrypt(c, payload,
					    nonce, aad)
				except EnvironmentError as e:
					if e.errno != errno.EOPNOTSUPP:
						raise
					continue

				out = r + tag
				self.assertEqual(out, ct,
				    "Count " + data['Count'] + " Actual: " + \
				    repr(binascii.hexlify(out)) + " Expected: " + \
				    repr(data) + " on " + cname)

		def runCCMDecrypt(self, fname):
			"""Check every AES-CCM decryption vector in fname.

			Only vectors with a 12 byte nonce and a 16 byte tag are
			run; sessions the driver rejects with EOPNOTSUPP are
			skipped.  Vectors whose Result is 'Fail' must make
			decrypt() raise IOError, all others must decrypt to the
			first Plen bytes of the Payload field.
			"""
			# XXX: Note that all of the current CCM decryption test
			# vectors use IV and tag sizes that aren't supported by
			# OCF, so none of the tests are actually run.
			for data in cryptodev.KATCCMParser(fname):
				Nlen = int(data['Nlen'])
				if Nlen != 12:
					# OCF only supports 12 byte IVs
					continue
				Tlen = int(data['Tlen'])
				if Tlen != 16:
					# OCF only supports 16 byte tags
					continue
				key = binascii.unhexlify(data['Key'])
				nonce = binascii.unhexlify(data['Nonce'])
				Alen = int(data['Alen'])
				if Alen != 0:
					aad = binascii.unhexlify(data['Adata'])
				else:
					aad = None
				# The CT field is split into the ciphertext proper
				# and its trailing 16 bytes, which are used as tag.
				ct = binascii.unhexlify(data['CT'])
				tag = ct[-16:]
				ct = ct[:-16]

				try:
					c = Crypto(crid=crid,
					    cipher=cryptodev.CRYPTO_AES_CCM_16,
					    key=key,
					    mac=cryptodev.CRYPTO_AES_CCM_CBC_MAC,
					    mackey=key, maclen=16)
				except EnvironmentError as e:
					if e.errno != errno.EOPNOTSUPP:
						raise
					continue

				# The input to decrypt() is the tag-less ciphertext.
				if data['Result'] == 'Fail':
					self.assertRaises(IOError,
					    c.decrypt, ct, nonce, aad, tag)
				else:
					# As with every other MAC-enabled session in
					# this file, the result is a (data, tag) pair.
					r, _ = Crypto.decrypt(c, ct, nonce,
					    aad, tag)

					# Only the first Plen bytes of the Payload
					# field are the expected plaintext.
					payload = binascii.unhexlify(data['Payload'])
					Plen = int(data['Plen'])
					payload = payload[:Plen]
					self.assertEqual(r, payload,
					    "Count " + data['Count'] + \
					    " Actual: " + repr(binascii.hexlify(r)) + \
					    " Expected: " + repr(data) + \
					    " on " + cname)

		###############
		##### DES #####
		###############

		# Triple-DES CBC vectors.
		@unittest.skipIf(cname not in desmodules, 'skipping DES on %s' % (cname))
		def test_tdes(self):
			for i in katg('KAT_TDES', 'TCBC[a-z]*.rsp'):
				self.runTDES(i)

		def runTDES(self, fname):
			"""Check every 3DES-CBC vector in the response file fname.

			Sections must be tagged 'ENCRYPT' or 'DECRYPT', otherwise
			RuntimeError is raised.
			"""
			for mode, lines in cryptodev.KATParser(fname,
			    [ 'COUNT', 'KEYs', 'IV', 'PLAINTEXT', 'CIPHERTEXT', ]):
				if mode == 'ENCRYPT':
					swapptct = False
					curfun = Crypto.encrypt
				elif mode == 'DECRYPT':
					swapptct = True
					curfun = Crypto.decrypt
				else:
					raise RuntimeError('unknown mode: %r' % repr(mode))

				for data in lines:
					# The KEYs value is repeated three times to
					# build the 3DES key.
					key = data['KEYs'] * 3
					cipherkey = binascii.unhexlify(key)
					iv = binascii.unhexlify(data['IV'])
					pt = binascii.unhexlify(data['PLAINTEXT'])
					ct = binascii.unhexlify(data['CIPHERTEXT'])

					if swapptct:
						pt, ct = ct, pt
					# run the fun
					c = Crypto(cryptodev.CRYPTO_3DES_CBC, cipherkey, crid=crid)
					r = curfun(c, pt, iv)
					self.assertEqual(r, ct)

		###############
		##### SHA #####
		###############

		# Plain (unkeyed) SHA digests.
		@unittest.skipIf(cname not in shamodules, 'skipping SHA on %s' % str(cname))
		def test_sha(self):
			for i in katg('shabytetestvectors', 'SHA*Msg.rsp'):
				self.runSHA(i)

		def runSHA(self, fname):
			"""Check every SHA digest vector in the response file fname.

			The algorithm is picked from the digest length given by
			each section header.  Files with 'SHA512_' in their name,
			sections with an unknown digest length and algorithms the
			driver rejects with EOPNOTSUPP are skipped.
			"""
			# Skip SHA512_(224|256) tests
			if fname.find('SHA512_') != -1:
				return

			for hashlength, lines in cryptodev.KATParser(fname,
			    [ 'Len', 'Msg', 'MD' ]):
				# E.g., hashlength will be "L=20" (bytes)
				hashlen = int(hashlength.split("=")[1])

				if hashlen == 20:
					alg = cryptodev.CRYPTO_SHA1
				elif hashlen == 28:
					alg = cryptodev.CRYPTO_SHA2_224
				elif hashlen == 32:
					alg = cryptodev.CRYPTO_SHA2_256
				elif hashlen == 48:
					alg = cryptodev.CRYPTO_SHA2_384
				elif hashlen == 64:
					alg = cryptodev.CRYPTO_SHA2_512
				else:
					# Skip unsupported hashes
					# Slurp remaining input in section
					for data in lines:
						continue
					continue

				for data in lines:
					msg = binascii.unhexlify(data['Msg'])
					# Keep only the first Len items of the decoded
					# Msg field.
					msg = msg[:int(data['Len'])]
					md = binascii.unhexlify(data['MD'])

					try:
						c = Crypto(mac=alg, crid=crid,
						    maclen=hashlen)
					except EnvironmentError as e:
						# Can't test hashes the driver does not support.
						if e.errno != errno.EOPNOTSUPP:
							raise
						continue

					# Only the digest half of the result is needed.
					_, r = c.encrypt(msg, iv="")

					self.assertEqual(r, md, "Actual: " + \
					    repr(binascii.hexlify(r)) + " Expected: " + repr(data) + " on " + cname)

		# HMAC vectors; despite the name this covers every digest length
		# handled by runSHA1HMAC, not just SHA-1.
		@unittest.skipIf(cname not in shamodules, 'skipping SHA-HMAC on %s' % str(cname))
		def test_sha1hmac(self):
			for i in katg('hmactestvectors', 'HMAC.rsp'):
				self.runSHA1HMAC(i)

		def runSHA1HMAC(self, fname):
			"""Check every HMAC vector in the response file fname.

			The HMAC algorithm and its block size are picked from the
			digest length given by each section header.  Sections with
			an unknown digest length, vectors whose key is longer than
			the block size and algorithms the driver rejects with
			EOPNOTSUPP are skipped.
			"""
			for hashlength, lines in cryptodev.KATParser(fname,
			    [ 'Count', 'Klen', 'Tlen', 'Key', 'Msg', 'Mac' ]):
				# E.g., hashlength will be "L=20" (bytes)
				hashlen = int(hashlength.split("=")[1])

				blocksize = None
				if hashlen == 20:
					alg = cryptodev.CRYPTO_SHA1_HMAC
					blocksize = 64
				elif hashlen == 28:
					alg = cryptodev.CRYPTO_SHA2_224_HMAC
					blocksize = 64
				elif hashlen == 32:
					alg = cryptodev.CRYPTO_SHA2_256_HMAC
					blocksize = 64
				elif hashlen == 48:
					alg = cryptodev.CRYPTO_SHA2_384_HMAC
					blocksize = 128
				elif hashlen == 64:
					alg = cryptodev.CRYPTO_SHA2_512_HMAC
					blocksize = 128
				else:
					# Skip unsupported hashes
					# Slurp remaining input in section
					for data in lines:
						continue
					continue

				for data in lines:
					key = binascii.unhexlify(data['Key'])
					msg = binascii.unhexlify(data['Msg'])
					mac = binascii.unhexlify(data['Mac'])
					tlen = int(data['Tlen'])

					# Keys longer than the hash block size are
					# not exercised.
					if len(key) > blocksize:
						continue

					try:
						c = Crypto(mac=alg, mackey=key,
						    crid=crid, maclen=hashlen)
					except EnvironmentError as e:
						# Can't test hashes the driver does not support.
						if e.errno != errno.EOPNOTSUPP:
							raise
						continue

					_, r = c.encrypt(msg, iv="")

					# The expected Mac is compared against the
					# first Tlen bytes of the result.
					self.assertEqual(r[:tlen], mac, "Actual: " + \
					    repr(binascii.hexlify(r)) + " Expected: " + repr(data))

	return GendCryptoTestCase
468
# Generate one TestCase class per known driver.  Every name stays bound
# at module level - to None when GenTestCase() could not resolve the
# driver - since that is where unittest.main() collects test classes from.
(cryptosoft, aesni, ccr, ccp, safexcel, qat) = [
	GenTestCase(drvname) for drvname in (
	    'cryptosoft0', 'aesni0', 'ccr0', 'ccp0', 'safexcel0', 'qat0',
	)
]

if __name__ == '__main__':
	unittest.main()
478