mirror of
				https://github.com/FreeRTOS/FreeRTOS-Kernel.git
				synced 2025-10-24 21:57:46 -04:00 
			
		
		
		
	This adds aws_config_offline, which allows the user to download demo_config.h for the MQTT Mutual Auth Demo using a webpage. This also adds aws_config_quick_start, which provides a means to generate demo_config.h for the Mutual Auth Demo with boto3.
		
			
				
	
	
		
			88 lines
		
	
	
	
		
			2.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			88 lines
		
	
	
	
		
			2.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| import boto3
 | |
| import json
 | |
| 
 | |
| 
 | |
| class Certificate():
 | |
| 
 | |
|     def __init__(self, certId=''):
 | |
|         self.id = certId
 | |
|         self.arn = ''
 | |
|         self.client = boto3.client('iot')
 | |
|         if (self.id != ''):
 | |
|             result = self.client.describe_certificate(certificateId=self.id)
 | |
|             self.arn = result['certificateDescription']['certificateArn']
 | |
| 
 | |
|     def create(self):
 | |
|         assert not self.exists(), "Cert already exists"
 | |
|         cert = self.create_keys_and_certificate()
 | |
|         self.id = cert["certificateId"]
 | |
|         self.arn = cert["certificateArn"]
 | |
|         return cert
 | |
| 
 | |
|     def create_keys_and_certificate(self):
 | |
|         result = self.client.create_keys_and_certificate(setAsActive=True)
 | |
|         return result
 | |
| 
 | |
|     def delete(self):
 | |
|         cert_not_found = True
 | |
|         # Detach Policies attached to the cert
 | |
|         policies_attached = self.list_policies()
 | |
|         for policy in policies_attached:
 | |
|             self.detach_policy(policy['policyName'])
 | |
| 
 | |
|         # Detach Things attached to the cert
 | |
|         things_attached = self.list_things()
 | |
|         for thing in things_attached:
 | |
|             self.detach_thing(thing)
 | |
| 
 | |
|         # Update the status of the certificate to INACTIVE
 | |
|         try:
 | |
|             self.client.update_certificate(certificateId=self.id,
 | |
|                                            newStatus='INACTIVE')
 | |
|             cert_not_found = False
 | |
|         except self.client.exceptions.ResourceNotFoundException:
 | |
|             cert_not_found = True
 | |
|             return cert_not_found
 | |
| 
 | |
|         # Delete the certificate
 | |
|         try:
 | |
|             self.client.delete_certificate(certificateId=self.id)
 | |
|             cert_not_found = False
 | |
|         except self.client.exceptions.ResourceNotFoundException:
 | |
|             cert_not_found = True
 | |
|         return cert_not_found
 | |
| 
 | |
|     def exists(self):
 | |
|         if self.id == '':
 | |
|             return False
 | |
|         else:
 | |
|             return True
 | |
| 
 | |
|     def get_arn(self):
 | |
|         return self.arn
 | |
| 
 | |
|     def list_policies(self):
 | |
|         policies = self.client.list_principal_policies(principal=self.arn)
 | |
|         policies = policies['policies']
 | |
|         return policies
 | |
| 
 | |
|     def attach_policy(self, policy_name):
 | |
|         self.client.attach_policy(policyName=policy_name, target=self.arn)
 | |
| 
 | |
|     def detach_policy(self, policy_name):
 | |
|         self.client.detach_policy(policyName=policy_name, target=self.arn)
 | |
| 
 | |
|     def list_things(self):
 | |
|         things = self.client.list_principal_things(principal=self.arn)
 | |
|         things = things['things']
 | |
|         return things
 | |
| 
 | |
|     def attach_thing(self, thing_name):
 | |
|         self.client.attach_thing_principal(thingName=thing_name,
 | |
|                                            principal=self.arn)
 | |
| 
 | |
|     def detach_thing(self, thing_name):
 | |
|         self.client.detach_thing_principal(thingName=thing_name,
 | |
|                                            principal=self.arn)
 |