Commit 061d018e authored by EC2 Default User

asynchronous service demo

parent 66d7d957
# Dockerfile for the standalone ml_service.py demo (installs only Flask)
FROM python
WORKDIR /home
COPY . .
RUN pip3 install flask
EXPOSE 5000
CMD ["python3", "ml_service.py"]
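# A minimal build-and-run sketch (the image tag and host port mapping are assumptions):
#   docker build -t digpath-ml-service .
#   docker run -p 5000:5000 digpath-ml-service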
# Asynchronous Diagnosis Service
This service demonstrates asynchronous processing of a diagnosis request. When the */diagnose* endpoint is hit, a record is added to a database to track the progress of the request, a message is sent to an SQS queue to trigger a subscriber that runs the diagnosis processing, and the user can query the progress of the request at any point during processing.
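Behind the scenes, the message placed on the SQS queue carries the request ID and the requested file; based on the payload built in the Flask service, it looks like the following (with `REQUEST_ID` standing in for the generated UUID):
```
{
  "request_id": "REQUEST_ID",
  "file": "test_image.tif"
}
```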
## Dependency Installation
```
pip install boto3 flask
```
The `sqlite3` module used for the database ships with the Python standard library and does not need to be installed separately.
## Setup
An SQS queue is required to send messages and trigger the *prediction listener*. This service is configured to send messages to the `digpath-request` queue.
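If the queue does not exist yet, it can be created with the AWS CLI (this assumes AWS credentials and a default region are already configured):
```
aws sqs create-queue --queue-name digpath-request
```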
A SQL database is also required to run the service. Run the following commands to create a SQLite database with a *requests* table:
```
cd setup/
python setup.py
```
The *requests* table keeps track of each submitted request, the file to be processed, and the progress of that processing, including when it is complete.
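To confirm the table was created, you can inspect the schema from the repository root (assuming the `sqlite3` command-line tool is available):
```
sqlite3 db/digpath.db ".schema requests"
```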
## Running the Service
Run the following commands to start the Flask service. This exposes two endpoints, */diagnose* and */request_info*.
```
cd svc/
python run.py
```
In the same folder, run the following command to start the SQS subscriber, which runs the diagnosis processing whenever the queue receives a message.
```
python prediction_listener.py
```
## Testing the Service
Now you are ready to test the service. Hit the */diagnose* endpoint:
```
curl localhost:5000/diagnose -X GET -H 'Content-Type: application/json' -d '{"file": "test_image.tif"}'
```
Hitting the */diagnose* endpoint adds a new row to the *requests* table, sends a message to the SQS queue, and returns the *request_id* used to check the progress of the request. The output of the service should look like: `{"request_id": "REQUEST_ID"}`.
You should see log output from both the Flask service and the prediction listener as the request is processed. To check the progress of the request, run the following command:
```
curl localhost:5000/request_info -X GET -H 'Content-Type: application/json' -d '{"request_id": "REQUEST_ID"}'
```
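While the listener is still working, the response mirrors the current row in the *requests* table and will look something like this (the counts and timestamp are illustrative):
```
{
  "request_id": "REQUEST_ID",
  "file": "test_image.tif",
  "diagnosis": null,
  "total_chips": 50000,
  "mild": 3000,
  "moderate": 12,
  "severe": 1,
  "status": "in progress",
  "timestamp": "2023-01-01T12:00:00"
}
```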
# ml_service.py: standalone synchronous diagnosis demo (the service run by the Dockerfile)
import json
import uuid

from flask import Flask, request, jsonify

app = Flask(__name__)

#TODO: load ML model

@app.route('/request_diagnosis', methods=['GET'])
def diagnose_endpoint():
    # Assign a unique ID to the request and log the incoming payload
    request_id = str(uuid.uuid4())
    content = request.json
    print(f"Request received: \nRequest ID: {request_id} \n{json.dumps(content, indent=2)}")
    #TODO: ML prediction
    response = {
        'request_id': request_id,
        'diagnosis': 'healthy'
    }
    return jsonify(response)

if __name__ == '__main__':
    app.run(host='0.0.0.0')
# setup/setup.py: creates the SQLite database with the *requests* table
import sqlite3

# connect to (and create, if missing) the database file
connection = sqlite3.connect("../db/digpath.db")

# cursor
crsr = connection.cursor()

# SQL command to create a table in the database
sql_command = """
CREATE TABLE requests (
    request_id TEXT(255) PRIMARY KEY,
    file TEXT(255),
    diagnosis TEXT(255),
    total_chips INTEGER,
    mild INTEGER,
    moderate INTEGER,
    severe INTEGER,
    status TEXT(255),
    timestamp TEXT(255)
);
"""

# execute the statement and persist the change
crsr.execute(sql_command)
connection.commit()

# close the connection
connection.close()
# database_connection.py: thin wrapper around the SQLite *requests* table
import uuid
from datetime import datetime


class DigpathDatabase:
    def __init__(self, connection):
        self._db = connection

    def new_request(self, file, request_id=None):
        # Create a new row for the request, marked 'in progress'
        if request_id is None:
            request_id = str(uuid.uuid4())
        cur = self._db.cursor()
        cur.execute(
            'INSERT INTO requests (request_id, file, total_chips, mild, moderate, severe, status, timestamp) VALUES(?,?,?,?,?,?,?,?)',
            (request_id, file, 50000, 0, 0, 0, 'in progress', datetime.now().strftime("%Y-%m-%dT%H:%M:%S"))
        )
        self._db.commit()
        return request_id

    def update_request(self, request_id, diagnosis, mild, moderate, severe, status):
        # Update the progress counters (and the diagnosis, once known) for a request,
        # using parameterized queries rather than string formatting
        if diagnosis is not None:
            self._db.cursor().execute(
                'UPDATE requests SET diagnosis=?, mild=?, moderate=?, severe=?, status=? WHERE request_id=?;',
                (diagnosis, mild, moderate, severe, status, request_id)
            )
        else:
            self._db.cursor().execute(
                'UPDATE requests SET mild=?, moderate=?, severe=?, status=? WHERE request_id=?;',
                (mild, moderate, severe, status, request_id)
            )
        self._db.commit()

    def get_request_info(self, request_id):
        # Fetch the row for a request and return it as a dictionary
        cursor = self._db.cursor().execute('SELECT * FROM requests WHERE request_id = ?;', (request_id,))
        response = cursor.fetchall()
        if not response:
            print("Request not found")
            return None
        request_info = response[0]
        return {
            'request_id': request_info[0],
            'file': request_info[1],
            'diagnosis': request_info[2],
            'total_chips': request_info[3],
            'mild': request_info[4],
            'moderate': request_info[5],
            'severe': request_info[6],
            'status': request_info[7],
            'timestamp': request_info[8]
        }
# prediction_listener.py: SQS subscriber that performs the (simulated) diagnosis processing
import json
import random
import time

import boto3
import sqlite3

from database_connection import DigpathDatabase

queue_name = "digpath-request"

db_connection = sqlite3.connect("../db/digpath.db", check_same_thread=False)
sqs = boto3.resource("sqs")
queue = sqs.get_queue_by_name(QueueName=queue_name)
digpath_db = DigpathDatabase(db_connection)


def process_request(message_body):
    print(f"processing message: {message_body}")
    message_data = json.loads(message_body)
    request_info = digpath_db.get_request_info(message_data['request_id'])

    #TODO: replace the code below with the ML model predictions - below are fake ML predictions
    mild = 0
    moderate = 0
    severe = 0
    diagnosis = request_info['diagnosis']
    status = request_info['status']
    while status == 'in progress':
        # Randomly accumulate chip counts to simulate batches of predictions
        if random.random() < 0.03:
            severe += random.randint(1, 10)
        elif random.random() < 0.12:
            moderate += random.randint(1, 20)
        mild += (1000 - severe - moderate)
        # Derive the diagnosis once a severity threshold or the total chip count is reached
        if severe > 15:
            diagnosis = 'severe'
            status = 'complete'
        elif moderate > 100:
            diagnosis = 'moderate'
            status = 'complete'
        elif mild + moderate + severe >= request_info['total_chips']:
            mild = request_info['total_chips'] - severe - moderate
            diagnosis = 'mild'
            status = 'complete'
        # Persist the latest progress so /request_info reflects it
        digpath_db.update_request(
            request_info['request_id'],
            diagnosis,
            mild,
            moderate,
            severe,
            status
        )
        time.sleep(1)
    print(f"Processing of {message_data['request_id']} complete")


if __name__ == "__main__":
    print(f"Diagnosis listener running. Listening for messages in the '{queue_name}' queue")
    while True:
        # Poll the queue for new diagnosis requests
        messages = queue.receive_messages()
        for message in messages:
            try:
                # Remove the message from the queue before processing; note that a failure
                # during processing will not re-queue it
                message_body = message.body
                message.delete()
                process_request(message_body)
            except Exception as e:
                print(e)
# svc/run.py: Flask service exposing the /diagnose and /request_info endpoints
import json
import uuid

import boto3
import sqlite3
from flask import Flask, request, jsonify

from database_connection import DigpathDatabase

queue_url = "https://sqs.us-east-1.amazonaws.com/432722299252/digpath-request"

db_connection = sqlite3.connect("../db/digpath.db", check_same_thread=False)
app = Flask(__name__)
digpath_db = DigpathDatabase(db_connection)
sqs = boto3.client('sqs')


@app.route('/diagnose')
def diagnose_endpoint():
    request_id = str(uuid.uuid4())
    content = request.json
    print(f"Request received: \nRequest ID: {request_id} \n{json.dumps(content, indent=2)}")

    # Insert the request into the database
    digpath_db.new_request(content['file'], request_id)

    # Send an SQS message to trigger the prediction listener
    sqs_response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({
            'request_id': request_id,
            'file': content['file']
        }, indent=2)
    )
    print(f'SQS Message: \n{sqs_response}')

    # Return the request ID so the caller can poll /request_info
    response = {
        'request_id': request_id,
    }
    return jsonify(response)


@app.route('/request_info')
def request_info_endpoint():
    content = request.json
    print(f"Getting info for: \n{json.dumps(content, indent=2)}")
    request_info = digpath_db.get_request_info(content['request_id'])
    return jsonify(request_info)


if __name__ == '__main__':
    app.run(host='0.0.0.0')