1

現在、ソケットの仕組みを理解しようとしています。Flask-SocketIO と python-socketio クライアントを使って、基本的なサンプルを動かしています。これまでに行ったことは以下のとおりです。

使用モジュール

-i https://pypi.org/simple
certifi==2019.6.16
chardet==3.0.4
click==7.0
confluent-kafka==1.1.0
dependency-injector==3.14.7
docker==4.0.2
flask-socketio==4.2.1
flask==1.1.1
idna==2.8
itsdangerous==1.1.0
jinja2==2.10.1
markupsafe==1.1.1
python-engineio==3.10.0
python-socketio==4.4.0
requests==2.22.0
six==1.12.0
urllib3==1.25.3
websocket-client==0.56.0
werkzeug==0.15.5

server.py

import json
import logging
import os
import sys
import threading
from threading import Lock
from containers import Configs, Consumers, Managers
from errors import ObjectNotFound
from flask import  Response
from flask_socketio import SocketIO, emit, join_room, leave_room, \
close_room, rooms, disconnect
from flask import Flask, render_template, session, request, \
copy_current_request_context
import sys, traceback
# Configure the root logger: ERROR level, each record rendered as
# "<logger name> - <level name> - <message>".
logging.basicConfig(
    format='%(name)s - %(levelname)s - %(message)s',
    level=logging.ERROR,
)

# Default Kafka broker address; replaced by the KAFKA_BROKER environment
# variable when that is set.
broker = 'localhost:9092'
# Check environment variable
if 'KAFKA_BROKER' in os.environ:
    broker = os.environ['KAFKA_BROKER']
elif len(sys.argv) > 1 and sys.argv[1]:
    # NOTE(review): the first CLI argument is stored as node_id, but nothing
    # in the visible code reads it -- confirm whether it was meant to set
    # `broker` instead.
    node_id = sys.argv[1]

# Everything below must run no matter how the broker was selected: `api`,
# `socketio` and `stat_consumer` are used unconditionally by the handlers and
# routes defined later in this module. Keeping this setup nested under the
# `elif` above left those names undefined (NameError) whenever KAFKA_BROKER
# was set or no CLI argument was passed.

# Override configuration
Configs.config.override({
    'broker': broker,
    'groupId': 'grpactconsumer',
})

async_mode = None
api = Flask(__name__)
api.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(
    api,
    async_handlers=False,
    ping_timeout=60,
    async_mode=async_mode,
    cors_allowed_origins="*",
    always_connect=True,
    engineio_logger=True,
)
thread = None
thread_lock = Lock()
logging.getLogger('engineio').setLevel(logging.ERROR)

stat_consumer = Consumers.consumer()


def worker(topic):
    """Consume `topic` and forward each received value to Socket.IO clients.

    Every value handed to the callback is emitted as an 'object_stat' event
    on the '/event' namespace.

    Args:
        topic: Name of the topic passed (as a one-element list) to
            ``stat_consumer.consume``.
    """
    def on_receive(value):
        # Yield to the Socket.IO async layer before emitting.
        socketio.sleep(0)
        socketio.emit('object_stat', value, namespace='/event')

    stat_consumer.consume([topic], on_receive)


# Run the stats consumer on its own thread so it does not block module setup.
worker_thread = threading.Thread(target=worker, args=['Stats'])
worker_thread.start()

def on_event_received():
    """Emit an 'object_event' with an empty payload on the '/event' namespace."""
    empty_payload = {}
    socketio.emit('object_event', empty_payload, namespace='/event')


# Create the object manager and register on_event_received as the callback
# for the 'Events' topic.
object_manager = Managers.object_manager()
object_manager.listen_events(
    ['Events'],
    on_event_received,
)





@socketio.on('disconnect', namespace='/event')
def test_disconnect():
    """Print the session id of a client that disconnected from '/event'."""
    session_id = request.sid
    print('Client disconnected', session_id)



@socketio.on('my event')
def handle_my_custom_event(json):
    """Log and print the payload of a 'my event' message.

    The handler is registered without an explicit namespace.

    Args:
        json: Payload delivered with the event. Note that inside this
            function the name shadows the imported ``json`` module.
    """
    message = 'received json: ' + str(json)
    logging.error(message)
    print(message)

# CORS policy handler
@api.after_request
def after_request(response):
    """Set ``Access-Control-Allow-Origin: *`` on the response and return it.

    Args:
        response: The response object handed to this after-request hook.

    Returns:
        The same response object, with the header set.
    """
    response.headers['Access-Control-Allow-Origin'] = '*'
    return response





# ObjectNotFound error handler for the api
@api.errorhandler(ObjectNotFound)
def not_found_exception(error):
    """Answer an ObjectNotFound error with an empty JSON object and status 404.

    NOTE(review): the extra header key is spelled 'ContentType', not
    'Content-Type' -- confirm this is intended.

    Args:
        error: The raised ObjectNotFound instance (unused).
    """
    empty_body = json.dumps({})
    extra_headers = {'ContentType': 'application/json'}
    return empty_body, 404, extra_headers


# Unhandled error handler for api
@api.errorhandler(Exception)
def unhandled_exception(error):
    """Log an otherwise-unhandled exception and answer with an empty 500 body.

    NOTE(review): the extra header key is spelled 'ContentType', not
    'Content-Type' -- confirm this is intended.

    Args:
        error: The exception instance that reached this handler.

    Returns:
        A ``(body, status, headers)`` tuple: an empty JSON object, 500, and
        the extra header dict.
    """
    # Record the failure with its traceback; previously the exception was
    # discarded here, leaving no trace of the cause on the server side.
    logging.error('Unhandled exception while handling request', exc_info=error)
    return json.dumps({}), 500, {'ContentType': 'application/json'}


# Node Endpoints
@api.route('/node/list', methods=['GET'])
def list_nodes():
    """Return the object manager's node list as indented JSON under 'nodes'.

    NOTE(review): the extra header key is spelled 'ContentType', not
    'Content-Type' -- confirm this is intended.
    """
    body = json.dumps({'nodes': object_manager.node.list()}, indent=4)
    return body, 200, {'ContentType': 'application/json'}


# Service Endpoints
@api.route('/service', methods=['POST'])
def create_service():
    """Create a service from the JSON request body and return its id.

    The fields 'name', 'image', 'command', 'node_labels', 'customer_id',
    'application_id' and 'limit' are read from the request JSON and passed
    positionally, in that order, to ``object_manager.service.create``.

    NOTE(review): the extra header key is spelled 'ContentType', not
    'Content-Type' -- confirm this is intended.
    """
    # Order matters: it is the positional argument order of service.create.
    field_order = (
        'name',
        'image',
        'command',
        'node_labels',
        'customer_id',
        'application_id',
        'limit',
    )
    arguments = [request.json[field] for field in field_order]
    service_id = object_manager.service.create(*arguments)

    reply = {'success': True, 'id': service_id}
    return json.dumps(reply), 200, {'ContentType': 'application/json'}


@api.route('/service/<service_id>', methods=['GET'])
def get_service(service_id):
    """Return the service identified by `service_id` as indented JSON.

    Args:
        service_id: Value captured from the URL path.
    """
    found = object_manager.service.get(service_id)
    body = json.dumps(found, indent=4)
    return Response(body, mimetype='application/json')


@api.route('/service/<service_id>/migrate', methods=['PUT'])
def migrate_service(service_id):
    """Migrate a service using the 'node_labels' field of the request JSON.

    NOTE(review): the extra header key is spelled 'ContentType', not
    'Content-Type' -- confirm this is intended.

    Args:
        service_id: Value captured from the URL path.
    """
    migrate = object_manager.service.migrate
    migrate(service_id, request.json['node_labels'])

    reply = json.dumps({'success': True})
    return reply, 200, {'ContentType': 'application/json'}


@api.route('/service/<service_id>/scale', methods=['PUT'])
def scale_service(service_id):
    """Scale a service using the 'replica_count' field of the request JSON.

    NOTE(review): the extra header key is spelled 'ContentType', not
    'Content-Type' -- confirm this is intended.

    Args:
        service_id: Value captured from the URL path.
    """
    scale = object_manager.service.scale
    scale(service_id, request.json['replica_count'])

    reply = json.dumps({'success': True})
    return reply, 200, {'ContentType': 'application/json'}


@api.route('/service/<service_id>', methods=['DELETE'])
def remove_service(service_id):
    """Remove the service identified by `service_id` and report success.

    NOTE(review): the extra header key is spelled 'ContentType', not
    'Content-Type' -- confirm this is intended.

    Args:
        service_id: Value captured from the URL path.
    """
    object_manager.service.remove(service_id)

    reply = json.dumps({'success': True})
    return reply, 200, {'ContentType': 'application/json'}


@api.route('/service/list', methods=['GET'])
def list_services():
    """Return the object manager's service list as indented JSON under 'services'.

    NOTE(review): the extra header key is spelled 'ContentType', not
    'Content-Type' -- confirm this is intended.
    """
    body = json.dumps({'services': object_manager.service.list()}, indent=4)
    return body, 200, {'ContentType': 'application/json'}


# Container Endpoints
@api.route('/container/list', methods=['GET'])
def list_containers():
    """Return the object manager's container list as indented JSON under 'containers'.

    NOTE(review): the extra header key is spelled 'ContentType', not
    'Content-Type' -- confirm this is intended.
    """
    body = json.dumps({'containers': object_manager.container.list()}, indent=4)
    return body, 200, {'ContentType': 'application/json'}



# Start the Flask / Socket.IO server, bound to all interfaces, with the
# reloader disabled.
# NOTE(review): debug=True together with host='0.0.0.0' looks risky outside
# local development -- confirm this is not used in production.
socketio.run(
    api,
    host='0.0.0.0',
    debug=True,
    use_reloader=False,
)

client.js

/**
 * This file is used for the socket connection. Data received by this class
 * is shared only with data.js.
 */

let namespace = 'http://0.0.0.0:5000/event';
let socket = io.connect(namespace, { forceNew: true });

// 'object_event': re-fetch the node list, store it in node_data_source, then
// call the registered listener of every id that is currently a DataTable.
// The acknowledgement callback (if any) is invoked right away, without
// waiting for the GET request to complete.
socket.on('object_event', function (msg, cb) {
    $.get('http://0.0.0.0:5000/node/list', function (response) {
        node_data_source = JSON.parse(response)['nodes'];

        Object.keys(data_table_listeners).forEach(function (tableId) {
            if (!$.fn.DataTable.isDataTable('#' + tableId)) {
                return;
            }
            data_table_listeners[tableId]();
        });
    });

    if (cb) {
        cb();
    }
});


// 'object_stat': parse the JSON message, store it in stat_data_source under
// its node and container keys, log it, then call every registered stat
// listener. The acknowledgement callback (if any) is invoked last.
socket.on('object_stat', function (msg, cb) {
    let data = JSON.parse(msg);

    if (stat_data_source) {
        const nodeKey = data['node'];
        const existing = stat_data_source[nodeKey];
        // Reuse the node's entry only when it is already an Array;
        // otherwise start from an empty one.
        const node_stats = (existing instanceof Array) ? existing : [];
        node_stats[data['container']] = data;
        stat_data_source[nodeKey] = node_stats;
    }

    console.log("Object_stats Socket.js");
    console.log(data);

    Object.keys(stat_data_listeners).forEach(function (listenerKey) {
        if (!stat_data_listeners[listenerKey]) {
            return;
        }
        stat_data_listeners[listenerKey]();
    });

    if (cb) {
        cb();
    }
});

エラー

linkp-master.0.dey9j12wugl1@vm    | INFO:engineio.server:3f2175ed5d72425da816e24ffeddb275: Sending packet MESSAGE data 2/event,["object_stat","{\"ram_limit\": 61254823575.552, \"customer_id\": \"-\", \"time\": 1575638732170, \"io_limit\": 0.0, \"container\": \"b703eadb700f\", \"node\": \"953mxmlwyvltfrx88ujlpkx3k\", \"io_usage\": 0.0, \"application_id\": \"-\", \"cpu_percent\": \"1.38\", \"ram_usage\": 407266918.4, \"network_limit\": 6144000.0, \"network_usage\": 3768000.0, \"pids\": \"25\"}"]
linkp-master.0.dey9j12wugl1@vm    | f603fce7e2464586ab77636d127d92dc: Client is gone, closing socket
linkp-master.0.dey9j12wugl1@vm    | INFO:engineio.server:f603fce7e2464586ab77636d127d92dc: Client is gone, closing socket
linkp-master.0.dey9j12wugl1@vm    | f603fce7e2464586ab77636d127d92dc: Client is gone, closing socket
linkp-master.0.dey9j12wugl1@vm    | INFO:engineio.server:f603fce7e2464586ab77636d127d92dc: Client is gone, closing socket
linkp-master.0.dey9j12wugl1@vm    | emitting event "object_stat" to all [/event]
linkp-master.0.dey9j12wugl1@vm    | INFO:socketio.server:emitting event "object_stat" to all [/event]
4

0 に答える 0