import boto3 import os import binascii import requests import json class CustomResource(): # Usage: # from cfncustomresource import CustomResource # # cr = None # # def oncreate(): # cr.success(response={'Key': 'Value'}) # def handler(event, context): # global cr # cr = CustomResource() # cr.add_hook('create', oncreate) # cr.load_event(event) def __init__(self, event = None): self.oncreate = None self.ondelete = None self.onupdate = None self.Payload = {} if event != None: self.load_event(event) def success(self, response): self.Payload['Status'] = 'SUCCESS' self.set_response(response) def fail(self, response): self.Payload['Status'] = 'FAILED' self.set_response(response) def send_response(self): self.Payload['StackId'] = self.StackId self.Payload['RequestId'] = self.RequestId self.Payload['LogicalResourceId'] = self.LogicalResourceId self.Payload['PhysicalResourceId'] = binascii.b2a_hex(os.urandom(15)) requests.put(self.url, data=json.dumps(self.Payload)) def set_response(self, response): self.Payload['Data'] = response self.send_response() def load_event(self, event): self.event = event self.StackId = event['StackId'] self.RequestId = event['RequestId'] self.LogicalResourceId = event['LogicalResourceId'] self.EventType = event['RequestType'] self.url = event['ResponseURL'] self.ResourceProperties = event['ResourceProperties'] self.exec_hook() def add_hook(self, action, func): if action.lower() == 'create': self.oncreate = func if action.lower() == 'delete': self.ondelete = func if action.lower() == 'update': self.onupdate = func def exec_hook(self): if self.EventType.lower() == 'create': if self.oncreate != None: self.oncreate() else: self.success(response = {}) elif self.EventType.lower() == 'delete': if self.ondelete != None: self.ondelete() else: self.success(response = {}) elif self.EventType.lower() == 'update': if self.onupdate != None: self.onupdate() else: self.success(response = {})