diff --git a/diagnosis_service/Dockerfile b/diagnosis_service/Dockerfile
deleted file mode 100644
index e84c732434d62338600161d7dfdc0db030770daa..0000000000000000000000000000000000000000
--- a/diagnosis_service/Dockerfile
+++ /dev/null
@@ -1,9 +0,0 @@
-FROM python
-WORKDIR /home
-
-COPY . .
-
-RUN pip3 install flask
-
-EXPOSE 5000
-CMD python3 ml_service.py
diff --git a/diagnosis_service/README.md b/diagnosis_service/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..2d050e30564164003974411a2b124b3f6fb7dfcb
--- /dev/null
+++ b/diagnosis_service/README.md
@@ -0,0 +1,52 @@
+# Asynchronous Diagnosis Service
+
+This service demonstrates asynchronous processing of diagnosis requests. When the endpoint is hit, a record is added to a database to track the progress of the diagnosis request, a message is sent to an SQS queue for a subscriber to run the diagnosis processing, and the user can check on the progress throughout processing.
+
+## Dependency Installation
+```
+pip install boto3 flask
+```
+
+## Setup
+
+An SQS queue is required to send messages and trigger the *prediction listener*. This service is configured to send messages to the `digpath-request` queue.
+
+A SQLite database is also required to run the service (the `sqlite3` module ships with the Python standard library). Run the following command to create the database with a *requests* table:
+
+```
+cd setup/
+python setup.py
+```
+
+The *requests* table keeps track of the requests that are submitted, which file was submitted for processing/diagnosis, and the progress of that processing, including when it completes.
+
+## Running the Service
+
+Run the following command to start the Flask service. This exposes two endpoints: */diagnose* and */request_info*.
+
+```
+cd svc/
+python run.py
+```
+
+In the same folder, run the following command to start the SQS subscriber that runs the diagnosis processing whenever the SQS queue receives a message.
+
+```
+python prediction_listener.py
+```
+
+## Testing the Service
+
+Now you are ready to test the service. Hit the */diagnose* endpoint:
+
+```
+curl localhost:5000/diagnose -X GET -H 'Content-Type: application/json' -d '{"file": "test_image.tif"}'
+```
+
+Hitting the */diagnose* endpoint adds a new row to the *requests* table, sends a message to the SQS queue, and returns the *request_id* used to check the progress of the request. The output should look like: `{"request_id": "REQUEST_ID"}`.
+
+You should see log output from both the Flask service and the prediction listener as the request is processed. To check the progress of the request, run the following command:
+
+```
+curl localhost:5000/request_info -X GET -H 'Content-Type: application/json' -d '{"request_id": "REQUEST_ID"}'
+```
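For reference, the flow described in the README can also be scripted. Below is a minimal client sketch using the third-party `requests` package (not one of the service's dependencies), assuming the Flask service from `svc/run.py` is running locally on port 5000 and the prediction listener is consuming the queue:

```
import time

import requests  # third-party HTTP client, assumed to be installed separately

BASE_URL = "http://localhost:5000"

# Submit a diagnosis request for a file the service can access.
resp = requests.get(f"{BASE_URL}/diagnose", json={"file": "test_image.tif"})
request_id = resp.json()["request_id"]
print("Submitted request:", request_id)

# Poll the progress endpoint until the listener marks the request complete.
while True:
    info = requests.get(f"{BASE_URL}/request_info", json={"request_id": request_id}).json()
    print("Status:", info["status"])
    if info["status"] != "in progress":
        break
    time.sleep(2)

print("Final diagnosis:", info["diagnosis"])
```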
diff --git a/diagnosis_service/ml_service.py b/diagnosis_service/ml_service.py
deleted file mode 100644
index 5bab9f352fe87dbd3136187112713aee1c92b4f6..0000000000000000000000000000000000000000
--- a/diagnosis_service/ml_service.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import json
-import uuid
-
-from flask import Flask, request, jsonify
-
-app = Flask(__name__)
-
-#TODO: load ML model
-
-@app.route('/request_diagnosis', methods=['GET'])
-def diagnoseEndpoint():
-    request_id = str(uuid.uuid4())
-
-    content = request.json
-    print(f"Request received: \nRequest ID: {request_id} \n{json.dumps(content, indent=2)}")
-
-    #TODO: ML prediction
-
-    response = {
-        'request_id': request_id,
-        'diagnosis': 'healthy'
-    }
-    return jsonify(response)
-
-if __name__ == '__main__':
-    app.run(host='0.0.0.0')
diff --git a/diagnosis_service/setup/setup.py b/diagnosis_service/setup/setup.py
new file mode 100644
index 0000000000000000000000000000000000000000..3c6b9a91c054cf7f62a85fe2e8598db62942fb10
--- /dev/null
+++ b/diagnosis_service/setup/setup.py
@@ -0,0 +1,28 @@
+import sqlite3
+
+# connecting to the database
+connection = sqlite3.connect("../db/digpath.db")
+
+# cursor
+crsr = connection.cursor()
+
+# SQL command to create a table in the database
+sql_command = """
+    CREATE TABLE requests (
+        request_id TEXT(255) PRIMARY KEY,
+        file TEXT(255),
+        diagnosis TEXT(255),
+        total_chips INTEGER,
+        mild INTEGER,
+        moderate INTEGER,
+        severe INTEGER,
+        status TEXT(255),
+        timestamp TEXT(255)
+    );
+"""
+
+# execute the statement
+crsr.execute(sql_command)
+
+# close the connection
+connection.close()
\ No newline at end of file
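The diff sets up the database but not the queue. If the `digpath-request` queue does not already exist in the AWS account, a one-off sketch along these lines (assuming AWS credentials and a default region are already configured for boto3) could create it; note that the hard-coded `queue_url` in `svc/run.py` would then need to point at the queue that gets created:

```
import boto3

# Create the queue that run.py publishes to and prediction_listener.py reads from.
# create_queue returns the existing queue if one with the same name and attributes exists.
sqs = boto3.resource("sqs")
queue = sqs.create_queue(QueueName="digpath-request")
print("Queue URL:", queue.url)
```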
diff --git a/diagnosis_service/svc/database_connection.py b/diagnosis_service/svc/database_connection.py
new file mode 100644
index 0000000000000000000000000000000000000000..42346c944196396d25bba4cb819ce9710e170540
--- /dev/null
+++ b/diagnosis_service/svc/database_connection.py
@@ -0,0 +1,66 @@
+import uuid
+from datetime import datetime
+
+import sqlite3
+
+
+class DigpathDatabase:
+    def __init__(self, connection):
+        self._db = connection
+
+    def new_request(self, file, request_id=None):
+        if request_id is None:
+            request_id = str(uuid.uuid4())
+
+        cur = self._db.cursor()
+        cur.execute(
+            'INSERT INTO requests (request_id, file, total_chips, mild, moderate, severe, status, timestamp) VALUES(?,?,?,?,?,?,?,?)',
+            (request_id, file, 50000, 0, 0, 0, 'in progress', datetime.now().strftime("%Y-%m-%dT%H:%M:%S"))
+        )
+        self._db.commit()
+        return request_id
+
+    def update_request(
+        self,
+        request_id,
+        diagnosis,
+        mild,
+        moderate,
+        severe,
+        status,
+    ):
+        query = 'UPDATE requests SET '
+        params = []
+        if diagnosis is not None:
+            query += 'diagnosis=?, '
+            params.append(diagnosis)
+
+        query += 'mild=?, moderate=?, severe=?, status=? WHERE request_id=?;'
+        params.extend([mild, moderate, severe, status, request_id])
+
+        self._db.cursor().execute(query, params)
+        self._db.commit()
+
+    def get_request_info(self, request_id):
+        cursor = self._db.cursor().execute('SELECT * FROM requests WHERE request_id = ?;', (request_id,))
+        response = cursor.fetchall()
+
+        if len(response) == 0:
+            print("Request not found")
+            return None
+
+        request_info = response[0]
+        results = {
+            'request_id': request_info[0],
+            'file': request_info[1],
+            'diagnosis': request_info[2],
+            'total_chips': request_info[3],
+            'mild': request_info[4],
+            'moderate': request_info[5],
+            'severe': request_info[6],
+            'status': request_info[7],
+            'timestamp': request_info[8]
+        }
+
+        return results
+
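A minimal sketch of how `DigpathDatabase` is meant to be used, against an in-memory SQLite database created with the same schema as `setup/setup.py` (the request ID and counts below are made-up example values):

```
import sqlite3

from database_connection import DigpathDatabase

# In-memory database with the same requests table that setup/setup.py creates.
connection = sqlite3.connect(":memory:")
connection.execute("""
    CREATE TABLE requests (
        request_id TEXT(255) PRIMARY KEY,
        file TEXT(255),
        diagnosis TEXT(255),
        total_chips INTEGER,
        mild INTEGER,
        moderate INTEGER,
        severe INTEGER,
        status TEXT(255),
        timestamp TEXT(255)
    );
""")

db = DigpathDatabase(connection)

# Insert a request, update it as if processing had finished, then read it back.
db.new_request("test_image.tif", request_id="example-id")
db.update_request("example-id", diagnosis="mild", mild=49000, moderate=900, severe=100, status="complete")
print(db.get_request_info("example-id"))
```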
diff --git a/diagnosis_service/svc/prediction_listener.py b/diagnosis_service/svc/prediction_listener.py
new file mode 100644
index 0000000000000000000000000000000000000000..e53977bedadf9072c452c580f3ba47c8a1fd474b
--- /dev/null
+++ b/diagnosis_service/svc/prediction_listener.py
@@ -0,0 +1,68 @@
+import json
+import random
+import time
+
+import boto3
+import sqlite3
+from database_connection import DigpathDatabase
+
+queue_name = "digpath-request"
+db_connection = sqlite3.connect("../db/digpath.db", check_same_thread=False)
+
+sqs = boto3.resource("sqs")
+queue = sqs.get_queue_by_name(QueueName=queue_name)
+digpath_db = DigpathDatabase(db_connection)
+
+def process_request(message_body):
+    print(f"processing message: {message_body}")
+    message_data = json.loads(message_body)
+    request_info = digpath_db.get_request_info(message_data['request_id'])
+
+    #TODO: replace the code below with the ML model predictions - below are fake ML predictions
+    mild = 0
+    moderate = 0
+    severe = 0
+    diagnosis = request_info['diagnosis']
+    status = request_info['status']
+    while status == 'in progress':
+        if random.random() < 0.03:
+            severe += random.randint(1, 10)
+        elif random.random() < 0.12:
+            moderate += random.randint(1, 20)
+        mild += (1000 - severe - moderate)
+
+        if severe > 15:
+            diagnosis = 'severe'
+            status = 'complete'
+        elif moderate > 100:
+            diagnosis = 'moderate'
+            status = 'complete'
+        elif mild + moderate + severe >= request_info['total_chips']:
+            mild = request_info['total_chips'] - severe - moderate
+            diagnosis = 'mild'
+            status = 'complete'
+
+        digpath_db.update_request(
+            request_info['request_id'],
+            diagnosis,
+            mild,
+            moderate,
+            severe,
+            status
+        )
+
+        time.sleep(1)
+    print(f"Processing of {message_data['request_id']} complete")
+
+if __name__ == "__main__":
+    print(f"Diagnosis listener running. Listening for messages in the '{queue_name}' queue")
+    while True:
+        messages = queue.receive_messages()
+        for message in messages:
+            try:
+                message_body = message.body
+                message.delete()
+
+                process_request(message_body)
+            except Exception as e:
+                print(e)
diff --git a/diagnosis_service/svc/run.py b/diagnosis_service/svc/run.py
new file mode 100644
index 0000000000000000000000000000000000000000..19a1c78af462019977fc0a7b805ded4d7ebd81b5
--- /dev/null
+++ b/diagnosis_service/svc/run.py
@@ -0,0 +1,51 @@
+import json
+import uuid
+
+import boto3
+import sqlite3
+from flask import Flask, request, jsonify
+from database_connection import DigpathDatabase
+
+queue_url = "https://sqs.us-east-1.amazonaws.com/432722299252/digpath-request"
+db_connection = sqlite3.connect("../db/digpath.db", check_same_thread=False)
+
+app = Flask(__name__)
+digpath_db = DigpathDatabase(db_connection)
+sqs = boto3.client('sqs')
+
+@app.route('/diagnose')
+def diagnoseEndpoint():
+    request_id = str(uuid.uuid4())
+
+    content = request.json
+    print(f"Request received: \nRequest ID: {request_id} \n{json.dumps(content, indent=2)}")
+
+    #Insert request into database
+    digpath_db.new_request(content['file'], request_id)
+
+    #Send SQS message
+    sqs_response = sqs.send_message(
+        QueueUrl=queue_url,
+        MessageBody=json.dumps({
+            'request_id': request_id,
+            'file': content['file']
+        }, indent=2)
+    )
+    print(f'SQS Message: \n{sqs_response}')
+
+    response = {
+        'request_id': request_id,
+    }
+    return jsonify(response)
+
+@app.route('/request_info')
+def getDiagnosisEndpoint():
+
+    content = request.json
+    print(f"Getting info for: \n{json.dumps(content, indent=2)}")
+
+    request_info = digpath_db.get_request_info(content['request_id'])
+    return jsonify(request_info)
+
+if __name__ == '__main__':
+    app.run(host='0.0.0.0')
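Finally, the listener can be exercised without going through the Flask service by seeding a request row and pushing the same message shape that `run.py` sends. A rough sketch, assuming it is run from the `svc/` directory so the relative database path and the `database_connection` import resolve:

```
import json
import sqlite3
import uuid

import boto3

from database_connection import DigpathDatabase

queue_name = "digpath-request"
test_file = "test_image.tif"

# Seed a request row first, since the listener looks the request up by ID before processing.
db = DigpathDatabase(sqlite3.connect("../db/digpath.db"))
request_id = str(uuid.uuid4())
db.new_request(test_file, request_id)

# Push the same message shape that run.py sends to the queue.
queue = boto3.resource("sqs").get_queue_by_name(QueueName=queue_name)
queue.send_message(MessageBody=json.dumps({"request_id": request_id, "file": test_file}))
print(f"Sent test message for request {request_id}")
```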