What is Megaphone?

The Megaphone project is about enhancing open source chat software. Specifically, the goal is to allow ejabberd to support 1,000,000 simultaneous users. See The Plan page for more details on how I plan to solve this problem. See the About this Blog page for more details on why I created this blog.

Thursday, December 29, 2011

Fun and Games with HTTP

Previously...

  • I made the changes in mod_megaphone to send back an HTTP response.
  • I fixed an issue with an array growing without bounds
  • I fixed an issue with HTTP headers
So now the connections are more or less working, and megaphone is no longer putting in a bunch of extra HTTP headers, but things still don't work.  There was an issue with keep-alive and nodejs, but I think I have that one fixed.  At this point, the authentication portion of the handshake is failing and I'm not sure why.

As I see it, here are my options:
  • Try and figure out why the current approach is not working.
  • Modify ejabberd_http to use the megaphone protocol.
I'm not too keen on either approach, to be honest.  I don't understand why the current approach is not working: the connection should look pretty much the same to Pidgin as it does when Pidgin talks directly to ejabberd.  That being the case, I'm not sure where to start looking for the cause of the authentication failure.

Modifying ejabberd_http really does not excite me either, since that means mucking about with a lot of code that I don't understand.  In addition, it means changing the ECM side so that it accepts straight TCP connections instead of HTTP.  The good side of this approach is that at least I will have something to compare against: there should be no difference between how the ejabberd connection and the ECM connection appear.

Yet more decisions.


Tuesday, December 27, 2011

Doh!

I had most carefully written some code to put various HTTP headers in a response when I send one back to ECM.  Of course, it was only after I had done all that and tried it out that it dawned on me that ECM takes care of all the headers on its own.

Doh!

Oh, and the hashmap worked the first time I tried it.  And if you believe that, I've got a bridge to sell ya...

Monday, December 26, 2011

Simple HashMap for a Simple Mind

Previously...
  • I decided to skip further analysis and go straight to coding.
  • I made the changes in mod_megaphone to send back an HTTP response.
  • I discovered an issue with an array growing without bounds
I thought about creating a HashMap with all the goodies (chaining and all that stuff), but then I realized something: Scarlett Johansson looks very good in spandex.  OK, I realized something more practical: in this situation the hash value for the key was not dictated by anything.  That is, I could choose any value I wanted for the key.

This means I could choose the connection ID so that it does not collide with any of the other entries in the table, and thereby avoid having to write any chaining code.  What's more, I could use the connection ID as a direct index into the table.

Now this may sound sort of like "the tail wagging the dog."  This is because I'm that lazy.

So I create a largish array (say 32,000,000 entries or thereabouts), and then, to get a connection ID, just randomly choose a number in the range of the table and check the array to see if something is there.  If not, then I have my connection ID.  If the entry is already in use, then I just keep incrementing the index (wrapping around at the end of the table) and checking until I find a free entry.

The performance of this approach depends greatly on the quality of the random number generator and on how full the table is.  Since I'm lazy, I'm choosing a relatively "sparse" array: 32 million entries should be roughly 32 times the number of entries that I need.  I can fine-tune the table later.
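Here is a rough sketch of the scheme in erlang, to match the rest of the code on this blog (the real table lives on the ECM side).  The module name conn_table and its functions are made up for illustration.  Because erlang data is immutable, each call returns an updated table; in practice the table would live inside a process or an ETS table.  The slot index doubles as the connection ID.  A collision just means walking forward to the next free slot, wrapping around at the end.

```erlang
%% conn_table.erl: hypothetical sketch of a fixed-size table where the
%% connection ID is simply the slot index. Empty slots hold 'free'.
-module(conn_table).
-export([new/1, allocate/2, lookup/2, release/2]).

%% A table is {Size, Array}. The size never changes, so the table
%% cannot grow without bounds.
new(Size) ->
    {Size, array:new(Size, {default, free})}.

%% Pick a random starting slot, then probe linearly (wrapping around)
%% until a free slot turns up. Returns {ok, ConnId, NewTable}.
allocate({Size, Arr}, Value) ->
    Start = rand:uniform(Size) - 1,          % 0 .. Size-1
    probe({Size, Arr}, Value, Start, Size).

%% Remaining counts how many slots are left to check, so every slot
%% is visited at most once before giving up.
probe(_Table, _Value, _Index, 0) ->
    {error, table_full};
probe({Size, Arr}, Value, Index, Remaining) ->
    case array:get(Index, Arr) of
        free ->
            {ok, Index, {Size, array:set(Index, Value, Arr)}};
        _InUse ->
            probe({Size, Arr}, Value, (Index + 1) rem Size, Remaining - 1)
    end.

%% The connection ID is a direct index, so lookup needs no search.
lookup(ConnId, {_Size, Arr}) ->
    case array:get(ConnId, Arr) of
        free  -> not_found;
        Value -> {ok, Value}
    end.

%% Releasing just marks the slot free again.
release(ConnId, {Size, Arr}) ->
    {Size, array:set(ConnId, free, Arr)}.
```

With the table about 32 times larger than the number of live connections, the first probe should land on a free slot most of the time, so the linear walk rarely goes more than a step or two.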


Note that this is perhaps the only time I have ever had the luxury of choosing the hash value for an object, and therefore of being able to use this simple-minded approach.  I suppose I could improve performance by keeping track of the last connection ID used and just incrementing it.  By the time the value wraps around again, anything still using the old value has almost certainly timed out and can be discarded.


Nah.

Anyhow I will code this up and it will just plain work.  The first time.  Yeah.  Sure it will.  Maybe I need to drink before I do this...

Those Who Live in False Front Buildings

Previously...

  • I analyzed the function handle_http_put in the existing code.
  • I decided to skip further analysis and go straight to coding.
  • I made the changes in mod_megaphone to send back an HTTP response.
Unfortunately, when I was coding up ECM, I had not finished the part that handles HTTP POST responses before charging into the erlang side.  But that's OK, because I'm doing it now.  And it's better this way.  Honest.  Truly.

Sigh.

One problem this uncovered is removing an element from a JavaScript array.  I keep track of which client a response from the server should go to by means of a connection ID.  The connection ID is chosen randomly and is a 64-bit integer, so the server is unlikely to see a collision among connection IDs.  I had been led to believe that JavaScript arrays and objects were basically the same thing: hashtables.  Therefore, I reasoned, I could simply delete the element from the array when the connection ID is no longer used.

As it turns out, using delete leaves a hole in the array.  The reference to the object is removed, but the slot remains: the array's length does not change, and reading that index just gives undefined.  You can instead use the splice method to remove the element, but then you must know its numerical index.  That basically means a linear search through the table for the 64-bit ID.  While the table will not have 2^64 entries, it does mean doing a lot of linear searches through a table with a lot of entries (up to 1,000,000).

I have this sinking feeling that I am going to have to implement a hash-table-like class in JavaScript.  That way the table could remain a constant size.

But that still means a lot of work.

Sigh.

Back to ECM

So I managed to send out what looks like a well-formed megaphone packet that contains an HTTP response to the POST the client had sent.  All it had to do was get back to the client.

This should be a happy time.  A wondrous time.  A time of great rejoicing.  I mean, it's Christmas and all that.

Except for one thing: ECM.

Yes, the simple connection manager was not forwarding packets from the server along to the client.  So now I'm back to tinkering with ECM and trying to get it to work correctly.

'Tis the season to be...oh shuzbutt.

Sunday, December 25, 2011

mod_megaphone Changes

Previously...

  • I decided to use a new, new plan.
  • I analyzed the function handle_http_put in the existing code.
  • I decided to skip further analysis and go straight to coding.
Coding up the ability to start a new process, call ejabberd_http_bind:process_request, and wait for a reply proved very straightforward.  The hard part is formatting the response for the client.

According to Wireshark, an HTTP response has the following form:


HTTP/1.1 200 OK
Content-Length: 1961
Content-Type: text/xml; charset=utf-8
Access-Control-Allow-Origin: *
Access-Control-Allow-Headers: Content-Type

content...

Each line, including the blank line that separates the headers from the content, must be terminated with a carriage-return/linefeed (CRLF) pair.  process_request actually returns most of the headers, so the only work is formatting the status line, formatting the Content-Length line (the length of the content in bytes), and making sure everything uses proper CRLF line endings.
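Here is a minimal sketch of that formatting in erlang.  It assumes the headers come back from process_request as a list of {Name, Value} string pairs and that the content is already UTF-8 bytes.  The module and function names are hypothetical, and the reason-phrase table only covers the status codes that show up in the BOSH code.

```erlang
%% http_response.erl: hypothetical helper that turns a
%% {Code, Headers, Body} reply into raw HTTP/1.1 text.
-module(http_response).
-export([format/3]).

-define(CRLF, "\r\n").

%% Returns an iolist ready to be written to a socket.
format(Code, Headers, Body) ->
    StatusLine = ["HTTP/1.1 ", integer_to_list(Code), " ", reason(Code), ?CRLF],
    %% Content-Length counts bytes of the body, not characters.
    Length = ["Content-Length: ", integer_to_list(iolist_size(Body)), ?CRLF],
    HeaderLines = [[Name, ": ", Value, ?CRLF] || {Name, Value} <- Headers],
    %% A lone CRLF (blank line) separates the headers from the body.
    [StatusLine, Length, HeaderLines, ?CRLF, Body].

reason(200) -> "OK";
reason(400) -> "Bad Request";
reason(403) -> "Forbidden";
reason(404) -> "Not Found";
reason(_)   -> "Unknown".
```

Building an iolist instead of concatenating strings avoids copying; iolist_to_binary/1 can flatten it if the megaphone packet code wants a binary.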


I am currently slogging through getting all the formatting correct.  Hopefully for next time I'll have something interesting to say.

Saturday, December 24, 2011

Lies, Damn Lies and Statistics

Previously...
  • I discovered that my new plan would not work
  • I decided to use a new, new plan.
  • I analyzed the function handle_http_put in the existing code.
I was going to detail send_outpacket and http_put, but when I tried analyzing send_outpacket, I found it quite difficult to understand.  At this point, I don't think the benefit from the analysis would justify the effort involved.  So for now, I am going to skip send_outpacket and http_put and go right to the stuff that pertains to megaphone.

So the next steps are now to code up mod_megaphone so that it:
  • Launches a new process when a request comes in.  
  • The process then invokes ejabberd_http_bind:process_request.
  • The process waits for the reply.
  • The process takes the reply and formats a valid HTTP response from it.
  • The process sends the reply using the megaphone protocol back to ECM.
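The five steps map fairly directly onto a spawned process.  This is only a sketch: megaphone_request:handle/5 is a made-up name, ProcessFun stands in for ejabberd_http_bind:process_request/2, and SendFun stands in for whatever ends up writing the megaphone packet back to ECM.  Passing them in lets the module run on its own.

```erlang
%% megaphone_request.erl: hypothetical sketch of the five steps above.
-module(megaphone_request).
-export([handle/5]).

handle(ConnId, Data, IP, ProcessFun, SendFun) ->
    %% Step 1: one process per request, so a request that BOSH holds
    %% open does not block any of the others.
    spawn(fun() ->
                  %% Steps 2 and 3: this call blocks until BOSH has a
                  %% reply for the client (or gives up waiting).
                  {Code, Headers, Body} = ProcessFun(Data, IP),
                  %% Step 4: turn the reply into an HTTP response.
                  Response = to_http(Code, Headers, Body),
                  %% Step 5: hand it back, tagged with the connection ID.
                  SendFun(ConnId, iolist_to_binary(Response))
          end).

%% Bare-bones formatter. The reason phrase is left empty to keep the
%% sketch short; HTTP/1.1 allows that, though servers normally send one.
to_http(Code, Headers, Body) ->
    ["HTTP/1.1 ", integer_to_list(Code), " \r\n",
     [[Name, ": ", Value, "\r\n"] || {Name, Value} <- Headers],
     "Content-Length: ", integer_to_list(iolist_size(Body)), "\r\n",
     "\r\n", Body].
```

One thing this glosses over: if ProcessFun crashes, the spawned process dies and ECM never hears back, so the real version needs some way to send an error response.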

handle_http_put and Related Functions

Previously...
  • I decided to change the plan.
  • I discovered that my new plan would not work
  • I decided to use a new, new plan.
So back to the existing code.  The function I left off with was handle_http_put:

handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
    case http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) of
        {error, not_exists} ->
            ?DEBUG("no session associated with sid: ~p", [Sid]),
            {404, ?HEADER, ""};
        {{error, Reason}, Sess} ->
            ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]),
            handle_http_put_error(Reason, Sess);
        {{repeat, OutPacket}, Sess} ->
            ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p", [OutPacket]),
            send_outpacket(Sess, OutPacket);
        {{wait, Pause}, _Sess} ->
            ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]),
            timer:sleep(Pause),
            handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
                StreamStart, IP);
        {buffered, _Sess} ->
            {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"};
        {ok, Sess} ->
            prepare_response(Sess, Rid, [], StreamStart)
    end.

This function seems to be more of an administrator than a worker: it handles the various cases and delegates the actual work to other functions.  An interesting aspect is that functions like handle_http_put_error and send_outpacket would need to return immediately instead of waiting, since in those cases the data to be delivered is already known.

Here is the code for http_put:

http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
    ?DEBUG("Looking for session: ~p", [Sid]),
    case mnesia:dirty_read({http_bind, Sid}) of
        [] -> {error, not_exists};

        [#http_bind{pid = FsmRef, hold=Hold, to={To, StreamVersion}}=Sess] ->
            NewStream =
                case StreamStart of
                    true -> {To, StreamVersion};
                    _ -> ""
                end,
            {gen_fsm:sync_send_all_state_event(FsmRef, #http_put{rid = Rid, attrs = Attrs,
                payload = Payload, payload_size = PayloadSize, hold = Hold,
                stream = NewStream, ip = IP}, 30000), Sess}
    end.

I have to remember that this function does not try to generate something the HTTP server can understand; it leaves that up to the caller.  When a session exists for the request, http_put uses gen_fsm:sync_send_all_state_event.  This call blocks until the session FSM replies or the 30000 ms (30 second) timeout expires, in which case the call fails with a timeout exit.

The blocking call makes sense in the context of the synchronous nature of the BOSH protocol: the synchronous call does not return until either some data is ready for the client or a timeout occurs.
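To convince myself of how the hold works, here is a toy version that uses plain processes rather than gen_fsm.  It is not how ejabberd_http_bind is written, and long_poll and its functions are hypothetical.  A request blocks until the session either has data or its wait time runs out.  In the timeout case, the reply is empty, much like the empty body that prepare_response sends back.

```erlang
%% long_poll.erl: toy model of a BOSH-style held request.
-module(long_poll).
-export([start/0, request/2, push/2]).

%% The session process starts with an empty buffer.
start() ->
    spawn(fun() -> idle([]) end).

%% Called by the HTTP request process. Blocks until the session
%% replies, the way http_put blocks in sync_send_all_state_event.
request(Session, WaitMs) ->
    Ref = make_ref(),
    Session ! {wait, self(), Ref, WaitMs},
    receive
        {Ref, Items} -> Items
    end.

%% Server-side data destined for the client.
push(Session, Item) ->
    Session ! {push, Item},
    ok.

%% No request is waiting: buffer data, or answer a request
%% immediately if something is already buffered.
idle(Buffer) ->
    receive
        {push, Item} ->
            idle([Item | Buffer]);
        {wait, From, Ref, _WaitMs} when Buffer =/= [] ->
            From ! {Ref, lists:reverse(Buffer)},
            idle([]);
        {wait, From, Ref, WaitMs} ->
            hold(From, Ref, WaitMs)
    end.

%% A request is being held: reply as soon as data arrives, or with
%% an empty list once the wait time runs out.
hold(From, Ref, WaitMs) ->
    receive
        {push, Item} ->
            From ! {Ref, [Item]},
            idle([])
    after WaitMs ->
        From ! {Ref, []},
        idle([])
    end.
```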

Backtracking to handle_http_put, here is the handle_http_put_error function:

handle_http_put_error(Reason, #http_bind{pid=FsmRef, version=Version})
  when Version >= 0 ->
    gen_fsm:sync_send_all_state_event(FsmRef, {stop, {put_error,Reason}}),
    case Reason of
        not_exists ->
            {200, ?HEADER,
             xml:element_to_binary(
               {xmlelement, "body",
                [{"xmlns", ?NS_HTTP_BIND},
                 {"type", "terminate"},
                 {"condition", "item-not-found"}], []})};
        bad_key ->
            {200, ?HEADER,
             xml:element_to_binary(
               {xmlelement, "body",
                [{"xmlns", ?NS_HTTP_BIND},
                 {"type", "terminate"},
                 {"condition", "item-not-found"}], []})};
        polling_too_frequently ->
            {200, ?HEADER,
             xml:element_to_binary(
               {xmlelement, "body",
                [{"xmlns", ?NS_HTTP_BIND},
                 {"type", "terminate"},
                 {"condition", "policy-violation"}], []})}
    end;
handle_http_put_error(Reason, #http_bind{pid=FsmRef}) ->
    gen_fsm:sync_send_all_state_event(FsmRef,{stop, {put_error_no_version, Reason}}),
    case Reason of
        not_exists -> %% bad rid
        ?DEBUG("Closing HTTP bind session (Bad rid).", []),
            {404, ?HEADER, ""};
        bad_key ->
        ?DEBUG("Closing HTTP bind session (Bad key).", []),
            {404, ?HEADER, ""};
        polling_too_frequently ->
        ?DEBUG("Closing HTTP bind session (User polling too frequently).", []),
            {403, ?HEADER, ""}
    end.
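There are two versions of the method: one for when the version supplied is greater than or equal to zero, and the other for when that is not the case.  I wasn't sure how erlang would handle a version that is not defined, and my first guess was that it would take the second pattern.  However, erlang's term ordering places atoms after numbers, so if version held the atom undefined (the default for an unset record field), the guard Version >= 0 would succeed and the first clause would be used.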

In either case, the function tells the session FSM to shut down (the sync_send_all_state_event) before creating something that the HTTP server can understand and sending it back.
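A quick way to check which clause an undefined version lands in is to write a toy function with the same guard (guard_order is a made-up module).  When erlang compares terms of different types, numbers sort before atoms, so the atom undefined compares greater than 0 and no exception is raised.

```erlang
%% guard_order.erl: same guard shape as the two
%% handle_http_put_error clauses.
-module(guard_order).
-export([which_clause/1]).

which_clause(Version) when Version >= 0 -> first;
which_clause(_Version) -> second.
```

So only a genuinely negative version reaches the second clause.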

Backtracking again, I noticed that the code for send_outpacket is pretty involved, so I'll save that one for a separate post.  Instead, I will backtrack to prepare_response:

prepare_response(Sess, Rid, OutputEls, StreamStart) ->
    receive after Sess#http_bind.process_delay -> ok end,
    case catch http_get(Sess, Rid) of
    {ok, cancel} ->
            {200, ?HEADER, "<body type='error' xmlns='"++?NS_HTTP_BIND++"'/>"};
    {ok, empty} ->
            {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"};
    {ok, terminate} ->
            {200, ?HEADER, "<body type='terminate' xmlns='"++?NS_HTTP_BIND++"'/>"};
    {ok, ROutPacket} ->
        OutPacket = lists:reverse(ROutPacket),
        prepare_outpacket_response(Sess, Rid, OutputEls++OutPacket, StreamStart);
    {'EXIT', {shutdown, _}} ->
            {200, ?HEADER, "<body type='terminate' condition='system-shutdown' xmlns='"++?NS_HTTP_BIND++"'/>"};
    {'EXIT', _Reason} ->
            {200, ?HEADER, "<body type='terminate' xmlns='"++?NS_HTTP_BIND++"'/>"}
    end.

OK, I don't understand why this bit of code is here:

    receive after Sess#http_bind.process_delay -> ok end,

This basically causes the erlang process to pause for Sess#http_bind.process_delay milliseconds.  But I was assuming that the call to sync_send_all_state_event in http_put would wait for whatever period of time was needed, so why the second wait?  More mysteries...
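For what it's worth, a receive with no patterns and only an after clause is the standard erlang idiom for sleeping.  It leaves the mailbox untouched and simply waits out the timeout, which is essentially what timer:sleep/1 does.  Here is a tiny hypothetical module:

```erlang
%% pause_demo.erl: a receive with only an after clause just waits.
-module(pause_demo).
-export([pause/1]).

pause(Ms) ->
    receive after Ms -> ok end.
```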

The code to format the different responses seems straightforward enough; the call to http_get, however, requires more exploration.  Unfortunately, that function, like send_outpacket, is complex, so I will save it for the next post.

So next time:
  • send_outpacket
  • http_put

Once More Unto the Breach


Previously...

  • I read up on various OTP behaviors.
  • I resolved to use a different plan.
  • I went through the "entry point" into ejabberd/BOSH/http-bind
The approach I wanted to use before (replacing the socket interface that ejabberd/BOSH uses with one that is megaphone-aware) appears to be incompatible with the way the BOSH/http-bind module of ejabberd works.

Here is an event trace of how I thought things worked:

Client      Socket      BOSH        Session
                        Module
|           |           |           |
HTTP POST   |           |           |
----------->|           |           |
|           Data        |           |
|           ----------->|           |
|           |           |           |
< The BOSH Module finds the client's session >
|           |           |           |
|           |           Data        |
|           |           ----------->|
|           Response    |           |
|           |<-----------------------
POST response content   |           |
|<-----------           |           |
|           |           |           |

Therefore, replacing the Socket with something that is "megaphone aware" makes a certain sense.

From what I can see, this was dead wrong.

Here is how I now think that the BOSH module actually works:


Client      HTTP        BOSH        
            Server      Module
|           |           |
HTTP POST   |           |
----------->|           |
|           process_request(POST data)
|           ----------->|
|           |           |
< The BOSH module waits until either it has some data
  for the client or a timeout occurs >
|           |           |
|           |           |
|           |Return (POST response)
|           |<-----------
|Response (POST response)
|<-----------           |
|           |           |

This being the case, I don't think I can just swap out the socket module with something that understands that it is using a multiplexed connection.  Instead I think I will need to do something along the lines of this:

ECM         megaphone   Session     BOSH        
                        Process     Module
|           |           |           |
Data        |           |           |
----------->|           |           |
|           start(Data) |           |
|           ----------->|           |
|           |           process_request(Data)
|           |           ----------->|
|           |           |           |
< The BOSH module waits until either it has some 
  data or a timeout occurs >
|           |           |           |
|           |           |Return(POST response)
|           |           |<-----------
|           |Send(POST response)    |
|           |<-----------           |
|           |           |           |
< Session Process terminates >
|           |           |           |
|Data (POST response)   |           |
|<-----------           |           |
|           |           |           |

Or at least that's what my current plan is.  I'm sure this plan is correct.  Absolutely.  Positively.  Unless it's wrong.  In which case I'll have to use something else.  Sigh.

Back to the Wall

Well, I don't understand how the current system works, and this makes it difficult to figure out how to modify the system to use a different means of sending data to the user.  So I guess it's back to bashing my head against a wall until I make a hole I can crawl through...

The following pertains to sending data to the server.  To receive data from the server, the client merely does an HTTP POST without sending any data, and the connection is held open until there is something to send back.

My working theory is that the BOSH/http-bind module is registered with the ejabberd_http module in order to receive HTTP POST events from the user's chat client.  A new process is created for each POST.

Consider the following code:


process([], #request{method = 'POST',
                     data = Data,
                     ip = IP}) ->
    ?DEBUG("Incoming data: ~s", [Data]),
    ejabberd_http_bind:process_request(Data, IP);


So it would seem that the first thing that happens when a new HTTP POST is received is that ejabberd_http_bind:process_request is called.  Consider the following comment from mod_http_fileserver:


% @spec (LocalPath, Request) -> {HTTPCode::integer(), [Header], Page::string()}

So it looks like, when the process function returns, it is expected to return a tuple containing the HTTP status code, a list of headers, and the page content.  In our case, the content should be the XML for the BOSH response.

To help facilitate my understanding of what is going on, I broke out the code that I am dealing with from one large function into several smaller ones.  Here is the revised version:

%% Entry point for data coming from client through ejabberd HTTP server:
process_request(Data, IP) ->
    Opts1 = ejabberd_c2s_config:get_c2s_limits(),
    Opts = [{xml_socket, true} | Opts1],
    MaxStanzaSize =
        case lists:keysearch(max_stanza_size, 1, Opts) of
            {value, {_, Size}} -> Size;
            _ -> infinity
        end,
    PayloadSize = iolist_size(Data),
    case catch parse_request(Data, PayloadSize, MaxStanzaSize) of
        %% No existing session:
        {ok, {"", Rid, Attrs, Payload}} ->
            process_no_existing_session(Rid, Attrs, Payload, PayloadSize, IP);

        %% Existing session
        {ok, {Sid, Rid, Attrs, Payload}} ->
            process_existing_session (Sid, Rid, Attrs, Payload, PayloadSize, IP);

        %% error the packet is too large
        {size_limit, Sid} -> process_stanza_too_large(Sid);

        %% unrecognized request --- return an error but keep the session going
        _ ->
            ?DEBUG("Received bad request: ~p", [Data]),
            {400, ?HEADER, ""}
    end.

Not sure what the business with {xml_socket, true} is for: Opts is only used locally to look up max_stanza_size and never seems to get passed to anyone.  The code that sets MaxStanzaSize gets the maximum stanza size for later use in the call to parse_request.  Next we have the four cases:

        %% No existing session:
        {ok, {"", Rid, Attrs, Payload}} ->
            process_no_existing_session(Rid, Attrs, Payload, PayloadSize, IP);

In the first case, we have a valid request that does not contain a session ID.  This results in a call to one of the break-out functions that I created:

process_no_existing_session (Rid, Attrs, Payload, PayloadSize, IP) ->
    case xml:get_attr_s("to",Attrs) of
        "" ->
            %% 
            %% no "to" attribute, we cannot process this one so return an error
            %%
            ?DEBUG("Session not created (Improper addressing)", []),
            {200, ?HEADER,
                "<body type='terminate' condition='improper-addressing' "
                "xmlns='" ++ ?NS_HTTP_BIND ++ "'/>"};

        XmppDomain ->
            %% 
            %% create new session
            %%
            Sid = sha:sha(term_to_binary({now(), make_ref()})),
            case start(XmppDomain, Sid, "", IP) of
                %%
                %% error trying to create the new session, tell the user we 
                %% cannot allow them to connect
                %%
                {error, _} ->
                    {200, ?HEADER,
                    "<body type='terminate' condition='internal-server-error' "
                    "xmlns='" ++ ?NS_HTTP_BIND ++ "'>BOSH module not started</body>"};

                %%
                %% created new session, send them the usual stuff that one
                %% expects when connecting to a BOSH server
                %%
                {ok, Pid} ->
                    handle_session_start(Pid, XmppDomain, Sid, Rid, 
                        Attrs, Payload, PayloadSize, IP)
            end
    end.

A request that contains no session ID generally means that a client wants to start a new session.  For this to be the case, the "to" attribute should be present in the XMPP stanza.  If not, then we treat the request as an error, otherwise we try to create the new session.

        %% Existing session
        {ok, {Sid, Rid, Attrs, Payload}} ->
            process_existing_session (Sid, Rid, Attrs, Payload, PayloadSize, IP);

The next case from the original process_request function deals with requests for existing sessions, or at least requests that have a SID attribute.  This becomes the second break-out function:

process_existing_session (Sid, Rid, Attrs, Payload, PayloadSize, IP) ->
    %%
    %% check for a restart connection
    %%
    StreamStart =
        case xml:get_attr_s("xmpp:restart",Attrs) of
            "true" -> true;
            _ -> false
        end,

    %%
    %% if the request is to terminate the stream, then add /stream:stream
    %% to whatever content the client sent us.
    %%
    Payload2 =
        case xml:get_attr_s("type",Attrs) of
            "terminate" ->
                %% close stream
                Payload ++ [{xmlstreamend, "stream:stream"}];
            _ ->
                Payload
        end,

    %%
    %% process the request and return the results to the client
    %%
    handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
        StreamStart, IP).

This function checks for a couple of special cases (restart and terminate) and then passes the request on to handle_http_put.

The next case from the process_request function has to do with requests that are too long:

        %% error the packet is too large
        {size_limit, Sid} -> process_stanza_too_large(Sid);

The breakout for this case simply sends an error message.  If the request pertained to a recognized session, then the session is closed as well:

process_stanza_too_large (Sid) ->
    case mnesia:dirty_read({http_bind, Sid}) of
        %%
        %% the supplied session is unrecognized, return a 404
        %%
        [] -> {404, ?HEADER, ""};

        %%
        %% the session exists.  Terminate the session and tell the client why.
        %%
        [#http_bind{pid = FsmRef}] ->
            gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}),
            {200, ?HEADER, "<body type='terminate' condition='undefined-condition' "
                "xmlns='" ++ ?NS_HTTP_BIND ++ "'>Request Too Large</body>"}
    end.

The final case is one that catches everything else.  

        %% unrecognized request --- return an error but keep the session going
        _ ->
            ?DEBUG("Received bad request: ~p", [Data]),
            {400, ?HEADER, ""}

This was short enough that it did not make sense to break it out into another function.

That's enough for one post.  Next time I am going to dig into handle_http_put.

Thursday, December 22, 2011

Yet Another Change of Plan

I've been running into a lot of problems trying to get the ejabberd BOSH module to understand connection IDs and multiplexing.  After a bit of thought it occurs to me that it would be useful to try contacting the authors of the BOSH/http-bind part of ejabberd.

While writing an email to Stefan Strigler, the author of the excellent JWChat and the original author of the ejabberd BOSH module, I had an idea.  Rather than trying to write a bunch of code that "knows" about connection IDs and multiplexing, why not write something that looks to the BOSH module like a TCP connection, at least on the outside.


On the inside of the module, the implementation will know that it is multiplexing many connections into one connection. 


At least, that is the plan.  We'll see how it works out.

OTP Behavior: gen_fsm

gen_fsm is a generic finite state machine behavior.  With gen_fsm, each state is represented by a function that has the form:

<state name>(Event, LoopData) -> { next_state, NewState, NewLoopData }


The FSM can also tell the framework to terminate via the following:


<state name>(Event, LoopData) -> { stop, Reason, NewLoopData }


Client API

  • start(Module, Args, Options)
  • start(Name, Module, Args, Options)
  • start_link(Module, Args, Options)
  • start_link(Name, Module, Args, Options)
    Start up the FSM.  These functions will call init/1 in the specified Module.  start_link links the new FSM to the caller, start does not.  The name will register the newly created process locally or globally, as appropriate.
  • send_event (Reference, Event)
    This will cause the corresponding <state name> function to be called.  The call returns immediately.
  • send_all_state_event (Reference, Event)
    Will cause handle_event to be called on the server instead of the function for the current state.
  • sync_send_event (Reference, Event)
    This will cause the corresponding <state name> function (the three-argument version, which also receives the caller's From) to be called.  The caller waits for the FSM to reply before continuing.
  • sync_send_all_state_event (Reference, Event)
    This will cause the handle_sync_event function to be called on the server.  The caller will wait for the event to be processed before continuing.
  • reply(Caller, Reply)
    Called from inside the FSM to explicitly send a reply to a caller blocked in sync_send_event or sync_send_all_state_event.  This is useful when the reply has to be deferred: the callback returns without replying and calls reply later.
Server API
  • init(Args) -> { ok, StateName, StateData }
    Start up the state machine.
  • <State Name>(Event, Data) -> {next_state, NextState, NewData}
    Handle an event sent to the state machine via send_event.  Can also return:
    • { stop, Reason, NewData} in order to terminate the FSM.
  • <State Name>(Event, From, Data) -> {reply, Reply, NextState, NewData}
    Handle an event sent to the state machine via sync_send_event.  Returning {next_state, NextState, NewData} instead defers the reply, which can be sent later with reply/2.
  • handle_event(Event, State, Data) ->
        {next_state, NextState, NewData}

    Handle an event sent to the FSM via send_all_state_event.
  • handle_sync_event(Event, From, State, Data) ->
        {reply, Reply, NextState, NewData}

    Handle an event sent to the FSM via sync_send_all_state_event.
  • terminate(Reason, StateName, Data)
    Clean up hook.
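To tie the pieces together, here is a small gen_fsm sketch with two states, disconnected and connected.  Its state data counts how many times it has connected.  conn_fsm is a made-up example, not ejabberd code.  Note that gen_fsm has since been deprecated in favour of gen_statem, so newer OTP releases will print deprecation warnings when compiling this.

```erlang
%% conn_fsm.erl: hypothetical two-state gen_fsm.
-module(conn_fsm).
-behaviour(gen_fsm).
%% Client API
-export([start_link/0, connect/1, disconnect/1, status/1]).
%% gen_fsm callbacks and state functions
-export([init/1, disconnected/2, connected/2,
         handle_event/3, handle_sync_event/4, handle_info/3,
         terminate/3, code_change/4]).

start_link() -> gen_fsm:start_link(?MODULE, [], []).

%% Asynchronous: routed to the current state's function.
connect(Pid)    -> gen_fsm:send_event(Pid, connect).
disconnect(Pid) -> gen_fsm:send_event(Pid, disconnect).

%% Synchronous, handled the same way in every state.
status(Pid) -> gen_fsm:sync_send_all_state_event(Pid, status).

%% Start in 'disconnected' with zero connects so far.
init([]) -> {ok, disconnected, 0}.

disconnected(connect, N) -> {next_state, connected, N + 1};
disconnected(_Event, N)  -> {next_state, disconnected, N}.

connected(disconnect, N) -> {next_state, disconnected, N};
connected(_Event, N)     -> {next_state, connected, N}.

handle_event(_Event, StateName, N) -> {next_state, StateName, N}.

%% Reply with the current state name and counter; state is unchanged.
handle_sync_event(status, _From, StateName, N) ->
    {reply, {StateName, N}, StateName, N}.

handle_info(_Info, StateName, N) -> {next_state, StateName, N}.
terminate(_Reason, _StateName, _N) -> ok.
code_change(_OldVsn, StateName, N, _Extra) -> {ok, StateName, N}.
```

Because send_event is asynchronous, a status call made right afterwards still sees the updated state.  Messages from one process to another arrive in the order they were sent.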
A slight change of plan: I am not going to cover the application behavior.

OTP Behavior: supervisor

supervisor
This is a pattern for an erlang process that watches other processes to ensure that they are running, etc.  Unlike gen_server, supervisor is all about the data structure that the callback module returns in response to the init/1 function.


Client API
  • start_link(Module, Arguments)
  • start_link(Name, Module, Arguments)
    • Calls the init/1 function in the supplied Module.
    • Name is one of {local, <atom>} or {global, <atom>}
    • start_link starts the supervisor and links it to the calling process.  Unlike gen_server, there is no unlinked start variant.
Server API
  • init(Arguments) -> 
        {ok, {SupervisorSpecification, ChildSpecification}}
  • SupervisorSpecification has the form:
    { Strategy, AllowedRestarts, MaxSeconds }
    • Strategy determines how crashed children are restarted.  Possible values for this are:
      • one_for_one - if a child terminates, restart it
      • one_for_all - if a child terminates, terminate all the other children and then restart all of them.
      • rest_for_one - if a child terminates, terminate all the children after it in the child specification then restart them.
    • AllowedRestarts and MaxSeconds specify the maximum number of restarts that the supervisor will allow within MaxSeconds seconds.  If that limit is exceeded, the supervisor gives up, terminates its children, and then terminates itself.
  • ChildSpecification is a list of tuples of the form:
    { Id, {M,F,A}, Restart, Shutdown, Type, ModuleList }
    • Id is a term (usually an atom) used to identify the child in the context of the supervisor.
    • { Module, Function, Arguments } is used to start the child.
    • Restart is one of
      • permanent - always restart the child
      • temporary - never restart the child
      • transient - restart only if the child terminates abnormally, that is, with a Reason other than normal, shutdown, or {shutdown, Term}
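    • Shutdown determines how long the supervisor will wait after telling the child to stop (by sending it an exit signal with reason shutdown) before unconditionally killing the child.  Possible values are:
      • <integer> - the number of milliseconds to wait
      • infinity - wait until the child terminates, however long that takes.
      • brutal_kill - immediately terminate the child without invoking the terminate callback
    • Type is either supervisor or worker.  Its value does not seem to change how restarts work; as far as I can tell, it is mainly used by the release handler.  ModuleList is the list of callback modules for the child, which the release handler uses during code upgrades.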
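Here is what a minimal init/1 might look like using the tuple formats above.  demo_sup and its trivial worker are made up for illustration.  The restart limits (5 restarts in 10 seconds) and the 2000 ms shutdown are arbitrary choices.

```erlang
%% demo_sup.erl: hypothetical supervisor with one permanent worker.
-module(demo_sup).
-behaviour(supervisor).
-export([start_link/0, init/1, start_worker/0]).

start_link() ->
    supervisor:start_link({local, demo_sup}, ?MODULE, []).

init([]) ->
    %% {Strategy, AllowedRestarts, MaxSeconds}
    SupSpec = {one_for_one, 5, 10},
    %% {Id, {M, F, A}, Restart, Shutdown, Type, ModuleList}
    Worker = {worker1, {?MODULE, start_worker, []},
              permanent, 2000, worker, [?MODULE]},
    {ok, {SupSpec, [Worker]}}.

%% The start function must return {ok, Pid} for a process linked to
%% the supervisor; spawn_link here runs inside the supervisor.
start_worker() ->
    {ok, spawn_link(fun loop/0)}.

%% The worker exits normally when told to stop. Because it is
%% permanent, the supervisor restarts it anyway.
loop() ->
    receive
        stop -> ok;
        _    -> loop()
    end.
```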
Dynamic Children
Supervisors can also change the children that they manage dynamically.  This is basically just changing the list of child specifications at run time.  The corresponding functions are:
  • supervisor:start_child(Supervisor, ChildSpec)
    Add a new child to the tree of processes.  Supervisor is either the name of the supervisor as provided in the start function or the PID of the supervisor process.
  • supervisor:terminate_child(Supervisor, ChildId)
    Terminates the child.  The supervisor keeps the child specification but does not restart the child until supervisor:restart_child is called.  (For a temporary child, the specification is deleted instead.)
  • supervisor:restart_child(Supervisor, ChildId)
    Restarts a child that was previously stopped with terminate_child.
  • supervisor:delete_child(Supervisor, ChildId)
    This removes the child specification from the supervisor.  The child must already have been stopped with terminate_child; otherwise the call fails with {error, running}.
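The lifecycle of a dynamic child can be walked through in a short sketch. The module dynamic_children_sketch, the child Id conn_1 and the trivial worker are hypothetical; the supervisor starts with no children and everything is added at run time with the four calls listed above.

```erlang
-module(dynamic_children_sketch).
-behaviour(supervisor).
-export([start_link/0, init/1, start_worker/0, demo/0]).

start_link() ->
    supervisor:start_link(?MODULE, []).

%% Start with an empty child list; children are added at run time.
init([]) ->
    {ok, {{one_for_one, 5, 10}, []}}.

start_worker() ->
    {ok, spawn_link(fun loop/0)}.

loop() ->
    receive _ -> loop() end.

demo() ->
    {ok, Sup} = start_link(),
    Spec = {conn_1, {?MODULE, start_worker, []}, permanent, 1000, worker, [?MODULE]},
    {ok, _Pid} = supervisor:start_child(Sup, Spec),
    %% Stop the child; the specification stays with the supervisor.
    ok = supervisor:terminate_child(Sup, conn_1),
    [{conn_1, undefined, worker, _}] = supervisor:which_children(Sup),
    %% Start it again from the saved specification.
    {ok, _NewPid} = supervisor:restart_child(Sup, conn_1),
    %% A running child cannot be deleted.
    {error, running} = supervisor:delete_child(Sup, conn_1),
    ok = supervisor:terminate_child(Sup, conn_1),
    ok = supervisor:delete_child(Sup, conn_1),
    [] = supervisor:which_children(Sup),
    ok.
```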
Next up: gen_fsm

Wednesday, December 21, 2011

OTP Behavior: gen_server

Previously
  • I had some success with trying to send messages around.
  • I created some code to format messages correctly on the ejabberd side.
  • I decided that I needed to read up on erlang behaviors.
OK, so last time I said erlang supervisor behavior but I decided to broaden the scope just a bit to include 

  • gen_server
  • supervisor
  • application
  • gen_fsm
I will write a very brief summary of what each behavior is supposed to do, what you have to do to implement it, and what calls clients make to use it.

gen_server



This is supposed to cover servers that you would normally start up in another process and that you want to monitor.  The monitoring aspect will have to wait until a discussion of the supervisor behavior.


Client API

  • gen_server:start(Module, Args, Options)
    Start up the server.  Calls Module:init(Args).
  • gen_server:start(Name, Module, Args, Options)
    Start up the server, registering the process as Name, where Name is {local, LocalName} or {global, GlobalName}.
    Calls Module:init(Args).
  • gen_server:start_link(Module, Args, Options)
    Start up the server and link it to the calling process.
    Calls Module:init(Args).
  • gen_server:start_link(Name, Module, Args, Options)
    Start up the server and link it to the calling process.  Register the process as Name.  Calls Module:init(Args).
  • gen_server:call(Server, Message)
    Make a synchronous call to the server and wait for the reply (by default for up to 5 seconds).  Calls Module:handle_call(Message, From, State) on the server.  The Server param is one of
    • PID - the PID of the server process
    • Name - the locally registered name of the server process (the LocalName given as {local, LocalName} to start/start_link)
    • {Name, Node} - a server registered locally as Name on another node
    • {global, Name} - the name of the global server process.
  • gen_server:cast(Server, Message)
    Make an asynchronous call to the server: cast returns ok immediately without waiting for the server.  Server takes the same forms as for call.  Calls Module:handle_cast(Message, State) on the server.

Callbacks

  • init(Arguments) -> {ok, State}
    Called by start and start_link when the server is started up.  State becomes the server's loop data and is passed to the other callbacks.
  • handle_call(Message, From, State)
    Called by gen_server:call.  From identifies the caller.  The results of the call vary depending on the return value:
    • {reply, Reply, NewState} - returns the Reply to the client.
    • {noreply, NewState} - does not reply yet; the client keeps waiting until the server calls gen_server:reply(From, Reply) or the call times out.
    • {stop, Reason, Reply, NewState} - returns the Reply to the client and then stops the server.
    • {stop, Reason, NewState} - stops the server without sending a reply.
  • handle_cast(Message, State)
    Called by gen_server:cast.  The results of the call vary depending on the return value:
    • {noreply, NewState} - continue running with the new state.
    • {stop, Reason, NewState} - shuts down the server with reason Reason.

  • terminate(Reason, State) -> <ignored>
    Called when the server is shutting down as a result of returning stop from one of the callbacks (and, if the server traps exits, when its supervisor shuts it down).  This gives the server a chance to clean up.

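  • handle_info(Info, State)
    Called if the server receives a message that did not come through call or cast, such as a plain message sent with ! or an 'EXIT' message from a linked process when the server traps exits.  It returns the same kinds of values as handle_cast.
    In the OTP releases of this era it is a required callback, and a server that does not define it crashes when such a message arrives; newer releases make it optional and simply log unexpected messages.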
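Putting the client API and the callbacks side by side, here is a minimal sketch of a complete gen_server. The counter_server module and its increment/value/reset/stop functions are hypothetical, chosen only because a counter is about the smallest state worth keeping.

```erlang
-module(counter_server).
-behaviour(gen_server).

%% Client API
-export([start_link/0, increment/0, value/0, reset/0, stop/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link() ->
    %% Registered locally, so clients can use the name instead of a PID.
    gen_server:start_link({local, ?MODULE}, ?MODULE, 0, []).

increment() -> gen_server:cast(?MODULE, increment).  % asynchronous
value()     -> gen_server:call(?MODULE, value).      % synchronous
reset()     -> gen_server:call(?MODULE, reset).
stop()      -> gen_server:call(?MODULE, stop).

init(Start) ->
    {ok, Start}.

handle_call(value, _From, Count) ->
    {reply, Count, Count};
handle_call(reset, _From, Count) ->
    {reply, Count, 0};            % reply with the old value, reset to 0
handle_call(stop, _From, Count) ->
    {stop, normal, ok, Count}.    % reply ok, then terminate/2 runs

handle_cast(increment, Count) ->
    {noreply, Count + 1}.

%% Ignore stray messages instead of crashing.
handle_info(_Info, Count) ->
    {noreply, Count}.

terminate(_Reason, _Count) ->
    ok.

code_change(_OldVsn, Count, _Extra) ->
    {ok, Count}.
```

Because messages from one process to another arrive in order, a value() call made right after two increment() casts from the same process sees both increments.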
Next up, the supervisor behavior.

Time to Hit the Books

Previously

  • I started coding up mod_megaphone.
  • I had some success with trying to send messages around.
  • I created some code to format messages correctly on the ejabberd side.

I think I have more or less identified the entry point into megaphone_http_bind where messages should be sent: the process_request function.

And then everything comes to a dead stop.

There is a lot about erlang patterns that I do not know about, and it shows.

For example, I don't understand the following statement:

supervisor:start_child(SupervisorProc, [Sid, Key, IP])


I can parrot the erlang documentation:


Dynamically adds a child specification to the supervisor SupRef which starts the corresponding child process.


But this does not tell me why this approach is being used or what function I can expect to be called in response to this.  At this point, I can continue to blindly grope about, or I can read up on erlang.

For the record, I would prefer to blindly grope around.

I think that in the long run it would be a good idea to familiarize myself with some of the erlang concepts that ejabberd is using, since I will probably run into them again and again.  It just sucks that I have to stop with the coding and hit the books.

So the next step is to read up on what things like supervisor:start_child do.
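For anyone stuck on the same statement: when the second argument to supervisor:start_child is a list rather than a child specification, the supervisor has to be using the simple_one_for_one strategy. Its single child specification acts as a template, and the list is appended to the template's argument list, so {M, F, A} is started as apply(M, F, A ++ [Sid, Key, IP]). Presumably SupervisorProc is set up this way, but I have not verified that. The sketch below uses hypothetical names (session_sup_sketch, start_session/3, session_init/3) to show the mechanism.

```erlang
-module(session_sup_sketch).
-behaviour(supervisor).
-export([start_link/0, init/1, start_session/3, session_init/3]).

start_link() ->
    supervisor:start_link(?MODULE, []).

%% simple_one_for_one: exactly one child spec, used as a template.
%% No children are started here; they are added with start_child/2.
init([]) ->
    Template = {session,                       % Id (not used to look children up)
                {?MODULE, start_session, []},  % A is [] here...
                temporary, brutal_kill, worker, [?MODULE]},
    {ok, {{simple_one_for_one, 10, 1}, [Template]}}.

%% ...so supervisor:start_child(Sup, [Sid, Key, IP]) ends up calling
%% apply(?MODULE, start_session, [] ++ [Sid, Key, IP]).
start_session(Sid, Key, IP) ->
    Pid = spawn_link(?MODULE, session_init, [Sid, Key, IP]),
    {ok, Pid}.

%% Toy session process: reports its start arguments when asked.
session_init(Sid, Key, IP) ->
    receive
        {From, info} ->
            From ! {self(), {Sid, Key, IP}},
            session_init(Sid, Key, IP)
    end.
```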

Tuesday, December 20, 2011

Two Steps Forward One Step Back

Previously

  • IT'S ALIVE!
  • I started coding up mod_megaphone
  • I had some success with trying to send messages around

Made some good progress on mod_megaphone, though I made a (possibly bad) mistake.  The good news is that I've added some code to send packets from mod_megaphone to ECM.  The bad news is that I bought a somewhat early Christmas present in the form of Star Wars: the Old Republic (SW:TOR).

On the progress side, I dug around a little to find an erlang equivalent of good old sprintf.  It turns out to be io_lib:format.  I wanted to create a zero-padded string to hold the packet size, where <digits> below is the field width.  The call ended up being:

    io_lib:format("~<digits>..0B", [ PacketSize ])


Unfortunately, this returns a somewhat strange list-within-a-list structure, so the final form of this snippet ends up being:


    lists:flatten(io_lib:format("~<digits>..0B", [ PacketSize ]))


Don't ask me why you have to flatten the output of io_lib:format.  I have enough aggravations.
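For the record, the reason is that io_lib:format returns chars(), a possibly deep list of characters. The I/O functions accept that directly, but string comparisons and length/1 need a flat list. A small helper makes the width a parameter; the module name packet_size_sketch and the width of 5 in the usage below are hypothetical, since I don't state the real width megaphone uses here.

```erlang
-module(packet_size_sketch).
-export([format_size/2]).

%% format_size(PacketSize, Width) -> flat, zero-padded decimal string.
%% Builds a control string such as "~5..0B": field width 5, no
%% precision, pad character 0, integer printed in base 10.
format_size(PacketSize, Width)
  when is_integer(PacketSize), PacketSize >= 0,
       is_integer(Width), Width > 0 ->
    Format = "~" ++ integer_to_list(Width) ++ "..0B",
    lists:flatten(io_lib:format(Format, [PacketSize])).
```

With a width of 5, packet_size_sketch:format_size(42, 5) should give "00042".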


SW:TOR is a more insidious threat: it has the potential to latch onto me when I'm feeling down because something isn't working.


SWTOR won't judge you...
SWTOR cares about you...
SWTOR feels that you are an epic, galaxy-hopping hero...


I must resist the temptation!


The next steps are to:

  • Find that portion of the modified HTTP BIND system that accepts packets from the client
  • Modify that part of the system to accept packets from mod_megaphone.
  • Intercept the packets going back out of the system.
  • Send the packets over ECM instead.

Progress

Previously

  • I started coding up mod_megaphone.
  • IT'S ALIVE!!
  • Erlang gave me grief
One of the issues I ran into was that the ejabberd distribution seems to have its own version of erlang.  That would be fine if that version were also used to compile the code, but instead the compile step uses whatever happens to be in /usr/local/bin.

This bit me when I used some of the functions documented on erlang.org: they worked fine while I was coding and testing, but failed when I ran them under ejabberd, presumably because ejabberd's bundled erlang is older.  So far, I don't have a good alternative to the binary module, so I am converting all the data I get to a list before doing anything else with it.
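One possible alternative, assuming the bundled erlang predates the binary module but (like every reasonably recent release) supports bit syntax: write a small replacement for binary:part/3 with a binary pattern match. The module name old_otp_part is hypothetical, and it only handles non-negative positions and lengths; out-of-range arguments raise badmatch instead of binary:part's badarg.

```erlang
-module(old_otp_part).
-export([part/3]).

%% part(Bin, Pos, Len) -> the Len bytes of Bin starting at offset Pos
%% (0-based), like binary:part/3 for non-negative Pos and Len.
%% Uses only bit syntax, so it does not need the binary module.
part(Bin, Pos, Len)
  when is_binary(Bin), is_integer(Pos), Pos >= 0,
       is_integer(Len), Len >= 0 ->
    <<_Skip:Pos/binary, Part:Len/binary, _Rest/binary>> = Bin,
    Part.
```

The built-in erlang:split_binary/2 would also work, at the cost of two calls per field.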

I got to the point where I was able to inject packets to the system via megaphone_http_bind, a modified version of ejabberd_http_bind.  Currently, there is no way for responses to get back to the client, so the next step is figuring out how to do that.

Monday, December 19, 2011

Why I Love Erlang

Previously

  • I identified a possible entry point for delivering packets from mod_megaphone.
  • I started coding up mod_megaphone.
  • IT'S ALIVE!!
Unfortunately, erlang is not quite that easy to work with.  I spent a good deal of time chasing down an annoying problem where the code compiled and tested OK, but failed when ejabberd ran it.  But not to worry, I was getting the following, very helpful error message:

    undef

There is a school of thought that says "less is more" when writing software and I do agree with it.  That error message, however, goes a bit too far.

For the curious, the following statement was compiling OK.  I was able to load it and run the code that was not part of the conditional that it belonged to:

BinConnectionId = binary_part(Binary, ?CONNECTION_ID_INDEX, ?CONNECTION_ID_LENGTH),

The correct statement is

BinConnectionId = binary:part(Binary, ?CONNECTION_ID_INDEX, ?CONNECTION_ID_LENGTH),

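Did you notice how the underscore was replaced with a colon?  You did?  Well, why didn't you tell me earlier?!!  At any rate, this was the cause of much hair tearing, but I did finally end up catching it.  (In hindsight, the underscore may not be the whole story: newer erlang releases also provide binary_part/3 as an auto-imported BIF, and both it and the binary module date from R14, so an older erlang running ejabberd could produce the same undef.)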
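One way to take the guesswork out of undef is to ask the running VM directly whether a function exists. Below is a minimal sketch; the module name undef_check is hypothetical. Called from inside ejabberd (for example undef_check:available(binary, part, 3)), it would show whether ejabberd's erlang has the function at all.

```erlang
-module(undef_check).
-export([available/3, safe_call/3]).

%% available(Module, Function, Arity) -> true | false
%% code:ensure_loaded/1 comes first because function_exported/3 only
%% looks at modules that are already loaded.
available(Module, Function, Arity) ->
    case code:ensure_loaded(Module) of
        {module, Module} -> erlang:function_exported(Module, Function, Arity);
        {error, _Reason} -> false
    end.

%% safe_call(Module, Function, Args) -> {ok, Result} | {undef, {M, F, Arity}}
%% Note: an undef raised deeper inside Module:Function is also caught
%% here and reported as if Module:Function itself were missing.
safe_call(Module, Function, Args) ->
    try apply(Module, Function, Args) of
        Result -> {ok, Result}
    catch
        error:undef -> {undef, {Module, Function, length(Args)}}
    end.
```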

Sunday, December 18, 2011

mod_megaphone

Previously
  • I started analyzing the logfiles and ejabberd BOSH code.
  • I identified a juncture where I needed to decide whether to modify ejabberd BOSH or create a new and independent module.
I've decided to create an independent module.  It will be based largely on the existing BOSH module, however.  This approach allows me to use the existing BOSH module as a standard to compare against the changes that I am making in order to get megaphone to work.

I've started coding up mod_megaphone and even hooked it up to ECM.  A quick aside:

<insane-cackling>
IT'S ALIVE!!!  DO YOU HEAR ME IGOR?!  IT'S ALIIIIIVE!!!  MUHAHAHAHA!
</insane-cackling>

The problem is that it doesn't yet understand how to demultiplex data - remember that the packets coming from ECM have the format:

    <packet length>|<connection ID>|<contents>

Whereas the (converted) version of the ejabberd BOSH module expects to see just

    <content>

So the next order of business is to parse out the packet length and the connection ID.

A more subtle problem is that a packet might not arrive in a single TCP receive, and a single receive might contain parts of several packets, or several complete packets.  To coin a phrase, this application needs buffering.

So to summarize, my todo list contains:

  • Create a simple buffering scheme for mod_megaphone
  • Add demultiplexing/packet parsing to mod_megaphone
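Both items can be handled by one function that appends each new chunk to a buffer and peels off complete packets. This is only a sketch under loudly stated assumptions: the module name megaphone_demux_sketch and the field widths (a 5-digit zero-padded decimal length, a 4-byte connection ID) are hypothetical, the `|` characters in the format above are assumed to be notation rather than literal separators, and the length is assumed to count only the contents.

```erlang
-module(megaphone_demux_sketch).
-export([new/0, feed/2]).

%% Hypothetical wire format, assumed for this sketch only:
%%   <?LEN_DIGITS zero-padded decimal digits: length of contents>
%%   <?ID_LENGTH bytes of connection ID>
%%   <contents>
-define(LEN_DIGITS, 5).
-define(ID_LENGTH, 4).

%% The buffer is just the bytes received but not yet parsed.
new() -> <<>>.

%% feed(Data, Buffer) -> {[{ConnectionId, Contents}], NewBuffer}
%% Appends newly received Data and extracts every complete packet, in
%% order. A trailing partial packet stays in NewBuffer until more data
%% arrives. A non-numeric length field crashes with badarg.
feed(Data, Buffer) when is_binary(Data), is_binary(Buffer) ->
    parse(<<Buffer/binary, Data/binary>>, []).

parse(<<LenText:?LEN_DIGITS/binary, Id:?ID_LENGTH/binary, Rest/binary>> = Buf, Acc) ->
    Len = list_to_integer(binary_to_list(LenText)),
    case Rest of
        <<Contents:Len/binary, More/binary>> ->
            parse(More, [{Id, Contents} | Acc]);
        _ ->
            %% Header is complete but the contents are not yet.
            {lists:reverse(Acc), Buf}
    end;
parse(Buf, Acc) ->
    %% Not even a full header yet.
    {lists:reverse(Acc), Buf}.
```

The caller keeps the returned buffer (for instance in its process state) and passes it back in with the next chunk from the socket.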

I Have Seen the Future and it is BOSH

While it would be nice to be able to jump into the code and start hacking, I have found that this does not work very well with erlang in general and ejabberd in particular.  So I plan to read and understand the existing BOSH module before I start on the megaphone module.

Oh boy.

The ejabberd BOSH module is difficult to understand.  First there is the issue of ejabberd itself, which is difficult to figure out; then there is the interface between BOSH and the rest of the system.

After trying to go through the source code I have come to the conclusion that the best way of trying to understand what is going on is to trace through the code as it is processing a BOSH session rather than trying to figure out what is going on by contemplating the code "at rest."

So I'm going to trace through the logfile, explaining as I go, as an exercise to help myself understand what is going on.  The logfile was created for a Pidgin session with the regular ejabberd BOSH module.

And to think: this is how I've chosen to spend my vacation time...


1:  =INFO REPORT==== 2011-12-18 07:38:29 ===
2:  I(<0.367.0>:ejabberd_listener:281) : (#Port<0.414>) Accepted connection {{192,168,1,101},62839} -> {{192,168,1,11},5280}


OK, this tells me that ejabberd got a connection on the BOSH port.


3:
4:  =INFO REPORT==== 2011-12-18 07:38:29 ===
5:  D(<0.367.0>:ejabberd_socket:61) : start module: ejabberd_http, SockMod: gen_tcp, Socket = #Port<0.414>
6:
7:  =INFO REPORT==== 2011-12-18 07:38:29 ===
8:  D(<0.367.0>:ejabberd_socket:104) : raw
9:
10:  =INFO REPORT==== 2011-12-18 07:38:29 ===
11:  D(<0.371.0>:ejabberd_http:89) : init
12:
13:  =INFO REPORT==== 2011-12-18 07:38:29 ===
14:  D(<0.371.0>:ejabberd_http:142) : S: [{["web"],mod_http_fileserver},
15:                                       {["captcha"],ejabberd_captcha},
16:                                       {["admin"],ejabberd_web_admin},
17:                                       {["http-bind"],mod_http_bind},
18:                                       {["http-poll"],ejabberd_http_poll}]
19:
20:
21:  =INFO REPORT==== 2011-12-18 07:38:29 ===
22:  I(<0.371.0>:ejabberd_http:144) : started: {gen_tcp,#Port<0.414>}
23:
24:  =INFO REPORT==== 2011-12-18 07:38:29 ===
25:  D(<0.371.0>:ejabberd_http:261) : (#Port<0.414>) http query: 'POST' /http-bind/


Blah, blah, blah...then the bit that mentions that this is an HTTP POST to /http-bind/, which is the URL for the BOSH module.


26:
27:
28:  =INFO REPORT==== 2011-12-18 07:38:29 ===
29:  D(<0.371.0>:ejabberd_http:431) : client data: "<body content='text/xml; charset=utf-8' secure='true' to='ubuntu2' xml:lang='en' xmpp:version='1.0' ver='1.6' xmlns:xmpp='urn:xmpp:xbosh' rid='2230163495637982' wait='60' hold='1' xmlns='http://jabber.org/protocol/httpbind'/>"
30:
31:
32:  =INFO REPORT==== 2011-12-18 07:38:29 ===
33:  D(<0.371.0>:ejabberd_http:330) : ["http-bind"] matches ["http-bind"]
34:

Blah, blah, blah...


35:  =INFO REPORT==== 2011-12-18 07:38:29 ===
36:  D(<0.371.0>:mod_http_bind:69) : Incoming data: <body content='text/xml; charset=utf-8' secure='true' to='ubuntu2' xml:lang='en' xmpp:version='1.0' ver='1.6' xmlns:xmpp='urn:xmpp:xbosh' rid='2230163495637982' wait='60' hold='1' xmlns='http://jabber.org/protocol/httpbind'/>

This is the first entry into the BOSH code.  The source looks like this:




 66 process([], #request{method = 'POST',
 67                      data = Data,
 68                      ip = IP}) ->
 69     ?DEBUG("Incoming data: ~s", [Data]),
 70     ejabberd_http_bind:process_request(Data, IP);


So ejabberd_http, having matched the request path against "http-bind" (line 33 of the log), calls the process function in mod_http_bind, which, in turn, calls ejabberd_http_bind:process_request.  Here is the corresponding logging statement:

37:
38:  =INFO REPORT==== 2011-12-18 07:38:29 ===
39:  D(<0.371.0>:ejabberd_http_bind:181) : process_request with data: "<body content='text/xml; charset=utf-8' secure='true' to='ubuntu2' xml:lang='en' xmpp:version='1.0' ver='1.6' xmlns:xmpp='urn:xmpp:xbosh' rid='2230163495637982' wait='60' hold='1' xmlns='http://jabber.org/protocol/httpbind'/>"
40:

Here is the code for the corresponding section:


 180 %% Entry point for data coming from client through ejabberd HTTP server:
 181 process_request(Data, IP) ->
 182     ?DEBUG("process_request with data: ~p", [Data]),
 183     Opts1 = ejabberd_c2s_config:get_c2s_limits(),
 184     Opts = [{xml_socket, true} | Opts1],
 185     MaxStanzaSize =
 186     case lists:keysearch(max_stanza_size, 1, Opts) of
 187         {value, {_, Size}} -> Size;
 188         _ -> infinity
 189     end,
 190     PayloadSize = iolist_size(Data),
 191     case catch parse_request(Data, PayloadSize, MaxStanzaSize) of

OK, so this gets some options and then parses the request.

 192     %% No existing session:
 193     {ok, {"", Rid, Attrs, Payload}} ->

The parse result is a body element with no SID (session ID).  According to the BOSH spec, this means that the client is trying to establish a new session.  Also according to the spec, a session creation request must have a "to" attribute that tells the BOSH server which domain the session is for.

 194         case xml:get_attr_s("to",Attrs) of
 195             "" ->
 196                 ?DEBUG("Session not created (Improper addressing)", []),
 197                 {200, ?HEADER, "<body type='terminate' "
 198                  "condition='improper-addressing' "
 199                  "xmlns='" ++ ?NS_HTTP_BIND ++ "'/>"};
 200             XmppDomain ->
 201                 %% create new session
 202                 Sid = sha:sha(term_to_binary({now(), make_ref()})),
 203                 case start(XmppDomain, Sid, "", IP) of
 204                     {error, _} ->
 205                         {200, ?HEADER, "<body type='terminate' "
 206                          "condition='internal-server-error' "
 207                          "xmlns='" ++ ?NS_HTTP_BIND ++ "'>BOSH module not started</body>"};

Code to handle two error cases: a request without a to attribute (improper-addressing), and a failure to start the session process (internal-server-error).

 208                     {ok, Pid} ->
 209                         handle_session_start(
 210                           Pid, XmppDomain, Sid, Rid, Attrs,
 211                           Payload, PayloadSize, IP)
 212                 end
 213         end;


This looks for "to" attribute in the stanza.  Only a start session or a start stream can have this attribute, so if it is missing, it means we have an error case.

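At first glance it looks as if the value of the to attribute is an erlang process ID, since line 208 matches {ok, Pid}.  Looking more carefully at the nesting, though, {ok, Pid} is a clause of the inner case start(XmppDomain, Sid, "", IP) of on line 203, not of xml:get_attr_s.  The value of the to attribute is bound to XmppDomain on line 200, and it is the chat domain that the new session wants to go to, as I would expect; it is start/4 that returns {ok, Pid} for the newly started session process.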
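Stripped of the HTTP details, the decision on lines 194-200 boils down to the sketch below. The module bosh_to_attr_sketch is hypothetical; the get_attr_s/2 here imitates the behavior the code above relies on from ejabberd's xml:get_attr_s (return "" when the attribute is missing), assuming attributes are a list of {Name, Value} string pairs, which is how I understand ejabberd represents them.

```erlang
-module(bosh_to_attr_sketch).
-export([get_attr_s/2, new_session_domain/1]).

%% Imitation of xml:get_attr_s/2: the attribute value, or "" if absent.
get_attr_s(Name, Attrs) ->
    case lists:keyfind(Name, 1, Attrs) of
        {_, Value} -> Value;
        false -> ""
    end.

%% The "to" attribute of a session creation request is the XMPP domain
%% (a string such as "ubuntu2"), never a PID.
new_session_domain(Attrs) ->
    case get_attr_s("to", Attrs) of
        "" -> {error, 'improper-addressing'};
        XmppDomain -> {ok, XmppDomain}
    end.
```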

Putting all this together, a reasonable plan seems to be to create a module called mod_megaphone that starts listening on a specified port at startup.  When a new message is received, the module should call the equivalent of ejabberd_http_bind:process_request.
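A bare-bones version of that plan, as a sketch only: a TCP listener that hands every chunk of received data to a handler function. The module megaphone_listener_sketch and its start/2 and port/1 functions are hypothetical, and the Handler fun merely stands in for whatever equivalent of ejabberd_http_bind:process_request mod_megaphone ends up calling.

```erlang
-module(megaphone_listener_sketch).
-export([start/2, port/1]).

%% start(Port, Handler) -> {ok, ListenSocket}
%% Port 0 asks the OS for any free port. Handler is a one-argument fun
%% standing in for the real entry point (hypothetical hook).
start(Port, Handler) when is_function(Handler, 1) ->
    {ok, LSock} = gen_tcp:listen(Port, [binary, {active, false},
                                        {reuseaddr, true}]),
    spawn(fun() -> accept_loop(LSock, Handler) end),
    {ok, LSock}.

%% port(ListenSocket) -> the port number actually being listened on.
port(LSock) ->
    {ok, Port} = inet:port(LSock),
    Port.

%% Wait for a connection, start a fresh acceptor on the same listen
%% socket, then serve the accepted connection in this process.
accept_loop(LSock, Handler) ->
    case gen_tcp:accept(LSock) of
        {ok, Sock} ->
            spawn(fun() -> accept_loop(LSock, Handler) end),
            recv_loop(Sock, Handler);
        {error, _Reason} ->
            ok  % listen socket was closed
    end.

%% Pass every chunk of received data to Handler until the peer closes.
recv_loop(Sock, Handler) ->
    case gen_tcp:recv(Sock, 0) of
        {ok, Data} ->
            Handler(Data),
            recv_loop(Sock, Handler);
        {error, _Reason} ->
            gen_tcp:close(Sock)
    end.
```

The listen socket belongs to the process that called start/2, so it closes if that process exits. Since a chunk from gen_tcp:recv/2 is not necessarily one whole message, a real version would also need to buffer the data before handing it on.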

At this point I really need to make a decision: should I modify the behavior of the BOSH module or strike out on my own?  Changing the behavior of BOSH means that it will not work while I am trying to get the megaphone module to work.  This is a bad thing (tm) because it means that I will be unable to compare the existing BOSH behavior against the new megaphone behavior.

Striking out on my own has the problem that all the module names will be wrong and nothing will work.

More decisions, decisions.