本篇讲解基于 Python 语言,实现 HTTPs 和 RSA 加解密通信的 API 示例。 示例代码已提交到 Github,参考链接 Python 示例代码
#coding=utf-8 __author__ = 'lihengci' import base64 import md5 import json import requests from Crypto import Random from Crypto.Cipher import PKCS1_v1_5 from Crypto.PublicKey import RSA # 定义 RSA key 的长度 rsa_key_size = 2048 # 加密 使用 key_size / 8 - 11 encrypt_block = rsa_key_size / 8 - 11 # 解密使用 key_size / 8 decrypt_block = rsa_key_size / 8 ''' * Change Python True to true, False to false ''' def json_boolean(encrypt): if encrypt: return 'true' else: return 'false' ''' * 按照字典顺序排列参数 ''' def sorted_key(params): return sorted(params.items(), key=lambda x: x[0]) ''' * 将 Dict 数据拼接为字符串 ''' def dict_to_string(params): return ''.join('{}{}'.format(key, val) for key, val in params.items()) ''' * MD5 签名 * data 待签名数据 * 参数按照字母排序后,进行 MD5 运算 * return 签名 ''' def md5_sign(params): params_str = 'account%sdata%s' % (params['account'], params['data']) return md5.new(params_str).hexdigest() ''' * MD5 验签 * return 验签是否通过 ''' def verify_sign(encrypt, data, sign): params_str = 'data%sencrypt%s' % (data, json_boolean(encrypt)) cal_sign = md5.new(params_str).hexdigest() if cal_sign.upper() == str(sign): return True return False ''' * 使用公钥加密 ''' def rsa_encrypt(params, public_key): key = RSA.importKey(public_key) cipher = PKCS1_v1_5.new(key) raw_str = json.dumps(params) raw_str_len = len(raw_str) # 计算分段加密的block数 (向上取整) block_num = raw_str_len / encrypt_block # 余数非0,block数再加1 if (raw_str_len % encrypt_block) != 0: block_num += 1 ciphertext = "" offset = 0 while offset < raw_str_len: chunk = raw_str[offset:offset+encrypt_block] ciphertext += cipher.encrypt(chunk) offset += encrypt_block cipher_text_base64 = base64.b64encode(ciphertext) return cipher_text_base64 ''' * 使用私钥解密 ''' def rsa_decrypt(data, private_key): # 伪随机数生成器 random_generator = Random.new().read rsakey = RSA.importKey(private_key) cipher = PKCS1_v1_5.new(rsakey) raw_str = base64.b64decode(data) raw_str_len = len(raw_str) # 计算分段加密的block数 (向上取整) block_num = raw_str_len / decrypt_block # 余数非0,block数再加1 if (raw_str_len % decrypt_block) != 0: block_num += 1 cipher_data = "" offset = 0 while offset < raw_str_len: chunk = raw_str[offset:offset+decrypt_block] cipher_data += cipher.decrypt(chunk, random_generator) offset += decrypt_block return cipher_data ''' * account 客户号 * params 请求参数 * keys_content 加解密的密钥 ''' def do_request(account, params, keys_content, url): # RSA 加密请求参数 cipher_text_base64 = rsa_encrypt(params, keys_content['server_public_key']) print('Encrypt data ' + cipher_text_base64) # 计算 md5 sign request_sign = md5_sign({'account': account, 'data': cipher_text_base64}) print('Data sign ' + request_sign) content = {'account': account, 'data': cipher_text_base64, 'sign': request_sign} print('Request parameters ' + json.dumps(content, ensure_ascii=False)) try: if 'https' in url: resp = requests.post(url, json=content, headers={}, timeout=120.0, verify=False) else: resp = requests.post(url, json=content, headers={}, timeout=120.0) respData = resp.json() print(respData) if respData is not None: encrypt_data = respData['data'] if verify_sign(respData['encrypt'], encrypt_data, respData['sign']): dataStr = rsa_decrypt(encrypt_data, keys_content['user_private_key']) print('Decrypt data ' + dataStr) dataObj = json.loads(dataStr) print(json.dumps(dataObj, ensure_ascii=False)) return dataObj else: print('Sign verify failed') else: print('Response None') return None except Exception, err: print('Request Exception') print err return None2017-10-21