Container functions data, logging and error handling

Modified on Thu, 18 May, 2023 at 9:44 PM

TABLE OF CONTENTS

Overview

Internally to Direktiv we refer to the containers as “functions” due to the nature of how their usage is declared in the configuration file:

functions:
- id: aws-cli
  image: gcr.io/direktiv/functions/aws-cli:1.0
  type: knative-workflow

This is the definition for running the gcr.io/direktiv/functions/aws-cli container. Now, let’s look at how Direktiv handles logging & error handling for custom functions (i.e. containers). Refer to the diagram below for the next couple of sub-sections.

Data Input & Output


This has been covered a couple of times, but Direktiv passes JSON data between the container and the Direktiv Functions component as the workflow executes. The JSON data by default is transported over HTTP on port 8080 (unless it’s configured with SSL). More importantly, a Direktiv-ActionID header is set. This header is used for communication between the Direktiv Function and the container and isolates the data being passed for the workflow.


Reporting Errors

If something goes wrong a function (container) can report an error to the workflow instance by adding HTTP headers to the response. If these headers are populated the execution of the function will be considered a failure (regardless of what’s stored in response data).


The headers to report errors are: Direktiv-ErrorCode and Direktiv-ErrorMessage. If an error message is defined without defining an error code the calling workflow instance will be marked as “crashed” without exposing any helpful information, so it’s important to always define both. As an example, the following error headers are defined:

"Direktiv-ErrorCode": "request.failure",
"Direktiv-ErrorMessage": "Failed to complete request"

Errors raised by functions are always ‘catchable’ by their error codes in the workflow configuration (see below):

- id: get-bitcoin-price
  type: action
  action:
    function: get-request
    input:
      method: "GET"
      url: "https://blockchain.info/ticker"
  catch:
    - error: "request.*"

Function Logs

Logging for functions is a simple HTTP POST to the address:

http://localhost:8889/log?aid=$ACTIONID

The content of the body of the request is logged . The important parameter is again $ACTIONID (which is the Direktiv-ActionID). 

As explained earlier, every request gets a Direktiv-ActionID which identifies the workflow instance. This parameter has to be passed back to attach the log to the instance. This information is passed in as in the initial request (Direktiv-ActionID).


Sample Python (logging & errors)

Below is an example of how a Python custom implementation would look for logging and error management between code, container and Direktiv (more examples are available here):

from http.server import BaseHTTPRequestHandler, HTTPServer
import requests
import json
import signal
import sys

PORT = 8080

# Headers
DirektivActionIDHeader     = "Direktiv-ActionID"
DirektivErrorCodeHeader    = "Direktiv-ErrorCode"
DirektivErrorMessageHeader = "Direktiv-ErrorMessage"

InputNameField = "name"

class DirektivHandler(BaseHTTPRequestHandler):
    def _log(self, actionID, msg):
        if actionID != "development" and actionID != "Development":
            try:
                r = requests.post("http://localhost:8889/log?aid=%s" % actionID, headers={"Content-type": "plain/text"}, data = msg)
                if r.status_code != 200:
                    self._send_error("com.greeting-bad-log.error", "log request failed to direktiv")
            except:
                self._send_error("com.greeting-bad-log.error", "failed to log to direktiv")
        else: 
            print(msg)

    def _send_error(self, errorCode, errorMsg):
        self.send_response(400)
        self.send_header('Content-type', 'application/json')
        self.send_header(DirektivErrorCodeHeader, 'application/json')
        self.send_header(DirektivErrorMessageHeader, errorMsg)
        self.end_headers()
        self.wfile.write(json.dumps({"error": errorMsg}).encode())
        return 

    def do_POST(self):
        actionID = ""
        if DirektivActionIDHeader in self.headers:
            actionID = self.headers[DirektivActionIDHeader]
        else:
            return self._send_error("com.greeting-bad-header.error", "Header '%s' must be set" % DirektivActionIDHeader)

        self._log(actionID, "Decoding Input")
        self.data_string = self.rfile.read(int(self.headers['Content-Length']))
        reqData = json.loads(self.data_string)
        
        if InputNameField in reqData:
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()

            # Respond Data
            self._log(actionID, "Writing Output")
            self.wfile.write(json.dumps({"greeting": "Welcome to Direktiv, %s" % reqData[InputNameField]}).encode())
            return
        else:
            return self._send_error("com.greeting-input.error","json field '%s' must be set" % InputNameField)


httpd = HTTPServer(('', PORT), DirektivHandler)
print('Starting greeter server on ":%s"' % PORT)

def shutdown(*args):
    print('Shutting down Server')
    httpd.server_close()
    sys.exit(0)

signal.signal(signal.SIGTERM, shutdown)
httpd.serve_forever()

 

Was this article helpful?

That’s Great!

Thank you for your feedback

Sorry! We couldn't be helpful

Thank you for your feedback

Let us know how can we improve this article!

Select at least one of the reasons
CAPTCHA verification is required.

Feedback sent

We appreciate your effort and will try to fix the article