Monday 9 November 2015

Shifting to a different site

I'm currently experimenting with a new site over at http://www.timbarrass.co.uk, so new blog posts will appear there. Thanks for looking!

Friday 9 October 2015

Getting your priorities right

I want to take a look at implementing prioritisation in redis.workflow. Currently, all workflows have the same priority; redis.workflow is all about throughput. If all tasks are going to be completed by some SLA-agreed time then it doesn’t technically matter which order everything is run in, and we can run through workflows in any order, as long as there’s no slack time in task turnover.

However, there are contexts in which pure throughput isn’t enough. Sometimes it’s social: someone arbitrarily wants to see a graph showing a steady completion of recognizable sets of workflows. Sometimes it’s based on operational information: not all workloads are homogeneous, and some have long-tail task duration distributions; those long runners will breach your delivery SLA if you start them too late. Sometimes you have combined workloads: the system might be responsible for both batch throughput calculations and ad-hoc calculations issued directly by someone who’s waiting for an answer, and wants some guarantee that they’re not wedged behind a freight train of batch work. Sometimes it’s simply SLA based: you have 24 hours’ worth of work, but some of it needs to be delivered by 8am, and some of it by 5pm.

In many ways then a pure throughput system might not meet all requirements, and in each of the scenarios above a decision needs to be made about each item of work: do I need to get this work done first, or that? The system needs to understand work priority.

In detail it’s also worth considering where the priority lies. Currently I’m considering prioritising workflows only relative to each other. That said, it’s as easy to implement task-level prioritisation as it is to implement workflow-level prioritisation; doing the former gives me a little more flexibility, and I (or any client) could replicate workflow-level prioritisation by giving every task in a workflow the same priority.

Prioritisation itself can be simple or sophisticated, but will definitely have consequences for the structures used in redis.workflow.

Currently redis.workflow uses a queue of submitted tasks, which is fine for a simple throughput scenario. I could extend it to make practical binary priority decisions by enqueuing high priority tasks at the front of the queue, and low priority tasks at the back, without the need for a new structure.
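The front-or-back idea can be sketched in a few lines. This is an in-memory Python model rather than redis.workflow code; the names `submitted`, `enqueue` and `pop_next` are hypothetical. In Redis terms it would roughly correspond to LPUSH and RPUSH against the same list, with consumers popping from the left.

```python
from collections import deque

# Hypothetical in-memory stand-in for the submitted queue.
submitted = deque()

def enqueue(task_id, high_priority=False):
    """High-priority tasks jump to the front; everything else joins the back."""
    if high_priority:
        submitted.appendleft(task_id)
    else:
        submitted.append(task_id)

def pop_next():
    """Consumers always take from the front of the queue."""
    return submitted.popleft() if submitted else None

enqueue("batch-1")
enqueue("batch-2")
enqueue("adhoc-1", high_priority=True)
order = [pop_next() for _ in range(3)]
print(order)  # ['adhoc-1', 'batch-1', 'batch-2']
```

One consequence worth noting: several high-priority tasks enqueued in a row come out newest-first, because each one is placed ahead of the last.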

If finer grained priorities are required then things get a little harder. If the number of levels of priority is small then having a queue per priority might be feasible; in fact, combined with the enqueue-to-front-or-back trick you only need half as many queues as you have priority levels.
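A rough sketch of the half-as-many-queues arrangement, again as an in-memory Python model with hypothetical names. It assumes four priority levels where 0 is the highest; each pair of adjacent levels shares one queue, with the even level enqueued at the front and the odd level at the back.

```python
from collections import deque

LEVELS = 4  # assumption: priorities run from 0 (highest) to 3 (lowest)
queues = [deque() for _ in range(LEVELS // 2)]

def enqueue(task_id, priority):
    """Levels 0 and 1 share queue 0, levels 2 and 3 share queue 1."""
    queue = queues[priority // 2]
    if priority % 2 == 0:
        queue.appendleft(task_id)   # the higher level of the pair goes to the front
    else:
        queue.append(task_id)       # the lower level of the pair goes to the back

def pop_next():
    """Drain the most important non-empty queue first."""
    for queue in queues:
        if queue:
            return queue.popleft()
    return None

for task_id, priority in [("d", 3), ("b", 1), ("c", 2), ("a", 0)]:
    enqueue(task_id, priority)
print([pop_next() for _ in range(4)])  # ['a', 'b', 'c', 'd']
```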

If very fine-grained or arbitrary priorities are needed, then I’d need to start thinking about maintaining relative order in the queue or list. Insertion at a given priority could happen between work of higher and lower priority, which would require iterating through the list to work out where to put it, or just enqueuing, then sorting the list. That’s likely to perform underwhelmingly; a more sophisticated solution would be to add a lookup that identifies priority boundaries by queue position.
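To make the cost of keeping a list in priority order concrete, here is a minimal Python sketch using the standard `bisect` module. The names are hypothetical, and it assumes a lower number means a higher priority. Finding the insertion point is a binary search, but the insertion itself still shifts every later element, which is the kind of work that is likely to hurt as the list grows.

```python
import bisect
from itertools import count

_arrival = count()   # tie-breaker so equal priorities keep submission order
ordered = []         # kept sorted by (priority, arrival order)

def enqueue(task_id, priority):
    """Insert between work of higher and lower priority."""
    bisect.insort(ordered, (priority, next(_arrival), task_id))

def pop_next():
    """Take the most important task, i.e. the smallest priority number."""
    return ordered.pop(0)[2] if ordered else None

enqueue("low", 50)
enqueue("urgent", 1)
enqueue("mid", 20)
enqueue("mid-2", 20)
print([pop_next() for _ in range(4)])  # ['urgent', 'mid', 'mid-2', 'low']
```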

Each of these is a reasonable way of managing prioritised work. Clearly, enabling arbitrary priorities would let clients choose, and even use simpler (say binary, or even fall back to unary) prioritisation, but it comes at a cost in terms of complexity.

The binary high/low priority solution – enqueuing to front or back – is relatively easy to imagine in my existing code, as it’s already placing work at the back. Jumping to arbitrary priorities is harder, but it’s worth considering what would need to be done.

To manage arbitrarily prioritised work, I’d need to sort the work submitted in some way. Redis does have a sorted set, which offers O(log N) lookup and insertion by score. I’d need to be able to remove the highest-scoring task, which I would be able to do with

zremrangebyscore submitted:<type> <minPriority> <maxPriority> limit 0 1

except that there’s no limit out-of-the-box for zremrangebyscore. Instead, I’d have to use zrange to return a sorted list of items (although this sorts from lowest to highest score, meaning I need to decide whether 0 is low or high priority) then use zrem to remove the task.
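The two-step take described above can be modelled without a Redis server. The class below is a hypothetical in-memory stand-in whose methods only mimic the ordering behaviour of ZADD, ZRANGE and ZREM; it is not a Redis client. It assumes 0 is the highest priority, so the first element of the ascending range is the next task to run.

```python
class SortedSetModel:
    """Tiny in-memory model of a Redis sorted set (illustration only)."""

    def __init__(self):
        self._scores = {}

    def zadd(self, member, score):
        self._scores[member] = score

    def zrange(self, start, stop):
        # Ascending by score, ties broken by member, as ZRANGE orders them.
        ordered = sorted(self._scores, key=lambda m: (self._scores[m], m))
        end = None if stop == -1 else stop + 1   # stop is inclusive; -1 means "to the end"
        return ordered[start:end]

    def zrem(self, member):
        return self._scores.pop(member, None) is not None

def pop_next(sorted_set):
    """Read the lowest-scoring member, then remove it."""
    first = sorted_set.zrange(0, 0)
    if not first:
        return None
    sorted_set.zrem(first[0])
    return first[0]

tasks = SortedSetModel()
tasks.zadd("t1", 5)
tasks.zadd("t2", 0)
tasks.zadd("t3", 2)
print([pop_next(tasks) for _ in range(3)])  # ['t2', 't3', 't1']
```

Against a real server the read and the remove are separate commands, so they would need to run as one atomic step (for example inside a Lua script) to stop two consumers taking the same task.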

I implemented this sorted-set-based prioritisation and ran it through the benchmark app, giving each task the same priority. It was noticeably slower, taking about 485 seconds to run through the benchmark (a churn of about 1.8k tasks per second). Task and actual workflow completion seemed to keep pace with submission, but handling of the messages announcing workflow completion seemed to fall behind, suggesting that it’s not scaling well in this example.

benchmark-withpriority

The benchmark needs to be extended to test workflows with varying numbers of tasks, and tasks with varying priorities.

For the moment I think that’s enough; there’s a wide range of ways of tackling the prioritisation problem, but the implementation of each is fairly straightforward in Redis. The way I’ve implemented it gives a certain amount of flexibility – a client can configure priority at task or workflow level – but it does slow things down. It’s possible the other, simpler methods have a less detrimental effect on performance; as the difference in performance is so significant it might make sense to make the use of prioritisation configurable, although that’ll be difficult as it has fundamental consequences for the structure of the data in Redis.

Friday 2 October 2015

Task type differentiation and looking a bit more SoA

At the moment all tasks are considered the same. If a client wanted to process only “widget painter” tasks it wouldn’t be able to, unless it pulled tasks from the queue then decided whether to process them or not. If not – there’s not much it could do, as the tasks are off the queue. It could throw them away, which isn’t very constructive.

More practically, the client could hand tasks off to components suited to dealing with them. This complicates the client architecture, as it introduces the need for a broker component that can do a fine-grained distribution of work, along with internal queuing based on task type. That can quickly become a full-blown project in itself.

To avoid the need for a broker component (but also not to obviate them entirely) it makes sense to handle this queueing of tasks by type in the workflow management domain; to advertise submission of tasks of each type using a distinct channel, and allow clients to subscribe to channels that advertise submission of tasks that they can handle. In my workflow system that means having a dedicated submitted queue for each arbitrary task type, and in turn, submission messages would need to be published on dedicated channels.

The supporting functionality would all need to become task-type aware – popping a task would involve knowing what task type to pop; pause and release would need to know which queues to place tasks back in; and so on.

Not only do we need dedicated submitted:type queues, but if we’re going to ask questions about all the submitted tasks then I’ll need a submittedTypes set just so I can locate all the submitted:type queues that are in use.
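A small in-memory Python model of that layout may help: one queue per task type, plus a set recording which types are in use so that questions about all submitted tasks can still be answered. The function names are hypothetical and the publishing of submission messages is left out.

```python
from collections import defaultdict, deque

submitted = defaultdict(deque)   # models one submitted:<type> queue per task type
submitted_types = set()          # models the submittedTypes set

def submit(task_id, task_type):
    """Queue the task under its type and remember that the type is in use."""
    submitted[task_type].append(task_id)
    submitted_types.add(task_type)

def pop(task_type):
    """A client only pops from queues for the types it can handle."""
    queue = submitted.get(task_type)
    return queue.popleft() if queue else None

def count_all_submitted():
    """Answer a question about every submitted task by walking the known types."""
    return sum(len(submitted[task_type]) for task_type in submitted_types)

submit("t1", "widget-painter")
submit("t2", "widget-packer")
submit("t3", "widget-painter")
print(pop("widget-painter"))    # t1
print(count_all_submitted())    # 2
```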

Finally, what should the task type specified be? The most straightforward immediate answer is for it to be an arbitrary string. That means we’ll be creating an arbitrary subschema of lookups that map a set of client-defined types to tasks. That looks like the beginning of a general system I have in mind that allows the client to associate any number of arbitrary properties with tasks, not just a type; but to meet this use case – allowing the client to differentiate tasks by a given type – this should be enough.

Wednesday 30 September 2015

Handling Redis messages in parallel

In the last post it looked like the rate of submission and completion of tasks was constant in time, but that the rate of completion of workflows was not.

There are two end member scenarios: if tasks were completed in strict workflow order then there’d be a steady progression of workflows completing, giving a constant rate; alternatively, if the first 8 tasks in every workflow were completed first, and the last task only started when every other workflow had been touched, then all workflows would complete almost instantaneously right at the end of the run. What I saw in the last results might represent a balance between the two.
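The two end members are easy to see in a toy simulation. This Python sketch assumes one task completes per tick and uses 3 workflows of 3 tasks each (far smaller than the benchmark) purely to keep the output readable; the function and variable names are hypothetical.

```python
def completion_ticks(order, tasks_per_workflow):
    """Given the workflow id of each task in execution order, return the tick
    at which each workflow finishes its last task."""
    done = {}
    finished_at = {}
    for tick, workflow in enumerate(order, start=1):
        done[workflow] = done.get(workflow, 0) + 1
        if done[workflow] == tasks_per_workflow:
            finished_at[workflow] = tick
    return [finished_at[w] for w in sorted(finished_at)]

WORKFLOWS, TASKS = 3, 3

# Strict workflow order: finish every task in workflow 0, then workflow 1, ...
strict_order = [w for w in range(WORKFLOWS) for _ in range(TASKS)]
# Touch every workflow before moving any of them on to its next task.
interleaved = [w for _ in range(TASKS) for w in range(WORKFLOWS)]

print(completion_ticks(strict_order, TASKS))  # [3, 6, 9]
print(completion_ticks(interleaved, TASKS))   # [7, 8, 9]
```

In the first case workflows complete at a steady rate throughout the run; in the second they all complete in a burst at the end.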

The workflow completion behaviour might also be controlled or modified by the messages in flight from Redis, and by the way these are both published and picked up by the client (StackExchange.Redis). I’m assuming that Redis will not stall while it passes a given message to every consumer subscribed, and I’m assuming that StackExchange.Redis will make a separately-threaded callback into my code whenever a message is received, probably subject to some throttling.

There are a couple of things there that I can check. One is the number of running tasks at each moment; if we’re handling these in a multithreaded way (via StackExchange.Redis – there’s no such behaviour in the benchmark app yet) then it should be observable. The other thing to check is the size of the submitted queue.

I added some instrumentation to my benchmark app to output these and, voila:

image

The submitted queue size reaches a plateau then hovers there, which intuitively means that across all workflows we’re finishing one task and replacing it with another in the submitted queue. It tails off roughly in line with the workflow completion curve.

One very interesting point to take away there is that the number of running tasks at any time is 1.

That indicates that somewhere the processing of tasks is either bottlenecked or significantly throttled. I’m not doing anything in my app. Is StackExchange.Redis serializing the handling of the returned messages?

It turns out that it is. By default, StackExchange.Redis forces subscribed message handling to be sequential, principally because it’s safer: it stops clients having to worry about thread conflicts when processing the messages.

However, when you want to scale out it makes sense to let go and account for possible threading issues by design. In this case, each workflow task is intended to be a unit independent of any other, and so they’re embarrassingly parallelisable. In fact, currently the creation of the multiplexer is in the hands of the client, so they can make the choice themselves.

If I set ConnectionMultiplexer.PreserveAsyncOrder = false in the benchmark app, things improve considerably. Task churn leaps to 4k per second; the whole run completes sooner (210 seconds) with the push only finishing at 168 seconds.

benchmark-2

There’s a lot less dispersion in the message handling – the workflow complete queues are much tighter, and the count of tasks completing follows the submitted count much more closely. After a suppressed start the system completes workflows at a steady rate.

For the bulk of the run task submission and completion, and workflow completion, proceed at a constant rate. That indicates that this solution might scale out with an increasing number of tasks or workflows.

It’s not all sunshine and rainbows. When repeated I’ve seen occasional timeouts which currently bring the workflows down. Superficially they seem to be correlated with the points at which Redis forks and saves out its logs – typically I see error messages like

Timeout performing EVAL, inst: 1, mgr: ExecuteSelect, err: never, queue: 22, qu: 0, qs: 22, qc: 0, wr: 0, wq: 0, in: 0, ar: 0, IOCP: (Busy=0,Free=1000,Min=4,Max=1000), WORKER: (Busy=20,Free=2027,Min=4,Max=2047), clientName: TIM-LAPTOP

This seems to indicate that 22 commands have been sent to the server (qs), but no response has been received (in); consequently it might be worth increasing the timeout. If it is related to the Redis background fork-and-save, perhaps there’s something I can do there too.

Monday 28 September 2015

First look at performance for workflow processing

I’ve set up a simple benchmarking app to get a look at how my new Redis-based workflow processing system was going, and used it to create 100k workflows, each containing 9 tasks. The workflow structure is identical for each: just a simple chain. I ran it and a Redis instance on my dev laptop, and gathered data about task submit and completion time, and workflow completion time – both actual, and when I received the Redis pub-sub message to confirm completion.

image

It took ~350 seconds to run through 900k tasks (each as close to a no-op as possible), giving an overall churn of around 2.6k tasks per second. It took ~60 seconds to submit 100k workflows with initial tasks, or about 1.5k workflows per second.

Completion of tasks was noticeably suppressed during workflow submission, and as a result the number of completed tasks lags behind the number of submitted tasks. However, it looks like the rates of task submission and completion – the key parts of workflow turnover – are independent of the number of tasks to run, suggesting that this behaviour scales.

There are some features that I don’t immediately understand, however. The rate of workflow completion is not linear; completion is delayed, as you might expect, but the rate increases with time.

Additionally, there’s a delay between actual workflow completion and the delivery of the message that signals that a workflow has completed. The rate of delivery looks consistent with the rate of completion, but the delay is consistently about 50 seconds. Two things immediately occur to me: first, as the system’s using message passing to flag submission of new tasks, those messages might also be affected by a delay; and second, arguably we might not need to wait for a message to tell us that a workflow has completed. As I’m marking tasks complete synchronously from the client, that call could potentially tell me that there are either more tasks submitted as a result, or none, in which case I could reasonably take some sort of action (although that action would need to be shared across all task handling processes, if I had such).

In fact, there’s a problem with the way I’ve labelled the curves – it’s not obvious that workflow completion time is nonlinearly related to the number of remaining tasks. I’m making an assumption.

Next things to check: I’m going to add a periodic sampling of the queue size to add to this chart, and see if it’s possible to analyse the number of messages being posted.

Wednesday 23 September 2015

Pausing for thought

This post relates to code changes in this commit to my redis workflow project on GitHub, under this issue.

In my workflow project it’s time to consider how to pause a given workflow, and the first thing is to decide on what pause actually means.

simple-fsm

At first sight it means “hold everything that’s running” and “stop anything new from starting”, but that turns out to be difficult to implement effectively generally, and has some particular wrinkles when using Redis.

It looks easy to stop anything new from starting: I can move all tasks for a given workflow to a state that means they won’t get picked up; let’s say that state is labelled paused, because frankly, this gets complicated enough. To release the tasks all I’ll need to do is move all the paused tasks for this workflow back to the submitted state.

Except: the fact I’m asking for all the submitted (or paused) tasks for a specific workflow means we have a decision to make. At the point of time this functionality was added all these tasks were together in a global submitted state. I could loop through every id in the submitted set, use those ids to lookup tasks and fetch their workflow ids, then decide whether each one needs to be acted on.

Alternatively I could populate an additional set when tasks transition to each new state, keyed by workflow id. When tasks in workflow 1 move to the submitted state, for example, we could add them to a distinct set with key submitted:1. That makes finding tasks in a specific state for a specific workflow much easier, and is the approach I’m taking. It means S x W new sets at worst, where S is the number of states and W the number of workflows. At the scale I’m aiming for that means ~600,000 sets, assuming no clean-up after workflows.
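A minimal Python sketch of the per-workflow lookup, using an in-memory dictionary of sets in place of Redis sets. Keys follow the `submitted:1` pattern described above; the helper names are hypothetical.

```python
from collections import defaultdict

state_sets = defaultdict(set)   # key such as "submitted:1" -> set of task ids
task_state = {}                 # task id -> (workflow id, current state)

def transition(task_id, workflow_id, new_state):
    """Move a task to a new state, keeping the per-workflow set in step."""
    previous = task_state.get(task_id)
    if previous is not None:
        old_workflow, old_state = previous
        state_sets[f"{old_state}:{old_workflow}"].discard(task_id)
    state_sets[f"{new_state}:{workflow_id}"].add(task_id)
    task_state[task_id] = (workflow_id, new_state)

def tasks_in(state, workflow_id):
    """Find the tasks in one state for one workflow without scanning everything."""
    return set(state_sets.get(f"{state}:{workflow_id}", set()))

transition("a", 1, "submitted")
transition("b", 2, "submitted")
transition("a", 1, "running")
print(tasks_in("submitted", 1))  # set()
print(tasks_in("running", 1))    # {'a'}
```

The trade-off is the one noted above: every state transition now touches an extra set, and the number of sets grows with the number of states times the number of workflows.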

So, when someone asks to pause a workflow we’ll just shift all tasks from submitted state to paused, yes? And on release, we should just move everything from paused to submitted? Not quite.

I’m deciding to let running tasks run to completion. As currently running tasks complete (or fail) they might cause further tasks to be submitted, which isn’t the behaviour we want. It makes sense then to have the completion logic know that if the completed task has moved to the paused state then it shouldn’t cause its dependents to be submitted, although it can be transitioned to its final desired end state (complete, or failed).

However, when it comes to releasing the workflow again, what should happen to the children of those completed tasks, if all their parents have completed? If I don’t do something then they won’t get picked up when someone asks to release the workflow – there’s nothing left to complete and trigger their submission.

Instead, if a paused task completes and its children become eligible for submission they should be placed in the paused state themselves. Then, when someone requests that the workflow be released, they’ll shift back to the submitted state.

On release, then, that takes care of ensuring I don’t lose track of any tasks, assuming that the release command comes after all outstanding running tasks have completed.

If the release request comes while tasks are still running down, however, releasing the workflow will be difficult. We won’t know whether to move the tasks to the submitted or running state, because there’s no differentiation in source state for tasks in the paused set. We’ve lost that information. If we shift a task that’s being executed somewhere to the submitted state on release then it might get run twice in parallel, or get confused when it comes to submit and finds it’s not in an expected state.

20150917_001111

The (hopefully) final puzzle piece is then to record the last state of each task, so when a release request is issued we can move it back to the appropriate state. Running tasks should get moved back to running, submitted tasks to submitted. Tasks that have completed already won’t be in the paused state, so won’t get considered for release, and we’ll need to tag newly runnable tasks with a fake last state of submitted.

chart
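The pause and release rules can be sketched as a small state machine. This is an in-memory Python model, not the project's Redis implementation; the `waiting` state name (for tasks whose parents have not finished) and the function names are assumptions made for the example.

```python
state = {}        # task id -> current state
last_state = {}   # task id -> state to restore on release

def pause(task_ids):
    """Park submitted and running tasks, remembering where each came from."""
    for task in task_ids:
        if state[task] in ("submitted", "running"):
            last_state[task] = state[task]
            state[task] = "paused"

def complete(task, children_ready):
    """Finish a task; if it was paused, its newly runnable children are paused too."""
    was_paused = state[task] == "paused"
    state[task] = "complete"
    last_state.pop(task, None)
    for child in children_ready:
        if was_paused:
            state[child] = "paused"
            last_state[child] = "submitted"   # fake last state for new work
        else:
            state[child] = "submitted"

def release(task_ids):
    """Return every paused task to the state it was in before the pause."""
    for task in task_ids:
        if state[task] == "paused":
            state[task] = last_state.pop(task)

state.update({"a": "running", "b": "submitted", "c": "waiting"})
pause(["a", "b", "c"])
complete("a", ["c"])        # "a" runs down while the workflow is paused
release(["a", "b", "c"])
print(state)  # {'a': 'complete', 'b': 'submitted', 'c': 'submitted'}
```

If the release arrives before a paused running task has finished, the same `release` call restores it to `running` rather than `submitted`, which is what avoids the duplicate execution described above.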

That should mean we don’t lose any information about the state of the workflow during pause and release operations. For such a simple thing it’s still quite possible to get wrong; making a few assumptions and decisions explicitly helps a great deal. I decided to:

1. Stop anything new from starting.

2. Let executing tasks run to completion.

3. Create lookups to answer specific questions.

4. Avoid duplicate task execution.

all of which seem reasonable currently; let’s see how the next chunk of functionality changes that.

Friday 11 September 2015

Picking up where you left off

One thing is certain in any live system: something will break and leave the system in an inconsistent state. If your system isn’t designed with this as a core assumption – not that things will work, but that things will break – then your design is wrong.

Less catastrophically, there are occasions where you want to turn part or all of a system off. Without thought this will have the same effect: the system will be left in an inconsistent state.

In a distributed workflow this generally means that tasks are unofficially abandoned, as the agent responsible for running them can’t complete them and update their state. If a system is deliberately stopped then you have a chance to manage that state properly. You could:

  1. wait for all running tasks to complete before stopping the system, while preventing any new tasks from running, or
  2. transition any running tasks to an appropriate pre-running state.

Typically any final solution comprises some element of the second plan; under the first plan you might still want to be careful about giving tasks a maximum timeout duration, and if tasks reach that timeout then you have no option but to reset them to a pre-running state.

If your tasks define work that’s to be undertaken on a separate system – say a distinct compute cloud – then even after resetting tasks you might still have some underlying problems to solve if that compute infrastructure doesn’t allow you to cancel running work.

In each case, however, you have a chance to transition tasks to some desired state before agents stop. If an agent has failed you typically don’t get that chance, and you need to find some other way of managing the state. There are a few options for doing that, among them:

  1. just forget; have some backstop process gather up tasks that haven’t had a state update within a certain duration and automatically reset them to a pre-running state, or
  2. make it possible for agents to reconnect to their previous state and decide what to do with abandoned tasks.

The first option there is quite simple, but has implicit problems. How do you choose the duration after which task state gets reset? You run the risk of either resetting in-flight tasks, or leaving tasks abandoned for so long that their results are no longer required. At worst, this backstop is a manual process.
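A minimal Python sketch of such a backstop, assuming each task record carries its state and the time of its last update. The field names, the `submitted` pre-running state and the `max_silence` parameter are hypothetical; the awkward choice of that duration is exactly the problem described above.

```python
def reset_stale(tasks, now, max_silence):
    """Reset running tasks that have not reported for longer than max_silence.

    tasks maps a task id to {"state": ..., "updated": timestamp}.
    Returns the ids that were reset to the pre-running state.
    """
    reset = []
    for task_id, task in tasks.items():
        if task["state"] == "running" and now - task["updated"] > max_silence:
            task["state"] = "submitted"
            task["updated"] = now
            reset.append(task_id)
    return reset

tasks = {
    "t1": {"state": "running", "updated": 0},     # silent for 100 seconds
    "t2": {"state": "running", "updated": 90},    # updated 10 seconds ago
    "t3": {"state": "submitted", "updated": 0},   # not running, so ignored
}
print(reset_stale(tasks, now=100, max_silence=30))  # ['t1']
```

Nothing here can tell a genuinely abandoned task from a slow one, so a task that is still in flight when it is reset may end up running twice.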

The second presents more explicit problems: how does the system know which state is or was associated with which agent? Two things are needed: first each agent needs to have an identity that’s maintained in time (or at any time there needs to be an agent that’s acting under a given identity), and second that identity needs to be associated with task state when it takes responsibility for it, and dissociated when it gives up responsibility – when it starts and stops the task.

Maintaining a system of agents with distinct identities is relatively straightforward. The trick is to have any new agent process assume one from a pool of identities when it starts up. The actual technique chosen is heavily influenced by the mechanism you use to manage agent processes, but could be as simple as an argument passed in on a command line – “you are agent 1, the first thing you need to do is work out what the last agent 1 was doing”.

Associating an identity with task state is a little more complicated, as there are some options for where that association state could sit. It could be stored locally by the agent; every time it takes a task, it adds the task’s identifier to its local store. If another agent takes its place, it can use that stored task state to reconnect with what was running. Alternatively, it could be stored alongside the task state in whatever state store’s being used.

In each case information from one domain is leaking into another as responsibility is taken and given up for task state. The important thing is to notice that an exchange of information is taking place – when a task is taken, task data is being exchanged for an agent identity – and that that exchange must happen in a transactional manner so that information isn’t lost.

For example, an agent could pop a task from the task state store, and then store that task information in a local SQLite database. If the agent dies between popping the task and storing the data in its local database then that information is completely lost.

That’s bad, and something to be avoided.

If the agent presents its identity information to the state store as part of the task pop process, however, the state store can associate that identity with the now-running task as part of the same transaction – safely.
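The shape of that exchange can be sketched with an in-memory store in Python. A lock stands in for the state store's transaction, and the class and method names are hypothetical; the point is only that taking the task and recording who took it happen in one step, so a replacement agent using the same identity can ask what its predecessor was holding.

```python
import threading
from collections import deque

class StateStore:
    """In-memory model of a task store that records which agent holds each task."""

    def __init__(self, task_ids):
        self._lock = threading.Lock()
        self._submitted = deque(task_ids)
        self._running_by_agent = {}

    def pop_task(self, agent_id):
        """Take a task and associate it with the agent in a single step."""
        with self._lock:
            if not self._submitted:
                return None
            task_id = self._submitted.popleft()
            self._running_by_agent.setdefault(agent_id, set()).add(task_id)
            return task_id

    def complete(self, agent_id, task_id):
        """Give up responsibility for a task once it has finished."""
        with self._lock:
            self._running_by_agent.get(agent_id, set()).discard(task_id)

    def tasks_held_by(self, agent_id):
        """What a restarted agent asks first: what was this identity running?"""
        with self._lock:
            return set(self._running_by_agent.get(agent_id, set()))

store = StateStore(["t1", "t2"])
store.pop_task("agent-1")              # the agent dies some time after this
print(store.tasks_held_by("agent-1"))  # {'t1'}
```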

The latter approach is what I’ve taken with the workflows-on-Redis project I’ve been working on recently in order to enable the recovery of tasks, and ensure that none of that information is lost when taking tasks in the first place.

Thursday 10 September 2015

It turns out I'm not a good blogger :)

Embarrassingly I've had a good bundle of comments waiting for moderation for some time! Apologies to everyone who's got in touch -- I know full well how frustrating it is to reach out to someone's blog and be met with silence -- and thanks for the interest, comments and definitely for the corrections!

Tuesday 24 February 2015

Loading SOS in windbg. Why is it never quite as easy as you hope it’ll be?

I started analysing a production crash dump at my desk, with a set of libraries that don’t match those installed on the production server. Typically windbg would complain:

> !clrstack
The version of SOS does not match the version of CLR you are debugging.  Please
load the matching version of SOS for the version of CLR you are debugging.
CLR Version: 4.0.30319.1026
SOS Version: 4.0.30319.18444
Failed to load data access DLL, 0x80004005
Verify that 1) you have a recent build of the debugger (6.2.14 or newer)
            2) the file mscordacwks.dll that matches your version of clr.dll is 
                in the version directory or on the symbol path
            3) or, if you are debugging a dump file, verify that the file 
                mscordacwks___.dll is on your symbol path.
            4) you are debugging on supported cross platform architecture as 
                the dump file. For example, an ARM dump file must be debugged
                on an X86 or an ARM machine; an AMD64 dump file must be
                debugged on an AMD64 machine.
 
You can also run the debugger command .cordll to control the debugger's
load of mscordacwks.dll.  .cordll -ve -u -l will do a verbose reload.
If that succeeds, the SOS command should work on retry.
 
If you are debugging a minidump, you need to make sure that your executable
path is pointing to clr.dll as well.

The first step is to get hold of the correct libraries from the server:

cp \\server\c$\windows\microsoft.net\Framework64\v4.0.30319\sos.dll c:\temp
cp \\server\c$\windows\microsoft.net\Framework64\v4.0.30319\mscordacwks.dll c:\temp
cp \\server\c$\windows\microsoft.net\Framework64\v4.0.30319\clr.dll c:\temp

In theory at that point, I should just be able to run

> .load c:\temp\sos.dll
> !clrstack

but sadly still no joy. If I run

> .chain

it tells me that it knows about more than one version of SOS

…
Extension DLL chain:
    C:\Windows\Microsoft.NET\Framework64\v4.0.30319\sos: image 4.0.30319.18444, API 1.0.0, built Wed Oct 30 21:40:20 2013
        [path: C:\Windows\Microsoft.NET\Framework64\v4.0.30319\sos.dll]
    c:\temp\sos.dll: image 4.0.30319.1026, API 1.0.0, built Thu Jul 03 07:58:50 2014
        [path: c:\temp\sos.dll]
…

I can unload the non-useful one

> .unload C:\Windows\Microsoft.NET\Framework64\v4.0.30319\sos

and then run SOS commands happily.