Skip to content

Commit 47298e0

Browse files
michaelk-igzomesser
authored andcommitted
Enrich stream (#18)
1 parent 5d185c3 commit 47298e0

File tree

3 files changed

+230
-0
lines changed

3 files changed

+230
-0
lines changed

enrich-stream/enrich-stream.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# Copyright 2019 The Nuclio Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import requests
16+
import json
17+
import base64
18+
import os
19+
20+
21+
def init_context(context):
22+
23+
# env -> config
24+
setattr(context.user_data, 'config', {
25+
'v3io_api': os.environ['V3IO_API'],
26+
'v3io_username': os.environ['V3IO_USERNAME'],
27+
'container_name': os.environ['CONTAINER_NAME'],
28+
'table_name': os.environ['TABLE_NAME'],
29+
'input_stream_search_key': os.environ['INPUT_STREAM_SEARCH_KEY'],
30+
'output_stream_name': os.environ['OUTPUT_STREAM_NAME'],
31+
'v3io_access_key': os.environ['V3IO_ACCESS_KEY'],
32+
})
33+
34+
35+
def handler(context, event):
36+
config = context.user_data.config
37+
msg = json.loads(event.body)
38+
context.logger.info('Incoming message', msg=msg)
39+
enrichment_data = _search_kv(msg, config)
40+
context.logger.info('Enrichment data', enrichment_data=enrichment_data)
41+
msg['enrichment'] = enrichment_data
42+
_put_records([msg], config)
43+
context.logger.debug('Output message', msg=msg)
44+
45+
46+
def _get_url(v3io_api, container_name, collection_path):
47+
return f'http://{v3io_api}/{container_name}/{collection_path}'
48+
49+
50+
def _get_headers(v3io_function, v3io_access_key):
51+
return {
52+
'Content-Type': 'application/json',
53+
'X-v3io-function': v3io_function,
54+
'cache-control': 'no-cache',
55+
'x-v3io-session-key': v3io_access_key
56+
}
57+
58+
59+
def _search_kv(msg, config):
60+
v3io_api = config['v3io_api']
61+
v3io_username = config['v3io_username']
62+
container_name = config['container_name']
63+
search_value = msg[config['input_stream_search_key']]
64+
table_path_and_key = f"{v3io_username}/examples/stream-enrich/{config['table_name']}/{search_value}"
65+
v3io_access_key = config['v3io_access_key']
66+
67+
url = _get_url(v3io_api, container_name, table_path_and_key)
68+
headers = _get_headers('GetItem', v3io_access_key)
69+
resp = requests.request('POST', url, json={}, headers=headers)
70+
71+
json_response = json.loads(resp.text)
72+
73+
response = {}
74+
if 'Item' in json_response:
75+
response = json_response['Item']
76+
77+
return response
78+
79+
80+
def _put_records(items, config):
81+
v3io_api = config['v3io_api']
82+
v3io_username = config['v3io_username']
83+
container_name = config['container_name']
84+
output_stream_path = f"{v3io_username}/examples/stream-enrich/{config['output_stream_name']}/"
85+
v3io_access_key = config['v3io_access_key']
86+
87+
records = _items_to_records(items)
88+
url = _get_url(v3io_api, container_name, output_stream_path)
89+
headers = _get_headers('PutRecords', v3io_access_key)
90+
91+
return requests.request('PUT', url, json=records, headers=headers)
92+
93+
94+
def _item_to_b64(item):
95+
item_string = json.dumps(item)
96+
return base64.b64encode(item_string.encode('utf-8')).decode('utf-8')
97+
98+
99+
def _items_to_records(items):
100+
return {'Records': [{'Data': _item_to_b64(item)} for item in items]}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
metadata:
2+
name: enrich-stream
3+
spec:
4+
build:
5+
functionSourceCode: {{ .SourceCode }}
6+
commands:
7+
- pip install requests
8+
description: "Nuclio function which is triggered by incoming event-messages to a V3IO-Stream. The function enrich the original event-message with data from V3IO-KV table, and writes the enriched message to an output V3IO-Stream."
9+
handler: "main:handler"
10+
runtime: "python:3.6"
11+
env:
12+
- name: V3IO_API
13+
value: {{ .V3ioAPI }}
14+
- name: V3IO_ACCESS_KEY
15+
value: {{ .V3ioAccessKey }}
16+
- name: V3IO_USERNAME
17+
value: {{ .Username }}
18+
- name: TABLE_NAME
19+
value: {{ .TableName }}
20+
- name: CONTAINER_NAME
21+
value: {{ .ContainerName }}
22+
- name: INPUT_STREAM_SEARCH_KEY
23+
value: {{ .InputStreamSearchKey }}
24+
- name: OUTPUT_STREAM_NAME
25+
value: {{ .OutputStreamName }}
26+
minReplicas: 1
27+
maxReplicas: 1
28+
triggers:
29+
v3ioInputStream:
30+
username: {{ .Username }}
31+
password: {{ .Password }}
32+
kind: v3ioStream
33+
url: {{ .InputStreamUrl }}
34+
attributes:
35+
seekTo: {{ .InputStreamSeekTo }}
36+
partitions:
37+
- {{ .InputStreamPartitions }}

enrich-stream/function.yaml.values

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
V3ioAPI:
2+
displayName: Web API host and port
3+
kind: string
4+
description: "The address of Iguazio WebAPI. you can take it from the services screen"
5+
required: true
6+
order: 0
7+
attributes:
8+
defaultValue: v3io-webapi.default-tenant.svc:8081
9+
10+
V3ioAccessKey:
11+
displayName: V3io Access Key
12+
kind: string
13+
description: "Iguazio user's V3io Access Key"
14+
required: true
15+
order: 1
16+
attributes:
17+
password: true
18+
defaultValue: ""
19+
20+
Username:
21+
displayName: Username
22+
kind: string
23+
description: "Iguazio username"
24+
required: true
25+
order: 2
26+
27+
Password:
28+
displayName: Password
29+
kind: string
30+
description: "Iguazio user's password"
31+
required: true
32+
order: 3
33+
attributes:
34+
password: true
35+
defaultValue: ""
36+
37+
ContainerName:
38+
displayName: Data Container
39+
kind: string
40+
description: "The name of the wanted data container"
41+
required: true
42+
order: 4
43+
attributes:
44+
defaultValue: users
45+
46+
TableName:
47+
displayName: Table Name
48+
kind: string
49+
description: "The name of the key value table"
50+
required: true
51+
order: 5
52+
attributes:
53+
defaultValue: keyValueTable
54+
55+
InputStreamUrl:
56+
displayName: Input Stream Url
57+
kind: string
58+
description: "The url of the input stream e.g. http://<V3ioApi>/<ContainerName>/<StreamName>/"
59+
required: true
60+
order: 6
61+
62+
InputStreamSearchKey:
63+
displayName: Input Stream Search Key
64+
kind: string
65+
description: "The name of the field in the incoming event-message containing the value to search for enrichment"
66+
required: true
67+
order: 7
68+
69+
InputStreamPartitions:
70+
displayName: Input Stream Partitions
71+
kind: string
72+
description: "The stream shard ids to be consumed, e.g. 0"
73+
required: true
74+
order: 8
75+
76+
InputStreamSeekTo:
77+
displayName: Input Stream Seek To
78+
kind: choice
79+
description: "This setting determines weather the function will consume all available events in the stream (earlies), or will consume only new incoming events (latest) "
80+
required: true
81+
order: 9
82+
attributes:
83+
choices: [earliest, latest]
84+
defaultValue: latest
85+
86+
OutputStreamName:
87+
displayName: Output Stream Name
88+
kind: string
89+
description: "The name of the stream the enriched messages will be sent to"
90+
required: true
91+
order: 10
92+
attributes:
93+
defaultValue: outputV3ioStream

0 commit comments

Comments
 (0)